Add Dryader implementation with tests 20/162020/4
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Wed, 22 Nov 2017 08:53:17 +0000 (09:53 +0100)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Mon, 4 Dec 2017 10:01:24 +0000 (11:01 +0100)
DryaderImpl implements delegation and control of Job execution
in DryadJobManager.

Change-Id: I7d30f21bf0f838829f6c2d0b65362a6d7b66d0a3
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
controller/dryaderimpl.go [new file with mode: 0644]
controller/dryaderimpl_test.go [new file with mode: 0644]

diff --git a/controller/dryaderimpl.go b/controller/dryaderimpl.go
new file mode 100644 (file)
index 0000000..0387ea9
--- /dev/null
@@ -0,0 +1,165 @@
+/*
+ *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+// File controller/dryaderimpl.go implements Dryader interface. It delegates
+// and controls Job execution by DryadJobManager.
+
+package controller
+
+import (
+       "fmt"
+       "sync"
+
+       . "git.tizen.org/tools/weles"
+       "git.tizen.org/tools/weles/controller/notifier"
+)
+
+// DryaderImpl implements Dryader. It delegates and controls Job execution
+// by DryadJobManager.
+type DryaderImpl struct {
+       // Notifier provides channel for communication with Controller.
+       notifier.Notifier
+       // jobs references module implementing Jobs management.
+       jobs JobsController
+       // djm manages DryadJobs.
+       djm DryadJobManager
+       // info contains Jobs delegated to DryadJobManager and not completed yet
+       // - active Jobs collection.
+       info map[JobID]bool
+       // mutex protects access to info map.
+       mutex *sync.Mutex
+       // listener listens on notifications from DryadJobManager.
+       listener chan DryadJobStatusChange
+       // finish is channel for stopping internal goroutine.
+       finish chan int
+       // looper waits for internal goroutine running loop to finish.
+       looper sync.WaitGroup
+}
+
+// NewDryader creates a new DryaderImpl structure setting up references
+// to used Weles modules.
+func NewDryader(j JobsController, d DryadJobManager) Dryader {
+       ret := &DryaderImpl{
+               Notifier: notifier.NewNotifier(),
+               jobs:     j,
+               djm:      d,
+               info:     make(map[JobID]bool),
+               mutex:    new(sync.Mutex),
+               listener: make(chan DryadJobStatusChange),
+               finish:   make(chan int),
+       }
+       ret.looper.Add(1)
+       go ret.loop()
+       return ret
+}
+
+// Finish internal goroutine.
+func (h *DryaderImpl) Finish() {
+       h.finish <- 1
+       h.looper.Wait()
+}
+
+// add adds a new Job delegated to DryadJobManager to active Jobs collection.
+func (h *DryaderImpl) add(j JobID) {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+
+       h.info[j] = true
+}
+
+// remove Job from active Jobs collection.
+func (h *DryaderImpl) remove(j JobID) {
+       h.mutex.Lock()
+       defer h.mutex.Unlock()
+
+       delete(h.info, j)
+}
+
+// setStatus sets Jobs status to RUNNING and updates info.
+func (h *DryaderImpl) setStatus(j JobID, msg string) {
+       err := h.jobs.SetStatusAndInfo(j, JOB_RUNNING, msg)
+       if err != nil {
+               h.remove(j)
+               h.SendFail(j, fmt.Sprintf("Internal Weles error while changing Job status : %s", err.Error()))
+       }
+}
+
+// loop monitors DryadJob's status.
+func (h *DryaderImpl) loop() {
+       defer h.looper.Done()
+       for {
+               select {
+               case <-h.finish:
+                       return
+               case recv := <-h.listener:
+                       change := DryadJobInfo(recv)
+                       h.mutex.Lock()
+                       _, ok := h.info[change.Job]
+                       h.mutex.Unlock()
+                       if !ok {
+                               continue
+                       }
+
+                       switch change.Status {
+                       case DJ_NEW:
+                               h.setStatus(change.Job, "Started")
+                       case DJ_DEPLOY:
+                               h.setStatus(change.Job, "Deploying")
+                       case DJ_BOOT:
+                               h.setStatus(change.Job, "Booting")
+                       case DJ_TEST:
+                               h.setStatus(change.Job, "Testing")
+                       case DJ_FAIL:
+                               h.remove(change.Job)
+                               h.SendFail(change.Job, "Failed to execute test on Dryad.")
+                       case DJ_OK:
+                               h.remove(change.Job)
+                               h.SendOK(change.Job)
+                       }
+               }
+       }
+}
+
+// Start registers new Job to be executed in DryadJobManager.
+func (h *DryaderImpl) Start(j JobID) {
+       d, err := h.jobs.GetDryad(j)
+       if err != nil {
+               h.SendFail(j, fmt.Sprintf("Internal Weles error while getting Dryad for Job : %s", err.Error()))
+               return
+       }
+
+       h.add(j)
+
+       err = h.djm.Create(j, d, h.listener)
+       if err != nil {
+               h.remove(j)
+               h.SendFail(j, fmt.Sprintf("Cannot delegate Job to Dryad : %s", err.Error()))
+               return
+       }
+}
+
+// Cancel breaks Job execution in DryadJobManager
+func (h *DryaderImpl) Cancel(j JobID) {
+       h.mutex.Lock()
+       _, ok := h.info[j]
+       h.mutex.Unlock()
+       if !ok {
+               return
+       }
+
+       h.remove(j)
+       h.djm.Cancel(j)
+}
diff --git a/controller/dryaderimpl_test.go b/controller/dryaderimpl_test.go
new file mode 100644 (file)
index 0000000..08ac8a3
--- /dev/null
@@ -0,0 +1,219 @@
+/*
+ *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package controller
+
+import (
+       "errors"
+       "net"
+
+       . "git.tizen.org/tools/weles"
+       cmock "git.tizen.org/tools/weles/controller/mock"
+       . "git.tizen.org/tools/weles/controller/notifier"
+       mock "git.tizen.org/tools/weles/mock"
+       gomock "github.com/golang/mock/gomock"
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+var _ = Describe("DryaderImpl", func() {
+       var r <-chan Notification
+       var jc *cmock.MockJobsController
+       var djm *mock.MockDryadJobManager
+       var h Dryader
+       var ctrl *gomock.Controller
+       j := JobID(0xCAFE)
+       dryad := Dryad{Addr: &net.IPNet{IP: net.IPv4(1, 2, 3, 4), Mask: net.IPv4Mask(5, 6, 7, 8)}}
+       err := errors.New("test error")
+
+       expectRegistered := func(offset int) {
+               h.(*DryaderImpl).mutex.Lock()
+               defer h.(*DryaderImpl).mutex.Unlock()
+
+               ExpectWithOffset(offset, len(h.(*DryaderImpl).info)).To(Equal(1))
+               info, ok := h.(*DryaderImpl).info[j]
+               ExpectWithOffset(offset, ok).To(BeTrue())
+               ExpectWithOffset(offset, info).To(BeTrue())
+       }
+       eventuallyEmpty := func(offset int) {
+               EventuallyWithOffset(offset, func() int {
+                       h.(*DryaderImpl).mutex.Lock()
+                       defer h.(*DryaderImpl).mutex.Unlock()
+                       return len(h.(*DryaderImpl).info)
+               }).Should(BeZero())
+       }
+       eventuallyNoti := func(offset int, ok bool, msg string) {
+               notification := Notification{}
+               expectedNotification := Notification{
+                       JobID: j,
+                       OK:    ok,
+                       Msg:   msg,
+               }
+               EventuallyWithOffset(offset, r).Should(Receive(&notification))
+               ExpectWithOffset(offset, notification).To(Equal(expectedNotification))
+       }
+
+       BeforeEach(func() {
+               ctrl = gomock.NewController(GinkgoT())
+
+               jc = cmock.NewMockJobsController(ctrl)
+               djm = mock.NewMockDryadJobManager(ctrl)
+
+               h = NewDryader(jc, djm)
+               r = h.Listen()
+       })
+       AfterEach(func() {
+               h.(*DryaderImpl).Finish()
+               ctrl.Finish()
+       })
+
+       Describe("NewBoruter", func() {
+               It("should create a new object", func() {
+                       Expect(h).NotTo(BeNil())
+                       Expect(h.(*DryaderImpl).jobs).To(Equal(jc))
+                       Expect(h.(*DryaderImpl).djm).To(Equal(djm))
+                       Expect(h.(*DryaderImpl).info).NotTo(BeNil())
+                       Expect(h.(*DryaderImpl).mutex).NotTo(BeNil())
+                       Expect(h.(*DryaderImpl).finish).NotTo(BeNil())
+               })
+       })
+
+       Describe("Start", func() {
+               It("should register job successfully", func() {
+                       jc.EXPECT().GetDryad(j).Return(dryad, nil)
+                       djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener))
+
+                       h.Start(j)
+
+                       expectRegistered(1)
+               })
+               It("should fail if DryadJobManager.Create fails", func() {
+                       jc.EXPECT().GetDryad(j).Return(dryad, nil)
+                       djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener)).Return(err)
+
+                       h.Start(j)
+
+                       eventuallyNoti(1, false, "Cannot delegate Job to Dryad : test error")
+                       eventuallyEmpty(1)
+               })
+               It("should fail if JobManager.GetDryad fails", func() {
+                       jc.EXPECT().GetDryad(j).Return(Dryad{}, err)
+
+                       h.Start(j)
+
+                       eventuallyNoti(1, false, "Internal Weles error while getting Dryad for Job : test error")
+                       eventuallyEmpty(1)
+               })
+       })
+
+       Describe("With registered request", func() {
+               updateStates := []DryadJobStatus{
+                       DJ_NEW,
+                       DJ_DEPLOY,
+                       DJ_BOOT,
+                       DJ_TEST,
+               }
+               updateMsgs := []string{
+                       "Started",
+                       "Deploying",
+                       "Booting",
+                       "Testing",
+               }
+               BeforeEach(func() {
+                       jc.EXPECT().GetDryad(j).Return(dryad, nil)
+                       djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener))
+
+                       h.Start(j)
+
+                       expectRegistered(1)
+               })
+
+               It("should ignore ID of not registered request", func() {
+                       states := []DryadJobStatus{
+                               DJ_NEW,
+                               DJ_DEPLOY,
+                               DJ_BOOT,
+                               DJ_TEST,
+                               DJ_FAIL,
+                               DJ_OK,
+                       }
+                       for _, s := range states {
+                               change := DryadJobInfo{Job: JobID(0x0BCA), Status: s}
+                               h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+                               expectRegistered(1)
+                       }
+               })
+               It("should update status of the Job", func() {
+                       for i, s := range updateStates {
+                               change := DryadJobInfo{Job: j, Status: s}
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_RUNNING, updateMsgs[i])
+
+                               h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+                               expectRegistered(1)
+                       }
+               })
+               for i, s := range updateStates { // requires new registered job for every test.
+                       It("should fail if updating status of the Job fails", func() {
+                               change := DryadJobInfo{Job: j, Status: s}
+                               jc.EXPECT().SetStatusAndInfo(j, JOB_RUNNING, updateMsgs[i]).Return(err)
+
+                               h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+                               eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+                               eventuallyEmpty(1)
+                       })
+               }
+               It("should fail if Dryad Job fails", func() {
+                       change := DryadJobInfo{Job: j, Status: DJ_FAIL}
+
+                       h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+                       eventuallyNoti(1, false, "Failed to execute test on Dryad.")
+                       eventuallyEmpty(1)
+               })
+               It("should notify about successfully completed Dryad Job", func() {
+                       change := DryadJobInfo{Job: j, Status: DJ_OK}
+
+                       h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+                       eventuallyNoti(1, true, "")
+                       eventuallyEmpty(1)
+               })
+
+               Describe("Cancel", func() {
+                       It("should remove Job and cancel it in Dryad Job Manager", func() {
+                               djm.EXPECT().Cancel(j)
+
+                               h.Cancel(j)
+
+                               eventuallyEmpty(1)
+                       })
+                       It("should ignore djm's Cancel error", func() {
+                               djm.EXPECT().Cancel(j).Return(err)
+
+                               h.Cancel(j)
+
+                               eventuallyEmpty(1)
+                       })
+                       It("should ignore not existing request", func() {
+                               h.Cancel(JobID(0x0BCA))
+                               expectRegistered(1)
+                       })
+               })
+       })
+})