Fix rpc and workers separation 15/188715/6
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Fri, 7 Sep 2018 18:08:33 +0000 (20:08 +0200)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Mon, 17 Sep 2018 10:02:59 +0000 (12:02 +0200)
rpc package:

The rpc package should use Superviser interface for accessing workers,
but the implementation was based on direct access to WorkerList.
Tests also used methods not included in Superviser interface.

This patch fixes that, making rpc package independent of workers package.
It uses Superviser interface and MockSuperviser type for tests.

The test coverage of the rpc package has been increased
from 75.9% to 96.3% leaving only errors from net package's functions
not covered.

workers package:

All internal functions from workers package have been changed
to lowercase, unexported methods.

All exported methods are part of interfaces now:
* Superviser (used by Dryad through RPC calls):
 > Register
 > SetFail
* Workers (used by external HTTP clients):
 > SetState
 > SetGroups
 > Deregister
 > ListWorkers
 > GetWorkerInfo
* WorkersManager (used by higher internal layers (matcher, requests)):
 > GetWorkerSSHAddr
 > GetWorkerKey
 > TakeBestMatchingWorker
 > PrepareWorker
 > SetChangeListener

Change-Id: Ied9763b276ca02d05a904f4df2a7757a1892a772
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
rpc/superviser/reception.go
rpc/superviser/reception_test.go
workers/worker_list_test.go
workers/workers.go

index 988b99c..6d022dd 100644 (file)
@@ -28,26 +28,25 @@ import (
        "net/rpc"
 
        "git.tizen.org/tools/boruta"
-       "git.tizen.org/tools/boruta/workers"
 )
 
 type superviserReception struct {
-       wl       *workers.WorkerList
+       sv       boruta.Superviser
        listener net.Listener
 }
 
 type addressBook struct {
        ip net.IP
-       wl *workers.WorkerList
+       sv boruta.Superviser
 }
 
