Add JobsManager implementation with tests
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Thu, 26 Apr 2018 10:16:28 +0000 (12:16 +0200)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Fri, 27 Apr 2018 15:34:20 +0000 (17:34 +0200)
JobsManagerImpl implements JobsManager interface providing support
for jobs management. It uses provided WorkManager for managing
workers and Tunnels implementation of Tunneler for handling tunnels.

Tests base on usage of MockWorkersManager and MockTunneler
for mockuping access to workers and tunnels.

Change-Id: I4c469f32fc4f918641f69843567b39bfcce8da4c
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Signed-off-by: Maciej Wereski <m.wereski@partner.samsung.com>
matcher/error.go [new file with mode: 0644]
matcher/jobs.go [new file with mode: 0644]
matcher/jobs_test.go [new file with mode: 0644]
matcher/matcher_suite_test.go [new file with mode: 0644]

diff --git a/matcher/error.go b/matcher/error.go
new file mode 100644 (file)
index 0000000..9f8c8b7
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+// File matcher/error.go provides matcher package related errors definitions.
+
+package matcher
+
+import (
+       "errors"
+)
+
+var (
+       // ErrJobAlreadyExists is returned during job creation if previous job
+       // run by the worker was not released properly.
+       ErrJobAlreadyExists = errors.New("job already exists")
+)
diff --git a/matcher/jobs.go b/matcher/jobs.go
new file mode 100644 (file)
index 0000000..9eb79e8
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+// File matcher/jobs.go contains implementation JobsManagerImpl of JobsManager
+// interface. It creates new jobs, stores them, make their data available and
+// allows to finish them.
+
+package matcher
+
+import (
+       "sync"
+
+       . "git.tizen.org/tools/boruta"
+       "git.tizen.org/tools/boruta/tunnels"
+       "git.tizen.org/tools/boruta/workers"
+)
+
+const defaultDryadUsername = "boruta-user"
+
+// JobsManagerImpl manages jobs. It is an implementation of JobsManager interface.
+// It provides methods for creation of new jobs, getting their data and finishing
+// them.
+type JobsManagerImpl struct {
+       JobsManager
+       // jobs stores all running jobs indexed by ID of the worker they are running on.
+       jobs map[WorkerUUID]*workers.Job
+       // workers provides access to workers.
+       workers WorkersManager
+       // mutex protects JobsManagerImpl from concurrent access.
+       mutex *sync.RWMutex
+       // hack for mocking purposes
+       newTunnel func() tunnels.Tunneler
+}
+
+// newTunnel provides default implementation of Tunneler interface.
+// It uses tunnels package implementation of Tunnel.
+// The function is set as JobsManagerImpl.newTunnel. Field can be replaced
+// by another function providing Tunneler for tests purposes.
+func newTunnel() tunnels.Tunneler {
+       return new(tunnels.Tunnel)
+}
+
+// NewJobsManager creates and returns new JobsManagerImpl structure.
+func NewJobsManager(w WorkersManager) JobsManager {
+       return &JobsManagerImpl{
+               jobs:      make(map[WorkerUUID]*workers.Job),
+               workers:   w,
+               mutex:     new(sync.RWMutex),
+               newTunnel: newTunnel,
+       }
+}
+
+// Create method creates a new job for the request and the worker. It also prepares
+// communication to Dryad by creating a tunnel. It is a part of JobsManager
+// interface implementation.
+func (m *JobsManagerImpl) Create(req ReqID, worker WorkerUUID) error {
+       m.mutex.Lock()
+       defer m.mutex.Unlock()
+
+       _, present := m.jobs[worker]
+       if present {
+               return ErrJobAlreadyExists
+       }
+
+       ip, err := m.workers.GetWorkerIP(worker)
+       if err != nil {
+               return err
+       }
+       key, err := m.workers.GetWorkerKey(worker)
+       if err != nil {
+               return err
+       }
+       t := m.newTunnel()
+       err = t.Create(nil, ip)
+       if err != nil {
+               return err
+       }
+
+       job := &workers.Job{
+               Access: AccessInfo{
+                       Addr: t.Addr(),
+                       Key:  key,
+                       // TODO (m.wereski) Acquire username from config.
+                       Username: defaultDryadUsername,
+               },
+               Tunnel: t,
+               Req:    req,
+       }
+       m.jobs[worker] = job
+
+       return nil
+}
+
+// Get returns job information related to the worker ID or error if no job for
+// that worker was found. It is a part of JobsManager interface implementation.
+func (m *JobsManagerImpl) Get(worker WorkerUUID) (*workers.Job, error) {
+       m.mutex.RLock()
+       defer m.mutex.RUnlock()
+
+       job, present := m.jobs[worker]
+       if !present {
+               return nil, NotFoundError("Job")
+       }
+       return job, nil
+}
+
+// Finish closes the job execution, breaks the tunnel to Dryad and removes job
+// from jobs collection. It is a part of JobsManager interface implementation.
+func (m *JobsManagerImpl) Finish(worker WorkerUUID) error {
+       m.mutex.Lock()
+       defer m.mutex.Unlock()
+
+       job, present := m.jobs[worker]
+       if !present {
+               return NotFoundError("Job")
+       }
+       job.Tunnel.Close()
+       // TODO log an error in case of tunnel closing failure. Nothing more can be done.
+       delete(m.jobs, worker)
+       return nil
+}
diff --git a/matcher/jobs_test.go b/matcher/jobs_test.go
new file mode 100644 (file)
index 0000000..f934744
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package matcher
+
+import (
+       "crypto/rsa"
+       "errors"
+       "net"
+
+       . "git.tizen.org/tools/boruta"
+       "git.tizen.org/tools/boruta/tunnels"
+       "git.tizen.org/tools/boruta/workers"
+
+       gomock "github.com/golang/mock/gomock"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Jobs", func() {
+       Describe("NewJobsManager", func() {
+               It("should init all fields", func() {
+                       w := &MockWorkersManager{}
+
+                       jm := NewJobsManager(w)
+                       Expect(jm).NotTo(BeNil())
+                       Expect(jm.(*JobsManagerImpl).jobs).NotTo(BeNil())
+                       Expect(jm.(*JobsManagerImpl).workers).To(Equal(w))
+                       Expect(jm.(*JobsManagerImpl).mutex).NotTo(BeNil())
+                       Expect(jm.(*JobsManagerImpl).newTunnel).NotTo(BeNil())
+                       Expect(jm.(*JobsManagerImpl).newTunnel()).NotTo(BeNil())
+               })
+               It("should not use workers", func() {
+                       ctrl := gomock.NewController(GinkgoT())
+                       defer ctrl.Finish()
+
+                       w := NewMockWorkersManager(ctrl)
+
+                       NewJobsManager(w)
+               })
+       })
+       Describe("With prepared job data", func() {
+               var (
+                       ctrl    *gomock.Controller
+                       w       *MockWorkersManager
+                       ttm     *MockTunneler
+                       jm      JobsManager
+                       ip      net.IP         = net.IPv4(5, 6, 7, 8)
+                       key     rsa.PrivateKey = rsa.PrivateKey{}
+                       addr    net.Addr       = &net.TCPAddr{IP: net.IPv4(10, 11, 12, 13), Port: 12345}
+                       req     ReqID          = ReqID(67)
+                       worker  WorkerUUID     = WorkerUUID("TestWorker")
+                       testerr error          = errors.New("TestError")
+               )
+               BeforeEach(func() {
+                       ctrl = gomock.NewController(GinkgoT())
+                       w = NewMockWorkersManager(ctrl)
+                       ttm = NewMockTunneler(ctrl)
+                       jm = NewJobsManager(w)
+                       jm.(*JobsManagerImpl).newTunnel = func() tunnels.Tunneler { return ttm }
+               })
+               AfterEach(func() {
+                       ctrl.Finish()
+               })
+               assertJob := func(job *workers.Job) {
+                       Expect(job.Access.Addr).To(Equal(addr))
+                       Expect(job.Access.Key).To(Equal(key))
+                       Expect(job.Req).To(Equal(req))
+               }
+               Describe("Create", func() {
+                       It("should create a new, properly initialized job", func() {
+                               gomock.InOrder(
+                                       w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+                                       w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+                                       ttm.EXPECT().Create(nil, ip).Return(nil),
+                                       ttm.EXPECT().Addr().Return(addr),
+                               )
+
+                               err := jm.Create(req, worker)
+                               Expect(err).NotTo(HaveOccurred())
+
+                               Expect(len(jm.(*JobsManagerImpl).jobs)).To(Equal(1))
+                               job, ok := jm.(*JobsManagerImpl).jobs[worker]
+                               Expect(ok).To(BeTrue())
+                               assertJob(job)
+                       })
+                       It("should fail to create another job for same worker", func() {
+                               gomock.InOrder(
+                                       w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+                                       w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+                                       ttm.EXPECT().Create(nil, ip).Return(nil),
+                                       ttm.EXPECT().Addr().Return(addr),
+                               )
+
+                               // Create first job.
+                               err := jm.Create(req, worker)
+                               Expect(err).NotTo(HaveOccurred())
+
+                               // Create another job for the same worker.
+                               err = jm.Create(req, worker)
+                               Expect(err).To(Equal(ErrJobAlreadyExists))
+                       })
+                       It("should fail when GetWorkerIP fails", func() {
+                               w.EXPECT().GetWorkerIP(worker).Return(nil, testerr)
+
+                               err := jm.Create(req, worker)
+                               Expect(err).To(Equal(testerr))
+                       })
+                       It("should fail and close tunnel when GetWorkerKey fails", func() {
+                               gomock.InOrder(
+                                       w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+                                       w.EXPECT().GetWorkerKey(worker).Return(rsa.PrivateKey{}, testerr),
+                               )
+
+                               err := jm.Create(req, worker)
+                               Expect(err).To(Equal(testerr))
+                       })
+                       It("should fail when tunnel creation fails", func() {
+                               gomock.InOrder(
+                                       w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+                                       w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+                                       ttm.EXPECT().Create(nil, ip).Return(testerr),
+                               )
+
+                               err := jm.Create(req, worker)
+                               Expect(err).To(Equal(testerr))
+                       })
+               })
+               Describe("Get", func() {
+                       It("should get existing job", func() {
+                               gomock.InOrder(
+                                       w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+                                       w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+                                       ttm.EXPECT().Create(nil, ip).Return(nil),
+                                       ttm.EXPECT().Addr().Return(addr),
+                               )
+
+                               err := jm.Create(req, worker)
+                               Expect(err).NotTo(HaveOccurred())
+
+                               job, err := jm.Get(worker)
+                               Expect(err).NotTo(HaveOccurred())
+
+                               assertJob(job)
+                       })
+                       It("should fail getting nonexistent job", func() {
+                               job, err := jm.Get(worker)
+                               Expect(err).To(Equal(NotFoundError("Job")))
+                               Expect(job).To(BeNil())
+                       })
+               })
+               Describe("Finish", func() {
+                       It("should finish existing job", func() {
+                               gomock.InOrder(
+                                       w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+                                       w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+                                       ttm.EXPECT().Create(nil, ip).Return(nil),
+                                       ttm.EXPECT().Addr().Return(addr),
+                                       ttm.EXPECT().Close(),
+                               )
+
+                               err := jm.Create(req, worker)
+                               Expect(err).NotTo(HaveOccurred())
+
+                               err = jm.Finish(worker)
+                               Expect(err).NotTo(HaveOccurred())
+
+                               Expect(jm.(*JobsManagerImpl).jobs).To(BeEmpty())
+                       })
+                       It("should fail to finish nonexistent job", func() {
+                               err := jm.Finish(worker)
+                               Expect(err).To(Equal(NotFoundError("Job")))
+                       })
+               })
+       })
+})
diff --git a/matcher/matcher_suite_test.go b/matcher/matcher_suite_test.go
new file mode 100644 (file)
index 0000000..b558b4c
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package matcher
+
+import (
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+
+       "testing"
+)
+
+func TestMatcher(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Matcher Suite")
+}