--- /dev/null
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// File controller/dryaderimpl.go implements Dryader interface. It delegates
+// and controls Job execution by DryadJobManager.
+
+package controller
+
+import (
+ "fmt"
+ "sync"
+
+ . "git.tizen.org/tools/weles"
+ "git.tizen.org/tools/weles/controller/notifier"
+)
+
+// DryaderImpl implements Dryader. It delegates and controls Job execution
+// by DryadJobManager.
+type DryaderImpl struct {
+ // Notifier provides channel for communication with Controller.
+ notifier.Notifier
+ // jobs references module implementing Jobs management.
+ jobs JobsController
+ // djm manages DryadJobs.
+ djm DryadJobManager
+ // info contains Jobs delegated to DryadJobManager and not completed yet
+ // - active Jobs collection.
+ info map[JobID]bool
+ // mutex protects access to info map.
+ mutex *sync.Mutex
+ // listener listens on notifications from DryadJobManager.
+ listener chan DryadJobStatusChange
+ // finish is channel for stopping internal goroutine.
+ finish chan int
+ // looper waits for internal goroutine running loop to finish.
+ looper sync.WaitGroup
+}
+
+// NewDryader creates a new DryaderImpl structure setting up references
+// to used Weles modules.
+func NewDryader(j JobsController, d DryadJobManager) Dryader {
+ ret := &DryaderImpl{
+ Notifier: notifier.NewNotifier(),
+ jobs: j,
+ djm: d,
+ info: make(map[JobID]bool),
+ mutex: new(sync.Mutex),
+ listener: make(chan DryadJobStatusChange),
+ finish: make(chan int),
+ }
+ ret.looper.Add(1)
+ go ret.loop()
+ return ret
+}
+
+// Finish internal goroutine.
+func (h *DryaderImpl) Finish() {
+ h.finish <- 1
+ h.looper.Wait()
+}
+
+// add adds a new Job delegated to DryadJobManager to active Jobs collection.
+func (h *DryaderImpl) add(j JobID) {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+
+ h.info[j] = true
+}
+
+// remove Job from active Jobs collection.
+func (h *DryaderImpl) remove(j JobID) {
+ h.mutex.Lock()
+ defer h.mutex.Unlock()
+
+ delete(h.info, j)
+}
+
+// setStatus sets Jobs status to RUNNING and updates info.
+func (h *DryaderImpl) setStatus(j JobID, msg string) {
+ err := h.jobs.SetStatusAndInfo(j, JOB_RUNNING, msg)
+ if err != nil {
+ h.remove(j)
+ h.SendFail(j, fmt.Sprintf("Internal Weles error while changing Job status : %s", err.Error()))
+ }
+}
+
+// loop monitors DryadJob's status.
+func (h *DryaderImpl) loop() {
+ defer h.looper.Done()
+ for {
+ select {
+ case <-h.finish:
+ return
+ case recv := <-h.listener:
+ change := DryadJobInfo(recv)
+ h.mutex.Lock()
+ _, ok := h.info[change.Job]
+ h.mutex.Unlock()
+ if !ok {
+ continue
+ }
+
+ switch change.Status {
+ case DJ_NEW:
+ h.setStatus(change.Job, "Started")
+ case DJ_DEPLOY:
+ h.setStatus(change.Job, "Deploying")
+ case DJ_BOOT:
+ h.setStatus(change.Job, "Booting")
+ case DJ_TEST:
+ h.setStatus(change.Job, "Testing")
+ case DJ_FAIL:
+ h.remove(change.Job)
+ h.SendFail(change.Job, "Failed to execute test on Dryad.")
+ case DJ_OK:
+ h.remove(change.Job)
+ h.SendOK(change.Job)
+ }
+ }
+ }
+}
+
+// Start registers new Job to be executed in DryadJobManager.
+func (h *DryaderImpl) Start(j JobID) {
+ d, err := h.jobs.GetDryad(j)
+ if err != nil {
+ h.SendFail(j, fmt.Sprintf("Internal Weles error while getting Dryad for Job : %s", err.Error()))
+ return
+ }
+
+ h.add(j)
+
+ err = h.djm.Create(j, d, h.listener)
+ if err != nil {
+ h.remove(j)
+ h.SendFail(j, fmt.Sprintf("Cannot delegate Job to Dryad : %s", err.Error()))
+ return
+ }
+}
+
+// Cancel breaks Job execution in DryadJobManager
+func (h *DryaderImpl) Cancel(j JobID) {
+ h.mutex.Lock()
+ _, ok := h.info[j]
+ h.mutex.Unlock()
+ if !ok {
+ return
+ }
+
+ h.remove(j)
+ h.djm.Cancel(j)
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package controller
+
+import (
+ "errors"
+ "net"
+
+ . "git.tizen.org/tools/weles"
+ cmock "git.tizen.org/tools/weles/controller/mock"
+ . "git.tizen.org/tools/weles/controller/notifier"
+ mock "git.tizen.org/tools/weles/mock"
+ gomock "github.com/golang/mock/gomock"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("DryaderImpl", func() {
+ var r <-chan Notification
+ var jc *cmock.MockJobsController
+ var djm *mock.MockDryadJobManager
+ var h Dryader
+ var ctrl *gomock.Controller
+ j := JobID(0xCAFE)
+ dryad := Dryad{Addr: &net.IPNet{IP: net.IPv4(1, 2, 3, 4), Mask: net.IPv4Mask(5, 6, 7, 8)}}
+ err := errors.New("test error")
+
+ expectRegistered := func(offset int) {
+ h.(*DryaderImpl).mutex.Lock()
+ defer h.(*DryaderImpl).mutex.Unlock()
+
+ ExpectWithOffset(offset, len(h.(*DryaderImpl).info)).To(Equal(1))
+ info, ok := h.(*DryaderImpl).info[j]
+ ExpectWithOffset(offset, ok).To(BeTrue())
+ ExpectWithOffset(offset, info).To(BeTrue())
+ }
+ eventuallyEmpty := func(offset int) {
+ EventuallyWithOffset(offset, func() int {
+ h.(*DryaderImpl).mutex.Lock()
+ defer h.(*DryaderImpl).mutex.Unlock()
+ return len(h.(*DryaderImpl).info)
+ }).Should(BeZero())
+ }
+ eventuallyNoti := func(offset int, ok bool, msg string) {
+ notification := Notification{}
+ expectedNotification := Notification{
+ JobID: j,
+ OK: ok,
+ Msg: msg,
+ }
+ EventuallyWithOffset(offset, r).Should(Receive(¬ification))
+ ExpectWithOffset(offset, notification).To(Equal(expectedNotification))
+ }
+
+ BeforeEach(func() {
+ ctrl = gomock.NewController(GinkgoT())
+
+ jc = cmock.NewMockJobsController(ctrl)
+ djm = mock.NewMockDryadJobManager(ctrl)
+
+ h = NewDryader(jc, djm)
+ r = h.Listen()
+ })
+ AfterEach(func() {
+ h.(*DryaderImpl).Finish()
+ ctrl.Finish()
+ })
+
+ Describe("NewBoruter", func() {
+ It("should create a new object", func() {
+ Expect(h).NotTo(BeNil())
+ Expect(h.(*DryaderImpl).jobs).To(Equal(jc))
+ Expect(h.(*DryaderImpl).djm).To(Equal(djm))
+ Expect(h.(*DryaderImpl).info).NotTo(BeNil())
+ Expect(h.(*DryaderImpl).mutex).NotTo(BeNil())
+ Expect(h.(*DryaderImpl).finish).NotTo(BeNil())
+ })
+ })
+
+ Describe("Start", func() {
+ It("should register job successfully", func() {
+ jc.EXPECT().GetDryad(j).Return(dryad, nil)
+ djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener))
+
+ h.Start(j)
+
+ expectRegistered(1)
+ })
+ It("should fail if DryadJobManager.Create fails", func() {
+ jc.EXPECT().GetDryad(j).Return(dryad, nil)
+ djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener)).Return(err)
+
+ h.Start(j)
+
+ eventuallyNoti(1, false, "Cannot delegate Job to Dryad : test error")
+ eventuallyEmpty(1)
+ })
+ It("should fail if JobManager.GetDryad fails", func() {
+ jc.EXPECT().GetDryad(j).Return(Dryad{}, err)
+
+ h.Start(j)
+
+ eventuallyNoti(1, false, "Internal Weles error while getting Dryad for Job : test error")
+ eventuallyEmpty(1)
+ })
+ })
+
+ Describe("With registered request", func() {
+ updateStates := []DryadJobStatus{
+ DJ_NEW,
+ DJ_DEPLOY,
+ DJ_BOOT,
+ DJ_TEST,
+ }
+ updateMsgs := []string{
+ "Started",
+ "Deploying",
+ "Booting",
+ "Testing",
+ }
+ BeforeEach(func() {
+ jc.EXPECT().GetDryad(j).Return(dryad, nil)
+ djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener))
+
+ h.Start(j)
+
+ expectRegistered(1)
+ })
+
+ It("should ignore ID of not registered request", func() {
+ states := []DryadJobStatus{
+ DJ_NEW,
+ DJ_DEPLOY,
+ DJ_BOOT,
+ DJ_TEST,
+ DJ_FAIL,
+ DJ_OK,
+ }
+ for _, s := range states {
+ change := DryadJobInfo{Job: JobID(0x0BCA), Status: s}
+ h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+ expectRegistered(1)
+ }
+ })
+ It("should update status of the Job", func() {
+ for i, s := range updateStates {
+ change := DryadJobInfo{Job: j, Status: s}
+ jc.EXPECT().SetStatusAndInfo(j, JOB_RUNNING, updateMsgs[i])
+
+ h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+ expectRegistered(1)
+ }
+ })
+ for i, s := range updateStates { // requires new registered job for every test.
+ It("should fail if updating status of the Job fails", func() {
+ change := DryadJobInfo{Job: j, Status: s}
+ jc.EXPECT().SetStatusAndInfo(j, JOB_RUNNING, updateMsgs[i]).Return(err)
+
+ h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+ eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error")
+ eventuallyEmpty(1)
+ })
+ }
+ It("should fail if Dryad Job fails", func() {
+ change := DryadJobInfo{Job: j, Status: DJ_FAIL}
+
+ h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+ eventuallyNoti(1, false, "Failed to execute test on Dryad.")
+ eventuallyEmpty(1)
+ })
+ It("should notify about successfully completed Dryad Job", func() {
+ change := DryadJobInfo{Job: j, Status: DJ_OK}
+
+ h.(*DryaderImpl).listener <- DryadJobStatusChange(change)
+
+ eventuallyNoti(1, true, "")
+ eventuallyEmpty(1)
+ })
+
+ Describe("Cancel", func() {
+ It("should remove Job and cancel it in Dryad Job Manager", func() {
+ djm.EXPECT().Cancel(j)
+
+ h.Cancel(j)
+
+ eventuallyEmpty(1)
+ })
+ It("should ignore djm's Cancel error", func() {
+ djm.EXPECT().Cancel(j).Return(err)
+
+ h.Cancel(j)
+
+ eventuallyEmpty(1)
+ })
+ It("should ignore not existing request", func() {
+ h.Cancel(JobID(0x0BCA))
+ expectRegistered(1)
+ })
+ })
+ })
+})