Implement WorkersManager interface in WorkerList
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Fri, 27 Oct 2017 19:41:56 +0000 (21:41 +0200)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Fri, 27 Apr 2018 15:43:52 +0000 (17:43 +0200)
Implementation of WorkersManager from matcher package makes WorkerList usable
as interface for acquiring workers by Matcher.

The implemnetation adds 2 new fields:
* changeListener which is notified after Worker's state changes;
* newDryadClient which provides dryad.ClientManager.

The implementation is covered by tests.
2 mock up types: MockDryadClientManager and MockWorkerChange for mocking up
rpc.dryad.ClientManager and WorkerChange.

Change-Id: I2177824aef7aea564cd0a9900d9970c8a8386ca7
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
workers/error.go
workers/worker_list_test.go
workers/workers.go

index 666f021..6cf60bd 100644 (file)
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -41,4 +41,7 @@ var (
        // ErrForbiddenStateChange is returned when transition from state, Worker is in,
        // to state, SetState has been called with, is forbidden.
        ErrForbiddenStateChange = errors.New("Invalid state transition was requested")
+       // ErrNoMatchingWorker is returned when there is no worker matching groups nor
+       // capabilities required by request.
+       ErrNoMatchingWorker = errors.New("No matching worker")
 )
index 9e9ab2a..5ee4606 100644 (file)
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -19,10 +19,15 @@ package workers
 import (
        "crypto/rand"
        "crypto/rsa"
+       "errors"
+       "fmt"
        "net"
 
        . "git.tizen.org/tools/boruta"
+       "git.tizen.org/tools/boruta/dryad/conf"
+       "git.tizen.org/tools/boruta/rpc/dryad"
 
+       gomock "github.com/golang/mock/gomock"
        . "github.com/onsi/ginkgo"
        . "github.com/onsi/gomega"
        "github.com/satori/go.uuid"
@@ -34,6 +39,12 @@ var _ = Describe("WorkerList", func() {
                wl = NewWorkerList()
        })
 
+       It("should return non-nil new DryadClient every time called", func() {
+               for i := 0; i < 3; i++ {
+                       Expect(wl.newDryadClient()).NotTo(BeNil(), "i = %d", i)
+               }
+       })
+
        Describe("Register", func() {
                var registeredWorkers []string
 
@@ -519,5 +530,199 @@ var _ = Describe("WorkerList", func() {
                                }
                        })
                })
