Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / docker / libnetwork / datastore / datastore.go
1 package datastore
2
3 import (
4         "fmt"
5         "log"
6         "reflect"
7         "strings"
8         "sync"
9         "time"
10
11         "github.com/docker/libkv"
12         "github.com/docker/libkv/store"
13         "github.com/docker/libnetwork/discoverapi"
14         "github.com/docker/libnetwork/types"
15 )
16
17 //DataStore exported
18 type DataStore interface {
19         // GetObject gets data from datastore and unmarshals to the specified object
20         GetObject(key string, o KVObject) error
21         // PutObject adds a new Record based on an object into the datastore
22         PutObject(kvObject KVObject) error
23         // PutObjectAtomic provides an atomic add and update operation for a Record
24         PutObjectAtomic(kvObject KVObject) error
25         // DeleteObject deletes a record
26         DeleteObject(kvObject KVObject) error
27         // DeleteObjectAtomic performs an atomic delete operation
28         DeleteObjectAtomic(kvObject KVObject) error
29         // DeleteTree deletes a record
30         DeleteTree(kvObject KVObject) error
31         // Watchable returns whether the store is watchable or not
32         Watchable() bool
33         // Watch for changes on a KVObject
34         Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
35         // RestartWatch retriggers stopped Watches
36         RestartWatch()
37         // Active returns if the store is active
38         Active() bool
39         // List returns of a list of KVObjects belonging to the parent
40         // key. The caller must pass a KVObject of the same type as
41         // the objects that need to be listed
42         List(string, KVObject) ([]KVObject, error)
43         // Map returns a Map of KVObjects
44         Map(key string, kvObject KVObject) (map[string]KVObject, error)
45         // Scope returns the scope of the store
46         Scope() string
47         // KVStore returns access to the KV Store
48         KVStore() store.Store
49         // Close closes the data store
50         Close()
51 }
52
53 // ErrKeyModified is raised for an atomic update when the update is working on a stale state
54 var (
55         ErrKeyModified = store.ErrKeyModified
56         ErrKeyNotFound = store.ErrKeyNotFound
57 )
58
59 type datastore struct {
60         scope      string
61         store      store.Store
62         cache      *cache
63         watchCh    chan struct{}
64         active     bool
65         sequential bool
66         sync.Mutex
67 }
68
69 // KVObject is Key/Value interface used by objects to be part of the DataStore
70 type KVObject interface {
71         // Key method lets an object provide the Key to be used in KV Store
72         Key() []string
73         // KeyPrefix method lets an object return immediate parent key that can be used for tree walk
74         KeyPrefix() []string
75         // Value method lets an object marshal its content to be stored in the KV store
76         Value() []byte
77         // SetValue is used by the datastore to set the object's value when loaded from the data store.
78         SetValue([]byte) error
79         // Index method returns the latest DB Index as seen by the object
80         Index() uint64
81         // SetIndex method allows the datastore to store the latest DB Index into the object
82         SetIndex(uint64)
83         // True if the object exists in the datastore, false if it hasn't been stored yet.
84         // When SetIndex() is called, the object has been stored.
85         Exists() bool
86         // DataScope indicates the storage scope of the KV object
87         DataScope() string
88         // Skip provides a way for a KV Object to avoid persisting it in the KV Store
89         Skip() bool
90 }
91
92 // KVConstructor interface defines methods which can construct a KVObject from another.
93 type KVConstructor interface {
94         // New returns a new object which is created based on the
95         // source object
96         New() KVObject
97         // CopyTo deep copies the contents of the implementing object
98         // to the passed destination object
99         CopyTo(KVObject) error
100 }
101
102 // ScopeCfg represents Datastore configuration.
103 type ScopeCfg struct {
104         Client ScopeClientCfg
105 }
106
107 // ScopeClientCfg represents Datastore Client-only mode configuration
108 type ScopeClientCfg struct {
109         Provider string
110         Address  string
111         Config   *store.Config
112 }
113
114 const (
115         // LocalScope indicates to store the KV object in local datastore such as boltdb
116         LocalScope = "local"
117         // GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
118         GlobalScope = "global"
119         // SwarmScope is not indicating a datastore location. It is defined here
120         // along with the other two scopes just for consistency.
121         SwarmScope    = "swarm"
122         defaultPrefix = "/var/lib/docker/network/files"
123 )
124
125 const (
126         // NetworkKeyPrefix is the prefix for network key in the kv store
127         NetworkKeyPrefix = "network"
128         // EndpointKeyPrefix is the prefix for endpoint key in the kv store
129         EndpointKeyPrefix = "endpoint"
130 )
131
132 var (
133         defaultScopes = makeDefaultScopes()
134 )
135
136 func makeDefaultScopes() map[string]*ScopeCfg {
137         def := make(map[string]*ScopeCfg)
138         def[LocalScope] = &ScopeCfg{
139                 Client: ScopeClientCfg{
140                         Provider: string(store.BOLTDB),
141                         Address:  defaultPrefix + "/local-kv.db",
142                         Config: &store.Config{
143                                 Bucket:            "libnetwork",
144                                 ConnectionTimeout: time.Minute,
145                         },
146                 },
147         }
148
149         return def
150 }
151
152 var defaultRootChain = []string{"docker", "network", "v1.0"}
153 var rootChain = defaultRootChain
154
155 // DefaultScopes returns a map of default scopes and its config for clients to use.
156 func DefaultScopes(dataDir string) map[string]*ScopeCfg {
157         if dataDir != "" {
158                 defaultScopes[LocalScope].Client.Address = dataDir + "/network/files/local-kv.db"
159                 return defaultScopes
160         }
161
162         defaultScopes[LocalScope].Client.Address = defaultPrefix + "/local-kv.db"
163         return defaultScopes
164 }
165
166 // IsValid checks if the scope config has valid configuration.
167 func (cfg *ScopeCfg) IsValid() bool {
168         if cfg == nil ||
169                 strings.TrimSpace(cfg.Client.Provider) == "" ||
170                 strings.TrimSpace(cfg.Client.Address) == "" {
171                 return false
172         }
173
174         return true
175 }
176
177 //Key provides convenient method to create a Key
178 func Key(key ...string) string {
179         keychain := append(rootChain, key...)
180         str := strings.Join(keychain, "/")
181         return str + "/"
182 }
183
184 //ParseKey provides convenient method to unpack the key to complement the Key function
185 func ParseKey(key string) ([]string, error) {
186         chain := strings.Split(strings.Trim(key, "/"), "/")
187
188         // The key must atleast be equal to the rootChain in order to be considered as valid
189         if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
190                 return nil, types.BadRequestErrorf("invalid Key : %s", key)
191         }
192         return chain[len(rootChain):], nil
193 }
194
195 // newClient used to connect to KV Store
196 func newClient(scope string, kv string, addr string, config *store.Config, cached bool) (DataStore, error) {
197
198         if cached && scope != LocalScope {
199                 return nil, fmt.Errorf("caching supported only for scope %s", LocalScope)
200         }
201         sequential := false
202         if scope == LocalScope {
203                 sequential = true
204         }
205
206         if config == nil {
207                 config = &store.Config{}
208         }
209
210         var addrs []string
211
212         if kv == string(store.BOLTDB) {
213                 // Parse file path
214                 addrs = strings.Split(addr, ",")
215         } else {
216                 // Parse URI
217                 parts := strings.SplitN(addr, "/", 2)
218                 addrs = strings.Split(parts[0], ",")
219
220                 // Add the custom prefix to the root chain
221                 if len(parts) == 2 {
222                         rootChain = append([]string{parts[1]}, defaultRootChain...)
223                 }
224         }
225
226         store, err := libkv.NewStore(store.Backend(kv), addrs, config)
227         if err != nil {
228                 return nil, err
229         }
230
231         ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{}), sequential: sequential}
232         if cached {
233                 ds.cache = newCache(ds)
234         }
235
236         return ds, nil
237 }
238
239 // NewDataStore creates a new instance of LibKV data store
240 func NewDataStore(scope string, cfg *ScopeCfg) (DataStore, error) {
241         if cfg == nil || cfg.Client.Provider == "" || cfg.Client.Address == "" {
242                 c, ok := defaultScopes[scope]
243                 if !ok || c.Client.Provider == "" || c.Client.Address == "" {
244                         return nil, fmt.Errorf("unexpected scope %s without configuration passed", scope)
245                 }
246
247                 cfg = c
248         }
249
250         var cached bool
251         if scope == LocalScope {
252                 cached = true
253         }
254
255         return newClient(scope, cfg.Client.Provider, cfg.Client.Address, cfg.Client.Config, cached)
256 }
257
258 // NewDataStoreFromConfig creates a new instance of LibKV data store starting from the datastore config data
259 func NewDataStoreFromConfig(dsc discoverapi.DatastoreConfigData) (DataStore, error) {
260         var (
261                 ok    bool
262                 sCfgP *store.Config
263         )
264
265         sCfgP, ok = dsc.Config.(*store.Config)
266         if !ok && dsc.Config != nil {
267                 return nil, fmt.Errorf("cannot parse store configuration: %v", dsc.Config)
268         }
269
270         scopeCfg := &ScopeCfg{
271                 Client: ScopeClientCfg{
272                         Address:  dsc.Address,
273                         Provider: dsc.Provider,
274                         Config:   sCfgP,
275                 },
276         }
277
278         ds, err := NewDataStore(dsc.Scope, scopeCfg)
279         if err != nil {
280                 return nil, fmt.Errorf("failed to construct datastore client from datastore configuration %v: %v", dsc, err)
281         }
282
283         return ds, err
284 }
285
286 func (ds *datastore) Close() {
287         ds.store.Close()
288 }
289
290 func (ds *datastore) Scope() string {
291         return ds.scope
292 }
293
294 func (ds *datastore) Active() bool {
295         return ds.active
296 }
297
298 func (ds *datastore) Watchable() bool {
299         return ds.scope != LocalScope
300 }
301
302 func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) {
303         sCh := make(chan struct{})
304
305         ctor, ok := kvObject.(KVConstructor)
306         if !ok {
307                 return nil, fmt.Errorf("error watching object type %T, object does not implement KVConstructor interface", kvObject)
308         }
309
310         kvpCh, err := ds.store.Watch(Key(kvObject.Key()...), sCh)
311         if err != nil {
312                 return nil, err
313         }
314
315         kvoCh := make(chan KVObject)
316
317         go func() {
318         retry_watch:
319                 var err error
320
321                 // Make sure to get a new instance of watch channel
322                 ds.Lock()
323                 watchCh := ds.watchCh
324                 ds.Unlock()
325
326         loop:
327                 for {
328                         select {
329                         case <-stopCh:
330                                 close(sCh)
331                                 return
332                         case kvPair := <-kvpCh:
333                                 // If the backend KV store gets reset libkv's go routine
334                                 // for the watch can exit resulting in a nil value in
335                                 // channel.
336                                 if kvPair == nil {
337                                         ds.Lock()
338                                         ds.active = false
339                                         ds.Unlock()
340                                         break loop
341                                 }
342
343                                 dstO := ctor.New()
344
345                                 if err = dstO.SetValue(kvPair.Value); err != nil {
346                                         log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
347                                         break
348                                 }
349
350                                 dstO.SetIndex(kvPair.LastIndex)
351                                 kvoCh <- dstO
352                         }
353                 }
354
355                 // Wait on watch channel for a re-trigger when datastore becomes active
356                 <-watchCh
357
358                 kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
359                 if err != nil {
360                         log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
361                 }
362
363                 goto retry_watch
364         }()
365
366         return kvoCh, nil
367 }
368
369 func (ds *datastore) RestartWatch() {
370         ds.Lock()
371         defer ds.Unlock()
372
373         ds.active = true
374         watchCh := ds.watchCh
375         ds.watchCh = make(chan struct{})
376         close(watchCh)
377 }
378
379 func (ds *datastore) KVStore() store.Store {
380         return ds.store
381 }
382
383 // PutObjectAtomic adds a new Record based on an object into the datastore
384 func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
385         var (
386                 previous *store.KVPair
387                 pair     *store.KVPair
388                 err      error
389         )
390         if ds.sequential {
391                 ds.Lock()
392                 defer ds.Unlock()
393         }
394
395         if kvObject == nil {
396                 return types.BadRequestErrorf("invalid KV Object : nil")
397         }
398
399         kvObjValue := kvObject.Value()
400
401         if kvObjValue == nil {
402                 return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
403         }
404
405         if kvObject.Skip() {
406                 goto add_cache
407         }
408
409         if kvObject.Exists() {
410                 previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
411         } else {
412                 previous = nil
413         }
414
415         _, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
416         if err != nil {
417                 if err == store.ErrKeyExists {
418                         return ErrKeyModified
419                 }
420                 return err
421         }
422
423         kvObject.SetIndex(pair.LastIndex)
424
425 add_cache:
426         if ds.cache != nil {
427                 // If persistent store is skipped, sequencing needs to
428                 // happen in cache.
429                 return ds.cache.add(kvObject, kvObject.Skip())
430         }
431
432         return nil
433 }
434
435 // PutObject adds a new Record based on an object into the datastore
436 func (ds *datastore) PutObject(kvObject KVObject) error {
437         if ds.sequential {
438                 ds.Lock()
439                 defer ds.Unlock()
440         }
441
442         if kvObject == nil {
443                 return types.BadRequestErrorf("invalid KV Object : nil")
444         }
445
446         if kvObject.Skip() {
447                 goto add_cache
448         }
449
450         if err := ds.putObjectWithKey(kvObject, kvObject.Key()...); err != nil {
451                 return err
452         }
453
454 add_cache:
455         if ds.cache != nil {
456                 // If persistent store is skipped, sequencing needs to
457                 // happen in cache.
458                 return ds.cache.add(kvObject, kvObject.Skip())
459         }
460
461         return nil
462 }
463
464 func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
465         kvObjValue := kvObject.Value()
466
467         if kvObjValue == nil {
468                 return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
469         }
470         return ds.store.Put(Key(key...), kvObjValue, nil)
471 }
472
473 // GetObject returns a record matching the key
474 func (ds *datastore) GetObject(key string, o KVObject) error {
475         if ds.sequential {
476                 ds.Lock()
477                 defer ds.Unlock()
478         }
479
480         if ds.cache != nil {
481                 return ds.cache.get(key, o)
482         }
483
484         kvPair, err := ds.store.Get(key)
485         if err != nil {
486                 return err
487         }
488
489         if err := o.SetValue(kvPair.Value); err != nil {
490                 return err
491         }
492
493         // Make sure the object has a correct view of the DB index in
494         // case we need to modify it and update the DB.
495         o.SetIndex(kvPair.LastIndex)
496         return nil
497 }
498
499 func (ds *datastore) ensureParent(parent string) error {
500         exists, err := ds.store.Exists(parent)
501         if err != nil {
502                 return err
503         }
504         if exists {
505                 return nil
506         }
507         return ds.store.Put(parent, []byte{}, &store.WriteOptions{IsDir: true})
508 }
509
510 func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
511         if ds.sequential {
512                 ds.Lock()
513                 defer ds.Unlock()
514         }
515
516         if ds.cache != nil {
517                 return ds.cache.list(kvObject)
518         }
519
520         var kvol []KVObject
521         cb := func(key string, val KVObject) {
522                 kvol = append(kvol, val)
523         }
524         err := ds.iterateKVPairsFromStore(key, kvObject, cb)
525         if err != nil {
526                 return nil, err
527         }
528         return kvol, nil
529 }
530
531 func (ds *datastore) iterateKVPairsFromStore(key string, kvObject KVObject, callback func(string, KVObject)) error {
532         // Bail out right away if the kvObject does not implement KVConstructor
533         ctor, ok := kvObject.(KVConstructor)
534         if !ok {
535                 return fmt.Errorf("error listing objects, object does not implement KVConstructor interface")
536         }
537
538         // Make sure the parent key exists
539         if err := ds.ensureParent(key); err != nil {
540                 return err
541         }
542
543         kvList, err := ds.store.List(key)
544         if err != nil {
545                 return err
546         }
547
548         for _, kvPair := range kvList {
549                 if len(kvPair.Value) == 0 {
550                         continue
551                 }
552
553                 dstO := ctor.New()
554                 if err := dstO.SetValue(kvPair.Value); err != nil {
555                         return err
556                 }
557
558                 // Make sure the object has a correct view of the DB index in
559                 // case we need to modify it and update the DB.
560                 dstO.SetIndex(kvPair.LastIndex)
561                 callback(kvPair.Key, dstO)
562         }
563
564         return nil
565 }
566
567 func (ds *datastore) Map(key string, kvObject KVObject) (map[string]KVObject, error) {
568         if ds.sequential {
569                 ds.Lock()
570                 defer ds.Unlock()
571         }
572
573         kvol := make(map[string]KVObject)
574         cb := func(key string, val KVObject) {
575                 // Trim the leading & trailing "/" to make it consistent across all stores
576                 kvol[strings.Trim(key, "/")] = val
577         }
578         err := ds.iterateKVPairsFromStore(key, kvObject, cb)
579         if err != nil {
580                 return nil, err
581         }
582         return kvol, nil
583 }
584
585 // DeleteObject unconditionally deletes a record from the store
586 func (ds *datastore) DeleteObject(kvObject KVObject) error {
587         if ds.sequential {
588                 ds.Lock()
589                 defer ds.Unlock()
590         }
591
592         // cleaup the cache first
593         if ds.cache != nil {
594                 // If persistent store is skipped, sequencing needs to
595                 // happen in cache.
596                 ds.cache.del(kvObject, kvObject.Skip())
597         }
598
599         if kvObject.Skip() {
600                 return nil
601         }
602
603         return ds.store.Delete(Key(kvObject.Key()...))
604 }
605
606 // DeleteObjectAtomic performs atomic delete on a record
607 func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
608         if ds.sequential {
609                 ds.Lock()
610                 defer ds.Unlock()
611         }
612
613         if kvObject == nil {
614                 return types.BadRequestErrorf("invalid KV Object : nil")
615         }
616
617         previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
618
619         if kvObject.Skip() {
620                 goto del_cache
621         }
622
623         if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
624                 if err == store.ErrKeyExists {
625                         return ErrKeyModified
626                 }
627                 return err
628         }
629
630 del_cache:
631         // cleanup the cache only if AtomicDelete went through successfully
632         if ds.cache != nil {
633                 // If persistent store is skipped, sequencing needs to
634                 // happen in cache.
635                 return ds.cache.del(kvObject, kvObject.Skip())
636         }
637
638         return nil
639 }
640
641 // DeleteTree unconditionally deletes a record from the store
642 func (ds *datastore) DeleteTree(kvObject KVObject) error {
643         if ds.sequential {
644                 ds.Lock()
645                 defer ds.Unlock()
646         }
647
648         // cleaup the cache first
649         if ds.cache != nil {
650                 // If persistent store is skipped, sequencing needs to
651                 // happen in cache.
652                 ds.cache.del(kvObject, kvObject.Skip())
653         }
654
655         if kvObject.Skip() {
656                 return nil
657         }
658
659         return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
660 }