type Superviser interface {
// Register adds a new Worker to the system in the MAINTENANCE state.
// Capabilities are set on the Worker and can be changed by subsequent Register calls.
- Register(caps Capabilities) (err error)
+ // dryadAddress and sshAddress inform Superviser on address it should use to connect
+ // using Go RPC and SSH respectively. They should parse to net.TCPAddr.
+ Register(caps Capabilities, dryadAddress string, sshAddress string) (err error)
// SetFail notifies the Server about the Failure of the Worker.
// It can additionally contain non-empty reason of the failure.
SetFail(uuid WorkerUUID, reason string) (err error)
exitOnErr("failed to initialize connection to boruta:", err)
defer boruta.Close()
- err = boruta.Register(configuration.Caps)
+ err = boruta.Register(configuration.Caps, configuration.Address, configuration.SSHAdress)
exitOnErr("failed to register to boruta:", err)
// Wait for interrupt.
package dryad
+//go:generate go-rpcgen --source=../../boruta.go --type=Dryad --target=dryad.go --package dryad --imports net/rpc,.=git.tizen.org/tools/boruta
+
import (
"net"
// address of the client and call SetWorkerIP.
package superviser
+//go:generate go-rpcgen --source=../../boruta.go --type=Superviser --target=superviser.go --package=superviser --imports net/rpc,.=git.tizen.org/tools/boruta
+
import (
+ "errors"
"net"
"net/rpc"
srv.ServeConn(conn)
}
-// Register calls Register and SetWorkerIP of WorkerList if the former call was successful.
-func (ab *addressBook) Register(caps boruta.Capabilities) (err error) {
- err = ab.wl.Register(caps)
+func (ab *addressBook) getTCPAddr(str, ctx string) (*net.TCPAddr, error) {
+ if str == "" {
+ return nil, errors.New(ctx + " can't be empty")
+ }
+ return net.ResolveTCPAddr("tcp", str)
+}
+
+// Register calls Register of WorkerList. It additionally fills dryadAddress
+// and sshAddress with IP address if one is missing from parameters.
+func (ab *addressBook) Register(caps boruta.Capabilities, dryadAddress string, sshAddress string) (err error) {
+ dryad, err := ab.getTCPAddr(dryadAddress, "dryadAddress")
if err != nil {
- return
+ return err
+ }
+ if dryad.IP == nil {
+ dryad.IP = ab.ip
+ }
+ sshd, err := ab.getTCPAddr(sshAddress, "sshAddress")
+ if err != nil {
+ return err
+ }
+ if sshd.IP == nil {
+ sshd.IP = ab.ip
}
- return ab.wl.SetWorkerIP(caps.GetWorkerUUID(), ab.ip)
+ return ab.wl.Register(caps, dryad.String(), sshd.String())
}
// SetFail calls SetFail of WorkerList.
import (
"net"
+ "net/rpc"
"git.tizen.org/tools/boruta"
"git.tizen.org/tools/boruta/workers"
var i *superviserReception
var wl *workers.WorkerList
var addr net.Addr
+ uuidStr := "test-uuid"
+ refAddr := net.TCPAddr{
+ IP: net.IPv4(127, 0, 0, 1),
+ Port: 7175,
+ }
BeforeEach(func() {
var err error
})
It("should get IP from connection", func() {
- uuidStr := "test-uuid"
uuid := boruta.WorkerUUID(uuidStr)
- c, err := DialSuperviserClient(addr.String())
+ conn, err := net.DialTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)}, addr.(*net.TCPAddr))
Expect(err).ToNot(HaveOccurred())
+ c := NewSuperviserClient(rpc.NewClient(conn))
- err = c.Register(boruta.Capabilities{"UUID": uuidStr})
+ err = c.Register(boruta.Capabilities{"UUID": uuidStr}, ":7175", ":22")
Expect(err).ToNot(HaveOccurred())
ip, err := wl.GetWorkerIP(uuid)
Expect(err).ToNot(HaveOccurred())
Expect(ip).ToNot(BeNil())
})
+
+ It("should get IP from argument", func() {
+ refIP := net.IPv4(127, 0, 0, 1)
+ uuid := boruta.WorkerUUID(uuidStr)
+ c, err := DialSuperviserClient(addr.String())
+ Expect(err).ToNot(HaveOccurred())
+
+ err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refIP.String()+":7175", refIP.String()+":22")
+ Expect(err).ToNot(HaveOccurred())
+
+ ip, err := wl.GetWorkerIP(uuid)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ip).To(Equal(refIP))
+ })
+
+ It("should fail to call with either address empty", func() {
+ c, err := DialSuperviserClient(addr.String())
+ Expect(err).ToNot(HaveOccurred())
+
+ err = c.Register(boruta.Capabilities{"UUID": uuidStr}, "", refAddr.IP.String()+":22")
+ Expect(err).To(HaveOccurred())
+
+ err = c.Register(boruta.Capabilities{"UUID": uuidStr}, refAddr.String(), "")
+ Expect(err).To(HaveOccurred())
+ })
})
// SuperviserRegisterRequest is a helper structure for Register method.
type SuperviserRegisterRequest struct {
- Caps Capabilities
+ Caps Capabilities
+ DryadAddress string
+ SshAddress string
}
// SuperviserRegisterResponse is a helper structure for Register method.
// Register is RPC implementation of Register calling it.
func (s *SuperviserService) Register(request *SuperviserRegisterRequest, response *SuperviserRegisterResponse) (err error) {
- err = s.impl.Register(request.Caps)
+ err = s.impl.Register(request.Caps, request.DryadAddress, request.SshAddress)
return
}
}
// Register is part of implementation of Superviser calling corresponding method on RPC server.
-func (_c *SuperviserClient) Register(caps Capabilities) (err error) {
- _request := &SuperviserRegisterRequest{caps}
+func (_c *SuperviserClient) Register(caps Capabilities, dryadAddress string, sshAddress string) (err error) {
+ _request := &SuperviserRegisterRequest{caps, dryadAddress, sshAddress}
_response := &SuperviserRegisterResponse{}
err = _c.client.Call("Superviser.Register", _request, _response)
return err
var _ = Describe("WorkerList", func() {
var wl *WorkerList
+ dryadAddr := net.TCPAddr{
+ IP: net.IPv4(127, 0, 0, 1),
+ Port: 7175,
+ }
+ sshdAddr := net.TCPAddr{
+ IP: net.IPv4(127, 0, 0, 1),
+ Port: 22,
+ }
BeforeEach(func() {
wl = NewWorkerList()
})
}
It("should fail if UUID is not present", func() {
- err := wl.Register(nil)
+ err := wl.Register(nil, "", "")
Expect(err).To(Equal(ErrMissingUUID))
})
It("should add Worker in MAINTENANCE state", func() {
caps := getRandomCaps()
- err := wl.Register(caps)
+ err := wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
uuid := WorkerUUID(caps[UUID])
wl.mutex.RLock()
caps := getRandomCaps()
By("registering worker")
- err = wl.Register(caps)
+ err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps[UUID])
compareLists()
By("updating the caps")
caps["test-key"] = "test-value"
- err = wl.Register(caps)
+ err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
wl.mutex.RLock()
Expect(wl.workers[WorkerUUID(caps[UUID])].Caps).To(Equal(caps))
caps := getRandomCaps()
By("registering first worker")
- err = wl.Register(caps)
+ err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps[UUID])
compareLists()
caps2 := getRandomCaps()
By("registering first worker")
- err = wl.Register(caps1)
+ err = wl.Register(caps1, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps1[UUID])
compareLists()
By("registering second worker")
- err = wl.Register(caps2)
+ err = wl.Register(caps2, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps2[UUID])
compareLists()
return newUUID
}
registerWorker := func() WorkerUUID {
- capsUUID := getUUID()
- err := wl.Register(Capabilities{UUID: capsUUID})
+ capsUUID := randomUUID()
+ err := wl.Register(Capabilities{UUID: string(capsUUID)}, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
wl.mutex.RLock()
Expect(wl.workers).ToNot(BeEmpty())
wl.mutex.RUnlock()
- return WorkerUUID(capsUUID)
+ return capsUUID
}
BeforeEach(func() {
info, ok = wl.workers[worker]
Expect(ok).To(BeTrue())
Expect(info.key).To(BeNil())
- info.ip = ip
+ info.dryad = new(net.TCPAddr)
+ info.dryad.IP = ip
wl.mutex.Unlock()
})
AfterEach(func() {
registerAndSetGroups := func(groups Groups, caps Capabilities) WorkerInfo {
capsUUID := getUUID()
caps[UUID] = capsUUID
- err := wl.Register(caps)
+ err := wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
workerID := WorkerUUID(capsUUID)
info, ok = wl.workers[worker]
Expect(ok).To(BeTrue())
Expect(info.key).To(BeNil())
- info.ip = ip
+ info.dryad = new(net.TCPAddr)
+ info.dryad.IP = ip
})
It("should set worker into IDLE state and prepare a key", func() {
gomock.InOrder(
workerUUID := WorkerUUID(capsUUID)
caps[UUID] = capsUUID
- wl.Register(caps)
+ wl.Register(caps, dryadAddr.String(), sshdAddr.String())
wl.mutex.RLock()
w, ok := wl.workers[workerUUID]
wl.mutex.RUnlock()
import (
"crypto/rsa"
+ "errors"
"math"
"net"
"sync"
// (public and private) structures representing Worker.
type mapWorker struct {
WorkerInfo
- ip net.IP
- key *rsa.PrivateKey
+ dryad *net.TCPAddr
+ sshd *net.TCPAddr
+ key *rsa.PrivateKey
}
// WorkerList implements Superviser and Workers interfaces.
}
// Register is an implementation of Register from Superviser interface.
-// UUID, which identifies Worker, must be present in caps.
-func (wl *WorkerList) Register(caps Capabilities) error {
+// UUID, which identifies Worker, must be present in caps. Both dryadAddress and
+// sshAddress must resolve and parse to net.TCPAddr. Neither IP address nor port number
+// can not be ommited.
+func (wl *WorkerList) Register(caps Capabilities, dryadAddress string, sshAddress string) error {
capsUUID, present := caps[UUID]
if !present {
return ErrMissingUUID
}
uuid := WorkerUUID(capsUUID)
+
+ dryad, err := net.ResolveTCPAddr("tcp", dryadAddress)
+ if err != nil {
+ return err
+ }
+ // dryad.IP is empty if dryadAddress provided port number only.
+ if dryad.IP == nil {
+ return errors.New("missing IP in dryad address")
+ }
+ sshd, err := net.ResolveTCPAddr("tcp", sshAddress)
+ if err != nil {
+ return err
+ }
+ // same as with dryad.IP
+ if sshd.IP == nil {
+ return errors.New("missing IP in ssh address")
+ }
+
wl.mutex.Lock()
defer wl.mutex.Unlock()
worker, registered := wl.workers[uuid]
if registered {
- // Subsequent Register calls update the caps.
+ // Subsequent Register calls update the caps and addresses.
worker.Caps = caps
+ worker.dryad = dryad
+ worker.sshd = sshd
} else {
wl.workers[uuid] = &mapWorker{
WorkerInfo: WorkerInfo{
WorkerUUID: uuid,
State: MAINTENANCE,
Caps: caps,
- }}
+ },
+ dryad: dryad,
+ sshd: sshd,
+ }
}
return nil
}
if !ok {
return ErrWorkerNotFound
}
- worker.ip = ip
+ // FIXME
+ worker.dryad.IP = ip
return nil
}
if !ok {
return nil, ErrWorkerNotFound
}
- return worker.ip, nil
+ return worker.dryad.IP, nil
}
// SetWorkerKey stores private key in the worker structure referenced by uuid.
}
b.Time("register", func() {
for i := 0; i < maximumWorkers; i++ {
- err := wl.Register(caps[i])
+ err := wl.Register(caps[i], "127.0.0.1:7175", "127.0.0.1:22")
Expect(err).ToNot(HaveOccurred())
}
})