Imported Upstream version 2.4.0
[scm/test.git] / commands / uploader.go
1 package commands
2
3 import (
4         "fmt"
5         "io"
6         "io/ioutil"
7         "net/url"
8         "os"
9         "path/filepath"
10         "strings"
11         "sync"
12
13         "github.com/git-lfs/git-lfs/errors"
14         "github.com/git-lfs/git-lfs/git"
15         "github.com/git-lfs/git-lfs/lfs"
16         "github.com/git-lfs/git-lfs/tasklog"
17         "github.com/git-lfs/git-lfs/tools"
18         "github.com/git-lfs/git-lfs/tq"
19         "github.com/rubyist/tracerx"
20 )
21
22 func uploadForRefUpdates(ctx *uploadContext, updates []*git.RefUpdate, pushAll bool) error {
23         gitscanner, err := ctx.buildGitScanner()
24         if err != nil {
25                 return err
26         }
27
28         defer func() {
29                 gitscanner.Close()
30                 ctx.ReportErrors()
31         }()
32
33         verifyLocksForUpdates(ctx.lockVerifier, updates)
34         for _, update := range updates {
35                 // initialized here to prevent looped defer
36                 q := ctx.NewQueue(
37                         tq.RemoteRef(update.Right()),
38                 )
39                 err := uploadLeftOrAll(gitscanner, ctx, q, update, pushAll)
40                 ctx.CollectErrors(q)
41
42                 if err != nil {
43                         return errors.Wrap(err, fmt.Sprintf("ref %s:", update.Left().Name))
44                 }
45         }
46
47         return nil
48 }
49
50 func uploadLeftOrAll(g *lfs.GitScanner, ctx *uploadContext, q *tq.TransferQueue, update *git.RefUpdate, pushAll bool) error {
51         cb := ctx.gitScannerCallback(q)
52         if pushAll {
53                 if err := g.ScanRefWithDeleted(update.LeftCommitish(), cb); err != nil {
54                         return err
55                 }
56         } else {
57                 if err := g.ScanLeftToRemote(update.LeftCommitish(), cb); err != nil {
58                         return err
59                 }
60         }
61         return ctx.scannerError()
62 }
63
64 type uploadContext struct {
65         Remote       string
66         DryRun       bool
67         Manifest     *tq.Manifest
68         uploadedOids tools.StringSet
69         gitfilter    *lfs.GitFilter
70
71         logger *tasklog.Logger
72         meter  *tq.Meter
73
74         committerName  string
75         committerEmail string
76
77         lockVerifier *lockVerifier
78
79         // allowMissing specifies whether pushes containing missing/corrupt
80         // pointers should allow pushing Git blobs
81         allowMissing bool
82
83         // tracks errors from gitscanner callbacks
84         scannerErr error
85         errMu      sync.Mutex
86
87         // filename => oid
88         missing   map[string]string
89         corrupt   map[string]string
90         otherErrs []error
91 }
92
93 func newUploadContext(dryRun bool) *uploadContext {
94         remote := cfg.PushRemote()
95         manifest := getTransferManifestOperationRemote("upload", remote)
96         ctx := &uploadContext{
97                 Remote:       remote,
98                 Manifest:     manifest,
99                 DryRun:       dryRun,
100                 uploadedOids: tools.NewStringSet(),
101                 gitfilter:    lfs.NewGitFilter(cfg),
102                 lockVerifier: newLockVerifier(manifest),
103                 allowMissing: cfg.Git.Bool("lfs.allowincompletepush", true),
104                 missing:      make(map[string]string),
105                 corrupt:      make(map[string]string),
106                 otherErrs:    make([]error, 0),
107         }
108
109         var sink io.Writer = os.Stdout
110         if dryRun {
111                 sink = ioutil.Discard
112         }
113
114         ctx.logger = tasklog.NewLogger(sink)
115         ctx.meter = buildProgressMeter(ctx.DryRun, tq.Upload)
116         ctx.logger.Enqueue(ctx.meter)
117         ctx.committerName, ctx.committerEmail = cfg.CurrentCommitter()
118         return ctx
119 }
120
121 func (c *uploadContext) NewQueue(options ...tq.Option) *tq.TransferQueue {
122         return tq.NewTransferQueue(tq.Upload, c.Manifest, c.Remote, append(options,
123                 tq.DryRun(c.DryRun),
124                 tq.WithProgress(c.meter),
125         )...)
126 }
127
128 func (c *uploadContext) scannerError() error {
129         c.errMu.Lock()
130         defer c.errMu.Unlock()
131
132         return c.scannerErr
133 }
134
135 func (c *uploadContext) addScannerError(err error) {
136         c.errMu.Lock()
137         defer c.errMu.Unlock()
138
139         if c.scannerErr != nil {
140                 c.scannerErr = fmt.Errorf("%v\n%v", c.scannerErr, err)
141         } else {
142                 c.scannerErr = err
143         }
144 }
145
146 func (c *uploadContext) buildGitScanner() (*lfs.GitScanner, error) {
147         gitscanner := lfs.NewGitScanner(nil)
148         gitscanner.FoundLockable = func(n string) { c.lockVerifier.LockedByThem(n) }
149         gitscanner.PotentialLockables = c.lockVerifier
150         return gitscanner, gitscanner.RemoteForPush(c.Remote)
151 }
152
153 func (c *uploadContext) gitScannerCallback(tqueue *tq.TransferQueue) func(*lfs.WrappedPointer, error) {
154         return func(p *lfs.WrappedPointer, err error) {
155                 if err != nil {
156                         c.addScannerError(err)
157                 } else {
158                         c.UploadPointers(tqueue, p)
159                 }
160         }
161 }
162
163 // AddUpload adds the given oid to the set of oids that have been uploaded in
164 // the current process.
165 func (c *uploadContext) SetUploaded(oid string) {
166         c.uploadedOids.Add(oid)
167 }
168
169 // HasUploaded determines if the given oid has already been uploaded in the
170 // current process.
171 func (c *uploadContext) HasUploaded(oid string) bool {
172         return c.uploadedOids.Contains(oid)
173 }
174
175 func (c *uploadContext) prepareUpload(unfiltered ...*lfs.WrappedPointer) []*lfs.WrappedPointer {
176         numUnfiltered := len(unfiltered)
177         uploadables := make([]*lfs.WrappedPointer, 0, numUnfiltered)
178
179         // XXX(taylor): temporary measure to fix duplicate (broken) results from
180         // scanner
181         uniqOids := tools.NewStringSet()
182
183         // separate out objects that _should_ be uploaded, but don't exist in
184         // .git/lfs/objects. Those will skipped if the server already has them.
185         for _, p := range unfiltered {
186                 // object already uploaded in this process, or we've already
187                 // seen this OID (see above), skip!
188                 if uniqOids.Contains(p.Oid) || c.HasUploaded(p.Oid) {
189                         continue
190                 }
191                 uniqOids.Add(p.Oid)
192
193                 // canUpload determines whether the current pointer "p" can be
194                 // uploaded through the TransferQueue below. It is set to false
195                 // only when the file is locked by someone other than the
196                 // current committer.
197                 var canUpload bool = true
198
199                 if c.lockVerifier.LockedByThem(p.Name) {
200                         // If the verification state is enabled, this failed
201                         // locks verification means that the push should fail.
202                         //
203                         // If the state is disabled, the verification error is
204                         // silent and the user can upload.
205                         //
206                         // If the state is undefined, the verification error is
207                         // sent as a warning and the user can upload.
208                         canUpload = !c.lockVerifier.Enabled()
209                 }
210
211                 c.lockVerifier.LockedByUs(p.Name)
212
213                 if canUpload {
214                         // estimate in meter early (even if it's not going into
215                         // uploadables), since we will call Skip() based on the
216                         // results of the download check queue.
217                         c.meter.Add(p.Size)
218
219                         uploadables = append(uploadables, p)
220                 }
221         }
222
223         return uploadables
224 }
225
226 func (c *uploadContext) UploadPointers(q *tq.TransferQueue, unfiltered ...*lfs.WrappedPointer) {
227         if c.DryRun {
228                 for _, p := range unfiltered {
229                         if c.HasUploaded(p.Oid) {
230                                 continue
231                         }
232
233                         Print("push %s => %s", p.Oid, p.Name)
234                         c.SetUploaded(p.Oid)
235                 }
236
237                 return
238         }
239
240         pointers := c.prepareUpload(unfiltered...)
241         for _, p := range pointers {
242                 t, err := c.uploadTransfer(p)
243                 if err != nil && !errors.IsCleanPointerError(err) {
244                         ExitWithError(err)
245                 }
246
247                 q.Add(t.Name, t.Path, t.Oid, t.Size)
248                 c.SetUploaded(p.Oid)
249         }
250 }
251
252 func (c *uploadContext) CollectErrors(tqueue *tq.TransferQueue) {
253         tqueue.Wait()
254
255         for _, err := range tqueue.Errors() {
256                 if malformed, ok := err.(*tq.MalformedObjectError); ok {
257                         if malformed.Missing() {
258                                 c.missing[malformed.Name] = malformed.Oid
259                         } else if malformed.Corrupt() {
260                                 c.corrupt[malformed.Name] = malformed.Oid
261                         }
262                 } else {
263                         c.otherErrs = append(c.otherErrs, err)
264                 }
265         }
266 }
267
268 func (c *uploadContext) ReportErrors() {
269         c.meter.Finish()
270
271         for _, err := range c.otherErrs {
272                 FullError(err)
273         }
274
275         if len(c.missing) > 0 || len(c.corrupt) > 0 {
276                 var action string
277                 if c.allowMissing {
278                         action = "missing objects"
279                 } else {
280                         action = "failed"
281                 }
282
283                 Print("LFS upload %s:", action)
284                 for name, oid := range c.missing {
285                         Print("  (missing) %s (%s)", name, oid)
286                 }
287                 for name, oid := range c.corrupt {
288                         Print("  (corrupt) %s (%s)", name, oid)
289                 }
290
291                 if !c.allowMissing {
292                         os.Exit(2)
293                 }
294         }
295
296         if len(c.otherErrs) > 0 {
297                 os.Exit(2)
298         }
299
300         if c.lockVerifier.HasUnownedLocks() {
301                 Print("Unable to push locked files:")
302                 for _, unowned := range c.lockVerifier.UnownedLocks() {
303                         Print("* %s - %s", unowned.Path(), unowned.Owners())
304                 }
305
306                 if c.lockVerifier.Enabled() {
307                         Exit("ERROR: Cannot update locked files.")
308                 } else {
309                         Error("WARNING: The above files would have halted this push.")
310                 }
311         } else if c.lockVerifier.HasOwnedLocks() {
312                 Print("Consider unlocking your own locked files: (`git lfs unlock <path>`)")
313                 for _, owned := range c.lockVerifier.OwnedLocks() {
314                         Print("* %s", owned.Path())
315                 }
316         }
317 }
318
var (
	// githubHttps and githubSsh are the parsed GitHub base URLs for the
	// https and ssh schemes. The inputs are constant, well-formed strings,
	// so the parse errors are deliberately discarded.
	githubHttps, _ = url.Parse("https://github.com")
	githubSsh, _   = url.Parse("ssh://github.com")

	// hostsWithKnownLockingSupport lists the base URLs of hosts that are
	// known to implement the LFS locking API. supportsLockingAPI compares a
	// candidate URL against each entry by scheme, hostname (port excluded)
	// and path prefix.
	//
	// Additions are welcome.
	hostsWithKnownLockingSupport = []*url.URL{
		githubHttps,
		githubSsh,
	}
)
332
333 func (c *uploadContext) uploadTransfer(p *lfs.WrappedPointer) (*tq.Transfer, error) {
334         filename := p.Name
335         oid := p.Oid
336
337         localMediaPath, err := c.gitfilter.ObjectPath(oid)
338         if err != nil {
339                 return nil, errors.Wrapf(err, "Error uploading file %s (%s)", filename, oid)
340         }
341
342         if len(filename) > 0 {
343                 if err = c.ensureFile(filename, localMediaPath); err != nil && !errors.IsCleanPointerError(err) {
344                         return nil, err
345                 }
346         }
347
348         return &tq.Transfer{
349                 Name: filename,
350                 Path: localMediaPath,
351                 Oid:  oid,
352                 Size: p.Size,
353         }, nil
354 }
355
356 // ensureFile makes sure that the cleanPath exists before pushing it.  If it
357 // does not exist, it attempts to clean it by reading the file at smudgePath.
358 func (c *uploadContext) ensureFile(smudgePath, cleanPath string) error {
359         if _, err := os.Stat(cleanPath); err == nil {
360                 return nil
361         }
362
363         localPath := filepath.Join(cfg.LocalWorkingDir(), smudgePath)
364         file, err := os.Open(localPath)
365         if err != nil {
366                 if c.allowMissing {
367                         return nil
368                 }
369                 return err
370         }
371
372         defer file.Close()
373
374         stat, err := file.Stat()
375         if err != nil {
376                 return err
377         }
378
379         cleaned, err := c.gitfilter.Clean(file, file.Name(), stat.Size(), nil)
380         if cleaned != nil {
381                 cleaned.Teardown()
382         }
383
384         if err != nil {
385                 return err
386         }
387         return nil
388 }
389
390 // supportsLockingAPI returns whether or not a given url is known to support
391 // the LFS locking API by whether or not its hostname is included in the list
392 // above.
393 func supportsLockingAPI(rawurl string) bool {
394         u, err := url.Parse(rawurl)
395         if err != nil {
396                 tracerx.Printf("commands: unable to parse %q to determine locking support: %v", rawurl, err)
397                 return false
398         }
399
400         for _, supported := range hostsWithKnownLockingSupport {
401                 if supported.Scheme == u.Scheme &&
402                         supported.Hostname() == u.Hostname() &&
403                         strings.HasPrefix(u.Path, supported.Path) {
404                         return true
405                 }
406         }
407         return false
408 }
409
410 // disableFor disables lock verification for the given lfsapi.Endpoint,
411 // "endpoint".
412 func disableFor(rawurl string) error {
413         tracerx.Printf("commands: disabling lock verification for %q", rawurl)
414
415         key := strings.Join([]string{"lfs", rawurl, "locksverify"}, ".")
416
417         _, err := cfg.SetGitLocalKey(key, "false")
418         return err
419 }