From 70f4a1bb1750c17c5b3691c22a621291b32fa067 Mon Sep 17 00:00:00 2001 From: Lukasz Wojciechowski Date: Tue, 11 Sep 2018 14:53:05 +0200 Subject: [PATCH] Fix workers state changing behaviour Changing state to IDLE (with key generation) or to MAINTENANCE is a time consuming operation. That's why new workers' states are introduced: * PREPARE - when entering IDLE; * BUSY - when entering MAINTENANCE. Without them, state of workers can be changed during these long operations as another method call can change the worker state or initiate other long operations breaking actual state of workers. To fix this issue not only new states are introduced, but also an additional goroutine for every registered worker is started. The goroutine is responsible for running long operations in background. It is controlled with a channel through which new worker state is sent every time setState is called. The goroutine breaks execution of current long operation if new state is set for worker. As new states appear, some changes in workers' methods behaviour need to be made: * SetFail returns ErrInMaintenance also for BUSY state (formerly for MAINTENANCE only). * SetState ignores attempts to set MAINTENANCE when worker is BUSY and to set IDLE when worker is in PREPARE because requested state changes are already in progress. * PrepareWorker is allowed to be run only for workers in RUN state. This method is used in two cases: when Job is finished or when matching of reserved worker and request fails to create the Job. In both cases worker should be brought to IDLE state only if it was in RUN state. If it was in MAINTENANCE or FAILED it should be kept that way. * OnWorkerFail is sent when worker's Job was interrupted (if it was in RUN state, and enters any other than IDLE or PREPARE states). Naming of methods parameters has been unified. All methods running for a boruta.WorkerUUID use uuid variable and a worker variable is used for mapWorker structure. 
Change-Id: I8d1d8b55e48e432746ebf819650a77698b8c27ab Signed-off-by: Lukasz Wojciechowski --- boruta.go | 4 + workers/error.go | 4 + workers/worker_list_test.go | 743 +++++++++++++++++++++++++++++++++++++------- workers/workers.go | 288 ++++++++++++----- 4 files changed, 851 insertions(+), 188 deletions(-) diff --git a/boruta.go b/boruta.go index 6689693..220398a 100644 --- a/boruta.go +++ b/boruta.go @@ -66,6 +66,10 @@ const ( RUN WorkerState = "RUNNING" // FAIL - An error occured, reported by the Worker itself or the Server. FAIL WorkerState = "FAILED" + // PREPARE - Worker is being prepared to enter IDLE state. + PREPARE WorkerState = "PREPARE" + // BUSY - Worker is being prepared to enter MAINTENANCE state. + BUSY WorkerState = "BUSY" ) // Capabilities describe the features provided by the Worker and required by the Request. diff --git a/workers/error.go b/workers/error.go index 3076e40..e083c57 100644 --- a/workers/error.go +++ b/workers/error.go @@ -50,4 +50,8 @@ var ( // ErrMissingPort is returned when Register is called with either dryad or sshd // address missing Port value. ErrMissingPort = errors.New("Port is missing from address") + // ErrWorkerBusy is returned when worker is preparing to enter IDLE or MAINTENANCE state + // which requires time consuming operations to be run on Dryad. During this preparations + // Worker is blocked and cannot change state. 
+ ErrWorkerBusy = errors.New("worker is busy") ) diff --git a/workers/worker_list_test.go b/workers/worker_list_test.go index f8d1535..7e97f59 100644 --- a/workers/worker_list_test.go +++ b/workers/worker_list_test.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "net" + "sync" "git.tizen.org/tools/boruta" "git.tizen.org/tools/boruta/rpc/dryad" @@ -132,6 +133,7 @@ var _ = Describe("WorkerList", func() { defer wl.mutex.RUnlock() Expect(wl.workers).To(HaveKey(uuid)) Expect(wl.workers[uuid].State).To(Equal(boruta.MAINTENANCE)) + Expect(wl.workers[uuid].backgroundOperation).NotTo(BeNil()) }) It("should update the caps when called twice for the same worker", func() { @@ -220,6 +222,10 @@ var _ = Describe("WorkerList", func() { worker = registerWorker() }) + AfterEach(func() { + wl.Deregister(worker) + }) + Describe("SetFail", func() { It("should fail to SetFail of nonexistent worker", func() { uuid := randomUUID() @@ -228,7 +234,7 @@ var _ = Describe("WorkerList", func() { }) It("should work to SetFail", func() { - for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN} { + for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.PREPARE} { wl.mutex.Lock() wl.workers[worker].State = state wl.mutex.Unlock() @@ -240,15 +246,17 @@ var _ = Describe("WorkerList", func() { } }) - It("Should fail to SetFail in MAINTENANCE state", func() { - wl.mutex.Lock() - Expect(wl.workers[worker].State).To(Equal(boruta.MAINTENANCE)) - wl.mutex.Unlock() - err := wl.SetFail(worker, "") - Expect(err).To(Equal(ErrInMaintenance)) - wl.mutex.RLock() - Expect(wl.workers[worker].State).To(Equal(boruta.MAINTENANCE)) - wl.mutex.RUnlock() + It("Should fail to SetFail in MAINTENANCE or BUSY state", func() { + for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.BUSY} { + wl.mutex.Lock() + wl.workers[worker].State = state + wl.mutex.Unlock() + err := wl.SetFail(worker, "") + Expect(err).To(Equal(ErrInMaintenance)) + wl.mutex.RLock() + 
Expect(wl.workers[worker].State).To(Equal(state)) + wl.mutex.RUnlock() + } }) }) @@ -279,7 +287,7 @@ var _ = Describe("WorkerList", func() { }) It("should fail to deregister worker not in MAINTENANCE state", func() { - for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL} { + for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL, boruta.PREPARE, boruta.BUSY} { wl.mutex.Lock() wl.workers[worker].State = state wl.mutex.Unlock() @@ -292,94 +300,129 @@ var _ = Describe("WorkerList", func() { }) }) - Describe("SetState", func() { - It("should fail to SetState of nonexistent worker", func() { - uuid := randomUUID() - err := wl.SetState(uuid, boruta.MAINTENANCE) - Expect(err).To(Equal(ErrWorkerNotFound)) - }) + Describe("with dryadClientManager mockup", func() { + var ctrl *gomock.Controller + var dcm *MockDryadClientManager + ip := net.IPv4(2, 4, 6, 8) + testerr := errors.New("Test Error") + var info *mapWorker + noWorker := boruta.WorkerUUID("There's no such worker") + putStr := "maintenance" - It("should fail to SetState for invalid transitions", func() { - invalidTransitions := [][]boruta.WorkerState{ - {boruta.RUN, boruta.IDLE}, - {boruta.FAIL, boruta.IDLE}, - } - for _, transition := range invalidTransitions { - fromState, toState := transition[0], transition[1] - wl.mutex.Lock() - wl.workers[worker].State = fromState - wl.mutex.Unlock() - err := wl.SetState(worker, toState) - Expect(err).To(Equal(ErrForbiddenStateChange)) + eventuallyState := func(info *mapWorker, state boruta.WorkerState) { + EventuallyWithOffset(1, func() boruta.WorkerState { wl.mutex.RLock() - Expect(wl.workers[worker].State).To(Equal(fromState)) - wl.mutex.RUnlock() + defer wl.mutex.RUnlock() + return info.State + }).Should(Equal(state)) + } + eventuallyKey := func(info *mapWorker, match types.GomegaMatcher) { + EventuallyWithOffset(1, func() *rsa.PrivateKey { + wl.mutex.RLock() + defer wl.mutex.RUnlock() + return info.key + }).Should(match) 
+ } + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + dcm = NewMockDryadClientManager(ctrl) + wl.newDryadClient = func() dryad.ClientManager { + return dcm } + + var ok bool + wl.mutex.Lock() + info, ok = wl.workers[worker] + Expect(ok).To(BeTrue()) + Expect(info.key).To(BeNil()) + info.dryad = new(net.TCPAddr) + info.dryad.IP = ip + wl.mutex.Unlock() }) - It("should fail to SetState for incorrect state argument", func() { - invalidArgument := [][]boruta.WorkerState{ - {boruta.MAINTENANCE, boruta.RUN}, - {boruta.MAINTENANCE, boruta.FAIL}, - {boruta.IDLE, boruta.FAIL}, - {boruta.IDLE, boruta.RUN}, - {boruta.RUN, boruta.FAIL}, - {boruta.FAIL, boruta.RUN}, - } - for _, transition := range invalidArgument { - fromState, toState := transition[0], transition[1] - wl.mutex.Lock() - wl.workers[worker].State = fromState - wl.mutex.Unlock() - err := wl.SetState(worker, toState) - Expect(err).To(Equal(ErrWrongStateArgument)) - wl.mutex.RLock() - Expect(wl.workers[worker].State).To(Equal(fromState)) - wl.mutex.RUnlock() - } + AfterEach(func() { + ctrl.Finish() }) - Describe("with dryadClientManager mockup", func() { - var ctrl *gomock.Controller - var dcm *MockDryadClientManager - ip := net.IPv4(2, 4, 6, 8) - testerr := errors.New("Test Error") - var info *mapWorker - noWorker := boruta.WorkerUUID("There's no such worker") - putStr := "maintenance" - eventuallyState := func(info *mapWorker, state boruta.WorkerState) { - EventuallyWithOffset(1, func() boruta.WorkerState { - wl.mutex.RLock() - defer wl.mutex.RUnlock() - return info.State - }).Should(Equal(state)) - } - eventuallyKey := func(info *mapWorker, match types.GomegaMatcher) { - EventuallyWithOffset(1, func() *rsa.PrivateKey { - wl.mutex.RLock() - defer wl.mutex.RUnlock() - return info.key - }).Should(match) - } + Describe("SetState", func() { + It("should fail to SetState of nonexistent worker", func() { + uuid := randomUUID() + err := wl.SetState(uuid, boruta.MAINTENANCE) + 
Expect(err).To(Equal(ErrWorkerNotFound)) + }) - BeforeEach(func() { - ctrl = gomock.NewController(GinkgoT()) - dcm = NewMockDryadClientManager(ctrl) - wl.newDryadClient = func() dryad.ClientManager { - return dcm + It("should fail to SetState for invalid transitions", func() { + invalidTransitions := [][]boruta.WorkerState{ + {boruta.RUN, boruta.IDLE}, + {boruta.FAIL, boruta.IDLE}, + {boruta.BUSY, boruta.IDLE}, } + for _, transition := range invalidTransitions { + fromState, toState := transition[0], transition[1] + wl.mutex.Lock() + wl.workers[worker].State = fromState + wl.mutex.Unlock() + err := wl.SetState(worker, toState) + Expect(err).To(Equal(ErrForbiddenStateChange)) + wl.mutex.RLock() + Expect(wl.workers[worker].State).To(Equal(fromState)) + wl.mutex.RUnlock() + } + }) - var ok bool - wl.mutex.Lock() - info, ok = wl.workers[worker] - Expect(ok).To(BeTrue()) - Expect(info.key).To(BeNil()) - info.dryad = new(net.TCPAddr) - info.dryad.IP = ip - wl.mutex.Unlock() + It("should ignore SetState if transition is already ongoing", func() { + invalidTransitions := [][]boruta.WorkerState{ + {boruta.PREPARE, boruta.IDLE}, + {boruta.BUSY, boruta.MAINTENANCE}, + } + for _, transition := range invalidTransitions { + fromState, toState := transition[0], transition[1] + wl.mutex.Lock() + wl.workers[worker].State = fromState + wl.mutex.Unlock() + err := wl.SetState(worker, toState) + Expect(err).NotTo(HaveOccurred()) + wl.mutex.RLock() + Expect(wl.workers[worker].State).To(Equal(fromState)) + wl.mutex.RUnlock() + } }) - AfterEach(func() { - ctrl.Finish() + + It("should fail to SetState for incorrect state argument", func() { + invalidArgument := [][]boruta.WorkerState{ + {boruta.MAINTENANCE, boruta.RUN}, + {boruta.MAINTENANCE, boruta.FAIL}, + {boruta.MAINTENANCE, boruta.PREPARE}, + {boruta.MAINTENANCE, boruta.BUSY}, + {boruta.IDLE, boruta.FAIL}, + {boruta.IDLE, boruta.RUN}, + {boruta.IDLE, boruta.PREPARE}, + {boruta.IDLE, boruta.BUSY}, + {boruta.RUN, boruta.FAIL}, + 
{boruta.RUN, boruta.PREPARE}, + {boruta.RUN, boruta.BUSY}, + {boruta.FAIL, boruta.RUN}, + {boruta.FAIL, boruta.PREPARE}, + {boruta.FAIL, boruta.BUSY}, + {boruta.PREPARE, boruta.FAIL}, + {boruta.PREPARE, boruta.RUN}, + {boruta.PREPARE, boruta.BUSY}, + {boruta.BUSY, boruta.FAIL}, + {boruta.BUSY, boruta.RUN}, + {boruta.BUSY, boruta.PREPARE}, + } + for _, transition := range invalidArgument { + fromState, toState := transition[0], transition[1] + wl.mutex.Lock() + wl.workers[worker].State = fromState + wl.mutex.Unlock() + err := wl.SetState(worker, toState) + Expect(err).To(Equal(ErrWrongStateArgument)) + wl.mutex.RLock() + Expect(wl.workers[worker].State).To(Equal(fromState)) + wl.mutex.RUnlock() + } }) Describe("from MAINTENANCE to IDLE", func() { @@ -393,7 +436,11 @@ var _ = Describe("WorkerList", func() { gomock.InOrder( dcm.EXPECT().Create(info.dryad), dcm.EXPECT().Prepare(gomock.Any()).Return(nil), - dcm.EXPECT().Close(), + dcm.EXPECT().Close().Do(func() { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.PREPARE)) + wl.mutex.Unlock() + }), ) err := wl.SetState(worker, boruta.IDLE) @@ -406,7 +453,11 @@ var _ = Describe("WorkerList", func() { gomock.InOrder( dcm.EXPECT().Create(info.dryad), dcm.EXPECT().Prepare(gomock.Any()).Return(testerr), - dcm.EXPECT().Close(), + dcm.EXPECT().Close().Do(func() { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.PREPARE)) + wl.mutex.Unlock() + }), ) err := wl.SetState(worker, boruta.IDLE) @@ -416,7 +467,12 @@ var _ = Describe("WorkerList", func() { }) It("should fail to SetState if dryadClientManager fails to create client", func() { - dcm.EXPECT().Create(info.dryad).Return(testerr) + dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.PREPARE)) + wl.mutex.Unlock() + return testerr + }) err := wl.SetState(worker, boruta.IDLE) Expect(err).ToNot(HaveOccurred()) @@ -434,7 +490,7 @@ var _ = Describe("WorkerList", func() { EventuallyWithOffset(1, 
trigger).Should(Receive(Equal(val))) } - fromStates := []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL} + fromStates := []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL, boruta.PREPARE} for _, from := range fromStates { Describe("from "+string(from)+" to MAINTENANCE", func() { BeforeEach(func() { @@ -447,7 +503,11 @@ var _ = Describe("WorkerList", func() { gomock.InOrder( dcm.EXPECT().Create(info.dryad), dcm.EXPECT().PutInMaintenance(putStr), - dcm.EXPECT().Close(), + dcm.EXPECT().Close().Do(func() { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.BUSY)) + wl.mutex.Unlock() + }), ) err := wl.SetState(worker, boruta.MAINTENANCE) @@ -461,7 +521,7 @@ var _ = Describe("WorkerList", func() { dcm.EXPECT().PutInMaintenance(putStr).Return(testerr), dcm.EXPECT().Close().Do(func() { wl.mutex.Lock() - info.State = boruta.WorkerState("TEST") + Expect(info.State).To(Equal(boruta.BUSY)) wl.mutex.Unlock() setTrigger(1) }), @@ -476,7 +536,7 @@ var _ = Describe("WorkerList", func() { It("should fail to SetState if dryadClientManager fails to create client", func() { dcm.EXPECT().Create(info.dryad).Return(testerr).Do(func(*net.TCPAddr) { wl.mutex.Lock() - info.State = boruta.WorkerState("TEST") + Expect(info.State).To(Equal(boruta.BUSY)) wl.mutex.Unlock() setTrigger(2) }) @@ -488,6 +548,33 @@ var _ = Describe("WorkerList", func() { }) }) } + }) + Describe("withBackgroundContext", func() { + var bc, noWorkerBc backgroundContext + var c chan boruta.WorkerState + var testState boruta.WorkerState = boruta.RUN + var opEmpty = pendingOperation{} + var opClosed = pendingOperation{got: true} + var opState = pendingOperation{got: true, open: true, state: testState} + + BeforeEach(func() { + c = make(chan boruta.WorkerState, backgroundOperationsBufferSize) + bc = backgroundContext{ + c: c, + uuid: worker, + } + noWorkerBc = backgroundContext{ + c: c, + uuid: noWorker, + } + }) + + AfterEach(func() { + if c != nil { + close(c) + } + }) + Describe("putInMaintenance", 
func() { It("should work", func() { gomock.InOrder( @@ -496,8 +583,9 @@ var _ = Describe("WorkerList", func() { dcm.EXPECT().Close(), ) - err := wl.putInMaintenance(worker) + op, err := wl.putInMaintenance(bc) Expect(err).ToNot(HaveOccurred()) + Expect(op).To(Equal(opEmpty)) }) It("should fail if dryadClientManager fails to put dryad in maintenance state", func() { @@ -507,21 +595,340 @@ var _ = Describe("WorkerList", func() { dcm.EXPECT().Close(), ) - err := wl.putInMaintenance(worker) + op, err := wl.putInMaintenance(bc) Expect(err).To(Equal(testerr)) + Expect(op).To(Equal(opEmpty)) }) It("should fail if dryadClientManager fails to create client", func() { dcm.EXPECT().Create(info.dryad).Return(testerr) - err := wl.putInMaintenance(worker) + op, err := wl.putInMaintenance(bc) Expect(err).To(Equal(testerr)) + Expect(op).To(Equal(opEmpty)) }) It("should fail if worker is not registered", func() { - err := wl.putInMaintenance(noWorker) + op, err := wl.putInMaintenance(noWorkerBc) Expect(err).To(Equal(ErrWorkerNotFound)) + Expect(op).To(Equal(opEmpty)) + }) + + It("should break execution when channel is used before execution", func() { + c <- testState + + op, err := wl.putInMaintenance(bc) + Expect(err).ToNot(HaveOccurred()) + Expect(op).To(Equal(opState)) + }) + + It("should break execution when channel is used during execution", func() { + gomock.InOrder( + dcm.EXPECT().Create(info.dryad).Do(func(*net.TCPAddr) { + c <- testState + }), + dcm.EXPECT().Close(), + ) + + op, err := wl.putInMaintenance(bc) + Expect(err).ToNot(HaveOccurred()) + Expect(op).To(Equal(opState)) + }) + + It("should break execution when channel is closed before execution", func() { + close(c) + c = nil + + op, err := wl.putInMaintenance(bc) + Expect(err).ToNot(HaveOccurred()) + Expect(op).To(Equal(opClosed)) + }) + + It("should break execution when channel is closed during execution", func() { + gomock.InOrder( + dcm.EXPECT().Create(info.dryad).Do(func(*net.TCPAddr) { + close(c) + c = nil + 
}), + dcm.EXPECT().Close(), + ) + + op, err := wl.putInMaintenance(bc) + Expect(err).ToNot(HaveOccurred()) + Expect(op).To(Equal(opClosed)) + }) + }) + + Describe("prepareKeyAndSetState", func() { + It("should ignore if worker is not registered", func() { + op := wl.prepareKeyAndSetState(noWorkerBc) + Expect(op).To(Equal(opEmpty)) + }) + + It("should return immediately if new operation is pending on channel", func() { + c <- testState + op := wl.prepareKeyAndSetState(bc) + Expect(op).To(Equal(opState)) + }) + + It("should return immediately if channel is closed", func() { + close(c) + c = nil + op := wl.prepareKeyAndSetState(bc) + Expect(op).To(Equal(opClosed)) }) + + It("should return if new operation appeared on channel after prepareKey is completed", func() { + gomock.InOrder( + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().Prepare(gomock.Any()), + dcm.EXPECT().Close().Do(func() { + c <- testState + }), + ) + + op := wl.prepareKeyAndSetState(bc) + Expect(op).To(Equal(opState)) + }) + + It("should return if channel is closed after prepareKey is completed", func() { + gomock.InOrder( + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().Prepare(gomock.Any()), + dcm.EXPECT().Close().Do(func() { + close(c) + c = nil + }), + ) + + op := wl.prepareKeyAndSetState(bc) + Expect(op).To(Equal(opClosed)) + }) + }) + + Describe("putInMaintenanceWorker", func() { + It("should ignore if worker is not registered", func() { + op := wl.putInMaintenanceWorker(noWorkerBc) + Expect(op).To(Equal(opEmpty)) + }) + + It("should return immediately if new operation is pending on channel", func() { + c <- testState + op := wl.putInMaintenanceWorker(bc) + Expect(op).To(Equal(opState)) + }) + + It("should return immediately if channel is closed", func() { + close(c) + c = nil + op := wl.putInMaintenanceWorker(bc) + Expect(op).To(Equal(opClosed)) + }) + + It("should return if new operation appeared on channel after putInMaintenance is completed", func() { + gomock.InOrder( + 
dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().PutInMaintenance(putStr), + dcm.EXPECT().Close().Do(func() { + c <- testState + }), + ) + + op := wl.putInMaintenanceWorker(bc) + Expect(op).To(Equal(opState)) + }) + + It("should return if channel is closed after putInMaintenance is completed", func() { + gomock.InOrder( + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().PutInMaintenance(putStr), + dcm.EXPECT().Close().Do(func() { + close(c) + c = nil + }), + ) + + op := wl.putInMaintenanceWorker(bc) + Expect(op).To(Equal(opClosed)) + }) + }) + + Describe("checkPendingOperation", func() { + It("should return got=false when nothing happens on channel", func() { + op := checkPendingOperation(c) + Expect(op).To(Equal(opEmpty)) + }) + + It("should return state when new message appeared on channel", func() { + c <- testState + op := checkPendingOperation(c) + Expect(op).To(Equal(opState)) + }) + + It("should return last state when new message appeared on channel", func() { + c <- boruta.FAIL + c <- boruta.BUSY + c <- boruta.FAIL + c <- testState + op := checkPendingOperation(c) + Expect(op).To(Equal(opState)) + }) + + It("should return open=false when channel is closed", func() { + close(c) + defer func() { c = nil }() + op := checkPendingOperation(c) + Expect(op).To(Equal(opClosed)) + }) + }) + + Describe("backgroundLoop", func() { + var done bool + var m sync.Locker + isDone := func() bool { + m.Lock() + defer m.Unlock() + return done + } + setDone := func(val bool) { + m.Lock() + defer m.Unlock() + done = val + } + run := func() { + defer GinkgoRecover() + wl.backgroundLoop(bc) + setDone(true) + } + ignoredStates := []boruta.WorkerState{boruta.MAINTENANCE, boruta.IDLE, boruta.RUN, boruta.FAIL} + + BeforeEach(func() { + m = new(sync.Mutex) + setDone(false) + }) + + It("should run infinitely, but stop if channel is closed", func() { + go run() + Consistently(isDone).Should(BeFalse()) + close(c) + c = nil + Eventually(isDone).Should(BeTrue()) + }) + + It("should ignore 
states not requiring any action and return after channel is closed", func() { + go run() + for _, state := range ignoredStates { + By(string(state)) + c <- state + } + // Check that loop is still running. + Consistently(isDone).Should(BeFalse()) + + // Check loop returns after channel is closed. + close(c) + c = nil + Eventually(isDone).Should(BeTrue()) + }) + + It("should run prepareKey if PREPARE state is sent over channel", func() { + go run() + gomock.InOrder( + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().Prepare(gomock.Any()), + dcm.EXPECT().Close().Do(func() { + close(c) + c = nil + }), + ) + c <- boruta.PREPARE + Eventually(isDone).Should(BeTrue()) + }) + + It("should run putInMaintenance if BUSY state is sent over channel", func() { + go run() + gomock.InOrder( + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().PutInMaintenance(putStr), + dcm.EXPECT().Close().Do(func() { + close(c) + c = nil + }), + ) + c <- boruta.BUSY + Eventually(isDone).Should(BeTrue()) + }) + + DescribeTable("should break running current background task if other state is sent over channel", + func(current, another boruta.WorkerState) { + go run() + gomock.InOrder( + dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error { + c <- another + return nil + }), + dcm.EXPECT().Close().Do(func() { + close(c) + c = nil + }), + ) + c <- current + Eventually(isDone).Should(BeTrue()) + }, + Entry("PREPARE->MAINTENANCE", boruta.PREPARE, boruta.MAINTENANCE), + Entry("PREPARE->IDLE", boruta.PREPARE, boruta.IDLE), + Entry("PREPARE->RUN", boruta.PREPARE, boruta.RUN), + Entry("PREPARE->FAIL", boruta.PREPARE, boruta.FAIL), + Entry("BUSY->MAINTENANCE", boruta.BUSY, boruta.MAINTENANCE), + Entry("BUSY->IDLE", boruta.BUSY, boruta.IDLE), + Entry("BUSY->RUN", boruta.BUSY, boruta.RUN), + Entry("BUSY->FAIL", boruta.BUSY, boruta.FAIL), + ) + + DescribeTable("should break running current background task and start prepareKey if PREPARE is sent over channel", + func(current boruta.WorkerState) { + 
go run() + gomock.InOrder( + dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error { + c <- boruta.PREPARE + return nil + }), + dcm.EXPECT().Close(), + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().Prepare(gomock.Any()), + dcm.EXPECT().Close().Do(func() { + close(c) + c = nil + }), + ) + c <- current + Eventually(isDone).Should(BeTrue()) + }, + Entry("PREPARE", boruta.PREPARE), + Entry("BUSY", boruta.BUSY), + ) + + DescribeTable("should break running current background task and start putInMaintenance if BUSY is sent over channel", + func(current boruta.WorkerState) { + go run() + gomock.InOrder( + dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error { + c <- boruta.BUSY + return nil + }), + dcm.EXPECT().Close(), + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().PutInMaintenance(putStr), + dcm.EXPECT().Close().Do(func() { + close(c) + c = nil + }), + ) + c <- current + Eventually(isDone).Should(BeTrue()) + }, + Entry("PREPARE", boruta.PREPARE), + Entry("BUSY", boruta.BUSY), + ) }) }) }) @@ -552,6 +959,7 @@ var _ = Describe("WorkerList", func() { Expect(wl.workers[worker].Groups).To(BeNil()) wl.mutex.RUnlock() }) + Describe("SetGroup with ChangeListener", func() { var ctrl *gomock.Controller var wc *MockWorkerChange @@ -576,7 +984,7 @@ var _ = Describe("WorkerList", func() { Expect(err).ToNot(HaveOccurred()) }) It("should not notify changeListener if set and worker's state is other than IDLE", func() { - for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.FAIL, boruta.RUN} { + for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.FAIL, boruta.RUN, boruta.PREPARE, boruta.BUSY} { By(string(state)) wl.mutex.RLock() @@ -589,7 +997,7 @@ var _ = Describe("WorkerList", func() { }) }) It("should not notify changeListener if not set", func() { - for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.FAIL, boruta.RUN, boruta.IDLE} { + for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, 
boruta.FAIL, boruta.RUN, boruta.IDLE, boruta.PREPARE, boruta.BUSY} { By(string(state)) wl.mutex.RLock() @@ -875,7 +1283,10 @@ var _ = Describe("WorkerList", func() { ctrl.Finish() }) - It("should set worker into IDLE in without-key preparation", func() { + It("should set worker in RUN state into IDLE in without-key preparation", func() { + wl.mutex.RLock() + wl.workers[worker].State = boruta.RUN + wl.mutex.RUnlock() err := wl.PrepareWorker(worker, false) Expect(err).NotTo(HaveOccurred()) wl.mutex.RLock() @@ -884,17 +1295,40 @@ var _ = Describe("WorkerList", func() { Expect(ok).To(BeTrue()) Expect(info.State).To(Equal(boruta.IDLE)) }) + + DescribeTable("should fail to set worker into IDLE in without-key preparation", + func(from boruta.WorkerState) { + wl.mutex.RLock() + wl.workers[worker].State = from + wl.mutex.RUnlock() + err := wl.PrepareWorker(worker, false) + Expect(err).To(Equal(ErrForbiddenStateChange)) + wl.mutex.RLock() + info, ok := wl.workers[worker] + wl.mutex.RUnlock() + Expect(ok).To(BeTrue()) + Expect(info.State).To(Equal(from)) + }, + Entry("MAINTENANCE", boruta.MAINTENANCE), + Entry("IDLE", boruta.IDLE), + Entry("FAIL", boruta.FAIL), + Entry("PREPARE", boruta.PREPARE), + Entry("BUSY", boruta.BUSY), + ) + It("should fail to prepare not existing worker in without-key preparation", func() { - uuid := randomUUID() - err := wl.PrepareWorker(uuid, false) + err := wl.PrepareWorker(noWorker, false) Expect(err).To(Equal(ErrWorkerNotFound)) }) - It("should ignore to prepare worker for non-existing worker", func() { + + It("should fail to prepare not existing worker in with-key preparation", func() { err := wl.PrepareWorker(noWorker, true) - Expect(err).NotTo(HaveOccurred()) + Expect(err).To(Equal(ErrWorkerNotFound)) }) + Describe("with worker's IP set", func() { var info *mapWorker + BeforeEach(func() { var ok bool info, ok = wl.workers[worker] @@ -902,12 +1336,18 @@ var _ = Describe("WorkerList", func() { Expect(info.key).To(BeNil()) info.dryad = 
new(net.TCPAddr) info.dryad.IP = ip + info.State = boruta.RUN }) + It("should set worker into IDLE state and prepare a key", func() { gomock.InOrder( dcm.EXPECT().Create(info.dryad), dcm.EXPECT().Prepare(gomock.Any()).Return(nil), - dcm.EXPECT().Close(), + dcm.EXPECT().Close().Do(func() { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.PREPARE)) + wl.mutex.Unlock() + }), ) err := wl.PrepareWorker(worker, true) @@ -916,11 +1356,16 @@ var _ = Describe("WorkerList", func() { eventuallyState(info, boruta.IDLE) eventuallyKey(info, Not(Equal(&rsa.PrivateKey{}))) }) + It("should fail to prepare worker if dryadClientManager fails to prepare client", func() { gomock.InOrder( dcm.EXPECT().Create(info.dryad), dcm.EXPECT().Prepare(gomock.Any()).Return(testerr), - dcm.EXPECT().Close(), + dcm.EXPECT().Close().Do(func() { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.PREPARE)) + wl.mutex.Unlock() + }), ) err := wl.PrepareWorker(worker, true) @@ -929,8 +1374,14 @@ var _ = Describe("WorkerList", func() { eventuallyState(info, boruta.FAIL) Expect(info.key).To(BeNil()) }) + It("should fail to prepare worker if dryadClientManager fails to create client", func() { - dcm.EXPECT().Create(info.dryad).Return(testerr) + dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.PREPARE)) + wl.mutex.Unlock() + return testerr + }) err := wl.PrepareWorker(worker, true) Expect(err).NotTo(HaveOccurred()) @@ -938,6 +1389,24 @@ var _ = Describe("WorkerList", func() { eventuallyState(info, boruta.FAIL) Expect(info.key).To(BeNil()) }) + + It("should fail to prepare worker if key generation fails", func() { + gomock.InOrder( + dcm.EXPECT().Create(info.dryad), + dcm.EXPECT().Close().Do(func() { + wl.mutex.Lock() + Expect(info.State).To(Equal(boruta.PREPARE)) + wl.mutex.Unlock() + }), + ) + + sizeRSA = 1 + err := wl.PrepareWorker(worker, true) + Expect(err).NotTo(HaveOccurred()) + + eventuallyState(info, boruta.FAIL) + 
Expect(info.key).To(BeNil()) + }) }) }) @@ -955,6 +1424,13 @@ var _ = Describe("WorkerList", func() { Expect(wl.workers[worker].State).To(Equal(state)) wl.mutex.RUnlock() } + swap := func(newChan chan boruta.WorkerState) (oldChan chan boruta.WorkerState) { + wl.mutex.Lock() + oldChan = wl.workers[worker].backgroundOperation + wl.workers[worker].backgroundOperation = newChan + wl.mutex.Unlock() + return + } BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) wc = NewMockWorkerChange(ctrl) @@ -967,47 +1443,85 @@ var _ = Describe("WorkerList", func() { DescribeTable("Should change state without calling changeListener", func(from, to boruta.WorkerState) { set(from) + newChan := make(chan boruta.WorkerState, backgroundOperationsBufferSize) + oldChan := swap(newChan) + defer swap(oldChan) + err := wl.setState(worker, to) Expect(err).NotTo(HaveOccurred()) check(to) + Eventually(newChan).Should(Receive(Equal(to))) }, Entry("MAINTENANCE->MAINTENANCE", boruta.MAINTENANCE, boruta.MAINTENANCE), Entry("MAINTENANCE->RUN", boruta.MAINTENANCE, boruta.RUN), Entry("MAINTENANCE->FAIL", boruta.MAINTENANCE, boruta.FAIL), + Entry("MAINTENANCE->PREPARE", boruta.MAINTENANCE, boruta.PREPARE), + Entry("MAINTENANCE->BUSY", boruta.MAINTENANCE, boruta.BUSY), Entry("IDLE->MAINTENANCE", boruta.IDLE, boruta.MAINTENANCE), Entry("IDLE->RUN", boruta.IDLE, boruta.RUN), Entry("IDLE->FAIL", boruta.IDLE, boruta.FAIL), + Entry("IDLE->PREPARE", boruta.IDLE, boruta.PREPARE), + Entry("IDLE->BUSY", boruta.IDLE, boruta.BUSY), + Entry("RUN->PREPARE", boruta.RUN, boruta.PREPARE), Entry("FAIL->MAINTENANCE", boruta.FAIL, boruta.MAINTENANCE), Entry("FAIL->RUN", boruta.FAIL, boruta.RUN), Entry("FAIL->FAIL", boruta.FAIL, boruta.FAIL), + Entry("FAIL->PREPARE", boruta.FAIL, boruta.PREPARE), + Entry("FAIL->BUSY", boruta.FAIL, boruta.BUSY), + Entry("PREPARE->MAINTENANCE", boruta.PREPARE, boruta.MAINTENANCE), + Entry("PREPARE->RUN", boruta.PREPARE, boruta.RUN), + Entry("PREPARE->FAIL", boruta.PREPARE, 
boruta.FAIL), + Entry("PREPARE->PREPARE", boruta.PREPARE, boruta.PREPARE), + Entry("PREPARE->BUSY", boruta.PREPARE, boruta.BUSY), + Entry("BUSY->MAINTENANCE", boruta.BUSY, boruta.MAINTENANCE), + Entry("BUSY->RUN", boruta.BUSY, boruta.RUN), + Entry("BUSY->FAIL", boruta.BUSY, boruta.FAIL), + Entry("BUSY->PREPARE", boruta.BUSY, boruta.PREPARE), + Entry("BUSY->BUSY", boruta.BUSY, boruta.BUSY), ) + DescribeTable("Should change state and call OnWorkerIdle", func(from, to boruta.WorkerState) { set(from) + newChan := make(chan boruta.WorkerState, backgroundOperationsBufferSize) + oldChan := swap(newChan) + defer swap(oldChan) wc.EXPECT().OnWorkerIdle(worker) + err := wl.setState(worker, to) Expect(err).NotTo(HaveOccurred()) check(to) + Eventually(newChan).Should(Receive(Equal(to))) }, Entry("MAINTENANCE->IDLE", boruta.MAINTENANCE, boruta.IDLE), Entry("IDLE->IDLE", boruta.IDLE, boruta.IDLE), Entry("RUN->IDLE", boruta.RUN, boruta.IDLE), Entry("FAIL->IDLE", boruta.FAIL, boruta.IDLE), + Entry("PREPARE->IDLE", boruta.PREPARE, boruta.IDLE), + Entry("BUSY->IDLE", boruta.BUSY, boruta.IDLE), ) + DescribeTable("Should change state and call OnWorkerFail", func(from, to boruta.WorkerState) { set(from) + newChan := make(chan boruta.WorkerState, backgroundOperationsBufferSize) + oldChan := swap(newChan) + defer swap(oldChan) wc.EXPECT().OnWorkerFail(worker) + err := wl.setState(worker, to) Expect(err).NotTo(HaveOccurred()) check(to) + Eventually(newChan).Should(Receive(Equal(to))) }, Entry("RUN->MAINTENANCE", boruta.RUN, boruta.MAINTENANCE), Entry("RUN->RUN", boruta.RUN, boruta.RUN), Entry("RUN->FAIL", boruta.RUN, boruta.FAIL), + Entry("RUN->BUSY", boruta.RUN, boruta.BUSY), ) }) }) + Describe("TakeBestMatchingWorker", func() { addWorker := func(groups boruta.Groups, caps boruta.Capabilities) *mapWorker { capsUUID := getUUID() @@ -1028,11 +1542,7 @@ var _ = Describe("WorkerList", func() { } addIdleWorker := func(groups boruta.Groups, caps boruta.Capabilities) *mapWorker { w := 
addWorker(groups, caps) - - err := wl.PrepareWorker(w.WorkerUUID, false) - Expect(err).NotTo(HaveOccurred()) - Expect(w.State).To(Equal(boruta.IDLE)) - + w.State = boruta.IDLE return w } generateGroups := func(count int) boruta.Groups { @@ -1051,11 +1561,13 @@ var _ = Describe("WorkerList", func() { } return caps } + It("should fail to find matching worker when there are no workers", func() { ret, err := wl.TakeBestMatchingWorker(boruta.Groups{}, boruta.Capabilities{}) Expect(err).To(Equal(ErrNoMatchingWorker)) Expect(ret).To(BeZero()) }) + It("should match fitting worker and set it into RUN state", func() { w := addIdleWorker(boruta.Groups{}, boruta.Capabilities{}) @@ -1064,6 +1576,7 @@ var _ = Describe("WorkerList", func() { Expect(ret).To(Equal(w.WorkerUUID)) Expect(w.State).To(Equal(boruta.RUN)) }) + It("should not match not IDLE workers", func() { addWorker(boruta.Groups{}, boruta.Capabilities{}) @@ -1071,6 +1584,7 @@ var _ = Describe("WorkerList", func() { Expect(err).To(Equal(ErrNoMatchingWorker)) Expect(ret).To(BeZero()) }) + It("should choose least capable worker", func() { // Create matching workers. w5g5c := addIdleWorker(generateGroups(5), generateCaps(5)) @@ -1097,6 +1611,7 @@ var _ = Describe("WorkerList", func() { } }) }) + Describe("SetChangeListener", func() { It("should set WorkerChange", func() { ctrl := gomock.NewController(GinkgoT()) diff --git a/workers/workers.go b/workers/workers.go index ea37d9a..69dbaaf 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -37,14 +37,46 @@ const UUID string = "UUID" // It is a variable for test purposes. var sizeRSA = 4096 +// backgroundOperationsBufferSize defines buffer size of the channel +// used for communication with background goroutine launched for every +// registered worker. The goroutine processes long operations like: +// preparation of Dryad to work or putting it into maintenance state. +// Goroutines related with API calls use channel to initiate background +// operations. 
Only one operation is run at the same time. The buffer +// on channel allows non-blocking delegation of these operations. +const backgroundOperationsBufferSize int = 32 + +// pendingOperation describes status of reader's end of the channel used by +// background routine. +// The got flag indicates if there is any operation pending on channel. +// Only if got is set to true, other fields should be analyzed. +// The open flag indicates if channel is still open. It is set to false +// when channel was closed on writer side (during deregistration of worker). +// The state field contains new state that worker was switched to. +type pendingOperation struct { + state boruta.WorkerState + open bool + got bool +} + +// backgroundContext aggregates data required by functions running in background +// goroutine to identify context of proper worker. The context is built of: +// c - a reader's end of channel; +// uuid - identifier of the worker. +type backgroundContext struct { + c <-chan boruta.WorkerState + uuid boruta.WorkerUUID +} + +// mapWorker is used by WorkerList to store all // (public and private) structures representing Worker. type mapWorker struct { boruta.WorkerInfo - dryad *net.TCPAddr - sshd *net.TCPAddr - ip net.IP - key *rsa.PrivateKey + dryad *net.TCPAddr + sshd *net.TCPAddr + ip net.IP + key *rsa.PrivateKey + backgroundOperation chan boruta.WorkerState } // WorkerList implements Superviser and Workers interfaces. 
@@ -126,15 +158,18 @@ func (wl *WorkerList) Register(caps boruta.Capabilities, dryadAddress string, worker.dryad = dryad worker.sshd = sshd } else { + c := make(chan boruta.WorkerState, backgroundOperationsBufferSize) wl.workers[uuid] = &mapWorker{ WorkerInfo: boruta.WorkerInfo{ WorkerUUID: uuid, State: boruta.MAINTENANCE, Caps: caps, }, - dryad: dryad, - sshd: sshd, + dryad: dryad, + sshd: sshd, + backgroundOperation: c, } + go wl.backgroundLoop(backgroundContext{c: c, uuid: uuid}) } return nil } @@ -149,7 +184,8 @@ func (wl *WorkerList) SetFail(uuid boruta.WorkerUUID, reason string) error { if !ok { return ErrWorkerNotFound } - if worker.State == boruta.MAINTENANCE { + // Ignore entering FAIL state if administrator started maintenance already. + if worker.State == boruta.MAINTENANCE || worker.State == boruta.BUSY { return ErrInMaintenance } return wl.setState(uuid, boruta.FAIL) @@ -167,15 +203,23 @@ func (wl *WorkerList) SetState(uuid boruta.WorkerUUID, state boruta.WorkerState) if !ok { return ErrWorkerNotFound } + // Do nothing if transition to MAINTENANCE state is already ongoing. + if state == boruta.MAINTENANCE && worker.State == boruta.BUSY { + return nil + } + // Do nothing if transition to IDLE state is already ongoing. + if state == boruta.IDLE && worker.State == boruta.PREPARE { + return nil + } // State transitions to IDLE are allowed from MAINTENANCE state only. 
if state == boruta.IDLE && worker.State != boruta.MAINTENANCE { return ErrForbiddenStateChange } switch state { case boruta.IDLE: - go wl.prepareKeyAndSetState(uuid) + wl.setState(uuid, boruta.PREPARE) case boruta.MAINTENANCE: - go wl.putInMaintenanceWorker(uuid) + wl.setState(uuid, boruta.BUSY) } return nil } @@ -206,6 +250,7 @@ func (wl *WorkerList) Deregister(uuid boruta.WorkerUUID) error { if worker.State != boruta.MAINTENANCE { return ErrNotInMaintenance } + close(worker.backgroundOperation) delete(wl.workers, uuid) return nil } @@ -265,14 +310,14 @@ func (wl *WorkerList) ListWorkers(groups boruta.Groups, caps boruta.Capabilities wl.mutex.RLock() defer wl.mutex.RUnlock() - return wl.listWorkers(groups, caps) + return wl.listWorkers(groups, caps, false) } // listWorkers lists all workers when both: // * any of the groups is matching (or groups is nil) // * all of the caps is matching (or caps is nil) // Caller of this method should own the mutex. -func (wl *WorkerList) listWorkers(groups boruta.Groups, caps boruta.Capabilities) ([]boruta.WorkerInfo, error) { +func (wl *WorkerList) listWorkers(groups boruta.Groups, caps boruta.Capabilities, onlyIdle bool) ([]boruta.WorkerInfo, error) { matching := make([]boruta.WorkerInfo, 0, len(wl.workers)) groupsMatcher := make(map[boruta.Group]interface{}) @@ -283,6 +328,9 @@ func (wl *WorkerList) listWorkers(groups boruta.Groups, caps boruta.Capabilities for _, worker := range wl.workers { if isGroupsMatching(worker.WorkerInfo, groupsMatcher) && isCapsMatching(worker.WorkerInfo, caps) { + if onlyIdle && (worker.State != boruta.IDLE) { + continue + } matching = append(matching, worker.WorkerInfo) } } @@ -360,11 +408,8 @@ func (wl *WorkerList) TakeBestMatchingWorker(groups boruta.Groups, caps boruta.C var bestScore = math.MaxInt32 - matching, _ := wl.listWorkers(groups, caps) + matching, _ := wl.listWorkers(groups, caps, true) for _, info := range matching { - if info.State != boruta.IDLE { - continue - } score := 
len(info.Caps) + len(info.Groups) if score < bestScore { bestScore = score @@ -389,109 +434,159 @@ // As key creation can take some time, the method is asynchronous and the worker's // state might not be changed when it returns. // It is a part of WorkersManager interface implementation by WorkerList. -func (wl *WorkerList) PrepareWorker(worker boruta.WorkerUUID, withKeyGeneration bool) error { +func (wl *WorkerList) PrepareWorker(uuid boruta.WorkerUUID, withKeyGeneration bool) error { + wl.mutex.Lock() + defer wl.mutex.Unlock() + + worker, ok := wl.workers[uuid] + if !ok { + return ErrWorkerNotFound + } + if worker.State != boruta.RUN { + return ErrForbiddenStateChange + } + + if !withKeyGeneration { - wl.mutex.Lock() - defer wl.mutex.Unlock() - return wl.setState(worker, boruta.IDLE) + return wl.setState(uuid, boruta.IDLE) } + return wl.setState(uuid, boruta.PREPARE) +} - go wl.prepareKeyAndSetState(worker) +// setState changes state of worker. It does not contain any verification if change +// is feasible. It should be used only for internal boruta purposes. It must be +// called inside WorkerList critical section guarded by WorkerList.mutex. +func (wl *WorkerList) setState(uuid boruta.WorkerUUID, state boruta.WorkerState) error { + worker, ok := wl.workers[uuid] + if !ok { + return ErrWorkerNotFound + } + // Send information about changing state to the background loop to possibly break some operations. + worker.backgroundOperation <- state + if wl.changeListener != nil { + if state == boruta.IDLE { + wl.changeListener.OnWorkerIdle(uuid) + } + // Inform that Job execution was possibly broken when changing RUN state + // to any other than IDLE or PREPARE. 
+ if worker.State == boruta.RUN && state != boruta.IDLE && state != boruta.PREPARE { + wl.changeListener.OnWorkerFail(uuid) + } + } + worker.State = state return nil } // prepareKeyAndSetState prepares private RSA key for the worker and sets worker // into IDLE state in case of success. In case of failure of key preparation, // worker is put into FAIL state instead. -func (wl *WorkerList) prepareKeyAndSetState(worker boruta.WorkerUUID) { - err := wl.prepareKey(worker) - wl.mutex.Lock() - defer wl.mutex.Unlock() - if err != nil { - // TODO log error. - wl.setState(worker, boruta.FAIL) +func (wl *WorkerList) prepareKeyAndSetState(bc backgroundContext) (op pendingOperation) { + var err error + op, err = wl.prepareKey(bc) + if op.got { return } - wl.setState(worker, boruta.IDLE) -} -// putInMaintenanceWorker puts Dryad into maintenance mode and sets worker -// into MAINTENANCE state in case of success. In case of failure of entering -// maintenance mode, worker is put into FAIL state instead. -func (wl *WorkerList) putInMaintenanceWorker(worker boruta.WorkerUUID) { - err := wl.putInMaintenance(worker) wl.mutex.Lock() defer wl.mutex.Unlock() - if err != nil { - wl.setState(worker, boruta.FAIL) + + if op = checkPendingOperation(bc.c); op.got { return } - wl.setState(worker, boruta.MAINTENANCE) -} -// setState changes state of worker. It does not contain any verification if change -// is feasible. It should be used only for internal boruta purposes. It must be -// called inside WorkerList critical section guarded by WorkerList.mutex. -func (wl *WorkerList) setState(worker boruta.WorkerUUID, state boruta.WorkerState) error { - w, ok := wl.workers[worker] - if !ok { - return ErrWorkerNotFound - } - if wl.changeListener != nil { - if state == boruta.IDLE { - wl.changeListener.OnWorkerIdle(worker) - } else { - if w.State == boruta.RUN { - wl.changeListener.OnWorkerFail(worker) - } - } + if err != nil { + // TODO log error. 
+ wl.setState(bc.uuid, boruta.FAIL) + return } - w.State = state - return nil + wl.setState(bc.uuid, boruta.IDLE) + return } // prepareKey generates key, installs public part on worker and stores private part in WorkerList. -func (wl *WorkerList) prepareKey(worker boruta.WorkerUUID) error { - addr, err := wl.getWorkerAddr(worker) - if err != nil { - return err +func (wl *WorkerList) prepareKey(bc backgroundContext) (op pendingOperation, err error) { + if op = checkPendingOperation(bc.c); op.got { + return + } + addr, err := wl.getWorkerAddr(bc.uuid) + if op = checkPendingOperation(bc.c); op.got || err != nil { + return } client := wl.newDryadClient() err = client.Create(&addr) if err != nil { - return err + op = checkPendingOperation(bc.c) + return } defer client.Close() + if op = checkPendingOperation(bc.c); op.got { + return + } key, err := rsa.GenerateKey(rand.Reader, sizeRSA) - if err != nil { - return err + if op = checkPendingOperation(bc.c); op.got || err != nil { + return } pubKey, err := ssh.NewPublicKey(&key.PublicKey) - if err != nil { - return err + if op = checkPendingOperation(bc.c); op.got || err != nil { + return } err = client.Prepare(&pubKey) + if op = checkPendingOperation(bc.c); op.got || err != nil { + return + } + err = wl.setWorkerKey(bc.uuid, key) + op = checkPendingOperation(bc.c) + return +} + +// putInMaintenanceWorker puts Dryad into maintenance mode and sets worker +// into MAINTENANCE state in case of success. In case of failure of entering +// maintenance mode, worker is put into FAIL state instead. +func (wl *WorkerList) putInMaintenanceWorker(bc backgroundContext) (op pendingOperation) { + var err error + op, err = wl.putInMaintenance(bc) + if op.got { + return + } + + wl.mutex.Lock() + defer wl.mutex.Unlock() + + if op = checkPendingOperation(bc.c); op.got { + return + } + if err != nil { - return err + // TODO log error. 
+ wl.setState(bc.uuid, boruta.FAIL) + return } - err = wl.setWorkerKey(worker, key) - return err + wl.setState(bc.uuid, boruta.MAINTENANCE) + return } // putInMaintenance orders Dryad to enter maintenance mode. -func (wl *WorkerList) putInMaintenance(worker boruta.WorkerUUID) error { - addr, err := wl.getWorkerAddr(worker) - if err != nil { - return err +func (wl *WorkerList) putInMaintenance(bc backgroundContext) (op pendingOperation, err error) { + if op = checkPendingOperation(bc.c); op.got { + return + } + addr, err := wl.getWorkerAddr(bc.uuid) + if op = checkPendingOperation(bc.c); op.got || err != nil { + return } client := wl.newDryadClient() err = client.Create(&addr) if err != nil { - return err + op = checkPendingOperation(bc.c) + return } defer client.Close() - return client.PutInMaintenance("maintenance") + if op = checkPendingOperation(bc.c); op.got { + return + } + err = client.PutInMaintenance("maintenance") + op = checkPendingOperation(bc.c) + return } // SetChangeListener sets change listener object in WorkerList. Listener should be @@ -501,3 +596,48 @@ func (wl *WorkerList) putInMaintenance(worker boruta.WorkerUUID) error { func (wl *WorkerList) SetChangeListener(listener WorkerChange) { wl.changeListener = listener } + +// checkPendingOperation verifies status of the communication channel in a non-blocking way. +// It returns pendingOperation structure containing status of the channel. +func checkPendingOperation(c <-chan boruta.WorkerState) (op pendingOperation) { + for { + select { + case op.state, op.open = <-c: + op.got = true + if !op.open { + return + } + default: + return + } + } +} + +// backgroundLoop is the main procedure of a background goroutine launched for every registered +// worker. It hangs on channel, waiting for new worker state to be processed or for close +// of the channel (when worker was deregistered). +// If new state is received, proper long operation is launched. 
+// If a long operation has been broken by the appearance of a new state on the channel or by channel closure, +// the new state is processed immediately. +// The procedure ends when the channel is closed. +func (wl *WorkerList) backgroundLoop(bc backgroundContext) { + var op pendingOperation + + for { + if !op.got { + op.state, op.open = <-bc.c + } + if !op.open { + // Worker has been deregistered. Ending background loop. + return + } + // Clear op.got flag as we consume received state. + op.got = false + switch op.state { + case boruta.PREPARE: + op = wl.prepareKeyAndSetState(bc) + case boruta.BUSY: + op = wl.putInMaintenanceWorker(bc) + } + } +} -- 2.7.4