-func startSuperviserReception(wl *workers.WorkerList, addr string) (sr *superviserReception, err error) {
+func startSuperviserReception(sv boruta.Superviser, addr string) (sr *superviserReception, err error) {
        sr = new(superviserReception)
        sr.listener, err = net.Listen("tcp", addr)
        if err != nil {
                return
        }
-       sr.wl = wl
+       sr.sv = sv
        go sr.listenAndServe()
        return
 }
@@ -58,8 +57,8 @@ func startSuperviserReception(wl *workers.WorkerList, addr string) (sr *supervis
 //
 // SetFail is unchanged, i.e. it calls SetFail of WorkerList without modification of arguments and
 // return values.
-func StartSuperviserReception(wl *workers.WorkerList, addr string) (err error) {
-       _, err = startSuperviserReception(wl, addr)
+func StartSuperviserReception(sv boruta.Superviser, addr string) (err error) {
+       _, err = startSuperviserReception(sv, addr)
        return err
 }
 
@@ -81,7 +80,7 @@ func (sr *superviserReception) serve(conn net.Conn) {
 
        sub := &addressBook{
                ip: ip,
-               wl: sr.wl,
+               sv: sr.sv,
        }
 
        srv := rpc.NewServer()
@@ -118,10 +117,10 @@ func (ab *addressBook) Register(caps boruta.Capabilities, dryadAddress string, s
        if sshd.IP == nil {
                sshd.IP = ab.ip
        }
-       return ab.wl.Register(caps, dryad.String(), sshd.String())
+       return ab.sv.Register(caps, dryad.String(), sshd.String())
 }
 
 // SetFail calls SetFail of WorkerList.
 func (ab *addressBook) SetFail(uuid boruta.WorkerUUID, reason string) (err error) {
-       return ab.wl.SetFail(uuid, reason)
+       return ab.sv.SetFail(uuid, reason)
 }
index 1010fb7..e72ad43 100644 (file)
 package superviser
 
 import (
+       "errors"
        "net"
        "net/rpc"
 
        "git.tizen.org/tools/boruta"
-       "git.tizen.org/tools/boruta/workers"
-
+       "git.tizen.org/tools/boruta/mocks"
+       "github.com/golang/mock/gomock"
        . "github.com/onsi/ginkgo"
        . "github.com/onsi/gomega"
 )
 
 var _ = Describe("superviserReception", func() {
-       var i *superviserReception
-       var wl *workers.WorkerList
-       var addr net.Addr
-       uuidStr := "test-uuid"
-       refAddr := net.TCPAddr{
-               IP:   net.IPv4(127, 0, 0, 1),
-               Port: 7175,
-       }
+       var ctrl *gomock.Controller
+       var msv *mocks.MockSuperviser
+
+       const badAddr = "500.100.900"
 
        BeforeEach(func() {
-               var err error
-               wl = workers.NewWorkerList()
-               i, err = startSuperviserReception(wl, "")
-               Expect(err).ToNot(HaveOccurred())
-               addr = i.listener.Addr()
+               ctrl = gomock.NewController(GinkgoT())
+               msv = mocks.NewMockSuperviser(ctrl)
+       })
+       AfterEach(func() {
+               ctrl.Finish()
        })
 
-       It("should get IP from connection", func() {
-               uuid := boruta.WorkerUUID(uuidStr)
-               conn, err := net.Dial(addr.Network(), addr.String())
+       It("should start superviserReception", func() {
+               i, err := startSuperviserReception(msv, "")
                Expect(err).ToNot(HaveOccurred())
-               c := NewSuperviserClient(rpc.NewClient(conn))
-
-               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, ":7175", ":22")
+               Expect(i).NotTo(BeNil())
+               Expect(i.listener.Addr().Network()).To(Equal("tcp"))
+               sshd, err := net.ResolveTCPAddr("tcp", i.listener.Addr().String())
                Expect(err).ToNot(HaveOccurred())
+               Expect(sshd.IP.IsLoopback())
+       })
 
-               addr, err := wl.GetWorkerAddr(uuid)
-               Expect(err).ToNot(HaveOccurred())
-               Expect(addr.Port).To(Equal(refAddr.Port))
-               Expect(addr.IP.IsLoopback()).To(BeTrue())
+       It("should fail to superviserReception if address is bad", func() {
+               _, err := startSuperviserReception(msv, badAddr)
+               Expect(err).To(HaveOccurred())
        })
 
-       It("should get IP from argument", func() {
+       It("should pass StartSuperviserReception call to superviserReception and return error", func() {
+               err := StartSuperviserReception(msv, badAddr)
+               Expect(err).To(HaveOccurred())
+       })
+
+       Describe("with Superviser reception started", func() {
+               var i *superviserReception
+               var addr net.Addr
+               uuidStr := "test-uuid"
                uuid := boruta.WorkerUUID(uuidStr)
-               c, err := DialSuperviserClient(addr.String())
-               Expect(err).ToNot(HaveOccurred())
+               caps := boruta.Capabilities{"UUID": uuidStr}
+               dryadRefAddr := net.TCPAddr{Port: 7175}
+               sshRefAddr := net.TCPAddr{Port: 22}
+               testErr := errors.New("test error")
 
-               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refAddr.String(), refAddr.IP.String()+":22")
-               Expect(err).ToNot(HaveOccurred())
+               BeforeEach(func() {
+                       var err error
+                       i, err = startSuperviserReception(msv, "")
+                       Expect(err).ToNot(HaveOccurred())
+                       addr = i.listener.Addr()
+               })
 
-               addr, err := wl.GetWorkerAddr(uuid)
-               Expect(err).ToNot(HaveOccurred())
-               Expect(addr).To(Equal(refAddr))
-       })
+               It("should get IP from connection", func() {
+                       conn, err := net.Dial(addr.Network(), addr.String())
+                       Expect(err).ToNot(HaveOccurred())
+                       c := NewSuperviserClient(rpc.NewClient(conn))
 
-       It("should fail to call with either address empty", func() {
-               c, err := DialSuperviserClient(addr.String())
-               Expect(err).ToNot(HaveOccurred())
+                       msv.EXPECT().Register(caps, gomock.Any(), gomock.Any()).DoAndReturn(
+                               func(c boruta.Capabilities, dryad string, ssh string) (err error) {
+                                       dryadAddr, err0 := net.ResolveTCPAddr("tcp", dryad)
+                                       Expect(err0).NotTo(HaveOccurred())
+                                       Expect(dryadAddr.IP.IsLoopback()).To(BeTrue())
+                                       Expect(dryadAddr.Port).To(Equal(dryadRefAddr.Port))
+                                       sshAddr, err0 := net.ResolveTCPAddr("tcp", ssh)
+                                       Expect(err0).NotTo(HaveOccurred())
+                                       Expect(sshAddr.IP.IsLoopback()).To(BeTrue())
+                                       Expect(sshAddr.Port).To(Equal(sshRefAddr.Port))
+                                       return testErr
+                               })
+                       err = c.Register(caps, dryadRefAddr.String(), sshRefAddr.String())
+                       Expect(err).To(Equal(rpc.ServerError(testErr.Error())))
+               })
 
-               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, "", refAddr.IP.String()+":22")
-               Expect(err).To(HaveOccurred())
+               It("should get IP from argument", func() {
+                       c, err := DialSuperviserClient(addr.String())
+                       Expect(err).ToNot(HaveOccurred())
 
-               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refAddr.String(), "")
-               Expect(err).To(HaveOccurred())
+                       msv.EXPECT().Register(caps, gomock.Any(), gomock.Any()).DoAndReturn(
+                               func(c boruta.Capabilities, dryad string, ssh string) (err error) {
+                                       dryadAddr, err0 := net.ResolveTCPAddr("tcp", dryad)
+                                       Expect(err0).NotTo(HaveOccurred())
+                                       Expect(dryadAddr.IP.IsLoopback()).To(BeTrue())
+                                       Expect(dryadAddr.Port).To(Equal(dryadRefAddr.Port))
+                                       sshAddr, err0 := net.ResolveTCPAddr("tcp", ssh)
+                                       Expect(err0).NotTo(HaveOccurred())
+                                       Expect(sshAddr.IP.IsLoopback()).To(BeTrue())
+                                       Expect(sshAddr.Port).To(Equal(sshRefAddr.Port))
+                                       return testErr
+                               })
+                       ip := net.IPv4(127, 0, 0, 1)
+                       da := net.TCPAddr{IP: ip, Port: dryadRefAddr.Port}
+                       sa := net.TCPAddr{IP: ip, Port: sshRefAddr.Port}
+                       err = c.Register(caps, da.String(), sa.String())
+                       Expect(err).To(Equal(rpc.ServerError(testErr.Error())))
+               })
+
+               It("should fail to call with either address empty", func() {
+                       c, err := DialSuperviserClient(addr.String())
+                       Expect(err).ToNot(HaveOccurred())
+
+                       err = c.Register(boruta.Capabilities{"UUID": uuidStr}, "", sshRefAddr.String())
+                       Expect(err).To(HaveOccurred())
+
+                       err = c.Register(boruta.Capabilities{"UUID": uuidStr}, dryadRefAddr.String(), "")
+                       Expect(err).To(HaveOccurred())
+               })
+
+               It("should call superviser's SetFail method", func() {
+                       testReason := "test reason"
+
+                       c, err := DialSuperviserClient(addr.String())
+                       Expect(err).ToNot(HaveOccurred())
+
+                       msv.EXPECT().SetFail(uuid, testReason).Return(testErr)
+                       err = c.SetFail(uuid, testReason)
+                       Expect(err).To(Equal(rpc.ServerError(testErr.Error())))
+               })
+
+               It("should properly close connection", func() {
+                       c, err := DialSuperviserClient(addr.String())
+                       Expect(err).ToNot(HaveOccurred())
+
+                       c.Close()
+               })
        })
 })
index 6c83877..52523ae 100644 (file)
@@ -726,7 +726,7 @@ var _ = Describe("WorkerList", func() {
                Describe("Setters and Getters", func() {
                        type genericGet func(wl *WorkerList, uuid boruta.WorkerUUID, expectedItem interface{}, expectedErr error)
                        getDryad := genericGet(func(wl *WorkerList, uuid boruta.WorkerUUID, expectedItem interface{}, expectedErr error) {
-                               item, err := wl.GetWorkerAddr(uuid)
+                               item, err := wl.getWorkerAddr(uuid)
                                if expectedErr != nil {
                                        Expect(item).To(Equal(net.TCPAddr{}))
                                        Expect(err).To(Equal(expectedErr))
@@ -760,7 +760,7 @@ var _ = Describe("WorkerList", func() {
                        setKey := genericSet(func(wl *WorkerList, uuid boruta.WorkerUUID, expectedErr error) interface{} {
                                key, err := rsa.GenerateKey(rand.Reader, 128)
                                Expect(err).ToNot(HaveOccurred())
-                               err = wl.SetWorkerKey(uuid, key)
+                               err = wl.setWorkerKey(uuid, key)
                                if expectedErr != nil {
                                        Expect(err).To(Equal(expectedErr))
                                        return nil
index 6d4ff95..6ca5392 100644 (file)
@@ -297,8 +297,8 @@ func (wl *WorkerList) GetWorkerInfo(uuid boruta.WorkerUUID) (boruta.WorkerInfo,
        return worker.WorkerInfo, nil
 }
 
-// GetWorkerAddr retrieves IP address from the internal structure.
-func (wl *WorkerList) GetWorkerAddr(uuid boruta.WorkerUUID) (net.TCPAddr, error) {
+// getWorkerAddr retrieves IP address from the internal structure.
+func (wl *WorkerList) getWorkerAddr(uuid boruta.WorkerUUID) (net.TCPAddr, error) {
        wl.mutex.RLock()
        defer wl.mutex.RUnlock()
        worker, ok := wl.workers[uuid]
@@ -319,9 +319,9 @@ func (wl *WorkerList) GetWorkerSSHAddr(uuid boruta.WorkerUUID) (net.TCPAddr, err
        return *worker.sshd, nil
 }
 
-// SetWorkerKey stores private key in the worker structure referenced by uuid.
+// setWorkerKey stores private key in the worker structure referenced by uuid.
 // It is safe to modify key after call to this function.
-func (wl *WorkerList) SetWorkerKey(uuid boruta.WorkerUUID, key *rsa.PrivateKey) error {
+func (wl *WorkerList) setWorkerKey(uuid boruta.WorkerUUID, key *rsa.PrivateKey) error {
        wl.mutex.Lock()
        defer wl.mutex.Unlock()
        worker, ok := wl.workers[uuid]
@@ -450,7 +450,7 @@ func (wl *WorkerList) setState(worker boruta.WorkerUUID, state boruta.WorkerStat
 
 // prepareKey generates key, installs public part on worker and stores private part in WorkerList.
 func (wl *WorkerList) prepareKey(worker boruta.WorkerUUID) error {
-       addr, err := wl.GetWorkerAddr(worker)
+       addr, err := wl.getWorkerAddr(worker)
        if err != nil {
                return err
        }
@@ -472,13 +472,13 @@ func (wl *WorkerList) prepareKey(worker boruta.WorkerUUID) error {
        if err != nil {
                return err
        }
-       err = wl.SetWorkerKey(worker, key)
+       err = wl.setWorkerKey(worker, key)
        return err
 }
 
 // putInMaintenance orders Dryad to enter maintenance mode.
 func (wl *WorkerList) putInMaintenance(worker boruta.WorkerUUID) error {
-       addr, err := wl.GetWorkerAddr(worker)
+       addr, err := wl.getWorkerAddr(worker)
        if err != nil {
                return err
        }