Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / docker / swarmkit / manager / orchestrator / replicated / services.go
1 package replicated
2
3 import (
4         "sort"
5
6         "github.com/docker/go-events"
7         "github.com/docker/swarmkit/api"
8         "github.com/docker/swarmkit/log"
9         "github.com/docker/swarmkit/manager/orchestrator"
10         "github.com/docker/swarmkit/manager/state/store"
11         "golang.org/x/net/context"
12 )
13
14 // This file provices service-level orchestration. It observes changes to
15 // services and creates and destroys tasks as necessary to match the service
16 // specifications. This is different from task-level orchestration, which
17 // responds to changes in individual tasks (or nodes which run them).
18
19 func (r *Orchestrator) initCluster(readTx store.ReadTx) error {
20         clusters, err := store.FindClusters(readTx, store.ByName("default"))
21         if err != nil {
22                 return err
23         }
24
25         if len(clusters) != 1 {
26                 // we'll just pick it when it is created.
27                 return nil
28         }
29
30         r.cluster = clusters[0]
31         return nil
32 }
33
34 func (r *Orchestrator) initServices(readTx store.ReadTx) error {
35         services, err := store.FindServices(readTx, store.All)
36         if err != nil {
37                 return err
38         }
39         for _, s := range services {
40                 if orchestrator.IsReplicatedService(s) {
41                         r.reconcileServices[s.ID] = s
42                 }
43         }
44         return nil
45 }
46
47 func (r *Orchestrator) handleServiceEvent(ctx context.Context, event events.Event) {
48         switch v := event.(type) {
49         case api.EventDeleteService:
50                 if !orchestrator.IsReplicatedService(v.Service) {
51                         return
52                 }
53                 orchestrator.DeleteServiceTasks(ctx, r.store, v.Service)
54                 r.restarts.ClearServiceHistory(v.Service.ID)
55                 delete(r.reconcileServices, v.Service.ID)
56         case api.EventCreateService:
57                 if !orchestrator.IsReplicatedService(v.Service) {
58                         return
59                 }
60                 r.reconcileServices[v.Service.ID] = v.Service
61         case api.EventUpdateService:
62                 if !orchestrator.IsReplicatedService(v.Service) {
63                         return
64                 }
65                 r.reconcileServices[v.Service.ID] = v.Service
66         }
67 }
68
69 func (r *Orchestrator) tickServices(ctx context.Context) {
70         if len(r.reconcileServices) > 0 {
71                 for _, s := range r.reconcileServices {
72                         r.reconcile(ctx, s)
73                 }
74                 r.reconcileServices = make(map[string]*api.Service)
75         }
76 }
77
78 func (r *Orchestrator) resolveService(ctx context.Context, task *api.Task) *api.Service {
79         if task.ServiceID == "" {
80                 return nil
81         }
82         var service *api.Service
83         r.store.View(func(tx store.ReadTx) {
84                 service = store.GetService(tx, task.ServiceID)
85         })
86         return service
87 }
88
89 func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
90         runningSlots, deadSlots, err := orchestrator.GetRunnableAndDeadSlots(r.store, service.ID)
91         if err != nil {
92                 log.G(ctx).WithError(err).Errorf("reconcile failed finding tasks")
93                 return
94         }
95
96         numSlots := len(runningSlots)
97
98         slotsSlice := make([]orchestrator.Slot, 0, numSlots)
99         for _, slot := range runningSlots {
100                 slotsSlice = append(slotsSlice, slot)
101         }
102
103         deploy := service.Spec.GetMode().(*api.ServiceSpec_Replicated)
104         specifiedSlots := deploy.Replicated.Replicas
105
106         switch {
107         case specifiedSlots > uint64(numSlots):
108                 log.G(ctx).Debugf("Service %s was scaled up from %d to %d instances", service.ID, numSlots, specifiedSlots)
109                 // Update all current tasks then add missing tasks
110                 r.updater.Update(ctx, r.cluster, service, slotsSlice)
111                 err = r.store.Batch(func(batch *store.Batch) error {
112                         r.addTasks(ctx, batch, service, runningSlots, deadSlots, specifiedSlots-uint64(numSlots))
113                         r.deleteTasksMap(ctx, batch, deadSlots)
114                         return nil
115                 })
116                 if err != nil {
117                         log.G(ctx).WithError(err).Errorf("reconcile batch failed")
118                 }
119
120         case specifiedSlots < uint64(numSlots):
121                 // Update up to N tasks then remove the extra
122                 log.G(ctx).Debugf("Service %s was scaled down from %d to %d instances", service.ID, numSlots, specifiedSlots)
123
124                 // Preferentially remove tasks on the nodes that have the most
125                 // copies of this service, to leave a more balanced result.
126
127                 // First sort tasks such that tasks which are currently running
128                 // (in terms of observed state) appear before non-running tasks.
129                 // This will cause us to prefer to remove non-running tasks, all
130                 // other things being equal in terms of node balance.
131
132                 sort.Sort(slotsByRunningState(slotsSlice))
133
134                 // Assign each task an index that counts it as the nth copy of
135                 // of the service on its node (1, 2, 3, ...), and sort the
136                 // tasks by this counter value.
137
138                 slotsByNode := make(map[string]int)
139                 slotsWithIndices := make(slotsByIndex, 0, numSlots)
140
141                 for _, slot := range slotsSlice {
142                         if len(slot) == 1 && slot[0].NodeID != "" {
143                                 slotsByNode[slot[0].NodeID]++
144                                 slotsWithIndices = append(slotsWithIndices, slotWithIndex{slot: slot, index: slotsByNode[slot[0].NodeID]})
145                         } else {
146                                 slotsWithIndices = append(slotsWithIndices, slotWithIndex{slot: slot, index: -1})
147                         }
148                 }
149
150                 sort.Sort(slotsWithIndices)
151
152                 sortedSlots := make([]orchestrator.Slot, 0, numSlots)
153                 for _, slot := range slotsWithIndices {
154                         sortedSlots = append(sortedSlots, slot.slot)
155                 }
156
157                 r.updater.Update(ctx, r.cluster, service, sortedSlots[:specifiedSlots])
158                 err = r.store.Batch(func(batch *store.Batch) error {
159                         r.deleteTasksMap(ctx, batch, deadSlots)
160                         r.deleteTasks(ctx, batch, sortedSlots[specifiedSlots:])
161                         return nil
162                 })
163                 if err != nil {
164                         log.G(ctx).WithError(err).Errorf("reconcile batch failed")
165                 }
166
167         case specifiedSlots == uint64(numSlots):
168                 err = r.store.Batch(func(batch *store.Batch) error {
169                         r.deleteTasksMap(ctx, batch, deadSlots)
170                         return nil
171                 })
172                 if err != nil {
173                         log.G(ctx).WithError(err).Errorf("reconcile batch failed")
174                 }
175                 // Simple update, no scaling - update all tasks.
176                 r.updater.Update(ctx, r.cluster, service, slotsSlice)
177         }
178 }
179
180 func (r *Orchestrator) addTasks(ctx context.Context, batch *store.Batch, service *api.Service, runningSlots map[uint64]orchestrator.Slot, deadSlots map[uint64]orchestrator.Slot, count uint64) {
181         slot := uint64(0)
182         for i := uint64(0); i < count; i++ {
183                 // Find a slot number that is missing a running task
184                 for {
185                         slot++
186                         if _, ok := runningSlots[slot]; !ok {
187                                 break
188                         }
189                 }
190
191                 delete(deadSlots, slot)
192                 err := batch.Update(func(tx store.Tx) error {
193                         return store.CreateTask(tx, orchestrator.NewTask(r.cluster, service, slot, ""))
194                 })
195                 if err != nil {
196                         log.G(ctx).Errorf("Failed to create task: %v", err)
197                 }
198         }
199 }
200
201 func (r *Orchestrator) deleteTasks(ctx context.Context, batch *store.Batch, slots []orchestrator.Slot) {
202         for _, slot := range slots {
203                 for _, t := range slot {
204                         r.deleteTask(ctx, batch, t)
205                 }
206         }
207 }
208
209 func (r *Orchestrator) deleteTasksMap(ctx context.Context, batch *store.Batch, slots map[uint64]orchestrator.Slot) {
210         for _, slot := range slots {
211                 for _, t := range slot {
212                         r.deleteTask(ctx, batch, t)
213                 }
214         }
215 }
216
217 func (r *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
218         err := batch.Update(func(tx store.Tx) error {
219                 return store.DeleteTask(tx, t.ID)
220         })
221         if err != nil {
222                 log.G(ctx).WithError(err).Errorf("deleting task %s failed", t.ID)
223         }
224 }
225
226 // IsRelatedService returns true if the service should be governed by this orchestrator
227 func (r *Orchestrator) IsRelatedService(service *api.Service) bool {
228         return orchestrator.IsReplicatedService(service)
229 }