From 8d6393282727b4720b5478227301ef4dbe1c6230 Mon Sep 17 00:00:00 2001 From: Aleksander Mistewicz Date: Fri, 16 Feb 2018 14:51:07 +0100 Subject: [PATCH] Add superviserReception to rpc/superviser package boruta server requires IP address of the connected dryads to be saved so that it can connect back to the Go RPC service they expose. superviserReception wraps WorkerList in Superviser interface. It saves IP address of a connection and registers a new Go RPC service. The saved IP address is later used during call to Register. Change-Id: I72f91d1c981e02ac13a07327b0b910e82a32fae9 Signed-off-by: Aleksander Mistewicz Reviewed-on: https://mcdsrvbld02.digital.local/review/49609 Reviewed-by: Maciej Wereski Tested-by: Maciej Wereski --- rpc/superviser/reception.go | 106 ++++++++++++++++++++++++++++++++ rpc/superviser/reception_test.go | 55 +++++++++++++++++ rpc/superviser/superviser_suite_test.go | 29 +++++++++ 3 files changed, 190 insertions(+) create mode 100644 rpc/superviser/reception.go create mode 100644 rpc/superviser/reception_test.go create mode 100644 rpc/superviser/superviser_suite_test.go diff --git a/rpc/superviser/reception.go b/rpc/superviser/reception.go new file mode 100644 index 0000000..bb20c64 --- /dev/null +++ b/rpc/superviser/reception.go @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// Package superviser provides Go RPC implementation of client and server for Superviser interface. +// +// It also provides superviserReception that may be used with StartSuperviserReception to record IP +// address of the client and call SetWorkerIP. +package superviser + +import ( + "net" + "net/rpc" + + "git.tizen.org/tools/boruta" + "git.tizen.org/tools/boruta/workers" +) + +type superviserReception struct { + wl *workers.WorkerList + listener net.Listener +} + +type addressBook struct { + ip net.IP + wl *workers.WorkerList +} + +func startSuperviserReception(wl *workers.WorkerList, addr string) (sr *superviserReception, err error) { + sr = new(superviserReception) + sr.listener, err = net.Listen("tcp", addr) + if err != nil { + return + } + sr.wl = wl + go sr.listenAndServe() + return +} + +// StartSuperviserReception starts listener on addr. For each connection it extracts information +// about client's IP address and serves the connection. In the handler of Register, the extracted +// address is used in call to SetWorkerIP of WorkerList after successful Register of WorkerList. +// +// SetFail is unchanged, i.e. it calls SetFail of WorkerList without modification of arguments and +// return values. +func StartSuperviserReception(wl *workers.WorkerList, addr string) (err error) { + _, err = startSuperviserReception(wl, addr) + return err +} + +func (sr *superviserReception) listenAndServe() { + for { + conn, err := sr.listener.Accept() + if err != nil { + // FIXME(amistewicz): properly handle the error as a busy loop is possible. + continue + } + go sr.serve(conn) + } +} + +// serve extracts IP address of the client and stores it in a newly created instance of +// connIntercepter, which will use it for SetWorkerIP call. +func (sr *superviserReception) serve(conn net.Conn) { + ip := conn.RemoteAddr().(*net.TCPAddr).IP + + sub := &addressBook{ + ip: ip, + wl: sr.wl, + } + + srv := rpc.NewServer() + err := RegisterSuperviserService(srv, sub) + if err != nil { + // TODO(amistewicz): log an error. + return + } + + srv.ServeConn(conn) +} + +// Register calls Register and SetWorkerIP of WorkerList if the former call was successful. +func (ab *addressBook) Register(caps boruta.Capabilities) (err error) { + err = ab.wl.Register(caps) + if err != nil { + return + } + return ab.wl.SetWorkerIP(caps.GetWorkerUUID(), ab.ip) +} + +// SetFail calls SetFail of WorkerList. +func (ab *addressBook) SetFail(uuid boruta.WorkerUUID, reason string) (err error) { + return ab.wl.SetFail(uuid, reason) +} diff --git a/rpc/superviser/reception_test.go b/rpc/superviser/reception_test.go new file mode 100644 index 0000000..b9aba08 --- /dev/null +++ b/rpc/superviser/reception_test.go @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package superviser + +import ( + "net" + + "git.tizen.org/tools/boruta" + "git.tizen.org/tools/boruta/workers" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("superviserReception", func() { + var i *superviserReception + var wl *workers.WorkerList + var addr net.Addr + + BeforeEach(func() { + var err error + wl = workers.NewWorkerList() + i, err = startSuperviserReception(wl, "") + Expect(err).ToNot(HaveOccurred()) + addr = i.listener.Addr() + }) + + It("should get IP from connection", func() { + uuidStr := "test-uuid" + uuid := boruta.WorkerUUID(uuidStr) + c, err := DialSuperviserClient(addr.String()) + Expect(err).ToNot(HaveOccurred()) + + err = c.Register(boruta.Capabilities{"UUID": uuidStr}) + Expect(err).ToNot(HaveOccurred()) + + ip, err := wl.GetWorkerIP(uuid) + Expect(err).ToNot(HaveOccurred()) + Expect(ip).ToNot(BeNil()) + }) +}) diff --git a/rpc/superviser/superviser_suite_test.go b/rpc/superviser/superviser_suite_test.go new file mode 100644 index 0000000..8ea0c0d --- /dev/null +++ b/rpc/superviser/superviser_suite_test.go @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package superviser_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestSuperviser(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Superviser Suite") +} -- 2.7.4