Imported Upstream version 2.4.0
[scm/test.git] / tasklog / log.go
1 package tasklog
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "strings"
8         "sync"
9         "time"
10
11         "github.com/olekukonko/ts"
12 )
13
14 const (
15         DefaultLoggingThrottle = 200 * time.Millisecond
16 )
17
18 // Logger logs a series of tasks to an io.Writer, processing each task in order
19 // until completion .
20 type Logger struct {
21         // sink is the writer to write to.
22         sink io.Writer
23
24         // widthFn is a function that returns the width of the terminal that
25         // this logger is running within.
26         widthFn func() int
27
28         // throttle is the minimum amount of time that must pass between each
29         // instant data is logged.
30         throttle time.Duration
31
32         // queue is the incoming, unbuffered queue of tasks to enqueue.
33         queue chan Task
34         // tasks is the set of tasks to process.
35         tasks chan Task
36         // wg is a WaitGroup that is incremented when new tasks are enqueued,
37         // and decremented when tasks finish.
38         wg *sync.WaitGroup
39 }
40
41 // NewLogger retuns a new *Logger instance that logs to "sink" and uses the
42 // current terminal width as the width of the line.
43 func NewLogger(sink io.Writer) *Logger {
44         if sink == nil {
45                 sink = ioutil.Discard
46         }
47
48         l := &Logger{
49                 sink:     sink,
50                 throttle: DefaultLoggingThrottle,
51                 widthFn: func() int {
52                         size, err := ts.GetSize()
53                         if err != nil {
54                                 return 80
55                         }
56                         return size.Col()
57                 },
58                 queue: make(chan Task),
59                 tasks: make(chan Task),
60                 wg:    new(sync.WaitGroup),
61         }
62
63         go l.consume()
64
65         return l
66 }
67
68 // Close closes the queue and does not allow new Tasks to be `enqueue()`'d. It
69 // waits until the currently running Task has completed.
70 func (l *Logger) Close() {
71         if l == nil {
72                 return
73         }
74
75         close(l.queue)
76
77         l.wg.Wait()
78 }
79
80 // Waitier creates and enqueues a new *WaitingTask.
81 func (l *Logger) Waiter(msg string) *WaitingTask {
82         t := NewWaitingTask(msg)
83         l.Enqueue(t)
84
85         return t
86 }
87
88 // Percentage creates and enqueues a new *PercentageTask.
89 func (l *Logger) Percentage(msg string, total uint64) *PercentageTask {
90         t := NewPercentageTask(msg, total)
91         l.Enqueue(t)
92
93         return t
94 }
95
96 // List creates and enqueues a new *ListTask.
97 func (l *Logger) List(msg string) *ListTask {
98         t := NewListTask(msg)
99         l.Enqueue(t)
100
101         return t
102 }
103
104 // List creates and enqueues a new *SimpleTask.
105 func (l *Logger) Simple() *SimpleTask {
106         t := NewSimpleTask()
107         l.Enqueue(t)
108
109         return t
110 }
111
112 // Enqueue enqueues the given Tasks "ts".
113 func (l *Logger) Enqueue(ts ...Task) {
114         if l == nil {
115                 for _, t := range ts {
116                         if t == nil {
117                                 // NOTE: Do not allow nil tasks which are unable
118                                 // to be completed.
119                                 continue
120                         }
121                         go func(t Task) {
122                                 for range t.Updates() {
123                                         // Discard all updates.
124                                 }
125                         }(t)
126                 }
127                 return
128         }
129
130         l.wg.Add(len(ts))
131         for _, t := range ts {
132                 if t == nil {
133                         // NOTE: See above.
134                         continue
135                 }
136                 l.queue <- t
137         }
138 }
139
140 // consume creates a pseudo-infinte buffer between the incoming set of tasks and
141 // the queue of tasks to work on.
142 func (l *Logger) consume() {
143         go func() {
144                 // Process the single next task in sequence until completion,
145                 // then consume the next task.
146                 for task := range l.tasks {
147                         l.logTask(task)
148                 }
149         }()
150
151         defer close(l.tasks)
152
153         pending := make([]Task, 0)
154
155         for {
156                 // If there is a pending task, "peek" it off of the set of
157                 // pending tasks.
158                 var next Task
159                 if len(pending) > 0 {
160                         next = pending[0]
161                 }
162
163                 if next == nil {
164                         // If there was no pending task, wait for either a)
165                         // l.queue to close, or b) a new task to be submitted.
166                         task, ok := <-l.queue
167                         if !ok {
168                                 // If the queue is closed, no more new tasks may
169                                 // be added.
170                                 return
171                         }
172
173                         // Otherwise, add a new task to the set of tasks to
174                         // process immediately, since there is no current
175                         // buffer.
176                         l.tasks <- task
177                 } else {
178                         // If there is a pending task, wait for either a) a
179                         // write to process the task to become non-blocking, or
180                         // b) a new task to enter the queue.
181                         select {
182                         case task, ok := <-l.queue:
183                                 if !ok {
184                                         // If the queue is closed, no more tasks
185                                         // may be added.
186                                         return
187                                 }
188                                 // Otherwise, add the next task to the set of
189                                 // pending, active tasks.
190                                 pending = append(pending, task)
191                         case l.tasks <- next:
192                                 // Or "pop" the peeked task off of the pending
193                                 // set.
194                                 pending = pending[1:]
195                         }
196                 }
197         }
198 }
199
200 // logTask logs the set of updates from a given task to the sink, then logs a
201 // "done" message, and then marks the task as done.
202 //
203 // By default, the *Logger throttles log entry updates to once per the duration
204 // of time specified by `l.throttle time.Duration`.
205 //
206 // If the duration if 0, or the task is "durable" (by implementing
207 // github.com/git-lfs/git-lfs/tasklog#DurableTask), then all entries will be
208 // logged.
209 func (l *Logger) logTask(task Task) {
210         defer l.wg.Done()
211
212         logAll := !task.Throttled()
213         var last time.Time
214
215         var update *Update
216         for update = range task.Updates() {
217                 if logAll || l.throttle == 0 || !update.Throttled(last.Add(l.throttle)) {
218                         l.logLine(update.S)
219                         last = update.At
220                 }
221         }
222
223         if update != nil {
224                 // If a task sent no updates, the last recorded update will be
225                 // nil. Given this, only log a message when there was at least
226                 // (1) update.
227                 l.log(fmt.Sprintf("%s, done\n", update.S))
228         }
229
230         if v, ok := task.(interface {
231                 // OnComplete is called after the Task "task" is closed, but
232                 // before new tasks are accepted.
233                 OnComplete()
234         }); ok {
235                 // If the Task implements this interface, call it and block
236                 // before accepting new tasks.
237                 v.OnComplete()
238         }
239 }
240
241 // logLine writes a complete line and moves the cursor to the beginning of the
242 // line.
243 //
244 // It returns the number of bytes "n" written to the sink and the error "err",
245 // if one was encountered.
246 func (l *Logger) logLine(str string) (n int, err error) {
247         padding := strings.Repeat(" ", maxInt(0, l.widthFn()-len(str)))
248
249         return l.log(str + padding + "\r")
250 }
251
252 // log writes a string verbatim to the sink.
253 //
254 // It returns the number of bytes "n" written to the sink and the error "err",
255 // if one was encountered.
256 func (l *Logger) log(str string) (n int, err error) {
257         return fmt.Fprint(l.sink, str)
258 }
259
260 func maxInt(a, b int) int {
261         if a > b {
262                 return a
263         }
264         return b
265 }