Tizen_4.0 base
[platform/upstream/docker-engine.git] / distribution / push_v1.go
1 package distribution
2
3 import (
4         "fmt"
5         "sync"
6
7         "github.com/Sirupsen/logrus"
8         "github.com/docker/distribution/reference"
9         "github.com/docker/distribution/registry/client/transport"
10         "github.com/docker/docker/distribution/metadata"
11         "github.com/docker/docker/dockerversion"
12         "github.com/docker/docker/image"
13         "github.com/docker/docker/image/v1"
14         "github.com/docker/docker/layer"
15         "github.com/docker/docker/pkg/ioutils"
16         "github.com/docker/docker/pkg/progress"
17         "github.com/docker/docker/pkg/stringid"
18         "github.com/docker/docker/registry"
19         "github.com/opencontainers/go-digest"
20         "golang.org/x/net/context"
21 )
22
23 type v1Pusher struct {
24         v1IDService *metadata.V1IDService
25         endpoint    registry.APIEndpoint
26         ref         reference.Named
27         repoInfo    *registry.RepositoryInfo
28         config      *ImagePushConfig
29         session     *registry.Session
30 }
31
32 func (p *v1Pusher) Push(ctx context.Context) error {
33         tlsConfig, err := p.config.RegistryService.TLSConfig(p.repoInfo.Index.Name)
34         if err != nil {
35                 return err
36         }
37         // Adds Docker-specific headers as well as user-specified headers (metaHeaders)
38         tr := transport.NewTransport(
39                 // TODO(tiborvass): was NoTimeout
40                 registry.NewTransport(tlsConfig),
41                 registry.DockerHeaders(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)...,
42         )
43         client := registry.HTTPClient(tr)
44         v1Endpoint, err := p.endpoint.ToV1Endpoint(dockerversion.DockerUserAgent(ctx), p.config.MetaHeaders)
45         if err != nil {
46                 logrus.Debugf("Could not get v1 endpoint: %v", err)
47                 return fallbackError{err: err}
48         }
49         p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint)
50         if err != nil {
51                 // TODO(dmcgowan): Check if should fallback
52                 return fallbackError{err: err}
53         }
54         if err := p.pushRepository(ctx); err != nil {
55                 // TODO(dmcgowan): Check if should fallback
56                 return err
57         }
58         return nil
59 }
60
61 // v1Image exposes the configuration, filesystem layer ID, and a v1 ID for an
62 // image being pushed to a v1 registry.
63 type v1Image interface {
64         Config() []byte
65         Layer() layer.Layer
66         V1ID() string
67 }
68
69 type v1ImageCommon struct {
70         layer  layer.Layer
71         config []byte
72         v1ID   string
73 }
74
75 func (common *v1ImageCommon) Config() []byte {
76         return common.config
77 }
78
79 func (common *v1ImageCommon) V1ID() string {
80         return common.v1ID
81 }
82
83 func (common *v1ImageCommon) Layer() layer.Layer {
84         return common.layer
85 }
86
87 // v1TopImage defines a runnable (top layer) image being pushed to a v1
88 // registry.
89 type v1TopImage struct {
90         v1ImageCommon
91         imageID image.ID
92 }
93
94 func newV1TopImage(imageID image.ID, img *image.Image, l layer.Layer, parent *v1DependencyImage) (*v1TopImage, error) {
95         v1ID := imageID.Digest().Hex()
96         parentV1ID := ""
97         if parent != nil {
98                 parentV1ID = parent.V1ID()
99         }
100
101         config, err := v1.MakeV1ConfigFromConfig(img, v1ID, parentV1ID, false)
102         if err != nil {
103                 return nil, err
104         }
105
106         return &v1TopImage{
107                 v1ImageCommon: v1ImageCommon{
108                         v1ID:   v1ID,
109                         config: config,
110                         layer:  l,
111                 },
112                 imageID: imageID,
113         }, nil
114 }
115
116 // v1DependencyImage defines a dependency layer being pushed to a v1 registry.
117 type v1DependencyImage struct {
118         v1ImageCommon
119 }
120
121 func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) *v1DependencyImage {
122         v1ID := digest.Digest(l.ChainID()).Hex()
123
124         var config string
125         if parent != nil {
126                 config = fmt.Sprintf(`{"id":"%s","parent":"%s"}`, v1ID, parent.V1ID())
127         } else {
128                 config = fmt.Sprintf(`{"id":"%s"}`, v1ID)
129         }
130         return &v1DependencyImage{
131                 v1ImageCommon: v1ImageCommon{
132                         v1ID:   v1ID,
133                         config: []byte(config),
134                         layer:  l,
135                 },
136         }
137 }
138
139 // Retrieve the all the images to be uploaded in the correct order
140 func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []PushLayer, err error) {
141         tagsByImage = make(map[image.ID][]string)
142
143         // Ignore digest references
144         if _, isCanonical := p.ref.(reference.Canonical); isCanonical {
145                 return
146         }
147
148         tagged, isTagged := p.ref.(reference.NamedTagged)
149         if isTagged {
150                 // Push a specific tag
151                 var imgID image.ID
152                 var dgst digest.Digest
153                 dgst, err = p.config.ReferenceStore.Get(p.ref)
154                 if err != nil {
155                         return
156                 }
157                 imgID = image.IDFromDigest(dgst)
158
159                 imageList, err = p.imageListForTag(imgID, nil, &referencedLayers)
160                 if err != nil {
161                         return
162                 }
163
164                 tagsByImage[imgID] = []string{tagged.Tag()}
165
166                 return
167         }
168
169         imagesSeen := make(map[digest.Digest]struct{})
170         dependenciesSeen := make(map[layer.ChainID]*v1DependencyImage)
171
172         associations := p.config.ReferenceStore.ReferencesByName(p.ref)
173         for _, association := range associations {
174                 if tagged, isTagged = association.Ref.(reference.NamedTagged); !isTagged {
175                         // Ignore digest references.
176                         continue
177                 }
178
179                 imgID := image.IDFromDigest(association.ID)
180                 tagsByImage[imgID] = append(tagsByImage[imgID], tagged.Tag())
181
182                 if _, present := imagesSeen[association.ID]; present {
183                         // Skip generating image list for already-seen image
184                         continue
185                 }
186                 imagesSeen[association.ID] = struct{}{}
187
188                 imageListForThisTag, err := p.imageListForTag(imgID, dependenciesSeen, &referencedLayers)
189                 if err != nil {
190                         return nil, nil, nil, err
191                 }
192
193                 // append to main image list
194                 imageList = append(imageList, imageListForThisTag...)
195         }
196         if len(imageList) == 0 {
197                 return nil, nil, nil, fmt.Errorf("No images found for the requested repository / tag")
198         }
199         logrus.Debugf("Image list: %v", imageList)
200         logrus.Debugf("Tags by image: %v", tagsByImage)
201
202         return
203 }
204
205 func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]PushLayer) (imageListForThisTag []v1Image, err error) {
206         ics, ok := p.config.ImageStore.(*imageConfigStore)
207         if !ok {
208                 return nil, fmt.Errorf("only image store images supported for v1 push")
209         }
210         img, err := ics.Store.Get(imgID)
211         if err != nil {
212                 return nil, err
213         }
214
215         topLayerID := img.RootFS.ChainID()
216         topLayer := layer.Layer(layer.EmptyLayer)
217
218         pl, err := p.config.LayerStore.Get(topLayerID)
219         *referencedLayers = append(*referencedLayers, pl)
220         if err != nil {
221                 return nil, fmt.Errorf("failed to get top layer from image: %v", err)
222         }
223
224         // V1 push is deprecated, only support existing layerstore layers
225         lsl, ok := pl.(*storeLayer)
226         if !ok {
227                 return nil, fmt.Errorf("only layer store layers supported for v1 push")
228         }
229         l := lsl.Layer
230
231         if l.DiffID() == layer.EmptyLayer.DiffID() {
232                 topLayer = l
233                 l = l.Parent()
234         }
235
236         dependencyImages, parent := generateDependencyImages(l, dependenciesSeen)
237
238         topImage, err := newV1TopImage(imgID, img, topLayer, parent)
239         if err != nil {
240                 return nil, err
241         }
242
243         imageListForThisTag = append(dependencyImages, topImage)
244
245         return
246 }
247
248 func generateDependencyImages(l layer.Layer, dependenciesSeen map[layer.ChainID]*v1DependencyImage) (imageListForThisTag []v1Image, parent *v1DependencyImage) {
249         if l == nil {
250                 return nil, nil
251         }
252
253         imageListForThisTag, parent = generateDependencyImages(l.Parent(), dependenciesSeen)
254
255         if dependenciesSeen != nil {
256                 if dependencyImage, present := dependenciesSeen[l.ChainID()]; present {
257                         // This layer is already on the list, we can ignore it
258                         // and all its parents.
259                         return imageListForThisTag, dependencyImage
260                 }
261         }
262
263         dependencyImage := newV1DependencyImage(l, parent)
264         imageListForThisTag = append(imageListForThisTag, dependencyImage)
265
266         if dependenciesSeen != nil {
267                 dependenciesSeen[l.ChainID()] = dependencyImage
268         }
269
270         return imageListForThisTag, dependencyImage
271 }
272
273 // createImageIndex returns an index of an image's layer IDs and tags.
274 func createImageIndex(images []v1Image, tags map[image.ID][]string) []*registry.ImgData {
275         var imageIndex []*registry.ImgData
276         for _, img := range images {
277                 v1ID := img.V1ID()
278
279                 if topImage, isTopImage := img.(*v1TopImage); isTopImage {
280                         if tags, hasTags := tags[topImage.imageID]; hasTags {
281                                 // If an image has tags you must add an entry in the image index
282                                 // for each tag
283                                 for _, tag := range tags {
284                                         imageIndex = append(imageIndex, &registry.ImgData{
285                                                 ID:  v1ID,
286                                                 Tag: tag,
287                                         })
288                                 }
289                                 continue
290                         }
291                 }
292
293                 // If the image does not have a tag it still needs to be sent to the
294                 // registry with an empty tag so that it is associated with the repository
295                 imageIndex = append(imageIndex, &registry.ImgData{
296                         ID:  v1ID,
297                         Tag: "",
298                 })
299         }
300         return imageIndex
301 }
302
303 // lookupImageOnEndpoint checks the specified endpoint to see if an image exists
304 // and if it is absent then it sends the image id to the channel to be pushed.
305 func (p *v1Pusher) lookupImageOnEndpoint(wg *sync.WaitGroup, endpoint string, images chan v1Image, imagesToPush chan string) {
306         defer wg.Done()
307         for image := range images {
308                 v1ID := image.V1ID()
309                 truncID := stringid.TruncateID(image.Layer().DiffID().String())
310                 if err := p.session.LookupRemoteImage(v1ID, endpoint); err != nil {
311                         logrus.Errorf("Error in LookupRemoteImage: %s", err)
312                         imagesToPush <- v1ID
313                         progress.Update(p.config.ProgressOutput, truncID, "Waiting")
314                 } else {
315                         progress.Update(p.config.ProgressOutput, truncID, "Already exists")
316                 }
317         }
318 }
319
320 func (p *v1Pusher) pushImageToEndpoint(ctx context.Context, endpoint string, imageList []v1Image, tags map[image.ID][]string, repo *registry.RepositoryData) error {
321         workerCount := len(imageList)
322         // start a maximum of 5 workers to check if images exist on the specified endpoint.
323         if workerCount > 5 {
324                 workerCount = 5
325         }
326         var (
327                 wg           = &sync.WaitGroup{}
328                 imageData    = make(chan v1Image, workerCount*2)
329                 imagesToPush = make(chan string, workerCount*2)
330                 pushes       = make(chan map[string]struct{}, 1)
331         )
332         for i := 0; i < workerCount; i++ {
333                 wg.Add(1)
334                 go p.lookupImageOnEndpoint(wg, endpoint, imageData, imagesToPush)
335         }
336         // start a go routine that consumes the images to push
337         go func() {
338                 shouldPush := make(map[string]struct{})
339                 for id := range imagesToPush {
340                         shouldPush[id] = struct{}{}
341                 }
342                 pushes <- shouldPush
343         }()
344         for _, v1Image := range imageList {
345                 imageData <- v1Image
346         }
347         // close the channel to notify the workers that there will be no more images to check.
348         close(imageData)
349         wg.Wait()
350         close(imagesToPush)
351         // wait for all the images that require pushes to be collected into a consumable map.
352         shouldPush := <-pushes
353         // finish by pushing any images and tags to the endpoint.  The order that the images are pushed
354         // is very important that is why we are still iterating over the ordered list of imageIDs.
355         for _, img := range imageList {
356                 v1ID := img.V1ID()
357                 if _, push := shouldPush[v1ID]; push {
358                         if _, err := p.pushImage(ctx, img, endpoint); err != nil {
359                                 // FIXME: Continue on error?
360                                 return err
361                         }
362                 }
363                 if topImage, isTopImage := img.(*v1TopImage); isTopImage {
364                         for _, tag := range tags[topImage.imageID] {
365                                 progress.Messagef(p.config.ProgressOutput, "", "Pushing tag for rev [%s] on {%s}", stringid.TruncateID(v1ID), endpoint+"repositories/"+reference.Path(p.repoInfo.Name)+"/tags/"+tag)
366                                 if err := p.session.PushRegistryTag(p.repoInfo.Name, v1ID, tag, endpoint); err != nil {
367                                         return err
368                                 }
369                         }
370                 }
371         }
372         return nil
373 }
374
375 // pushRepository pushes layers that do not already exist on the registry.
376 func (p *v1Pusher) pushRepository(ctx context.Context) error {
377         imgList, tags, referencedLayers, err := p.getImageList()
378         defer func() {
379                 for _, l := range referencedLayers {
380                         l.Release()
381                 }
382         }()
383         if err != nil {
384                 return err
385         }
386
387         imageIndex := createImageIndex(imgList, tags)
388         for _, data := range imageIndex {
389                 logrus.Debugf("Pushing ID: %s with Tag: %s", data.ID, data.Tag)
390         }
391
392         // Register all the images in a repository with the registry
393         // If an image is not in this list it will not be associated with the repository
394         repoData, err := p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, false, nil)
395         if err != nil {
396                 return err
397         }
398         // push the repository to each of the endpoints only if it does not exist.
399         for _, endpoint := range repoData.Endpoints {
400                 if err := p.pushImageToEndpoint(ctx, endpoint, imgList, tags, repoData); err != nil {
401                         return err
402                 }
403         }
404         _, err = p.session.PushImageJSONIndex(p.repoInfo.Name, imageIndex, true, repoData.Endpoints)
405         return err
406 }
407
408 func (p *v1Pusher) pushImage(ctx context.Context, v1Image v1Image, ep string) (checksum string, err error) {
409         l := v1Image.Layer()
410         v1ID := v1Image.V1ID()
411         truncID := stringid.TruncateID(l.DiffID().String())
412
413         jsonRaw := v1Image.Config()
414         progress.Update(p.config.ProgressOutput, truncID, "Pushing")
415
416         // General rule is to use ID for graph accesses and compatibilityID for
417         // calls to session.registry()
418         imgData := &registry.ImgData{
419                 ID: v1ID,
420         }
421
422         // Send the json
423         if err := p.session.PushImageJSONRegistry(imgData, jsonRaw, ep); err != nil {
424                 if err == registry.ErrAlreadyExists {
425                         progress.Update(p.config.ProgressOutput, truncID, "Image already pushed, skipping")
426                         return "", nil
427                 }
428                 return "", err
429         }
430
431         arch, err := l.TarStream()
432         if err != nil {
433                 return "", err
434         }
435         defer arch.Close()
436
437         // don't care if this fails; best effort
438         size, _ := l.DiffSize()
439
440         // Send the layer
441         logrus.Debugf("rendered layer for %s of [%d] size", v1ID, size)
442
443         reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), p.config.ProgressOutput, size, truncID, "Pushing")
444         defer reader.Close()
445
446         checksum, checksumPayload, err := p.session.PushImageLayerRegistry(v1ID, reader, ep, jsonRaw)
447         if err != nil {
448                 return "", err
449         }
450         imgData.Checksum = checksum
451         imgData.ChecksumPayload = checksumPayload
452         // Send the checksum
453         if err := p.session.PushImageChecksumRegistry(imgData, ep); err != nil {
454                 return "", err
455         }
456
457         if err := p.v1IDService.Set(v1ID, p.repoInfo.Index.Name, l.DiffID()); err != nil {
458                 logrus.Warnf("Could not set v1 ID mapping: %v", err)
459         }
460
461         progress.Update(p.config.ProgressOutput, truncID, "Image successfully pushed")
462         return imgData.Checksum, nil
463 }