)
// ReqsCollection contains information (also historical) about handled requests.
-// It implements Requests interface.
+// It implements Requests and RequestsManager interfaces.
type ReqsCollection struct {
- requests map[ReqID]*ReqInfo
- queue *prioQueue
- mutex *sync.RWMutex
+ requests map[ReqID]*ReqInfo
+ queue *prioQueue
+ mutex *sync.RWMutex
+ iterating bool
}
// NewRequestQueue provides initialized priority queue for requests.
return req.ID, nil
}
-// CloseRequest is part of implementation of Requests interface. It checks that
-// request is in WAIT state and changes it to CANCEL or in INPROGRESS state and
-// changes it to DONE. NotFoundError may be returned if request with given reqID
-// doesn't exist in the queue or ErrModificationForbidden if request is in state
-// which can't be closed.
+// closeRequest is an internal ReqsCollection method for closing running request.
+// It is used by both Close and CloseRequest methods after verification that
+// all required conditions to close request are met.
+// The method must be called in reqs.mutex critical section.
+func (reqs *ReqsCollection) closeRequest(req *ReqInfo) {
+ req.State = DONE
+ // TODO(mwereski): release worker
+}
+
+// CloseRequest is part of implementation of Requests interface.
+// It checks that request is in WAIT state and changes it to CANCEL or
+// in INPROGRESS state and changes it to DONE. NotFoundError may be returned
+// if request with given reqID doesn't exist in the queue
+// or ErrModificationForbidden if request is in state which can't be closed.
func (reqs *ReqsCollection) CloseRequest(reqID ReqID) error {
reqs.mutex.Lock()
defer reqs.mutex.Unlock()
req.State = CANCEL
reqs.queue.removeRequest(req)
case INPROGRESS:
- req.State = DONE
- // TODO(mwereski): release worker
+ reqs.closeRequest(req)
default:
return ErrModificationForbidden
}
func (reqs *ReqsCollection) GetRequestInfo(reqID ReqID) (ReqInfo, error) {
reqs.mutex.RLock()
defer reqs.mutex.RUnlock()
- req, ok := reqs.requests[reqID]
- if !ok {
- return ReqInfo{}, NotFoundError("Request")
- }
- return *req, nil
+ return reqs.Get(reqID)
}
// ListRequests is part of implementation of Requests interface. It returns slice
// TODO(mwereski): prolong access
return nil
}
+
+// InitIteration initializes queue iterator and sets global lock for requests
+// structures. It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) InitIteration() error {
+ reqs.mutex.Lock()
+ if reqs.iterating {
+ reqs.mutex.Unlock()
+ return ErrInternalLogicError
+ }
+ reqs.queue.initIterator()
+ reqs.iterating = true
+ return nil
+}
+
+// TerminateIteration releases queue iterator if iterations are in progress
+// and release global lock for requests structures.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) TerminateIteration() {
+ if reqs.iterating {
+ reqs.queue.releaseIterator()
+ reqs.iterating = false
+ }
+ reqs.mutex.Unlock()
+}
+
+// Next gets next ID from request queue. Method returns {ID, true} if there is
+// pending request or {ReqID(0), false} if queue's end has been reached.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Next() (ReqID, bool) {
+ if reqs.iterating {
+ return reqs.queue.next()
+ }
+ panic("Should never call Next(), when not iterating")
+}
+
+// VerifyIfReady checks if the request is ready to be run on worker.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) VerifyIfReady(rid ReqID, now time.Time) bool {
+ req, ok := reqs.requests[rid]
+ return ok && req.State == WAIT && req.Deadline.After(now) && !req.ValidAfter.After(now)
+}
+
+// Get retrieves request's information structure for request with given ID.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Get(rid ReqID) (ReqInfo, error) {
+ req, ok := reqs.requests[rid]
+ if !ok {
+ return ReqInfo{}, NotFoundError("Request")
+ }
+ return *req, nil
+}
+
+// Timeout sets request to TIMEOUT state after Deadline time is exceeded.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Timeout(rid ReqID) error {
+ reqs.mutex.Lock()
+ defer reqs.mutex.Unlock()
+ req, ok := reqs.requests[rid]
+ if !ok {
+ return NotFoundError("Request")
+ }
+ if req.State != WAIT || req.Deadline.After(time.Now()) {
+ return ErrModificationForbidden
+ }
+ req.State = TIMEOUT
+ reqs.queue.removeRequest(req)
+ return nil
+}
+
+// Close verifies if request time has been exceeded and if so closes it.
+// If request is still valid to continue it's job an error is returned.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Close(reqID ReqID) error {
+ reqs.mutex.Lock()
+ defer reqs.mutex.Unlock()
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return NotFoundError("Request")
+ }
+ if req.State != INPROGRESS {
+ return ErrModificationForbidden
+ }
+ if req.Job == nil {
+ // TODO log a critical logic error. Job should be assigned to the request
+ // in INPROGRESS state.
+ return ErrInternalLogicError
+ }
+ if req.Job.Timeout.After(time.Now()) {
+ // Request prolonged not yet ready to be closed because of timeout.
+ return ErrModificationForbidden
+ }
+
+ reqs.closeRequest(req)
+
+ return nil
+}
+
+// Run starts job performing the request on the worker.
+// It is part of implementation of RequestsManager interface.
+func (reqs *ReqsCollection) Run(rid ReqID, worker WorkerUUID) error {
+ req, ok := reqs.requests[rid]
+ if !ok {
+ return NotFoundError("Request")
+ }
+
+ if req.State != WAIT {
+ return ErrModificationForbidden
+ }
+ req.State = INPROGRESS
+
+ if reqs.iterating {
+ reqs.queue.releaseIterator()
+ reqs.iterating = false
+ }
+ reqs.queue.removeRequest(req)
+
+ // TODO(lwojciechow) assign req.Job.
+ return nil
+}
--- /dev/null
+/*
+ * Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package requests
+
+import (
+ "time"
+
+ . "git.tizen.org/tools/boruta"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Requests as RequestsManager", func() {
+ var R *ReqsCollection
+ BeforeEach(func() {
+ R = NewRequestQueue()
+ })
+ Describe("Iterations", func() {
+ var entered chan int
+ testMutex := func() {
+ R.mutex.Lock()
+ defer R.mutex.Unlock()
+ entered <- 1
+ }
+ BeforeEach(func() {
+ entered = make(chan int)
+ })
+ Describe("InitIteration", func() {
+ It("should init iterations and lock requests mutex", func() {
+ err := R.InitIteration()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(R.iterating).To(BeTrue())
+
+ // Verify that mutex is locked.
+ go testMutex()
+ Consistently(entered).ShouldNot(Receive())
+
+ // Release the mutex
+ R.mutex.Unlock()
+ Eventually(entered).Should(Receive())
+ })
+ It("should return error and remain mutex unlocked if iterations are already started", func() {
+ R.mutex.Lock()
+ R.iterating = true
+ R.mutex.Unlock()
+
+ err := R.InitIteration()
+ Expect(err).To(Equal(ErrInternalLogicError))
+
+ // Verify that mutex is not locked.
+ go testMutex()
+ Eventually(entered).Should(Receive())
+ })
+ })
+ Describe("TerminateIteration", func() {
+ It("should terminate iterations and unlock requests mutex", func() {
+ err := R.InitIteration()
+ Expect(err).NotTo(HaveOccurred())
+
+ R.TerminateIteration()
+ Expect(R.iterating).To(BeFalse())
+
+ // Verify that mutex is not locked.
+ go testMutex()
+ Eventually(entered).Should(Receive())
+ })
+ It("should just release mutex if iterations are not started", func() {
+ R.mutex.Lock()
+
+ R.TerminateIteration()
+
+ // Verify that mutex is not locked.
+ go testMutex()
+ Eventually(entered).Should(Receive())
+ })
+ })
+ })
+ Describe("Iterating over requests", func() {
+ verify := []ReqID{3, 5, 1, 2, 7, 4, 6}
+ BeforeEach(func() {
+ now := time.Now()
+ tomorrow := now.AddDate(0, 0, 1)
+ insert := func(p Priority) {
+ _, err := R.NewRequest(Capabilities{}, p, UserInfo{}, now, tomorrow)
+ Expect(err).NotTo(HaveOccurred())
+ }
+ insert(3) //1
+ insert(3) //2
+ insert(1) //3
+ insert(5) //4
+ insert(1) //5
+ insert(5) //6
+ insert(3) //7
+ })
+ It("should properly iterate over requests", func() {
+ reqs := make([]ReqID, 0)
+
+ R.InitIteration()
+ for r, ok := R.Next(); ok; r, ok = R.Next() {
+ reqs = append(reqs, r)
+ }
+ R.TerminateIteration()
+
+ Expect(reqs).To(Equal(verify))
+ })
+ It("should restart iterations in new critical section", func() {
+ for times := 0; times < len(verify); times++ {
+ reqs := make([]ReqID, 0)
+ i := 0
+ R.InitIteration()
+ for r, ok := R.Next(); ok && i < times; r, ok = R.Next() {
+ reqs = append(reqs, r)
+ i++
+ }
+ R.TerminateIteration()
+ Expect(reqs).To(Equal(verify[:times]))
+ }
+ })
+ It("should panic if Next is called without InitIteration", func() {
+ wrap := func() {
+ R.mutex.Lock()
+ defer R.mutex.Unlock()
+ R.Next()
+ }
+ Expect(wrap).To(Panic())
+ })
+ })
+ Describe("With request in the queue", func() {
+ var now, tomorrow time.Time
+ var req, noreq ReqID
+ var rinfo *ReqInfo
+ BeforeEach(func() {
+ now = time.Now()
+ tomorrow = now.AddDate(0, 0, 1)
+ var err error
+ req, err = R.NewRequest(Capabilities{}, 3, UserInfo{}, now, tomorrow)
+ Expect(err).NotTo(HaveOccurred())
+ var ok bool
+ rinfo, ok = R.requests[req]
+ Expect(ok).To(BeTrue())
+ noreq = req + 1
+ })
+ Describe("VerifyIfReady", func() {
+ It("should fail if reqID is unknown", func() {
+ Expect(R.VerifyIfReady(noreq, now)).To(BeFalse())
+ })
+ It("should fail if state is not WAIT", func() {
+ states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+ for _, s := range states {
+ rinfo.State = s
+ Expect(R.VerifyIfReady(req, now)).To(BeFalse(), "state = %v", s)
+ }
+ })
+ It("should fail if Deadline is reached or passed", func() {
+ Expect(R.VerifyIfReady(req, tomorrow.Add(-time.Hour))).To(BeTrue())
+ Expect(R.VerifyIfReady(req, tomorrow)).To(BeFalse())
+ Expect(R.VerifyIfReady(req, tomorrow.Add(time.Hour))).To(BeFalse())
+ })
+ It("should fail if ValidAfter is in future", func() {
+ Expect(R.VerifyIfReady(req, now.Add(-time.Hour))).To(BeFalse())
+ Expect(R.VerifyIfReady(req, now)).To(BeTrue())
+ Expect(R.VerifyIfReady(req, now.Add(time.Hour))).To(BeTrue())
+ })
+ It("should succeed if request is known, in WAIT state and now is between ValidAfter and Deadline", func() {
+ Expect(R.VerifyIfReady(req, now.Add(12*time.Hour))).To(BeTrue())
+ })
+ })
+ Describe("Get", func() {
+ It("should fail if reqID is unknown", func() {
+ r, err := R.Get(noreq)
+ Expect(err).To(Equal(NotFoundError("Request")))
+ Expect(r).To(Equal(ReqInfo{}))
+ })
+ It("should succeed if reqID is valid", func() {
+ r, err := R.Get(req)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(r).To(Equal(*rinfo))
+ })
+ })
+ Describe("Timeout", func() {
+ It("should fail if reqID is unknown", func() {
+ Expect(R.queue.length).To(Equal(uint(1)))
+ err := R.Timeout(noreq)
+ Expect(err).To(Equal(NotFoundError("Request")))
+ Expect(R.queue.length).To(Equal(uint(1)))
+ })
+ It("should fail if request is not in WAIT state", func() {
+ rinfo.Deadline = now.Add(-time.Hour)
+ Expect(R.queue.length).To(Equal(uint(1)))
+ states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+ for _, s := range states {
+ rinfo.State = s
+ err := R.Timeout(req)
+ Expect(err).To(Equal(ErrModificationForbidden), "state = %v", s)
+ Expect(R.queue.length).To(Equal(uint(1)), "state = %v", s)
+ }
+ })
+ It("should fail if deadline is in the future", func() {
+ Expect(R.queue.length).To(Equal(uint(1)))
+ err := R.Timeout(req)
+ Expect(err).To(Equal(ErrModificationForbidden))
+ Expect(R.queue.length).To(Equal(uint(1)))
+ })
+ It("should pass if deadline is past", func() {
+ rinfo.Deadline = now.Add(-time.Hour)
+ Expect(R.queue.length).To(Equal(uint(1)))
+ err := R.Timeout(req)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rinfo.State).To(Equal(TIMEOUT))
+ Expect(R.queue.length).To(BeZero())
+ })
+ })
+ Describe("Close", func() {
+ It("should fail if reqID is unknown", func() {
+ err := R.Close(noreq)
+ Expect(err).To(Equal(NotFoundError("Request")))
+ })
+ It("should fail if request is not in INPROGRESS state", func() {
+ states := []ReqState{WAIT, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+ for _, state := range states {
+ R.mutex.Lock()
+ rinfo.State = state
+ R.mutex.Unlock()
+
+ err := R.Close(req)
+ Expect(err).To(Equal(ErrModificationForbidden), "state = %s", state)
+ }
+ })
+ It("should fail if request has no job assigned", func() {
+ R.mutex.Lock()
+ rinfo.State = INPROGRESS
+ Expect(rinfo.Job).To(BeNil())
+ R.mutex.Unlock()
+
+ err := R.Close(req)
+ Expect(err).To(Equal(ErrInternalLogicError))
+ })
+ It("should fail if job's is not yet timed out", func() {
+ R.mutex.Lock()
+ rinfo.State = INPROGRESS
+ rinfo.Job = &JobInfo{
+ Timeout: time.Now().AddDate(0, 0, 1),
+ }
+ R.mutex.Unlock()
+
+ err := R.Close(req)
+ Expect(err).To(Equal(ErrModificationForbidden))
+ })
+ It("should close request and release worker", func() {
+ R.mutex.Lock()
+ rinfo.State = INPROGRESS
+ rinfo.Job = &JobInfo{
+ Timeout: time.Now().AddDate(0, 0, -1),
+ }
+ R.mutex.Unlock()
+
+ err := R.Close(req)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rinfo.State).To(Equal(DONE))
+ // TODO verify releasing worker when implemented
+ })
+ })
+ Describe("Run", func() {
+ It("should fail if reqID is unknown", func() {
+ R.mutex.Lock()
+ defer R.mutex.Unlock()
+ Expect(R.queue.length).To(Equal(uint(1)))
+ err := R.Run(noreq, WorkerUUID("TestWorker"))
+ Expect(err).To(Equal(NotFoundError("Request")))
+ Expect(R.queue.length).To(Equal(uint(1)))
+ })
+ It("should fail if request is not in WAIT state", func() {
+ states := []ReqState{INPROGRESS, CANCEL, TIMEOUT, INVALID, DONE, FAILED}
+ for _, state := range states {
+ R.InitIteration()
+ Expect(R.queue.length).To(Equal(uint(1)), "state = %s", state)
+ rinfo.State = state
+ err := R.Run(req, WorkerUUID("TestWorker"))
+ Expect(err).To(Equal(ErrModificationForbidden), "state = %s", state)
+ Expect(R.queue.length).To(Equal(uint(1)), "state = %s", state)
+ R.TerminateIteration()
+ }
+ })
+ It("should fail if reqID is unknown during iteration", func() {
+ R.InitIteration()
+ defer R.TerminateIteration()
+ Expect(R.iterating).To(BeTrue())
+ Expect(R.queue.length).To(Equal(uint(1)))
+ err := R.Run(noreq, WorkerUUID("TestWorker"))
+ Expect(err).To(Equal(NotFoundError("Request")))
+ Expect(R.iterating).To(BeTrue())
+ Expect(R.queue.length).To(Equal(uint(1)))
+ })
+ It("should start progress for valid reqID", func() {
+ R.InitIteration()
+ defer R.TerminateIteration()
+ Expect(R.queue.length).To(Equal(uint(1)))
+ err := R.Run(req, WorkerUUID("TestWorker"))
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rinfo.State).To(Equal(INPROGRESS))
+ Expect(R.queue.length).To(BeZero())
+ })
+ It("should start progress and break iterations when iterating", func() {
+ R.InitIteration()
+ defer R.TerminateIteration()
+ Expect(R.queue.length).To(Equal(uint(1)))
+ Expect(R.iterating).To(BeTrue())
+ err := R.Run(req, WorkerUUID("TestWorker"))
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rinfo.State).To(Equal(INPROGRESS))
+ Expect(R.iterating).To(BeFalse())
+ Expect(R.queue.length).To(BeZero())
+ })
+ // TODO use and verify Job when Run's implementation is complete.
+ })
+ })
+})