From: Lukasz Wojciechowski Date: Fri, 27 Oct 2017 19:41:56 +0000 (+0200) Subject: Implement WorkersManager interface in WorkerList X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=09a98342d046e1ce032e28a8187d4f2de9f38f09;p=tools%2Fboruta.git Implement WorkersManager interface in WorkerList Implementation of WorkersManager from matcher package makes WorkerList usable as interface for acquiring workers by Matcher. The implemnetation adds 2 new fields: * changeListener which is notified after Worker's state changes; * newDryadClient which provides dryad.ClientManager. The implementation is covered by tests. 2 mock up types: MockDryadClientManager and MockWorkerChange for mocking up rpc.dryad.ClientManager and WorkerChange. Change-Id: I2177824aef7aea564cd0a9900d9970c8a8386ca7 Signed-off-by: Lukasz Wojciechowski --- diff --git a/workers/error.go b/workers/error.go index 666f021..6cf60bd 100644 --- a/workers/error.go +++ b/workers/error.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,4 +41,7 @@ var ( // ErrForbiddenStateChange is returned when transition from state, Worker is in, // to state, SetState has been called with, is forbidden. ErrForbiddenStateChange = errors.New("Invalid state transition was requested") + // ErrNoMatchingWorker is returned when there is no worker matching groups nor + // capabilities required by request. + ErrNoMatchingWorker = errors.New("No matching worker") ) diff --git a/workers/worker_list_test.go b/workers/worker_list_test.go index 9e9ab2a..5ee4606 100644 --- a/workers/worker_list_test.go +++ b/workers/worker_list_test.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +19,15 @@ package workers import ( "crypto/rand" "crypto/rsa" + "errors" + "fmt" "net" . "git.tizen.org/tools/boruta" + "git.tizen.org/tools/boruta/dryad/conf" + "git.tizen.org/tools/boruta/rpc/dryad" + gomock "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/satori/go.uuid" @@ -34,6 +39,12 @@ var _ = Describe("WorkerList", func() { wl = NewWorkerList() }) + It("should return non-nil new DryadClient every time called", func() { + for i := 0; i < 3; i++ { + Expect(wl.newDryadClient()).NotTo(BeNil(), "i = %d", i) + } + }) + Describe("Register", func() { var registeredWorkers []string @@ -519,5 +530,199 @@ var _ = Describe("WorkerList", func() { } }) }) + Describe("PrepareWorker", func() { + var ctrl *gomock.Controller + var dcm *MockDryadClientManager + ip := net.IPv4(2, 4, 6, 8) + key := &rsa.PrivateKey{} + testerr := errors.New("Test Error") + noWorker := WorkerUUID("There's no such worker") + + eventuallyKey := func(info *mapWorker, key *rsa.PrivateKey) { + EventuallyWithOffset(1, func() *rsa.PrivateKey { + wl.mutex.Lock() + defer wl.mutex.Unlock() + return info.key + }).Should(Equal(key)) + } + eventuallyState := func(info *mapWorker, state WorkerState) { + EventuallyWithOffset(1, func() WorkerState { + wl.mutex.Lock() + defer wl.mutex.Unlock() + return info.State + }).Should(Equal(state)) + } + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + dcm = NewMockDryadClientManager(ctrl) + wl.newDryadClient = func() dryad.ClientManager { + return dcm + } + }) + AfterEach(func() { + ctrl.Finish() + }) + + It("should set worker into IDLE in without-key preparation", func() { + err := wl.PrepareWorker(worker, false) + Expect(err).NotTo(HaveOccurred()) + info, ok := wl.workers[worker] + Expect(ok).To(BeTrue()) + Expect(info.State).To(Equal(IDLE)) + }) + It("should fail to prepare not existing worker in without-key preparation", func() { + uuid := randomUUID() + err := wl.PrepareWorker(uuid, false) + Expect(err).To(Equal(ErrWorkerNotFound)) + }) + It("should ignore to prepare worker for non-existing worker", func() { + err := wl.PrepareWorker(noWorker, true) + Expect(err).NotTo(HaveOccurred()) + }) + Describe("with worker's IP set", func() { + var info *mapWorker + BeforeEach(func() { + var ok bool + info, ok = wl.workers[worker] + Expect(ok).To(BeTrue()) + Expect(info.key).To(BeNil()) + info.ip = ip + }) + It("should set worker into IDLE state and prepare a key", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().Prepare().Return(key, nil), + dcm.EXPECT().Close(), + ) + + err := wl.PrepareWorker(worker, true) + Expect(err).NotTo(HaveOccurred()) + + eventuallyState(info, IDLE) + eventuallyKey(info, key) + }) + It("should fail to prepare worker if dryadClientManager fails to prepare client", func() { + gomock.InOrder( + dcm.EXPECT().Create(ip, conf.DefaultRPCPort), + dcm.EXPECT().Prepare().Return(nil, testerr), + dcm.EXPECT().Close(), + ) + + err := wl.PrepareWorker(worker, true) + Expect(err).NotTo(HaveOccurred()) + + eventuallyState(info, FAIL) + Expect(info.key).To(BeNil()) + }) + It("should fail to prepare worker if dryadClientManager fails to create client", func() { + dcm.EXPECT().Create(ip, conf.DefaultRPCPort).Return(testerr) + + err := wl.PrepareWorker(worker, true) + Expect(err).NotTo(HaveOccurred()) + + eventuallyState(info, FAIL) + Expect(info.key).To(BeNil()) + }) + }) + }) + }) + Describe("TakeBestMatchingWorker", func() { + addWorker := func(groups Groups, caps Capabilities) *mapWorker { + capsUUID := uuid.NewV4().String() + workerUUID := WorkerUUID(capsUUID) + + caps[UUID] = capsUUID + wl.Register(caps) + w, ok := wl.workers[workerUUID] + Expect(ok).To(BeTrue()) + Expect(w.State).To(Equal(MAINTENANCE)) + + err := wl.SetGroups(workerUUID, groups) + Expect(err).NotTo(HaveOccurred()) + + return w + } + addIdleWorker := func(groups Groups, caps Capabilities) *mapWorker { + w := addWorker(groups, caps) + + err := wl.PrepareWorker(w.WorkerUUID, false) + Expect(err).NotTo(HaveOccurred()) + Expect(w.State).To(Equal(IDLE)) + + return w + } + generateGroups := func(count int) Groups { + var groups Groups + for i := 0; i < count; i++ { + groups = append(groups, Group(fmt.Sprintf("testGroup_%d", i))) + } + return groups + } + generateCaps := func(count int) Capabilities { + caps := make(Capabilities) + for i := 0; i < count; i++ { + k := fmt.Sprintf("testCapKey_%d", i) + v := fmt.Sprintf("testCapValue_%d", i) + caps[k] = v + } + return caps + } + It("should fail to find matching worker when there are no workers", func() { + ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{}) + Expect(err).To(Equal(ErrNoMatchingWorker)) + Expect(ret).To(BeZero()) + }) + It("should match fitting worker and set it into RUN state", func() { + w := addIdleWorker(Groups{}, Capabilities{}) + + ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{}) + Expect(err).NotTo(HaveOccurred()) + Expect(ret).To(Equal(w.WorkerUUID)) + Expect(w.State).To(Equal(RUN)) + }) + It("should not match not IDLE workers", func() { + addWorker(Groups{}, Capabilities{}) + + ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{}) + Expect(err).To(Equal(ErrNoMatchingWorker)) + Expect(ret).To(BeZero()) + }) + It("should choose least capable worker", func() { + // Create matching workers. + w5g5c := addIdleWorker(generateGroups(5), generateCaps(5)) + w1g7c := addIdleWorker(generateGroups(1), generateCaps(7)) + w5g1c := addIdleWorker(generateGroups(5), generateCaps(1)) + // Create non-matching workers. + w2g0c := addIdleWorker(generateGroups(2), generateCaps(0)) + w0g2c := addIdleWorker(generateGroups(0), generateCaps(2)) + + expectedWorkers := []*mapWorker{w5g1c, w1g7c, w5g5c} + for _, w := range expectedWorkers { + ret, err := wl.TakeBestMatchingWorker(generateGroups(1), generateCaps(1)) + Expect(err).NotTo(HaveOccurred()) + Expect(ret).To(Equal(w.WorkerUUID)) + Expect(w.State).To(Equal(RUN)) + } + ret, err := wl.TakeBestMatchingWorker(generateGroups(1), generateCaps(1)) + Expect(err).To(Equal(ErrNoMatchingWorker)) + Expect(ret).To(BeZero()) + + leftWorkers := []*mapWorker{w2g0c, w0g2c} + for _, w := range leftWorkers { + Expect(w.State).To(Equal(IDLE)) + } + }) + }) + Describe("SetChangeListener", func() { + It("should set WorkerChange", func() { + ctrl := gomock.NewController(GinkgoT()) + defer ctrl.Finish() + wc := NewMockWorkerChange(ctrl) + + Expect(wl.changeListener).To(BeNil()) + wl.SetChangeListener(wc) + Expect(wl.changeListener).To(Equal(wc)) + }) }) }) diff --git a/workers/workers.go b/workers/workers.go index cdc2b2e..9efb6c1 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -19,10 +19,13 @@ package workers import ( "crypto/rsa" + "math" "net" "sync" . "git.tizen.org/tools/boruta" + "git.tizen.org/tools/boruta/dryad/conf" + "git.tizen.org/tools/boruta/rpc/dryad" ) // UUID denotes a key in Capabilities where WorkerUUID is stored. @@ -38,18 +41,35 @@ type mapWorker struct { // WorkerList implements Superviser and Workers interfaces. // It manages a list of Workers. +// It implements also WorkersManager from matcher package making it usable +// as interface for acquiring workers by Matcher. +// The implemnetation requires changeListener, which is notified after Worker's +// state changes. +// The dryad.ClientManager allows managing Dryads' clients for key generation. +// One can be created using newDryadClient function. type WorkerList struct { Superviser Workers - workers map[WorkerUUID]*mapWorker - mutex *sync.RWMutex + workers map[WorkerUUID]*mapWorker + mutex *sync.RWMutex + changeListener WorkerChange + newDryadClient func() dryad.ClientManager +} + +// newDryadClient provides default implementation of dryad.ClientManager interface. +// It uses dryad package implementation of DryadClient. +// The function is set as WorkerList.newDryadClient. Field can be replaced +// by another function providing dryad.ClientManager for tests purposes. +func newDryadClient() dryad.ClientManager { + return new(dryad.DryadClient) } // NewWorkerList returns a new WorkerList with all fields set. func NewWorkerList() *WorkerList { return &WorkerList{ - workers: make(map[WorkerUUID]*mapWorker), - mutex: new(sync.RWMutex), + workers: make(map[WorkerUUID]*mapWorker), + mutex: new(sync.RWMutex), + newDryadClient: newDryadClient, } } @@ -184,8 +204,8 @@ func isGroupsMatching(worker WorkerInfo, groupsMatcher map[Group]interface{}) bo return true } for _, workerGroup := range worker.Groups { - _, ok := groupsMatcher[workerGroup] - if ok { + _, match := groupsMatcher[workerGroup] + if match { return true } } @@ -193,10 +213,18 @@ func isGroupsMatching(worker WorkerInfo, groupsMatcher map[Group]interface{}) bo } // ListWorkers is an implementation of ListWorkers from Workers interface. -// It lists all workers when both: +func (wl *WorkerList) ListWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) { + wl.mutex.RLock() + defer wl.mutex.RUnlock() + + return wl.listWorkers(groups, caps) +} + +// listWorkers lists all workers when both: // * any of the groups is matching (or groups is nil) // * all of the caps is matching (or caps is nil) -func (wl *WorkerList) ListWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) { +// Caller of this method should own the mutex. +func (wl *WorkerList) listWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) { matching := make([]WorkerInfo, 0, len(wl.workers)) groupsMatcher := make(map[Group]interface{}) @@ -274,3 +302,112 @@ func (wl *WorkerList) GetWorkerKey(uuid WorkerUUID) (rsa.PrivateKey, error) { } return *worker.key, nil } + +// TakeBestMatchingWorker verifies which IDLE workers can satisfy Groups and +// Capabilities required by the request. Among all matched workers a best worker +// is choosen (least capable worker still fitting request). If a worker is found +// it is put into RUN state and its UUID is returned. An error is returned if no +// matching IDLE worker is found. +// It is a part of WorkersManager interface implementation by WorkerList. +func (wl *WorkerList) TakeBestMatchingWorker(groups Groups, caps Capabilities) (bestWorker WorkerUUID, err error) { + wl.mutex.Lock() + defer wl.mutex.Unlock() + + var bestScore = math.MaxInt32 + + matching, _ := wl.listWorkers(groups, caps) + for _, info := range matching { + if info.State != IDLE { + continue + } + score := len(info.Caps) + len(info.Groups) + if score < bestScore { + bestScore = score + bestWorker = info.WorkerUUID + } + } + if bestScore == math.MaxInt32 { + err = ErrNoMatchingWorker + return + } + + err = wl.setState(bestWorker, RUN) + return +} + +// PrepareWorker brings worker into IDLE state and prepares it to be ready for +// running a job. In some of the situations if a worker has been matched for a job, +// but has not been used, there is no need for regeneration of the key. Caller of +// this method can decide (with 2nd parameter) if key generation is required for +// preparing worker. +// +// As key creation can take some time, the method is asynchronous and the worker's +// state might not be changed when it returns. +// It is a part of WorkersManager interface implementation by WorkerList. +func (wl *WorkerList) PrepareWorker(worker WorkerUUID, withKeyGeneration bool) error { + if !withKeyGeneration { + wl.mutex.Lock() + defer wl.mutex.Unlock() + return wl.setState(worker, IDLE) + } + + go wl.prepareKeyAndSetState(worker) + + return nil +} + +// prepareKeyAndSetState prepares private RSA key for the worker and sets worker +// into IDLE state in case of success. In case of failure of key preparation, +// worker is put into FAIL state instead. +func (wl *WorkerList) prepareKeyAndSetState(worker WorkerUUID) { + err := wl.prepareKey(worker) + wl.mutex.Lock() + defer wl.mutex.Unlock() + if err != nil { + // TODO log error. + wl.setState(worker, FAIL) + return + } + wl.setState(worker, IDLE) +} + +// setState changes state of worker. It does not contain any verification if change +// is feasible. It should be used only for internal boruta purposes. It must be +// called inside WorkerList critical section guarded by WorkerList.mutex. +func (wl *WorkerList) setState(worker WorkerUUID, state WorkerState) error { + w, ok := wl.workers[worker] + if !ok { + return ErrWorkerNotFound + } + w.State = state + return nil +} + +// prepareKey delegates key generation to Dryad and sets up generated key in the +// worker. In case of any failure it returns an error. +func (wl *WorkerList) prepareKey(worker WorkerUUID) error { + ip, err := wl.GetWorkerIP(worker) + if err != nil { + return err + } + client := wl.newDryadClient() + err = client.Create(ip, conf.DefaultRPCPort) + if err != nil { + return err + } + defer client.Close() + key, err := client.Prepare() + if err != nil { + return err + } + err = wl.SetWorkerKey(worker, key) + return err +} + +// SetChangeListener sets change listener object in WorkerList. Listener should be +// notified in case of changes of workers' states, when worker becomes IDLE +// or must break its job because of fail or maintenance. +// It is a part of WorkersManager interface implementation by WorkerList. +func (wl *WorkerList) SetChangeListener(listener WorkerChange) { + wl.changeListener = listener +}