Implement WorkerChange in requests
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Tue, 24 Oct 2017 09:13:48 +0000 (11:13 +0200)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Fri, 27 Apr 2018 15:43:52 +0000 (17:43 +0200)
Implementation covers OnWorkerIdle and OnWorkerFail methods.
The tests for both functions are available in requests_workerchange_test.go.

Change-Id: I0b35bf275e61448e2d9cf1ec943745592bb85394
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
requests/requests.go
requests/requests_workerchange_test.go [new file with mode: 0644]

index 89253d6..ec4c596 100644 (file)
@@ -26,7 +26,7 @@ import (
 )
 
 // ReqsCollection contains information (also historical) about handled requests.
-// It implements Requests and RequestsManager interfaces.
+// It implements Requests, RequestsManager and WorkerChange interfaces.
 type ReqsCollection struct {
        requests          map[ReqID]*ReqInfo
        queue             *prioQueue
@@ -436,3 +436,27 @@ func (reqs *ReqsCollection) Run(rid ReqID, worker WorkerUUID) error {
 
        return nil
 }
+
+// OnWorkerIdle triggers ValidMatcher to rematch requests with idle worker.
+func (reqs *ReqsCollection) OnWorkerIdle(worker WorkerUUID) {
+       reqs.validAfterTimes.insert(requestTime{time: time.Now()})
+}
+
+// OnWorkerFail sets request being processed by failed worker into FAILED state.
+func (reqs *ReqsCollection) OnWorkerFail(worker WorkerUUID) {
+       reqs.mutex.Lock()
+       defer reqs.mutex.Unlock()
+
+       job, err := reqs.jobs.Get(worker)
+       if err != nil {
+               panic("no job related to running worker")
+       }
+
+       reqID := job.Req
+       req, ok := reqs.requests[reqID]
+       if !ok {
+               panic("request related to job not found")
+       }
+       reqs.jobs.Finish(worker)
+       req.State = FAILED
+}
diff --git a/requests/requests_workerchange_test.go b/requests/requests_workerchange_test.go
new file mode 100644 (file)
index 0000000..9090e15
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ *  Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package requests
+
+import (
+       "errors"
+       "time"
+
+       . "git.tizen.org/tools/boruta"
+       "git.tizen.org/tools/boruta/workers"
+
+       gomock "github.com/golang/mock/gomock"
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Requests as WorkerChange", func() {
+       var ctrl *gomock.Controller
+       var wm *MockWorkersManager
+       var jm *MockJobsManager
+       var R *ReqsCollection
+       testErr := errors.New("Test Error")
+       testWorker := WorkerUUID("Test Worker UUID")
+       noWorker := WorkerUUID("")
+       testCapabilities := Capabilities{"key": "value"}
+       testPriority := (HiPrio + LoPrio) / 2
+       testUser := UserInfo{Groups: []Group{"Test Group"}}
+       now := time.Now()
+       tomorrow := now.AddDate(0, 0, 1)
+       trigger := make(chan int, 1)
+
+       setTrigger := func(val int) {
+               trigger <- val
+       }
+       eventuallyTrigger := func(val int) {
+               EventuallyWithOffset(1, trigger).Should(Receive(Equal(val)))
+       }
+       eventuallyState := func(reqid ReqID, state ReqState) {
+               EventuallyWithOffset(1, func() ReqState {
+                       info, err := R.GetRequestInfo(reqid)
+                       ExpectWithOffset(1, err).NotTo(HaveOccurred())
+                       ExpectWithOffset(1, info).NotTo(BeNil())
+                       return info.State
+               }).Should(Equal(state))
+       }
+
+       BeforeEach(func() {
+               ctrl = gomock.NewController(GinkgoT())
+               wm = NewMockWorkersManager(ctrl)
+               jm = NewMockJobsManager(ctrl)
+               R = NewRequestQueue(wm, jm)
+       })
+       AfterEach(func() {
+               R.Finish()
+               ctrl.Finish()
+       })
+
+       Describe("OnWorkerIdle", func() {
+               It("ValidMatcher should do nothing if there are no waiting requests", func() {
+                       R.OnWorkerIdle(testWorker)
+               })
+               It("ValidMatcher should try matching request", func() {
+                       // Add Request. Use trigger to wait for ValidMatcher goroutine to match worker.
+                       wm.EXPECT().TakeBestMatchingWorker(testUser.Groups, testCapabilities).Return(noWorker, testErr).Do(func(Groups, Capabilities) {
+                               setTrigger(1)
+                       })
+                       reqid, err := R.NewRequest(testCapabilities, testPriority, testUser, now, tomorrow)
+                       Expect(err).NotTo(HaveOccurred())
+                       Expect(reqid).NotTo(BeZero())
+                       eventuallyTrigger(1)
+
+                       // Test. Use trigger to wait for ValidMatcher goroutine to match worker.
+                       wm.EXPECT().TakeBestMatchingWorker(testUser.Groups, testCapabilities).Return(noWorker, testErr).Do(func(Groups, Capabilities) {
+                               setTrigger(2)
+                       })
+                       R.OnWorkerIdle(testWorker)
+                       eventuallyTrigger(2)
+               })
+       })
+       Describe("OnWorkerFail", func() {
+               It("should panic if jobs.Get fails", func() {
+                       jm.EXPECT().Get(testWorker).Return(nil, testErr)
+                       Expect(func() {
+                               R.OnWorkerFail(testWorker)
+                       }).To(Panic())
+               })
+               It("should panic if failing worker was processing unknown Job", func() {
+                       noReq := ReqID(0)
+                       job := workers.Job{Req: noReq}
+                       jm.EXPECT().Get(testWorker).Return(&job, nil)
+                       Expect(func() {
+                               R.OnWorkerFail(testWorker)
+                       }).To(Panic())
+               })
+               It("should set request to FAILED state if call succeeds", func() {
+                       // Add Request. Use trigger to wait for ValidMatcher goroutine to match worker.
+                       wm.EXPECT().TakeBestMatchingWorker(testUser.Groups, testCapabilities).Return(noWorker, testErr).Do(func(Groups, Capabilities) {
+                               setTrigger(3)
+                       })
+                       reqid, err := R.NewRequest(testCapabilities, testPriority, testUser, now, tomorrow)
+                       Expect(err).NotTo(HaveOccurred())
+                       Expect(reqid).NotTo(BeZero())
+                       eventuallyTrigger(3)
+
+                       // Test.
+                       job := workers.Job{Req: reqid}
+                       jm.EXPECT().Get(testWorker).Return(&job, nil)
+                       jm.EXPECT().Finish(testWorker)
+                       R.OnWorkerFail(testWorker)
+                       eventuallyState(reqid, FAILED)
+               })
+       })
+})