19 "github.com/Sirupsen/logrus"
20 "github.com/docker/distribution/manifest/schema2"
21 "github.com/docker/distribution/reference"
22 "github.com/docker/docker/api/types"
23 "github.com/docker/docker/api/types/filters"
24 "github.com/docker/docker/distribution"
25 progressutils "github.com/docker/docker/distribution/utils"
26 "github.com/docker/docker/distribution/xfer"
27 "github.com/docker/docker/dockerversion"
28 "github.com/docker/docker/image"
29 "github.com/docker/docker/layer"
30 "github.com/docker/docker/pkg/authorization"
31 "github.com/docker/docker/pkg/chrootarchive"
32 "github.com/docker/docker/pkg/ioutils"
33 "github.com/docker/docker/pkg/mount"
34 "github.com/docker/docker/pkg/pools"
35 "github.com/docker/docker/pkg/progress"
36 "github.com/docker/docker/plugin/v2"
37 refstore "github.com/docker/docker/reference"
38 "github.com/opencontainers/go-digest"
39 "github.com/pkg/errors"
40 "golang.org/x/net/context"
// acceptedPluginFilterTags holds the filter keys that List accepts; List
// passes this map to pluginFilters.Validate before applying any filter.
43 var acceptedPluginFilterTags = map[string]bool{
48 // Disable deactivates a plugin. This means resources (volumes, networks) can't use it.
// The plugin is looked up by reference or ID. Unless config.ForceDisable is
// set, a plugin with a non-zero reference count is rejected with a
// "plugin ... is in use" error.
49 func (pm *Manager) Disable(refOrID string, config *types.PluginDisableConfig) error {
50 p, err := pm.config.Store.GetV2Plugin(refOrID)
// Refuse to disable a plugin that is still referenced, unless forced.
58 if !config.ForceDisable && p.GetRefCount() > 0 {
59 return fmt.Errorf("plugin %s is in use", p.Name())
// A plugin exposing the authorization capability is also removed from the
// authz middleware.
62 for _, typ := range p.GetTypes() {
63 if typ.Capability == authorization.AuthZApiImplements {
64 pm.config.AuthzMiddleware.RemovePlugin(p.Name())
68 if err := pm.disable(p, c); err != nil {
// Emit a "disable" plugin event keyed by the plugin ID and the caller's reference.
71 pm.config.LogPluginEvent(p.GetID(), refOrID, "disable")
75 // Enable activates a plugin, which implies that it is ready to be used by containers.
// The plugin is looked up by reference or ID and an "enable" plugin event is
// logged for it.
76 func (pm *Manager) Enable(refOrID string, config *types.PluginEnableConfig) error {
77 p, err := pm.config.Store.GetV2Plugin(refOrID)
// The controller carries the caller-supplied timeout (config.Timeout) into enable.
82 c := &controller{timeoutInSecs: config.Timeout}
83 if err := pm.enable(p, c, false); err != nil {
86 pm.config.LogPluginEvent(p.GetID(), refOrID, "enable")
90 // Inspect looks up a plugin by reference or ID and returns its PluginObj.
91 func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) {
92 p, err := pm.config.Store.GetV2Plugin(refOrID)
// The result is a pointer to p.PluginObj.
97 return &p.PluginObj, nil
// pull runs distribution.Pull for ref with the given config. When outStream
// is non-nil, progress is sent through a buffered channel that is handed to
// progressutils.WriteDistributionProgress together with outStream; the
// config's ProgressOutput is otherwise set to a discarding output.
100 func (pm *Manager) pull(ctx context.Context, ref reference.Named, config *distribution.ImagePullConfig, outStream io.Writer) error {
101 if outStream != nil {
102 // Include a buffer so that slow client connections don't affect
103 // transfer performance.
104 progressChan := make(chan progress.Progress, 100)
106 writesDone := make(chan struct{})
// Derive a cancellable context; its cancel function is passed to the
// progress writer below.
113 var cancelFunc context.CancelFunc
114 ctx, cancelFunc = context.WithCancel(ctx)
117 progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
121 config.ProgressOutput = progress.ChanOutput(progressChan)
123 config.ProgressOutput = progress.DiscardOutput()
125 return distribution.Pull(ctx, ref, config)
// tempConfigStore is the config store instantiated by Privileges; it keeps
// the digest of the config handed to Put.
128 type tempConfigStore struct {
// configDigest is recorded by Put and compared against by Get.
130 configDigest digest.Digest
// Put computes the digest of c with digest.FromBytes and records it as the
// store's configDigest.
133 func (s *tempConfigStore) Put(c []byte) (digest.Digest, error) {
134 dgst := digest.FromBytes(c)
137 s.configDigest = dgst
// Get fails with a "digest not found" error when d differs from the digest
// recorded by Put.
142 func (s *tempConfigStore) Get(d digest.Digest) ([]byte, error) {
143 if d != s.configDigest {
144 return nil, fmt.Errorf("digest not found")
// GetTarSeekStream is not supported by tempConfigStore; it returns an
// "unimplemented" error.
149 func (s *tempConfigStore) GetTarSeekStream(d digest.Digest) (ioutils.ReadSeekCloser, error) {
150 return nil, fmt.Errorf("unimplemented")
// RootFSAndPlatformFromConfig delegates to configToRootFS for the raw config c.
153 func (s *tempConfigStore) RootFSAndPlatformFromConfig(c []byte) (*image.RootFS, layer.Platform, error) {
154 return configToRootFS(c)
// computePrivileges builds the list of privileges implied by plugin config c.
// The visible sources of privileges are the network type, host ipc and pid
// namespace access, mount sources, device paths, allow-all-devices and
// additional capabilities. The final return yields the list with a nil error.
157 func computePrivileges(c types.PluginConfig) (types.PluginPrivileges, error) {
158 var privileges types.PluginPrivileges
// Only network types other than "null", "bridge" and the empty string need
// a privilege entry.
159 if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" {
160 privileges = append(privileges, types.PluginPrivilege{
162 Description: "permissions to access a network",
163 Value: []string{c.Network.Type},
167 privileges = append(privileges, types.PluginPrivilege{
168 Name: "host ipc namespace",
169 Description: "allow access to host ipc namespace",
170 Value: []string{"true"},
174 privileges = append(privileges, types.PluginPrivilege{
175 Name: "host pid namespace",
176 Description: "allow access to host pid namespace",
177 Value: []string{"true"},
// Each mount with a non-nil Source contributes a host-path privilege.
// NOTE(review): the loop variable shadows the imported mount package
// inside this loop.
180 for _, mount := range c.Mounts {
181 if mount.Source != nil {
182 privileges = append(privileges, types.PluginPrivilege{
184 Description: "host path to mount",
185 Value: []string{*mount.Source},
// Each device with a non-nil Path contributes a host-device privilege.
189 for _, device := range c.Linux.Devices {
190 if device.Path != nil {
191 privileges = append(privileges, types.PluginPrivilege{
193 Description: "host device to access",
194 Value: []string{*device.Path},
198 if c.Linux.AllowAllDevices {
199 privileges = append(privileges, types.PluginPrivilege{
200 Name: "allow-all-devices",
201 Description: "allow 'rwm' access to all devices",
202 Value: []string{"true"},
// The capability list is passed through as-is when it is non-empty.
205 if len(c.Linux.Capabilities) > 0 {
206 privileges = append(privileges, types.PluginPrivilege{
207 Name: "capabilities",
208 Description: "list of additional capabilities required",
209 Value: c.Linux.Capabilities,
213 return privileges, nil
216 // Privileges pulls a plugin config and computes the privileges required to install it.
// The pull is issued without an output stream, and the pulled config is
// decoded as JSON into a types.PluginConfig before being handed to
// computePrivileges.
217 func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) {
218 // create image store instance
219 cs := &tempConfigStore{}
221 // DownloadManager not defined because only pulling configuration.
222 pluginPullConfig := &distribution.ImagePullConfig{
223 Config: distribution.Config{
224 MetaHeaders: metaHeader,
225 AuthConfig: authConfig,
226 RegistryService: pm.config.RegistryService,
// Events are dropped here: the logger is a no-op function.
227 ImageEventLogger: func(string, string, string) {},
230 Schema2Types: distribution.PluginTypes,
233 if err := pm.pull(ctx, ref, pluginPullConfig, nil); err != nil {
// NOTE(review): cs.config is presumably filled in while pulling — confirm
// where the store is wired into the pull config.
237 if cs.config == nil {
238 return nil, errors.New("no configuration pulled")
240 var config types.PluginConfig
241 if err := json.Unmarshal(cs.config, &config); err != nil {
245 return computePrivileges(config)
248 // Upgrade upgrades a plugin
// that is already present in the store under name, using the content pulled
// for ref. Lookup failures are wrapped as "plugin must be installed before
// upgrading".
249 func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
250 p, err := pm.config.Store.GetV2Plugin(name)
252 return errors.Wrap(err, "plugin must be installed before upgrading")
256 return fmt.Errorf("plugin must be disabled before upgrading")
// The muGC read lock is released when Upgrade returns.
260 defer pm.muGC.RUnlock()
262 // revalidate because Upgrade is public
263 if _, err := reference.ParseNormalizedNamed(name); err != nil {
264 return errors.Wrapf(err, "failed to parse %q", name)
// The rootfs is staged in a temporary directory that is removed when
// Upgrade returns.
267 tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
271 defer os.RemoveAll(tmpRootFSDir)
273 dm := &downloadManager{
274 tmpDir: tmpRootFSDir,
275 blobStore: pm.blobStore,
278 pluginPullConfig := &distribution.ImagePullConfig{
279 Config: distribution.Config{
280 MetaHeaders: metaHeader,
281 AuthConfig: authConfig,
282 RegistryService: pm.config.RegistryService,
283 ImageEventLogger: pm.config.LogPluginEvent,
286 DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
287 Schema2Types: distribution.PluginTypes,
290 err = pm.pull(ctx, ref, pluginPullConfig, outStream)
// Apply the pulled config digest, blobs and staged rootfs to the existing plugin.
296 if err := pm.upgradePlugin(p, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges); err != nil {
// Record the reference the plugin was upgraded from.
299 p.PluginObj.PluginReference = ref.String()
303 // Pull pulls a plugin, checks if the correct privileges are provided and installs the plugin.
// The plugin is created under the normalized form of name, and ref is
// recorded as its PluginReference.
304 func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
// The muGC read lock is released when Pull returns.
306 defer pm.muGC.RUnlock()
308 // revalidate because Pull is public
309 nameref, err := reference.ParseNormalizedNamed(name)
311 return errors.Wrapf(err, "failed to parse %q", name)
// Normalize name to its familiar form, after reference.TagNameOnly has been applied.
313 name = reference.FamiliarString(reference.TagNameOnly(nameref))
315 if err := pm.config.Store.validateName(name); err != nil {
// The rootfs is staged in a temporary directory that is removed when Pull
// returns.
319 tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
323 defer os.RemoveAll(tmpRootFSDir)
325 dm := &downloadManager{
326 tmpDir: tmpRootFSDir,
327 blobStore: pm.blobStore,
330 pluginPullConfig := &distribution.ImagePullConfig{
331 Config: distribution.Config{
332 MetaHeaders: metaHeader,
333 AuthConfig: authConfig,
334 RegistryService: pm.config.RegistryService,
335 ImageEventLogger: pm.config.LogPluginEvent,
338 DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead
339 Schema2Types: distribution.PluginTypes,
342 err = pm.pull(ctx, ref, pluginPullConfig, outStream)
// Create the plugin from the pulled config digest, blobs and staged rootfs.
348 p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges)
352 p.PluginObj.PluginReference = ref.String()
357 // List displays the list of plugins and associated metadata.
// pluginFilters is validated against acceptedPluginFilterTags. An "enabled"
// filter must be exactly "true" or "false"; any other value yields an
// "Invalid filter" error. A "capability" filter is matched against the
// capability of each of a plugin's types.
358 func (pm *Manager) List(pluginFilters filters.Args) ([]types.Plugin, error) {
359 if err := pluginFilters.Validate(acceptedPluginFilterTags); err != nil {
364 disabledOnly := false
365 if pluginFilters.Include("enabled") {
366 if pluginFilters.ExactMatch("enabled", "true") {
368 } else if pluginFilters.ExactMatch("enabled", "false") {
371 return nil, fmt.Errorf("Invalid filter 'enabled=%s'", pluginFilters.Get("enabled"))
375 plugins := pm.config.Store.GetAll()
// Pre-size the result for the case where every plugin passes the filters.
376 out := make([]types.Plugin, 0, len(plugins))
379 for _, p := range plugins {
// Check each plugin's Enabled flag against the requested state.
380 if enabledOnly && !p.PluginObj.Enabled {
383 if disabledOnly && p.PluginObj.Enabled {
386 if pluginFilters.Include("capability") {
387 for _, f := range p.GetTypes() {
388 if !pluginFilters.Match("capability", f.Capability) {
393 out = append(out, p.PluginObj)
398 // Push pushes a plugin to the store.
// The plugin is looked up by name and its own name is parsed into the
// reference handed to distribution.Push. The push config requires schema2
// and uses the plugin config media type.
399 func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error {
400 p, err := pm.config.Store.GetV2Plugin(name)
405 ref, err := reference.ParseNormalizedNamed(p.Name())
407 return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name())
// Progress goes to outStream through a buffered channel when a stream is
// supplied; a discarding output is the alternative.
410 var po progress.Output
411 if outStream != nil {
412 // Include a buffer so that slow client connections don't affect
413 // transfer performance.
414 progressChan := make(chan progress.Progress, 100)
416 writesDone := make(chan struct{})
423 var cancelFunc context.CancelFunc
424 ctx, cancelFunc = context.WithCancel(ctx)
427 progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan)
431 po = progress.ChanOutput(progressChan)
433 po = progress.DiscardOutput()
436 // TODO: replace these with manager
437 is := &pluginConfigStore{
441 ls := &pluginLayerProvider{
445 rs := &pluginReference{
// NOTE(review): 3 is presumably the upload concurrency limit — confirm
// against xfer.NewLayerUploadManager.
450 uploadManager := xfer.NewLayerUploadManager(3)
452 imagePushConfig := &distribution.ImagePushConfig{
453 Config: distribution.Config{
454 MetaHeaders: metaHeader,
455 AuthConfig: authConfig,
457 RegistryService: pm.config.RegistryService,
459 ImageEventLogger: pm.config.LogPluginEvent,
461 RequireSchema2: true,
463 ConfigMediaType: schema2.MediaTypePluginConfig,
465 UploadManager: uploadManager,
468 return distribution.Push(ctx, ref, imagePushConfig)
// pluginReference answers reference lookups for a single plugin: its methods
// compare incoming names and IDs against the one name and pluginID held here.
471 type pluginReference struct {
// pluginID is the digest that References and Get compare against and return.
473 pluginID digest.Digest
// References returns a one-element slice holding the plugin's name; an id
// that differs from r.pluginID is handled separately first.
476 func (r *pluginReference) References(id digest.Digest) []reference.Named {
477 if r.pluginID != id {
480 return []reference.Named{r.name}
// ReferencesByName returns a slice of refstore.Association values.
// NOTE(review): presumably a single association of the plugin name and ID —
// confirm against the literal's contents.
483 func (r *pluginReference) ReferencesByName(ref reference.Named) []refstore.Association {
484 return []refstore.Association{
// Get returns the plugin ID for ref. The comparison is on the String form of
// the references; a mismatch yields an empty digest and
// refstore.ErrDoesNotExist.
492 func (r *pluginReference) Get(ref reference.Named) (digest.Digest, error) {
493 if r.name.String() != ref.String() {
494 return digest.Digest(""), refstore.ErrDoesNotExist
496 return r.pluginID, nil
// AddTag takes a named reference, a digest and a force flag and reports an error.
// NOTE(review): presumably unsupported for this single-plugin store — confirm.
499 func (r *pluginReference) AddTag(ref reference.Named, id digest.Digest, force bool) error {
// AddDigest takes a canonical reference, a digest and a force flag and reports an error.
// NOTE(review): presumably unsupported for this single-plugin store — confirm.
503 func (r *pluginReference) AddDigest(ref reference.Canonical, id digest.Digest, force bool) error {
// Delete takes a named reference and reports a bool together with an error.
// NOTE(review): presumably unsupported for this single-plugin store — confirm.
507 func (r *pluginReference) Delete(ref reference.Named) (bool, error) {
// pluginConfigStore is the config store built by Push. Its methods read the
// plugin's config digest (s.plugin.Config) and the manager's blob store
// (s.pm.blobStore).
512 type pluginConfigStore struct {
// Put is not supported on push; it returns an empty digest and a
// "cannot store config on push" error.
517 func (s *pluginConfigStore) Put([]byte) (digest.Digest, error) {
518 return digest.Digest(""), errors.New("cannot store config on push")
// Get returns the bytes of the plugin's config blob. It fails with
// "plugin not found" when d is not the plugin's config digest; otherwise the
// blob is fetched from the manager's blob store and read in full.
521 func (s *pluginConfigStore) Get(d digest.Digest) ([]byte, error) {
522 if s.plugin.Config != d {
523 return nil, errors.New("plugin not found")
525 rwc, err := s.pm.blobStore.Get(d)
530 return ioutil.ReadAll(rwc)
// GetTarSeekStream is not supported by pluginConfigStore; it returns an
// "unimplemented" error.
533 func (s *pluginConfigStore) GetTarSeekStream(d digest.Digest) (ioutils.ReadSeekCloser, error) {
534 return nil, fmt.Errorf("unimplemented")
// RootFSAndPlatformFromConfig delegates to configToRootFS for the raw config c.
537 func (s *pluginConfigStore) RootFSAndPlatformFromConfig(c []byte) (*image.RootFS, layer.Platform, error) {
538 return configToRootFS(c)
// pluginLayerProvider is the layer provider built by Push. Its Get method
// resolves chain IDs against the rootfs diff IDs and blobsums of the plugin
// it holds (p.plugin).
541 type pluginLayerProvider struct {
// Get resolves chain ID id to a push layer of the plugin. It searches for a
// prefix of the plugin's rootfs diff IDs whose chain ID equals id and fails
// with "layer not found" when the search runs past the last diff ID.
546 func (p *pluginLayerProvider) Get(id layer.ChainID) (distribution.PushLayer, error) {
547 rootFS := rootFSFromPlugin(p.plugin.PluginObj.Config.Rootfs)
// i counts how many leading diff IDs are included in the candidate chain.
549 for i = 1; i <= len(rootFS.DiffIDs); i++ {
550 if layer.CreateChainID(rootFS.DiffIDs[:i]) == id {
// i ends one past the length when the loop runs to completion.
554 if i > len(rootFS.DiffIDs) {
555 return nil, errors.New("layer not found")
// The layer carries the first i diff IDs and the first i blobsums.
559 diffIDs: rootFS.DiffIDs[:i],
560 blobs: p.plugin.Blobsums[:i],
// pluginLayer describes one layer of a plugin as a chain of diff IDs together
// with the blob digests used by Open and Size. The layer itself is the last
// entry of diffIDs.
564 type pluginLayer struct {
// diffIDs is the chain of diff IDs up to and including this layer.
566 diffIDs []layer.DiffID
// blobs holds blob digests indexed by the same position as diffIDs.
567 blobs []digest.Digest
// ChainID returns the chain ID computed from the layer's diff IDs.
570 func (l *pluginLayer) ChainID() layer.ChainID {
571 return layer.CreateChainID(l.diffIDs)
// DiffID returns the last entry of the layer's diff ID chain.
574 func (l *pluginLayer) DiffID() layer.DiffID {
575 return l.diffIDs[len(l.diffIDs)-1]
// Parent returns the layer below this one, described by the same chain with
// its last diff ID dropped. A chain of length one is handled as a special
// case first.
578 func (l *pluginLayer) Parent() distribution.PushLayer {
579 if len(l.diffIDs) == 1 {
// Both slices are cut using len(l.diffIDs), so blobs stays aligned with diffIDs.
584 diffIDs: l.diffIDs[:len(l.diffIDs)-1],
585 blobs: l.blobs[:len(l.diffIDs)-1],
// Open fetches this layer's blob from the manager's blob store. The blob is
// the one at the index of the last diff ID.
589 func (l *pluginLayer) Open() (io.ReadCloser, error) {
590 return l.pm.blobStore.Get(l.blobs[len(l.diffIDs)-1])
// Size asks the manager's blob store for the size of this layer's blob, the
// one at the index of the last diff ID.
593 func (l *pluginLayer) Size() (int64, error) {
594 return l.pm.blobStore.Size(l.blobs[len(l.diffIDs)-1])
// MediaType returns schema2.MediaTypeLayer.
597 func (l *pluginLayer) MediaType() string {
598 return schema2.MediaTypeLayer
// Release does nothing for a pluginLayer.
601 func (l *pluginLayer) Release() {
602 // Nothing needs to be released, no references held
605 // Remove deletes plugin's root directory.
// The plugin is looked up by name and removed from the store, and a "remove"
// plugin event is logged. Unless config.ForceRemove is set, a plugin that is
// in use or enabled is rejected with an error.
606 func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
607 p, err := pm.config.Store.GetV2Plugin(name)
616 if !config.ForceRemove {
617 if p.GetRefCount() > 0 {
618 return fmt.Errorf("plugin %s is in use", p.Name())
621 return fmt.Errorf("plugin %s is enabled", p.Name())
// A failure to disable the plugin is logged as an error.
626 if err := pm.disable(p, c); err != nil {
627 logrus.Errorf("failed to disable plugin '%s': %s", p.Name(), err)
636 pm.config.Store.Remove(p)
// The plugin directory is the manager root joined with the plugin id.
637 pluginDir := filepath.Join(pm.config.Root, id)
// Unmount and delete failures below are logged as warnings only.
638 if err := recursiveUnmount(pluginDir); err != nil {
639 logrus.WithField("dir", pluginDir).WithField("id", id).Warn(err)
641 if err := os.RemoveAll(pluginDir); err != nil {
642 logrus.Warnf("unable to remove %q from plugin remove: %v", pluginDir, err)
644 pm.config.LogPluginEvent(id, name, "remove")
// getMounts collects the mountpoints from the mount table that start with
// root. A failure to read the mount table is wrapped and returned.
648 func getMounts(root string) ([]string, error) {
649 infos, err := mount.GetMounts()
651 return nil, errors.Wrap(err, "failed to read mount table")
655 for _, m := range infos {
// NOTE(review): this is a plain string-prefix test, so a sibling path that
// merely shares the prefix (root + "2", say) matches too.
656 if strings.HasPrefix(m.Mountpoint, root) {
657 mounts = append(mounts, m.Mountpoint)
// recursiveUnmount unmounts every mountpoint that getMounts reports for root,
// in reverse-lexicographic order. A failure on the last mountpoint is
// returned as a wrapped error; failures on earlier ones are logged as
// warnings.
664 func recursiveUnmount(root string) error {
665 mounts, err := getMounts(root)
670 // sort in reverse-lexicographic order so the root mount will always be last
671 sort.Sort(sort.Reverse(sort.StringSlice(mounts)))
673 for i, m := range mounts {
674 if err := mount.Unmount(m); err != nil {
// Only the final entry of the sorted list turns a failure into an error.
675 if i == len(mounts)-1 {
676 return errors.Wrapf(err, "error performing recursive unmount on %s", root)
678 logrus.WithError(err).WithField("mountpoint", m).Warn("could not unmount")
685 // Set sets plugin args
// on the plugin looked up by name, by passing args to the plugin's own Set method.
686 func (pm *Manager) Set(name string, args []string) error {
687 p, err := pm.config.Store.GetV2Plugin(name)
691 if err := p.Set(args); err != nil {
697 // CreateFromContext creates a plugin from the given pluginDir which contains
698 // both the rootfs and the config.json and a repoName with optional tag.
// The input arrives as the tar stream tarCtx and the repository name comes
// from options.RepoName. The rootfs is unpacked into a temporary directory,
// stored gzip-compressed as a blob, and a config blob is written before the
// plugin is created and a "create" event is logged.
699 func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, options *types.PluginCreateOptions) (err error) {
// The muGC read lock is released when CreateFromContext returns.
701 defer pm.muGC.RUnlock()
703 ref, err := reference.ParseNormalizedNamed(options.RepoName)
705 return errors.Wrapf(err, "failed to parse reference %v", options.RepoName)
// Canonical references are rejected.
707 if _, ok := ref.(reference.Canonical); ok {
708 return errors.Errorf("canonical references are not permitted")
710 name := reference.FamiliarString(reference.TagNameOnly(ref))
712 if err := pm.config.Store.validateName(name); err != nil { // fast check, real check is in createPlugin()
// The rootfs is staged in a temporary directory that is removed on return.
716 tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs")
718 return errors.Wrap(err, "failed to create temp directory")
720 defer os.RemoveAll(tmpRootFSDir)
// splitConfigRootFSFromTar receives a pointer to configJSON and returns the
// rootfs stream read below.
722 var configJSON []byte
723 rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON)
725 rootFSBlob, err := pm.blobStore.New()
729 defer rootFSBlob.Close()
// Everything read from the rootfs stream is also written to the gzip writer
// backed by the blob and to the layer digester, so the digest covers the
// uncompressed stream.
730 gzw := gzip.NewWriter(rootFSBlob)
731 layerDigester := digest.Canonical.Digester()
732 rootFSReader := io.TeeReader(rootFS, io.MultiWriter(gzw, layerDigester.Hash()))
// Unpacking into the temp directory is what drives the reads of the tee reader.
734 if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil {
737 if err := rootFS.Close(); err != nil {
741 if configJSON == nil {
742 return errors.New("config not found")
745 if err := gzw.Close(); err != nil {
746 return errors.Wrap(err, "error closing gzip writer")
749 var config types.PluginConfig
750 if err := json.Unmarshal(configJSON, &config); err != nil {
751 return errors.Wrap(err, "failed to parse config")
754 if err := pm.validateConfig(config); err != nil {
761 rootFSBlobsum, err := rootFSBlob.Commit()
// The config's rootfs section lists the single layer digest computed above.
771 config.Rootfs = &types.PluginConfigRootfs{
773 DiffIds: []string{layerDigester.Digest().String()},
// Stamp the config with the running Docker version.
776 config.DockerVersion = dockerversion.Version
// Store the updated config as its own blob, JSON-encoded.
778 configBlob, err := pm.blobStore.New()
782 defer configBlob.Close()
783 if err := json.NewEncoder(configBlob).Encode(config); err != nil {
784 return errors.Wrap(err, "error encoding json config")
786 configBlobsum, err := configBlob.Commit()
// The plugin is created from the config blob and the single rootfs blob; no
// privileges are passed (nil).
791 p, err := pm.createPlugin(name, configBlobsum, []digest.Digest{rootFSBlobsum}, tmpRootFSDir, nil)
795 p.PluginObj.PluginReference = name
797 pm.config.LogPluginEvent(p.PluginObj.ID, name, "create")
// validateConfig takes a plugin config and reports an error; CreateFromContext
// aborts when that error is non-nil.
// NOTE(review): the validation rules themselves are to be confirmed against the body.
802 func (pm *Manager) validateConfig(config types.PluginConfig) error {
806 func splitConfigRootFSFromTar(in io.ReadCloser, config *[]byte) io.ReadCloser {
809 tarReader := tar.NewReader(in)
810 tarWriter := tar.NewWriter(pw)
816 hdr, err := tarReader.Next()
819 pw.CloseWithError(errors.Wrap(err, "no rootfs found"))
822 // Signals end of archive.
828 pw.CloseWithError(errors.Wrap(err, "failed to read from tar"))
832 content := io.Reader(tarReader)
833 name := path.Clean(hdr.Name)
834 if path.IsAbs(name) {
837 if name == configFileName {
838 dt, err := ioutil.ReadAll(content)
840 pw.CloseWithError(errors.Wrapf(err, "failed to read %s", configFileName))
845 if parts := strings.Split(name, "/"); len(parts) != 0 && parts[0] == rootFSFileName {
846 hdr.Name = path.Clean(path.Join(parts[1:]...))
847 if hdr.Typeflag == tar.TypeLink && strings.HasPrefix(strings.ToLower(hdr.Linkname), rootFSFileName+"/") {
848 hdr.Linkname = hdr.Linkname[len(rootFSFileName)+1:]
850 if err := tarWriter.WriteHeader(hdr); err != nil {
851 pw.CloseWithError(errors.Wrap(err, "error writing tar header"))
854 if _, err := pools.Copy(tarWriter, content); err != nil {
855 pw.CloseWithError(errors.Wrap(err, "error copying tar data"))
860 io.Copy(ioutil.Discard, content)