Tizen_4.0 base
[platform/upstream/docker-engine.git] / distribution / push_v2.go
1 package distribution
2
3 import (
4         "errors"
5         "fmt"
6         "io"
7         "runtime"
8         "sort"
9         "strings"
10         "sync"
11
12         "golang.org/x/net/context"
13
14         "github.com/Sirupsen/logrus"
15         "github.com/docker/distribution"
16         "github.com/docker/distribution/manifest/schema1"
17         "github.com/docker/distribution/manifest/schema2"
18         "github.com/docker/distribution/reference"
19         "github.com/docker/distribution/registry/client"
20         apitypes "github.com/docker/docker/api/types"
21         "github.com/docker/docker/distribution/metadata"
22         "github.com/docker/docker/distribution/xfer"
23         "github.com/docker/docker/layer"
24         "github.com/docker/docker/pkg/ioutils"
25         "github.com/docker/docker/pkg/progress"
26         "github.com/docker/docker/pkg/stringid"
27         "github.com/docker/docker/registry"
28         "github.com/opencontainers/go-digest"
29 )
30
31 const (
32         smallLayerMaximumSize  = 100 * (1 << 10) // 100KB
33         middleLayerMaximumSize = 10 * (1 << 20)  // 10MB
34 )
35
36 type v2Pusher struct {
37         v2MetadataService metadata.V2MetadataService
38         ref               reference.Named
39         endpoint          registry.APIEndpoint
40         repoInfo          *registry.RepositoryInfo
41         config            *ImagePushConfig
42         repo              distribution.Repository
43
44         // pushState is state built by the Upload functions.
45         pushState pushState
46 }
47
48 type pushState struct {
49         sync.Mutex
50         // remoteLayers is the set of layers known to exist on the remote side.
51         // This avoids redundant queries when pushing multiple tags that
52         // involve the same layers. It is also used to fill in digest and size
53         // information when building the manifest.
54         remoteLayers map[layer.DiffID]distribution.Descriptor
55         // confirmedV2 is set to true if we confirm we're talking to a v2
56         // registry. This is used to limit fallbacks to the v1 protocol.
57         confirmedV2 bool
58 }
59
60 func (p *v2Pusher) Push(ctx context.Context) (err error) {
61         p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
62
63         p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
64         if err != nil {
65                 logrus.Debugf("Error getting v2 registry: %v", err)
66                 return err
67         }
68
69         if err = p.pushV2Repository(ctx); err != nil {
70                 if continueOnError(err) {
71                         return fallbackError{
72                                 err:         err,
73                                 confirmedV2: p.pushState.confirmedV2,
74                                 transportOK: true,
75                         }
76                 }
77         }
78         return err
79 }
80
81 func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
82         if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
83                 imageID, err := p.config.ReferenceStore.Get(p.ref)
84                 if err != nil {
85                         return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref))
86                 }
87
88                 return p.pushV2Tag(ctx, namedTagged, imageID)
89         }
90
91         if !reference.IsNameOnly(p.ref) {
92                 return errors.New("cannot push a digest reference")
93         }
94
95         // Pull all tags
96         pushed := 0
97         for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
98                 if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
99                         pushed++
100                         if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil {
101                                 return err
102                         }
103                 }
104         }
105
106         if pushed == 0 {
107                 return fmt.Errorf("no tags to push for %s", reference.FamiliarName(p.repoInfo.Name))
108         }
109
110         return nil
111 }
112
113 func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
114         logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
115
116         imgConfig, err := p.config.ImageStore.Get(id)
117         if err != nil {
118                 return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
119         }
120
121         rootfs, _, err := p.config.ImageStore.RootFSAndPlatformFromConfig(imgConfig)
122         if err != nil {
123                 return fmt.Errorf("unable to get rootfs for image %s: %s", reference.FamiliarString(ref), err)
124         }
125
126         l, err := p.config.LayerStore.Get(rootfs.ChainID())
127         if err != nil {
128                 return fmt.Errorf("failed to get top layer from image: %v", err)
129         }
130         defer l.Release()
131
132         hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
133         if err != nil {
134                 return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
135         }
136
137         var descriptors []xfer.UploadDescriptor
138
139         descriptorTemplate := v2PushDescriptor{
140                 v2MetadataService: p.v2MetadataService,
141                 hmacKey:           hmacKey,
142                 repoInfo:          p.repoInfo.Name,
143                 ref:               p.ref,
144                 endpoint:          p.endpoint,
145                 repo:              p.repo,
146                 pushState:         &p.pushState,
147         }
148
149         // Loop bounds condition is to avoid pushing the base layer on Windows.
150         for range rootfs.DiffIDs {
151                 descriptor := descriptorTemplate
152                 descriptor.layer = l
153                 descriptor.checkedDigests = make(map[digest.Digest]struct{})
154                 descriptors = append(descriptors, &descriptor)
155
156                 l = l.Parent()
157         }
158
159         if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
160                 return err
161         }
162
163         // Try schema2 first
164         builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig)
165         manifest, err := manifestFromBuilder(ctx, builder, descriptors)
166         if err != nil {
167                 return err
168         }
169
170         manSvc, err := p.repo.Manifests(ctx)
171         if err != nil {
172                 return err
173         }
174
175         putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
176         if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
177                 if runtime.GOOS == "windows" || p.config.TrustKey == nil || p.config.RequireSchema2 {
178                         logrus.Warnf("failed to upload schema2 manifest: %v", err)
179                         return err
180                 }
181
182                 logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
183
184                 manifestRef, err := reference.WithTag(p.repo.Named(), ref.Tag())
185                 if err != nil {
186                         return err
187                 }
188                 builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, imgConfig)
189                 manifest, err = manifestFromBuilder(ctx, builder, descriptors)
190                 if err != nil {
191                         return err
192                 }
193
194                 if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
195                         return err
196                 }
197         }
198
199         var canonicalManifest []byte
200
201         switch v := manifest.(type) {
202         case *schema1.SignedManifest:
203                 canonicalManifest = v.Canonical
204         case *schema2.DeserializedManifest:
205                 _, canonicalManifest, err = v.Payload()
206                 if err != nil {
207                         return err
208                 }
209         }
210
211         manifestDigest := digest.FromBytes(canonicalManifest)
212         progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
213
214         if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
215                 return err
216         }
217
218         // Signal digest to the trust client so it can sign the
219         // push, if appropriate.
220         progress.Aux(p.config.ProgressOutput, apitypes.PushResult{Tag: ref.Tag(), Digest: manifestDigest.String(), Size: len(canonicalManifest)})
221
222         return nil
223 }
224
225 func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
226         // descriptors is in reverse order; iterate backwards to get references
227         // appended in the right order.
228         for i := len(descriptors) - 1; i >= 0; i-- {
229                 if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
230                         return nil, err
231                 }
232         }
233
234         return builder.Build(ctx)
235 }
236
237 type v2PushDescriptor struct {
238         layer             PushLayer
239         v2MetadataService metadata.V2MetadataService
240         hmacKey           []byte
241         repoInfo          reference.Named
242         ref               reference.Named
243         endpoint          registry.APIEndpoint
244         repo              distribution.Repository
245         pushState         *pushState
246         remoteDescriptor  distribution.Descriptor
247         // a set of digests whose presence has been checked in a target repository
248         checkedDigests map[digest.Digest]struct{}
249 }
250
251 func (pd *v2PushDescriptor) Key() string {
252         return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String()
253 }
254
255 func (pd *v2PushDescriptor) ID() string {
256         return stringid.TruncateID(pd.layer.DiffID().String())
257 }
258
259 func (pd *v2PushDescriptor) DiffID() layer.DiffID {
260         return pd.layer.DiffID()
261 }
262
263 func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
264         // Skip foreign layers unless this registry allows nondistributable artifacts.
265         if !pd.endpoint.AllowNondistributableArtifacts {
266                 if fs, ok := pd.layer.(distribution.Describable); ok {
267                         if d := fs.Descriptor(); len(d.URLs) > 0 {
268                                 progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
269                                 return d, nil
270                         }
271                 }
272         }
273
274         diffID := pd.DiffID()
275
276         pd.pushState.Lock()
277         if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
278                 // it is already known that the push is not needed and
279                 // therefore doing a stat is unnecessary
280                 pd.pushState.Unlock()
281                 progress.Update(progressOutput, pd.ID(), "Layer already exists")
282                 return descriptor, nil
283         }
284         pd.pushState.Unlock()
285
286         maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
287
288         // Do we have any metadata associated with this layer's DiffID?
289         v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
290         if err == nil {
291                 // check for blob existence in the target repository
292                 descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata)
293                 if exists || err != nil {
294                         return descriptor, err
295                 }
296         }
297
298         // if digest was empty or not saved, or if blob does not exist on the remote repository,
299         // then push the blob.
300         bs := pd.repo.Blobs(ctx)
301
302         var layerUpload distribution.BlobWriter
303
304         // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
305         candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
306         for _, mountCandidate := range candidates {
307                 logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
308                 createOpts := []distribution.BlobCreateOption{}
309
310                 if len(mountCandidate.SourceRepository) > 0 {
311                         namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository)
312                         if err != nil {
313                                 logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err)
314                                 pd.v2MetadataService.Remove(mountCandidate)
315                                 continue
316                         }
317
318                         // Candidates are always under same domain, create remote reference
319                         // with only path to set mount from with
320                         remoteRef, err := reference.WithName(reference.Path(namedRef))
321                         if err != nil {
322                                 logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err)
323                                 continue
324                         }
325
326                         canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest)
327                         if err != nil {
328                                 logrus.Errorf("failed to make canonical reference: %v", err)
329                                 continue
330                         }
331
332                         createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
333                 }
334
335                 // send the layer
336                 lu, err := bs.Create(ctx, createOpts...)
337                 switch err := err.(type) {
338                 case nil:
339                         // noop
340                 case distribution.ErrBlobMounted:
341                         progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
342
343                         err.Descriptor.MediaType = schema2.MediaTypeLayer
344
345                         pd.pushState.Lock()
346                         pd.pushState.confirmedV2 = true
347                         pd.pushState.remoteLayers[diffID] = err.Descriptor
348                         pd.pushState.Unlock()
349
350                         // Cache mapping from this layer's DiffID to the blobsum
351                         if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
352                                 Digest:           err.Descriptor.Digest,
353                                 SourceRepository: pd.repoInfo.Name(),
354                         }); err != nil {
355                                 return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
356                         }
357                         return err.Descriptor, nil
358                 default:
359                         logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
360                 }
361
362                 if len(mountCandidate.SourceRepository) > 0 &&
363                         (metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
364                                 len(mountCandidate.HMAC) == 0) {
365                         cause := "blob mount failure"
366                         if err != nil {
367                                 cause = fmt.Sprintf("an error: %v", err.Error())
368                         }
369                         logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
370                         pd.v2MetadataService.Remove(mountCandidate)
371                 }
372
373                 if lu != nil {
374                         // cancel previous upload
375                         cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
376                         layerUpload = lu
377                 }
378         }
379
380         if maxExistenceChecks-len(pd.checkedDigests) > 0 {
381                 // do additional layer existence checks with other known digests if any
382                 descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
383                 if exists || err != nil {
384                         return descriptor, err
385                 }
386         }
387
388         logrus.Debugf("Pushing layer: %s", diffID)
389         if layerUpload == nil {
390                 layerUpload, err = bs.Create(ctx)
391                 if err != nil {
392                         return distribution.Descriptor{}, retryOnError(err)
393                 }
394         }
395         defer layerUpload.Close()
396
397         // upload the blob
398         desc, err := pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
399         if err != nil {
400                 return desc, err
401         }
402
403         return desc, nil
404 }
405
406 func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
407         pd.remoteDescriptor = descriptor
408 }
409
410 func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
411         return pd.remoteDescriptor
412 }
413
414 func (pd *v2PushDescriptor) uploadUsingSession(
415         ctx context.Context,
416         progressOutput progress.Output,
417         diffID layer.DiffID,
418         layerUpload distribution.BlobWriter,
419 ) (distribution.Descriptor, error) {
420         var reader io.ReadCloser
421
422         contentReader, err := pd.layer.Open()
423         if err != nil {
424                 return distribution.Descriptor{}, retryOnError(err)
425         }
426
427         size, _ := pd.layer.Size()
428
429         reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, size, pd.ID(), "Pushing")
430
431         switch m := pd.layer.MediaType(); m {
432         case schema2.MediaTypeUncompressedLayer:
433                 compressedReader, compressionDone := compress(reader)
434                 defer func(closer io.Closer) {
435                         closer.Close()
436                         <-compressionDone
437                 }(reader)
438                 reader = compressedReader
439         case schema2.MediaTypeLayer:
440         default:
441                 reader.Close()
442                 return distribution.Descriptor{}, fmt.Errorf("unsupported layer media type %s", m)
443         }
444
445         digester := digest.Canonical.Digester()
446         tee := io.TeeReader(reader, digester.Hash())
447
448         nn, err := layerUpload.ReadFrom(tee)
449         reader.Close()
450         if err != nil {
451                 return distribution.Descriptor{}, retryOnError(err)
452         }
453
454         pushDigest := digester.Digest()
455         if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
456                 return distribution.Descriptor{}, retryOnError(err)
457         }
458
459         logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
460         progress.Update(progressOutput, pd.ID(), "Pushed")
461
462         // Cache mapping from this layer's DiffID to the blobsum
463         if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
464                 Digest:           pushDigest,
465                 SourceRepository: pd.repoInfo.Name(),
466         }); err != nil {
467                 return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
468         }
469
470         desc := distribution.Descriptor{
471                 Digest:    pushDigest,
472                 MediaType: schema2.MediaTypeLayer,
473                 Size:      nn,
474         }
475
476         pd.pushState.Lock()
477         // If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
478         pd.pushState.confirmedV2 = true
479         pd.pushState.remoteLayers[diffID] = desc
480         pd.pushState.Unlock()
481
482         return desc, nil
483 }
484
485 // layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
486 // slice. If it finds one that the registry knows about, it returns the known digest and "true". If
487 // "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
488 // (not just the target one).
489 func (pd *v2PushDescriptor) layerAlreadyExists(
490         ctx context.Context,
491         progressOutput progress.Output,
492         diffID layer.DiffID,
493         checkOtherRepositories bool,
494         maxExistenceCheckAttempts int,
495         v2Metadata []metadata.V2Metadata,
496 ) (desc distribution.Descriptor, exists bool, err error) {
497         // filter the metadata
498         candidates := []metadata.V2Metadata{}
499         for _, meta := range v2Metadata {
500                 if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.Name() {
501                         continue
502                 }
503                 candidates = append(candidates, meta)
504         }
505         // sort the candidates by similarity
506         sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)
507
508         digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
509         // an array of unique blob digests ordered from the best mount candidates to worst
510         layerDigests := []digest.Digest{}
511         for i := 0; i < len(candidates); i++ {
512                 if len(layerDigests) >= maxExistenceCheckAttempts {
513                         break
514                 }
515                 meta := &candidates[i]
516                 if _, exists := digestToMetadata[meta.Digest]; exists {
517                         // keep reference just to the first mapping (the best mount candidate)
518                         continue
519                 }
520                 if _, exists := pd.checkedDigests[meta.Digest]; exists {
521                         // existence of this digest has already been tested
522                         continue
523                 }
524                 digestToMetadata[meta.Digest] = meta
525                 layerDigests = append(layerDigests, meta.Digest)
526         }
527
528 attempts:
529         for _, dgst := range layerDigests {
530                 meta := digestToMetadata[dgst]
531                 logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
532                 desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
533                 pd.checkedDigests[meta.Digest] = struct{}{}
534                 switch err {
535                 case nil:
536                         if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
537                                 // cache mapping from this layer's DiffID to the blobsum
538                                 if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
539                                         Digest:           desc.Digest,
540                                         SourceRepository: pd.repoInfo.Name(),
541                                 }); err != nil {
542                                         return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
543                                 }
544                         }
545                         desc.MediaType = schema2.MediaTypeLayer
546                         exists = true
547                         break attempts
548                 case distribution.ErrBlobUnknown:
549                         if meta.SourceRepository == pd.repoInfo.Name() {
550                                 // remove the mapping to the target repository
551                                 pd.v2MetadataService.Remove(*meta)
552                         }
553                 default:
554                         logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
555                 }
556         }
557
558         if exists {
559                 progress.Update(progressOutput, pd.ID(), "Layer already exists")
560                 pd.pushState.Lock()
561                 pd.pushState.remoteLayers[diffID] = desc
562                 pd.pushState.Unlock()
563         }
564
565         return desc, exists, nil
566 }
567
568 // getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
569 // source repositories of target registry, maximum number of layer existence checks performed on the target
570 // repository and whether the check shall be done also with digests mapped to different repositories. The
571 // decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
572 // of upload does not outweigh a latency.
573 func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
574         size, err := layer.Size()
575         switch {
576         // big blob
577         case size > middleLayerMaximumSize:
578                 // 1st attempt to mount the blob few times
579                 // 2nd few existence checks with digests associated to any repository
580                 // then fallback to upload
581                 return 4, 3, true
582
583         // middle sized blobs; if we could not get the size, assume we deal with middle sized blob
584         case size > smallLayerMaximumSize, err != nil:
585                 // 1st attempt to mount blobs of average size few times
586                 // 2nd try at most 1 existence check if there's an existing mapping to the target repository
587                 // then fallback to upload
588                 return 3, 1, false
589
590         // small blobs, do a minimum number of checks
591         default:
592                 return 1, 1, false
593         }
594 }
595
596 // getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
597 // array is sorted from youngest to oldest. If requireRegistryMatch is true, the resulting array will contain
598 // only metadata entries having registry part of SourceRepository matching the part of repoInfo.
599 func getRepositoryMountCandidates(
600         repoInfo reference.Named,
601         hmacKey []byte,
602         max int,
603         v2Metadata []metadata.V2Metadata,
604 ) []metadata.V2Metadata {
605         candidates := []metadata.V2Metadata{}
606         for _, meta := range v2Metadata {
607                 sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
608                 if err != nil || reference.Domain(repoInfo) != reference.Domain(sourceRepo) {
609                         continue
610                 }
611                 // target repository is not a viable candidate
612                 if meta.SourceRepository == repoInfo.Name() {
613                         continue
614                 }
615                 candidates = append(candidates, meta)
616         }
617
618         sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
619         if max >= 0 && len(candidates) > max {
620                 // select the youngest metadata
621                 candidates = candidates[:max]
622         }
623
624         return candidates
625 }
626
627 // byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
628 // candidate "a" is preferred over "b":
629 //
630 //  1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
631 //     "b" was not
632 //  2. if a number of its repository path components exactly matching path components of target repository is higher
633 type byLikeness struct {
634         arr            []metadata.V2Metadata
635         hmacKey        []byte
636         pathComponents []string
637 }
638
639 func (bla byLikeness) Less(i, j int) bool {
640         aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
641         bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
642         if aMacMatch != bMacMatch {
643                 return aMacMatch
644         }
645         aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
646         bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
647         return aMatch > bMatch
648 }
649 func (bla byLikeness) Swap(i, j int) {
650         bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
651 }
652 func (bla byLikeness) Len() int { return len(bla.arr) }
653
654 func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
655         // reverse the metadata array to shift the newest entries to the beginning
656         for i := 0; i < len(marr)/2; i++ {
657                 marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
658         }
659         // keep equal entries ordered from the youngest to the oldest
660         sort.Stable(byLikeness{
661                 arr:            marr,
662                 hmacKey:        hmacKey,
663                 pathComponents: getPathComponents(repoInfo.Name()),
664         })
665 }
666
667 // numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
668 func numOfMatchingPathComponents(pth string, matchComponents []string) int {
669         pthComponents := getPathComponents(pth)
670         i := 0
671         for ; i < len(pthComponents) && i < len(matchComponents); i++ {
672                 if matchComponents[i] != pthComponents[i] {
673                         return i
674                 }
675         }
676         return i
677 }
678
679 func getPathComponents(path string) []string {
680         return strings.Split(path, "/")
681 }
682
683 func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
684         if layerUpload != nil {
685                 logrus.Debugf("cancelling upload of blob %s", dgst)
686                 err := layerUpload.Cancel(ctx)
687                 if err != nil {
688                         logrus.Warnf("failed to cancel upload: %v", err)
689                 }
690         }
691 }