From 1a9e53461bbde08e3c2235d07c2c89d8731d16a7 Mon Sep 17 00:00:00 2001 From: Aleksander Mistewicz Date: Tue, 29 May 2018 11:20:44 +0200 Subject: [PATCH] Add dryadAddress and sshAddress to Register When running multiple dryads on a single host or behind NAT, they must listen on different ports and inform boruta about this fact. Change-Id: I35e084b8ee2e2177d36055f7dedacb53ac74bbf0 Signed-off-by: Aleksander Mistewicz --- boruta.go | 4 +++- cmd/dryad/dryad.go | 2 +- rpc/dryad/clientmanager.go | 2 ++ rpc/superviser/reception.go | 31 +++++++++++++++++++++++----- rpc/superviser/reception_test.go | 37 ++++++++++++++++++++++++++++++--- rpc/superviser/superviser.go | 10 +++++---- workers/worker_list_test.go | 38 +++++++++++++++++++++------------- workers/workers.go | 44 ++++++++++++++++++++++++++++++++-------- workers/workers_test.go | 2 +- 9 files changed, 133 insertions(+), 37 deletions(-) diff --git a/boruta.go b/boruta.go index 19bfee8..c4a7e9b 100644 --- a/boruta.go +++ b/boruta.go @@ -196,7 +196,9 @@ type Requests interface { type Superviser interface { // Register adds a new Worker to the system in the MAINTENANCE state. // Capabilities are set on the Worker and can be changed by subsequent Register calls. - Register(caps Capabilities) (err error) + // dryadAddress and sshAddress inform Superviser on address it should use to connect + // using Go RPC and SSH respectively. They should parse to net.TCPAddr. + Register(caps Capabilities, dryadAddress string, sshAddress string) (err error) // SetFail notifies the Server about the Failure of the Worker. // It can additionally contain non-empty reason of the failure. SetFail(uuid WorkerUUID, reason string) (err error) diff --git a/cmd/dryad/dryad.go b/cmd/dryad/dryad.go index 7ebe7df..cf3a2f4 100644 --- a/cmd/dryad/dryad.go +++ b/cmd/dryad/dryad.go @@ -116,7 +116,7 @@ func main() { exitOnErr("failed to initialize connection to boruta:", err) defer boruta.Close() - err = boruta.Register(configuration.Caps) + err = boruta.Register(configuration.Caps, configuration.Address, configuration.SSHAdress) exitOnErr("failed to register to boruta:", err) // Wait for interrupt. diff --git a/rpc/dryad/clientmanager.go b/rpc/dryad/clientmanager.go index 9077d64..f4f4b18 100644 --- a/rpc/dryad/clientmanager.go +++ b/rpc/dryad/clientmanager.go @@ -19,6 +19,8 @@ package dryad +//go:generate go-rpcgen --source=../../boruta.go --type=Dryad --target=dryad.go --package dryad --imports net/rpc,.=git.tizen.org/tools/boruta + import ( "net" diff --git a/rpc/superviser/reception.go b/rpc/superviser/reception.go index bb20c64..988b99c 100644 --- a/rpc/superviser/reception.go +++ b/rpc/superviser/reception.go @@ -20,7 +20,10 @@ // address of the client and call SetWorkerIP. package superviser +//go:generate go-rpcgen --source=../../boruta.go --type=Superviser --target=superviser.go --package=superviser --imports net/rpc,.=git.tizen.org/tools/boruta + import ( + "errors" "net" "net/rpc" @@ -91,13 +94,31 @@ func (sr *superviserReception) serve(conn net.Conn) { srv.ServeConn(conn) } -// Register calls Register and SetWorkerIP of WorkerList if the former call was successful. -func (ab *addressBook) Register(caps boruta.Capabilities) (err error) { - err = ab.wl.Register(caps) +func (ab *addressBook) getTCPAddr(str, ctx string) (*net.TCPAddr, error) { + if str == "" { + return nil, errors.New(ctx + " can't be empty") + } + return net.ResolveTCPAddr("tcp", str) +} + +// Register calls Register of WorkerList. It additionally fills dryadAddress +// and sshAddress with IP address if one is missing from parameters. +func (ab *addressBook) Register(caps boruta.Capabilities, dryadAddress string, sshAddress string) (err error) { + dryad, err := ab.getTCPAddr(dryadAddress, "dryadAddress") if err != nil { - return + return err + } + if dryad.IP == nil { + dryad.IP = ab.ip + } + sshd, err := ab.getTCPAddr(sshAddress, "sshAddress") + if err != nil { + return err + } + if sshd.IP == nil { + sshd.IP = ab.ip } - return ab.wl.SetWorkerIP(caps.GetWorkerUUID(), ab.ip) + return ab.wl.Register(caps, dryad.String(), sshd.String()) } // SetFail calls SetFail of WorkerList. diff --git a/rpc/superviser/reception_test.go b/rpc/superviser/reception_test.go index b9aba08..890c51b 100644 --- a/rpc/superviser/reception_test.go +++ b/rpc/superviser/reception_test.go @@ -18,6 +18,7 @@ package superviser import ( "net" + "net/rpc" "git.tizen.org/tools/boruta" "git.tizen.org/tools/boruta/workers" @@ -30,6 +31,11 @@ var _ = Describe("superviserReception", func() { var i *superviserReception var wl *workers.WorkerList var addr net.Addr + uuidStr := "test-uuid" + refAddr := net.TCPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 7175, + } BeforeEach(func() { var err error @@ -40,16 +46,41 @@ var _ = Describe("superviserReception", func() { }) It("should get IP from connection", func() { - uuidStr := "test-uuid" uuid := boruta.WorkerUUID(uuidStr) - c, err := DialSuperviserClient(addr.String()) + conn, err := net.DialTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)}, addr.(*net.TCPAddr)) Expect(err).ToNot(HaveOccurred()) + c := NewSuperviserClient(rpc.NewClient(conn)) - err = c.Register(boruta.Capabilities{"UUID": uuidStr}) + err = c.Register(boruta.Capabilities{"UUID": uuidStr}, ":7175", ":22") Expect(err).ToNot(HaveOccurred()) ip, err := wl.GetWorkerIP(uuid) Expect(err).ToNot(HaveOccurred()) Expect(ip).ToNot(BeNil()) }) + + It("should get IP from argument", func() { + refIP := net.IPv4(127, 0, 0, 1) + uuid := boruta.WorkerUUID(uuidStr) + c, err := DialSuperviserClient(addr.String()) + Expect(err).ToNot(HaveOccurred()) + + err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refIP.String()+":7175", refIP.String()+":22") + Expect(err).ToNot(HaveOccurred()) + + ip, err := wl.GetWorkerIP(uuid) + Expect(err).ToNot(HaveOccurred()) + Expect(ip).To(Equal(refIP)) + }) + + It("should fail to call with either address empty", func() { + c, err := DialSuperviserClient(addr.String()) + Expect(err).ToNot(HaveOccurred()) + + err = c.Register(boruta.Capabilities{"UUID": uuidStr}, "", refAddr.IP.String()+":22") + Expect(err).To(HaveOccurred()) + + err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refAddr.String(), "") + Expect(err).To(HaveOccurred()) + }) }) diff --git a/rpc/superviser/superviser.go b/rpc/superviser/superviser.go index a31081e..61658f2 100644 --- a/rpc/superviser/superviser.go +++ b/rpc/superviser/superviser.go @@ -24,7 +24,9 @@ func RegisterSuperviserService(server *rpc.Server, impl Superviser) error { // SuperviserRegisterRequest is a helper structure for Register method. type SuperviserRegisterRequest struct { - Caps Capabilities + Caps Capabilities + DryadAddress string + SshAddress string } // SuperviserRegisterResponse is a helper structure for Register method. @@ -33,7 +35,7 @@ type SuperviserRegisterResponse struct { // Register is RPC implementation of Register calling it. func (s *SuperviserService) Register(request *SuperviserRegisterRequest, response *SuperviserRegisterResponse) (err error) { - err = s.impl.Register(request.Caps) + err = s.impl.Register(request.Caps, request.DryadAddress, request.SshAddress) return } @@ -75,8 +77,8 @@ func (_c *SuperviserClient) Close() error { } // Register is part of implementation of Superviser calling corresponding method on RPC server. -func (_c *SuperviserClient) Register(caps Capabilities) (err error) { - _request := &SuperviserRegisterRequest{caps} +func (_c *SuperviserClient) Register(caps Capabilities, dryadAddress string, sshAddress string) (err error) { + _request := &SuperviserRegisterRequest{caps, dryadAddress, sshAddress} _response := &SuperviserRegisterResponse{} err = _c.client.Call("Superviser.Register", _request, _response) return err diff --git a/workers/worker_list_test.go b/workers/worker_list_test.go index b3b0ddd..1c85098 100644 --- a/workers/worker_list_test.go +++ b/workers/worker_list_test.go @@ -36,6 +36,14 @@ import ( var _ = Describe("WorkerList", func() { var wl *WorkerList + dryadAddr := net.TCPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 7175, + } + sshdAddr := net.TCPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 22, + } BeforeEach(func() { wl = NewWorkerList() }) @@ -81,7 +89,7 @@ var _ = Describe("WorkerList", func() { } It("should fail if UUID is not present", func() { - err := wl.Register(nil) + err := wl.Register(nil, "", "") Expect(err).To(Equal(ErrMissingUUID)) }) @@ -93,7 +101,7 @@ var _ = Describe("WorkerList", func() { It("should add Worker in MAINTENANCE state", func() { caps := getRandomCaps() - err := wl.Register(caps) + err := wl.Register(caps, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) uuid := WorkerUUID(caps[UUID]) wl.mutex.RLock() @@ -110,14 +118,14 @@ var _ = Describe("WorkerList", func() { caps := getRandomCaps() By("registering worker") - err = wl.Register(caps) + err = wl.Register(caps, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) registeredWorkers = append(registeredWorkers, caps[UUID]) compareLists() By("updating the caps") caps["test-key"] = "test-value" - err = wl.Register(caps) + err = wl.Register(caps, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) wl.mutex.RLock() Expect(wl.workers[WorkerUUID(caps[UUID])].Caps).To(Equal(caps)) @@ -133,7 +141,7 @@ var _ = Describe("WorkerList", func() { caps := getRandomCaps() By("registering first worker") - err = wl.Register(caps) + err = wl.Register(caps, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) registeredWorkers = append(registeredWorkers, caps[UUID]) compareLists() @@ -148,13 +156,13 @@ var _ = Describe("WorkerList", func() { caps2 := getRandomCaps() By("registering first worker") - err = wl.Register(caps1) + err = wl.Register(caps1, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) registeredWorkers = append(registeredWorkers, caps1[UUID]) compareLists() By("registering second worker") - err = wl.Register(caps2) + err = wl.Register(caps2, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) registeredWorkers = append(registeredWorkers, caps2[UUID]) compareLists() @@ -172,13 +180,13 @@ var _ = Describe("WorkerList", func() { return newUUID } registerWorker := func() WorkerUUID { - capsUUID := getUUID() - err := wl.Register(Capabilities{UUID: capsUUID}) + capsUUID := randomUUID() + err := wl.Register(Capabilities{UUID: string(capsUUID)}, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) wl.mutex.RLock() Expect(wl.workers).ToNot(BeEmpty()) wl.mutex.RUnlock() - return WorkerUUID(capsUUID) + return capsUUID } BeforeEach(func() { @@ -343,7 +351,8 @@ var _ = Describe("WorkerList", func() { info, ok = wl.workers[worker] Expect(ok).To(BeTrue()) Expect(info.key).To(BeNil()) - info.ip = ip + info.dryad = new(net.TCPAddr) + info.dryad.IP = ip wl.mutex.Unlock() }) AfterEach(func() { @@ -528,7 +537,7 @@ var _ = Describe("WorkerList", func() { registerAndSetGroups := func(groups Groups, caps Capabilities) WorkerInfo { capsUUID := getUUID() caps[UUID] = capsUUID - err := wl.Register(caps) + err := wl.Register(caps, dryadAddr.String(), sshdAddr.String()) Expect(err).ToNot(HaveOccurred()) workerID := WorkerUUID(capsUUID) @@ -818,7 +827,8 @@ var _ = Describe("WorkerList", func() { info, ok = wl.workers[worker] Expect(ok).To(BeTrue()) Expect(info.key).To(BeNil()) - info.ip = ip + info.dryad = new(net.TCPAddr) + info.dryad.IP = ip }) It("should set worker into IDLE state and prepare a key", func() { gomock.InOrder( @@ -930,7 +940,7 @@ var _ = Describe("WorkerList", func() { workerUUID := WorkerUUID(capsUUID) caps[UUID] = capsUUID - wl.Register(caps) + wl.Register(caps, dryadAddr.String(), sshdAddr.String()) wl.mutex.RLock() w, ok := wl.workers[workerUUID] wl.mutex.RUnlock() diff --git a/workers/workers.go b/workers/workers.go index 4771226..6e3dd7c 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -19,6 +19,7 @@ package workers import ( "crypto/rsa" + "errors" "math" "net" "sync" @@ -35,8 +36,9 @@ const UUID string = "UUID" // (public and private) structures representing Worker. type mapWorker struct { WorkerInfo - ip net.IP - key *rsa.PrivateKey + dryad *net.TCPAddr + sshd *net.TCPAddr + key *rsa.PrivateKey } // WorkerList implements Superviser and Workers interfaces. @@ -74,26 +76,51 @@ func NewWorkerList() *WorkerList { } // Register is an implementation of Register from Superviser interface. -// UUID, which identifies Worker, must be present in caps. -func (wl *WorkerList) Register(caps Capabilities) error { +// UUID, which identifies Worker, must be present in caps. Both dryadAddress and +// sshAddress must resolve and parse to net.TCPAddr. Neither IP address nor port number +// can not be ommited. +func (wl *WorkerList) Register(caps Capabilities, dryadAddress string, sshAddress string) error { capsUUID, present := caps[UUID] if !present { return ErrMissingUUID } uuid := WorkerUUID(capsUUID) + + dryad, err := net.ResolveTCPAddr("tcp", dryadAddress) + if err != nil { + return err + } + // dryad.IP is empty if dryadAddress provided port number only. + if dryad.IP == nil { + return errors.New("missing IP in dryad address") + } + sshd, err := net.ResolveTCPAddr("tcp", sshAddress) + if err != nil { + return err + } + // same as with dryad.IP + if sshd.IP == nil { + return errors.New("missing IP in ssh address") + } + wl.mutex.Lock() defer wl.mutex.Unlock() worker, registered := wl.workers[uuid] if registered { - // Subsequent Register calls update the caps. + // Subsequent Register calls update the caps and addresses. worker.Caps = caps + worker.dryad = dryad + worker.sshd = sshd } else { wl.workers[uuid] = &mapWorker{ WorkerInfo: WorkerInfo{ WorkerUUID: uuid, State: MAINTENANCE, Caps: caps, - }} + }, + dryad: dryad, + sshd: sshd, + } } return nil } @@ -266,7 +293,8 @@ func (wl *WorkerList) SetWorkerIP(uuid WorkerUUID, ip net.IP) error { if !ok { return ErrWorkerNotFound } - worker.ip = ip + // FIXME + worker.dryad.IP = ip return nil } @@ -278,7 +306,7 @@ func (wl *WorkerList) GetWorkerIP(uuid WorkerUUID) (net.IP, error) { if !ok { return nil, ErrWorkerNotFound } - return worker.ip, nil + return worker.dryad.IP, nil } // SetWorkerKey stores private key in the worker structure referenced by uuid. diff --git a/workers/workers_test.go b/workers/workers_test.go index 3a49000..43f25da 100644 --- a/workers/workers_test.go +++ b/workers/workers_test.go @@ -57,7 +57,7 @@ var _ = Describe("WorkerList", func() { } b.Time("register", func() { for i := 0; i < maximumWorkers; i++ { - err := wl.Register(caps[i]) + err := wl.Register(caps[i], "127.0.0.1:7175", "127.0.0.1:22") Expect(err).ToNot(HaveOccurred()) } }) -- 2.7.4