Imported Upstream version 2.3.1
[scm/test.git] / commands / uploader.go
1 package commands
2
3 import (
4         "fmt"
5         "net/url"
6         "os"
7         "path/filepath"
8         "strconv"
9         "strings"
10         "sync"
11
12         "github.com/git-lfs/git-lfs/config"
13         "github.com/git-lfs/git-lfs/errors"
14         "github.com/git-lfs/git-lfs/git"
15         "github.com/git-lfs/git-lfs/lfs"
16         "github.com/git-lfs/git-lfs/lfsapi"
17         "github.com/git-lfs/git-lfs/locking"
18         "github.com/git-lfs/git-lfs/progress"
19         "github.com/git-lfs/git-lfs/tools"
20         "github.com/git-lfs/git-lfs/tq"
21         "github.com/rubyist/tracerx"
22 )
23
24 func uploadLeftOrAll(g *lfs.GitScanner, ctx *uploadContext, ref string) error {
25         if pushAll {
26                 if err := g.ScanRefWithDeleted(ref, nil); err != nil {
27                         return err
28                 }
29         } else {
30                 if err := g.ScanLeftToRemote(ref, nil); err != nil {
31                         return err
32                 }
33         }
34         return ctx.scannerError()
35 }
36
37 type uploadContext struct {
38         Remote       string
39         DryRun       bool
40         Manifest     *tq.Manifest
41         uploadedOids tools.StringSet
42
43         meter progress.Meter
44         tq    *tq.TransferQueue
45
46         committerName  string
47         committerEmail string
48
49         trackedLocksMu *sync.Mutex
50
51         // ALL verifiable locks
52         lockVerifyState verifyState
53         ourLocks        map[string]locking.Lock
54         theirLocks      map[string]locking.Lock
55
56         // locks from ourLocks that were modified in this push
57         ownedLocks []locking.Lock
58
59         // locks from theirLocks that were modified in this push
60         unownedLocks []locking.Lock
61
62         // allowMissing specifies whether pushes containing missing/corrupt
63         // pointers should allow pushing Git blobs
64         allowMissing bool
65
66         // tracks errors from gitscanner callbacks
67         scannerErr error
68         errMu      sync.Mutex
69 }
70
71 // Determines if a filename is lockable. Serves as a wrapper around theirLocks
72 // that implements GitScannerSet.
73 type gitScannerLockables struct {
74         m map[string]locking.Lock
75 }
76
77 func (l *gitScannerLockables) Contains(name string) bool {
78         if l == nil {
79                 return false
80         }
81         _, ok := l.m[name]
82         return ok
83 }
84
85 type verifyState byte
86
87 const (
88         verifyStateUnknown verifyState = iota
89         verifyStateEnabled
90         verifyStateDisabled
91 )
92
93 func newUploadContext(remote string, dryRun bool) *uploadContext {
94         cfg.CurrentRemote = remote
95
96         ctx := &uploadContext{
97                 Remote:         remote,
98                 Manifest:       getTransferManifestOperationRemote("upload", remote),
99                 DryRun:         dryRun,
100                 uploadedOids:   tools.NewStringSet(),
101                 ourLocks:       make(map[string]locking.Lock),
102                 theirLocks:     make(map[string]locking.Lock),
103                 trackedLocksMu: new(sync.Mutex),
104                 allowMissing:   cfg.Git.Bool("lfs.allowincompletepush", true),
105         }
106
107         ctx.meter = buildProgressMeter(ctx.DryRun)
108         ctx.tq = newUploadQueue(ctx.Manifest, ctx.Remote, tq.WithProgress(ctx.meter), tq.DryRun(ctx.DryRun))
109         ctx.committerName, ctx.committerEmail = cfg.CurrentCommitter()
110
111         // Do not check locks for standalone transfer, because there is no LFS
112         // server to ask.
113         if ctx.Manifest.IsStandaloneTransfer() {
114                 ctx.lockVerifyState = verifyStateDisabled
115                 return ctx
116         }
117
118         ourLocks, theirLocks, verifyState := verifyLocks(remote)
119         ctx.lockVerifyState = verifyState
120         for _, l := range theirLocks {
121                 ctx.theirLocks[l.Path] = l
122         }
123         for _, l := range ourLocks {
124                 ctx.ourLocks[l.Path] = l
125         }
126
127         return ctx
128 }
129
130 func verifyLocks(remote string) (ours, theirs []locking.Lock, st verifyState) {
131         endpoint := getAPIClient().Endpoints.Endpoint("upload", remote)
132         state := getVerifyStateFor(endpoint)
133         if state == verifyStateDisabled {
134                 return
135         }
136
137         lockClient := newLockClient(remote)
138
139         ours, theirs, err := lockClient.VerifiableLocks(0)
140         if err != nil {
141                 if errors.IsNotImplementedError(err) {
142                         disableFor(endpoint)
143                 } else if state == verifyStateUnknown || state == verifyStateEnabled {
144                         if errors.IsAuthError(err) {
145                                 if state == verifyStateUnknown {
146                                         Error("WARNING: Authentication error: %s", err)
147                                 } else if state == verifyStateEnabled {
148                                         Exit("ERROR: Authentication error: %s", err)
149                                 }
150                         } else {
151                                 Print("Remote %q does not support the LFS locking API. Consider disabling it with:", remote)
152                                 Print("  $ git config lfs.%s.locksverify false", endpoint.Url)
153                                 if state == verifyStateEnabled {
154                                         ExitWithError(err)
155                                 }
156                         }
157                 }
158         } else if state == verifyStateUnknown {
159                 Print("Locking support detected on remote %q. Consider enabling it with:", remote)
160                 Print("  $ git config lfs.%s.locksverify true", endpoint.Url)
161         }
162
163         return ours, theirs, state
164 }
165
166 func (c *uploadContext) scannerError() error {
167         c.errMu.Lock()
168         defer c.errMu.Unlock()
169
170         return c.scannerErr
171 }
172
173 func (c *uploadContext) addScannerError(err error) {
174         c.errMu.Lock()
175         defer c.errMu.Unlock()
176
177         if c.scannerErr != nil {
178                 c.scannerErr = fmt.Errorf("%v\n%v", c.scannerErr, err)
179         } else {
180                 c.scannerErr = err
181         }
182 }
183
184 func (c *uploadContext) buildGitScanner() (*lfs.GitScanner, error) {
185         gitscanner := lfs.NewGitScanner(func(p *lfs.WrappedPointer, err error) {
186                 if err != nil {
187                         c.addScannerError(err)
188                 } else {
189                         uploadPointers(c, p)
190                 }
191         })
192
193         gitscanner.FoundLockable = func(name string) {
194                 if lock, ok := c.theirLocks[name]; ok {
195                         c.trackedLocksMu.Lock()
196                         c.unownedLocks = append(c.unownedLocks, lock)
197                         c.trackedLocksMu.Unlock()
198                 }
199         }
200
201         gitscanner.PotentialLockables = &gitScannerLockables{m: c.theirLocks}
202         return gitscanner, gitscanner.RemoteForPush(c.Remote)
203 }
204
205 // AddUpload adds the given oid to the set of oids that have been uploaded in
206 // the current process.
207 func (c *uploadContext) SetUploaded(oid string) {
208         c.uploadedOids.Add(oid)
209 }
210
211 // HasUploaded determines if the given oid has already been uploaded in the
212 // current process.
213 func (c *uploadContext) HasUploaded(oid string) bool {
214         return c.uploadedOids.Contains(oid)
215 }
216
217 func (c *uploadContext) prepareUpload(unfiltered ...*lfs.WrappedPointer) (*tq.TransferQueue, []*lfs.WrappedPointer) {
218         numUnfiltered := len(unfiltered)
219         uploadables := make([]*lfs.WrappedPointer, 0, numUnfiltered)
220
221         // XXX(taylor): temporary measure to fix duplicate (broken) results from
222         // scanner
223         uniqOids := tools.NewStringSet()
224
225         // separate out objects that _should_ be uploaded, but don't exist in
226         // .git/lfs/objects. Those will skipped if the server already has them.
227         for _, p := range unfiltered {
228                 // object already uploaded in this process, or we've already
229                 // seen this OID (see above), skip!
230                 if uniqOids.Contains(p.Oid) || c.HasUploaded(p.Oid) {
231                         continue
232                 }
233                 uniqOids.Add(p.Oid)
234
235                 // canUpload determines whether the current pointer "p" can be
236                 // uploaded through the TransferQueue below. It is set to false
237                 // only when the file is locked by someone other than the
238                 // current committer.
239                 var canUpload bool = true
240
241                 if lock, ok := c.theirLocks[p.Name]; ok {
242                         c.trackedLocksMu.Lock()
243                         c.unownedLocks = append(c.unownedLocks, lock)
244                         c.trackedLocksMu.Unlock()
245
246                         // If the verification state is enabled, this failed
247                         // locks verification means that the push should fail.
248                         //
249                         // If the state is disabled, the verification error is
250                         // silent and the user can upload.
251                         //
252                         // If the state is undefined, the verification error is
253                         // sent as a warning and the user can upload.
254                         canUpload = c.lockVerifyState != verifyStateEnabled
255                 }
256
257                 if lock, ok := c.ourLocks[p.Name]; ok {
258                         c.trackedLocksMu.Lock()
259                         c.ownedLocks = append(c.ownedLocks, lock)
260                         c.trackedLocksMu.Unlock()
261                 }
262
263                 if canUpload {
264                         // estimate in meter early (even if it's not going into
265                         // uploadables), since we will call Skip() based on the
266                         // results of the download check queue.
267                         c.meter.Add(p.Size)
268
269                         uploadables = append(uploadables, p)
270                 }
271         }
272
273         return c.tq, uploadables
274 }
275
276 func uploadPointers(c *uploadContext, unfiltered ...*lfs.WrappedPointer) {
277         if c.DryRun {
278                 for _, p := range unfiltered {
279                         if c.HasUploaded(p.Oid) {
280                                 continue
281                         }
282
283                         Print("push %s => %s", p.Oid, p.Name)
284                         c.SetUploaded(p.Oid)
285                 }
286
287                 return
288         }
289
290         q, pointers := c.prepareUpload(unfiltered...)
291         for _, p := range pointers {
292                 t, err := uploadTransfer(p, c.allowMissing)
293                 if err != nil && !errors.IsCleanPointerError(err) {
294                         ExitWithError(err)
295                 }
296
297                 q.Add(t.Name, t.Path, t.Oid, t.Size)
298                 c.SetUploaded(p.Oid)
299         }
300 }
301
302 func (c *uploadContext) Await() {
303         c.tq.Wait()
304
305         var missing = make(map[string]string)
306         var corrupt = make(map[string]string)
307         var others = make([]error, 0, len(c.tq.Errors()))
308
309         for _, err := range c.tq.Errors() {
310                 if malformed, ok := err.(*tq.MalformedObjectError); ok {
311                         if malformed.Missing() {
312                                 missing[malformed.Name] = malformed.Oid
313                         } else if malformed.Corrupt() {
314                                 corrupt[malformed.Name] = malformed.Oid
315                         }
316                 } else {
317                         others = append(others, err)
318                 }
319         }
320
321         for _, err := range others {
322                 FullError(err)
323         }
324
325         if len(missing) > 0 || len(corrupt) > 0 {
326                 var action string
327                 if c.allowMissing {
328                         action = "missing objects"
329                 } else {
330                         action = "failed"
331                 }
332
333                 Print("LFS upload %s:", action)
334                 for name, oid := range missing {
335                         Print("  (missing) %s (%s)", name, oid)
336                 }
337                 for name, oid := range corrupt {
338                         Print("  (corrupt) %s (%s)", name, oid)
339                 }
340
341                 if !c.allowMissing {
342                         os.Exit(2)
343                 }
344         }
345
346         if len(others) > 0 {
347                 os.Exit(2)
348         }
349
350         c.trackedLocksMu.Lock()
351         if ul := len(c.unownedLocks); ul > 0 {
352                 Print("Unable to push %d locked file(s):", ul)
353                 for _, unowned := range c.unownedLocks {
354                         Print("* %s - %s", unowned.Path, unowned.Owner)
355                 }
356
357                 if c.lockVerifyState == verifyStateEnabled {
358                         Exit("ERROR: Cannot update locked files.")
359                 } else {
360                         Error("WARNING: The above files would have halted this push.")
361                 }
362         } else if len(c.ownedLocks) > 0 {
363                 Print("Consider unlocking your own locked file(s): (`git lfs unlock <path>`)")
364                 for _, owned := range c.ownedLocks {
365                         Print("* %s", owned.Path)
366                 }
367         }
368         c.trackedLocksMu.Unlock()
369 }
370
371 var (
372         githubHttps, _ = url.Parse("https://github.com")
373         githubSsh, _   = url.Parse("ssh://github.com")
374
375         // hostsWithKnownLockingSupport is a list of scheme-less hostnames
376         // (without port numbers) that are known to implement the LFS locking
377         // API.
378         //
379         // Additions are welcome.
380         hostsWithKnownLockingSupport = []*url.URL{
381                 githubHttps, githubSsh,
382         }
383 )
384
385 func uploadTransfer(p *lfs.WrappedPointer, allowMissing bool) (*tq.Transfer, error) {
386         filename := p.Name
387         oid := p.Oid
388
389         localMediaPath, err := lfs.LocalMediaPath(oid)
390         if err != nil {
391                 return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
392         }
393
394         if len(filename) > 0 {
395                 if err = ensureFile(filename, localMediaPath, allowMissing); err != nil && !errors.IsCleanPointerError(err) {
396                         return nil, err
397                 }
398         }
399
400         return &tq.Transfer{
401                 Name: filename,
402                 Path: localMediaPath,
403                 Oid:  oid,
404                 Size: p.Size,
405         }, nil
406 }
407
408 // ensureFile makes sure that the cleanPath exists before pushing it.  If it
409 // does not exist, it attempts to clean it by reading the file at smudgePath.
410 func ensureFile(smudgePath, cleanPath string, allowMissing bool) error {
411         if _, err := os.Stat(cleanPath); err == nil {
412                 return nil
413         }
414
415         localPath := filepath.Join(config.LocalWorkingDir, smudgePath)
416         file, err := os.Open(localPath)
417         if err != nil {
418                 if allowMissing {
419                         return nil
420                 }
421                 return err
422         }
423
424         defer file.Close()
425
426         stat, err := file.Stat()
427         if err != nil {
428                 return err
429         }
430
431         cleaned, err := lfs.PointerClean(file, file.Name(), stat.Size(), nil)
432         if cleaned != nil {
433                 cleaned.Teardown()
434         }
435
436         if err != nil {
437                 return err
438         }
439         return nil
440 }
441
442 // getVerifyStateFor returns whether or not lock verification is enabled for the
443 // given "endpoint". If no state has been explicitly set, an "unknown" state
444 // will be returned instead.
445 func getVerifyStateFor(endpoint lfsapi.Endpoint) verifyState {
446         uc := config.NewURLConfig(cfg.Git)
447
448         v, ok := uc.Get("lfs", endpoint.Url, "locksverify")
449         if !ok {
450                 if supportsLockingAPI(endpoint) {
451                         return verifyStateEnabled
452                 }
453                 return verifyStateUnknown
454         }
455
456         if enabled, _ := strconv.ParseBool(v); enabled {
457                 return verifyStateEnabled
458         }
459         return verifyStateDisabled
460 }
461
462 // supportsLockingAPI returns whether or not a given lfsapi.Endpoint "e"
463 // is known to support the LFS locking API by whether or not its hostname is
464 // included in the list above.
465 func supportsLockingAPI(e lfsapi.Endpoint) bool {
466         u, err := url.Parse(e.Url)
467         if err != nil {
468                 tracerx.Printf("commands: unable to parse %q to determine locking support: %v", e.Url, err)
469                 return false
470         }
471
472         for _, supported := range hostsWithKnownLockingSupport {
473                 if supported.Scheme == u.Scheme &&
474                         supported.Hostname() == u.Hostname() &&
475                         strings.HasPrefix(u.Path, supported.Path) {
476                         return true
477                 }
478         }
479         return false
480 }
481
482 // disableFor disables lock verification for the given lfsapi.Endpoint,
483 // "endpoint".
484 func disableFor(endpoint lfsapi.Endpoint) error {
485         tracerx.Printf("commands: disabling lock verification for %q", endpoint.Url)
486
487         key := strings.Join([]string{"lfs", endpoint.Url, "locksverify"}, ".")
488
489         _, err := git.Config.SetLocal("", key, "false")
490         return err
491 }