+               Describe("PrepareWorker", func() {
+                       var ctrl *gomock.Controller
+                       var dcm *MockDryadClientManager
+                       ip := net.IPv4(2, 4, 6, 8)
+                       key := &rsa.PrivateKey{}
+                       testerr := errors.New("Test Error")
+                       noWorker := WorkerUUID("There's no such worker")
+
+                       eventuallyKey := func(info *mapWorker, key *rsa.PrivateKey) {
+                               EventuallyWithOffset(1, func() *rsa.PrivateKey {
+                                       wl.mutex.Lock()
+                                       defer wl.mutex.Unlock()
+                                       return info.key
+                               }).Should(Equal(key))
+                       }
+                       eventuallyState := func(info *mapWorker, state WorkerState) {
+                               EventuallyWithOffset(1, func() WorkerState {
+                                       wl.mutex.Lock()
+                                       defer wl.mutex.Unlock()
+                                       return info.State
+                               }).Should(Equal(state))
+                       }
+
+                       BeforeEach(func() {
+                               ctrl = gomock.NewController(GinkgoT())
+                               dcm = NewMockDryadClientManager(ctrl)
+                               wl.newDryadClient = func() dryad.ClientManager {
+                                       return dcm
+                               }
+                       })
+                       AfterEach(func() {
+                               ctrl.Finish()
+                       })
+
+                       It("should set worker into IDLE in without-key preparation", func() {
+                               err := wl.PrepareWorker(worker, false)
+                               Expect(err).NotTo(HaveOccurred())
+                               info, ok := wl.workers[worker]
+                               Expect(ok).To(BeTrue())
+                               Expect(info.State).To(Equal(IDLE))
+                       })
+                       It("should fail to prepare not existing worker in without-key preparation", func() {
+                               uuid := randomUUID()
+                               err := wl.PrepareWorker(uuid, false)
+                               Expect(err).To(Equal(ErrWorkerNotFound))
+                       })
+                       It("should ignore to prepare worker for non-existing worker", func() {
+                               err := wl.PrepareWorker(noWorker, true)
+                               Expect(err).NotTo(HaveOccurred())
+                       })
+                       Describe("with worker's IP set", func() {
+                               var info *mapWorker
+                               BeforeEach(func() {
+                                       var ok bool
+                                       info, ok = wl.workers[worker]
+                                       Expect(ok).To(BeTrue())
+                                       Expect(info.key).To(BeNil())
+                                       info.ip = ip
+                               })
+                               It("should set worker into IDLE state and prepare a key", func() {
+                                       gomock.InOrder(
+                                               dcm.EXPECT().Create(ip, conf.DefaultRPCPort),
+                                               dcm.EXPECT().Prepare().Return(key, nil),
+                                               dcm.EXPECT().Close(),
+                                       )
+
+                                       err := wl.PrepareWorker(worker, true)
+                                       Expect(err).NotTo(HaveOccurred())
+
+                                       eventuallyState(info, IDLE)
+                                       eventuallyKey(info, key)
+                               })
+                               It("should fail to prepare worker if dryadClientManager fails to prepare client", func() {
+                                       gomock.InOrder(
+                                               dcm.EXPECT().Create(ip, conf.DefaultRPCPort),
+                                               dcm.EXPECT().Prepare().Return(nil, testerr),
+                                               dcm.EXPECT().Close(),
+                                       )
+
+                                       err := wl.PrepareWorker(worker, true)
+                                       Expect(err).NotTo(HaveOccurred())
+
+                                       eventuallyState(info, FAIL)
+                                       Expect(info.key).To(BeNil())
+                               })
+                               It("should fail to prepare worker if dryadClientManager fails to create client", func() {
+                                       dcm.EXPECT().Create(ip, conf.DefaultRPCPort).Return(testerr)
+
+                                       err := wl.PrepareWorker(worker, true)
+                                       Expect(err).NotTo(HaveOccurred())
+
+                                       eventuallyState(info, FAIL)
+                                       Expect(info.key).To(BeNil())
+                               })
+                       })
+               })
+       })
+       Describe("TakeBestMatchingWorker", func() {
+               addWorker := func(groups Groups, caps Capabilities) *mapWorker {
+                       capsUUID := uuid.NewV4().String()
+                       workerUUID := WorkerUUID(capsUUID)
+
+                       caps[UUID] = capsUUID
+                       wl.Register(caps)
+                       w, ok := wl.workers[workerUUID]
+                       Expect(ok).To(BeTrue())
+                       Expect(w.State).To(Equal(MAINTENANCE))
+
+                       err := wl.SetGroups(workerUUID, groups)
+                       Expect(err).NotTo(HaveOccurred())
+
+                       return w
+               }
+               addIdleWorker := func(groups Groups, caps Capabilities) *mapWorker {
+                       w := addWorker(groups, caps)
+
+                       err := wl.PrepareWorker(w.WorkerUUID, false)
+                       Expect(err).NotTo(HaveOccurred())
+                       Expect(w.State).To(Equal(IDLE))
+
+                       return w
+               }
+               generateGroups := func(count int) Groups {
+                       var groups Groups
+                       for i := 0; i < count; i++ {
+                               groups = append(groups, Group(fmt.Sprintf("testGroup_%d", i)))
+                       }
+                       return groups
+               }
+               generateCaps := func(count int) Capabilities {
+                       caps := make(Capabilities)
+                       for i := 0; i < count; i++ {
+                               k := fmt.Sprintf("testCapKey_%d", i)
+                               v := fmt.Sprintf("testCapValue_%d", i)
+                               caps[k] = v
+                       }
+                       return caps
+               }
+               It("should fail to find matching worker when there are no workers", func() {
+                       ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{})
+                       Expect(err).To(Equal(ErrNoMatchingWorker))
+                       Expect(ret).To(BeZero())
+               })
+               It("should match fitting worker and set it into RUN state", func() {
+                       w := addIdleWorker(Groups{}, Capabilities{})
+
+                       ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{})
+                       Expect(err).NotTo(HaveOccurred())
+                       Expect(ret).To(Equal(w.WorkerUUID))
+                       Expect(w.State).To(Equal(RUN))
+               })
+               It("should not match not IDLE workers", func() {
+                       addWorker(Groups{}, Capabilities{})
+
+                       ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{})
+                       Expect(err).To(Equal(ErrNoMatchingWorker))
+                       Expect(ret).To(BeZero())
+               })
+               It("should choose least capable worker", func() {
+                       // Create matching workers.
+                       w5g5c := addIdleWorker(generateGroups(5), generateCaps(5))
+                       w1g7c := addIdleWorker(generateGroups(1), generateCaps(7))
+                       w5g1c := addIdleWorker(generateGroups(5), generateCaps(1))
+                       // Create non-matching workers.
+                       w2g0c := addIdleWorker(generateGroups(2), generateCaps(0))
+                       w0g2c := addIdleWorker(generateGroups(0), generateCaps(2))
+
+                       expectedWorkers := []*mapWorker{w5g1c, w1g7c, w5g5c}
+                       for _, w := range expectedWorkers {
+                               ret, err := wl.TakeBestMatchingWorker(generateGroups(1), generateCaps(1))
+                               Expect(err).NotTo(HaveOccurred())
+                               Expect(ret).To(Equal(w.WorkerUUID))
+                               Expect(w.State).To(Equal(RUN))
+                       }
+                       ret, err := wl.TakeBestMatchingWorker(generateGroups(1), generateCaps(1))
+                       Expect(err).To(Equal(ErrNoMatchingWorker))
+                       Expect(ret).To(BeZero())
+
+                       leftWorkers := []*mapWorker{w2g0c, w0g2c}
+                       for _, w := range leftWorkers {
+                               Expect(w.State).To(Equal(IDLE))
+                       }
+               })
+       })
+       Describe("SetChangeListener", func() {
+               It("should set WorkerChange", func() {
+                       ctrl := gomock.NewController(GinkgoT())
+                       defer ctrl.Finish()
+                       wc := NewMockWorkerChange(ctrl)
+
+                       Expect(wl.changeListener).To(BeNil())
+                       wl.SetChangeListener(wc)
+                       Expect(wl.changeListener).To(Equal(wc))
+               })
        })
 })
