From: Lukasz Wojciechowski Date: Tue, 24 Oct 2017 09:13:48 +0000 (+0200) Subject: Implement WorkerChange in requests X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=5f21ae58229a35be363f71ed45d05fc7d5130185;p=tools%2Fboruta.git Implement WorkerChange in requests Implementation covers OnWorkerIdle and OnWorkerFail methods. The tests for both functions are available in requests_workerchange_test.go. Change-Id: I0b35bf275e61448e2d9cf1ec943745592bb85394 Signed-off-by: Lukasz Wojciechowski --- diff --git a/requests/requests.go b/requests/requests.go index 89253d6..ec4c596 100644 --- a/requests/requests.go +++ b/requests/requests.go @@ -26,7 +26,7 @@ import ( ) // ReqsCollection contains information (also historical) about handled requests. -// It implements Requests and RequestsManager interfaces. +// It implements Requests, RequestsManager and WorkerChange interfaces. type ReqsCollection struct { requests map[ReqID]*ReqInfo queue *prioQueue @@ -436,3 +436,27 @@ func (reqs *ReqsCollection) Run(rid ReqID, worker WorkerUUID) error { return nil } + +// OnWorkerIdle triggers ValidMatcher to rematch requests with idle worker. +func (reqs *ReqsCollection) OnWorkerIdle(worker WorkerUUID) { + reqs.validAfterTimes.insert(requestTime{time: time.Now()}) +} + +// OnWorkerFail sets request being processed by failed worker into FAILED state. +func (reqs *ReqsCollection) OnWorkerFail(worker WorkerUUID) { + reqs.mutex.Lock() + defer reqs.mutex.Unlock() + + job, err := reqs.jobs.Get(worker) + if err != nil { + panic("no job related to running worker") + } + + reqID := job.Req + req, ok := reqs.requests[reqID] + if !ok { + panic("request related to job not found") + } + reqs.jobs.Finish(worker) + req.State = FAILED +} diff --git a/requests/requests_workerchange_test.go b/requests/requests_workerchange_test.go new file mode 100644 index 0000000..9090e15 --- /dev/null +++ b/requests/requests_workerchange_test.go @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package requests + +import ( + "errors" + "time" + + . "git.tizen.org/tools/boruta" + "git.tizen.org/tools/boruta/workers" + + gomock "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Requests as WorkerChange", func() { + var ctrl *gomock.Controller + var wm *MockWorkersManager + var jm *MockJobsManager + var R *ReqsCollection + testErr := errors.New("Test Error") + testWorker := WorkerUUID("Test Worker UUID") + noWorker := WorkerUUID("") + testCapabilities := Capabilities{"key": "value"} + testPriority := (HiPrio + LoPrio) / 2 + testUser := UserInfo{Groups: []Group{"Test Group"}} + now := time.Now() + tomorrow := now.AddDate(0, 0, 1) + trigger := make(chan int, 1) + + setTrigger := func(val int) { + trigger <- val + } + eventuallyTrigger := func(val int) { + EventuallyWithOffset(1, trigger).Should(Receive(Equal(val))) + } + eventuallyState := func(reqid ReqID, state ReqState) { + EventuallyWithOffset(1, func() ReqState { + info, err := R.GetRequestInfo(reqid) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + ExpectWithOffset(1, info).NotTo(BeNil()) + return info.State + }).Should(Equal(state)) + } + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + wm = NewMockWorkersManager(ctrl) + jm = NewMockJobsManager(ctrl) + R = NewRequestQueue(wm, jm) + }) + AfterEach(func() { + R.Finish() + ctrl.Finish() + }) + + Describe("OnWorkerIdle", func() { + It("ValidMatcher should do nothing if there are no waiting requests", func() { + R.OnWorkerIdle(testWorker) + }) + It("ValidMatcher should try matching request", func() { + // Add Request. Use trigger to wait for ValidMatcher goroutine to match worker. + wm.EXPECT().TakeBestMatchingWorker(testUser.Groups, testCapabilities).Return(noWorker, testErr).Do(func(Groups, Capabilities) { + setTrigger(1) + }) + reqid, err := R.NewRequest(testCapabilities, testPriority, testUser, now, tomorrow) + Expect(err).NotTo(HaveOccurred()) + Expect(reqid).NotTo(BeZero()) + eventuallyTrigger(1) + + // Test. Use trigger to wait for ValidMatcher goroutine to match worker. + wm.EXPECT().TakeBestMatchingWorker(testUser.Groups, testCapabilities).Return(noWorker, testErr).Do(func(Groups, Capabilities) { + setTrigger(2) + }) + R.OnWorkerIdle(testWorker) + eventuallyTrigger(2) + }) + }) + Describe("OnWorkerFail", func() { + It("should panic if jobs.Get fails", func() { + jm.EXPECT().Get(testWorker).Return(nil, testErr) + Expect(func() { + R.OnWorkerFail(testWorker) + }).To(Panic()) + }) + It("should panic if failing worker was processing unknown Job", func() { + noReq := ReqID(0) + job := workers.Job{Req: noReq} + jm.EXPECT().Get(testWorker).Return(&job, nil) + Expect(func() { + R.OnWorkerFail(testWorker) + }).To(Panic()) + }) + It("should set request to FAILED state if call succeeds", func() { + // Add Request. Use trigger to wait for ValidMatcher goroutine to match worker. + wm.EXPECT().TakeBestMatchingWorker(testUser.Groups, testCapabilities).Return(noWorker, testErr).Do(func(Groups, Capabilities) { + setTrigger(3) + }) + reqid, err := R.NewRequest(testCapabilities, testPriority, testUser, now, tomorrow) + Expect(err).NotTo(HaveOccurred()) + Expect(reqid).NotTo(BeZero()) + eventuallyTrigger(3) + + // Test. + job := workers.Job{Req: reqid} + jm.EXPECT().Get(testWorker).Return(&job, nil) + jm.EXPECT().Finish(testWorker) + R.OnWorkerFail(testWorker) + eventuallyState(reqid, FAILED) + }) + }) +})