From: Lukasz Wojciechowski Date: Wed, 22 Nov 2017 08:53:17 +0000 (+0100) Subject: Add Dryader implementation with tests X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=refs%2Fchanges%2F20%2F162020%2F4;p=tools%2Fweles.git Add Dryader implementation with tests DryaderImpl implements delegation and control of Job execution in DryadJobManager. Change-Id: I7d30f21bf0f838829f6c2d0b65362a6d7b66d0a3 Signed-off-by: Lukasz Wojciechowski --- diff --git a/controller/dryaderimpl.go b/controller/dryaderimpl.go new file mode 100644 index 0000000..0387ea9 --- /dev/null +++ b/controller/dryaderimpl.go @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// File controller/dryaderimpl.go implements Dryader interface. It delegates +// and controls Job execution by DryadJobManager. + +package controller + +import ( + "fmt" + "sync" + + . "git.tizen.org/tools/weles" + "git.tizen.org/tools/weles/controller/notifier" +) + +// DryaderImpl implements Dryader. It delegates and controls Job execution +// by DryadJobManager. +type DryaderImpl struct { + // Notifier provides channel for communication with Controller. + notifier.Notifier + // jobs references module implementing Jobs management. + jobs JobsController + // djm manages DryadJobs. + djm DryadJobManager + // info contains Jobs delegated to DryadJobManager and not completed yet + // - active Jobs collection. + info map[JobID]bool + // mutex protects access to info map. + mutex *sync.Mutex + // listener listens on notifications from DryadJobManager. + listener chan DryadJobStatusChange + // finish is channel for stopping internal goroutine. + finish chan int + // looper waits for internal goroutine running loop to finish. + looper sync.WaitGroup +} + +// NewDryader creates a new DryaderImpl structure setting up references +// to used Weles modules. +func NewDryader(j JobsController, d DryadJobManager) Dryader { + ret := &DryaderImpl{ + Notifier: notifier.NewNotifier(), + jobs: j, + djm: d, + info: make(map[JobID]bool), + mutex: new(sync.Mutex), + listener: make(chan DryadJobStatusChange), + finish: make(chan int), + } + ret.looper.Add(1) + go ret.loop() + return ret +} + +// Finish internal goroutine. +func (h *DryaderImpl) Finish() { + h.finish <- 1 + h.looper.Wait() +} + +// add adds a new Job delegated to DryadJobManager to active Jobs collection. +func (h *DryaderImpl) add(j JobID) { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.info[j] = true +} + +// remove Job from active Jobs collection. +func (h *DryaderImpl) remove(j JobID) { + h.mutex.Lock() + defer h.mutex.Unlock() + + delete(h.info, j) +} + +// setStatus sets Jobs status to RUNNING and updates info. +func (h *DryaderImpl) setStatus(j JobID, msg string) { + err := h.jobs.SetStatusAndInfo(j, JOB_RUNNING, msg) + if err != nil { + h.remove(j) + h.SendFail(j, fmt.Sprintf("Internal Weles error while changing Job status : %s", err.Error())) + } +} + +// loop monitors DryadJob's status. +func (h *DryaderImpl) loop() { + defer h.looper.Done() + for { + select { + case <-h.finish: + return + case recv := <-h.listener: + change := DryadJobInfo(recv) + h.mutex.Lock() + _, ok := h.info[change.Job] + h.mutex.Unlock() + if !ok { + continue + } + + switch change.Status { + case DJ_NEW: + h.setStatus(change.Job, "Started") + case DJ_DEPLOY: + h.setStatus(change.Job, "Deploying") + case DJ_BOOT: + h.setStatus(change.Job, "Booting") + case DJ_TEST: + h.setStatus(change.Job, "Testing") + case DJ_FAIL: + h.remove(change.Job) + h.SendFail(change.Job, "Failed to execute test on Dryad.") + case DJ_OK: + h.remove(change.Job) + h.SendOK(change.Job) + } + } + } +} + +// Start registers new Job to be executed in DryadJobManager. +func (h *DryaderImpl) Start(j JobID) { + d, err := h.jobs.GetDryad(j) + if err != nil { + h.SendFail(j, fmt.Sprintf("Internal Weles error while getting Dryad for Job : %s", err.Error())) + return + } + + h.add(j) + + err = h.djm.Create(j, d, h.listener) + if err != nil { + h.remove(j) + h.SendFail(j, fmt.Sprintf("Cannot delegate Job to Dryad : %s", err.Error())) + return + } +} + +// Cancel breaks Job execution in DryadJobManager +func (h *DryaderImpl) Cancel(j JobID) { + h.mutex.Lock() + _, ok := h.info[j] + h.mutex.Unlock() + if !ok { + return + } + + h.remove(j) + h.djm.Cancel(j) +} diff --git a/controller/dryaderimpl_test.go b/controller/dryaderimpl_test.go new file mode 100644 index 0000000..08ac8a3 --- /dev/null +++ b/controller/dryaderimpl_test.go @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package controller + +import ( + "errors" + "net" + + . "git.tizen.org/tools/weles" + cmock "git.tizen.org/tools/weles/controller/mock" + . "git.tizen.org/tools/weles/controller/notifier" + mock "git.tizen.org/tools/weles/mock" + gomock "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("DryaderImpl", func() { + var r <-chan Notification + var jc *cmock.MockJobsController + var djm *mock.MockDryadJobManager + var h Dryader + var ctrl *gomock.Controller + j := JobID(0xCAFE) + dryad := Dryad{Addr: &net.IPNet{IP: net.IPv4(1, 2, 3, 4), Mask: net.IPv4Mask(5, 6, 7, 8)}} + err := errors.New("test error") + + expectRegistered := func(offset int) { + h.(*DryaderImpl).mutex.Lock() + defer h.(*DryaderImpl).mutex.Unlock() + + ExpectWithOffset(offset, len(h.(*DryaderImpl).info)).To(Equal(1)) + info, ok := h.(*DryaderImpl).info[j] + ExpectWithOffset(offset, ok).To(BeTrue()) + ExpectWithOffset(offset, info).To(BeTrue()) + } + eventuallyEmpty := func(offset int) { + EventuallyWithOffset(offset, func() int { + h.(*DryaderImpl).mutex.Lock() + defer h.(*DryaderImpl).mutex.Unlock() + return len(h.(*DryaderImpl).info) + }).Should(BeZero()) + } + eventuallyNoti := func(offset int, ok bool, msg string) { + notification := Notification{} + expectedNotification := Notification{ + JobID: j, + OK: ok, + Msg: msg, + } + EventuallyWithOffset(offset, r).Should(Receive(¬ification)) + ExpectWithOffset(offset, notification).To(Equal(expectedNotification)) + } + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + + jc = cmock.NewMockJobsController(ctrl) + djm = mock.NewMockDryadJobManager(ctrl) + + h = NewDryader(jc, djm) + r = h.Listen() + }) + AfterEach(func() { + h.(*DryaderImpl).Finish() + ctrl.Finish() + }) + + Describe("NewBoruter", func() { + It("should create a new object", func() { + Expect(h).NotTo(BeNil()) + Expect(h.(*DryaderImpl).jobs).To(Equal(jc)) + Expect(h.(*DryaderImpl).djm).To(Equal(djm)) + Expect(h.(*DryaderImpl).info).NotTo(BeNil()) + Expect(h.(*DryaderImpl).mutex).NotTo(BeNil()) + Expect(h.(*DryaderImpl).finish).NotTo(BeNil()) + }) + }) + + Describe("Start", func() { + It("should register job successfully", func() { + jc.EXPECT().GetDryad(j).Return(dryad, nil) + djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener)) + + h.Start(j) + + expectRegistered(1) + }) + It("should fail if DryadJobManager.Create fails", func() { + jc.EXPECT().GetDryad(j).Return(dryad, nil) + djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener)).Return(err) + + h.Start(j) + + eventuallyNoti(1, false, "Cannot delegate Job to Dryad : test error") + eventuallyEmpty(1) + }) + It("should fail if JobManager.GetDryad fails", func() { + jc.EXPECT().GetDryad(j).Return(Dryad{}, err) + + h.Start(j) + + eventuallyNoti(1, false, "Internal Weles error while getting Dryad for Job : test error") + eventuallyEmpty(1) + }) + }) + + Describe("With registered request", func() { + updateStates := []DryadJobStatus{ + DJ_NEW, + DJ_DEPLOY, + DJ_BOOT, + DJ_TEST, + } + updateMsgs := []string{ + "Started", + "Deploying", + "Booting", + "Testing", + } + BeforeEach(func() { + jc.EXPECT().GetDryad(j).Return(dryad, nil) + djm.EXPECT().Create(j, dryad, (chan<- DryadJobStatusChange)(h.(*DryaderImpl).listener)) + + h.Start(j) + + expectRegistered(1) + }) + + It("should ignore ID of not registered request", func() { + states := []DryadJobStatus{ + DJ_NEW, + DJ_DEPLOY, + DJ_BOOT, + DJ_TEST, + DJ_FAIL, + DJ_OK, + } + for _, s := range states { + change := DryadJobInfo{Job: JobID(0x0BCA), Status: s} + h.(*DryaderImpl).listener <- DryadJobStatusChange(change) + + expectRegistered(1) + } + }) + It("should update status of the Job", func() { + for i, s := range updateStates { + change := DryadJobInfo{Job: j, Status: s} + jc.EXPECT().SetStatusAndInfo(j, JOB_RUNNING, updateMsgs[i]) + + h.(*DryaderImpl).listener <- DryadJobStatusChange(change) + + expectRegistered(1) + } + }) + for i, s := range updateStates { // requires new registered job for every test. + It("should fail if updating status of the Job fails", func() { + change := DryadJobInfo{Job: j, Status: s} + jc.EXPECT().SetStatusAndInfo(j, JOB_RUNNING, updateMsgs[i]).Return(err) + + h.(*DryaderImpl).listener <- DryadJobStatusChange(change) + + eventuallyNoti(1, false, "Internal Weles error while changing Job status : test error") + eventuallyEmpty(1) + }) + } + It("should fail if Dryad Job fails", func() { + change := DryadJobInfo{Job: j, Status: DJ_FAIL} + + h.(*DryaderImpl).listener <- DryadJobStatusChange(change) + + eventuallyNoti(1, false, "Failed to execute test on Dryad.") + eventuallyEmpty(1) + }) + It("should notify about successfully completed Dryad Job", func() { + change := DryadJobInfo{Job: j, Status: DJ_OK} + + h.(*DryaderImpl).listener <- DryadJobStatusChange(change) + + eventuallyNoti(1, true, "") + eventuallyEmpty(1) + }) + + Describe("Cancel", func() { + It("should remove Job and cancel it in Dryad Job Manager", func() { + djm.EXPECT().Cancel(j) + + h.Cancel(j) + + eventuallyEmpty(1) + }) + It("should ignore djm's Cancel error", func() { + djm.EXPECT().Cancel(j).Return(err) + + h.Cancel(j) + + eventuallyEmpty(1) + }) + It("should ignore not existing request", func() { + h.Cancel(JobID(0x0BCA)) + expectRegistered(1) + }) + }) + }) +})