Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / hashicorp / serf / serf / coalesce.go
1 package serf
2
3 import (
4         "time"
5 )
6
7 // coalescer is a simple interface that must be implemented to be
8 // used inside of a coalesceLoop
9 type coalescer interface {
10         // Can the coalescer handle this event, if not it is
11         // directly passed through to the destination channel
12         Handle(Event) bool
13
14         // Invoked to coalesce the given event
15         Coalesce(Event)
16
17         // Invoked to flush the coalesced events
18         Flush(outChan chan<- Event)
19 }
20
21 // coalescedEventCh returns an event channel where the events are coalesced
22 // using the given coalescer.
23 func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
24         cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event {
25         inCh := make(chan Event, 1024)
26         go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c)
27         return inCh
28 }
29
30 // coalesceLoop is a simple long-running routine that manages the high-level
31 // flow of coalescing based on quiescence and a maximum quantum period.
32 func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{},
33         coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) {
34         var quiescent <-chan time.Time
35         var quantum <-chan time.Time
36         shutdown := false
37
38 INGEST:
39         // Reset the timers
40         quantum = nil
41         quiescent = nil
42
43         for {
44                 select {
45                 case e := <-inCh:
46                         // Ignore any non handled events
47                         if !c.Handle(e) {
48                                 outCh <- e
49                                 continue
50                         }
51
52                         // Start a new quantum if we need to
53                         // and restart the quiescent timer
54                         if quantum == nil {
55                                 quantum = time.After(coalescePeriod)
56                         }
57                         quiescent = time.After(quiescentPeriod)
58
59                         // Coalesce the event
60                         c.Coalesce(e)
61
62                 case <-quantum:
63                         goto FLUSH
64                 case <-quiescent:
65                         goto FLUSH
66                 case <-shutdownCh:
67                         shutdown = true
68                         goto FLUSH
69                 }
70         }
71
72 FLUSH:
73         // Flush the coalesced events
74         c.Flush(outCh)
75
76         // Restart ingestion if we are not done
77         if !shutdown {
78                 goto INGEST
79         }
80 }