12 "golang.org/x/net/context"
14 "github.com/Sirupsen/logrus"
15 "github.com/docker/distribution"
16 "github.com/docker/distribution/manifest/schema1"
17 "github.com/docker/distribution/manifest/schema2"
18 "github.com/docker/distribution/reference"
19 "github.com/docker/distribution/registry/client"
20 apitypes "github.com/docker/docker/api/types"
21 "github.com/docker/docker/distribution/metadata"
22 "github.com/docker/docker/distribution/xfer"
23 "github.com/docker/docker/layer"
24 "github.com/docker/docker/pkg/ioutils"
25 "github.com/docker/docker/pkg/progress"
26 "github.com/docker/docker/pkg/stringid"
27 "github.com/docker/docker/registry"
28 "github.com/opencontainers/go-digest"
// Size thresholds (in bytes) that getMaxMountAndExistenceCheckAttempts compares
// a layer's size against to classify it as small, middle-sized or big.
32 smallLayerMaximumSize = 100 * (1 << 10) // 100KB
33 middleLayerMaximumSize = 10 * (1 << 20) // 10MB
// v2Pusher carries what is needed to push to a v2 registry endpoint: the
// metadata service, the endpoint, the repository info, the push configuration
// and the remote repository handle. Some fields of the struct are not visible
// in this excerpt.
36 type v2Pusher struct {
37 v2MetadataService metadata.V2MetadataService
39 endpoint registry.APIEndpoint
40 repoInfo *registry.RepositoryInfo
41 config *ImagePushConfig
// repo is assigned in Push from the result of NewV2Repository.
42 repo distribution.Repository
44 // pushState is state built by the Upload functions.
// pushState is the bookkeeping shared by the layer uploads of one push; each
// v2PushDescriptor created in pushV2Tag receives a pointer to it. Code in this
// file calls Unlock on it after touching its fields, so it presumably embeds a
// mutex (that field is not visible in this excerpt — confirm).
48 type pushState struct {
50 // remoteLayers is the set of layers known to exist on the remote side.
51 // This avoids redundant queries when pushing multiple tags that
52 // involve the same layers. It is also used to fill in digest and size
53 // information when building the manifest.
54 remoteLayers map[layer.DiffID]distribution.Descriptor
55 // confirmedV2 is set to true if we confirm we're talking to a v2
56 // registry. This is used to limit fallbacks to the v1 protocol.
// Push runs the push against the v2 endpoint. It starts with an empty
// remoteLayers map, obtains the remote repository through NewV2Repository
// (requesting the "push" and "pull" actions) and then calls pushV2Repository.
// Parts of the error handling are not visible in this excerpt.
60 func (p *v2Pusher) Push(ctx context.Context) (err error) {
61 p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
63 p.repo, p.pushState.confirmedV2, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
65 logrus.Debugf("Error getting v2 registry: %v", err)
69 if err = p.pushV2Repository(ctx); err != nil {
// continueOnError decides whether the failure is turned into a value that
// records confirmedV2 (presumably so the caller can decide on a fallback —
// confirm against the elided lines).
70 if continueOnError(err) {
73 confirmedV2: p.pushState.confirmedV2,
// pushV2Repository pushes whatever p.ref designates. A tagged reference is
// resolved through the reference store and pushed as a single tag. A reference
// that is neither tagged nor name-only is rejected as a digest reference. For
// a name-only reference, the tagged references the store returns for that name
// are pushed one by one with pushV2Tag.
81 func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
82 if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
83 imageID, err := p.config.ReferenceStore.Get(p.ref)
85 return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref))
88 return p.pushV2Tag(ctx, namedTagged, imageID)
// Only name-only references may continue past this point.
91 if !reference.IsNameOnly(p.ref) {
92 return errors.New("cannot push a digest reference")
// Walk every reference stored under this name and push the tagged ones.
97 for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
98 if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
100 if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil {
// Presumably reached only when the loop above pushed no tag; the guarding
// condition is not visible in this excerpt.
107 return fmt.Errorf("no tags to push for %s", reference.FamiliarName(p.repoInfo.Name))
// pushV2Tag pushes the image identified by id to the remote repository under
// the tag of ref. Visible steps: load the image config and its rootfs, fetch
// the top layer, derive an HMAC key from the auth config, upload the layers
// through the upload manager, build and put a schema2 manifest (falling back
// to a schema1 manifest built with the trust key in some failure cases), then
// record and report the manifest digest. Error checks and other short lines
// are not visible in this excerpt.
113 func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
114 logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
116 imgConfig, err := p.config.ImageStore.Get(id)
118 return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
121 rootfs, _, err := p.config.ImageStore.RootFSAndPlatformFromConfig(imgConfig)
123 return fmt.Errorf("unable to get rootfs for image %s: %s", reference.FamiliarString(ref), err)
126 l, err := p.config.LayerStore.Get(rootfs.ChainID())
128 return fmt.Errorf("failed to get top layer from image: %v", err)
132 hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
134 return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
137 var descriptors []xfer.UploadDescriptor
// Template that is copied for every layer's upload descriptor; all copies
// share the same pushState pointer.
139 descriptorTemplate := v2PushDescriptor{
140 v2MetadataService: p.v2MetadataService,
142 repoInfo: p.repoInfo.Name,
144 endpoint: p.endpoint,
146 pushState: &p.pushState,
149 // Loop bounds condition is to avoid pushing the base layer on Windows.
// NOTE(review): the loop below ranges over every entry of rootfs.DiffIDs
// without explicit bounds, so the comment above looks stale — confirm.
150 for range rootfs.DiffIDs {
151 descriptor := descriptorTemplate
// Each descriptor gets its own set of already-checked digests.
153 descriptor.checkedDigests = make(map[digest.Digest]struct{})
154 descriptors = append(descriptors, &descriptor)
159 if err := p.config.UploadManager.Upload(ctx, descriptors, p.config.ProgressOutput); err != nil {
// Try a schema2 manifest first.
164 builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig)
165 manifest, err := manifestFromBuilder(ctx, builder, descriptors)
170 manSvc, err := p.repo.Manifests(ctx)
175 putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
176 if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
// On Windows, without a trust key, or when schema2 is required, the failure
// is logged without mentioning a fallback (presumably the error is returned
// right after — the line is not visible in this excerpt).
177 if runtime.GOOS == "windows" || p.config.TrustKey == nil || p.config.RequireSchema2 {
178 logrus.Warnf("failed to upload schema2 manifest: %v", err)
182 logrus.Warnf("failed to upload schema2 manifest: %v - falling back to schema1", err)
// Rebuild the manifest in schema1 form using the trust key and retry the put.
184 manifestRef, err := reference.WithTag(p.repo.Named(), ref.Tag())
188 builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, imgConfig)
189 manifest, err = manifestFromBuilder(ctx, builder, descriptors)
194 if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
// The bytes the digest is computed over come from a different place
// depending on which manifest schema ended up being used.
199 var canonicalManifest []byte
201 switch v := manifest.(type) {
202 case *schema1.SignedManifest:
203 canonicalManifest = v.Canonical
204 case *schema2.DeserializedManifest:
205 _, canonicalManifest, err = v.Payload()
211 manifestDigest := digest.FromBytes(canonicalManifest)
212 progress.Messagef(p.config.ProgressOutput, "", "%s: digest: %s size: %d", ref.Tag(), manifestDigest, len(canonicalManifest))
214 if err := addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
218 // Signal digest to the trust client so it can sign the
219 // push, if appropriate.
220 progress.Aux(p.config.ProgressOutput, apitypes.PushResult{Tag: ref.Tag(), Digest: manifestDigest.String(), Size: len(canonicalManifest)})
// manifestFromBuilder appends every descriptor to builder as a reference and
// returns the result of builder.Build. Each descriptor must be a
// *v2PushDescriptor: the single-value type assertion below panics otherwise.
225 func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuilder, descriptors []xfer.UploadDescriptor) (distribution.Manifest, error) {
226 // descriptors is in reverse order; iterate backwards to get references
227 // appended in the right order.
228 for i := len(descriptors) - 1; i >= 0; i-- {
229 if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
234 return builder.Build(ctx)
// v2PushDescriptor describes the upload of a single layer to a v2 repository.
// pushV2Tag creates instances as copies of a shared template. Several fields
// used by its methods (for example layer, ref, hmacKey and pushState) are not
// visible in this excerpt.
237 type v2PushDescriptor struct {
239 v2MetadataService metadata.V2MetadataService
241 repoInfo reference.Named
243 endpoint registry.APIEndpoint
244 repo distribution.Repository
// remoteDescriptor is written by SetRemoteDescriptor and read by Descriptor.
246 remoteDescriptor distribution.Descriptor
247 // a set of digests whose presence has been checked in a target repository
248 checkedDigests map[digest.Digest]struct{}
// Key returns a string made of the "v2push:" prefix, the name of the pushed
// reference, a space, and the layer's DiffID.
251 func (pd *v2PushDescriptor) Key() string {
252 return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String()
// ID returns the layer's DiffID string passed through stringid.TruncateID.
// Within this file it is the identifier given to progress updates.
255 func (pd *v2PushDescriptor) ID() string {
256 return stringid.TruncateID(pd.layer.DiffID().String())
// DiffID returns the DiffID of the layer this descriptor uploads.
259 func (pd *v2PushDescriptor) DiffID() layer.DiffID {
260 return pd.layer.DiffID()
// Upload makes sure the layer is present in the target repository and returns
// its descriptor. In the order visible here it:
//   - skips foreign layers (descriptors carrying URLs) unless the endpoint
//     allows nondistributable artifacts;
//   - returns the cached descriptor when pushState.remoteLayers already has
//     an entry for the layer;
//   - checks whether the blob already exists in the target repository;
//   - tries to mount the blob from other repositories of the same registry;
//   - performs further existence checks while the check budget allows;
//   - otherwise uploads the layer content with uploadUsingSession.
//
// Many short lines (error checks, lock acquisitions, loop control, some
// returns) are not visible in this excerpt.
263 func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
264 // Skip foreign layers unless this registry allows nondistributable artifacts.
265 if !pd.endpoint.AllowNondistributableArtifacts {
266 if fs, ok := pd.layer.(distribution.Describable); ok {
267 if d := fs.Descriptor(); len(d.URLs) > 0 {
268 progress.Update(progressOutput, pd.ID(), "Skipped foreign layer")
274 diffID := pd.DiffID()
277 if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
278 // it is already known that the push is not needed and
279 // therefore doing a stat is unnecessary
280 pd.pushState.Unlock()
281 progress.Update(progressOutput, pd.ID(), "Layer already exists")
282 return descriptor, nil
284 pd.pushState.Unlock()
// The mount-attempt and existence-check budgets are derived from the layer.
286 maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
288 // Do we have any metadata associated with this layer's DiffID?
289 v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
291 // check for blob existence in the target repository
292 descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata)
293 if exists || err != nil {
294 return descriptor, err
298 // if digest was empty or not saved, or if blob does not exist on the remote repository,
299 // then push the blob.
300 bs := pd.repo.Blobs(ctx)
302 var layerUpload distribution.BlobWriter
304 // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
305 candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
306 for _, mountCandidate := range candidates {
307 logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
308 createOpts := []distribution.BlobCreateOption{}
310 if len(mountCandidate.SourceRepository) > 0 {
311 namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository)
// NOTE(review): this log line appears to sit on the parse-failure path, where
// namedRef would be nil; handing a nil reference to reference.FamiliarString
// may panic. Logging mountCandidate.SourceRepository instead would avoid
// that — confirm against the elided error check.
313 logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err)
314 pd.v2MetadataService.Remove(mountCandidate)
318 // Candidates are always under same domain, create remote reference
319 // with only path to set mount from with
320 remoteRef, err := reference.WithName(reference.Path(namedRef))
322 logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err)
326 canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest)
328 logrus.Errorf("failed to make canonical reference: %v", err)
332 createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
// The outcome of the mount attempt is conveyed through the error's type.
336 lu, err := bs.Create(ctx, createOpts...)
337 switch err := err.(type) {
// The registry mounted the blob from another repository: no upload is needed.
340 case distribution.ErrBlobMounted:
341 progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
343 err.Descriptor.MediaType = schema2.MediaTypeLayer
346 pd.pushState.confirmedV2 = true
347 pd.pushState.remoteLayers[diffID] = err.Descriptor
348 pd.pushState.Unlock()
350 // Cache mapping from this layer's DiffID to the blobsum
351 if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
352 Digest: err.Descriptor.Digest,
353 SourceRepository: pd.repoInfo.Name(),
355 return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
357 return err.Descriptor, nil
359 logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
// Forget the candidate when it names a source repository and either was
// tagged with our HMAC key or carries no HMAC at all.
362 if len(mountCandidate.SourceRepository) > 0 &&
363 (metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
364 len(mountCandidate.HMAC) == 0) {
365 cause := "blob mount failure"
367 cause = fmt.Sprintf("an error: %v", err.Error())
369 logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
370 pd.v2MetadataService.Remove(mountCandidate)
374 // cancel previous upload
375 cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload)
// Spend whatever remains of the existence-check budget before uploading.
380 if maxExistenceChecks-len(pd.checkedDigests) > 0 {
381 // do additional layer existence checks with other known digests if any
382 descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
383 if exists || err != nil {
384 return descriptor, err
388 logrus.Debugf("Pushing layer: %s", diffID)
// Open a fresh upload session only if no earlier step left one behind.
389 if layerUpload == nil {
390 layerUpload, err = bs.Create(ctx)
392 return distribution.Descriptor{}, retryOnError(err)
// The upload session is closed when Upload returns.
395 defer layerUpload.Close()
398 desc, err := pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
// SetRemoteDescriptor stores descriptor so that Descriptor can return it later.
406 func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
407 pd.remoteDescriptor = descriptor
// Descriptor returns the descriptor last stored with SetRemoteDescriptor, or
// the zero value if none was stored.
410 func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
411 return pd.remoteDescriptor
// uploadUsingSession streams the layer content into layerUpload, commits the
// blob under the digest computed while streaming, records the DiffID-to-digest
// mapping in the metadata service and stores the resulting descriptor in
// pushState.remoteLayers. Layers with the uncompressed media type are passed
// through compress first; media types other than the two handled below are
// rejected with an error. Some parameters and short lines are not visible in
// this excerpt.
414 func (pd *v2PushDescriptor) uploadUsingSession(
416 progressOutput progress.Output,
418 layerUpload distribution.BlobWriter,
419 ) (distribution.Descriptor, error) {
420 var reader io.ReadCloser
422 contentReader, err := pd.layer.Open()
424 return distribution.Descriptor{}, retryOnError(err)
// The error from Size is deliberately discarded; in the visible code the size
// is only handed to the progress reader.
427 size, _ := pd.layer.Size()
// Wrap the content so that reads stop on context cancellation and report
// progress under this layer's ID.
429 reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, size, pd.ID(), "Pushing")
431 switch m := pd.layer.MediaType(); m {
432 case schema2.MediaTypeUncompressedLayer:
433 compressedReader, compressionDone := compress(reader)
434 defer func(closer io.Closer) {
438 reader = compressedReader
439 case schema2.MediaTypeLayer:
442 return distribution.Descriptor{}, fmt.Errorf("unsupported layer media type %s", m)
// Hash the bytes as they are read by the upload, so the digest describes
// exactly what was sent.
445 digester := digest.Canonical.Digester()
446 tee := io.TeeReader(reader, digester.Hash())
448 nn, err := layerUpload.ReadFrom(tee)
451 return distribution.Descriptor{}, retryOnError(err)
454 pushDigest := digester.Digest()
455 if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
456 return distribution.Descriptor{}, retryOnError(err)
459 logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
460 progress.Update(progressOutput, pd.ID(), "Pushed")
462 // Cache mapping from this layer's DiffID to the blobsum
463 if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
465 SourceRepository: pd.repoInfo.Name(),
467 return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
470 desc := distribution.Descriptor{
472 MediaType: schema2.MediaTypeLayer,
477 // If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
478 pd.pushState.confirmedV2 = true
479 pd.pushState.remoteLayers[diffID] = desc
480 pd.pushState.Unlock()
485 // layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata"
486 // slice. If it finds one that the registry knows about, it returns the known digest and "true". If
487 // "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
488 // (not just the target one).
// At most maxExistenceCheckAttempts distinct digests are collected for
// checking, and digests already present in pd.checkedDigests are skipped.
// Several short lines (loop control, the error switch header, some returns)
// are not visible in this excerpt.
489 func (pd *v2PushDescriptor) layerAlreadyExists(
491 progressOutput progress.Output,
493 checkOtherRepositories bool,
494 maxExistenceCheckAttempts int,
495 v2Metadata []metadata.V2Metadata,
496 ) (desc distribution.Descriptor, exists bool, err error) {
497 // filter the metadata
498 candidates := []metadata.V2Metadata{}
499 for _, meta := range v2Metadata {
// When other repositories must not be consulted, this condition singles out
// metadata that names a source repository different from the target.
500 if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.Name() {
503 candidates = append(candidates, meta)
505 // sort the candidates by similarity
506 sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates)
508 digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata)
509 // an array of unique blob digests ordered from the best mount candidates to worst
510 layerDigests := []digest.Digest{}
511 for i := 0; i < len(candidates); i++ {
// The attempt budget bounds how many digests are collected.
512 if len(layerDigests) >= maxExistenceCheckAttempts {
515 meta := &candidates[i]
516 if _, exists := digestToMetadata[meta.Digest]; exists {
517 // keep reference just to the first mapping (the best mount candidate)
520 if _, exists := pd.checkedDigests[meta.Digest]; exists {
521 // existence of this digest has already been tested
524 digestToMetadata[meta.Digest] = meta
525 layerDigests = append(layerDigests, meta.Digest)
529 for _, dgst := range layerDigests {
530 meta := digestToMetadata[dgst]
531 logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
532 desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst)
// Remember that this digest was checked so that later calls skip it.
533 pd.checkedDigests[meta.Digest] = struct{}{}
// Record the mapping unless an entry for the returned digest already points
// at the target repository and carries a matching HMAC.
536 if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
537 // cache mapping from this layer's DiffID to the blobsum
538 if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
540 SourceRepository: pd.repoInfo.Name(),
542 return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err}
545 desc.MediaType = schema2.MediaTypeLayer
548 case distribution.ErrBlobUnknown:
549 if meta.SourceRepository == pd.repoInfo.Name() {
550 // remove the mapping to the target repository
551 pd.v2MetadataService.Remove(*meta)
554 logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())
// Presumably executed only when a digest was found (the guard is not visible
// in this excerpt): report it and cache the descriptor in the shared state.
559 progress.Update(progressOutput, pd.ID(), "Layer already exists")
561 pd.pushState.remoteLayers[diffID] = desc
562 pd.pushState.Unlock()
565 return desc, exists, nil
568 // getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from
569 // source repositories of target registry, maximum number of layer existence checks performed on the target
570 // repository and whether the check shall be done also with digests mapped to different repositories. The
571 // decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
572 // of upload does not outweigh a latency.
// The concrete values returned for each size class are not visible in this
// excerpt.
573 func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
574 size, err := layer.Size()
// This case is listed first, so it is evaluated before the err != nil
// alternative of the middle-sized case.
577 case size > middleLayerMaximumSize:
578 // 1st attempt to mount the blob few times
579 // 2nd few existence checks with digests associated to any repository
580 // then fallback to upload
583 // middle sized blobs; if we could not get the size, assume we deal with middle sized blob
584 case size > smallLayerMaximumSize, err != nil:
585 // 1st attempt to mount blobs of average size few times
586 // 2nd try at most 1 existence check if there's an existing mapping to the target repository
587 // then fallback to upload
590 // small blobs, do a minimum number of checks
596 // getRepositoryMountCandidates returns the v2 metadata items that can serve as cross repository mount
597 // sources for repoInfo. Entries whose SourceRepository fails to parse or has a registry domain different
598 // from that of repoInfo are singled out by the first check below, the target repository itself by the second.
// The remaining entries are ordered with sortV2MetadataByLikenessAndAge and,
// when max is non-negative, cut down to at most max entries.
// NOTE(review): an earlier wording of this comment mentioned a
// requireRegistryMatch flag; nothing in the visible body uses such a name.
// Some parameters and the skip statements are not visible in this excerpt.
599 func getRepositoryMountCandidates(
600 repoInfo reference.Named,
603 v2Metadata []metadata.V2Metadata,
604 ) []metadata.V2Metadata {
605 candidates := []metadata.V2Metadata{}
606 for _, meta := range v2Metadata {
607 sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
// The || short-circuits, so sourceRepo is only inspected when parsing succeeded.
608 if err != nil || reference.Domain(repoInfo) != reference.Domain(sourceRepo) {
611 // target repository is not a viable candidate
612 if meta.SourceRepository == repoInfo.Name() {
615 candidates = append(candidates, meta)
618 sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
619 if max >= 0 && len(candidates) > max {
620 // select the youngest metadata
621 candidates = candidates[:max]
627 // byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
628 // candidate "a" is preferred over "b":
630 // 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
632 // 2. if a number of its repository path components exactly matching path components of target repository is higher
// It is passed to sort.Stable in sortV2MetadataByLikenessAndAge and provides
// the Less, Swap and Len methods that call requires.
633 type byLikeness struct {
// arr is the slice being sorted; Swap reorders it in place.
634 arr []metadata.V2Metadata
// pathComponents holds the components of the target repository name that
// candidates are compared against.
636 pathComponents []string
// Less compares entries i and j of bla.arr. Whether each entry's HMAC matches
// bla.hmacKey is compared first; when both agree on that, the entry with more
// path components matching bla.pathComponents sorts first.
639 func (bla byLikeness) Less(i, j int) bool {
640 aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
641 bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
// The value returned when the HMAC results differ is not visible in this
// excerpt.
642 if aMacMatch != bMacMatch {
645 aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
646 bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
647 return aMatch > bMatch
// Swap exchanges entries i and j of the underlying slice.
649 func (bla byLikeness) Swap(i, j int) {
650 bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
// Len returns the number of entries in the underlying slice.
652 func (bla byLikeness) Len() int { return len(bla.arr) }
// sortV2MetadataByLikenessAndAge reorders marr in place. It first reverses
// the slice and then stable-sorts it with byLikeness, so entries that compare
// equal keep their reversed relative order. The path components used for the
// comparison come from repoInfo.Name().
654 func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
655 // reverse the metadata array to shift the newest entries to the beginning
656 for i := 0; i < len(marr)/2; i++ {
657 marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
659 // keep equal entries ordered from the youngest to the oldest
660 sort.Stable(byLikeness{
663 pathComponents: getPathComponents(repoInfo.Name()),
667 // numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
// Components are compared position by position, and the loop covers at most
// the shorter of the two component lists. The initialisation of i and the
// return statements are not visible in this excerpt.
668 func numOfMatchingPathComponents(pth string, matchComponents []string) int {
669 pthComponents := getPathComponents(pth)
671 for ; i < len(pthComponents) && i < len(matchComponents); i++ {
672 if matchComponents[i] != pthComponents[i] {
// getPathComponents splits path on "/" and returns the pieces exactly as
// strings.Split produces them, so an empty path yields one empty component.
679 func getPathComponents(path string) []string {
680 return strings.Split(path, "/")
// cancelLayerUpload cancels layerUpload when it is non-nil; a nil layerUpload
// makes the call a no-op. dgst is used only in the debug log message. The
// warning below is presumably emitted only when Cancel returns an error (the
// check is not visible in this excerpt).
683 func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) {
684 if layerUpload != nil {
685 logrus.Debugf("cancelling upload of blob %s", dgst)
686 err := layerUpload.Cancel(ctx)
688 logrus.Warnf("failed to cancel upload: %v", err)