81367ceb77215ae0ef51cbb296e0c63cfaacfabc
[scm/test.git] / tq / transfer_queue.go
1 package tq
2
3 import (
4         "os"
5         "sort"
6         "sync"
7
8         "github.com/git-lfs/git-lfs/errors"
9         "github.com/git-lfs/git-lfs/git"
10         "github.com/git-lfs/git-lfs/lfsapi"
11         "github.com/git-lfs/git-lfs/tools"
12         "github.com/rubyist/tracerx"
13 )
14
15 const (
16         defaultBatchSize = 100
17 )
18
19 type retryCounter struct {
20         MaxRetries int `git:"lfs.transfer.maxretries"`
21
22         // cmu guards count
23         cmu sync.Mutex
24         // count maps OIDs to number of retry attempts
25         count map[string]int
26 }
27
28 // newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
29 // value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
30 // was provided.
31 //
32 // If it encountered an error in Unmarshaling the *config.Configuration, it will
33 // be returned, otherwise nil.
34 func newRetryCounter() *retryCounter {
35         return &retryCounter{
36                 MaxRetries: defaultMaxRetries,
37                 count:      make(map[string]int),
38         }
39 }
40
41 // Increment increments the number of retries for a given OID. It is safe to
42 // call across multiple goroutines.
43 func (r *retryCounter) Increment(oid string) {
44         r.cmu.Lock()
45         defer r.cmu.Unlock()
46
47         r.count[oid]++
48 }
49
50 // CountFor returns the current number of retries for a given OID. It is safe to
51 // call across multiple goroutines.
52 func (r *retryCounter) CountFor(oid string) int {
53         r.cmu.Lock()
54         defer r.cmu.Unlock()
55
56         return r.count[oid]
57 }
58
59 // CanRetry returns the current number of retries, and whether or not it exceeds
60 // the maximum number of retries (see: retryCounter.MaxRetries).
61 func (r *retryCounter) CanRetry(oid string) (int, bool) {
62         count := r.CountFor(oid)
63         return count, count < r.MaxRetries
64 }
65
66 // batch implements the sort.Interface interface and enables sorting on a slice
67 // of `*Transfer`s by object size.
68 //
69 // This interface is implemented here so that the largest objects can be
70 // processed first. Since adding a new batch is unable to occur until the
71 // current batch has finished processing, this enables us to reduce the risk of
72 // a single worker getting tied up on a large item at the end of a batch while
73 // all other workers are sitting idle.
74 type batch []*objectTuple
75
76 // Concat concatenates two batches together, returning a single, clamped batch as
77 // "left", and the remainder of elements as "right". If the union of the
78 // receiver and "other" has cardinality less than "size", "right" will be
79 // returned as nil.
80 func (b batch) Concat(other batch, size int) (left, right batch) {
81         u := batch(append(b, other...))
82         if len(u) <= size {
83                 return u, nil
84         }
85         return u[:size], u[size:]
86 }
87
88 func (b batch) ToTransfers() []*Transfer {
89         transfers := make([]*Transfer, 0, len(b))
90         for _, t := range b {
91                 transfers = append(transfers, &Transfer{Oid: t.Oid, Size: t.Size})
92         }
93         return transfers
94 }
95
96 func (b batch) Len() int           { return len(b) }
97 func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
98 func (b batch) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
99
100 // TransferQueue organises the wider process of uploading and downloading,
101 // including calling the API, passing the actual transfer request to transfer
102 // adapters, and dealing with progress, errors and retries.
103 type TransferQueue struct {
104         direction         Direction
105         client            *tqClient
106         remote            string
107         ref               *git.Ref
108         adapter           Adapter
109         adapterInProgress bool
110         adapterInitMutex  sync.Mutex
111         dryRun            bool
112         cb                tools.CopyCallback
113         meter             *Meter
114         errors            []error
115         transfers         map[string]*objects
116         batchSize         int
117         bufferDepth       int
118         incoming          chan *objectTuple // Channel for processing incoming items
119         errorc            chan error        // Channel for processing errors
120         watchers          []chan *Transfer
121         trMutex           *sync.Mutex
122         collectorWait     sync.WaitGroup
123         errorwait         sync.WaitGroup
124         // wait is used to keep track of pending transfers. It is incremented
125         // once per unique OID on Add(), and is decremented when that transfer
126         // is marked as completed or failed, but not retried.
127         wait     sync.WaitGroup
128         manifest *Manifest
129         rc       *retryCounter
130 }
131
132 // objects holds a set of objects.
133 type objects struct {
134         completed bool
135         objects   []*objectTuple
136 }
137
138 // All returns all *objectTuple's contained in the *objects set.
139 func (s *objects) All() []*objectTuple {
140         return s.objects
141 }
142
143 // Append returns a new *objects with the given *objectTuple(s) appended to the
144 // end of the known objects.
145 func (s *objects) Append(os ...*objectTuple) *objects {
146         return &objects{
147                 completed: s.completed,
148                 objects:   append(s.objects, os...),
149         }
150 }
151
152 // First returns the first *objectTuple in the chain of objects.
153 func (s *objects) First() *objectTuple {
154         if len(s.objects) == 0 {
155                 return nil
156         }
157         return s.objects[0]
158 }
159
160 type objectTuple struct {
161         Name, Path, Oid string
162         Size            int64
163 }
164
165 func (o *objectTuple) ToTransfer() *Transfer {
166         return &Transfer{
167                 Name: o.Name,
168                 Path: o.Path,
169                 Oid:  o.Oid,
170                 Size: o.Size,
171         }
172 }
173
174 type Option func(*TransferQueue)
175
176 func DryRun(dryRun bool) Option {
177         return func(tq *TransferQueue) {
178                 tq.dryRun = dryRun
179         }
180 }
181
182 func WithProgress(m *Meter) Option {
183         return func(tq *TransferQueue) {
184                 tq.meter = m
185         }
186 }
187
188 func RemoteRef(ref *git.Ref) Option {
189         return func(tq *TransferQueue) {
190                 tq.ref = ref
191         }
192 }
193
194 func WithProgressCallback(cb tools.CopyCallback) Option {
195         return func(tq *TransferQueue) {
196                 tq.cb = cb
197         }
198 }
199
200 func WithBatchSize(size int) Option {
201         return func(tq *TransferQueue) { tq.batchSize = size }
202 }
203
204 func WithBufferDepth(depth int) Option {
205         return func(tq *TransferQueue) { tq.bufferDepth = depth }
206 }
207
208 // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
209 func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue {
210         q := &TransferQueue{
211                 direction: dir,
212                 client:    &tqClient{Client: manifest.APIClient()},
213                 remote:    remote,
214                 errorc:    make(chan error),
215                 transfers: make(map[string]*objects),
216                 trMutex:   &sync.Mutex{},
217                 manifest:  manifest,
218                 rc:        newRetryCounter(),
219         }
220
221         for _, opt := range options {
222                 opt(q)
223         }
224
225         q.rc.MaxRetries = q.manifest.maxRetries
226         q.client.MaxRetries = q.manifest.maxRetries
227
228         if q.batchSize <= 0 {
229                 q.batchSize = defaultBatchSize
230         }
231         if q.bufferDepth <= 0 {
232                 q.bufferDepth = q.batchSize
233         }
234         if q.meter != nil {
235                 q.meter.Direction = q.direction
236         }
237
238         q.incoming = make(chan *objectTuple, q.bufferDepth)
239         q.collectorWait.Add(1)
240         q.errorwait.Add(1)
241         q.run()
242
243         return q
244 }
245
246 // Add adds a *Transfer to the transfer queue. It only increments the amount
247 // of waiting the TransferQueue has to do if the *Transfer "t" is new.
248 //
249 // If another transfer(s) with the same OID has been added to the *TransferQueue
250 // already, the given transfer will not be enqueued, but will be sent to any
251 // channel created by Watch() once the oldest transfer has completed.
252 //
253 // Only one file will be transferred to/from the Path element of the first
254 // transfer.
255 func (q *TransferQueue) Add(name, path, oid string, size int64) {
256         t := &objectTuple{
257                 Name: name,
258                 Path: path,
259                 Oid:  oid,
260                 Size: size,
261         }
262
263         if objs := q.remember(t); len(objs.objects) > 1 {
264                 if objs.completed {
265                         // If there is already a completed transfer chain for
266                         // this OID, then this object is already "done", and can
267                         // be sent through as completed to the watchers.
268                         for _, w := range q.watchers {
269                                 w <- t.ToTransfer()
270                         }
271                 }
272
273                 // If the chain is not done, there is no reason to enqueue this
274                 // transfer into 'q.incoming'.
275                 tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
276                 return
277         }
278
279         q.incoming <- t
280 }
281
282 // remember remembers the *Transfer "t" if the *TransferQueue doesn't already
283 // know about a Transfer with the same OID.
284 //
285 // It returns if the value is new or not.
286 func (q *TransferQueue) remember(t *objectTuple) objects {
287         q.trMutex.Lock()
288         defer q.trMutex.Unlock()
289
290         if _, ok := q.transfers[t.Oid]; !ok {
291                 q.wait.Add(1)
292                 q.transfers[t.Oid] = &objects{
293                         objects: []*objectTuple{t},
294                 }
295
296                 return *q.transfers[t.Oid]
297         }
298
299         q.transfers[t.Oid] = q.transfers[t.Oid].Append(t)
300
301         return *q.transfers[t.Oid]
302 }
303
304 // collectBatches collects batches in a loop, prioritizing failed items from the
305 // previous before adding new items. The process works as follows:
306 //
307 //   1. Create a new batch, of size `q.batchSize`, and containing no items
308 //   2. While the batch contains less items than `q.batchSize` AND the channel
309 //      is open, read one item from the `q.incoming` channel.
310 //      a. If the read was a channel close, go to step 4.
311 //      b. If the read was a transferable item, go to step 3.
312 //   3. Append the item to the batch.
313 //   4. Sort the batch by descending object size, make a batch API call, send
314 //      the items to the `*adapterBase`.
315 //   5. In a separate goroutine, process the worker results, incrementing and
316 //      appending retries if possible. On the main goroutine, accept new items
317 //      into "pending".
318 //   6. Concat() the "next" and "pending" batches such that no more items than
319 //      the maximum allowed per batch are in next, and the rest are in pending.
320 //   7. If the `q.incoming` channel is open, go to step 2.
321 //   8. If the next batch is empty AND the `q.incoming` channel is closed,
322 //      terminate immediately.
323 //
324 // collectBatches runs in its own goroutine.
325 func (q *TransferQueue) collectBatches() {
326         defer q.collectorWait.Done()
327
328         var closing bool
329         next := q.makeBatch()
330         pending := q.makeBatch()
331
332         for {
333                 for !closing && (len(next) < q.batchSize) {
334                         t, ok := <-q.incoming
335                         if !ok {
336                                 closing = true
337                                 break
338                         }
339
340                         next = append(next, t)
341                 }
342
343                 // Before enqueuing the next batch, sort by descending object
344                 // size.
345                 sort.Sort(sort.Reverse(next))
346
347                 done := make(chan struct{})
348
349                 var retries batch
350
351                 go func() {
352                         var err error
353
354                         retries, err = q.enqueueAndCollectRetriesFor(next)
355                         if err != nil {
356                                 q.errorc <- err
357                         }
358
359                         close(done)
360                 }()
361
362                 var collected batch
363                 collected, closing = q.collectPendingUntil(done)
364
365                 // Ensure the next batch is filled with, in order:
366                 //
367                 // - retries from the previous batch,
368                 // - new additions that were enqueued behind retries, &
369                 // - items collected while the batch was processing.
370                 next, pending = retries.Concat(append(pending, collected...), q.batchSize)
371
372                 if closing && len(next) == 0 {
373                         // If len(next) == 0, there are no items in "pending",
374                         // and it is safe to exit.
375                         break
376                 }
377         }
378 }
379
380 // collectPendingUntil collects items from q.incoming into a "pending" batch
381 // until the given "done" channel is written to, or is closed.
382 //
383 // A "pending" batch is returned, along with whether or not "q.incoming" is
384 // closed.
385 func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
386         for {
387                 select {
388                 case t, ok := <-q.incoming:
389                         if !ok {
390                                 closing = true
391                                 <-done
392                                 return
393                         }
394
395                         pending = append(pending, t)
396                 case <-done:
397                         return
398                 }
399         }
400 }
401
402 // enqueueAndCollectRetriesFor makes a Batch API call and returns a "next" batch
403 // containing all of the objects that failed from the previous batch and had
404 // retries availale to them.
405 //
406 // If an error was encountered while making the API request, _all_ of the items
407 // from the previous batch (that have retries available to them) will be
408 // returned immediately, along with the error that was encountered.
409 //
410 // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
411 // processed.
412 func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
413         next := q.makeBatch()
414         tracerx.Printf("tq: sending batch of size %d", len(batch))
415
416         q.meter.Pause()
417         var bRes *BatchResponse
418         if q.manifest.standaloneTransferAgent != "" {
419                 // Trust the external transfer agent can do everything by itself.
420                 objects := make([]*Transfer, 0, len(batch))
421                 for _, t := range batch {
422                         objects = append(objects, &Transfer{Oid: t.Oid, Size: t.Size, Path: t.Path})
423                 }
424                 bRes = &BatchResponse{
425                         Objects:             objects,
426                         TransferAdapterName: q.manifest.standaloneTransferAgent,
427                 }
428         } else {
429                 // Query the Git LFS server for what transfer method to use and
430                 // details such as URLs, authentication, etc.
431                 var err error
432                 bRes, err = Batch(q.manifest, q.direction, q.remote, q.ref, batch.ToTransfers())
433                 if err != nil {
434                         // If there was an error making the batch API call, mark all of
435                         // the objects for retry, and return them along with the error
436                         // that was encountered. If any of the objects couldn't be
437                         // retried, they will be marked as failed.
438                         for _, t := range batch {
439                                 if q.canRetryObject(t.Oid, err) {
440                                         q.rc.Increment(t.Oid)
441
442                                         next = append(next, t)
443                                 } else {
444                                         q.wait.Done()
445                                 }
446                         }
447
448                         return next, err
449                 }
450         }
451
452         if len(bRes.Objects) == 0 {
453                 return next, nil
454         }
455
456         q.useAdapter(bRes.TransferAdapterName)
457         q.meter.Start()
458
459         toTransfer := make([]*Transfer, 0, len(bRes.Objects))
460
461         for _, o := range bRes.Objects {
462                 if o.Error != nil {
463                         q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
464                         q.Skip(o.Size)
465                         q.wait.Done()
466
467                         continue
468                 }
469
470                 q.trMutex.Lock()
471                 objects, ok := q.transfers[o.Oid]
472                 q.trMutex.Unlock()
473                 if !ok {
474                         // If we couldn't find any associated
475                         // Transfer object, then we give up on the
476                         // transfer by telling the progress meter to
477                         // skip the number of bytes in "o".
478                         q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid)
479
480                         q.Skip(o.Size)
481                         q.wait.Done()
482                 } else {
483                         // Pick t[0], since it will cover all transfers with the
484                         // same OID.
485                         tr := newTransfer(o, objects.First().Name, objects.First().Path)
486
487                         if a, err := tr.Rel(q.direction.String()); err != nil {
488                                 // XXX(taylor): duplication
489                                 if q.canRetryObject(tr.Oid, err) {
490                                         q.rc.Increment(tr.Oid)
491                                         count := q.rc.CountFor(tr.Oid)
492
493                                         tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err)
494                                         next = append(next, objects.First())
495                                 } else {
496                                         q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)
497
498                                         q.Skip(o.Size)
499                                         q.wait.Done()
500                                 }
501                         } else if a == nil && q.manifest.standaloneTransferAgent == "" {
502                                 q.Skip(o.Size)
503                                 q.wait.Done()
504                         } else {
505                                 q.meter.StartTransfer(objects.First().Name)
506                                 toTransfer = append(toTransfer, tr)
507                         }
508                 }
509         }
510
511         retries := q.addToAdapter(bRes.endpoint, toTransfer)
512         for t := range retries {
513                 q.rc.Increment(t.Oid)
514                 count := q.rc.CountFor(t.Oid)
515
516                 tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid, t.Size)
517
518                 next = append(next, t)
519         }
520
521         return next, nil
522 }
523
524 // makeBatch returns a new, empty batch, with a capacity equal to the maximum
525 // batch size designated by the `*TransferQueue`.
526 func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
527
528 // addToAdapter adds the given "pending" transfers to the transfer adapters and
529 // returns a channel of Transfers that are to be retried in the next batch.
530 // After all of the items in the batch have been processed, the channel is
531 // closed.
532 //
533 // addToAdapter returns immediately, and does not block.
534 func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-chan *objectTuple {
535         retries := make(chan *objectTuple, len(pending))
536
537         if err := q.ensureAdapterBegun(e); err != nil {
538                 close(retries)
539
540                 q.errorc <- err
541                 for _, t := range pending {
542                         q.Skip(t.Size)
543                         q.wait.Done()
544                 }
545
546                 return retries
547         }
548
549         present, missingResults := q.partitionTransfers(pending)
550
551         go func() {
552                 defer close(retries)
553
554                 var results <-chan TransferResult
555                 if q.dryRun {
556                         results = q.makeDryRunResults(present)
557                 } else {
558                         results = q.adapter.Add(present...)
559                 }
560
561                 for _, res := range missingResults {
562                         q.handleTransferResult(res, retries)
563                 }
564                 for res := range results {
565                         q.handleTransferResult(res, retries)
566                 }
567         }()
568
569         return retries
570 }
571
572 func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
573         if q.direction != Upload {
574                 return transfers, nil
575         }
576
577         present = make([]*Transfer, 0, len(transfers))
578         results = make([]TransferResult, 0, len(transfers))
579
580         for _, t := range transfers {
581                 var err error
582
583                 if t.Size < 0 {
584                         err = errors.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size)
585                 } else {
586                         fd, serr := os.Stat(t.Path)
587                         if serr != nil {
588                                 if os.IsNotExist(serr) {
589                                         err = newObjectMissingError(t.Name, t.Oid)
590                                 } else {
591                                         err = serr
592                                 }
593                         } else if t.Size != fd.Size() {
594                                 err = newCorruptObjectError(t.Name, t.Oid)
595                         }
596                 }
597
598                 if err != nil {
599                         results = append(results, TransferResult{
600                                 Transfer: t,
601                                 Error:    err,
602                         })
603                 } else {
604                         present = append(present, t)
605                 }
606         }
607
608         return
609 }
610
611 // makeDryRunResults returns a channel populated immediately with "successful"
612 // results for all of the given transfers in "ts".
613 func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult {
614         results := make(chan TransferResult, len(ts))
615         for _, t := range ts {
616                 results <- TransferResult{t, nil}
617         }
618
619         close(results)
620
621         return results
622 }
623
624 // handleTransferResult observes the transfer result, sending it on the retries
625 // channel if it was able to be retried.
626 func (q *TransferQueue) handleTransferResult(
627         res TransferResult, retries chan<- *objectTuple,
628 ) {
629         oid := res.Transfer.Oid
630
631         if res.Error != nil {
632                 // If there was an error encountered when processing the
633                 // transfer (res.Transfer), handle the error as is appropriate:
634
635                 if q.canRetryObject(oid, res.Error) {
636                         // If the object can be retried, send it on the retries
637                         // channel, where it will be read at the call-site and
638                         // its retry count will be incremented.
639                         tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
640
641                         q.trMutex.Lock()
642                         objects, ok := q.transfers[oid]
643                         q.trMutex.Unlock()
644
645                         if ok {
646                                 retries <- objects.First()
647                         } else {
648                                 q.errorc <- res.Error
649                         }
650                 } else {
651                         // If the error wasn't retriable, OR the object has
652                         // exceeded its retry budget, it will be NOT be sent to
653                         // the retry channel, and the error will be reported
654                         // immediately.
655                         q.errorc <- res.Error
656                         q.wait.Done()
657                 }
658         } else {
659                 q.trMutex.Lock()
660                 objects := q.transfers[oid]
661                 objects.completed = true
662
663                 // Otherwise, if the transfer was successful, notify all of the
664                 // watchers, and mark it as finished.
665                 for _, c := range q.watchers {
666                         // Send one update for each transfer with the
667                         // same OID.
668                         for _, t := range objects.All() {
669                                 c <- &Transfer{
670                                         Name: t.Name,
671                                         Path: t.Path,
672                                         Oid:  t.Oid,
673                                         Size: t.Size,
674                                 }
675                         }
676                 }
677
678                 q.trMutex.Unlock()
679
680                 q.meter.FinishTransfer(res.Transfer.Name)
681                 q.wait.Done()
682         }
683 }
684
685 func (q *TransferQueue) useAdapter(name string) {
686         q.adapterInitMutex.Lock()
687         defer q.adapterInitMutex.Unlock()
688
689         if q.adapter != nil {
690                 if q.adapter.Name() == name {
691                         // re-use, this is the normal path
692                         return
693                 }
694                 // If the adapter we're using isn't the same as the one we've been
695                 // told to use now, must wait for the current one to finish then switch
696                 // This will probably never happen but is just in case server starts
697                 // changing adapter support in between batches
698                 q.finishAdapter()
699         }
700         q.adapter = q.manifest.NewAdapterOrDefault(name, q.direction)
701 }
702
703 func (q *TransferQueue) finishAdapter() {
704         if q.adapterInProgress {
705                 q.adapter.End()
706                 q.adapterInProgress = false
707                 q.adapter = nil
708         }
709 }
710
711 // BatchSize returns the batch size of the receiving *TransferQueue, or, the
712 // number of transfers to accept before beginning work on them.
713 func (q *TransferQueue) BatchSize() int {
714         return q.batchSize
715 }
716
717 func (q *TransferQueue) Skip(size int64) {
718         q.meter.Skip(size)
719 }
720
721 func (q *TransferQueue) ensureAdapterBegun(e lfsapi.Endpoint) error {
722         q.adapterInitMutex.Lock()
723         defer q.adapterInitMutex.Unlock()
724
725         if q.adapterInProgress {
726                 return nil
727         }
728
729         // Progress callback - receives byte updates
730         cb := func(name string, total, read int64, current int) error {
731                 q.meter.TransferBytes(q.direction.String(), name, read, total, current)
732                 if q.cb != nil {
733                         // NOTE: this is the mechanism by which the logpath
734                         // specified by GIT_LFS_PROGRESS is written to.
735                         //
736                         // See: lfs.downloadFile() for more.
737                         q.cb(total, read, current)
738                 }
739                 return nil
740         }
741
742         tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
743         err := q.adapter.Begin(q.toAdapterCfg(e), cb)
744         if err != nil {
745                 return err
746         }
747         q.adapterInProgress = true
748
749         return nil
750 }
751
752 func (q *TransferQueue) toAdapterCfg(e lfsapi.Endpoint) AdapterConfig {
753         apiClient := q.manifest.APIClient()
754         concurrency := q.manifest.ConcurrentTransfers()
755         if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess {
756                 concurrency = 1
757         }
758
759         return &adapterConfig{
760                 concurrentTransfers: concurrency,
761                 apiClient:           apiClient,
762                 remote:              q.remote,
763         }
764 }
765
766 // Wait waits for the queue to finish processing all transfers. Once Wait is
767 // called, Add will no longer add transfers to the queue. Any failed
768 // transfers will be automatically retried once.
769 func (q *TransferQueue) Wait() {
770         close(q.incoming)
771
772         q.wait.Wait()
773         q.collectorWait.Wait()
774
775         q.finishAdapter()
776         close(q.errorc)
777
778         for _, watcher := range q.watchers {
779                 close(watcher)
780         }
781
782         q.meter.Flush()
783         q.errorwait.Wait()
784 }
785
786 // Watch returns a channel where the queue will write the value of each transfer
787 // as it completes. If multiple transfers exist with the same OID, they will all
788 // be recorded here, even though only one actual transfer took place. The
789 // channel will be closed when the queue finishes processing.
790 func (q *TransferQueue) Watch() chan *Transfer {
791         c := make(chan *Transfer, q.batchSize)
792         q.watchers = append(q.watchers, c)
793         return c
794 }
795
796 // This goroutine collects errors returned from transfers
797 func (q *TransferQueue) errorCollector() {
798         for err := range q.errorc {
799                 q.errors = append(q.errors, err)
800         }
801         q.errorwait.Done()
802 }
803
804 // run begins the transfer queue. It transfers files sequentially or
805 // concurrently depending on the Config.ConcurrentTransfers() value.
806 func (q *TransferQueue) run() {
807         tracerx.Printf("tq: running as batched queue, batch size of %d", q.batchSize)
808
809         go q.errorCollector()
810         go q.collectBatches()
811 }
812
813 // canRetry returns whether or not the given error "err" is retriable.
814 func (q *TransferQueue) canRetry(err error) bool {
815         return errors.IsRetriableError(err)
816 }
817
818 // canRetryObject returns whether the given error is retriable for the object
819 // given by "oid". If the an OID has met its retry limit, then it will not be
820 // able to be retried again. If so, canRetryObject returns whether or not that
821 // given error "err" is retriable.
822 func (q *TransferQueue) canRetryObject(oid string, err error) bool {
823         if count, ok := q.rc.CanRetry(oid); !ok {
824                 tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
825                 return false
826         }
827
828         return q.canRetry(err)
829 }
830
831 // Errors returns any errors encountered during transfer.
832 func (q *TransferQueue) Errors() []error {
833         return q.errors
834 }