Implement RequestsManager in ReqsCollection
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Tue, 10 Oct 2017 08:25:06 +0000 (10:25 +0200)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Fri, 27 Apr 2018 15:34:20 +0000 (17:34 +0200)
Add implementation of RequestsManager interface by *ReqsCollection.
Implementation is required for internal boruta access to requests
structures to react on time passed events.

requests_requestsmanager_test.go file contains tests
of this implementation.

Change-Id: I2480138ca2625c8dca84b8191ef62d47fb9bc164
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
errors.go
requests/requests.go
requests/requests_requestsmanager_test.go [new file with mode: 0644]

index 43f553e..3a35927 100644 (file)
--- a/errors.go
+++ b/errors.go
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 
 package boruta
 
-import "fmt"
+import (
+       "errors"
+       "fmt"
+)
 
 // NotFoundError is used whenever searched element is missing.
 type NotFoundError string
@@ -26,3 +29,8 @@ type NotFoundError string
 func (err NotFoundError) Error() string {
        return fmt.Sprintf("%s not found", string(err))
 }
+
+var (
+       // ErrInternalLogicError means that boruta's implementation has detected unexpected behaviour.
+       ErrInternalLogicError = errors.New("Boruta's internal logic error")
+)
index de4fb44..e2b177f 100644 (file)
@@ -25,11 +25,12 @@ import (
 )
 
 // ReqsCollection contains information (also historical) about handled requests.
-// It implements Requests interface.
+// It implements Requests and RequestsManager interfaces.
 type ReqsCollection struct {
-       requests map[ReqID]*ReqInfo
-       queue    *prioQueue
-       mutex    *sync.RWMutex
+       requests  map[ReqID]*ReqInfo
+       queue     *prioQueue
+       mutex     *sync.RWMutex
+       iterating bool
 }
 
 // NewRequestQueue provides initialized priority queue for requests.
@@ -90,11 +91,20 @@ func (reqs *ReqsCollection) NewRequest(caps Capabilities,
        return req.ID, nil
 }
 
