/*
- * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import (
"crypto/rand"
"crypto/rsa"
+ "errors"
+ "fmt"
"net"
. "git.tizen.org/tools/boruta"
+ "git.tizen.org/tools/boruta/dryad/conf"
+ "git.tizen.org/tools/boruta/rpc/dryad"
+ gomock "github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/satori/go.uuid"
wl = NewWorkerList()
})
+ // Verifies the default newDryadClient hook: every invocation must yield a
+ // usable (non-nil) client, not just the first one.
+ It("should return non-nil new DryadClient every time called", func() {
+ for i := 0; i < 3; i++ {
+ Expect(wl.newDryadClient()).NotTo(BeNil(), "i = %d", i)
+ }
+ })
+
Describe("Register", func() {
var registeredWorkers []string
}
})
})
+ Describe("PrepareWorker", func() {
+ var ctrl *gomock.Controller
+ var dcm *MockDryadClientManager
+ ip := net.IPv4(2, 4, 6, 8)
+ key := &rsa.PrivateKey{}
+ testerr := errors.New("Test Error")
+ noWorker := WorkerUUID("There's no such worker")
+
+ // eventuallyKey polls the worker's key under the mutex until it matches
+ // the expected one; polling is required because with-key preparation
+ // runs in a separate goroutine. Offset 1 makes failures point at the
+ // helper's caller rather than the helper itself.
+ eventuallyKey := func(info *mapWorker, key *rsa.PrivateKey) {
+ EventuallyWithOffset(1, func() *rsa.PrivateKey {
+ wl.mutex.Lock()
+ defer wl.mutex.Unlock()
+ return info.key
+ }).Should(Equal(key))
+ }
+ // eventuallyState polls the worker's state under the mutex until it
+ // reaches the expected value (same asynchrony rationale as eventuallyKey).
+ eventuallyState := func(info *mapWorker, state WorkerState) {
+ EventuallyWithOffset(1, func() WorkerState {
+ wl.mutex.Lock()
+ defer wl.mutex.Unlock()
+ return info.State
+ }).Should(Equal(state))
+ }
+
+ BeforeEach(func() {
+ ctrl = gomock.NewController(GinkgoT())
+ dcm = NewMockDryadClientManager(ctrl)
+ // Inject the mock Dryad client manager through the newDryadClient hook.
+ wl.newDryadClient = func() dryad.ClientManager {
+ return dcm
+ }
+ })
+ AfterEach(func() {
+ ctrl.Finish()
+ })
+
+ It("should set worker into IDLE in without-key preparation", func() {
+ err := wl.PrepareWorker(worker, false)
+ Expect(err).NotTo(HaveOccurred())
+ info, ok := wl.workers[worker]
+ Expect(ok).To(BeTrue())
+ Expect(info.State).To(Equal(IDLE))
+ })
+ It("should fail to prepare not existing worker in without-key preparation", func() {
+ uuid := randomUUID()
+ err := wl.PrepareWorker(uuid, false)
+ Expect(err).To(Equal(ErrWorkerNotFound))
+ })
+ // With key generation the work happens in a goroutine, so PrepareWorker
+ // itself reports no error even for an unknown worker.
+ It("should ignore to prepare worker for non-existing worker", func() {
+ err := wl.PrepareWorker(noWorker, true)
+ Expect(err).NotTo(HaveOccurred())
+ })
+ Describe("with worker's IP set", func() {
+ var info *mapWorker
+ BeforeEach(func() {
+ var ok bool
+ info, ok = wl.workers[worker]
+ Expect(ok).To(BeTrue())
+ Expect(info.key).To(BeNil())
+ info.ip = ip
+ })
+ It("should set worker into IDLE state and prepare a key", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(ip, conf.DefaultRPCPort),
+ dcm.EXPECT().Prepare().Return(key, nil),
+ dcm.EXPECT().Close(),
+ )
+
+ err := wl.PrepareWorker(worker, true)
+ Expect(err).NotTo(HaveOccurred())
+
+ eventuallyState(info, IDLE)
+ eventuallyKey(info, key)
+ })
+ It("should fail to prepare worker if dryadClientManager fails to prepare client", func() {
+ gomock.InOrder(
+ dcm.EXPECT().Create(ip, conf.DefaultRPCPort),
+ dcm.EXPECT().Prepare().Return(nil, testerr),
+ dcm.EXPECT().Close(),
+ )
+
+ // The call still returns nil: the failure surfaces only as the
+ // worker's FAIL state, because key preparation is asynchronous.
+ err := wl.PrepareWorker(worker, true)
+ Expect(err).NotTo(HaveOccurred())
+
+ eventuallyState(info, FAIL)
+ Expect(info.key).To(BeNil())
+ })
+ It("should fail to prepare worker if dryadClientManager fails to create client", func() {
+ dcm.EXPECT().Create(ip, conf.DefaultRPCPort).Return(testerr)
+
+ err := wl.PrepareWorker(worker, true)
+ Expect(err).NotTo(HaveOccurred())
+
+ eventuallyState(info, FAIL)
+ Expect(info.key).To(BeNil())
+ })
+ })
+ })
+ })
+ Describe("TakeBestMatchingWorker", func() {
+ // addWorker registers a fresh worker (which starts in MAINTENANCE
+ // state) with the given groups and caps and returns its map entry.
+ addWorker := func(groups Groups, caps Capabilities) *mapWorker {
+ capsUUID := uuid.NewV4().String()
+ workerUUID := WorkerUUID(capsUUID)
+
+ caps[UUID] = capsUUID
+ wl.Register(caps)
+ w, ok := wl.workers[workerUUID]
+ Expect(ok).To(BeTrue())
+ Expect(w.State).To(Equal(MAINTENANCE))
+
+ err := wl.SetGroups(workerUUID, groups)
+ Expect(err).NotTo(HaveOccurred())
+
+ return w
+ }
+ // addIdleWorker additionally moves the new worker into IDLE so it is
+ // eligible for matching.
+ addIdleWorker := func(groups Groups, caps Capabilities) *mapWorker {
+ w := addWorker(groups, caps)
+
+ err := wl.PrepareWorker(w.WorkerUUID, false)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(w.State).To(Equal(IDLE))
+
+ return w
+ }
+ // generateGroups builds count distinct test groups.
+ generateGroups := func(count int) Groups {
+ var groups Groups
+ for i := 0; i < count; i++ {
+ groups = append(groups, Group(fmt.Sprintf("testGroup_%d", i)))
+ }
+ return groups
+ }
+ // generateCaps builds count distinct test capability key/value pairs.
+ generateCaps := func(count int) Capabilities {
+ caps := make(Capabilities)
+ for i := 0; i < count; i++ {
+ k := fmt.Sprintf("testCapKey_%d", i)
+ v := fmt.Sprintf("testCapValue_%d", i)
+ caps[k] = v
+ }
+ return caps
+ }
+ It("should fail to find matching worker when there are no workers", func() {
+ ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{})
+ Expect(err).To(Equal(ErrNoMatchingWorker))
+ Expect(ret).To(BeZero())
+ })
+ It("should match fitting worker and set it into RUN state", func() {
+ w := addIdleWorker(Groups{}, Capabilities{})
+
+ ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{})
+ Expect(err).NotTo(HaveOccurred())
+ Expect(ret).To(Equal(w.WorkerUUID))
+ Expect(w.State).To(Equal(RUN))
+ })
+ It("should not match not IDLE workers", func() {
+ addWorker(Groups{}, Capabilities{})
+
+ ret, err := wl.TakeBestMatchingWorker(Groups{}, Capabilities{})
+ Expect(err).To(Equal(ErrNoMatchingWorker))
+ Expect(ret).To(BeZero())
+ })
+ It("should choose least capable worker", func() {
+ // Create matching workers.
+ w5g5c := addIdleWorker(generateGroups(5), generateCaps(5))
+ w1g7c := addIdleWorker(generateGroups(1), generateCaps(7))
+ w5g1c := addIdleWorker(generateGroups(5), generateCaps(1))
+ // Create non-matching workers.
+ w2g0c := addIdleWorker(generateGroups(2), generateCaps(0))
+ w0g2c := addIdleWorker(generateGroups(0), generateCaps(2))
+
+ // Matching workers should be taken in ascending groups+caps order.
+ expectedWorkers := []*mapWorker{w5g1c, w1g7c, w5g5c}
+ for _, w := range expectedWorkers {
+ ret, err := wl.TakeBestMatchingWorker(generateGroups(1), generateCaps(1))
+ Expect(err).NotTo(HaveOccurred())
+ Expect(ret).To(Equal(w.WorkerUUID))
+ Expect(w.State).To(Equal(RUN))
+ }
+ ret, err := wl.TakeBestMatchingWorker(generateGroups(1), generateCaps(1))
+ Expect(err).To(Equal(ErrNoMatchingWorker))
+ Expect(ret).To(BeZero())
+
+ // Non-matching workers must be left untouched in IDLE.
+ leftWorkers := []*mapWorker{w2g0c, w0g2c}
+ for _, w := range leftWorkers {
+ Expect(w.State).To(Equal(IDLE))
+ }
+ })
+ })
+ Describe("SetChangeListener", func() {
+ // Plain setter check: the provided listener is stored as-is.
+ It("should set WorkerChange", func() {
+ ctrl := gomock.NewController(GinkgoT())
+ defer ctrl.Finish()
+ wc := NewMockWorkerChange(ctrl)
+
+ Expect(wl.changeListener).To(BeNil())
+ wl.SetChangeListener(wc)
+ Expect(wl.changeListener).To(Equal(wc))
+ })
+ })
})
})
import (
"crypto/rsa"
+ "math"
"net"
"sync"
. "git.tizen.org/tools/boruta"
+ "git.tizen.org/tools/boruta/dryad/conf"
+ "git.tizen.org/tools/boruta/rpc/dryad"
)
// UUID denotes a key in Capabilities where WorkerUUID is stored.
// WorkerList implements Superviser and Workers interfaces.
// It manages a list of Workers.
+// It implements also WorkersManager from matcher package making it usable
+// as interface for acquiring workers by Matcher.
+// The implementation requires changeListener, which is notified after Worker's
+// state changes.
+// The dryad.ClientManager allows managing Dryads' clients for key generation.
+// One can be created using newDryadClient function.
type WorkerList struct {
Superviser
Workers
- workers map[WorkerUUID]*mapWorker
- mutex *sync.RWMutex
+ workers map[WorkerUUID]*mapWorker
+ mutex *sync.RWMutex
+ // changeListener is notified about workers' state changes; it is nil
+ // until SetChangeListener is called.
+ changeListener WorkerChange
+ // newDryadClient creates clients for communication with Dryads. It is a
+ // field (rather than a direct call) so tests can inject a mock factory.
+ newDryadClient func() dryad.ClientManager
+}
+
+// newDryadClient provides default implementation of dryad.ClientManager interface.
+// It uses dryad package implementation of DryadClient.
+// The function is set as WorkerList.newDryadClient. Field can be replaced
+// by another function providing dryad.ClientManager for tests purposes.
+func newDryadClient() dryad.ClientManager {
+ return new(dryad.DryadClient)
}
// NewWorkerList returns a new WorkerList with all fields set.
func NewWorkerList() *WorkerList {
return &WorkerList{
- workers: make(map[WorkerUUID]*mapWorker),
- mutex: new(sync.RWMutex),
+ workers: make(map[WorkerUUID]*mapWorker),
+ mutex: new(sync.RWMutex),
+ // Default Dryad client factory; tests overwrite this field with a mock.
+ newDryadClient: newDryadClient,
}
}
return true
}
for _, workerGroup := range worker.Groups {
- _, ok := groupsMatcher[workerGroup]
- if ok {
+ _, match := groupsMatcher[workerGroup]
+ if match {
return true
}
}
}
// ListWorkers is an implementation of ListWorkers from Workers interface.
-// It lists all workers when both:
+func (wl *WorkerList) ListWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) {
+ // NOTE(review): a read lock is assumed sufficient here — listWorkers
+ // should only filter the worker map; confirm it performs no mutation.
+ wl.mutex.RLock()
+ defer wl.mutex.RUnlock()
+
+ return wl.listWorkers(groups, caps)
+}
+
+// listWorkers lists all workers when both:
// * any of the groups is matching (or groups is nil)
// * all of the caps is matching (or caps is nil)
-func (wl *WorkerList) ListWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) {
+// Caller of this method should own the mutex.
+func (wl *WorkerList) listWorkers(groups Groups, caps Capabilities) ([]WorkerInfo, error) {
matching := make([]WorkerInfo, 0, len(wl.workers))
groupsMatcher := make(map[Group]interface{})
}
return *worker.key, nil
}
+
+// TakeBestMatchingWorker verifies which IDLE workers can satisfy Groups and
+// Capabilities required by the request. Among all matched workers a best worker
+// is choosen (least capable worker still fitting request). If a worker is found
+// it is put into RUN state and its UUID is returned. An error is returned if no
+// matching IDLE worker is found.
+// It is a part of WorkersManager interface implementation by WorkerList.
+func (wl *WorkerList) TakeBestMatchingWorker(groups Groups, caps Capabilities) (bestWorker WorkerUUID, err error) {
+ wl.mutex.Lock()
+ defer wl.mutex.Unlock()
+
+ var bestScore = math.MaxInt32
+
+ matching, _ := wl.listWorkers(groups, caps)
+ for _, info := range matching {
+ if info.State != IDLE {
+ continue
+ }
+ score := len(info.Caps) + len(info.Groups)
+ if score < bestScore {
+ bestScore = score
+ bestWorker = info.WorkerUUID
+ }
+ }
+ if bestScore == math.MaxInt32 {
+ err = ErrNoMatchingWorker
+ return
+ }
+
+ err = wl.setState(bestWorker, RUN)
+ return
+}
+
+// PrepareWorker brings worker into IDLE state and prepares it to be ready for
+// running a job. In some of the situations if a worker has been matched for a job,
+// but has not been used, there is no need for regeneration of the key. Caller of
+// this method can decide (with 2nd parameter) if key generation is required for
+// preparing worker.
+//
+// As key creation can take some time, the method is asynchronous and the worker's
+// state might not be changed when it returns.
+// It is a part of WorkersManager interface implementation by WorkerList.
+func (wl *WorkerList) PrepareWorker(worker WorkerUUID, withKeyGeneration bool) error {
+ if !withKeyGeneration {
+ wl.mutex.Lock()
+ defer wl.mutex.Unlock()
+ return wl.setState(worker, IDLE)
+ }
+
+ go wl.prepareKeyAndSetState(worker)
+
+ return nil
+}
+
+// prepareKeyAndSetState prepares private RSA key for the worker and sets worker
+// into IDLE state in case of success. In case of failure of key preparation,
+// worker is put into FAIL state instead.
+func (wl *WorkerList) prepareKeyAndSetState(worker WorkerUUID) {
+ err := wl.prepareKey(worker)
+ wl.mutex.Lock()
+ defer wl.mutex.Unlock()
+ if err != nil {
+ // TODO log error.
+ wl.setState(worker, FAIL)
+ return
+ }
+ wl.setState(worker, IDLE)
+}
+
+// setState changes state of worker. It does not contain any verification if change
+// is feasible. It should be used only for internal boruta purposes. It must be
+// called inside WorkerList critical section guarded by WorkerList.mutex.
+func (wl *WorkerList) setState(worker WorkerUUID, state WorkerState) error {
+ w, ok := wl.workers[worker]
+ if !ok {
+ return ErrWorkerNotFound
+ }
+ w.State = state
+ return nil
+}
+
+// prepareKey delegates key generation to Dryad and sets up generated key in the
+// worker. In case of any failure it returns an error.
+func (wl *WorkerList) prepareKey(worker WorkerUUID) error {
+ ip, err := wl.GetWorkerIP(worker)
+ if err != nil {
+ return err
+ }
+ client := wl.newDryadClient()
+ err = client.Create(ip, conf.DefaultRPCPort)
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+ key, err := client.Prepare()
+ if err != nil {
+ return err
+ }
+ err = wl.SetWorkerKey(worker, key)
+ return err
+}
+
+// SetChangeListener sets change listener object in WorkerList. Listener should be
+// notified in case of changes of workers' states, when worker becomes IDLE
+// or must break its job because of fail or maintenance.
+// It is a part of WorkersManager interface implementation by WorkerList.
+func (wl *WorkerList) SetChangeListener(listener WorkerChange) {
+ // NOTE(review): no locking here — presumably the listener is set once
+ // before the WorkerList is used concurrently; confirm with callers.
+ wl.changeListener = listener
+}