7 // coalescer is a simple interface that must be implemented to be
8 // used inside of a coalesceLoop
9 type coalescer interface {
10 // Can the coalescer handle this event, if not it is
11 // directly passed through to the destination channel
14 // Invoked to coalesce the given event
17 // Invoked to flush the coalesced events
18 Flush(outChan chan<- Event)
21 // coalescedEventCh returns an event channel where the events are coalesced
22 // using the given coalescer.
23 func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
24 cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event {
25 inCh := make(chan Event, 1024)
26 go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c)
30 // coalesceLoop is a simple long-running routine that manages the high-level
31 // flow of coalescing based on quiescence and a maximum quantum period.
32 func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{},
33 coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) {
34 var quiescent <-chan time.Time
35 var quantum <-chan time.Time
46 // Ignore any non handled events
52 // Start a new quantum if we need to
53 // and restart the quiescent timer
55 quantum = time.After(coalescePeriod)
57 quiescent = time.After(quiescentPeriod)
73 // Flush the coalesced events
76 // Restart ingestion if we are not done