-// CloseRequest is part of implementation of Requests interface. It checks that
-// request is in WAIT state and changes it to CANCEL or in INPROGRESS state and
-// changes it to DONE. NotFoundError may be returned if request with given reqID
-// doesn't exist in the queue or ErrModificationForbidden if request is in state
-// which can't be closed.
+// closeRequest is an internal ReqsCollection method for closing running request.
+// It is used by both Close and CloseRequest methods after verification that
+// all required conditions to close request are met.
+// The method must be called in reqs.mutex critical section.
+func (reqs *ReqsCollection) closeRequest(req *ReqInfo) {
+       req.State = DONE
+       // TODO(mwereski): release worker
+}
+
+// CloseRequest is part of implementation of Requests interface.
+// It checks that request is in WAIT state and changes it to CANCEL or
+// in INPROGRESS state and changes it to DONE. NotFoundError may be returned
+// if request with given reqID doesn't exist in the queue
+// or ErrModificationForbidden if request is in state which can't be closed.
 func (reqs *ReqsCollection) CloseRequest(reqID ReqID) error {
        reqs.mutex.Lock()
        defer reqs.mutex.Unlock()
@@ -107,8 +117,7 @@ func (reqs *ReqsCollection) CloseRequest(reqID ReqID) error {
                req.State = CANCEL
                reqs.queue.removeRequest(req)
        case INPROGRESS:
-               req.State = DONE
-               // TODO(mwereski): release worker
+               reqs.closeRequest(req)
        default:
                return ErrModificationForbidden
        }
@@ -182,11 +191,7 @@ func (reqs *ReqsCollection) UpdateRequest(src *ReqInfo) error {
 func (reqs *ReqsCollection) GetRequestInfo(reqID ReqID) (ReqInfo, error) {
        reqs.mutex.RLock()
        defer reqs.mutex.RUnlock()
-       req, ok := reqs.requests[reqID]
-       if !ok {
-               return ReqInfo{}, NotFoundError("Request")
-       }
-       return *req, nil
+       return reqs.Get(reqID)
 }
 
 // ListRequests is part of implementation of Requests interface. It returns slice
@@ -236,3 +241,122 @@ func (reqs *ReqsCollection) ProlongAccess(reqID ReqID) error {
        // TODO(mwereski): prolong access
        return nil
 }
+
+// InitIteration initializes queue iterator and sets global lock for requests
+// structures. It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) InitIteration() error {
+       reqs.mutex.Lock()
+       if reqs.iterating {
+               reqs.mutex.Unlock()
+               return ErrInternalLogicError
+       }
+       reqs.queue.initIterator()
+       reqs.iterating = true
+       return nil
+}
+
+// TerminateIteration releases queue iterator if iterations are in progress
+// and release global lock for requests structures.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) TerminateIteration() {
+       if reqs.iterating {
+               reqs.queue.releaseIterator()
+               reqs.iterating = false
+       }
+       reqs.mutex.Unlock()
+}
+
+// Next gets next ID from request queue. Method returns {ID, true} if there is
+// pending request or {ReqID(0), false} if queue's end has been reached.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Next() (ReqID, bool) {
+       if reqs.iterating {
+               return reqs.queue.next()
+       }
+       panic("Should never call Next(), when not iterating")
+}
+
+// VerifyIfReady checks if the request is ready to be run on worker.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) VerifyIfReady(rid ReqID, now time.Time) bool {
+       req, ok := reqs.requests[rid]
+       return ok && req.State == WAIT && req.Deadline.After(now) && !req.ValidAfter.After(now)
+}
+
+// Get retrieves request's information structure for request with given ID.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Get(rid ReqID) (ReqInfo, error) {
+       req, ok := reqs.requests[rid]
+       if !ok {
+               return ReqInfo{}, NotFoundError("Request")
+       }
+       return *req, nil
+}
+
+// Timeout sets request to TIMEOUT state after Deadline time is exceeded.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Timeout(rid ReqID) error {
+       reqs.mutex.Lock()
+       defer reqs.mutex.Unlock()
+       req, ok := reqs.requests[rid]
+       if !ok {
+               return NotFoundError("Request")
+       }
+       if req.State != WAIT || req.Deadline.After(time.Now()) {
+               return ErrModificationForbidden
+       }
+       req.State = TIMEOUT
+       reqs.queue.removeRequest(req)
+       return nil
+}
+
+// Close verifies if request time has been exceeded and if so closes it.
+// If request is still valid to continue it's job an error is returned.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Close(reqID ReqID) error {
+       reqs.mutex.Lock()
+       defer reqs.mutex.Unlock()
+       req, ok := reqs.requests[reqID]
+       if !ok {
+               return NotFoundError("Request")
+       }
+       if req.State != INPROGRESS {
+               return ErrModificationForbidden
+       }
+       if req.Job == nil {
+               // TODO log a critical logic error. Job should be assigned to the request
+               // in INPROGRESS state.
+               return ErrInternalLogicError
+       }
+       if req.Job.Timeout.After(time.Now()) {
+               // Request prolonged not yet ready to be closed because of timeout.
+               return ErrModificationForbidden
+       }
+
+       reqs.closeRequest(req)
+
+       return nil
+}
+
+// Run starts job performing the request on the worker.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Run(rid ReqID, worker WorkerUUID) error {
+       req, ok := reqs.requests[rid]
+       if !ok {
+               return NotFoundError("Request")
+       }
+
+       if req.State != WAIT {
+               return ErrModificationForbidden
+       }
+       req.State = INPROGRESS
+
+       if reqs.iterating {
+               reqs.queue.releaseIterator()
+               reqs.iterating = false
+       }
+       reqs.queue.removeRequest(req)
+
+       // TODO(lwojciechow) assign req.Job.
+       return nil
+}
diff --git a/requests/requests_requestsmanager_test.go b/requests/requests_requestsmanager_test.go
new file mode 100644 (file)
index 0000000..014726d
--- /dev/null
@@ -0,0 +1,332 @@
+/*
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package requests
+
+import (
+       "time"
+
+       . "git.tizen.org/tools/boruta"
+
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Requests as RequestsManager", func() {
+       var R *ReqsCollection
+       BeforeEach(func() {
+               R = NewRequestQueue()
+       })
+       Describe("Iterations", func() {
+               var entered chan int
+               testMutex := func() {
+                       R.mutex.Lock()
+                       defer R.mutex.Unlock()
+                       entered <- 1
+               }
+               BeforeEach(func() {
+                       entered = make(chan int)
+               })
+               Describe("InitIteration", func() {
+                       It("should init iterations and lock requests mutex", func() {
+                               err := R.InitIteration()
+                               Expect(err).NotTo(HaveOccurred())
+                               Expect(R.iterating).To(BeTrue())
+
+                               // Verify that mutex is locked.
+                               go testMutex()
+                               Consistently(entered).ShouldNot(Receive())
+
+                               // Release the mutex
+                               R.mutex.Unlock()
+                               Eventually(entered).Should(Receive())
+                       })
+                       It("should return error and remain mutex unlocked if iterations are already started", func() {
+                               R.mutex.Lock()
+                               R.iterating = true
+                               R.mutex.Unlock()
+
+                               err := R.InitIteration()
+                               Expect(err).To(Equal(ErrInternalLogicError))
+
+                               // Verify that mutex is not locked.
+                               go testMutex()
+                               Eventually(entered).Should(Receive())
+                       })
+               })
+               Describe("TerminateIteration", func() {
+                       It("should terminate iterations and unlock requests mutex", func() {
+                               err := R.InitIteration()
+                               Expect(err).NotTo(HaveOccurred())
+
+                               R.TerminateIteration()
+                               Expect(R.iterating).To(BeFalse())
+
+                               // Verify that mutex is not locked.
+                               go testMutex()
+                               Eventually(entered).Should(Receive())
+                       })
+                       It("should just release mutex if iterations are not started", func() {
+                               R.mutex.Lock()
+
+                               R.TerminateIteration()
+
+                               // Verify that mutex is not locked.
+                               go testMutex()
+                               Eventually(entered).Should(Receive())
+                       })
+               })
+       })
+       Describe("Iterating over requests", func() {
+               verify := []ReqID{3, 5, 1, 2, 7, 4, 6}
+               BeforeEach(func() {
+                       now := time.Now()
+                       tomorrow := now.AddDate(0, 0, 1)
+                       insert := func(p Priority) {
+                               _, err := R.NewRequest(Capabilities{}, p, UserInfo{}, now, tomorrow)
+                               Expect(err).NotTo(HaveOccurred())
+                       }
+                       insert(3) //1
+                       insert(3) //2
+                       insert(1) //3
+                       insert(5) //4
+                       insert(1) //5
+                       insert(5) //6
+                       insert(3) //7
+               })
+               It("should properly iterate over requests", func() {
+                       reqs := make([]ReqID, 0)
+
+                       R.InitIteration()
+                       for r, ok := R.Next(); ok; r, ok = R.Next() {
+                               reqs = append(reqs, r)
+                       }
+                       R.TerminateIteration()
+
+                       Expect(reqs).To(Equal(verify))
+               })
+               It("should restart iterations in new critical section", func() {
+                       for times := 0; times < len(verify); times++ {
+                               reqs := make([]ReqID, 0)
+                               i := 0
+                               R.InitIteration()
+                               for r, ok := R.Next(); ok && i < times; r, ok = R.Next() {
+                                       reqs = append(reqs, r)
+                                       i++
+                               }
+                               R.TerminateIteration()
+                               Expect(reqs).To(Equal(verify[:times]))
+                       }
+               })
+               It("should panic if Next is called without InitIteration", func() {
+                       wrap := func() {
+                               R.mutex.Lock()
+                               defer R.mutex.Unlock()
+                               R.Next()
+                       }
+                       Expect(wrap).To(Panic())
+               })
+       })
+       Describe("With request in the queue", func() {
+               var now, tomorrow time.Time
+               var req, noreq ReqID
+               var rinfo *ReqInfo
+               BeforeEach(func() {
+                       now = time.Now()
+                       tomorrow = now.AddDate(0, 0, 1)
+                       var err error
+                       req, err = R.NewRequest(Capabilities{}, 3, UserInfo{}, now, tomorrow)
+                       Expect(err).NotTo(HaveOccurred())
+                       var ok bool
+                       rinfo, ok = R.requests[req]
+                       Expect(ok).To(BeTrue())
+                       noreq = req + 1
+               })
+               Describe("VerifyIfReady", func() {
+                       It("should fail if reqID is unknown", func() {
+                               Expect(R.VerifyIfReady(noreq, now)).To(BeFalse())
+                       })
+                       It("should fail if state is not WAIT", func() {
+                               states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+                               for _, s := range states {
+                                       rinfo.State = s
+                                       Expect(R.VerifyIfReady(req, now)).To(BeFalse(), "state = %v", s)
+                               }
+                       })
+                       It("should fail if Deadline is reached or passed", func() {
+                               Expect(R.VerifyIfReady(req, tomorrow.Add(-time.Hour))).To(BeTrue())
+                               Expect(R.VerifyIfReady(req, tomorrow)).To(BeFalse())
+                               Expect(R.VerifyIfReady(req, tomorrow.Add(time.Hour))).To(BeFalse())
+                       })
+                       It("should fail if ValidAfter is in future", func() {
+                               Expect(R.VerifyIfReady(req, now.Add(-time.Hour))).To(BeFalse())
+                               Expect(R.VerifyIfReady(req, now)).To(BeTrue())
+                               Expect(R.VerifyIfReady(req, now.Add(time.Hour))).To(BeTrue())
+                       })
+                       It("should succeed if request is known, in WAIT state and now is between ValidAfter and Deadline", func() {
+                               Expect(R.VerifyIfReady(req, now.Add(12*time.Hour))).To(BeTrue())
+                       })
+               })
+               Describe("Get", func() {
+                       It("should fail if reqID is unknown", func() {
+                               r, err := R.Get(noreq)
+                               Expect(err).To(Equal(NotFoundError("Request")))
+                               Expect(r).To(Equal(ReqInfo{}))
+                       })
+                       It("should succeed if reqID is valid", func() {
+                               r, err := R.Get(req)
+                               Expect(err).NotTo(HaveOccurred())
+                               Expect(r).To(Equal(*rinfo))
+                       })
+               })
+               Describe("Timeout", func() {
+                       It("should fail if reqID is unknown", func() {
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               err := R.Timeout(noreq)
+                               Expect(err).To(Equal(NotFoundError("Request")))
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                       })
+                       It("should fail if request is not in WAIT state", func() {
+                               rinfo.Deadline = now.Add(-time.Hour)
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+                               for _, s := range states {
+                                       rinfo.State = s
+                                       err := R.Timeout(req)
+                                       Expect(err).To(Equal(ErrModificationForbidden), "state = %v", s)
+                                       Expect(R.queue.length).To(Equal(uint(1)), "state = %v", s)
+                               }
+                       })
+                       It("should fail if deadline is in the future", func() {
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               err := R.Timeout(req)
+                               Expect(err).To(Equal(ErrModificationForbidden))
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                       })
+                       It("should pass if deadline is past", func() {
+                               rinfo.Deadline = now.Add(-time.Hour)
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               err := R.Timeout(req)
+                               Expect(err).NotTo(HaveOccurred())
+                               Expect(rinfo.State).To(Equal(TIMEOUT))
+                               Expect(R.queue.length).To(BeZero())
+                       })
+               })
+               Describe("Close", func() {
+                       It("should fail if reqID is unknown", func() {
+                               err := R.Close(noreq)
+                               Expect(err).To(Equal(NotFoundError("Request")))
+                       })
+                       It("should fail if request is not in INPROGRESS state", func() {
+                               states := []ReqState{WAIT, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+                               for _, state := range states {
+                                       R.mutex.Lock()
+                                       rinfo.State = state
+                                       R.mutex.Unlock()
+
+                                       err := R.Close(req)
+                                       Expect(err).To(Equal(ErrModificationForbidden), "state = %s", state)
+                               }
+                       })
+                       It("should fail if request has no job assigned", func() {
+                               R.mutex.Lock()
+                               rinfo.State = INPROGRESS
+                               Expect(rinfo.Job).To(BeNil())
+                               R.mutex.Unlock()
+
+                               err := R.Close(req)
+                               Expect(err).To(Equal(ErrInternalLogicError))
+                       })
+                       It("should fail if job's is not yet timed out", func() {
+                               R.mutex.Lock()
+                               rinfo.State = INPROGRESS
+                               rinfo.Job = &JobInfo{
+                                       Timeout: time.Now().AddDate(0, 0, 1),
+                               }
+                               R.mutex.Unlock()
+
+                               err := R.Close(req)
+                               Expect(err).To(Equal(ErrModificationForbidden))
+                       })
+                       It("should close request and release worker", func() {
+                               R.mutex.Lock()
+                               rinfo.State = INPROGRESS
+                               rinfo.Job = &JobInfo{
+                                       Timeout: time.Now().AddDate(0, 0, -1),
+                               }
+                               R.mutex.Unlock()
+
+                               err := R.Close(req)
+                               Expect(err).NotTo(HaveOccurred())
+                               Expect(rinfo.State).To(Equal(DONE))
+                               // TODO verify releasing worker when implemented
+                       })
+               })
+               Describe("Run", func() {
+                       It("should fail if reqID is unknown", func() {
+                               R.mutex.Lock()
+                               defer R.mutex.Unlock()
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               err := R.Run(noreq, WorkerUUID("TestWorker"))
+                               Expect(err).To(Equal(NotFoundError("Request")))
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                       })
+                       It("should fail if request is not in WAIT state", func() {
+                               states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+                               for _, state := range states {
+                                       R.InitIteration()
+                                       Expect(R.queue.length).To(Equal(uint(1)), "state = %s", state)
+                                       rinfo.State = state
+                                       err := R.Run(req, WorkerUUID("TestWorker"))
+                                       Expect(err).To(Equal(ErrModificationForbidden), "state = %s", state)
+                                       Expect(R.queue.length).To(Equal(uint(1)), "state = %s", state)
+                                       R.TerminateIteration()
+                               }
+                       })
+                       It("should fail if reqID is unknown during iteration", func() {
+                               R.InitIteration()
+                               defer R.TerminateIteration()
+                               Expect(R.iterating).To(BeTrue())
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               err := R.Run(noreq, WorkerUUID("TestWorker"))
+                               Expect(err).To(Equal(NotFoundError("Request")))
+                               Expect(R.iterating).To(BeTrue())
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                       })
+                       It("should start progress for valid reqID", func() {
+                               R.InitIteration()
+                               defer R.TerminateIteration()
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               err := R.Run(req, WorkerUUID("TestWorker"))
+                               Expect(err).NotTo(HaveOccurred())
+                               Expect(rinfo.State).To(Equal(INPROGRESS))
+                               Expect(R.queue.length).To(BeZero())
+                       })
+                       It("should start progress and break iterations when iterating", func() {
+                               R.InitIteration()
+                               defer R.TerminateIteration()
+                               Expect(R.queue.length).To(Equal(uint(1)))
+                               Expect(R.iterating).To(BeTrue())
+                               err := R.Run(req, WorkerUUID("TestWorker"))
+                               Expect(err).NotTo(HaveOccurred())
+                               Expect(rinfo.State).To(Equal(INPROGRESS))
+                               Expect(R.iterating).To(BeFalse())
+                               Expect(R.queue.length).To(BeZero())
+                       })
+                       // TODO use and verify Job when Run's implementation is complete.
+               })
+       })
+})