Add requestTimes with tests
authorLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Wed, 13 Sep 2017 18:02:18 +0000 (20:02 +0200)
committerLukasz Wojciechowski <l.wojciechow@partner.samsung.com>
Fri, 27 Apr 2018 15:14:51 +0000 (17:14 +0200)
requestTimes collects requestTime objects and notifies registered
matcher.Matcher when the time comes. Past times are removed from
collection. timesHeap is used for storing requestTime objects.
Notifications are called asynchronously from dedicated goroutine.
requestTimes will be used by requests package for monitoring ValidAfter,
Deadline and Timeout time of requests.

Change-Id: Ib27608f89057791bb1d615ec881ba2c06512f6f0
Signed-off-by: Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
requests/times.go [new file with mode: 0644]
requests/times_test.go [new file with mode: 0644]

diff --git a/requests/times.go b/requests/times.go
new file mode 100644 (file)
index 0000000..456f23f
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+// File requests/times.go provides requestTimes structure, which collects
+// time.Time objects associated with requests and notifies registered
+// matcher.Matcher when proper time comes. Past times are removed from collection.
+// timesHeap is used for storing time.Time objects. Notifications are called
+// asynchronously from dedicated goroutine.
+
+package requests
+
+import (
+       "sync"
+       "time"
+
+       . "git.tizen.org/tools/boruta"
+       "git.tizen.org/tools/boruta/matcher"
+)
+
+// requestTimes collects requestTime entities and notifies registered
+// matcher.Matcher when the time comes.
+type requestTimes struct {
+       times   *timesHeap      // stores requestTime entities collection.
+       timer   *time.Timer     // set for earliest time in the collection.
+       matcher matcher.Matcher // notified when time is reached.
+       mutex   *sync.Mutex     // synchronizes internal goroutine.
+       stop    chan bool       // stops internal goroutine.
+       done    sync.WaitGroup  // waits for internal goroutine to finish.
+}
+
+// newRequestTimes creates and initializes new requestTimes structure.
+// It runs internal goroutine. finish() method should be used for clearing object
+// and stopping internal goroutine.
+func newRequestTimes() *requestTimes {
+       farFuture := time.Now().AddDate(100, 0, 0)
+       rt := &requestTimes{
+               times: newTimesHeap(),
+               timer: time.NewTimer(time.Until(farFuture)),
+               mutex: new(sync.Mutex),
+               stop:  make(chan bool),
+       }
+       rt.done.Add(1)
+       go rt.loop()
+       return rt
+}
+
+// finish clears requestTimes object and stops internal goroutine. It should be
+// called exactly once. Object cannot be used after calling this method anymore.
+func (rt *requestTimes) finish() {
+       // Stop timer.
+       rt.timer.Stop()
+
+       // Break loop goroutine and wait until it's done.
+       close(rt.stop)
+       rt.done.Wait()
+}
+
+// loop is the main procedure of the internal goroutine. It waits for either timer
+// event or being stopped by Finish.
+func (rt *requestTimes) loop() {
+       defer rt.done.Done()
+       for {
+               // get event from timer
+               select {
+               case t := <-rt.timer.C:
+                       rt.process(t)
+               case <-rt.stop:
+                       return
+               }
+       }
+}
+
+// process notifies registered matcher, removes all past times from the collection
+// and sets up timer for earliest time from the collection.
+func (rt *requestTimes) process(t time.Time) {
+       rt.mutex.Lock()
+       defer rt.mutex.Unlock()
+
+       // Remove all past times. It might happen that the same point in time is
+       // added multiple times or that deltas between added values are so small,
+       // that in the time of processing some of next pending points in time
+       // are already in the past. There is no need to set timer for them,
+       // as it will return immediately. So all past times must be removed
+       // and timer set to earliest future time.
+       past := make([]ReqID, 0)
+       for rt.times.Len() > 0 && t.After(rt.minTime()) {
+               x := rt.times.Pop()
+               past = append(past, x.req)
+       }
+
+       // Notify matcher (if one is registered).
+       if rt.matcher != nil {
+               rt.matcher.Notify(past)
+       }
+
+       // Set up timer to earliest pending time.
+       if rt.times.Len() > 0 {
+               rt.reset()
+       }
+}
+
+// insert adds time to the collection and possibly restarts timer if required.
+func (rt *requestTimes) insert(t requestTime) {
+       rt.mutex.Lock()
+       defer rt.mutex.Unlock()
+
+       rt.times.Push(t)
+
+       // If inserted time is minimal (first element of the heap) timer needs to be
+       // restarted.
+       if rt.minTime().Equal(t.time) {
+               rt.timer.Stop()
+               rt.reset()
+       }
+}
+
+// setMatcher registers object implementing Matcher interface. Registered object
+// is notified, when collected times pass.
+func (rt *requestTimes) setMatcher(m matcher.Matcher) {
+       rt.mutex.Lock()
+       defer rt.mutex.Unlock()
+
+       rt.matcher = m
+}
+
+// minTime is a helper function extracting minimal time from the heap. This method
+// should be called in requestTimes mutex protected critical section.
+// Method panics if called on empty collection.
+func (rt *requestTimes) minTime() time.Time {
+       return rt.times.Min().time
+}
+
+// reset is the helper function for resetting stopped timer to the time of next event.
+// This method should be called in requestTimes mutex protected critical section.
+func (rt *requestTimes) reset() {
+       rt.timer.Reset(time.Until(rt.minTime()))
+}
diff --git a/requests/times_test.go b/requests/times_test.go
new file mode 100644 (file)
index 0000000..0910ef1
--- /dev/null
@@ -0,0 +1,243 @@
+/*
+ *  Copyright (c) 2017-2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package requests
+
+import (
+       "runtime"
+       "runtime/debug"
+       "time"
+
+       . "git.tizen.org/tools/boruta"
+       . "github.com/onsi/ginkgo"
+       . "github.com/onsi/gomega"
+)
+
+type TestMatcher struct {
+       Counter  int
+       Notified []ReqID
+}
+
+func (m *TestMatcher) Notify(reqs []ReqID) {
+       m.Counter++
+       if m.Notified == nil {
+               m.Notified = make([]ReqID, 0)
+       }
+       m.Notified = append(m.Notified, reqs...)
+}
+
+var _ = Describe("Times", func() {
+       loopRoutineName := "git.tizen.org/tools/boruta/requests.(*requestTimes).loop"
+       debug.SetGCPercent(1)
+       var t *requestTimes
+       var baseCount int
+
+       countGoRoutine := func(name string) int {
+               runtime.GC()
+
+               counter := 0
+               p := make([]runtime.StackRecord, 1)
+               n, _ := runtime.GoroutineProfile(p)
+               p = make([]runtime.StackRecord, 2*n)
+               runtime.GoroutineProfile(p)
+               for _, s := range p {
+                       for _, f := range s.Stack() {
+                               if f != 0 {
+                                       if runtime.FuncForPC(f).Name() == name {
+                                               counter++
+                                       }
+                               }
+                       }
+               }
+               return counter
+       }
+       getLen := func() int {
+               t.mutex.Lock()
+               defer t.mutex.Unlock()
+               return t.times.Len()
+       }
+       getMin := func() requestTime {
+               t.mutex.Lock()
+               defer t.mutex.Unlock()
+               return t.times.Min()
+       }
+       prepareRequestTime := func(after time.Duration, req ReqID) requestTime {
+               d := time.Duration(after)
+               n := time.Now().Add(d)
+               return requestTime{time: n, req: req}
+       }
+
+       BeforeEach(func() {
+               baseCount = countGoRoutine(loopRoutineName)
+               t = newRequestTimes()
+       })
+       AfterEach(func() {
+               if t != nil {
+                       t.finish()
+               }
+               Expect(countGoRoutine(loopRoutineName)).To(Equal(baseCount))
+       })
+       Describe("newRequestTimes", func() {
+               It("should init all fields", func() {
+                       Expect(t).NotTo(BeNil(), "t")
+                       Expect(t.times).NotTo(BeNil())
+                       Expect(t.times.Len()).To(BeZero())
+                       Expect(t.timer).NotTo(BeNil())
+                       Expect(t.matcher).To(BeNil())
+                       Expect(t.mutex).NotTo(BeNil())
+                       Expect(t.stop).NotTo(BeClosed())
+                       Expect(countGoRoutine(loopRoutineName)).To(Equal(baseCount + 1))
+               })
+               It("should create separate object in every call", func() {
+                       t2 := newRequestTimes()
+                       defer t2.finish()
+
+                       Expect(t).NotTo(BeIdenticalTo(t2))
+                       Expect(t.times).NotTo(BeIdenticalTo(t2.times))
+                       Expect(t.timer).NotTo(BeIdenticalTo(t2.timer))
+                       Expect(t.mutex).NotTo(BeIdenticalTo(t2.mutex))
+                       Expect(t.stop).NotTo(BeIdenticalTo(t2.stop))
+                       Expect(countGoRoutine(loopRoutineName)).To(Equal(baseCount + 2))
+               })
+       })
+       Describe("finish", func() {
+               It("should work with unused empty structure", func() {
+                       t.finish()
+
+                       Expect(t.stop).To(BeClosed())
+                       Expect(countGoRoutine(loopRoutineName)).To(Equal(baseCount))
+                       // Avoid extra finish in AfterEach.
+                       t = nil
+               })
+               It("should work when times heap is not empty", func() {
+                       t.insert(prepareRequestTime(time.Minute, 1))
+                       t.insert(prepareRequestTime(time.Hour, 2))
+                       t.insert(prepareRequestTime(0, 3))
+
+                       t.finish()
+
+                       Expect(t.stop).To(BeClosed())
+                       Expect(countGoRoutine(loopRoutineName)).To(Equal(baseCount))
+                       // Avoid extra finish in AfterEach.
+                       t = nil
+               })
+       })
+       Describe("insert", func() {
+               It("should insert single time", func() {
+                       r100m := prepareRequestTime(100*time.Millisecond, 100)
+
+                       t.insert(r100m)
+                       Expect(getLen()).To(Equal(1))
+                       Expect(getMin()).To(Equal(r100m))
+
+                       Eventually(getLen).Should(BeZero())
+               })
+               It("should insert multiple times", func() {
+                       r100m := prepareRequestTime(100*time.Millisecond, 100)
+                       r200m := prepareRequestTime(200*time.Millisecond, 200)
+                       r500m := prepareRequestTime(500*time.Millisecond, 500)
+                       r800m := prepareRequestTime(800*time.Millisecond, 800)
+
+                       t.insert(r100m)
+                       t.insert(r200m)
+                       t.insert(r100m)
+                       t.insert(r800m)
+                       Expect(getLen()).To(Equal(4))
+                       Expect(getMin()).To(Equal(r100m))
+
+                       // Expect process() to remove 2 elements after 100 ms [100 ms].
+                       Eventually(getLen).Should(Equal(2))
+                       Expect(getMin()).To(Equal(r200m))
+
+                       // Expect process() to remove 1 element after another 100 ms [200 ms].
+                       Eventually(getLen).Should(Equal(1))
+                       Expect(getMin()).To(Equal(r800m))
+
+                       t.insert(r500m)
+                       Expect(getLen()).To(Equal(2))
+                       Expect(getMin()).To(Equal(r500m))
+
+                       // Expect process() to remove 1 element after another 300 ms [500 ms].
+                       Eventually(getLen).Should(Equal(1))
+                       Expect(getMin()).To(Equal(r800m))
+
+                       // Expect process() to remove 1 element after another 300 ms [800 ms].
+                       Eventually(getLen).Should(BeZero())
+               })
+       })
+       Describe("setMatcher", func() {
+               It("should set matcher", func() {
+                       var m TestMatcher
+
+                       Expect(t.matcher).To(BeNil())
+                       t.setMatcher(&m)
+                       Expect(t.matcher).To(Equal(&m))
+                       Expect(m.Counter).To(BeZero())
+               })
+               It("should notify matcher", func() {
+                       var m TestMatcher
+                       t.setMatcher(&m)
+
+                       rid := ReqID(100)
+                       t.insert(prepareRequestTime(100*time.Millisecond, rid))
+
+                       Expect(m.Counter).To(BeZero())
+                       Expect(m.Notified).To(BeNil())
+
+                       // Expect process() to remove 1 element after 100 ms [100 ms].
+                       Eventually(getLen).Should(BeZero())
+                       Expect(m.Counter).To(Equal(1))
+                       Expect(len(m.Notified)).To(Equal(1))
+                       Expect(m.Notified).To(ContainElement(rid))
+
+               })
+       })
+       Describe("process", func() {
+               It("should be run once for same times", func() {
+                       var m TestMatcher
+                       r100m := prepareRequestTime(100*time.Millisecond, 0)
+                       reqs := []ReqID{101, 102, 103, 104, 105}
+
+                       t.setMatcher(&m)
+                       for _, r := range reqs {
+                               r100m.req = r
+                               t.insert(r100m)
+                       }
+                       Expect(m.Counter).To(BeZero())
+
+                       // Expect process() to remove all elements after 100 ms [100 ms].
+                       Eventually(getLen).Should(BeZero())
+                       Expect(m.Counter).To(Equal(1))
+                       Expect(len(m.Notified)).To(Equal(len(reqs)))
+                       Expect(m.Notified).To(ConsistOf(reqs))
+               })
+       })
+       Describe("past time", func() {
+               It("should handle times in the past properly", func() {
+                       var m TestMatcher
+                       t.setMatcher(&m)
+
+                       rid := ReqID(200)
+                       t.insert(prepareRequestTime(-time.Hour, rid))
+
+                       // Expect process() to remove element.
+                       Eventually(getLen).Should(BeZero())
+                       Expect(m.Counter).To(Equal(1))
+                       Expect(len(m.Notified)).To(Equal(1))
+                       Expect(m.Notified).To(ContainElement(rid))
+               })
+       })
+})