"errors"
"fmt"
"net"
+ "sync"
"git.tizen.org/tools/boruta"
"git.tizen.org/tools/boruta/rpc/dryad"
defer wl.mutex.RUnlock()
Expect(wl.workers).To(HaveKey(uuid))
Expect(wl.workers[uuid].State).To(Equal(boruta.MAINTENANCE))
+ Expect(wl.workers[uuid].backgroundOperation).NotTo(BeNil())
})
It("should update the caps when called twice for the same worker", func() {
worker = registerWorker()
})
+ AfterEach(func() {
+ wl.Deregister(worker)
+ })
+
Describe("SetFail", func() {
It("should fail to SetFail of nonexistent worker", func() {
uuid := randomUUID()
})
It("should work to SetFail", func() {
- for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN} {
+ for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.PREPARE} {
wl.mutex.Lock()
wl.workers[worker].State = state
wl.mutex.Unlock()
}
})
- It("Should fail to SetFail in MAINTENANCE state", func() {
- wl.mutex.Lock()
- Expect(wl.workers[worker].State).To(Equal(boruta.MAINTENANCE))
- wl.mutex.Unlock()
- err := wl.SetFail(worker, "")
- Expect(err).To(Equal(ErrInMaintenance))
- wl.mutex.RLock()
- Expect(wl.workers[worker].State).To(Equal(boruta.MAINTENANCE))
- wl.mutex.RUnlock()
+ It("Should fail to SetFail in MAINTENANCE or BUSY state", func() {
+ for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.BUSY} {
+ wl.mutex.Lock()
+ wl.workers[worker].State = state
+ wl.mutex.Unlock()
+ err := wl.SetFail(worker, "")
+ Expect(err).To(Equal(ErrInMaintenance))
+ wl.mutex.RLock()
+ Expect(wl.workers[worker].State).To(Equal(state))
+ wl.mutex.RUnlock()
+ }
})
})
})
It("should fail to deregister worker not in MAINTENANCE state", func() {
- for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL} {
+ for _, state := range []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL, boruta.PREPARE, boruta.BUSY} {
wl.mutex.Lock()
wl.workers[worker].State = state
wl.mutex.Unlock()
})
})
- Describe("SetState", func() {
- It("should fail to SetState of nonexistent worker", func() {
- uuid := randomUUID()
- err := wl.SetState(uuid, boruta.MAINTENANCE)
- Expect(err).To(Equal(ErrWorkerNotFound))
- })
+ Describe("with dryadClientManager mockup", func() {
+ var ctrl *gomock.Controller
+ var dcm *MockDryadClientManager
+ ip := net.IPv4(2, 4, 6, 8)
+ testerr := errors.New("Test Error")
+ var info *mapWorker
+ noWorker := boruta.WorkerUUID("There's no such worker")
+ putStr := "maintenance"
- It("should fail to SetState for invalid transitions", func() {
- invalidTransitions := [][]boruta.WorkerState{
- {boruta.RUN, boruta.IDLE},
- {boruta.FAIL, boruta.IDLE},
- }
- for _, transition := range invalidTransitions {
- fromState, toState := transition[0], transition[1]
- wl.mutex.Lock()
- wl.workers[worker].State = fromState
- wl.mutex.Unlock()
- err := wl.SetState(worker, toState)
- Expect(err).To(Equal(ErrForbiddenStateChange))
+ eventuallyState := func(info *mapWorker, state boruta.WorkerState) {
+ EventuallyWithOffset(1, func() boruta.WorkerState {
wl.mutex.RLock()
- Expect(wl.workers[worker].State).To(Equal(fromState))
- wl.mutex.RUnlock()
+ defer wl.mutex.RUnlock()
+ return info.State
+ }).Should(Equal(state))
+ }
+ eventuallyKey := func(info *mapWorker, match types.GomegaMatcher) {
+ EventuallyWithOffset(1, func() *rsa.PrivateKey {
+ wl.mutex.RLock()
+ defer wl.mutex.RUnlock()
+ return info.key
+ }).Should(match)
+ }
+
+ BeforeEach(func() {
+ ctrl = gomock.NewController(GinkgoT())
+ dcm = NewMockDryadClientManager(ctrl)
+ wl.newDryadClient = func() dryad.ClientManager {
+ return dcm
}
+
+ var ok bool
+ wl.mutex.Lock()
+ info, ok = wl.workers[worker]
+ Expect(ok).To(BeTrue())
+ Expect(info.key).To(BeNil())
+ info.dryad = new(net.TCPAddr)
+ info.dryad.IP = ip
+ wl.mutex.Unlock()
})
- It("should fail to SetState for incorrect state argument", func() {
- invalidArgument := [][]boruta.WorkerState{
- {boruta.MAINTENANCE, boruta.RUN},
- {boruta.MAINTENANCE, boruta.FAIL},
- {boruta.IDLE, boruta.FAIL},
- {boruta.IDLE, boruta.RUN},
- {boruta.RUN, boruta.FAIL},
- {boruta.FAIL, boruta.RUN},
- }
- for _, transition := range invalidArgument {
- fromState, toState := transition[0], transition[1]
- wl.mutex.Lock()
- wl.workers[worker].State = fromState
- wl.mutex.Unlock()
- err := wl.SetState(worker, toState)
- Expect(err).To(Equal(ErrWrongStateArgument))
- wl.mutex.RLock()
- Expect(wl.workers[worker].State).To(Equal(fromState))
- wl.mutex.RUnlock()
- }
+ AfterEach(func() {
+ ctrl.Finish()
})
- Describe("with dryadClientManager mockup", func() {
- var ctrl *gomock.Controller
- var dcm *MockDryadClientManager
- ip := net.IPv4(2, 4, 6, 8)
- testerr := errors.New("Test Error")
- var info *mapWorker
- noWorker := boruta.WorkerUUID("There's no such worker")
- putStr := "maintenance"
- eventuallyState := func(info *mapWorker, state boruta.WorkerState) {
- EventuallyWithOffset(1, func() boruta.WorkerState {
- wl.mutex.RLock()
- defer wl.mutex.RUnlock()
- return info.State
- }).Should(Equal(state))
- }
- eventuallyKey := func(info *mapWorker, match types.GomegaMatcher) {
- EventuallyWithOffset(1, func() *rsa.PrivateKey {
- wl.mutex.RLock()
- defer wl.mutex.RUnlock()
- return info.key
- }).Should(match)
- }
+ Describe("SetState", func() {
+ It("should fail to SetState of nonexistent worker", func() {
+ uuid := randomUUID()
+ err := wl.SetState(uuid, boruta.MAINTENANCE)
+ Expect(err).To(Equal(ErrWorkerNotFound))
+ })
- BeforeEach(func() {
- ctrl = gomock.NewController(GinkgoT())
- dcm = NewMockDryadClientManager(ctrl)
- wl.newDryadClient = func() dryad.ClientManager {
- return dcm
+ It("should fail to SetState for invalid transitions", func() {
+ invalidTransitions := [][]boruta.WorkerState{
+ {boruta.RUN, boruta.IDLE},
+ {boruta.FAIL, boruta.IDLE},
+ {boruta.BUSY, boruta.IDLE},
}
+ for _, transition := range invalidTransitions {
+ fromState, toState := transition[0], transition[1]
+ wl.mutex.Lock()
+ wl.workers[worker].State = fromState
+ wl.mutex.Unlock()
+ err := wl.SetState(worker, toState)
+ Expect(err).To(Equal(ErrForbiddenStateChange))
+ wl.mutex.RLock()
+ Expect(wl.workers[worker].State).To(Equal(fromState))
+ wl.mutex.RUnlock()
+ }
+ })
- var ok bool
- wl.mutex.Lock()
- info, ok = wl.workers[worker]
- Expect(ok).To(BeTrue())
- Expect(info.key).To(BeNil())
- info.dryad = new(net.TCPAddr)
- info.dryad.IP = ip
- wl.mutex.Unlock()
+ It("should ignore SetState if transition is already ongoing", func() {
+ invalidTransitions := [][]boruta.WorkerState{
+ {boruta.PREPARE, boruta.IDLE},
+ {boruta.BUSY, boruta.MAINTENANCE},
+ }
+ for _, transition := range invalidTransitions {
+ fromState, toState := transition[0], transition[1]
+ wl.mutex.Lock()
+ wl.workers[worker].State = fromState
+ wl.mutex.Unlock()
+ err := wl.SetState(worker, toState)
+ Expect(err).NotTo(HaveOccurred())
+ wl.mutex.RLock()
+ Expect(wl.workers[worker].State).To(Equal(fromState))
+ wl.mutex.RUnlock()
+ }
})
- AfterEach(func() {
- ctrl.Finish()
+
+ It("should fail to SetState for incorrect state argument", func() {
+ invalidArgument := [][]boruta.WorkerState{
+ {boruta.MAINTENANCE, boruta.RUN},
+ {boruta.MAINTENANCE, boruta.FAIL},
+ {boruta.MAINTENANCE, boruta.PREPARE},
+ {boruta.MAINTENANCE, boruta.BUSY},
+ {boruta.IDLE, boruta.FAIL},
+ {boruta.IDLE, boruta.RUN},
+ {boruta.IDLE, boruta.PREPARE},
+ {boruta.IDLE, boruta.BUSY},
+ {boruta.RUN, boruta.FAIL},
+ {boruta.RUN, boruta.PREPARE},
+ {boruta.RUN, boruta.BUSY},
+ {boruta.FAIL, boruta.RUN},
+ {boruta.FAIL, boruta.PREPARE},
+ {boruta.FAIL, boruta.BUSY},
+ {boruta.PREPARE, boruta.FAIL},
+ {boruta.PREPARE, boruta.RUN},
+ {boruta.PREPARE, boruta.BUSY},
+ {boruta.BUSY, boruta.FAIL},
+ {boruta.BUSY, boruta.RUN},
+ {boruta.BUSY, boruta.PREPARE},
+ }
+ for _, transition := range invalidArgument {
+ fromState, toState := transition[0], transition[1]
+ wl.mutex.Lock()
+ wl.workers[worker].State = fromState
+ wl.mutex.Unlock()
+ err := wl.SetState(worker, toState)
+ Expect(err).To(Equal(ErrWrongStateArgument))
+ wl.mutex.RLock()
+ Expect(wl.workers[worker].State).To(Equal(fromState))
+ wl.mutex.RUnlock()
+ }
})
Describe("from MAINTENANCE to IDLE", func() {
gomock.InOrder(
dcm.EXPECT().Create(info.dryad),
dcm.EXPECT().Prepare(gomock.Any()).Return(nil),
- dcm.EXPECT().Close(),
+ dcm.EXPECT().Close().Do(func() {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.PREPARE))
+ wl.mutex.Unlock()
+ }),
)
err := wl.SetState(worker, boruta.IDLE)
gomock.InOrder(
dcm.EXPECT().Create(info.dryad),
dcm.EXPECT().Prepare(gomock.Any()).Return(testerr),
- dcm.EXPECT().Close(),
+ dcm.EXPECT().Close().Do(func() {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.PREPARE))
+ wl.mutex.Unlock()
+ }),
)
err := wl.SetState(worker, boruta.IDLE)
})
It("should fail to SetState if dryadClientManager fails to create client", func() {
- dcm.EXPECT().Create(info.dryad).Return(testerr)
+ dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.PREPARE))
+ wl.mutex.Unlock()
+ return testerr
+ })
err := wl.SetState(worker, boruta.IDLE)
Expect(err).ToNot(HaveOccurred())
EventuallyWithOffset(1, trigger).Should(Receive(Equal(val)))
}
- fromStates := []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL}
+ fromStates := []boruta.WorkerState{boruta.IDLE, boruta.RUN, boruta.FAIL, boruta.PREPARE}
for _, from := range fromStates {
Describe("from "+string(from)+" to MAINTENANCE", func() {
BeforeEach(func() {
gomock.InOrder(
dcm.EXPECT().Create(info.dryad),
dcm.EXPECT().PutInMaintenance(putStr),
- dcm.EXPECT().Close(),
+ dcm.EXPECT().Close().Do(func() {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.BUSY))
+ wl.mutex.Unlock()
+ }),
)
err := wl.SetState(worker, boruta.MAINTENANCE)
dcm.EXPECT().PutInMaintenance(putStr).Return(testerr),
dcm.EXPECT().Close().Do(func() {
wl.mutex.Lock()
- info.State = boruta.WorkerState("TEST")
+ Expect(info.State).To(Equal(boruta.BUSY))
wl.mutex.Unlock()
setTrigger(1)
}),
It("should fail to SetState if dryadClientManager fails to create client", func() {
dcm.EXPECT().Create(info.dryad).Return(testerr).Do(func(*net.TCPAddr) {
wl.mutex.Lock()
- info.State = boruta.WorkerState("TEST")
+ Expect(info.State).To(Equal(boruta.BUSY))
wl.mutex.Unlock()
setTrigger(2)
})
})
})
}
+ })
+ Describe("withBackgroundContext", func() {
+ var bc, noWorkerBc backgroundContext
+ var c chan boruta.WorkerState
+ var testState boruta.WorkerState = boruta.RUN
+ var opEmpty = pendingOperation{}
+ var opClosed = pendingOperation{got: true}
+ var opState = pendingOperation{got: true, open: true, state: testState}
+
+ BeforeEach(func() {
+ c = make(chan boruta.WorkerState, backgroundOperationsBufferSize)
+ bc = backgroundContext{
+ c: c,
+ uuid: worker,
+ }
+ noWorkerBc = backgroundContext{
+ c: c,
+ uuid: noWorker,
+ }
+ })
+
+ AfterEach(func() {
+ if c != nil {
+ close(c)
+ }
+ })
+
Describe("putInMaintenance", func() {
It("should work", func() {
gomock.InOrder(
dcm.EXPECT().Close(),
)
- err := wl.putInMaintenance(worker)
+ op, err := wl.putInMaintenance(bc)
Expect(err).ToNot(HaveOccurred())
+ Expect(op).To(Equal(opEmpty))
})
It("should fail if dryadClientManager fails to put dryad in maintenance state", func() {
dcm.EXPECT().Close(),
)
- err := wl.putInMaintenance(worker)
+ op, err := wl.putInMaintenance(bc)
Expect(err).To(Equal(testerr))
+ Expect(op).To(Equal(opEmpty))
})
It("should fail if dryadClientManager fails to create client", func() {
dcm.EXPECT().Create(info.dryad).Return(testerr)
- err := wl.putInMaintenance(worker)
+ op, err := wl.putInMaintenance(bc)
Expect(err).To(Equal(testerr))
+ Expect(op).To(Equal(opEmpty))
})
It("should fail if worker is not registered", func() {
- err := wl.putInMaintenance(noWorker)
+ op, err := wl.putInMaintenance(noWorkerBc)
Expect(err).To(Equal(ErrWorkerNotFound))
+ Expect(op).To(Equal(opEmpty))
+ })
+
+ It("should break execution when channel is used before execution", func() {
+ c <- testState
+
+ op, err := wl.putInMaintenance(bc)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should break execution when channel is used during execution", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad).Do(func(*net.TCPAddr) {
+ c <- testState
+ }),
+ dcm.EXPECT().Close(),
+ )
+
+ op, err := wl.putInMaintenance(bc)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should break execution when channel is closed before execution", func() {
+ close(c)
+ c = nil
+
+ op, err := wl.putInMaintenance(bc)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(op).To(Equal(opClosed))
+ })
+
+ It("should break execution when channel is closed during execution", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad).Do(func(*net.TCPAddr) {
+ close(c)
+ c = nil
+ }),
+ dcm.EXPECT().Close(),
+ )
+
+ op, err := wl.putInMaintenance(bc)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(op).To(Equal(opClosed))
+ })
+ })
+
+ Describe("prepareKeyAndSetState", func() {
+ It("should ignore if worker is not registered", func() {
+ op := wl.prepareKeyAndSetState(noWorkerBc)
+ Expect(op).To(Equal(opEmpty))
+ })
+
+ It("should return immediately if new operation is pending on channel", func() {
+ c <- testState
+ op := wl.prepareKeyAndSetState(bc)
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should return immediately if channel is closed", func() {
+ close(c)
+ c = nil
+ op := wl.prepareKeyAndSetState(bc)
+ Expect(op).To(Equal(opClosed))
})
+
+ It("should return if new operation appeared on channel after prepareKey is completed", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().Prepare(gomock.Any()),
+ dcm.EXPECT().Close().Do(func() {
+ c <- testState
+ }),
+ )
+
+ op := wl.prepareKeyAndSetState(bc)
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should return if channel is closed after prepareKey is completed", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().Prepare(gomock.Any()),
+ dcm.EXPECT().Close().Do(func() {
+ close(c)
+ c = nil
+ }),
+ )
+
+ op := wl.prepareKeyAndSetState(bc)
+ Expect(op).To(Equal(opClosed))
+ })
+ })
+
+ Describe("putInMaintenanceWorker", func() {
+ It("should ignore if worker is not registered", func() {
+ op := wl.putInMaintenanceWorker(noWorkerBc)
+ Expect(op).To(Equal(opEmpty))
+ })
+
+ It("should return immediately if new operation is pending on channel", func() {
+ c <- testState
+ op := wl.putInMaintenanceWorker(bc)
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should return immediately if channel is closed", func() {
+ close(c)
+ c = nil
+ op := wl.putInMaintenanceWorker(bc)
+ Expect(op).To(Equal(opClosed))
+ })
+
+ It("should return if new operation appeared on channel after putInMaintenance is completed", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().PutInMaintenance(putStr),
+ dcm.EXPECT().Close().Do(func() {
+ c <- testState
+ }),
+ )
+
+ op := wl.putInMaintenanceWorker(bc)
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should return if channel is closed after putInMaintenance is completed", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().PutInMaintenance(putStr),
+ dcm.EXPECT().Close().Do(func() {
+ close(c)
+ c = nil
+ }),
+ )
+
+ op := wl.putInMaintenanceWorker(bc)
+ Expect(op).To(Equal(opClosed))
+ })
+ })
+
+ Describe("checkPendingOperation", func() {
+ It("should return got=false when nothing happens on channel", func() {
+ op := checkPendingOperation(c)
+ Expect(op).To(Equal(opEmpty))
+ })
+
+ It("should return state when new message appeared on channel", func() {
+ c <- testState
+ op := checkPendingOperation(c)
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should return last state when new message appeared on channel", func() {
+ c <- boruta.FAIL
+ c <- boruta.BUSY
+ c <- boruta.FAIL
+ c <- testState
+ op := checkPendingOperation(c)
+ Expect(op).To(Equal(opState))
+ })
+
+ It("should return open=false when channel is closed", func() {
+ close(c)
+ defer func() { c = nil }()
+ op := checkPendingOperation(c)
+ Expect(op).To(Equal(opClosed))
+ })
+ })
+
+ Describe("backgroundLoop", func() {
+ var done bool
+ var m sync.Locker
+ isDone := func() bool {
+ m.Lock()
+ defer m.Unlock()
+ return done
+ }
+ setDone := func(val bool) {
+ m.Lock()
+ defer m.Unlock()
+ done = val
+ }
+ run := func() {
+ defer GinkgoRecover()
+ wl.backgroundLoop(bc)
+ setDone(true)
+ }
+ ignoredStates := []boruta.WorkerState{boruta.MAINTENANCE, boruta.IDLE, boruta.RUN, boruta.FAIL}
+
+ BeforeEach(func() {
+ m = new(sync.Mutex)
+ setDone(false)
+ })
+
+ It("should run infinitely, but stop if channel is closed", func() {
+ go run()
+ Consistently(isDone).Should(BeFalse())
+ close(c)
+ c = nil
+ Eventually(isDone).Should(BeTrue())
+ })
+
+ It("should ignore states not requiring any action and return after channel is closed", func() {
+ go run()
+ for _, state := range ignoredStates {
+ By(string(state))
+ c <- state
+ }
+ // Check that loop is still running.
+ Consistently(isDone).Should(BeFalse())
+
+ // Check loop returns after channel is closed.
+ close(c)
+ c = nil
+ Eventually(isDone).Should(BeTrue())
+ })
+
+ It("should run prepareKey if PREPARE state is sent over channel", func() {
+ go run()
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().Prepare(gomock.Any()),
+ dcm.EXPECT().Close().Do(func() {
+ close(c)
+ c = nil
+ }),
+ )
+ c <- boruta.PREPARE
+ Eventually(isDone).Should(BeTrue())
+ })
+
+ It("should run putInMaintenance if BUSY state is sent over channel", func() {
+ go run()
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().PutInMaintenance(putStr),
+ dcm.EXPECT().Close().Do(func() {
+ close(c)
+ c = nil
+ }),
+ )
+ c <- boruta.BUSY
+ Eventually(isDone).Should(BeTrue())
+ })
+
+ DescribeTable("should break running current background task if other state is sent over channel",
+ func(current, another boruta.WorkerState) {
+ go run()
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error {
+ c <- another
+ return nil
+ }),
+ dcm.EXPECT().Close().Do(func() {
+ close(c)
+ c = nil
+ }),
+ )
+ c <- current
+ Eventually(isDone).Should(BeTrue())
+ },
+ Entry("PREPARE->MAINTENANCE", boruta.PREPARE, boruta.MAINTENANCE),
+ Entry("PREPARE->IDLE", boruta.PREPARE, boruta.IDLE),
+ Entry("PREPARE->RUN", boruta.PREPARE, boruta.RUN),
+ Entry("PREPARE->FAIL", boruta.PREPARE, boruta.FAIL),
+ Entry("BUSY->MAINTENANCE", boruta.BUSY, boruta.MAINTENANCE),
+ Entry("BUSY->IDLE", boruta.BUSY, boruta.IDLE),
+ Entry("BUSY->RUN", boruta.BUSY, boruta.RUN),
+ Entry("BUSY->FAIL", boruta.BUSY, boruta.FAIL),
+ )
+
+ DescribeTable("should break running current background task and start prepareKey if PREPARE is sent over channel",
+ func(current boruta.WorkerState) {
+ go run()
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error {
+ c <- boruta.PREPARE
+ return nil
+ }),
+ dcm.EXPECT().Close(),
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().Prepare(gomock.Any()),
+ dcm.EXPECT().Close().Do(func() {
+ close(c)
+ c = nil
+ }),
+ )
+ c <- current
+ Eventually(isDone).Should(BeTrue())
+ },
+ Entry("PREPARE", boruta.PREPARE),
+ Entry("BUSY", boruta.BUSY),
+ )
+
+ DescribeTable("should break running current background task and start putInMaintenance if BUSY is sent over channel",
+ func(current boruta.WorkerState) {
+ go run()
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error {
+ c <- boruta.BUSY
+ return nil
+ }),
+ dcm.EXPECT().Close(),
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().PutInMaintenance(putStr),
+ dcm.EXPECT().Close().Do(func() {
+ close(c)
+ c = nil
+ }),
+ )
+ c <- current
+ Eventually(isDone).Should(BeTrue())
+ },
+ Entry("PREPARE", boruta.PREPARE),
+ Entry("BUSY", boruta.BUSY),
+ )
})
})
})
Expect(wl.workers[worker].Groups).To(BeNil())
wl.mutex.RUnlock()
})
+
Describe("SetGroup with ChangeListener", func() {
var ctrl *gomock.Controller
var wc *MockWorkerChange
Expect(err).ToNot(HaveOccurred())
})
It("should not notify changeListener if set and worker's state is other than IDLE", func() {
- for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.FAIL, boruta.RUN} {
+ for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.FAIL, boruta.RUN, boruta.PREPARE, boruta.BUSY} {
By(string(state))
wl.mutex.RLock()
})
})
It("should not notify changeListener if not set", func() {
- for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.FAIL, boruta.RUN, boruta.IDLE} {
+ for _, state := range []boruta.WorkerState{boruta.MAINTENANCE, boruta.FAIL, boruta.RUN, boruta.IDLE, boruta.PREPARE, boruta.BUSY} {
By(string(state))
wl.mutex.RLock()
ctrl.Finish()
})
- It("should set worker into IDLE in without-key preparation", func() {
+ It("should set worker in RUN state into IDLE in without-key preparation", func() {
+ wl.mutex.RLock()
+ wl.workers[worker].State = boruta.RUN
+ wl.mutex.RUnlock()
err := wl.PrepareWorker(worker, false)
Expect(err).NotTo(HaveOccurred())
wl.mutex.RLock()
Expect(ok).To(BeTrue())
Expect(info.State).To(Equal(boruta.IDLE))
})
+
+ DescribeTable("should fail to set worker into IDLE in without-key preparation",
+ func(from boruta.WorkerState) {
+ // Mutating State requires the write lock (RLock is insufficient
+ // for writes and races with other goroutines).
+ wl.mutex.Lock()
+ wl.workers[worker].State = from
+ wl.mutex.Unlock()
+ err := wl.PrepareWorker(worker, false)
+ Expect(err).To(Equal(ErrForbiddenStateChange))
+ wl.mutex.RLock()
+ info, ok := wl.workers[worker]
+ wl.mutex.RUnlock()
+ Expect(ok).To(BeTrue())
+ Expect(info.State).To(Equal(from))
+ },
+ Entry("MAINTENANCE", boruta.MAINTENANCE),
+ Entry("IDLE", boruta.IDLE),
+ Entry("FAIL", boruta.FAIL),
+ Entry("PREPARE", boruta.PREPARE),
+ Entry("BUSY", boruta.BUSY),
+ )
+
It("should fail to prepare not existing worker in without-key preparation", func() {
- uuid := randomUUID()
- err := wl.PrepareWorker(uuid, false)
+ err := wl.PrepareWorker(noWorker, false)
Expect(err).To(Equal(ErrWorkerNotFound))
})
- It("should ignore to prepare worker for non-existing worker", func() {
+
+ It("should fail to prepare not existing worker in with-key preparation", func() {
err := wl.PrepareWorker(noWorker, true)
- Expect(err).NotTo(HaveOccurred())
+ Expect(err).To(Equal(ErrWorkerNotFound))
})
+
Describe("with worker's IP set", func() {
var info *mapWorker
+
BeforeEach(func() {
var ok bool
info, ok = wl.workers[worker]
Expect(info.key).To(BeNil())
info.dryad = new(net.TCPAddr)
info.dryad.IP = ip
+ info.State = boruta.RUN
})
+
It("should set worker into IDLE state and prepare a key", func() {
gomock.InOrder(
dcm.EXPECT().Create(info.dryad),
dcm.EXPECT().Prepare(gomock.Any()).Return(nil),
- dcm.EXPECT().Close(),
+ dcm.EXPECT().Close().Do(func() {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.PREPARE))
+ wl.mutex.Unlock()
+ }),
)
err := wl.PrepareWorker(worker, true)
eventuallyState(info, boruta.IDLE)
eventuallyKey(info, Not(Equal(&rsa.PrivateKey{})))
})
+
It("should fail to prepare worker if dryadClientManager fails to prepare client", func() {
gomock.InOrder(
dcm.EXPECT().Create(info.dryad),
dcm.EXPECT().Prepare(gomock.Any()).Return(testerr),
- dcm.EXPECT().Close(),
+ dcm.EXPECT().Close().Do(func() {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.PREPARE))
+ wl.mutex.Unlock()
+ }),
)
err := wl.PrepareWorker(worker, true)
eventuallyState(info, boruta.FAIL)
Expect(info.key).To(BeNil())
})
+
It("should fail to prepare worker if dryadClientManager fails to create client", func() {
- dcm.EXPECT().Create(info.dryad).Return(testerr)
+ dcm.EXPECT().Create(info.dryad).DoAndReturn(func(*net.TCPAddr) error {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.PREPARE))
+ wl.mutex.Unlock()
+ return testerr
+ })
err := wl.PrepareWorker(worker, true)
Expect(err).NotTo(HaveOccurred())
eventuallyState(info, boruta.FAIL)
Expect(info.key).To(BeNil())
})
+
+ It("should fail to prepare worker if key generation fails", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(info.dryad),
+ dcm.EXPECT().Close().Do(func() {
+ wl.mutex.Lock()
+ Expect(info.State).To(Equal(boruta.PREPARE))
+ wl.mutex.Unlock()
+ }),
+ )
+
+ sizeRSA = 1
+ err := wl.PrepareWorker(worker, true)
+ Expect(err).NotTo(HaveOccurred())
+
+ eventuallyState(info, boruta.FAIL)
+ Expect(info.key).To(BeNil())
+ })
})
})
Expect(wl.workers[worker].State).To(Equal(state))
wl.mutex.RUnlock()
}
+ swap := func(newChan chan boruta.WorkerState) (oldChan chan boruta.WorkerState) {
+ wl.mutex.Lock()
+ oldChan = wl.workers[worker].backgroundOperation
+ wl.workers[worker].backgroundOperation = newChan
+ wl.mutex.Unlock()
+ return
+ }
BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
wc = NewMockWorkerChange(ctrl)
DescribeTable("Should change state without calling changeListener",
func(from, to boruta.WorkerState) {
set(from)
+ newChan := make(chan boruta.WorkerState, backgroundOperationsBufferSize)
+ oldChan := swap(newChan)
+ defer swap(oldChan)
+
err := wl.setState(worker, to)
Expect(err).NotTo(HaveOccurred())
check(to)
+ Eventually(newChan).Should(Receive(Equal(to)))
},
Entry("MAINTENANCE->MAINTENANCE", boruta.MAINTENANCE, boruta.MAINTENANCE),
Entry("MAINTENANCE->RUN", boruta.MAINTENANCE, boruta.RUN),
Entry("MAINTENANCE->FAIL", boruta.MAINTENANCE, boruta.FAIL),
+ Entry("MAINTENANCE->PREPARE", boruta.MAINTENANCE, boruta.PREPARE),
+ Entry("MAINTENANCE->BUSY", boruta.MAINTENANCE, boruta.BUSY),
Entry("IDLE->MAINTENANCE", boruta.IDLE, boruta.MAINTENANCE),
Entry("IDLE->RUN", boruta.IDLE, boruta.RUN),
Entry("IDLE->FAIL", boruta.IDLE, boruta.FAIL),
+ Entry("IDLE->PREPARE", boruta.IDLE, boruta.PREPARE),
+ Entry("IDLE->BUSY", boruta.IDLE, boruta.BUSY),
+ Entry("RUN->PREPARE", boruta.RUN, boruta.PREPARE),
Entry("FAIL->MAINTENANCE", boruta.FAIL, boruta.MAINTENANCE),
Entry("FAIL->RUN", boruta.FAIL, boruta.RUN),
Entry("FAIL->FAIL", boruta.FAIL, boruta.FAIL),
+ Entry("FAIL->PREPARE", boruta.FAIL, boruta.PREPARE),
+ Entry("FAIL->BUSY", boruta.FAIL, boruta.BUSY),
+ Entry("PREPARE->MAINTENANCE", boruta.PREPARE, boruta.MAINTENANCE),
+ Entry("PREPARE->RUN", boruta.PREPARE, boruta.RUN),
+ Entry("PREPARE->FAIL", boruta.PREPARE, boruta.FAIL),
+ Entry("PREPARE->PREPARE", boruta.PREPARE, boruta.PREPARE),
+ Entry("PREPARE->BUSY", boruta.PREPARE, boruta.BUSY),
+ Entry("BUSY->MAINTENANCE", boruta.BUSY, boruta.MAINTENANCE),
+ Entry("BUSY->RUN", boruta.BUSY, boruta.RUN),
+ Entry("BUSY->FAIL", boruta.BUSY, boruta.FAIL),
+ Entry("BUSY->PREPARE", boruta.BUSY, boruta.PREPARE),
+ Entry("BUSY->BUSY", boruta.BUSY, boruta.BUSY),
)
+
DescribeTable("Should change state and call OnWorkerIdle",
func(from, to boruta.WorkerState) {
set(from)
+ newChan := make(chan boruta.WorkerState, backgroundOperationsBufferSize)
+ oldChan := swap(newChan)
+ defer swap(oldChan)
wc.EXPECT().OnWorkerIdle(worker)
+
err := wl.setState(worker, to)
Expect(err).NotTo(HaveOccurred())
check(to)
+ Eventually(newChan).Should(Receive(Equal(to)))
},
Entry("MAINTENANCE->IDLE", boruta.MAINTENANCE, boruta.IDLE),
Entry("IDLE->IDLE", boruta.IDLE, boruta.IDLE),
Entry("RUN->IDLE", boruta.RUN, boruta.IDLE),
Entry("FAIL->IDLE", boruta.FAIL, boruta.IDLE),
+ Entry("PREPARE->IDLE", boruta.PREPARE, boruta.IDLE),
+ Entry("BUSY->IDLE", boruta.BUSY, boruta.IDLE),
)
+
DescribeTable("Should change state and call OnWorkerFail",
func(from, to boruta.WorkerState) {
set(from)
+ newChan := make(chan boruta.WorkerState, backgroundOperationsBufferSize)
+ oldChan := swap(newChan)
+ defer swap(oldChan)
wc.EXPECT().OnWorkerFail(worker)
+
err := wl.setState(worker, to)
Expect(err).NotTo(HaveOccurred())
check(to)
+ Eventually(newChan).Should(Receive(Equal(to)))
},
Entry("RUN->MAINTENANCE", boruta.RUN, boruta.MAINTENANCE),
Entry("RUN->RUN", boruta.RUN, boruta.RUN),
Entry("RUN->FAIL", boruta.RUN, boruta.FAIL),
+ Entry("RUN->BUSY", boruta.RUN, boruta.BUSY),
)
})
})
+
Describe("TakeBestMatchingWorker", func() {
addWorker := func(groups boruta.Groups, caps boruta.Capabilities) *mapWorker {
capsUUID := getUUID()
}
addIdleWorker := func(groups boruta.Groups, caps boruta.Capabilities) *mapWorker {
w := addWorker(groups, caps)
-
- err := wl.PrepareWorker(w.WorkerUUID, false)
- Expect(err).NotTo(HaveOccurred())
- Expect(w.State).To(Equal(boruta.IDLE))
-
+ w.State = boruta.IDLE
return w
}
generateGroups := func(count int) boruta.Groups {
}
return caps
}
+
It("should fail to find matching worker when there are no workers", func() {
ret, err := wl.TakeBestMatchingWorker(boruta.Groups{}, boruta.Capabilities{})
Expect(err).To(Equal(ErrNoMatchingWorker))
Expect(ret).To(BeZero())
})
+
It("should match fitting worker and set it into RUN state", func() {
w := addIdleWorker(boruta.Groups{}, boruta.Capabilities{})
Expect(ret).To(Equal(w.WorkerUUID))
Expect(w.State).To(Equal(boruta.RUN))
})
+
It("should not match not IDLE workers", func() {
addWorker(boruta.Groups{}, boruta.Capabilities{})
Expect(err).To(Equal(ErrNoMatchingWorker))
Expect(ret).To(BeZero())
})
+
It("should choose least capable worker", func() {
// Create matching workers.
w5g5c := addIdleWorker(generateGroups(5), generateCaps(5))
}
})
})
+
Describe("SetChangeListener", func() {
It("should set WorkerChange", func() {
ctrl := gomock.NewController(GinkgoT())
// It is a variable for test purposes.
var sizeRSA = 4096
+// backgroundOperationsBufferSize defines buffer size of the channel
+// used for communication with background goroutine launched for every
+// registered worker. The goroutine processes long operations like:
+// preparation of Dryad to work or putting it into maintenance state.
+// Goroutines related to API calls use channel to initiate background
+// operations. Only one operation is run at the same time. The buffer
+// on channel allows non-blocking delegation of these operations.
+const backgroundOperationsBufferSize int = 32
+
+// pendingOperation describes status of reader's end of the channel used by
+// background routine.
+// The got flag indicates if there is any operation pending on channel.
+// Only if got is set to true, other fields should be analyzed.
+// The open flag indicates if channel is still open. It is set to false
+// when channel was closed on writer side (during deregistration of worker).
+// The state field contains new state that worker was switched to.
+type pendingOperation struct {
+ state boruta.WorkerState // new state the worker was switched to (valid when got && open)
+ open bool // channel still open; meaningful only when got is true
+ got bool // an operation was pending on the channel
+}
+
+// backgroundContext aggregates data required by functions running in background
+// goroutine to identify context of proper worker. The context is built of:
+// c - a reader's end of channel;
+// uuid - identifier of worker.
+type backgroundContext struct {
+ c <-chan boruta.WorkerState
+ uuid boruta.WorkerUUID
+}
+
// mapWorker is used by WorkerList to store all
// (public and private) structures representing Worker.
type mapWorker struct {
boruta.WorkerInfo
- dryad *net.TCPAddr
- sshd *net.TCPAddr
- ip net.IP
- key *rsa.PrivateKey
+ dryad *net.TCPAddr
+ sshd *net.TCPAddr
+ ip net.IP
+ key *rsa.PrivateKey
+ // backgroundOperation delegates long-running operations (key
+ // preparation, putting into maintenance) to the worker's background
+ // goroutine; it is closed when the worker is deregistered.
+ backgroundOperation chan boruta.WorkerState
}
// WorkerList implements Superviser and Workers interfaces.
worker.dryad = dryad
worker.sshd = sshd
} else {
+ c := make(chan boruta.WorkerState, backgroundOperationsBufferSize)
wl.workers[uuid] = &mapWorker{
WorkerInfo: boruta.WorkerInfo{
WorkerUUID: uuid,
State: boruta.MAINTENANCE,
Caps: caps,
},
- dryad: dryad,
- sshd: sshd,
+ dryad: dryad,
+ sshd: sshd,
+ backgroundOperation: c,
}
+ go wl.backgroundLoop(backgroundContext{c: c, uuid: uuid})
}
return nil
}
if !ok {
return ErrWorkerNotFound
}
- if worker.State == boruta.MAINTENANCE {
+ // Ignore entering FAIL state if administrator started maintenance already.
+ if worker.State == boruta.MAINTENANCE || worker.State == boruta.BUSY {
return ErrInMaintenance
}
return wl.setState(uuid, boruta.FAIL)
if !ok {
return ErrWorkerNotFound
}
+ // Do nothing if transition to MAINTENANCE state is already ongoing.
+ if state == boruta.MAINTENANCE && worker.State == boruta.BUSY {
+ return nil
+ }
+ // Do nothing if transition to IDLE state is already ongoing.
+ if state == boruta.IDLE && worker.State == boruta.PREPARE {
+ return nil
+ }
// State transitions to IDLE are allowed from MAINTENANCE state only.
if state == boruta.IDLE && worker.State != boruta.MAINTENANCE {
return ErrForbiddenStateChange
}
switch state {
case boruta.IDLE:
- go wl.prepareKeyAndSetState(uuid)
+ wl.setState(uuid, boruta.PREPARE)
case boruta.MAINTENANCE:
- go wl.putInMaintenanceWorker(uuid)
+ wl.setState(uuid, boruta.BUSY)
}
return nil
}
if worker.State != boruta.MAINTENANCE {
return ErrNotInMaintenance
}
+ close(worker.backgroundOperation)
delete(wl.workers, uuid)
return nil
}
wl.mutex.RLock()
defer wl.mutex.RUnlock()
- return wl.listWorkers(groups, caps)
+ return wl.listWorkers(groups, caps, false)
}
// listWorkers lists all workers when both:
// * any of the groups is matching (or groups is nil)
// * all of the caps is matching (or caps is nil)
// Caller of this method should own the mutex.
-func (wl *WorkerList) listWorkers(groups boruta.Groups, caps boruta.Capabilities) ([]boruta.WorkerInfo, error) {
+func (wl *WorkerList) listWorkers(groups boruta.Groups, caps boruta.Capabilities, onlyIdle bool) ([]boruta.WorkerInfo, error) {
matching := make([]boruta.WorkerInfo, 0, len(wl.workers))
groupsMatcher := make(map[boruta.Group]interface{})
for _, worker := range wl.workers {
if isGroupsMatching(worker.WorkerInfo, groupsMatcher) &&
isCapsMatching(worker.WorkerInfo, caps) {
+ if onlyIdle && (worker.State != boruta.IDLE) {
+ continue
+ }
matching = append(matching, worker.WorkerInfo)
}
}
var bestScore = math.MaxInt32
- matching, _ := wl.listWorkers(groups, caps)
+ matching, _ := wl.listWorkers(groups, caps, true)
for _, info := range matching {
- if info.State != boruta.IDLE {
- continue
- }
score := len(info.Caps) + len(info.Groups)
if score < bestScore {
bestScore = score
 // As key creation can take some time, the method is asynchronous and the worker's
 // state might not be changed when it returns.
 // It is a part of WorkersManager interface implementation by WorkerList.
-func (wl *WorkerList) PrepareWorker(worker boruta.WorkerUUID, withKeyGeneration bool) error {
+func (wl *WorkerList) PrepareWorker(uuid boruta.WorkerUUID, withKeyGeneration bool) error {
+	wl.mutex.Lock()
+	defer wl.mutex.Unlock()
+
+	worker, ok := wl.workers[uuid]
+	if !ok {
+		return ErrWorkerNotFound
+	}
+	// Only a worker that has just finished running a job may be prepared
+	// for the next one.
+	if worker.State != boruta.RUN {
+		return ErrForbiddenStateChange
+	}
+
 	if !withKeyGeneration {
-		wl.mutex.Lock()
-		defer wl.mutex.Unlock()
-		return wl.setState(worker, boruta.IDLE)
+		return wl.setState(uuid, boruta.IDLE)
 	}
+	// Key generation is done by the background loop, which moves the worker
+	// from PREPARE to IDLE (or FAIL) when it completes.
+	return wl.setState(uuid, boruta.PREPARE)
+}
-	go wl.prepareKeyAndSetState(worker)
+// setState changes state of worker. It does not contain any verification if change
+// is feasible. It should be used only for internal boruta purposes. It must be
+// called inside WorkerList critical section guarded by WorkerList.mutex.
+func (wl *WorkerList) setState(uuid boruta.WorkerUUID, state boruta.WorkerState) error {
+	worker, ok := wl.workers[uuid]
+	if !ok {
+		return ErrWorkerNotFound
+	}
+	// Send information about changing state to the background loop to possibly break some operations.
+	// NOTE(review): this send happens while wl.mutex is held; it relies on the
+	// channel buffer (backgroundOperationsBufferSize) to avoid blocking —
+	// confirm the buffer size cannot be exceeded between the loop's drains.
+	worker.backgroundOperation <- state
+	if wl.changeListener != nil {
+		if state == boruta.IDLE {
+			wl.changeListener.OnWorkerIdle(uuid)
+		}
+		// Inform that Job execution was possibly broken when changing RUN state
+		// to any other than IDLE or PREPARE.
+		if worker.State == boruta.RUN && state != boruta.IDLE && state != boruta.PREPARE {
+			wl.changeListener.OnWorkerFail(uuid)
+		}
+	}
+	worker.State = state
 	return nil
 }
 // prepareKeyAndSetState prepares private RSA key for the worker and sets worker
 // into IDLE state in case of success. In case of failure of key preparation,
 // worker is put into FAIL state instead.
-func (wl *WorkerList) prepareKeyAndSetState(worker boruta.WorkerUUID) {
-	err := wl.prepareKey(worker)
-	wl.mutex.Lock()
-	defer wl.mutex.Unlock()
-	if err != nil {
-		// TODO log error.
-		wl.setState(worker, boruta.FAIL)
+func (wl *WorkerList) prepareKeyAndSetState(bc backgroundContext) (op pendingOperation) {
+	var err error
+	op, err = wl.prepareKey(bc)
+	// A state change queued meanwhile aborts this operation; the background
+	// loop will process the newly received state instead.
+	if op.got {
 		return
 	}
-	wl.setState(worker, boruta.IDLE)
-}
-// putInMaintenanceWorker puts Dryad into maintenance mode and sets worker
-// into MAINTENANCE state in case of success. In case of failure of entering
-// maintenance mode, worker is put into FAIL state instead.
-func (wl *WorkerList) putInMaintenanceWorker(worker boruta.WorkerUUID) {
-	err := wl.putInMaintenance(worker)
 	wl.mutex.Lock()
 	defer wl.mutex.Unlock()
-	if err != nil {
-		wl.setState(worker, boruta.FAIL)
+
+	// Re-check after acquiring the mutex: a state change might have been
+	// requested while this goroutine was waiting for the lock.
+	if op = checkPendingOperation(bc.c); op.got {
 		return
 	}
-	wl.setState(worker, boruta.MAINTENANCE)
-}
-// setState changes state of worker. It does not contain any verification if change
-// is feasible. It should be used only for internal boruta purposes. It must be
-// called inside WorkerList critical section guarded by WorkerList.mutex.
-func (wl *WorkerList) setState(worker boruta.WorkerUUID, state boruta.WorkerState) error {
-	w, ok := wl.workers[worker]
-	if !ok {
-		return ErrWorkerNotFound
-	}
-	if wl.changeListener != nil {
-		if state == boruta.IDLE {
-			wl.changeListener.OnWorkerIdle(worker)
-		} else {
-			if w.State == boruta.RUN {
-				wl.changeListener.OnWorkerFail(worker)
-			}
-		}
+	if err != nil {
+		// TODO log error.
+		wl.setState(bc.uuid, boruta.FAIL)
+		return
 	}
-	w.State = state
-	return nil
+	wl.setState(bc.uuid, boruta.IDLE)
+	return
 }
 // prepareKey generates key, installs public part on worker and stores private part in WorkerList.
-func (wl *WorkerList) prepareKey(worker boruta.WorkerUUID) error {
-	addr, err := wl.getWorkerAddr(worker)
-	if err != nil {
-		return err
+func (wl *WorkerList) prepareKey(bc backgroundContext) (op pendingOperation, err error) {
+	// Channel checks are interleaved with the long-running steps below so
+	// that a newly requested state change aborts the preparation early.
+	if op = checkPendingOperation(bc.c); op.got {
+		return
+	}
+	addr, err := wl.getWorkerAddr(bc.uuid)
+	if op = checkPendingOperation(bc.c); op.got || err != nil {
+		return
 	}
 	client := wl.newDryadClient()
 	err = client.Create(&addr)
 	if err != nil {
-		return err
+		op = checkPendingOperation(bc.c)
+		return
 	}
 	defer client.Close()
+	if op = checkPendingOperation(bc.c); op.got {
+		return
+	}
 	key, err := rsa.GenerateKey(rand.Reader, sizeRSA)
-	if err != nil {
-		return err
+	if op = checkPendingOperation(bc.c); op.got || err != nil {
+		return
 	}
 	pubKey, err := ssh.NewPublicKey(&key.PublicKey)
-	if err != nil {
-		return err
+	if op = checkPendingOperation(bc.c); op.got || err != nil {
+		return
 	}
 	err = client.Prepare(&pubKey)
+	if op = checkPendingOperation(bc.c); op.got || err != nil {
+		return
+	}
+	err = wl.setWorkerKey(bc.uuid, key)
+	op = checkPendingOperation(bc.c)
+	return
+}
+
+// putInMaintenanceWorker puts Dryad into maintenance mode and sets worker
+// into MAINTENANCE state in case of success. In case of failure of entering
+// maintenance mode, worker is put into FAIL state instead.
+func (wl *WorkerList) putInMaintenanceWorker(bc backgroundContext) (op pendingOperation) {
+	var err error
+	op, err = wl.putInMaintenance(bc)
+	// A state change queued meanwhile aborts this operation; the background
+	// loop will process the newly received state instead.
+	if op.got {
+		return
+	}
+
+	wl.mutex.Lock()
+	defer wl.mutex.Unlock()
+
+	// Re-check after acquiring the mutex: a state change might have been
+	// requested while this goroutine was waiting for the lock.
+	if op = checkPendingOperation(bc.c); op.got {
+		return
+	}
+
 	if err != nil {
-		return err
+		// TODO log error.
+		wl.setState(bc.uuid, boruta.FAIL)
+		return
 	}
-	err = wl.setWorkerKey(worker, key)
-	return err
+	wl.setState(bc.uuid, boruta.MAINTENANCE)
+	return
 }
 // putInMaintenance orders Dryad to enter maintenance mode.
-func (wl *WorkerList) putInMaintenance(worker boruta.WorkerUUID) error {
-	addr, err := wl.getWorkerAddr(worker)
-	if err != nil {
-		return err
+func (wl *WorkerList) putInMaintenance(bc backgroundContext) (op pendingOperation, err error) {
+	// Channel checks are interleaved with the long-running steps below so
+	// that a newly requested state change aborts the operation early.
+	if op = checkPendingOperation(bc.c); op.got {
+		return
+	}
+	addr, err := wl.getWorkerAddr(bc.uuid)
+	if op = checkPendingOperation(bc.c); op.got || err != nil {
+		return
 	}
 	client := wl.newDryadClient()
 	err = client.Create(&addr)
 	if err != nil {
-		return err
+		op = checkPendingOperation(bc.c)
+		return
 	}
 	defer client.Close()
-	return client.PutInMaintenance("maintenance")
+	if op = checkPendingOperation(bc.c); op.got {
+		return
+	}
+	err = client.PutInMaintenance("maintenance")
+	op = checkPendingOperation(bc.c)
+	return
 }
// SetChangeListener sets change listener object in WorkerList. Listener should be
func (wl *WorkerList) SetChangeListener(listener WorkerChange) {
wl.changeListener = listener
}
+
+// checkPendingOperation verifies status of the communication channel in a non-blocking way.
+// It returns pendingOperation structure containing status of the channel.
+// All queued states are drained so that only the most recently sent state is
+// reported; draining stops immediately when the channel turns out to be closed
+// or empty.
+func checkPendingOperation(c <-chan boruta.WorkerState) (op pendingOperation) {
+	for {
+		select {
+		case op.state, op.open = <-c:
+			op.got = true
+			if !op.open {
+				return
+			}
+		default:
+			return
+		}
+	}
+}
+
+// backgroundLoop is the main procedure of a background goroutine launched for every registered
+// worker. It hangs on channel, waiting for new worker state to be processed or for close
+// of the channel (when worker was deregistered).
+// If new state is received, proper long operation is launched.
+// If long operation has been broken by appearance of the new state on channel or channel closure,
+// new state is processed immediately.
+// Procedure ends, when channel is closed.
+func (wl *WorkerList) backgroundLoop(bc backgroundContext) {
+	var op pendingOperation
+
+	for {
+		if !op.got {
+			op.state, op.open = <-bc.c
+		}
+		if !op.open {
+			// Worker has been deregistered. Ending background loop.
+			return
+		}
+		// Clear op.got flag as we consume received state.
+		op.got = false
+		// Only PREPARE and BUSY trigger long operations; every other state is
+		// consumed without action (it may have interrupted a previous one).
+		switch op.state {
+		case boruta.PREPARE:
+			op = wl.prepareKeyAndSetState(bc)
+		case boruta.BUSY:
+			op = wl.putInMaintenanceWorker(bc)
+		}
+	}
+}