From 167a3729197514cb851efdd2138fa3a68f385d53 Mon Sep 17 00:00:00 2001 From: Aleksander Mistewicz Date: Thu, 19 Oct 2017 14:35:55 +0200 Subject: [PATCH] Trigger appropriate actions on Drayd in SetState Usage of mutexes is added into tests, as now more goroutines can operate on WorkerList at the same time. SetState changes are updated and enhanced to cover all possible scenarios. Change-Id: I99b00309e5ab937720d5848e3a3279e966c0cbad Signed-off-by: Lukasz Wojciechowski --- workers/worker_list_test.go | 268 ++++++++++++++++++++++++++++++++++++++++---- workers/workers.go | 36 +++++- 2 files changed, 282 insertions(+), 22 deletions(-) diff --git a/workers/worker_list_test.go b/workers/worker_list_test.go index 5ee4606..0ee64a1 100644 --- a/workers/worker_list_test.go +++ b/workers/worker_list_test.go @@ -53,6 +53,8 @@ var _ = Describe("WorkerList", func() { }) compareLists := func() { + wl.mutex.RLock() + defer wl.mutex.RUnlock() // Check if all registeredWorkers are present for _, uuid := range registeredWorkers { _, ok := wl.workers[WorkerUUID(uuid)] @@ -87,13 +89,17 @@ var _ = Describe("WorkerList", func() { err := wl.Register(caps) Expect(err).ToNot(HaveOccurred()) uuid := WorkerUUID(caps[UUID]) + wl.mutex.RLock() + defer wl.mutex.RUnlock() Expect(wl.workers).To(HaveKey(uuid)) Expect(wl.workers[uuid].State).To(Equal(MAINTENANCE)) }) It("should update the caps when called twice for the same worker", func() { var err error + wl.mutex.RLock() Expect(wl.workers).To(BeEmpty()) + wl.mutex.RUnlock() caps := getRandomCaps() By("registering worker") @@ -106,13 +112,17 @@ var _ = Describe("WorkerList", func() { caps["test-key"] = "test-value" err = wl.Register(caps) Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(wl.workers[WorkerUUID(caps[UUID])].Caps).To(Equal(caps)) + wl.mutex.RUnlock() compareLists() }) It("should work when called once", func() { var err error + wl.mutex.RLock() Expect(wl.workers).To(BeEmpty()) + wl.mutex.RUnlock() caps := getRandomCaps() By("registering first worker") @@ -124,7 +134,9 @@ var _ = Describe("WorkerList", func() { It("should work when called twice with different caps", func() { var err error + wl.mutex.RLock() Expect(wl.workers).To(BeEmpty()) + wl.mutex.RUnlock() caps1 := getRandomCaps() caps2 := getRandomCaps() @@ -156,12 +168,16 @@ var _ = Describe("WorkerList", func() { capsUUID := uuid.NewV4().String() err := wl.Register(Capabilities{UUID: capsUUID}) Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(wl.workers).ToNot(BeEmpty()) + wl.mutex.RUnlock() return WorkerUUID(capsUUID) } BeforeEach(func() { + wl.mutex.RLock() Expect(wl.workers).To(BeEmpty()) + wl.mutex.RUnlock() worker = registerWorker() }) @@ -174,18 +190,26 @@ var _ = Describe("WorkerList", func() { It("should work to SetFail", func() { for _, state := range []WorkerState{IDLE, RUN} { + wl.mutex.Lock() wl.workers[worker].State = state + wl.mutex.Unlock() err := wl.SetFail(worker, "") Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(wl.workers[worker].State).To(Equal(FAIL)) + wl.mutex.RUnlock() } }) It("Should fail to SetFail in MAINTENANCE state", func() { + wl.mutex.Lock() Expect(wl.workers[worker].State).To(Equal(MAINTENANCE)) + wl.mutex.Unlock() err := wl.SetFail(worker, "") Expect(err).To(Equal(ErrInMaintenance)) + wl.mutex.RLock() Expect(wl.workers[worker].State).To(Equal(MAINTENANCE)) + wl.mutex.RUnlock() }) }) @@ -199,13 +223,17 @@ var _ = Describe("WorkerList", func() { It("should work to deregister", func() { err := wl.Deregister(worker) Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(wl.workers).ToNot(HaveKey(worker)) + wl.mutex.RUnlock() }) It("should fail to deregister same worker twice", func() { err := wl.Deregister(worker) Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(wl.workers).ToNot(HaveKey(worker)) + wl.mutex.RUnlock() err = wl.Deregister(worker) Expect(err).To(Equal(ErrWorkerNotFound)) @@ -213,10 +241,14 @@ var _ = Describe("WorkerList", func() { It("should fail to deregister worker not in MAINTENANCE state", func() { for _, state := range []WorkerState{IDLE, RUN, FAIL} { + wl.mutex.Lock() wl.workers[worker].State = state + wl.mutex.Unlock() err := wl.Deregister(worker) Expect(err).To(Equal(ErrNotInMaintenance)) + wl.mutex.RLock() Expect(wl.workers).To(HaveKey(worker)) + wl.mutex.RUnlock() } }) }) @@ -228,22 +260,6 @@ var _ = Describe("WorkerList", func() { Expect(err).To(Equal(ErrWorkerNotFound)) }) - It("should work to SetState for valid transitions", func() { - validTransitions := [][]WorkerState{ - {MAINTENANCE, IDLE}, - {IDLE, MAINTENANCE}, - {RUN, MAINTENANCE}, - {FAIL, MAINTENANCE}, - } - for _, transition := range validTransitions { - fromState, toState := transition[0], transition[1] - wl.workers[worker].State = fromState - err := wl.SetState(worker, toState) - Expect(err).ToNot(HaveOccurred()) - Expect(wl.workers[worker].State).To(Equal(toState)) - } - }) - It("should fail to SetState for invalid transitions", func() { invalidTransitions := [][]WorkerState{ {RUN, IDLE}, @@ -251,10 +267,14 @@ var _ = Describe("WorkerList", func() { } for _, transition := range invalidTransitions { fromState, toState := transition[0], transition[1] + wl.mutex.Lock() wl.workers[worker].State = fromState + wl.mutex.Unlock() err := wl.SetState(worker, toState) Expect(err).To(Equal(ErrForbiddenStateChange)) + wl.mutex.RLock() Expect(wl.workers[worker].State).To(Equal(fromState)) + wl.mutex.RUnlock() } }) @@ -269,11 +289,201 @@ var _ = Describe("WorkerList", func() { } for _, transition := range invalidArgument { fromState, toState := transition[0], transition[1] + wl.mutex.Lock() wl.workers[worker].State = fromState + wl.mutex.Unlock() err := wl.SetState(worker, toState) Expect(err).To(Equal(ErrWrongStateArgument)) + wl.mutex.RLock() Expect(wl.workers[worker].State).To(Equal(fromState)) + wl.mutex.RUnlock() + } + }) + Describe("with dryadClientManager mockup", func() { + var ctrl *gomock.Controller + var dcm *MockDryadClientManager + ip := net.IPv4(2, 4, 6, 8) + key := &rsa.PrivateKey{} + testerr := errors.New("Test Error") + var info *mapWorker + noWorker := WorkerUUID("There's no such worker") + putStr := "maintenance" + + eventuallyState := func(info *mapWorker, state WorkerState) { + EventuallyWithOffset(1, func() WorkerState { + wl.mutex.RLock() + defer wl.mutex.RUnlock() + return info.State + }).Should(Equal(state)) + } + eventuallyKey := func(info *mapWorker, key *rsa.PrivateKey) { + EventuallyWithOffset(1, func() *rsa.PrivateKey { + wl.mutex.RLock() + defer wl.mutex.RUnlock() + return info.key + }).Should(Equal(key)) + } + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + dcm = NewMockDryadClientManager(ctrl) + wl.newDryadClient = func() dryad.ClientManager { + return dcm + } + + var ok bool + wl.mutex.Lock() + info, ok = wl.workers[worker] + Expect(ok).To(BeTrue()) + Expect(info.key).To(BeNil()) + info.ip = ip + wl.mutex.Unlock() + }) + AfterEach(func() { + ctrl.Finish() + }) + + Describe("from MAINTENANCE to IDLE", func() { + BeforeEach(func() { + wl.mutex.Lock() + info.State = MAINTENANCE + wl.mutex.Unlock() + }) + + It("should work to SetState", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().Prepare().Return(key, nil), + dcm.EXPECT().Close(), + ) + + err := wl.SetState(worker, IDLE) + Expect(err).ToNot(HaveOccurred()) + eventuallyState(info, IDLE) + eventuallyKey(info, key) + }) + + It("should fail to SetState if dryadClientManager fails to prepare client", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().Prepare().Return(nil, testerr), + dcm.EXPECT().Close(), + ) + + err := wl.SetState(worker, IDLE) + Expect(err).ToNot(HaveOccurred()) + eventuallyState(info, FAIL) + Expect(info.key).To(BeNil()) + }) + + It("should fail to SetState if dryadClientManager fails to create client", func() { + dcm.EXPECT().Create(ip, conf.DefaultRPCPort).Return(testerr) + + err := wl.SetState(worker, IDLE) + Expect(err).ToNot(HaveOccurred()) + eventuallyState(info, FAIL) + Expect(info.key).To(BeNil()) + }) + }) + + trigger := make(chan int, 1) + + setTrigger := func(val int) { + trigger <- val + } + eventuallyTrigger := func(val int) { + EventuallyWithOffset(1, trigger).Should(Receive(Equal(val))) + } + + fromStates := []WorkerState{IDLE, RUN, FAIL} + for _, from := range fromStates { + Describe("from "+string(from)+" to MAINTENANCE", func() { + BeforeEach(func() { + wl.mutex.Lock() + info.State = from + wl.mutex.Unlock() + }) + + It("should work to SetState", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().PutInMaintenance(putStr), + dcm.EXPECT().Close(), + ) + + err := wl.SetState(worker, MAINTENANCE) + Expect(err).ToNot(HaveOccurred()) + eventuallyState(info, MAINTENANCE) + }) + + It("should fail to SetState if dryadClientManager fails to put dryad in maintenance state", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().PutInMaintenance(putStr).Return(testerr), + dcm.EXPECT().Close().Do(func() { + wl.mutex.Lock() + info.State = WorkerState("TEST") + wl.mutex.Unlock() + setTrigger(1) + }), + ) + + err := wl.SetState(worker, MAINTENANCE) + Expect(err).ToNot(HaveOccurred()) + eventuallyTrigger(1) + eventuallyState(info, FAIL) + }) + + It("should fail to SetState if dryadClientManager fails to create client", func() { + dcm.EXPECT().Create(ip, conf.DefaultRPCPort).Return(testerr).Do(func(net.IP, int) { + wl.mutex.Lock() + info.State = WorkerState("TEST") + wl.mutex.Unlock() + setTrigger(2) + }) + + err := wl.SetState(worker, MAINTENANCE) + Expect(err).ToNot(HaveOccurred()) + eventuallyTrigger(2) + eventuallyState(info, FAIL) + }) + }) } + Describe("putInMaintenance", func() { + It("should work", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().PutInMaintenance(putStr), + dcm.EXPECT().Close(), + ) + + err := wl.putInMaintenance(worker) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should fail if dryadClientManager fails to put dryad in maintenance state", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().PutInMaintenance(putStr).Return(testerr), + dcm.EXPECT().Close(), + ) + + err := wl.putInMaintenance(worker) + Expect(err).To(Equal(testerr)) + }) + + It("should fail if dryadClientManager fails to create client", func() { + dcm.EXPECT().Create(ip, conf.DefaultRPCPort).Return(testerr) + + err := wl.putInMaintenance(worker) + Expect(err).To(Equal(testerr)) + }) + + It("should fail if worker is not registered", func() { + err := wl.putInMaintenance(noWorker) + Expect(err).To(Equal(ErrWorkerNotFound)) + }) + }) }) }) @@ -292,12 +502,16 @@ var _ = Describe("WorkerList", func() { By("setting it") err := wl.SetGroups(worker, group) Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(wl.workers[worker].Groups).To(Equal(group)) + wl.mutex.RUnlock() By("setting it to nil") err = wl.SetGroups(worker, nil) Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(wl.workers[worker].Groups).To(BeNil()) + wl.mutex.RUnlock() }) }) @@ -314,13 +528,19 @@ var _ = Describe("WorkerList", func() { err = wl.SetGroups(workerID, groups) Expect(err).ToNot(HaveOccurred()) - return wl.workers[workerID].WorkerInfo + wl.mutex.RLock() + info := wl.workers[workerID].WorkerInfo + wl.mutex.RUnlock() + + return info } BeforeEach(func() { refWorkerList = make([]WorkerInfo, 1) // Add worker with minimal caps and empty groups. + wl.mutex.RLock() refWorkerList[0] = wl.workers[worker].WorkerInfo + wl.mutex.RUnlock() // Add worker with both groups and caps declared. refWorkerList = append(refWorkerList, registerAndSetGroups( Groups{"all", "small_1", "small_2"}, @@ -458,7 +678,9 @@ var _ = Describe("WorkerList", func() { It("should work to GetWorkerInfo", func() { workerInfo, err := wl.GetWorkerInfo(worker) Expect(err).ToNot(HaveOccurred()) + wl.mutex.RLock() Expect(workerInfo).To(Equal(wl.workers[worker].WorkerInfo)) + wl.mutex.RUnlock() }) }) @@ -540,15 +762,15 @@ var _ = Describe("WorkerList", func() { eventuallyKey := func(info *mapWorker, key *rsa.PrivateKey) { EventuallyWithOffset(1, func() *rsa.PrivateKey { - wl.mutex.Lock() - defer wl.mutex.Unlock() + wl.mutex.RLock() + defer wl.mutex.RUnlock() return info.key }).Should(Equal(key)) } eventuallyState := func(info *mapWorker, state WorkerState) { EventuallyWithOffset(1, func() WorkerState { - wl.mutex.Lock() - defer wl.mutex.Unlock() + wl.mutex.RLock() + defer wl.mutex.RUnlock() return info.State }).Should(Equal(state)) } @@ -567,7 +789,9 @@ var _ = Describe("WorkerList", func() { It("should set worker into IDLE in without-key preparation", func() { err := wl.PrepareWorker(worker, false) Expect(err).NotTo(HaveOccurred()) + wl.mutex.RLock() info, ok := wl.workers[worker] + wl.mutex.RUnlock() Expect(ok).To(BeTrue()) Expect(info.State).To(Equal(IDLE)) }) @@ -634,7 +858,9 @@ var _ = Describe("WorkerList", func() { caps[UUID] = capsUUID wl.Register(caps) + wl.mutex.RLock() w, ok := wl.workers[workerUUID] + wl.mutex.RUnlock() Expect(ok).To(BeTrue()) Expect(w.State).To(Equal(MAINTENANCE)) diff --git a/workers/workers.go b/workers/workers.go index 9efb6c1..b6df880 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -131,7 +131,12 @@ func (wl *WorkerList) SetState(uuid WorkerUUID, state WorkerState) error { if state == IDLE && worker.State != MAINTENANCE { return ErrForbiddenStateChange } - worker.State = state + switch state { + case IDLE: + go wl.prepareKeyAndSetState(uuid) + case MAINTENANCE: + go wl.putInMaintenanceWorker(uuid) + } return nil } @@ -371,6 +376,20 @@ func (wl *WorkerList) prepareKeyAndSetState(worker WorkerUUID) { wl.setState(worker, IDLE) } +// putInMaintenanceWorker puts Dryad into maintenance mode and sets worker +// into MAINTENANCE state in case of success. In case of failure of entering +// maintenance mode, worker is put into FAIL state instead. +func (wl *WorkerList) putInMaintenanceWorker(worker WorkerUUID) { + err := wl.putInMaintenance(worker) + wl.mutex.Lock() + defer wl.mutex.Unlock() + if err != nil { + wl.setState(worker, FAIL) + return + } + wl.setState(worker, MAINTENANCE) +} + // setState changes state of worker. It does not contain any verification if change // is feasible. It should be used only for internal boruta purposes. It must be // called inside WorkerList critical section guarded by WorkerList.mutex. @@ -404,6 +423,21 @@ func (wl *WorkerList) prepareKey(worker WorkerUUID) error { return err } +// putInMaintenance orders Dryad to enter maintenance mode. +func (wl *WorkerList) putInMaintenance(worker WorkerUUID) error { + ip, err := wl.GetWorkerIP(worker) + if err != nil { + return err + } + client := wl.newDryadClient() + err = client.Create(ip, conf.DefaultRPCPort) + if err != nil { + return err + } + defer client.Close() + return client.PutInMaintenance("maintenance") +} + // SetChangeListener sets change listener object in WorkerList. Listener should be // notified in case of changes of workers' states, when worker becomes IDLE // or must break its job because of fail or maintenance. -- 2.7.4