11 "github.com/docker/libkv"
12 "github.com/docker/libkv/store"
13 "github.com/docker/libnetwork/discoverapi"
14 "github.com/docker/libnetwork/types"
18 type DataStore interface {
19 // GetObject gets data from datastore and unmarshals to the specified object
20 GetObject(key string, o KVObject) error
21 // PutObject adds a new Record based on an object into the datastore
22 PutObject(kvObject KVObject) error
23 // PutObjectAtomic provides an atomic add and update operation for a Record
24 PutObjectAtomic(kvObject KVObject) error
25 // DeleteObject deletes a record
26 DeleteObject(kvObject KVObject) error
27 // DeleteObjectAtomic performs an atomic delete operation
28 DeleteObjectAtomic(kvObject KVObject) error
29 // DeleteTree deletes the tree of records under the object's key prefix
30 DeleteTree(kvObject KVObject) error
31 // Watchable returns whether the store is watchable or not
33 // Watch for changes on a KVObject
34 Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
35 // RestartWatch retriggers stopped Watches
37 // Active returns if the store is active
39 // List returns a list of KVObjects belonging to the parent
40 // key. The caller must pass a KVObject of the same type as
41 // the objects that need to be listed
42 List(string, KVObject) ([]KVObject, error)
43 // Map returns a Map of KVObjects
44 Map(key string, kvObject KVObject) (map[string]KVObject, error)
45 // Scope returns the scope of the store
47 // KVStore returns access to the KV Store
49 // Close closes the data store
53 // ErrKeyModified is raised for an atomic update when the update is working on a stale state
55 ErrKeyModified = store.ErrKeyModified
56 ErrKeyNotFound = store.ErrKeyNotFound
59 type datastore struct {
69 // KVObject is Key/Value interface used by objects to be part of the DataStore
70 type KVObject interface {
71 // Key method lets an object provide the Key to be used in KV Store
73 // KeyPrefix method lets an object return immediate parent key that can be used for tree walk
75 // Value method lets an object marshal its content to be stored in the KV store
77 // SetValue is used by the datastore to set the object's value when loaded from the data store.
78 SetValue([]byte) error
79 // Index method returns the latest DB Index as seen by the object
81 // SetIndex method allows the datastore to store the latest DB Index into the object
83 // True if the object exists in the datastore, false if it hasn't been stored yet.
84 // When SetIndex() is called, the object has been stored.
86 // DataScope indicates the storage scope of the KV object
88 // Skip provides a way for a KV Object to avoid persisting it in the KV Store
92 // KVConstructor interface defines methods which can construct a KVObject from another.
93 type KVConstructor interface {
94 // New returns a new object which is created based on the
97 // CopyTo deep copies the contents of the implementing object
98 // to the passed destination object
99 CopyTo(KVObject) error
102 // ScopeCfg represents Datastore configuration.
103 type ScopeCfg struct {
104 Client ScopeClientCfg
107 // ScopeClientCfg represents Datastore Client-only mode configuration
108 type ScopeClientCfg struct {
115 // LocalScope indicates to store the KV object in local datastore such as boltdb
117 // GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
118 GlobalScope = "global"
119 // SwarmScope is not indicating a datastore location. It is defined here
120 // along with the other two scopes just for consistency.
122 defaultPrefix = "/var/lib/docker/network/files"
126 // NetworkKeyPrefix is the prefix for network key in the kv store
127 NetworkKeyPrefix = "network"
128 // EndpointKeyPrefix is the prefix for endpoint key in the kv store
129 EndpointKeyPrefix = "endpoint"
133 defaultScopes = makeDefaultScopes()
136 func makeDefaultScopes() map[string]*ScopeCfg {
137 def := make(map[string]*ScopeCfg)
138 def[LocalScope] = &ScopeCfg{
139 Client: ScopeClientCfg{
140 Provider: string(store.BOLTDB),
141 Address: defaultPrefix + "/local-kv.db",
142 Config: &store.Config{
143 Bucket: "libnetwork",
144 ConnectionTimeout: time.Minute,
152 var defaultRootChain = []string{"docker", "network", "v1.0"}
153 var rootChain = defaultRootChain
155 // DefaultScopes returns a map of default scopes and their configs for clients to use.
156 func DefaultScopes(dataDir string) map[string]*ScopeCfg {
158 defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/local-kv.db"
162 defaultScopes[LocalScope].Client.Address = defaultPrefix + "/local-kv.db"
166 // IsValid checks if the scope config has valid configuration.
167 func (cfg *ScopeCfg) IsValid() bool {
169 strings.TrimSpace(cfg.Client.Provider) == "" ||
170 strings.TrimSpace(cfg.Client.Address) == "" {
177 // Key provides a convenient method to create a Key
178 func Key(key ...string) string {
179 keychain := append(rootChain, key...)
180 str := strings.Join(keychain, "/")
184 // ParseKey provides a convenient method to unpack a key; it complements the Key function
185 func ParseKey(key string) ([]string, error) {
186 chain := strings.Split(strings.Trim(key, "/"), "/")
188 // The key must start with the rootChain and have at least one more element to be considered valid
189 if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
190 return nil, types.BadRequestErrorf("invalid Key : %s", key)
192 return chain[len(rootChain):], nil
195 // newClient used to connect to KV Store
196 func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) {
198 if cached && scope != LocalScope {
199 return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
202 if scope == LocalScope {
207 config = &store.Config{}
212 if kv == string(store.BOLTDB) {
214 addrs = strings.Split(addr, ",")
217 parts := strings.SplitN(addr, "/", 2)
218 addrs = strings.Split(parts[0], ",")
220 // Add the custom prefix to the root chain
222 rootChain = append([]string{parts[1]}, defaultRootChain...)
226 store, err := libkv.NewStore(store.Backend(kv), addrs, config)
231 ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{}), sequential: sequential}
233 ds.cache = newCache(ds)
239 // NewDataStore creates a new instance of LibKV data store
240 func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
241 if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
242 c, ok := defaultScopes[scope]
243 if !ok || c.Client.Provider == "" || c.Client.Address == "" {
244 return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
251 if scope == LocalScope {
255 return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
258 // NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
259 func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
265 sCfgP, ok = dsc.Config.(*store.Config)
266 if !ok && dsc.Config != nil {
267 return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
270 scopeCfg := &ScopeCfg{
271 Client: ScopeClientCfg{
272 Address: dsc.Address,
273 Provider: dsc.Provider,
278 ds, err := NewDataStore(dsc.Scope, scopeCfg)
280 return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
286 func (ds *datastore) Close() {
290 func (ds *datastore) Scope() string {
294 func (ds *datastore) Active() bool {
298 func (ds *datastore) Watchable() bool {
299 return ds.scope != LocalScope
302 func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
303 sCh := make(chan struct{})
305 ctor, ok := kvObject.(KVConstructor)
307 return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
310 kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
315 kvoCh := make(chan KVObject)
321 // Make sure to get a new instance of watch channel
323 watchCh := ds.watchCh
332 case kvPair := <-kvpCh:
333 // If the backend KV store gets reset libkv's go routine
334 // for the watch can exit resulting in a nil value in
345 if err = dstO.SetValue(kvPair.Value); err != nil {
346 log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
350 dstO.SetIndex(kvPair.LastIndex)
355 // Wait on watch channel for a re-trigger when datastore becomes active
358 kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
360 log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
369 func (ds *datastore) RestartWatch() {
374 watchCh := ds.watchCh
375 ds.watchCh = make(chan struct{})
379 func (ds *datastore) KVStore() store.Store {
383 // PutObjectAtomic atomically adds or updates a Record based on an object in the datastore
384 func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
386 previous *store.KVPair
396 return types.BadRequestErrorf("invalid KV Object : nil")
399 kvObjValue := kvObject.Value()
401 if kvObjValue == nil {
402 return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
409 if kvObject.Exists() {
410 previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
415 _, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
417 if err == store.ErrKeyExists {
418 return ErrKeyModified
423 kvObject.SetIndex(pair.LastIndex)
427 // If persistent store is skipped, sequencing needs to
429 return ds.cache.add(kvObject, kvObject.Skip())
435 // PutObject adds a new Record based on an object into the datastore
436 func (ds *datastore) PutObject(kvObject KVObject) error {
443 return types.BadRequestErrorf("invalid KV Object : nil")
450 if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
456 // If persistent store is skipped, sequencing needs to
458 return ds.cache.add(kvObject, kvObject.Skip())
464 func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
465 kvObjValue := kvObject.Value()
467 if kvObjValue == nil {
468 return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
470 return ds.store.Put(Key(key...), kvObjValue, nil)
473 // GetObject returns a record matching the key
474 func (ds *datastore) GetObject(key string, o KVObject) error {
481 return ds.cache.get(key, o)
484 kvPair, err := ds.store.Get(key)
489 if err := o.SetValue(kvPair.Value); err != nil {
493 // Make sure the object has a correct view of the DB index in
494 // case we need to modify it and update the DB.
495 o.SetIndex(kvPair.LastIndex)
499 func (ds *datastore) ensureParent(parent string) error {
500 exists, err := ds.store.Exists(parent)
507 return ds.store.Put(parent, []byte{}, &store.WriteOptions{IsDir: true})
510 func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
517 return ds.cache.list(kvObject)
521 cb := func(key string, val KVObject) {
522 kvol = append(kvol, val)
524 err := ds.iterateKVPairsFromStore(key, kvObject, cb)
531 func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
532 // Bail out right away if the kvObject does not implement KVConstructor
533 ctor, ok := kvObject.(KVConstructor)
535 return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
538 // Make sure the parent key exists
539 if err := ds.ensureParent(key); err != nil {
543 kvList, err := ds.store.List(key)
548 for _, kvPair := range kvList {
549 if len(kvPair.Value) == 0 {
554 if err := dstO.SetValue(kvPair.Value); err != nil {
558 // Make sure the object has a correct view of the DB index in
559 // case we need to modify it and update the DB.
560 dstO.SetIndex(kvPair.LastIndex)
561 callback(kvPair.Key, dstO)
567 func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
573 kvol := make(map[string]KVObject)
574 cb := func(key string, val KVObject) {
575 // Trim the leading & trailing "/" to make it consistent across all stores
576 kvol[strings.Trim(key, "/")] = val
578 err := ds.iterateKVPairsFromStore(key, kvObject, cb)
585 // DeleteObject unconditionally deletes a record from the store
586 func (ds *datastore) DeleteObject(kvObject KVObject) error {
592 // cleanup the cache first
594 // If persistent store is skipped, sequencing needs to
596 ds.cache.del(kvObject, kvObject.Skip())
603 return ds.store.Delete(Key(kvObject.Key()...))
606 // DeleteObjectAtomic performs atomic delete on a record
607 func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
614 return types.BadRequestErrorf("invalid KV Object : nil")
617 previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
623 if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
624 if err == store.ErrKeyExists {
625 return ErrKeyModified
631 // cleanup the cache only if AtomicDelete went through successfully
633 // If persistent store is skipped, sequencing needs to
635 return ds.cache.del(kvObject, kvObject.Skip())
641 // DeleteTree unconditionally deletes the tree under the object's key prefix from the store
642 func (ds *datastore) DeleteTree(kvObject KVObject) error {
648 // cleanup the cache first
650 // If persistent store is skipped, sequencing needs to
652 ds.cache.del(kvObject, kvObject.Skip())
659 return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))