From: Lukasz Wojciechowski Date: Tue, 10 Oct 2017 08:25:06 +0000 (+0200) Subject: Implement RequestsManager in ReqsCollection X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=50253707e0a543c24efa5c5f189e6a5f17e27794;p=tools%2Fboruta.git Implement RequestsManager in ReqsCollection Add implementation of RequestsManager interface by *ReqsCollection. Implementation is required for internal boruta access to requests structures to react on time passed events. requests_requestsmanager_test.go file contains tests of this implementation. Change-Id: I2480138ca2625c8dca84b8191ef62d47fb9bc164 Signed-off-by: Lukasz Wojciechowski --- diff --git a/errors.go b/errors.go index 43f553e..3a35927 100644 --- a/errors.go +++ b/errors.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,10 @@ package boruta -import "fmt" +import ( + "errors" + "fmt" +) // NotFoundError is used whenever searched element is missing. type NotFoundError string @@ -26,3 +29,8 @@ type NotFoundError string func (err NotFoundError) Error() string { return fmt.Sprintf("%s not found", string(err)) } + +var ( + // ErrInternalLogicError means that boruta's implementation has detected unexpected behaviour. + ErrInternalLogicError = errors.New("Boruta's internal logic error") +) diff --git a/requests/requests.go b/requests/requests.go index de4fb44..e2b177f 100644 --- a/requests/requests.go +++ b/requests/requests.go @@ -25,11 +25,12 @@ import ( ) // ReqsCollection contains information (also historical) about handled requests. -// It implements Requests interface. +// It implements Requests and RequestsManager interfaces. type ReqsCollection struct { - requests map[ReqID]*ReqInfo - queue *prioQueue - mutex *sync.RWMutex + requests map[ReqID]*ReqInfo + queue *prioQueue + mutex *sync.RWMutex + iterating bool } // NewRequestQueue provides initialized priority queue for requests. @@ -90,11 +91,20 @@ func (reqs *ReqsCollection) NewRequest(caps Capabilities, return req.ID, nil } -// CloseRequest is part of implementation of Requests interface. It checks that -// request is in WAIT state and changes it to CANCEL or in INPROGRESS state and -// changes it to DONE. NotFoundError may be returned if request with given reqID -// doesn't exist in the queue or ErrModificationForbidden if request is in state -// which can't be closed. +// closeRequest is an internal ReqsCollection method for closing running request. +// It is used by both Close and CloseRequest methods after verification that +// all required conditions to close request are met. +// The method must be called in reqs.mutex critical section. +func (reqs *ReqsCollection) closeRequest(req *ReqInfo) { + req.State = DONE + // TODO(mwereski): release worker +} + +// CloseRequest is part of implementation of Requests interface. +// It checks that request is in WAIT state and changes it to CANCEL or +// in INPROGRESS state and changes it to DONE. NotFoundError may be returned +// if request with given reqID doesn't exist in the queue +// or ErrModificationForbidden if request is in state which can't be closed. func (reqs *ReqsCollection) CloseRequest(reqID ReqID) error { reqs.mutex.Lock() defer reqs.mutex.Unlock() @@ -107,8 +117,7 @@ func (reqs *ReqsCollection) CloseRequest(reqID ReqID) error { req.State = CANCEL reqs.queue.removeRequest(req) case INPROGRESS: - req.State = DONE - // TODO(mwereski): release worker + reqs.closeRequest(req) default: return ErrModificationForbidden } @@ -182,11 +191,7 @@ func (reqs *ReqsCollection) UpdateRequest(src *ReqInfo) error { func (reqs *ReqsCollection) GetRequestInfo(reqID ReqID) (ReqInfo, error) { reqs.mutex.RLock() defer reqs.mutex.RUnlock() - req, ok := reqs.requests[reqID] - if !ok { - return ReqInfo{}, NotFoundError("Request") - } - return *req, nil + return reqs.Get(reqID) } // ListRequests is part of implementation of Requests interface. It returns slice @@ -236,3 +241,122 @@ func (reqs *ReqsCollection) ProlongAccess(reqID ReqID) error { // TODO(mwereski): prolong access return nil } + +// InitIteration initializes queue iterator and sets global lock for requests +// structures. It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) InitIteration() error { + reqs.mutex.Lock() + if reqs.iterating { + reqs.mutex.Unlock() + return ErrInternalLogicError + } + reqs.queue.initIterator() + reqs.iterating = true + return nil +} + +// TerminateIteration releases queue iterator if iterations are in progress +// and release global lock for requests structures. +// It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) TerminateIteration() { + if reqs.iterating { + reqs.queue.releaseIterator() + reqs.iterating = false + } + reqs.mutex.Unlock() +} + +// Next gets next ID from request queue. Method returns {ID, true} if there is +// pending request or {ReqID(0), false} if queue's end has been reached. +// It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) Next() (ReqID, bool) { + if reqs.iterating { + return reqs.queue.next() + } + panic("Should never call Next(), when not iterating") +} + +// VerifyIfReady checks if the request is ready to be run on worker. +// It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) VerifyIfReady(rid ReqID, now time.Time) bool { + req, ok := reqs.requests[rid] + return ok && req.State == WAIT && req.Deadline.After(now) && !req.ValidAfter.After(now) +} + +// Get retrieves request's information structure for request with given ID. +// It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) Get(rid ReqID) (ReqInfo, error) { + req, ok := reqs.requests[rid] + if !ok { + return ReqInfo{}, NotFoundError("Request") + } + return *req, nil +} + +// Timeout sets request to TIMEOUT state after Deadline time is exceeded. +// It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) Timeout(rid ReqID) error { + reqs.mutex.Lock() + defer reqs.mutex.Unlock() + req, ok := reqs.requests[rid] + if !ok { + return NotFoundError("Request") + } + if req.State != WAIT || req.Deadline.After(time.Now()) { + return ErrModificationForbidden + } + req.State = TIMEOUT + reqs.queue.removeRequest(req) + return nil +} + +// Close verifies if request time has been exceeded and if so closes it. +// If request is still valid to continue it's job an error is returned. +// It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) Close(reqID ReqID) error { + reqs.mutex.Lock() + defer reqs.mutex.Unlock() + req, ok := reqs.requests[reqID] + if !ok { + return NotFoundError("Request") + } + if req.State != INPROGRESS { + return ErrModificationForbidden + } + if req.Job == nil { + // TODO log a critical logic error. Job should be assigned to the request + // in INPROGRESS state. + return ErrInternalLogicError + } + if req.Job.Timeout.After(time.Now()) { + // Request prolonged not yet ready to be closed because of timeout. + return ErrModificationForbidden + } + + reqs.closeRequest(req) + + return nil +} + +// Run starts job performing the request on the worker. +// It is part of implementation of RequestsManager interface. +func (reqs *ReqsCollection) Run(rid ReqID, worker WorkerUUID) error { + req, ok := reqs.requests[rid] + if !ok { + return NotFoundError("Request") + } + + if req.State != WAIT { + return ErrModificationForbidden + } + req.State = INPROGRESS + + if reqs.iterating { + reqs.queue.releaseIterator() + reqs.iterating = false + } + reqs.queue.removeRequest(req) + + // TODO(lwojciechow) assign req.Job. + return nil +} diff --git a/requests/requests_requestsmanager_test.go b/requests/requests_requestsmanager_test.go new file mode 100644 index 0000000..014726d --- /dev/null +++ b/requests/requests_requestsmanager_test.go @@ -0,0 +1,332 @@ +/* + * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package requests + +import ( + "time" + + . "git.tizen.org/tools/boruta" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Requests as RequestsManager", func() { + var R *ReqsCollection + BeforeEach(func() { + R = NewRequestQueue() + }) + Describe("Iterations", func() { + var entered chan int + testMutex := func() { + R.mutex.Lock() + defer R.mutex.Unlock() + entered <- 1 + } + BeforeEach(func() { + entered = make(chan int) + }) + Describe("InitIteration", func() { + It("should init iterations and lock requests mutex", func() { + err := R.InitIteration() + Expect(err).NotTo(HaveOccurred()) + Expect(R.iterating).To(BeTrue()) + + // Verify that mutex is locked. + go testMutex() + Consistently(entered).ShouldNot(Receive()) + + // Release the mutex + R.mutex.Unlock() + Eventually(entered).Should(Receive()) + }) + It("should return error and remain mutex unlocked if iterations are already started", func() { + R.mutex.Lock() + R.iterating = true + R.mutex.Unlock() + + err := R.InitIteration() + Expect(err).To(Equal(ErrInternalLogicError)) + + // Verify that mutex is not locked. + go testMutex() + Eventually(entered).Should(Receive()) + }) + }) + Describe("TerminateIteration", func() { + It("should terminate iterations and unlock requests mutex", func() { + err := R.InitIteration() + Expect(err).NotTo(HaveOccurred()) + + R.TerminateIteration() + Expect(R.iterating).To(BeFalse()) + + // Verify that mutex is not locked. + go testMutex() + Eventually(entered).Should(Receive()) + }) + It("should just release mutex if iterations are not started", func() { + R.mutex.Lock() + + R.TerminateIteration() + + // Verify that mutex is not locked. + go testMutex() + Eventually(entered).Should(Receive()) + }) + }) + }) + Describe("Iterating over requests", func() { + verify := []ReqID{3, 5, 1, 2, 7, 4, 6} + BeforeEach(func() { + now := time.Now() + tomorrow := now.AddDate(0, 0, 1) + insert := func(p Priority) { + _, err := R.NewRequest(Capabilities{}, p, UserInfo{}, now, tomorrow) + Expect(err).NotTo(HaveOccurred()) + } + insert(3) //1 + insert(3) //2 + insert(1) //3 + insert(5) //4 + insert(1) //5 + insert(5) //6 + insert(3) //7 + }) + It("should properly iterate over requests", func() { + reqs := make([]ReqID, 0) + + R.InitIteration() + for r, ok := R.Next(); ok; r, ok = R.Next() { + reqs = append(reqs, r) + } + R.TerminateIteration() + + Expect(reqs).To(Equal(verify)) + }) + It("should restart iterations in new critical section", func() { + for times := 0; times < len(verify); times++ { + reqs := make([]ReqID, 0) + i := 0 + R.InitIteration() + for r, ok := R.Next(); ok && i < times; r, ok = R.Next() { + reqs = append(reqs, r) + i++ + } + R.TerminateIteration() + Expect(reqs).To(Equal(verify[:times])) + } + }) + It("should panic if Next is called without InitIteration", func() { + wrap := func() { + R.mutex.Lock() + defer R.mutex.Unlock() + R.Next() + } + Expect(wrap).To(Panic()) + }) + }) + Describe("With request in the queue", func() { + var now, tomorrow time.Time + var req, noreq ReqID + var rinfo *ReqInfo + BeforeEach(func() { + now = time.Now() + tomorrow = now.AddDate(0, 0, 1) + var err error + req, err = R.NewRequest(Capabilities{}, 3, UserInfo{}, now, tomorrow) + Expect(err).NotTo(HaveOccurred()) + var ok bool + rinfo, ok = R.requests[req] + Expect(ok).To(BeTrue()) + noreq = req + 1 + }) + Describe("VerifyIfReady", func() { + It("should fail if reqID is unknown", func() { + Expect(R.VerifyIfReady(noreq, now)).To(BeFalse()) + }) + It("should fail if state is not WAIT", func() { + states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED} + for _, s := range states { + rinfo.State = s + Expect(R.VerifyIfReady(req, now)).To(BeFalse(), "state = %v", s) + } + }) + It("should fail if Deadline is reached or passed", func() { + Expect(R.VerifyIfReady(req, tomorrow.Add(-time.Hour))).To(BeTrue()) + Expect(R.VerifyIfReady(req, tomorrow)).To(BeFalse()) + Expect(R.VerifyIfReady(req, tomorrow.Add(time.Hour))).To(BeFalse()) + }) + It("should fail if ValidAfter is in future", func() { + Expect(R.VerifyIfReady(req, now.Add(-time.Hour))).To(BeFalse()) + Expect(R.VerifyIfReady(req, now)).To(BeTrue()) + Expect(R.VerifyIfReady(req, now.Add(time.Hour))).To(BeTrue()) + }) + It("should succeed if request is known, in WAIT state and now is between ValidAfter and Deadline", func() { + Expect(R.VerifyIfReady(req, now.Add(12*time.Hour))).To(BeTrue()) + }) + }) + Describe("Get", func() { + It("should fail if reqID is unknown", func() { + r, err := R.Get(noreq) + Expect(err).To(Equal(NotFoundError("Request"))) + Expect(r).To(Equal(ReqInfo{})) + }) + It("should succeed if reqID is valid", func() { + r, err := R.Get(req) + Expect(err).NotTo(HaveOccurred()) + Expect(r).To(Equal(*rinfo)) + }) + }) + Describe("Timeout", func() { + It("should fail if reqID is unknown", func() { + Expect(R.queue.length).To(Equal(uint(1))) + err := R.Timeout(noreq) + Expect(err).To(Equal(NotFoundError("Request"))) + Expect(R.queue.length).To(Equal(uint(1))) + }) + It("should fail if request is not in WAIT state", func() { + rinfo.Deadline = now.Add(-time.Hour) + Expect(R.queue.length).To(Equal(uint(1))) + states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED} + for _, s := range states { + rinfo.State = s + err := R.Timeout(req) + Expect(err).To(Equal(ErrModificationForbidden), "state = %v", s) + Expect(R.queue.length).To(Equal(uint(1)), "state = %v", s) + } + }) + It("should fail if deadline is in the future", func() { + Expect(R.queue.length).To(Equal(uint(1))) + err := R.Timeout(req) + Expect(err).To(Equal(ErrModificationForbidden)) + Expect(R.queue.length).To(Equal(uint(1))) + }) + It("should pass if deadline is past", func() { + rinfo.Deadline = now.Add(-time.Hour) + Expect(R.queue.length).To(Equal(uint(1))) + err := R.Timeout(req) + Expect(err).NotTo(HaveOccurred()) + Expect(rinfo.State).To(Equal(TIMEOUT)) + Expect(R.queue.length).To(BeZero()) + }) + }) + Describe("Close", func() { + It("should fail if reqID is unknown", func() { + err := R.Close(noreq) + Expect(err).To(Equal(NotFoundError("Request"))) + }) + It("should fail if request is not in INPROGRESS state", func() { + states := []ReqState{WAIT, CANCEL, TIMEOUT, INVALID, DONE, FAILED} + for _, state := range states { + R.mutex.Lock() + rinfo.State = state + R.mutex.Unlock() + + err := R.Close(req) + Expect(err).To(Equal(ErrModificationForbidden), "state = %s", state) + } + }) + It("should fail if request has no job assigned", func() { + R.mutex.Lock() + rinfo.State = INPROGRESS + Expect(rinfo.Job).To(BeNil()) + R.mutex.Unlock() + + err := R.Close(req) + Expect(err).To(Equal(ErrInternalLogicError)) + }) + It("should fail if job's is not yet timed out", func() { + R.mutex.Lock() + rinfo.State = INPROGRESS + rinfo.Job = &JobInfo{ + Timeout: time.Now().AddDate(0, 0, 1), + } + R.mutex.Unlock() + + err := R.Close(req) + Expect(err).To(Equal(ErrModificationForbidden)) + }) + It("should close request and release worker", func() { + R.mutex.Lock() + rinfo.State = INPROGRESS + rinfo.Job = &JobInfo{ + Timeout: time.Now().AddDate(0, 0, -1), + } + R.mutex.Unlock() + + err := R.Close(req) + Expect(err).NotTo(HaveOccurred()) + Expect(rinfo.State).To(Equal(DONE)) + // TODO verify releasing worker when implemented + }) + }) + Describe("Run", func() { + It("should fail if reqID is unknown", func() { + R.mutex.Lock() + defer R.mutex.Unlock() + Expect(R.queue.length).To(Equal(uint(1))) + err := R.Run(noreq, WorkerUUID("TestWorker")) + Expect(err).To(Equal(NotFoundError("Request"))) + Expect(R.queue.length).To(Equal(uint(1))) + }) + It("should fail if request is not in WAIT state", func() { + states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED} + for _, state := range states { + R.InitIteration() + Expect(R.queue.length).To(Equal(uint(1)), "state = %s", state) + rinfo.State = state + err := R.Run(req, WorkerUUID("TestWorker")) + Expect(err).To(Equal(ErrModificationForbidden), "state = %s", state) + Expect(R.queue.length).To(Equal(uint(1)), "state = %s", state) + R.TerminateIteration() + } + }) + It("should fail if reqID is unknown during iteration", func() { + R.InitIteration() + defer R.TerminateIteration() + Expect(R.iterating).To(BeTrue()) + Expect(R.queue.length).To(Equal(uint(1))) + err := R.Run(noreq, WorkerUUID("TestWorker")) + Expect(err).To(Equal(NotFoundError("Request"))) + Expect(R.iterating).To(BeTrue()) + Expect(R.queue.length).To(Equal(uint(1))) + }) + It("should start progress for valid reqID", func() { + R.InitIteration() + defer R.TerminateIteration() + Expect(R.queue.length).To(Equal(uint(1))) + err := R.Run(req, WorkerUUID("TestWorker")) + Expect(err).NotTo(HaveOccurred()) + Expect(rinfo.State).To(Equal(INPROGRESS)) + Expect(R.queue.length).To(BeZero()) + }) + It("should start progress and break iterations when iterating", func() { + R.InitIteration() + defer R.TerminateIteration() + Expect(R.queue.length).To(Equal(uint(1))) + Expect(R.iterating).To(BeTrue()) + err := R.Run(req, WorkerUUID("TestWorker")) + Expect(err).NotTo(HaveOccurred()) + Expect(rinfo.State).To(Equal(INPROGRESS)) + Expect(R.iterating).To(BeFalse()) + Expect(R.queue.length).To(BeZero()) + }) + // TODO use and verify Job when Run's implementation is complete. + }) + }) +})