index cdc2b2e..9efb6c1 100644 (file)
@@ -19,10 +19,13 @@ package workers
 
 import (
        "crypto/rsa"
+       "math"
        "net"
        "sync"
 
        . "git.tizen.org/tools/boruta"
+       "git.tizen.org/tools/boruta/dryad/conf"
+       "git.tizen.org/tools/boruta/rpc/dryad"
 )
 
 // UUID denotes a key in Capabilities where WorkerUUID is stored.
@@ -38,18 +41,35 @@ type mapWorker struct {
 
 // WorkerList implements Superviser and Workers interfaces.
 // It manages a list of Workers.
+// It implements also WorkersManager from matcher package making it usable
+// as interface for acquiring workers by Matcher.
+// The implemnetation requires changeListener, which is notified after Worker's
+// state changes.
+// The dryad.ClientManager allows managing Dryads' clients for key generation.
+// One can be created using newDryadClient function.
 type WorkerList struct {
        Superviser
        Workers
-       workers map[WorkerUUID]*mapWorker
-       mutex   *sync.RWMutex
+       workers        map[WorkerUUID]*mapWorker
+       mutex          *sync.RWMutex
+       changeListener WorkerChange
+       newDryadClient func() dryad.ClientManager
+}
+
+// newDryadClient provides default implementation of dryad.ClientManager interface.
+// It uses dryad package implementation of DryadClient.
+// The function is set as WorkerList.newDryadClient. Field can be replaced
+// by another function providing dryad.ClientManager for tests purposes.
+func newDryadClient() dryad.ClientManager {
+       return new(dryad.DryadClient)
 }
 
 // NewWorkerList returns a new WorkerList with all fields set.
 func NewWorkerList() *WorkerList {
        return &WorkerList{
-               workers: make(map[WorkerUUID]*mapWorker),
-               mutex:   new(sync.RWMutex),
+               workers:        make(map[WorkerUUID]*mapWorker),
+               mutex:          new(sync.RWMutex),
+               newDryadClient: newDryadClient,
        }
 }
 
@@ -184,8 +204,8 @@ func isGroupsMatching(worker WorkerInfo, groupsMatcher map[Group]interface{}) bo
                return true
        }
        for _, workerGroup := range worker.Groups {
-               _, ok := groupsMatcher[workerGroup]
-               if ok {
+               _, match := groupsMatcher[workerGroup]
+               if match {
                        return true
                }
        }
@@ -193,10 +213,18 @@ func isGroupsMatching(worker WorkerInfo, groupsMatcher map[Group]interface{}) bo
 }
 
 // ListWorkers is an implementation of ListWorkers from Workers interface.
-// It lists all workers when both:
+func (wl *WorkerList) ListWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) {
+       wl.mutex.RLock()
+       defer wl.mutex.RUnlock()
+
+       return wl.listWorkers(groups, caps)
+}
+
+// listWorkers lists all workers when both:
 // * any of the groups is matching (or groups is nil)
 // * all of the caps is matching (or caps is nil)
