7 "github.com/Sirupsen/logrus"
8 "github.com/docker/distribution/reference"
9 "github.com/docker/distribution/registry/client/transport"
10 "github.com/docker/docker/distribution/metadata"
11 "github.com/docker/docker/dockerversion"
12 "github.com/docker/docker/image"
13 "github.com/docker/docker/image/v1"
14 "github.com/docker/docker/layer"
15 "github.com/docker/docker/pkg/ioutils"
16 "github.com/docker/docker/pkg/progress"
17 "github.com/docker/docker/pkg/stringid"
18 "github.com/docker/docker/registry"
19 "github.com/opencontainers/go-digest"
20 "golang.org/x/net/context"
23 type v1Pusher struct {
24 v1IDService *metadata.V1IDService
25 endpoint registry.APIEndpoint
27 repoInfo *registry.RepositoryInfo
28 config *ImagePushConfig
29 session *registry.Session
32 func (p *v1Pusher) Push(ctx context.Context) error {
33 tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
37 // Adds Docker-specific headers as well as user-specified headers (metaHeaders)
38 tr := transport.NewTransport(
39 // TODO(tiborvass): was NoTimeout
40 registry.NewTransport(tlsConfig),
41 registry.DockerHeaders(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)...,
43 client := registry.HTTPClient(tr)
44 v1Endpoint, err := p.endpoint.ToV1Endpoint(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)
46 logrus.Debugf("Could not get v1 endpoint: %v", err)
47 return fallbackError{err: err}
49 p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
51 // TODO(dmcgowan): Check if should fallback
52 return fallbackError{err: err}
54 if err := p.pushRepository(ctx); err != nil {
55 // TODO(dmcgowan): Check if should fallback
61 // v1Image exposes the configuration, filesystem layer ID, and a v1 ID for an
62 // image being pushed to a v1 registry.
63 type v1Image interface {
69 type v1ImageCommon struct {
75 func (common *v1ImageCommon) Config() []byte {
79 func (common *v1ImageCommon) V1ID() string {
83 func (common *v1ImageCommon) Layer() layer.Layer {
87 // v1TopImage defines a runnable (top layer) image being pushed to a v1
89 type v1TopImage struct {
94 func newV1TopImage(imageID image.ID, img *image.Image, l layer.Layer, parent *v1DependencyImage) (*v1TopImage, error) {
95 v1ID := imageID.Digest().Hex()
98 parentV1ID = parent.V1ID()
101 config, err := v1.MakeV1ConfigFromConfig(img, v1ID, parentV1ID, false)
107 v1ImageCommon: v1ImageCommon{
116 // v1DependencyImage defines a dependency layer being pushed to a v1 registry.
117 type v1DependencyImage struct {
121 func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) *v1DependencyImage {
122 v1ID := digest.Digest(l.ChainID()).Hex()
126 config = fmt.Sprintf(`{"id":"%s","parent":"%s"}`, v1ID, parent.V1ID())
128 config = fmt.Sprintf(`{"id":"%s"}`, v1ID)
130 return &v1DependencyImage{
131 v1ImageCommon: v1ImageCommon{
133 config: []byte(config),
139 // Retrieve all the images to be uploaded in the correct order
140 func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []PushLayer, err error) {
141 tagsByImage = make(map[image.ID][]string)
143 // Ignore digest references
144 if _, isCanonical := p.ref.(reference.Canonical); isCanonical {
148 tagged, isTagged := p.ref.(reference.NamedTagged)
150 // Push a specific tag
152 var dgst digest.Digest
153 dgst, err = p.config.ReferenceStore.Get(p.ref)
157 imgID = image.IDFromDigest(dgst)
159 imageList, err = p.imageListForTag(imgID, nil, &referencedLayers)
164 tagsByImage[imgID] = []string{tagged.Tag()}
169 imagesSeen := make(map[digest.Digest]struct{})
170 dependenciesSeen := make(map[layer.ChainID]*v1DependencyImage)
172 associations := p.config.ReferenceStore.ReferencesByName(p.ref)
173 for _, association := range associations {
174 if tagged, isTagged = association.Ref.(reference.NamedTagged); !isTagged {
175 // Ignore digest references.
179 imgID := image.IDFromDigest(association.ID)
180 tagsByImage[imgID] = append(tagsByImage[imgID], tagged.Tag())
182 if _, present := imagesSeen[association.ID]; present {
183 // Skip generating image list for already-seen image
186 imagesSeen[association.ID] = struct{}{}
188 imageListForThisTag, err := p.imageListForTag(imgID, dependenciesSeen, &referencedLayers)
190 return nil, nil, nil, err
193 // append to main image list
194 imageList = append(imageList, imageListForThisTag...)
196 if len(imageList) == 0 {
197 return nil, nil, nil, fmt.Errorf("No images found for the requested repository / tag")
199 logrus.Debugf("Image list: %v", imageList)
200 logrus.Debugf("Tags by image: %v", tagsByImage)
205 func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]PushLayer) (imageListForThisTag []v1Image, err error) {
206 ics, ok := p.config.ImageStore.(*imageConfigStore)
208 return nil, fmt.Errorf("only image store images supported for v1 push")
210 img, err := ics.Store.Get(imgID)
215 topLayerID := img.RootFS.ChainID()
216 topLayer := layer.Layer(layer.EmptyLayer)
218 pl, err := p.config.LayerStore.Get(topLayerID)
219 *referencedLayers = append(*referencedLayers, pl)
221 return nil, fmt.Errorf("failed to get top layer from image: %v", err)
224 // V1 push is deprecated, only support existing layerstore layers
225 lsl, ok := pl.(*storeLayer)
227 return nil, fmt.Errorf("only layer store layers supported for v1 push")
231 if l.DiffID() == layer.EmptyLayer.DiffID() {
236 dependencyImages, parent := generateDependencyImages(l, dependenciesSeen)
238 topImage, err := newV1TopImage(imgID, img, topLayer, parent)
243 imageListForThisTag = append(dependencyImages, topImage)
248 func generateDependencyImages(l layer.Layer, dependenciesSeen map[layer.ChainID]*v1DependencyImage) (imageListForThisTag []v1Image, parent *v1DependencyImage) {
253 imageListForThisTag, parent = generateDependencyImages(l.Parent(), dependenciesSeen)
255 if dependenciesSeen != nil {
256 if dependencyImage, present := dependenciesSeen[l.ChainID()]; present {
257 // This layer is already on the list, we can ignore it
258 // and all its parents.
259 return imageListForThisTag, dependencyImage
263 dependencyImage := newV1DependencyImage(l, parent)
264 imageListForThisTag = append(imageListForThisTag, dependencyImage)
266 if dependenciesSeen != nil {
267 dependenciesSeen[l.ChainID()] = dependencyImage
270 return imageListForThisTag, dependencyImage
273 // createImageIndex returns an index of an image's layer IDs and tags.
274 func createImageIndex(images []v1Image, tags map[image.ID][]string) []*registry.ImgData {
275 var imageIndex []*registry.ImgData
276 for _, img := range images {
279 if topImage, isTopImage := img.(*v1TopImage); isTopImage {
280 if tags, hasTags := tags[topImage.imageID]; hasTags {
281 // If an image has tags you must add an entry in the image index
283 for _, tag := range tags {
284 imageIndex = append(imageIndex, &registry.ImgData{
293 // If the image does not have a tag it still needs to be sent to the
294 // registry with an empty tag so that it is associated with the repository
295 imageIndex = append(imageIndex, &registry.ImgData{
303 // lookupImageOnEndpoint checks the specified endpoint to see if an image exists;
304 // if it is absent, it sends the image ID to the channel to be pushed.
305 func (p *v1Pusher) lookupImageOnEndpoint(wg *sync.WaitGroup, endpoint string, images chan v1Image, imagesToPush chan string) {
307 for image := range images {
309 truncID := stringid.TruncateID(image.Layer().DiffID().String())
310 if err := p.session.LookupRemoteImage(v1ID, endpoint); err != nil {
311 logrus.Errorf("Error in LookupRemoteImage: %s", err)
313 progress.Update(p.config.ProgressOutput, truncID, "Waiting")
315 progress.Update(p.config.ProgressOutput, truncID, "Already exists")
320 func (p *v1Pusher) pushImageToEndpoint(ctx context.Context, endpoint string, imageList []v1Image, tags map[image.ID][]string, repo *registry.RepositoryData) error {
321 workerCount := len(imageList)
322 // start a maximum of 5 workers to check if images exist on the specified endpoint.
327 wg = &sync.WaitGroup{}
328 imageData = make(chan v1Image, workerCount*2)
329 imagesToPush = make(chan string, workerCount*2)
330 pushes = make(chan map[string]struct{}, 1)
332 for i := 0; i < workerCount; i++ {
334 go p.lookupImageOnEndpoint(wg, endpoint, imageData, imagesToPush)
336 // start a goroutine that consumes the images to push
338 shouldPush := make(map[string]struct{})
339 for id := range imagesToPush {
340 shouldPush[id] = struct{}{}
344 for _, v1Image := range imageList {
347 // close the channel to notify the workers that there will be no more images to check.
351 // wait for all the images that require pushes to be collected into a consumable map.
352 shouldPush := <-pushes
353 // finish by pushing any images and tags to the endpoint. The order in which the images are pushed
354 // is very important; that is why we are still iterating over the ordered list of imageIDs.
355 for _, img := range imageList {
357 if _, push := shouldPush[v1ID]; push {
358 if _, err := p.pushImage(ctx, img, endpoint); err != nil {
359 // FIXME: Continue on error?
363 if topImage, isTopImage := img.(*v1TopImage); isTopImage {
364 for _, tag := range tags[topImage.imageID] {
365 progress.Messagef(p.config.ProgressOutput, "", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(v1ID), endpoint+"repositories/"+reference.Path(p.repoInfo.Name)+"/tags/"+tag)
366 if err := p.session.PushRegistryTag(p.repoInfo.Name, v1ID, tag, endpoint); err != nil {
375 // pushRepository pushes layers that do not already exist on the registry.
376 func (p *v1Pusher) pushRepository(ctx context.Context) error {
377 imgList, tags, referencedLayers, err := p.getImageList()
379 for _, l := range referencedLayers {
387 imageIndex := createImageIndex(imgList, tags)
388 for _, data := range imageIndex {
389 logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
392 // Register all the images in a repository with the registry
393 // If an image is not in this list it will not be associated with the repository
394 repoData, err := p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, false, nil)
398 // push the repository to each of the endpoints only if it does not exist.
399 for _, endpoint := range repoData.Endpoints {
400 if err := p.pushImageToEndpoint(ctx, endpoint, imgList, tags, repoData); err != nil {
404 _, err = p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, true, repoData.Endpoints)
408 func (p *v1Pusher) pushImage(ctx context.Context, v1Image v1Image, ep string) (checksum string, err error) {
410 v1ID := v1Image.V1ID()
411 truncID := stringid.TruncateID(l.DiffID().String())
413 jsonRaw := v1Image.Config()
414 progress.Update(p.config.ProgressOutput, truncID, "Pushing")
416 // General rule is to use ID for graph accesses and compatibilityID for
417 // calls to session.registry()
418 imgData := &registry.ImgData{
423 if err := p.session.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
424 if err == registry.ErrAlreadyExists {
425 progress.Update(p.config.ProgressOutput, truncID, "Image already pushed, skipping")
431 arch, err := l.TarStream()
437 // don't care if this fails; best effort
438 size, _ := l.DiffSize()
441 logrus.Debugf("rendered layer for %s of [%d] size", v1ID, size)
443 reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), p.config.ProgressOutput, size, truncID, "Pushing")
446 checksum, checksumPayload, err := p.session.PushImageLayerRegistry(v1ID, reader, ep, jsonRaw)
450 imgData.Checksum = checksum
451 imgData.ChecksumPayload = checksumPayload
453 if err := p.session.PushImageChecksumRegistry(imgData, ep); err != nil {
457 if err := p.v1IDService.Set(v1ID, p.repoInfo.Index.Name, l.DiffID()); err != nil {
458 logrus.Warnf("Could not set v1 ID mapping: %v", err)
461 progress.Update(p.config.ProgressOutput, truncID, "Image successfully pushed")
462 return imgData.Checksum, nil