Imported Upstream version 2.5.1
[scm/test.git] / tq / transfer_queue.go
1 package tq
2
3 import (
4         "fmt"
5         "os"
6         "sort"
7         "sync"
8
9         "github.com/git-lfs/git-lfs/errors"
10         "github.com/git-lfs/git-lfs/git"
11         "github.com/git-lfs/git-lfs/lfsapi"
12         "github.com/git-lfs/git-lfs/tools"
13         "github.com/rubyist/tracerx"
14 )
15
16 const (
17         defaultBatchSize = 100
18 )
19
20 type retryCounter struct {
21         MaxRetries int `git:"lfs.transfer.maxretries"`
22
23         // cmu guards count
24         cmu sync.Mutex
25         // count maps OIDs to number of retry attempts
26         count map[string]int
27 }
28
29 // newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
30 // value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
31 // was provided.
32 //
33 // If it encountered an error in Unmarshaling the *config.Configuration, it will
34 // be returned, otherwise nil.
35 func newRetryCounter() *retryCounter {
36         return &retryCounter{
37                 MaxRetries: defaultMaxRetries,
38                 count:      make(map[string]int),
39         }
40 }
41
42 // Increment increments the number of retries for a given OID. It is safe to
43 // call across multiple goroutines.
44 func (r *retryCounter) Increment(oid string) {
45         r.cmu.Lock()
46         defer r.cmu.Unlock()
47
48         r.count[oid]++
49 }
50
51 // CountFor returns the current number of retries for a given OID. It is safe to
52 // call across multiple goroutines.
53 func (r *retryCounter) CountFor(oid string) int {
54         r.cmu.Lock()
55         defer r.cmu.Unlock()
56
57         return r.count[oid]
58 }
59
60 // CanRetry returns the current number of retries, and whether or not it exceeds
61 // the maximum number of retries (see: retryCounter.MaxRetries).
62 func (r *retryCounter) CanRetry(oid string) (int, bool) {
63         count := r.CountFor(oid)
64         return count, count < r.MaxRetries
65 }
66
67 // batch implements the sort.Interface interface and enables sorting on a slice
68 // of `*Transfer`s by object size.
69 //
70 // This interface is implemented here so that the largest objects can be
71 // processed first. Since adding a new batch is unable to occur until the
72 // current batch has finished processing, this enables us to reduce the risk of
73 // a single worker getting tied up on a large item at the end of a batch while
74 // all other workers are sitting idle.
75 type batch []*objectTuple
76
77 // Concat concatenates two batches together, returning a single, clamped batch as
78 // "left", and the remainder of elements as "right". If the union of the
79 // receiver and "other" has cardinality less than "size", "right" will be
80 // returned as nil.
81 func (b batch) Concat(other batch, size int) (left, right batch) {
82         u := batch(append(b, other...))
83         if len(u) <= size {
84                 return u, nil
85         }
86         return u[:size], u[size:]
87 }
88
89 func (b batch) ToTransfers() []*Transfer {
90         transfers := make([]*Transfer, 0, len(b))
91         for _, t := range b {
92                 transfers = append(transfers, &Transfer{Oid: t.Oid, Size: t.Size})
93         }
94         return transfers
95 }
96
97 func (b batch) Len() int           { return len(b) }
98 func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
99 func (b batch) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
100
101 // TransferQueue organises the wider process of uploading and downloading,
102 // including calling the API, passing the actual transfer request to transfer
103 // adapters, and dealing with progress, errors and retries.
104 type TransferQueue struct {
105         direction         Direction
106         client            *tqClient
107         remote            string
108         ref               *git.Ref
109         adapter           Adapter
110         adapterInProgress bool
111         adapterInitMutex  sync.Mutex
112         dryRun            bool
113         cb                tools.CopyCallback
114         meter             *Meter
115         errors            []error
116         transfers         map[string]*objects
117         batchSize         int
118         bufferDepth       int
119         incoming          chan *objectTuple // Channel for processing incoming items
120         errorc            chan error        // Channel for processing errors
121         watchers          []chan *Transfer
122         trMutex           *sync.Mutex
123         collectorWait     sync.WaitGroup
124         errorwait         sync.WaitGroup
125         // wait is used to keep track of pending transfers. It is incremented
126         // once per unique OID on Add(), and is decremented when that transfer
127         // is marked as completed or failed, but not retried.
128         wait     sync.WaitGroup
129         manifest *Manifest
130         rc       *retryCounter
131
132         // unsupportedContentType indicates whether the transfer queue ever saw
133         // an HTTP 422 response indicating that their upload destination does
134         // not support Content-Type detection.
135         unsupportedContentType bool
136 }
137
138 // objects holds a set of objects.
139 type objects struct {
140         completed bool
141         objects   []*objectTuple
142 }
143
144 // All returns all *objectTuple's contained in the *objects set.
145 func (s *objects) All() []*objectTuple {
146         return s.objects
147 }
148
149 // Append returns a new *objects with the given *objectTuple(s) appended to the
150 // end of the known objects.
151 func (s *objects) Append(os ...*objectTuple) *objects {
152         return &objects{
153                 completed: s.completed,
154                 objects:   append(s.objects, os...),
155         }
156 }
157
158 // First returns the first *objectTuple in the chain of objects.
159 func (s *objects) First() *objectTuple {
160         if len(s.objects) == 0 {
161                 return nil
162         }
163         return s.objects[0]
164 }
165
166 type objectTuple struct {
167         Name, Path, Oid string
168         Size            int64
169 }
170
171 func (o *objectTuple) ToTransfer() *Transfer {
172         return &Transfer{
173                 Name: o.Name,
174                 Path: o.Path,
175                 Oid:  o.Oid,
176                 Size: o.Size,
177         }
178 }
179
180 type Option func(*TransferQueue)
181
182 func DryRun(dryRun bool) Option {
183         return func(tq *TransferQueue) {
184                 tq.dryRun = dryRun
185         }
186 }
187
188 func WithProgress(m *Meter) Option {
189         return func(tq *TransferQueue) {
190                 tq.meter = m
191         }
192 }
193
194 func RemoteRef(ref *git.Ref) Option {
195         return func(tq *TransferQueue) {
196                 tq.ref = ref
197         }
198 }
199
200 func WithProgressCallback(cb tools.CopyCallback) Option {
201         return func(tq *TransferQueue) {
202                 tq.cb = cb
203         }
204 }
205
206 func WithBatchSize(size int) Option {
207         return func(tq *TransferQueue) { tq.batchSize = size }
208 }
209
210 func WithBufferDepth(depth int) Option {
211         return func(tq *TransferQueue) { tq.bufferDepth = depth }
212 }
213
214 // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
215 func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue {
216         q := &TransferQueue{
217                 direction: dir,
218                 client:    &tqClient{Client: manifest.APIClient()},
219                 remote:    remote,
220                 errorc:    make(chan error),
221                 transfers: make(map[string]*objects),
222                 trMutex:   &sync.Mutex{},
223                 manifest:  manifest,
224                 rc:        newRetryCounter(),
225         }
226
227         for _, opt := range options {
228                 opt(q)
229         }
230
231         q.rc.MaxRetries = q.manifest.maxRetries
232         q.client.MaxRetries = q.manifest.maxRetries
233
234         if q.batchSize <= 0 {
235                 q.batchSize = defaultBatchSize
236         }
237         if q.bufferDepth <= 0 {
238                 q.bufferDepth = q.batchSize
239         }
240         if q.meter != nil {
241                 q.meter.Direction = q.direction
242         }
243
244         q.incoming = make(chan *objectTuple, q.bufferDepth)
245         q.collectorWait.Add(1)
246         q.errorwait.Add(1)
247         q.run()
248
249         return q
250 }
251
252 // Add adds a *Transfer to the transfer queue. It only increments the amount
253 // of waiting the TransferQueue has to do if the *Transfer "t" is new.
254 //
255 // If another transfer(s) with the same OID has been added to the *TransferQueue
256 // already, the given transfer will not be enqueued, but will be sent to any
257 // channel created by Watch() once the oldest transfer has completed.
258 //
259 // Only one file will be transferred to/from the Path element of the first
260 // transfer.
261 func (q *TransferQueue) Add(name, path, oid string, size int64) {
262         t := &objectTuple{
263                 Name: name,
264                 Path: path,
265                 Oid:  oid,
266                 Size: size,
267         }
268
269         if objs := q.remember(t); len(objs.objects) > 1 {
270                 if objs.completed {
271                         // If there is already a completed transfer chain for
272                         // this OID, then this object is already "done", and can
273                         // be sent through as completed to the watchers.
274                         for _, w := range q.watchers {
275                                 w <- t.ToTransfer()
276                         }
277                 }
278
279                 // If the chain is not done, there is no reason to enqueue this
280                 // transfer into 'q.incoming'.
281                 tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
282                 return
283         }
284
285         q.incoming <- t
286 }
287
288 // remember remembers the *Transfer "t" if the *TransferQueue doesn't already
289 // know about a Transfer with the same OID.
290 //
291 // It returns if the value is new or not.
292 func (q *TransferQueue) remember(t *objectTuple) objects {
293         q.trMutex.Lock()
294         defer q.trMutex.Unlock()
295
296         if _, ok := q.transfers[t.Oid]; !ok {
297                 q.wait.Add(1)
298                 q.transfers[t.Oid] = &objects{
299                         objects: []*objectTuple{t},
300                 }
301
302                 return *q.transfers[t.Oid]
303         }
304
305         q.transfers[t.Oid] = q.transfers[t.Oid].Append(t)
306
307         return *q.transfers[t.Oid]
308 }
309
310 // collectBatches collects batches in a loop, prioritizing failed items from the
311 // previous before adding new items. The process works as follows:
312 //
313 //   1. Create a new batch, of size `q.batchSize`, and containing no items
314 //   2. While the batch contains less items than `q.batchSize` AND the channel
315 //      is open, read one item from the `q.incoming` channel.
316 //      a. If the read was a channel close, go to step 4.
317 //      b. If the read was a transferable item, go to step 3.
318 //   3. Append the item to the batch.
319 //   4. Sort the batch by descending object size, make a batch API call, send
320 //      the items to the `*adapterBase`.
321 //   5. In a separate goroutine, process the worker results, incrementing and
322 //      appending retries if possible. On the main goroutine, accept new items
323 //      into "pending".
324 //   6. Concat() the "next" and "pending" batches such that no more items than
325 //      the maximum allowed per batch are in next, and the rest are in pending.
326 //   7. If the `q.incoming` channel is open, go to step 2.
327 //   8. If the next batch is empty AND the `q.incoming` channel is closed,
328 //      terminate immediately.
329 //
330 // collectBatches runs in its own goroutine.
331 func (q *TransferQueue) collectBatches() {
332         defer q.collectorWait.Done()
333
334         var closing bool
335         next := q.makeBatch()
336         pending := q.makeBatch()
337
338         for {
339                 for !closing && (len(next) < q.batchSize) {
340                         t, ok := <-q.incoming
341                         if !ok {
342                                 closing = true
343                                 break
344                         }
345
346                         next = append(next, t)
347                 }
348
349                 // Before enqueuing the next batch, sort by descending object
350                 // size.
351                 sort.Sort(sort.Reverse(next))
352
353                 done := make(chan struct{})
354
355                 var retries batch
356
357                 go func() {
358                         var err error
359
360                         retries, err = q.enqueueAndCollectRetriesFor(next)
361                         if err != nil {
362                                 q.errorc <- err
363                         }
364
365                         close(done)
366                 }()
367
368                 var collected batch
369                 collected, closing = q.collectPendingUntil(done)
370
371                 // Ensure the next batch is filled with, in order:
372                 //
373                 // - retries from the previous batch,
374                 // - new additions that were enqueued behind retries, &
375                 // - items collected while the batch was processing.
376                 next, pending = retries.Concat(append(pending, collected...), q.batchSize)
377
378                 if closing && len(next) == 0 {
379                         // If len(next) == 0, there are no items in "pending",
380                         // and it is safe to exit.
381                         break
382                 }
383         }
384 }
385
386 // collectPendingUntil collects items from q.incoming into a "pending" batch
387 // until the given "done" channel is written to, or is closed.
388 //
389 // A "pending" batch is returned, along with whether or not "q.incoming" is
390 // closed.
391 func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
392         for {
393                 select {
394                 case t, ok := <-q.incoming:
395                         if !ok {
396                                 closing = true
397                                 <-done
398                                 return
399                         }
400
401                         pending = append(pending, t)
402                 case <-done:
403                         return
404                 }
405         }
406 }
407
408 // enqueueAndCollectRetriesFor makes a Batch API call and returns a "next" batch
409 // containing all of the objects that failed from the previous batch and had
410 // retries availale to them.
411 //
412 // If an error was encountered while making the API request, _all_ of the items
413 // from the previous batch (that have retries available to them) will be
414 // returned immediately, along with the error that was encountered.
415 //
416 // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
417 // processed.
418 func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
419         next := q.makeBatch()
420         tracerx.Printf("tq: sending batch of size %d", len(batch))
421
422         q.meter.Pause()
423         var bRes *BatchResponse
424         if q.manifest.standaloneTransferAgent != "" {
425                 // Trust the external transfer agent can do everything by itself.
426                 objects := make([]*Transfer, 0, len(batch))
427                 for _, t := range batch {
428                         objects = append(objects, &Transfer{Oid: t.Oid, Size: t.Size, Path: t.Path})
429                 }
430                 bRes = &BatchResponse{
431                         Objects:             objects,
432                         TransferAdapterName: q.manifest.standaloneTransferAgent,
433                 }
434         } else {
435                 // Query the Git LFS server for what transfer method to use and
436                 // details such as URLs, authentication, etc.
437                 var err error
438                 bRes, err = Batch(q.manifest, q.direction, q.remote, q.ref, batch.ToTransfers())
439                 if err != nil {
440                         // If there was an error making the batch API call, mark all of
441                         // the objects for retry, and return them along with the error
442                         // that was encountered. If any of the objects couldn't be
443                         // retried, they will be marked as failed.
444                         for _, t := range batch {
445                                 if q.canRetryObject(t.Oid, err) {
446                                         q.rc.Increment(t.Oid)
447
448                                         next = append(next, t)
449                                 } else {
450                                         q.wait.Done()
451                                 }
452                         }
453
454                         return next, err
455                 }
456         }
457
458         if len(bRes.Objects) == 0 {
459                 return next, nil
460         }
461
462         q.useAdapter(bRes.TransferAdapterName)
463         q.meter.Start()
464
465         toTransfer := make([]*Transfer, 0, len(bRes.Objects))
466
467         for _, o := range bRes.Objects {
468                 if o.Error != nil {
469                         q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
470                         q.Skip(o.Size)
471                         q.wait.Done()
472
473                         continue
474                 }
475
476                 q.trMutex.Lock()
477                 objects, ok := q.transfers[o.Oid]
478                 q.trMutex.Unlock()
479                 if !ok {
480                         // If we couldn't find any associated
481                         // Transfer object, then we give up on the
482                         // transfer by telling the progress meter to
483                         // skip the number of bytes in "o".
484                         q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid)
485
486                         q.Skip(o.Size)
487                         q.wait.Done()
488                 } else {
489                         // Pick t[0], since it will cover all transfers with the
490                         // same OID.
491                         tr := newTransfer(o, objects.First().Name, objects.First().Path)
492
493                         if a, err := tr.Rel(q.direction.String()); err != nil {
494                                 // XXX(taylor): duplication
495                                 if q.canRetryObject(tr.Oid, err) {
496                                         q.rc.Increment(tr.Oid)
497                                         count := q.rc.CountFor(tr.Oid)
498
499                                         tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err)
500                                         next = append(next, objects.First())
501                                 } else {
502                                         q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)
503
504                                         q.Skip(o.Size)
505                                         q.wait.Done()
506                                 }
507                         } else if a == nil && q.manifest.standaloneTransferAgent == "" {
508                                 q.Skip(o.Size)
509                                 q.wait.Done()
510                         } else {
511                                 q.meter.StartTransfer(objects.First().Name)
512                                 toTransfer = append(toTransfer, tr)
513                         }
514                 }
515         }
516
517         retries := q.addToAdapter(bRes.endpoint, toTransfer)
518         for t := range retries {
519                 q.rc.Increment(t.Oid)
520                 count := q.rc.CountFor(t.Oid)
521
522                 tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid, t.Size)
523
524                 next = append(next, t)
525         }
526
527         return next, nil
528 }
529
530 // makeBatch returns a new, empty batch, with a capacity equal to the maximum
531 // batch size designated by the `*TransferQueue`.
532 func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
533
534 // addToAdapter adds the given "pending" transfers to the transfer adapters and
535 // returns a channel of Transfers that are to be retried in the next batch.
536 // After all of the items in the batch have been processed, the channel is
537 // closed.
538 //
539 // addToAdapter returns immediately, and does not block.
540 func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-chan *objectTuple {
541         retries := make(chan *objectTuple, len(pending))
542
543         if err := q.ensureAdapterBegun(e); err != nil {
544                 close(retries)
545
546                 q.errorc <- err
547                 for _, t := range pending {
548                         q.Skip(t.Size)
549                         q.wait.Done()
550                 }
551
552                 return retries
553         }
554
555         present, missingResults := q.partitionTransfers(pending)
556
557         go func() {
558                 defer close(retries)
559
560                 var results <-chan TransferResult
561                 if q.dryRun {
562                         results = q.makeDryRunResults(present)
563                 } else {
564                         results = q.adapter.Add(present...)
565                 }
566
567                 for _, res := range missingResults {
568                         q.handleTransferResult(res, retries)
569                 }
570                 for res := range results {
571                         q.handleTransferResult(res, retries)
572                 }
573         }()
574
575         return retries
576 }
577
578 func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
579         if q.direction != Upload {
580                 return transfers, nil
581         }
582
583         present = make([]*Transfer, 0, len(transfers))
584         results = make([]TransferResult, 0, len(transfers))
585
586         for _, t := range transfers {
587                 var err error
588
589                 if t.Size < 0 {
590                         err = errors.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size)
591                 } else {
592                         fd, serr := os.Stat(t.Path)
593                         if serr != nil {
594                                 if os.IsNotExist(serr) {
595                                         err = newObjectMissingError(t.Name, t.Oid)
596                                 } else {
597                                         err = serr
598                                 }
599                         } else if t.Size != fd.Size() {
600                                 err = newCorruptObjectError(t.Name, t.Oid)
601                         }
602                 }
603
604                 if err != nil {
605                         results = append(results, TransferResult{
606                                 Transfer: t,
607                                 Error:    err,
608                         })
609                 } else {
610                         present = append(present, t)
611                 }
612         }
613
614         return
615 }
616
617 // makeDryRunResults returns a channel populated immediately with "successful"
618 // results for all of the given transfers in "ts".
619 func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult {
620         results := make(chan TransferResult, len(ts))
621         for _, t := range ts {
622                 results <- TransferResult{t, nil}
623         }
624
625         close(results)
626
627         return results
628 }
629
630 // handleTransferResult observes the transfer result, sending it on the retries
631 // channel if it was able to be retried.
632 func (q *TransferQueue) handleTransferResult(
633         res TransferResult, retries chan<- *objectTuple,
634 ) {
635         oid := res.Transfer.Oid
636
637         if res.Error != nil {
638                 // If there was an error encountered when processing the
639                 // transfer (res.Transfer), handle the error as is appropriate:
640
641                 if q.canRetryObject(oid, res.Error) {
642                         // If the object can be retried, send it on the retries
643                         // channel, where it will be read at the call-site and
644                         // its retry count will be incremented.
645                         tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
646
647                         q.trMutex.Lock()
648                         objects, ok := q.transfers[oid]
649                         q.trMutex.Unlock()
650
651                         if ok {
652                                 retries <- objects.First()
653                         } else {
654                                 q.errorc <- res.Error
655                         }
656                 } else {
657                         // If the error wasn't retriable, OR the object has
658                         // exceeded its retry budget, it will be NOT be sent to
659                         // the retry channel, and the error will be reported
660                         // immediately (unless the error is in response to a
661                         // HTTP 422).
662                         if errors.IsUnprocessableEntityError(res.Error) {
663                                 q.unsupportedContentType = true
664                         } else {
665                                 q.errorc <- res.Error
666                         }
667                         q.wait.Done()
668                 }
669         } else {
670                 q.trMutex.Lock()
671                 objects := q.transfers[oid]
672                 objects.completed = true
673
674                 // Otherwise, if the transfer was successful, notify all of the
675                 // watchers, and mark it as finished.
676                 for _, c := range q.watchers {
677                         // Send one update for each transfer with the
678                         // same OID.
679                         for _, t := range objects.All() {
680                                 c <- &Transfer{
681                                         Name: t.Name,
682                                         Path: t.Path,
683                                         Oid:  t.Oid,
684                                         Size: t.Size,
685                                 }
686                         }
687                 }
688
689                 q.trMutex.Unlock()
690
691                 q.meter.FinishTransfer(res.Transfer.Name)
692                 q.wait.Done()
693         }
694 }
695
696 func (q *TransferQueue) useAdapter(name string) {
697         q.adapterInitMutex.Lock()
698         defer q.adapterInitMutex.Unlock()
699
700         if q.adapter != nil {
701                 if q.adapter.Name() == name {
702                         // re-use, this is the normal path
703                         return
704                 }
705                 // If the adapter we're using isn't the same as the one we've been
706                 // told to use now, must wait for the current one to finish then switch
707                 // This will probably never happen but is just in case server starts
708                 // changing adapter support in between batches
709                 q.finishAdapter()
710         }
711         q.adapter = q.manifest.NewAdapterOrDefault(name, q.direction)
712 }
713
714 func (q *TransferQueue) finishAdapter() {
715         if q.adapterInProgress {
716                 q.adapter.End()
717                 q.adapterInProgress = false
718                 q.adapter = nil
719         }
720 }
721
722 // BatchSize returns the batch size of the receiving *TransferQueue, or, the
723 // number of transfers to accept before beginning work on them.
724 func (q *TransferQueue) BatchSize() int {
725         return q.batchSize
726 }
727
728 func (q *TransferQueue) Skip(size int64) {
729         q.meter.Skip(size)
730 }
731
732 func (q *TransferQueue) ensureAdapterBegun(e lfsapi.Endpoint) error {
733         q.adapterInitMutex.Lock()
734         defer q.adapterInitMutex.Unlock()
735
736         if q.adapterInProgress {
737                 return nil
738         }
739
740         // Progress callback - receives byte updates
741         cb := func(name string, total, read int64, current int) error {
742                 q.meter.TransferBytes(q.direction.String(), name, read, total, current)
743                 if q.cb != nil {
744                         // NOTE: this is the mechanism by which the logpath
745                         // specified by GIT_LFS_PROGRESS is written to.
746                         //
747                         // See: lfs.downloadFile() for more.
748                         q.cb(total, read, current)
749                 }
750                 return nil
751         }
752
753         tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
754         err := q.adapter.Begin(q.toAdapterCfg(e), cb)
755         if err != nil {
756                 return err
757         }
758         q.adapterInProgress = true
759
760         return nil
761 }
762
763 func (q *TransferQueue) toAdapterCfg(e lfsapi.Endpoint) AdapterConfig {
764         apiClient := q.manifest.APIClient()
765         concurrency := q.manifest.ConcurrentTransfers()
766         if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess {
767                 concurrency = 1
768         }
769
770         return &adapterConfig{
771                 concurrentTransfers: concurrency,
772                 apiClient:           apiClient,
773                 remote:              q.remote,
774         }
775 }
776
777 var (
778         // contentTypeWarning is the message printed when a server returns an
779         // HTTP 422 at the end of a push.
780         contentTypeWarning = []string{
781                 "Uploading failed due to unsupported Content-Type header(s).",
782                 "Consider disabling Content-Type detection with:",
783                 "",
784                 "  $ git config lfs.contenttype false",
785         }
786 )
787
788 // Wait waits for the queue to finish processing all transfers. Once Wait is
789 // called, Add will no longer add transfers to the queue. Any failed
790 // transfers will be automatically retried once.
791 func (q *TransferQueue) Wait() {
792         close(q.incoming)
793
794         q.wait.Wait()
795         q.collectorWait.Wait()
796
797         q.finishAdapter()
798         close(q.errorc)
799
800         for _, watcher := range q.watchers {
801                 close(watcher)
802         }
803
804         q.meter.Flush()
805         q.errorwait.Wait()
806
807         if q.unsupportedContentType {
808                 for _, line := range contentTypeWarning {
809                         fmt.Fprintf(os.Stderr, "info: %s\n", line)
810                 }
811         }
812 }
813
814 // Watch returns a channel where the queue will write the value of each transfer
815 // as it completes. If multiple transfers exist with the same OID, they will all
816 // be recorded here, even though only one actual transfer took place. The
817 // channel will be closed when the queue finishes processing.
818 func (q *TransferQueue) Watch() chan *Transfer {
819         c := make(chan *Transfer, q.batchSize)
820         q.watchers = append(q.watchers, c)
821         return c
822 }
823
824 // This goroutine collects errors returned from transfers
825 func (q *TransferQueue) errorCollector() {
826         for err := range q.errorc {
827                 q.errors = append(q.errors, err)
828         }
829         q.errorwait.Done()
830 }
831
832 // run begins the transfer queue. It transfers files sequentially or
833 // concurrently depending on the Config.ConcurrentTransfers() value.
834 func (q *TransferQueue) run() {
835         tracerx.Printf("tq: running as batched queue, batch size of %d", q.batchSize)
836
837         go q.errorCollector()
838         go q.collectBatches()
839 }
840
841 // canRetry returns whether or not the given error "err" is retriable.
842 func (q *TransferQueue) canRetry(err error) bool {
843         return errors.IsRetriableError(err)
844 }
845
846 // canRetryObject returns whether the given error is retriable for the object
847 // given by "oid". If the an OID has met its retry limit, then it will not be
848 // able to be retried again. If so, canRetryObject returns whether or not that
849 // given error "err" is retriable.
850 func (q *TransferQueue) canRetryObject(oid string, err error) bool {
851         if count, ok := q.rc.CanRetry(oid); !ok {
852                 tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
853                 return false
854         }
855
856         return q.canRetry(err)
857 }
858
859 // Errors returns any errors encountered during transfer.
860 func (q *TransferQueue) Errors() []error {
861         return q.errors
862 }