-func (wl *WorkerList) ListWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) {
+// Caller of this method should own the mutex.
+func (wl *WorkerList) listWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) {
        matching := make([]WorkerInfo, 0, len(wl.workers))
 
        groupsMatcher := make(map[Group]interface{})
@@ -274,3 +302,112 @@ func (wl *WorkerList) GetWorkerKey(uuid WorkerUUID) (rsa.PrivateKey, error) {
        }
        return *worker.key, nil
 }
+
+// TakeBestMatchingWorker verifies which IDLE workers can satisfy Groups and
+// Capabilities required by the request. Among all matched workers a best worker
+// is choosen (least capable worker still fitting request). If a worker is found
+// it is put into RUN state and its UUID is returned. An error is returned if no
+// matching IDLE worker is found.
+// It is a part of WorkersManager interface implementation by WorkerList.
+func (wl *WorkerList) TakeBestMatchingWorker(groups Groups, caps Capabilities) (bestWorker WorkerUUID, err error) {
+       wl.mutex.Lock()
+       defer wl.mutex.Unlock()
+
+       var bestScore = math.MaxInt32
+
+       matching, _ := wl.listWorkers(groups, caps)
+       for _, info := range matching {
+               if info.State != IDLE {
+                       continue
+               }
+               score := len(info.Caps) + len(info.Groups)
+               if score < bestScore {
+                       bestScore = score
+                       bestWorker = info.WorkerUUID
+               }
+       }
+       if bestScore == math.MaxInt32 {
+               err = ErrNoMatchingWorker
+               return
+       }
+
+       err = wl.setState(bestWorker, RUN)
+       return
+}
+
+// PrepareWorker brings worker into IDLE state and prepares it to be ready for
+// running a job. In some of the situations if a worker has been matched for a job,
+// but has not been used, there is no need for regeneration of the key. Caller of
+// this method can decide (with 2nd parameter) if key generation is required for
+// preparing worker.
+//
+// As key creation can take some time, the method is asynchronous and the worker's
+// state might not be changed when it returns.
+// It is a part of WorkersManager interface implementation by WorkerList.
+func (wl *WorkerList) PrepareWorker(worker WorkerUUID, withKeyGeneration bool) error {
+       if !withKeyGeneration {
+               wl.mutex.Lock()
+               defer wl.mutex.Unlock()
+               return wl.setState(worker, IDLE)
+       }
+
+       go wl.prepareKeyAndSetState(worker)
+
+       return nil
+}
+
+// prepareKeyAndSetState prepares private RSA key for the worker and sets worker
+// into IDLE state in case of success. In case of failure of key preparation,
+// worker is put into FAIL state instead.
+func (wl *WorkerList) prepareKeyAndSetState(worker WorkerUUID) {
+       err := wl.prepareKey(worker)
+       wl.mutex.Lock()
+       defer wl.mutex.Unlock()
+       if err != nil {
+               // TODO log error.
+               wl.setState(worker, FAIL)
+               return
+       }
+       wl.setState(worker, IDLE)
+}
+
+// setState changes state of worker. It does not contain any verification if change
+// is feasible. It should be used only for internal boruta purposes. It must be
+// called inside WorkerList critical section guarded by WorkerList.mutex.
+func (wl *WorkerList) setState(worker WorkerUUID, state WorkerState) error {
+       w, ok := wl.workers[worker]
+       if !ok {
+               return ErrWorkerNotFound
+       }
+       w.State = state
+       return nil
+}
+
+// prepareKey delegates key generation to Dryad and sets up generated key in the
+// worker. In case of any failure it returns an error.
+func (wl *WorkerList) prepareKey(worker WorkerUUID) error {
+       ip, err := wl.GetWorkerIP(worker)
+       if err != nil {
+               return err
+       }
+       client := wl.newDryadClient()
+       err = client.Create(ip, conf.DefaultRPCPort)
+       if err != nil {
+               return err
+       }
+       defer client.Close()
+       key, err := client.Prepare()
+       if err != nil {
+               return err
+       }
+       err = wl.SetWorkerKey(worker, key)
+       return err
+}
+
+// SetChangeListener sets change listener object in WorkerList. Listener should be
+// notified in case of changes of workers' states, when worker becomes IDLE
+// or must break its job because of fail or maintenance.
+// It is a part of WorkersManager interface implementation by WorkerList.
+func (wl *WorkerList) SetChangeListener(listener WorkerChange) {
+       wl.changeListener = listener
+}