9 "github.com/git-lfs/git-lfs/errors"
10 "github.com/git-lfs/git-lfs/git"
11 "github.com/git-lfs/git-lfs/lfsapi"
12 "github.com/git-lfs/git-lfs/tools"
13 "github.com/rubyist/tracerx"
// defaultBatchSize is the fallback number of objects per Batch API request,
// used when no batch size was configured (see NewTransferQueue).
17 defaultBatchSize = 100
// retryCounter tracks, per OID, how many retry attempts have been made,
// bounded by MaxRetries (populated from the lfs.transfer.maxretries
// gitconfig value via the struct tag below).
20 type retryCounter struct {
21 MaxRetries int `git:"lfs.transfer.maxretries"`
25 // count maps OIDs to number of retry attempts
29 // newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
30 // value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
33 // If it encountered an error in Unmarshaling the *config.Configuration, it will
34 // be returned, otherwise nil.
// NOTE(review): the signature below returns only *retryCounter — no error —
// so the error-return sentence above appears stale; confirm against callers.
35 func newRetryCounter() *retryCounter {
37 MaxRetries: defaultMaxRetries,
// count starts empty; entries are created lazily as OIDs are retried.
38 count: make(map[string]int),
42 // Increment increments the number of retries for a given OID. It is safe to
43 // call across multiple goroutines.
44 func (r *retryCounter) Increment(oid string) {
// NOTE(review): body elided in this view — presumably takes a lock and bumps
// r.count[oid]; confirm against the full source.
51 // CountFor returns the current number of retries for a given OID. It is safe to
52 // call across multiple goroutines.
// A missing OID yields the map zero value, 0 retries.
53 func (r *retryCounter) CountFor(oid string) int {
60 // CanRetry returns the current number of retries, and whether or not it exceeds
61 // the maximum number of retries (see: retryCounter.MaxRetries).
// More precisely: the boolean is true while count < r.MaxRetries, i.e. while
// another retry attempt is still permitted.
62 func (r *retryCounter) CanRetry(oid string) (int, bool) {
63 count := r.CountFor(oid)
64 return count, count < r.MaxRetries
67 // batch implements sort.Interface and enables sorting on a slice
68 // of `*Transfer`s by object size.
70 // This interface is implemented here so that the largest objects can be
71 // processed first. Since adding a new batch is unable to occur until the
72 // current batch has finished processing, this enables us to reduce the risk of
73 // a single worker getting tied up on a large item at the end of a batch while
74 // all other workers are sitting idle.
75 type batch []*objectTuple
77 // Concat concatenates two batches together, returning a single, clamped batch as
78 // "left", and the remainder of elements as "right". If the union of the
79 // receiver and "other" has cardinality less than "size", "right" will be
// empty (sentence truncated in this view — confirm exact wording upstream).
81 func (b batch) Concat(other batch, size int) (left, right batch) {
// NOTE(review): append(b, other...) may alias the receiver's backing array;
// callers should not reuse "b" after calling Concat.
82 u := batch(append(b, other...))
// Split point: first "size" elements become "left", the overflow "right".
86 return u[:size], u[size:]
// ToTransfers maps each tuple in the batch to a *Transfer carrying only its
// Oid and Size (Name and Path are intentionally omitted here).
89 func (b batch) ToTransfers() []*Transfer {
90 transfers := make([]*Transfer, 0, len(b))
92 transfers = append(transfers, &Transfer{Oid: t.Oid, Size: t.Size})
// sort.Interface implementation: Less orders by ascending Size. Callers wrap
// the batch in sort.Reverse (see collectBatches) to process largest-first.
97 func (b batch) Len() int { return len(b) }
98 func (b batch) Less(i, j int) bool { return b[i].Size < b[j].Size }
99 func (b batch) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
101 // TransferQueue organises the wider process of uploading and downloading,
102 // including calling the API, passing the actual transfer request to transfer
103 // adapters, and dealing with progress, errors and retries.
104 type TransferQueue struct {
// adapterInProgress reports whether the current adapter has Begun; it is
// guarded by adapterInitMutex (see ensureAdapterBegun / finishAdapter).
110 adapterInProgress bool
111 adapterInitMutex sync.Mutex
// cb is an optional byte-progress callback (see WithProgressCallback).
113 cb tools.CopyCallback
// transfers maps OIDs to the chain of objects queued under that OID
// (see remember / handleTransferResult).
116 transfers map[string]*objects
119 incoming chan *objectTuple // Channel for processing incoming items
120 errorc chan error // Channel for processing errors
// watchers receive one *Transfer per completed object (see Watch).
121 watchers []chan *Transfer
// collectorWait waits for the collectBatches goroutine; errorwait waits
// for the error collector to drain errorc.
123 collectorWait sync.WaitGroup
124 errorwait sync.WaitGroup
125 // wait is used to keep track of pending transfers. It is incremented
126 // once per unique OID on Add(), and is decremented when that transfer
127 // is marked as completed or failed, but not retried.
132 // unsupportedContentType indicates whether the transfer queue ever saw
133 // an HTTP 422 response indicating that their upload destination does
134 // not support Content-Type detection.
135 unsupportedContentType bool
138 // objects holds a set of objects.
139 type objects struct {
// NOTE(review): a "completed bool" field is elided from this view; it is
// referenced by Append and handleTransferResult — confirm upstream.
141 objects []*objectTuple
144 // All returns all *objectTuple's contained in the *objects set.
145 func (s *objects) All() []*objectTuple {
149 // Append returns a new *objects with the given *objectTuple(s) appended to the
150 // end of the known objects.
// The completed flag is carried over from the receiver unchanged.
151 func (s *objects) Append(os ...*objectTuple) *objects {
153 completed: s.completed,
154 objects: append(s.objects, os...),
158 // First returns the first *objectTuple in the chain of objects.
159 func (s *objects) First() *objectTuple {
160 if len(s.objects) == 0 {
// NOTE(review): the empty-set branch body is elided here — presumably
// returns nil; confirm before relying on it.
// objectTuple pairs an object's identity (Name, Oid) with its local Path.
166 type objectTuple struct {
167 Name, Path, Oid string
// NOTE(review): a Size int64 field is elided from this view (t.Size is used
// throughout the file) — confirm upstream.
// ToTransfer converts the tuple into the *Transfer shape used by adapters.
171 func (o *objectTuple) ToTransfer() *Transfer {
// Option is a functional option used to configure a *TransferQueue at
// construction time (applied in NewTransferQueue).
180 type Option func(*TransferQueue)
// DryRun configures dry-run mode (body elided in this view — confirm).
182 func DryRun(dryRun bool) Option {
183 return func(tq *TransferQueue) {
// WithProgress attaches the given progress *Meter (body elided — confirm).
188 func WithProgress(m *Meter) Option {
189 return func(tq *TransferQueue) {
// RemoteRef records the remote ref used in Batch API calls (body elided).
194 func RemoteRef(ref *git.Ref) Option {
195 return func(tq *TransferQueue) {
// WithProgressCallback attaches a byte-progress callback (body elided).
200 func WithProgressCallback(cb tools.CopyCallback) Option {
201 return func(tq *TransferQueue) {
// WithBatchSize caps the number of objects sent per Batch API request.
206 func WithBatchSize(size int) Option {
207 return func(tq *TransferQueue) { tq.batchSize = size }
// WithBufferDepth sets the capacity of the q.incoming channel.
210 func WithBufferDepth(depth int) Option {
211 return func(tq *TransferQueue) { tq.bufferDepth = depth }
214 // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
215 func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue {
218 client: &tqClient{Client: manifest.APIClient()},
220 errorc: make(chan error),
221 transfers: make(map[string]*objects),
222 trMutex: &sync.Mutex{},
224 rc: newRetryCounter(),
// Apply caller-supplied functional options before computing defaults.
227 for _, opt := range options {
// The manifest's retry budget overrides the retry counter and API client.
231 q.rc.MaxRetries = q.manifest.maxRetries
232 q.client.MaxRetries = q.manifest.maxRetries
// Default unset/invalid sizes: batchSize falls back to defaultBatchSize,
// and bufferDepth falls back to the (now valid) batchSize.
234 if q.batchSize <= 0 {
235 q.batchSize = defaultBatchSize
237 if q.bufferDepth <= 0 {
238 q.bufferDepth = q.batchSize
241 q.meter.Direction = q.direction
// incoming is buffered to bufferDepth so Add can outpace batch collection.
244 q.incoming = make(chan *objectTuple, q.bufferDepth)
245 q.collectorWait.Add(1)
252 // Add adds a *Transfer to the transfer queue. It only increments the amount
253 // of waiting the TransferQueue has to do if the *Transfer "t" is new.
255 // If another transfer(s) with the same OID has been added to the *TransferQueue
256 // already, the given transfer will not be enqueued, but will be sent to any
257 // channel created by Watch() once the oldest transfer has completed.
259 // Only one file will be transferred to/from the Path element of the first
261 func (q *TransferQueue) Add(name, path, oid string, size int64) {
// remember returns the chain for this OID; a length > 1 means this OID was
// already queued, so this call is a duplicate.
269 if objs := q.remember(t); len(objs.objects) > 1 {
271 // If there is already a completed transfer chain for
272 // this OID, then this object is already "done", and can
273 // be sent through as completed to the watchers.
274 for _, w := range q.watchers {
279 // If the chain is not done, there is no reason to enqueue this
280 // transfer into 'q.incoming'.
281 tracerx.Printf("already transferring %q, skipping duplicate", t.Oid)
288 // remember remembers the *Transfer "t" if the *TransferQueue doesn't already
289 // know about a Transfer with the same OID.
// NOTE(review): despite the line below, the signature returns an objects
// value (a copy of the chain for t.Oid), not a boolean — the doc is stale.
291 // It returns if the value is new or not.
292 func (q *TransferQueue) remember(t *objectTuple) objects {
294 defer q.trMutex.Unlock()
// First sighting of this OID: start a fresh single-element chain.
296 if _, ok := q.transfers[t.Oid]; !ok {
298 q.transfers[t.Oid] = &objects{
299 objects: []*objectTuple{t},
302 return *q.transfers[t.Oid]
// Duplicate OID: extend the existing chain with this tuple.
305 q.transfers[t.Oid] = q.transfers[t.Oid].Append(t)
307 return *q.transfers[t.Oid]
310 // collectBatches collects batches in a loop, prioritizing failed items from the
311 // previous before adding new items. The process works as follows:
313 // 1. Create a new batch, of size `q.batchSize`, and containing no items
314 // 2. While the batch contains less items than `q.batchSize` AND the channel
315 // is open, read one item from the `q.incoming` channel.
316 // a. If the read was a channel close, go to step 4.
317 // b. If the read was a transferable item, go to step 3.
318 // 3. Append the item to the batch.
319 // 4. Sort the batch by descending object size, make a batch API call, send
320 // the items to the `*adapterBase`.
321 // 5. In a separate goroutine, process the worker results, incrementing and
322 // appending retries if possible. On the main goroutine, accept new items
324 // 6. Concat() the "next" and "pending" batches such that no more items than
325 // the maximum allowed per batch are in next, and the rest are in pending.
326 // 7. If the `q.incoming` channel is open, go to step 2.
327 // 8. If the next batch is empty AND the `q.incoming` channel is closed,
328 // terminate immediately.
330 // collectBatches runs in its own goroutine.
331 func (q *TransferQueue) collectBatches() {
332 defer q.collectorWait.Done()
335 next := q.makeBatch()
336 pending := q.makeBatch()
// Fill "next" until it reaches batchSize or q.incoming closes (step 2).
339 for !closing && (len(next) < q.batchSize) {
340 t, ok := <-q.incoming
346 next = append(next, t)
349 // Before enqueuing the next batch, sort by descending object
351 sort.Sort(sort.Reverse(next))
// Process the batch while concurrently collecting newly-added items
// (steps 4-5 above); "done" signals the batch has finished.
353 done := make(chan struct{})
360 retries, err = q.enqueueAndCollectRetriesFor(next)
369 collected, closing = q.collectPendingUntil(done)
371 // Ensure the next batch is filled with, in order:
373 // - retries from the previous batch,
374 // - new additions that were enqueued behind retries, &
375 // - items collected while the batch was processing.
376 next, pending = retries.Concat(append(pending, collected...), q.batchSize)
378 if closing && len(next) == 0 {
379 // If len(next) == 0, there are no items in "pending",
380 // and it is safe to exit.
386 // collectPendingUntil collects items from q.incoming into a "pending" batch
387 // until the given "done" channel is written to, or is closed.
389 // A "pending" batch is returned, along with whether or not "q.incoming" is
391 func (q *TransferQueue) collectPendingUntil(done <-chan struct{}) (pending batch, closing bool) {
// ok == false means q.incoming was closed: report closing to the caller.
394 case t, ok := <-q.incoming:
401 pending = append(pending, t)
408 // enqueueAndCollectRetriesFor makes a Batch API call and returns a "next" batch
409 // containing all of the objects that failed from the previous batch and had
410 // retries available to them.
412 // If an error was encountered while making the API request, _all_ of the items
413 // from the previous batch (that have retries available to them) will be
414 // returned immediately, along with the error that was encountered.
416 // enqueueAndCollectRetriesFor blocks until the entire Batch "batch" has been
418 func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error) {
419 next := q.makeBatch()
420 tracerx.Printf("tq: sending batch of size %d", len(batch))
423 var bRes *BatchResponse
// Standalone agents skip the Batch API entirely: synthesize a response
// locally and let the external agent handle everything.
424 if q.manifest.standaloneTransferAgent != "" {
425 // Trust the external transfer agent can do everything by itself.
426 objects := make([]*Transfer, 0, len(batch))
427 for _, t := range batch {
428 objects = append(objects, &Transfer{Oid: t.Oid, Size: t.Size, Path: t.Path})
430 bRes = &BatchResponse{
432 TransferAdapterName: q.manifest.standaloneTransferAgent,
435 // Query the Git LFS server for what transfer method to use and
436 // details such as URLs, authentication, etc.
438 bRes, err = Batch(q.manifest, q.direction, q.remote, q.ref, batch.ToTransfers())
440 // If there was an error making the batch API call, mark all of
441 // the objects for retry, and return them along with the error
442 // that was encountered. If any of the objects couldn't be
443 // retried, they will be marked as failed.
444 for _, t := range batch {
445 if q.canRetryObject(t.Oid, err) {
446 q.rc.Increment(t.Oid)
448 next = append(next, t)
458 if len(bRes.Objects) == 0 {
// Switch (or keep) the adapter the server asked for.
462 q.useAdapter(bRes.TransferAdapterName)
465 toTransfer := make([]*Transfer, 0, len(bRes.Objects))
467 for _, o := range bRes.Objects {
// Per-object error reported by the server: surface it on errorc.
469 q.errorc <- errors.Wrapf(o.Error, "[%v] %v", o.Oid, o.Error.Message)
477 objects, ok := q.transfers[o.Oid]
480 // If we couldn't find any associated
481 // Transfer object, then we give up on the
482 // transfer by telling the progress meter to
483 // skip the number of bytes in "o".
484 q.errorc <- errors.Errorf("[%v] The server returned an unknown OID.", o.Oid)
489 // Pick t[0], since it will cover all transfers with the
491 tr := newTransfer(o, objects.First().Name, objects.First().Path)
493 if a, err := tr.Rel(q.direction.String()); err != nil {
494 // XXX(taylor): duplication
495 if q.canRetryObject(tr.Oid, err) {
496 q.rc.Increment(tr.Oid)
497 count := q.rc.CountFor(tr.Oid)
499 tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err)
500 next = append(next, objects.First())
502 q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)
507 } else if a == nil && q.manifest.standaloneTransferAgent == "" {
511 q.meter.StartTransfer(objects.First().Name)
512 toTransfer = append(toTransfer, tr)
// Hand the batch to the adapter; any failed-but-retriable items come
// back on the retries channel and are folded into "next".
517 retries := q.addToAdapter(bRes.endpoint, toTransfer)
518 for t := range retries {
519 q.rc.Increment(t.Oid)
520 count := q.rc.CountFor(t.Oid)
522 tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid, t.Size)
524 next = append(next, t)
530 // makeBatch returns a new, empty batch, with a capacity equal to the maximum
531 // batch size designated by the `*TransferQueue`.
532 func (q *TransferQueue) makeBatch() batch { return make(batch, 0, q.batchSize) }
534 // addToAdapter adds the given "pending" transfers to the transfer adapters and
535 // returns a channel of Transfers that are to be retried in the next batch.
536 // After all of the items in the batch have been processed, the channel is
539 // addToAdapter returns immediately, and does not block.
540 func (q *TransferQueue) addToAdapter(e lfsapi.Endpoint, pending []*Transfer) <-chan *objectTuple {
// Buffered to len(pending) so result handling never blocks on the reader.
541 retries := make(chan *objectTuple, len(pending))
543 if err := q.ensureAdapterBegun(e); err != nil {
547 for _, t := range pending {
// Separate uploads that are actually present on disk from those with
// immediate (missing/corrupt) failure results.
555 present, missingResults := q.partitionTransfers(pending)
560 var results <-chan TransferResult
562 results = q.makeDryRunResults(present)
564 results = q.adapter.Add(present...)
// Report the precomputed failures first, then drain adapter results.
567 for _, res := range missingResults {
568 q.handleTransferResult(res, retries)
570 for res := range results {
571 q.handleTransferResult(res, retries)
// partitionTransfers splits upload transfers into those that exist on disk
// with the expected size ("present") and precomputed failure results for the
// rest (invalid size, missing file, or size mismatch). For non-upload
// directions, all transfers are returned as present with no results.
578 func (q *TransferQueue) partitionTransfers(transfers []*Transfer) (present []*Transfer, results []TransferResult) {
579 if q.direction != Upload {
580 return transfers, nil
583 present = make([]*Transfer, 0, len(transfers))
584 results = make([]TransferResult, 0, len(transfers))
586 for _, t := range transfers {
// Negative sizes are rejected outright, before touching the disk.
590 err = errors.Errorf("Git LFS: object %q has invalid size (got: %d)", t.Oid, t.Size)
592 fd, serr := os.Stat(t.Path)
594 if os.IsNotExist(serr) {
595 err = newObjectMissingError(t.Name, t.Oid)
// On-disk size must match the expected size exactly.
599 } else if t.Size != fd.Size() {
600 err = newCorruptObjectError(t.Name, t.Oid)
605 results = append(results, TransferResult{
610 present = append(present, t)
617 // makeDryRunResults returns a channel populated immediately with "successful"
618 // results for all of the given transfers in "ts".
619 func (q *TransferQueue) makeDryRunResults(ts []*Transfer) <-chan TransferResult {
// Buffered to len(ts) so every result can be queued without a reader.
620 results := make(chan TransferResult, len(ts))
621 for _, t := range ts {
622 results <- TransferResult{t, nil}
630 // handleTransferResult observes the transfer result, sending it on the retries
631 // channel if it was able to be retried.
632 func (q *TransferQueue) handleTransferResult(
633 res TransferResult, retries chan<- *objectTuple,
635 oid := res.Transfer.Oid
637 if res.Error != nil {
638 // If there was an error encountered when processing the
639 // transfer (res.Transfer), handle the error as is appropriate:
641 if q.canRetryObject(oid, res.Error) {
642 // If the object can be retried, send it on the retries
643 // channel, where it will be read at the call-site and
644 // its retry count will be incremented.
645 tracerx.Printf("tq: retrying object %s: %s", oid, res.Error)
// NOTE(review): access to q.transfers is presumably guarded by
// q.trMutex in the elided lines around here — confirm upstream.
648 objects, ok := q.transfers[oid]
652 retries <- objects.First()
654 q.errorc <- res.Error
657 // If the error wasn't retriable, OR the object has
658 // exceeded its retry budget, it will be NOT be sent to
659 // the retry channel, and the error will be reported
660 // immediately (unless the error is in response to a
662 if errors.IsUnprocessableEntityError(res.Error) {
663 q.unsupportedContentType = true
665 q.errorc <- res.Error
// Success: mark the whole chain for this OID as completed.
671 objects := q.transfers[oid]
672 objects.completed = true
674 // Otherwise, if the transfer was successful, notify all of the
675 // watchers, and mark it as finished.
676 for _, c := range q.watchers {
677 // Send one update for each transfer with the
679 for _, t := range objects.All() {
691 q.meter.FinishTransfer(res.Transfer.Name)
// useAdapter installs the named transfer adapter, reusing the current one
// when the name matches; otherwise it switches to a fresh adapter.
696 func (q *TransferQueue) useAdapter(name string) {
697 q.adapterInitMutex.Lock()
698 defer q.adapterInitMutex.Unlock()
700 if q.adapter != nil {
701 if q.adapter.Name() == name {
702 // re-use, this is the normal path
705 // If the adapter we're using isn't the same as the one we've been
706 // told to use now, must wait for the current one to finish then switch
707 // This will probably never happen but is just in case server starts
708 // changing adapter support in between batches
711 q.adapter = q.manifest.NewAdapterOrDefault(name, q.direction)
// finishAdapter ends the in-progress adapter, if one has Begun, and clears
// the in-progress flag.
714 func (q *TransferQueue) finishAdapter() {
715 if q.adapterInProgress {
717 q.adapterInProgress = false
722 // BatchSize returns the batch size of the receiving *TransferQueue, or, the
723 // number of transfers to accept before beginning work on them.
724 func (q *TransferQueue) BatchSize() int {
// Skip advances progress by "size" bytes without transferring them —
// NOTE(review): body elided in this view; confirm it forwards to the meter.
728 func (q *TransferQueue) Skip(size int64) {
// ensureAdapterBegun starts the current adapter exactly once, wiring up the
// progress callback; subsequent calls are no-ops while a run is in progress.
732 func (q *TransferQueue) ensureAdapterBegun(e lfsapi.Endpoint) error {
733 q.adapterInitMutex.Lock()
734 defer q.adapterInitMutex.Unlock()
736 if q.adapterInProgress {
740 // Progress callback - receives byte updates
741 cb := func(name string, total, read int64, current int) error {
742 q.meter.TransferBytes(q.direction.String(), name, read, total, current)
744 // NOTE: this is the mechanism by which the logpath
745 // specified by GIT_LFS_PROGRESS is written to.
747 // See: lfs.downloadFile() for more.
748 q.cb(total, read, current)
753 tracerx.Printf("tq: starting transfer adapter %q", q.adapter.Name())
754 err := q.adapter.Begin(q.toAdapterCfg(e), cb)
// Only mark in-progress once Begin has succeeded.
758 q.adapterInProgress = true
// toAdapterCfg builds the AdapterConfig handed to adapter.Begin, using the
// manifest's API client and concurrency settings.
763 func (q *TransferQueue) toAdapterCfg(e lfsapi.Endpoint) AdapterConfig {
764 apiClient := q.manifest.APIClient()
765 concurrency := q.manifest.ConcurrentTransfers()
// NOTE(review): the NTLM branch body is elided — it presumably reduces
// concurrency (NTLM connections are stateful); confirm upstream.
766 if apiClient.Endpoints.AccessFor(e.Url) == lfsapi.NTLMAccess {
770 return &adapterConfig{
771 concurrentTransfers: concurrency,
772 apiClient: apiClient,
778 // contentTypeWarning is the message printed when a server returns an
779 // HTTP 422 at the end of a push.
// Printed line-by-line to stderr in Wait() when unsupportedContentType is set.
780 contentTypeWarning = []string{
781 "Uploading failed due to unsupported Content-Type header(s).",
782 "Consider disabling Content-Type detection with:",
784 " $ git config lfs.contenttype false",
788 // Wait waits for the queue to finish processing all transfers. Once Wait is
789 // called, Add will no longer add transfers to the queue. Any failed
790 // transfers will be automatically retried once.
791 func (q *TransferQueue) Wait() {
792 
795 q.collectorWait.Wait()
// Close every watcher channel so Watch() consumers can terminate.
800 for _, watcher := range q.watchers {
// Surface the HTTP 422 advice collected during the run, if any.
807 if q.unsupportedContentType {
808 for _, line := range contentTypeWarning {
809 fmt.Fprintf(os.Stderr, "info: %s\n", line)
814 // Watch returns a channel where the queue will write the value of each transfer
815 // as it completes. If multiple transfers exist with the same OID, they will all
816 // be recorded here, even though only one actual transfer took place. The
817 // channel will be closed when the queue finishes processing.
818 func (q *TransferQueue) Watch() chan *Transfer {
// Buffered to batchSize so sends from result handling rarely block.
819 c := make(chan *Transfer, q.batchSize)
820 q.watchers = append(q.watchers, c)
824 // This goroutine collects errors returned from transfers
// Drains q.errorc until it is closed; NOTE(review): presumably signals
// q.errorwait.Done() afterwards (elided here) — confirm upstream.
825 func (q *TransferQueue) errorCollector() {
826 for err := range q.errorc {
827 q.errors = append(q.errors, err)
832 // run begins the transfer queue. It transfers files sequentially or
833 // concurrently depending on the Config.ConcurrentTransfers() value.
834 func (q *TransferQueue) run() {
835 tracerx.Printf("tq: running as batched queue, batch size of %d", q.batchSize)
// Background workers: one for error collection, one for batch collection.
837 go q.errorCollector()
838 go q.collectBatches()
841 // canRetry returns whether or not the given error "err" is retriable.
842 func (q *TransferQueue) canRetry(err error) bool {
843 return errors.IsRetriableError(err)
846 // canRetryObject returns whether the given error is retriable for the object
847 // given by "oid". If an OID has met its retry limit, then it will not be
848 // able to be retried again. If so, canRetryObject returns whether or not that
849 // given error "err" is retriable.
850 func (q *TransferQueue) canRetryObject(oid string, err error) bool {
// The retry budget is checked first; only then is the error class consulted.
851 if count, ok := q.rc.CanRetry(oid); !ok {
852 tracerx.Printf("tq: refusing to retry %q, too many retries (%d)", oid, count)
856 return q.canRetry(err)
859 // Errors returns any errors encountered during transfer.
860 func (q *TransferQueue) Errors() []error {