Imported Upstream version 2.5.0
[scm/test.git] / commands / uploader.go
1 package commands
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "net/url"
8         "os"
9         "path/filepath"
10         "strings"
11         "sync"
12
13         "github.com/git-lfs/git-lfs/errors"
14         "github.com/git-lfs/git-lfs/git"
15         "github.com/git-lfs/git-lfs/lfs"
16         "github.com/git-lfs/git-lfs/tasklog"
17         "github.com/git-lfs/git-lfs/tools"
18         "github.com/git-lfs/git-lfs/tq"
19         "github.com/rubyist/tracerx"
20 )
21
22 func uploadForRefUpdates(ctx *uploadContext, updates []*git.RefUpdate, pushAll bool) error {
23         gitscanner, err := ctx.buildGitScanner()
24         if err != nil {
25                 return err
26         }
27
28         defer func() {
29                 gitscanner.Close()
30                 ctx.ReportErrors()
31         }()
32
33         verifyLocksForUpdates(ctx.lockVerifier, updates)
34         for _, update := range updates {
35                 // initialized here to prevent looped defer
36                 q := ctx.NewQueue(
37                         tq.RemoteRef(update.Right()),
38                 )
39                 err := uploadLeftOrAll(gitscanner, ctx, q, update, pushAll)
40                 ctx.CollectErrors(q)
41
42                 if err != nil {
43                         return errors.Wrap(err, fmt.Sprintf("ref %s:", update.Left().Name))
44                 }
45         }
46
47         return nil
48 }
49
50 func uploadLeftOrAll(g *lfs.GitScanner, ctx *uploadContext, q *tq.TransferQueue, update *git.RefUpdate, pushAll bool) error {
51         cb := ctx.gitScannerCallback(q)
52         if pushAll {
53                 if err := g.ScanRefWithDeleted(update.LeftCommitish(), cb); err != nil {
54                         return err
55                 }
56         } else {
57                 if err := g.ScanLeftToRemote(update.LeftCommitish(), cb); err != nil {
58                         return err
59                 }
60         }
61         return ctx.scannerError()
62 }
63
64 type uploadContext struct {
65         Remote       string
66         DryRun       bool
67         Manifest     *tq.Manifest
68         uploadedOids tools.StringSet
69         gitfilter    *lfs.GitFilter
70
71         logger *tasklog.Logger
72         meter  *tq.Meter
73
74         committerName  string
75         committerEmail string
76
77         lockVerifier *lockVerifier
78
79         // allowMissing specifies whether pushes containing missing/corrupt
80         // pointers should allow pushing Git blobs
81         allowMissing bool
82
83         // tracks errors from gitscanner callbacks
84         scannerErr error
85         errMu      sync.Mutex
86
87         // filename => oid
88         missing   map[string]string
89         corrupt   map[string]string
90         otherErrs []error
91 }
92
93 func newUploadContext(dryRun bool) *uploadContext {
94         remote := cfg.PushRemote()
95         manifest := getTransferManifestOperationRemote("upload", remote)
96         ctx := &uploadContext{
97                 Remote:       remote,
98                 Manifest:     manifest,
99                 DryRun:       dryRun,
100                 uploadedOids: tools.NewStringSet(),
101                 gitfilter:    lfs.NewGitFilter(cfg),
102                 lockVerifier: newLockVerifier(manifest),
103                 allowMissing: cfg.Git.Bool("lfs.allowincompletepush", false),
104                 missing:      make(map[string]string),
105                 corrupt:      make(map[string]string),
106                 otherErrs:    make([]error, 0),
107         }
108
109         var sink io.Writer = os.Stdout
110         if dryRun {
111                 sink = ioutil.Discard
112         }
113
114         ctx.logger = tasklog.NewLogger(sink)
115         ctx.meter = buildProgressMeter(ctx.DryRun, tq.Upload)
116         ctx.logger.Enqueue(ctx.meter)
117         ctx.committerName, ctx.committerEmail = cfg.CurrentCommitter()
118         return ctx
119 }
120
121 func (c *uploadContext) NewQueue(options ...tq.Option) *tq.TransferQueue {
122         return tq.NewTransferQueue(tq.Upload, c.Manifest, c.Remote, append(options,
123                 tq.DryRun(c.DryRun),
124                 tq.WithProgress(c.meter),
125         )...)
126 }
127
128 func (c *uploadContext) scannerError() error {
129         c.errMu.Lock()
130         defer c.errMu.Unlock()
131
132         return c.scannerErr
133 }
134
135 func (c *uploadContext) addScannerError(err error) {
136         c.errMu.Lock()
137         defer c.errMu.Unlock()
138
139         if c.scannerErr != nil {
140                 c.scannerErr = fmt.Errorf("%v\n%v", c.scannerErr, err)
141         } else {
142                 c.scannerErr = err
143         }
144 }
145
146 func (c *uploadContext) buildGitScanner() (*lfs.GitScanner, error) {
147         gitscanner := lfs.NewGitScanner(nil)
148         gitscanner.FoundLockable = func(n string) { c.lockVerifier.LockedByThem(n) }
149         gitscanner.PotentialLockables = c.lockVerifier
150         return gitscanner, gitscanner.RemoteForPush(c.Remote)
151 }
152
153 func (c *uploadContext) gitScannerCallback(tqueue *tq.TransferQueue) func(*lfs.WrappedPointer, error) {
154         return func(p *lfs.WrappedPointer, err error) {
155                 if err != nil {
156                         c.addScannerError(err)
157                 } else {
158                         c.UploadPointers(tqueue, p)
159                 }
160         }
161 }
162
163 // AddUpload adds the given oid to the set of oids that have been uploaded in
164 // the current process.
165 func (c *uploadContext) SetUploaded(oid string) {
166         c.uploadedOids.Add(oid)
167 }
168
169 // HasUploaded determines if the given oid has already been uploaded in the
170 // current process.
171 func (c *uploadContext) HasUploaded(oid string) bool {
172         return c.uploadedOids.Contains(oid)
173 }
174
175 func (c *uploadContext) prepareUpload(unfiltered ...*lfs.WrappedPointer) []*lfs.WrappedPointer {
176         numUnfiltered := len(unfiltered)
177         uploadables := make([]*lfs.WrappedPointer, 0, numUnfiltered)
178
179         // XXX(taylor): temporary measure to fix duplicate (broken) results from
180         // scanner
181         uniqOids := tools.NewStringSet()
182
183         // separate out objects that _should_ be uploaded, but don't exist in
184         // .git/lfs/objects. Those will skipped if the server already has them.
185         for _, p := range unfiltered {
186                 // object already uploaded in this process, or we've already
187                 // seen this OID (see above), skip!
188                 if uniqOids.Contains(p.Oid) || c.HasUploaded(p.Oid) {
189                         continue
190                 }
191                 uniqOids.Add(p.Oid)
192
193                 // canUpload determines whether the current pointer "p" can be
194                 // uploaded through the TransferQueue below. It is set to false
195                 // only when the file is locked by someone other than the
196                 // current committer.
197                 var canUpload bool = true
198
199                 if c.lockVerifier.LockedByThem(p.Name) {
200                         // If the verification state is enabled, this failed
201                         // locks verification means that the push should fail.
202                         //
203                         // If the state is disabled, the verification error is
204                         // silent and the user can upload.
205                         //
206                         // If the state is undefined, the verification error is
207                         // sent as a warning and the user can upload.
208                         canUpload = !c.lockVerifier.Enabled()
209                 }
210
211                 c.lockVerifier.LockedByUs(p.Name)
212
213                 if canUpload {
214                         // estimate in meter early (even if it's not going into
215                         // uploadables), since we will call Skip() based on the
216                         // results of the download check queue.
217                         c.meter.Add(p.Size)
218
219                         uploadables = append(uploadables, p)
220                 }
221         }
222
223         return uploadables
224 }
225
226 func (c *uploadContext) UploadPointers(q *tq.TransferQueue, unfiltered ...*lfs.WrappedPointer) {
227         if c.DryRun {
228                 for _, p := range unfiltered {
229                         if c.HasUploaded(p.Oid) {
230                                 continue
231                         }
232
233                         Print("push %s => %s", p.Oid, p.Name)
234                         c.SetUploaded(p.Oid)
235                 }
236
237                 return
238         }
239
240         pointers := c.prepareUpload(unfiltered...)
241         for _, p := range pointers {
242                 t, err := c.uploadTransfer(p)
243                 if err != nil && !errors.IsCleanPointerError(err) {
244                         ExitWithError(err)
245                 }
246
247                 q.Add(t.Name, t.Path, t.Oid, t.Size)
248                 c.SetUploaded(p.Oid)
249         }
250 }
251
252 func (c *uploadContext) CollectErrors(tqueue *tq.TransferQueue) {
253         tqueue.Wait()
254
255         for _, err := range tqueue.Errors() {
256                 if malformed, ok := err.(*tq.MalformedObjectError); ok {
257                         if malformed.Missing() {
258                                 c.missing[malformed.Name] = malformed.Oid
259                         } else if malformed.Corrupt() {
260                                 c.corrupt[malformed.Name] = malformed.Oid
261                         }
262                 } else {
263                         c.otherErrs = append(c.otherErrs, err)
264                 }
265         }
266 }
267
268 func (c *uploadContext) ReportErrors() {
269         c.meter.Finish()
270
271         for _, err := range c.otherErrs {
272                 FullError(err)
273         }
274
275         if len(c.missing) > 0 || len(c.corrupt) > 0 {
276                 var action string
277                 if c.allowMissing {
278                         action = "missing objects"
279                 } else {
280                         action = "failed"
281                 }
282
283                 Print("LFS upload %s:", action)
284                 for name, oid := range c.missing {
285                         Print("  (missing) %s (%s)", name, oid)
286                 }
287                 for name, oid := range c.corrupt {
288                         Print("  (corrupt) %s (%s)", name, oid)
289                 }
290
291                 if !c.allowMissing {
292                         pushMissingHint := []string{
293                                 "hint: Your push was rejected due to missing or corrupt local objects.",
294                                 "hint: You can disable this check with: 'git config lfs.allowincompletepush true'",
295                         }
296                         Print(strings.Join(pushMissingHint, "\n"))
297                         os.Exit(2)
298                 }
299         }
300
301         if len(c.otherErrs) > 0 {
302                 os.Exit(2)
303         }
304
305         if c.lockVerifier.HasUnownedLocks() {
306                 Print("Unable to push locked files:")
307                 for _, unowned := range c.lockVerifier.UnownedLocks() {
308                         Print("* %s - %s", unowned.Path(), unowned.Owners())
309                 }
310
311                 if c.lockVerifier.Enabled() {
312                         Exit("ERROR: Cannot update locked files.")
313                 } else {
314                         Error("WARNING: The above files would have halted this push.")
315                 }
316         } else if c.lockVerifier.HasOwnedLocks() {
317                 Print("Consider unlocking your own locked files: (`git lfs unlock <path>`)")
318                 for _, owned := range c.lockVerifier.OwnedLocks() {
319                         Print("* %s", owned.Path())
320                 }
321         }
322 }
323
var (
	// githubHttps and githubSsh are the parsed base URLs for GitHub over
	// HTTPS and SSH. The parse errors are discarded because the inputs are
	// constant strings.
	githubHttps, _ = url.Parse("https://github.com")
	githubSsh, _   = url.Parse("ssh://github.com")

	// hostsWithKnownLockingSupport lists the base URLs (scheme and host,
	// without port numbers) of hosts that are known to implement the LFS
	// locking API.
	//
	// Additions are welcome.
	hostsWithKnownLockingSupport = []*url.URL{githubHttps, githubSsh}
)
337
338 func (c *uploadContext) uploadTransfer(p *lfs.WrappedPointer) (*tq.Transfer, error) {
339         filename := p.Name
340         oid := p.Oid
341
342         localMediaPath, err := c.gitfilter.ObjectPath(oid)
343         if err != nil {
344                 return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
345         }
346
347         if len(filename) > 0 {
348                 if err = c.ensureFile(filename, localMediaPath); err != nil && !errors.IsCleanPointerError(err) {
349                         return nil, err
350                 }
351         }
352
353         return &tq.Transfer{
354                 Name: filename,
355                 Path: localMediaPath,
356                 Oid:  oid,
357                 Size: p.Size,
358         }, nil
359 }
360
361 // ensureFile makes sure that the cleanPath exists before pushing it.  If it
362 // does not exist, it attempts to clean it by reading the file at smudgePath.
363 func (c *uploadContext) ensureFile(smudgePath, cleanPath string) error {
364         if _, err := os.Stat(cleanPath); err == nil {
365                 return nil
366         }
367
368         localPath := filepath.Join(cfg.LocalWorkingDir(), smudgePath)
369         file, err := os.Open(localPath)
370         if err != nil {
371                 if c.allowMissing {
372                         return nil
373                 }
374                 return err
375         }
376
377         defer file.Close()
378
379         stat, err := file.Stat()
380         if err != nil {
381                 return err
382         }
383
384         cleaned, err := c.gitfilter.Clean(file, file.Name(), stat.Size(), nil)
385         if cleaned != nil {
386                 cleaned.Teardown()
387         }
388
389         if err != nil {
390                 return err
391         }
392         return nil
393 }
394
395 // supportsLockingAPI returns whether or not a given url is known to support
396 // the LFS locking API by whether or not its hostname is included in the list
397 // above.
398 func supportsLockingAPI(rawurl string) bool {
399         u, err := url.Parse(rawurl)
400         if err != nil {
401                 tracerx.Printf("commands: unable to parse %q to determine locking support: %v", rawurl, err)
402                 return false
403         }
404
405         for _, supported := range hostsWithKnownLockingSupport {
406                 if supported.Scheme == u.Scheme &&
407                         supported.Hostname() == u.Hostname() &&
408                         strings.HasPrefix(u.Path, supported.Path) {
409                         return true
410                 }
411         }
412         return false
413 }
414
415 // disableFor disables lock verification for the given lfsapi.Endpoint,
416 // "endpoint".
417 func disableFor(rawurl string) error {
418         tracerx.Printf("commands: disabling lock verification for %q", rawurl)
419
420         key := strings.Join([]string{"lfs", rawurl, "locksverify"}, ".")
421
422         _, err := cfg.SetGitLocalKey(key, "false")
423         return err
424 }