Add dryadAddress and sshAddress to Register 15/180915/7
authorAleksander Mistewicz <a.mistewicz@samsung.com>
Tue, 29 May 2018 09:20:44 +0000 (11:20 +0200)
committerAleksander Mistewicz <a.mistewicz@samsung.com>
Fri, 3 Aug 2018 11:36:38 +0000 (13:36 +0200)
When running multiple dryads on a single host or behind NAT, they must
listen on different ports and inform boruta about this fact.

Change-Id: I35e084b8ee2e2177d36055f7dedacb53ac74bbf0
Signed-off-by: Aleksander Mistewicz <a.mistewicz@samsung.com>
boruta.go
cmd/dryad/dryad.go
rpc/dryad/clientmanager.go
rpc/superviser/reception.go
rpc/superviser/reception_test.go
rpc/superviser/superviser.go
workers/worker_list_test.go
workers/workers.go
workers/workers_test.go

index 19bfee8..c4a7e9b 100644 (file)
--- a/boruta.go
+++ b/boruta.go
@@ -196,7 +196,9 @@ type Requests interface {
 type Superviser interface {
        // Register adds a new Worker to the system in the MAINTENANCE state.
        // Capabilities are set on the Worker and can be changed by subsequent Register calls.
-       Register(caps Capabilities) (err error)
+       // dryadAddress and sshAddress inform Superviser on address it should use to connect
+       // using Go RPC and SSH respectively. They should parse to net.TCPAddr.
+       Register(caps Capabilities, dryadAddress string, sshAddress string) (err error)
        // SetFail notifies the Server about the Failure of the Worker.
        // It can additionally contain non-empty reason of the failure.
        SetFail(uuid WorkerUUID, reason string) (err error)
index 7ebe7df..cf3a2f4 100644 (file)
@@ -116,7 +116,7 @@ func main() {
        exitOnErr("failed to initialize connection to boruta:", err)
        defer boruta.Close()
 
-       err = boruta.Register(configuration.Caps)
+       err = boruta.Register(configuration.Caps, configuration.Address, configuration.SSHAdress)
        exitOnErr("failed to register to boruta:", err)
 
        // Wait for interrupt.
index 9077d64..f4f4b18 100644 (file)
@@ -19,6 +19,8 @@
 
 package dryad
 
+//go:generate go-rpcgen --source=../../boruta.go --type=Dryad --target=dryad.go --package dryad --imports net/rpc,.=git.tizen.org/tools/boruta
+
 import (
        "net"
 
index bb20c64..988b99c 100644 (file)
 // address of the client and call SetWorkerIP.
 package superviser
 
+//go:generate go-rpcgen --source=../../boruta.go --type=Superviser --target=superviser.go --package=superviser --imports net/rpc,.=git.tizen.org/tools/boruta
+
 import (
+       "errors"
        "net"
        "net/rpc"
 
@@ -91,13 +94,31 @@ func (sr *superviserReception) serve(conn net.Conn) {
        srv.ServeConn(conn)
 }
 
-// Register calls Register and SetWorkerIP of WorkerList if the former call was successful.
-func (ab *addressBook) Register(caps boruta.Capabilities) (err error) {
-       err = ab.wl.Register(caps)
+func (ab *addressBook) getTCPAddr(str, ctx string) (*net.TCPAddr, error) {
+       if str == "" {
+               return nil, errors.New(ctx + " can't be empty")
+       }
+       return net.ResolveTCPAddr("tcp", str)
+}
+
+// Register calls Register of WorkerList. It additionally fills dryadAddress
+// and sshAddress with IP address if one is missing from parameters.
+func (ab *addressBook) Register(caps boruta.Capabilities, dryadAddress string, sshAddress string) (err error) {
+       dryad, err := ab.getTCPAddr(dryadAddress, "dryadAddress")
        if err != nil {
-               return
+               return err
+       }
+       if dryad.IP == nil {
+               dryad.IP = ab.ip
+       }
+       sshd, err := ab.getTCPAddr(sshAddress, "sshAddress")
+       if err != nil {
+               return err
+       }
+       if sshd.IP == nil {
+               sshd.IP = ab.ip
        }
-       return ab.wl.SetWorkerIP(caps.GetWorkerUUID(), ab.ip)
+       return ab.wl.Register(caps, dryad.String(), sshd.String())
 }
 
 // SetFail calls SetFail of WorkerList.
index b9aba08..890c51b 100644 (file)
@@ -18,6 +18,7 @@ package superviser
 
 import (
        "net"
+       "net/rpc"
 
        "git.tizen.org/tools/boruta"
        "git.tizen.org/tools/boruta/workers"
@@ -30,6 +31,11 @@ var _ = Describe("superviserReception", func() {
        var i *superviserReception
        var wl *workers.WorkerList
        var addr net.Addr
+       uuidStr := "test-uuid"
+       refAddr := net.TCPAddr{
+               IP:   net.IPv4(127, 0, 0, 1),
+               Port: 7175,
+       }
 
        BeforeEach(func() {
                var err error
@@ -40,16 +46,41 @@ var _ = Describe("superviserReception", func() {
        })
 
        It("should get IP from connection", func() {
-               uuidStr := "test-uuid"
                uuid := boruta.WorkerUUID(uuidStr)
-               c, err := DialSuperviserClient(addr.String())
+               conn, err := net.DialTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)}, addr.(*net.TCPAddr))
                Expect(err).ToNot(HaveOccurred())
+               c := NewSuperviserClient(rpc.NewClient(conn))
 
-               err = c.Register(boruta.Capabilities{"UUID": uuidStr})
+               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, ":7175", ":22")
                Expect(err).ToNot(HaveOccurred())
 
                ip, err := wl.GetWorkerIP(uuid)
                Expect(err).ToNot(HaveOccurred())
                Expect(ip).ToNot(BeNil())
        })
+
+       It("should get IP from argument", func() {
+               refIP := net.IPv4(127, 0, 0, 1)
+               uuid := boruta.WorkerUUID(uuidStr)
+               c, err := DialSuperviserClient(addr.String())
+               Expect(err).ToNot(HaveOccurred())
+
+               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refIP.String()+":7175", refIP.String()+":22")
+               Expect(err).ToNot(HaveOccurred())
+
+               ip, err := wl.GetWorkerIP(uuid)
+               Expect(err).ToNot(HaveOccurred())
+               Expect(ip).To(Equal(refIP))
+       })
+
+       It("should fail to call with either address empty", func() {
+               c, err := DialSuperviserClient(addr.String())
+               Expect(err).ToNot(HaveOccurred())
+
+               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, "", refAddr.IP.String()+":22")
+               Expect(err).To(HaveOccurred())
+
+               err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refAddr.String(), "")
+               Expect(err).To(HaveOccurred())
+       })
 })
index a31081e..61658f2 100644 (file)
@@ -24,7 +24,9 @@ func RegisterSuperviserService(server *rpc.Server, impl Superviser) error {
 
 // SuperviserRegisterRequest is a helper structure for Register method.
 type SuperviserRegisterRequest struct {
-       Caps Capabilities
+       Caps         Capabilities
+       DryadAddress string
+       SshAddress   string
 }
 
 // SuperviserRegisterResponse is a helper structure for Register method.
@@ -33,7 +35,7 @@ type SuperviserRegisterResponse struct {
 
 // Register is RPC implementation of Register calling it.
 func (s *SuperviserService) Register(request *SuperviserRegisterRequest, response *SuperviserRegisterResponse) (err error) {
-       err = s.impl.Register(request.Caps)
+       err = s.impl.Register(request.Caps, request.DryadAddress, request.SshAddress)
        return
 }
 
@@ -75,8 +77,8 @@ func (_c *SuperviserClient) Close() error {
 }
 
 // Register is part of implementation of Superviser calling corresponding method on RPC server.
-func (_c *SuperviserClient) Register(caps Capabilities) (err error) {
-       _request := &SuperviserRegisterRequest{caps}
+func (_c *SuperviserClient) Register(caps Capabilities, dryadAddress string, sshAddress string) (err error) {
+       _request := &SuperviserRegisterRequest{caps, dryadAddress, sshAddress}
        _response := &SuperviserRegisterResponse{}
        err = _c.client.Call("Superviser.Register", _request, _response)
        return err
index b3b0ddd..1c85098 100644 (file)
@@ -36,6 +36,14 @@ import (
 
 var _ = Describe("WorkerList", func() {
        var wl *WorkerList
+       dryadAddr := net.TCPAddr{
+               IP:   net.IPv4(127, 0, 0, 1),
+               Port: 7175,
+       }
+       sshdAddr := net.TCPAddr{
+               IP:   net.IPv4(127, 0, 0, 1),
+               Port: 22,
+       }
        BeforeEach(func() {
                wl = NewWorkerList()
        })
@@ -81,7 +89,7 @@ var _ = Describe("WorkerList", func() {
                }
 
                It("should fail if UUID is not present", func() {
-                       err := wl.Register(nil)
+                       err := wl.Register(nil, "", "")
                        Expect(err).To(Equal(ErrMissingUUID))
                })
 
@@ -93,7 +101,7 @@ var _ = Describe("WorkerList", func() {
 
                It("should add Worker in MAINTENANCE state", func() {
                        caps := getRandomCaps()
-                       err := wl.Register(caps)
+                       err := wl.Register(caps, dryadAddr.String(), sshdAddr.String())
                        Expect(err).ToNot(HaveOccurred())
                        uuid := WorkerUUID(caps[UUID])
                        wl.mutex.RLock()
@@ -110,14 +118,14 @@ var _ = Describe("WorkerList", func() {
                        caps := getRandomCaps()
 
                        By("registering worker")
-                       err = wl.Register(caps)
+                       err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
                        Expect(err).ToNot(HaveOccurred())
                        registeredWorkers = append(registeredWorkers, caps[UUID])
                        compareLists()
 
                        By("updating the caps")
                        caps["test-key"] = "test-value"
-                       err = wl.Register(caps)
+                       err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
                        Expect(err).ToNot(HaveOccurred())
                        wl.mutex.RLock()
                        Expect(wl.workers[WorkerUUID(caps[UUID])].Caps).To(Equal(caps))
@@ -133,7 +141,7 @@ var _ = Describe("WorkerList", func() {
                        caps := getRandomCaps()
 
                        By("registering first worker")
-                       err = wl.Register(caps)
+                       err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
                        Expect(err).ToNot(HaveOccurred())
                        registeredWorkers = append(registeredWorkers, caps[UUID])
                        compareLists()
@@ -148,13 +156,13 @@ var _ = Describe("WorkerList", func() {
                        caps2 := getRandomCaps()
 
                        By("registering first worker")
-                       err = wl.Register(caps1)
+                       err = wl.Register(caps1, dryadAddr.String(), sshdAddr.String())
                        Expect(err).ToNot(HaveOccurred())
                        registeredWorkers = append(registeredWorkers, caps1[UUID])
                        compareLists()
 
                        By("registering second worker")
-                       err = wl.Register(caps2)
+                       err = wl.Register(caps2, dryadAddr.String(), sshdAddr.String())
                        Expect(err).ToNot(HaveOccurred())
                        registeredWorkers = append(registeredWorkers, caps2[UUID])
                        compareLists()
@@ -172,13 +180,13 @@ var _ = Describe("WorkerList", func() {
                        return newUUID
                }
                registerWorker := func() WorkerUUID {
-                       capsUUID := getUUID()
-                       err := wl.Register(Capabilities{UUID: capsUUID})
+                       capsUUID := randomUUID()
+                       err := wl.Register(Capabilities{UUID: string(capsUUID)}, dryadAddr.String(), sshdAddr.String())
                        Expect(err).ToNot(HaveOccurred())
                        wl.mutex.RLock()
                        Expect(wl.workers).ToNot(BeEmpty())
                        wl.mutex.RUnlock()
-                       return WorkerUUID(capsUUID)
+                       return capsUUID
                }
 
                BeforeEach(func() {
@@ -343,7 +351,8 @@ var _ = Describe("WorkerList", func() {
                                        info, ok = wl.workers[worker]
                                        Expect(ok).To(BeTrue())
                                        Expect(info.key).To(BeNil())
-                                       info.ip = ip
+                                       info.dryad = new(net.TCPAddr)
+                                       info.dryad.IP = ip
                                        wl.mutex.Unlock()
                                })
                                AfterEach(func() {
@@ -528,7 +537,7 @@ var _ = Describe("WorkerList", func() {
                        registerAndSetGroups := func(groups Groups, caps Capabilities) WorkerInfo {
                                capsUUID := getUUID()
                                caps[UUID] = capsUUID
-                               err := wl.Register(caps)
+                               err := wl.Register(caps, dryadAddr.String(), sshdAddr.String())
                                Expect(err).ToNot(HaveOccurred())
                                workerID := WorkerUUID(capsUUID)
 
@@ -818,7 +827,8 @@ var _ = Describe("WorkerList", func() {
                                        info, ok = wl.workers[worker]
                                        Expect(ok).To(BeTrue())
                                        Expect(info.key).To(BeNil())
-                                       info.ip = ip
+                                       info.dryad = new(net.TCPAddr)
+                                       info.dryad.IP = ip
                                })
                                It("should set worker into IDLE state and prepare a key", func() {
                                        gomock.InOrder(
@@ -930,7 +940,7 @@ var _ = Describe("WorkerList", func() {
                        workerUUID := WorkerUUID(capsUUID)
 
                        caps[UUID] = capsUUID
-                       wl.Register(caps)
+                       wl.Register(caps, dryadAddr.String(), sshdAddr.String())
                        wl.mutex.RLock()
                        w, ok := wl.workers[workerUUID]
                        wl.mutex.RUnlock()
index 4771226..6e3dd7c 100644 (file)
@@ -19,6 +19,7 @@ package workers
 
 import (
        "crypto/rsa"
+       "errors"
        "math"
        "net"
        "sync"
@@ -35,8 +36,9 @@ const UUID string = "UUID"
 // (public and private) structures representing Worker.
 type mapWorker struct {
        WorkerInfo
-       ip  net.IP
-       key *rsa.PrivateKey
+       dryad *net.TCPAddr
+       sshd  *net.TCPAddr
+       key   *rsa.PrivateKey
 }
 
 // WorkerList implements Superviser and Workers interfaces.
@@ -74,26 +76,51 @@ func NewWorkerList() *WorkerList {
 }
 
 // Register is an implementation of Register from Superviser interface.
-// UUID, which identifies Worker, must be present in caps.
-func (wl *WorkerList) Register(caps Capabilities) error {
+// UUID, which identifies Worker, must be present in caps. Both dryadAddress and
+// sshAddress must resolve and parse to net.TCPAddr. Neither IP address nor port number
+// can not be ommited.
+func (wl *WorkerList) Register(caps Capabilities, dryadAddress string, sshAddress string) error {
        capsUUID, present := caps[UUID]
        if !present {
                return ErrMissingUUID
        }
        uuid := WorkerUUID(capsUUID)
+
+       dryad, err := net.ResolveTCPAddr("tcp", dryadAddress)
+       if err != nil {
+               return err
+       }
+       // dryad.IP is empty if dryadAddress provided port number only.
+       if dryad.IP == nil {
+               return errors.New("missing IP in dryad address")
+       }
+       sshd, err := net.ResolveTCPAddr("tcp", sshAddress)
+       if err != nil {
+               return err
+       }
+       // same as with dryad.IP
+       if sshd.IP == nil {
+               return errors.New("missing IP in ssh address")
+       }
+
        wl.mutex.Lock()
        defer wl.mutex.Unlock()
        worker, registered := wl.workers[uuid]
        if registered {
-               // Subsequent Register calls update the caps.
+               // Subsequent Register calls update the caps and addresses.
                worker.Caps = caps
+               worker.dryad = dryad
+               worker.sshd = sshd
        } else {
                wl.workers[uuid] = &mapWorker{
                        WorkerInfo: WorkerInfo{
                                WorkerUUID: uuid,
                                State:      MAINTENANCE,
                                Caps:       caps,
-                       }}
+                       },
+                       dryad: dryad,
+                       sshd:  sshd,
+               }
        }
        return nil
 }
@@ -266,7 +293,8 @@ func (wl *WorkerList) SetWorkerIP(uuid WorkerUUID, ip net.IP) error {
        if !ok {
                return ErrWorkerNotFound
        }
-       worker.ip = ip
+       // FIXME
+       worker.dryad.IP = ip
        return nil
 }
 
@@ -278,7 +306,7 @@ func (wl *WorkerList) GetWorkerIP(uuid WorkerUUID) (net.IP, error) {
        if !ok {
                return nil, ErrWorkerNotFound
        }
-       return worker.ip, nil
+       return worker.dryad.IP, nil
 }
 
 // SetWorkerKey stores private key in the worker structure referenced by uuid.
index 3a49000..43f25da 100644 (file)
@@ -57,7 +57,7 @@ var _ = Describe("WorkerList", func() {
                }
                b.Time("register", func() {
                        for i := 0; i < maximumWorkers; i++ {
-                               err := wl.Register(caps[i])
+                               err := wl.Register(caps[i], "127.0.0.1:7175", "127.0.0.1:22")
                                Expect(err).ToNot(HaveOccurred())
                        }
                })