--- /dev/null
+/*
+ * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// File matcher/jobs.go contains implementation JobsManagerImpl of JobsManager
+// interface. It creates new jobs, stores them, make their data available and
+// allows to finish them.
+
+package matcher
+
+import (
+ "sync"
+
+ . "git.tizen.org/tools/boruta"
+ "git.tizen.org/tools/boruta/tunnels"
+ "git.tizen.org/tools/boruta/workers"
+)
+
+const defaultDryadUsername = "boruta-user"
+
+// JobsManagerImpl manages jobs. It is an implementation of JobsManager interface.
+// It provides methods for creation of new jobs, getting their data and finishing
+// them.
+type JobsManagerImpl struct {
+ JobsManager
+ // jobs stores all running jobs indexed by ID of the worker they are running on.
+ jobs map[WorkerUUID]*workers.Job
+ // workers provides access to workers.
+ workers WorkersManager
+ // mutex protects JobsManagerImpl from concurrent access.
+ mutex *sync.RWMutex
+ // hack for mocking purposes
+ newTunnel func() tunnels.Tunneler
+}
+
+// newTunnel provides default implementation of Tunneler interface.
+// It uses tunnels package implementation of Tunnel.
+// The function is set as JobsManagerImpl.newTunnel. Field can be replaced
+// by another function providing Tunneler for tests purposes.
+func newTunnel() tunnels.Tunneler {
+ return new(tunnels.Tunnel)
+}
+
+// NewJobsManager creates and returns new JobsManagerImpl structure.
+func NewJobsManager(w WorkersManager) JobsManager {
+ return &JobsManagerImpl{
+ jobs: make(map[WorkerUUID]*workers.Job),
+ workers: w,
+ mutex: new(sync.RWMutex),
+ newTunnel: newTunnel,
+ }
+}
+
+// Create method creates a new job for the request and the worker. It also prepares
+// communication to Dryad by creating a tunnel. It is a part of JobsManager
+// interface implementation.
+func (m *JobsManagerImpl) Create(req ReqID, worker WorkerUUID) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ _, present := m.jobs[worker]
+ if present {
+ return ErrJobAlreadyExists
+ }
+
+ ip, err := m.workers.GetWorkerIP(worker)
+ if err != nil {
+ return err
+ }
+ key, err := m.workers.GetWorkerKey(worker)
+ if err != nil {
+ return err
+ }
+ t := m.newTunnel()
+ err = t.Create(nil, ip)
+ if err != nil {
+ return err
+ }
+
+ job := &workers.Job{
+ Access: AccessInfo{
+ Addr: t.Addr(),
+ Key: key,
+ // TODO (m.wereski) Acquire username from config.
+ Username: defaultDryadUsername,
+ },
+ Tunnel: t,
+ Req: req,
+ }
+ m.jobs[worker] = job
+
+ return nil
+}
+
+// Get returns job information related to the worker ID or error if no job for
+// that worker was found. It is a part of JobsManager interface implementation.
+func (m *JobsManagerImpl) Get(worker WorkerUUID) (*workers.Job, error) {
+ m.mutex.RLock()
+ defer m.mutex.RUnlock()
+
+ job, present := m.jobs[worker]
+ if !present {
+ return nil, NotFoundError("Job")
+ }
+ return job, nil
+}
+
+// Finish closes the job execution, breaks the tunnel to Dryad and removes job
+// from jobs collection. It is a part of JobsManager interface implementation.
+func (m *JobsManagerImpl) Finish(worker WorkerUUID) error {
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+
+ job, present := m.jobs[worker]
+ if !present {
+ return NotFoundError("Job")
+ }
+ job.Tunnel.Close()
+ // TODO log an error in case of tunnel closing failure. Nothing more can be done.
+ delete(m.jobs, worker)
+ return nil
+}
--- /dev/null
+/*
+ * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package matcher
+
+import (
+ "crypto/rsa"
+ "errors"
+ "net"
+
+ . "git.tizen.org/tools/boruta"
+ "git.tizen.org/tools/boruta/tunnels"
+ "git.tizen.org/tools/boruta/workers"
+
+ gomock "github.com/golang/mock/gomock"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Jobs", func() {
+ Describe("NewJobsManager", func() {
+ It("should init all fields", func() {
+ w := &MockWorkersManager{}
+
+ jm := NewJobsManager(w)
+ Expect(jm).NotTo(BeNil())
+ Expect(jm.(*JobsManagerImpl).jobs).NotTo(BeNil())
+ Expect(jm.(*JobsManagerImpl).workers).To(Equal(w))
+ Expect(jm.(*JobsManagerImpl).mutex).NotTo(BeNil())
+ Expect(jm.(*JobsManagerImpl).newTunnel).NotTo(BeNil())
+ Expect(jm.(*JobsManagerImpl).newTunnel()).NotTo(BeNil())
+ })
+ It("should not use workers", func() {
+ ctrl := gomock.NewController(GinkgoT())
+ defer ctrl.Finish()
+
+ w := NewMockWorkersManager(ctrl)
+
+ NewJobsManager(w)
+ })
+ })
+ Describe("With prepared job data", func() {
+ var (
+ ctrl *gomock.Controller
+ w *MockWorkersManager
+ ttm *MockTunneler
+ jm JobsManager
+ ip net.IP = net.IPv4(5, 6, 7, 8)
+ key rsa.PrivateKey = rsa.PrivateKey{}
+ addr net.Addr = &net.TCPAddr{IP: net.IPv4(10, 11, 12, 13), Port: 12345}
+ req ReqID = ReqID(67)
+ worker WorkerUUID = WorkerUUID("TestWorker")
+ testerr error = errors.New("TestError")
+ )
+ BeforeEach(func() {
+ ctrl = gomock.NewController(GinkgoT())
+ w = NewMockWorkersManager(ctrl)
+ ttm = NewMockTunneler(ctrl)
+ jm = NewJobsManager(w)
+ jm.(*JobsManagerImpl).newTunnel = func() tunnels.Tunneler { return ttm }
+ })
+ AfterEach(func() {
+ ctrl.Finish()
+ })
+ assertJob := func(job *workers.Job) {
+ Expect(job.Access.Addr).To(Equal(addr))
+ Expect(job.Access.Key).To(Equal(key))
+ Expect(job.Req).To(Equal(req))
+ }
+ Describe("Create", func() {
+ It("should create a new, properly initialized job", func() {
+ gomock.InOrder(
+ w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+ w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+ ttm.EXPECT().Create(nil, ip).Return(nil),
+ ttm.EXPECT().Addr().Return(addr),
+ )
+
+ err := jm.Create(req, worker)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(len(jm.(*JobsManagerImpl).jobs)).To(Equal(1))
+ job, ok := jm.(*JobsManagerImpl).jobs[worker]
+ Expect(ok).To(BeTrue())
+ assertJob(job)
+ })
+ It("should fail to create another job for same worker", func() {
+ gomock.InOrder(
+ w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+ w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+ ttm.EXPECT().Create(nil, ip).Return(nil),
+ ttm.EXPECT().Addr().Return(addr),
+ )
+
+ // Create first job.
+ err := jm.Create(req, worker)
+ Expect(err).NotTo(HaveOccurred())
+
+ // Create another job for the same worker.
+ err = jm.Create(req, worker)
+ Expect(err).To(Equal(ErrJobAlreadyExists))
+ })
+ It("should fail when GetWorkerIP fails", func() {
+ w.EXPECT().GetWorkerIP(worker).Return(nil, testerr)
+
+ err := jm.Create(req, worker)
+ Expect(err).To(Equal(testerr))
+ })
+ It("should fail and close tunnel when GetWorkerKey fails", func() {
+ gomock.InOrder(
+ w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+ w.EXPECT().GetWorkerKey(worker).Return(rsa.PrivateKey{}, testerr),
+ )
+
+ err := jm.Create(req, worker)
+ Expect(err).To(Equal(testerr))
+ })
+ It("should fail when tunnel creation fails", func() {
+ gomock.InOrder(
+ w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+ w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+ ttm.EXPECT().Create(nil, ip).Return(testerr),
+ )
+
+ err := jm.Create(req, worker)
+ Expect(err).To(Equal(testerr))
+ })
+ })
+ Describe("Get", func() {
+ It("should get existing job", func() {
+ gomock.InOrder(
+ w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+ w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+ ttm.EXPECT().Create(nil, ip).Return(nil),
+ ttm.EXPECT().Addr().Return(addr),
+ )
+
+ err := jm.Create(req, worker)
+ Expect(err).NotTo(HaveOccurred())
+
+ job, err := jm.Get(worker)
+ Expect(err).NotTo(HaveOccurred())
+
+ assertJob(job)
+ })
+ It("should fail getting nonexistent job", func() {
+ job, err := jm.Get(worker)
+ Expect(err).To(Equal(NotFoundError("Job")))
+ Expect(job).To(BeNil())
+ })
+ })
+ Describe("Finish", func() {
+ It("should finish existing job", func() {
+ gomock.InOrder(
+ w.EXPECT().GetWorkerIP(worker).Return(ip, nil),
+ w.EXPECT().GetWorkerKey(worker).Return(key, nil),
+ ttm.EXPECT().Create(nil, ip).Return(nil),
+ ttm.EXPECT().Addr().Return(addr),
+ ttm.EXPECT().Close(),
+ )
+
+ err := jm.Create(req, worker)
+ Expect(err).NotTo(HaveOccurred())
+
+ err = jm.Finish(worker)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(jm.(*JobsManagerImpl).jobs).To(BeEmpty())
+ })
+ It("should fail to finish nonexistent job", func() {
+ err := jm.Finish(worker)
+ Expect(err).To(Equal(NotFoundError("Job")))
+ })
+ })
+ })
+})