6 "github.com/docker/go-events"
7 "github.com/docker/swarmkit/api"
8 "github.com/docker/swarmkit/log"
9 "github.com/docker/swarmkit/manager/orchestrator"
10 "github.com/docker/swarmkit/manager/state/store"
11 "golang.org/x/net/context"
// This file provides service-level orchestration. It observes changes to
// services and creates and destroys tasks as necessary to match the service
// specifications. This is different from task-level orchestration, which
// responds to changes in individual tasks (or nodes which run them).
19 func (r *Orchestrator) initCluster(readTx store.ReadTx) error {
20 clusters, err := store.FindClusters(readTx, store.ByName("default"))
25 if len(clusters) != 1 {
26 // we'll just pick it when it is created.
30 r.cluster = clusters[0]
34 func (r *Orchestrator) initServices(readTx store.ReadTx) error {
35 services, err := store.FindServices(readTx, store.All)
39 for _, s := range services {
40 if orchestrator.IsReplicatedService(s) {
41 r.reconcileServices[s.ID] = s
47 func (r *Orchestrator) handleServiceEvent(ctx context.Context, event events.Event) {
48 switch v := event.(type) {
49 case api.EventDeleteService:
50 if !orchestrator.IsReplicatedService(v.Service) {
53 orchestrator.DeleteServiceTasks(ctx, r.store, v.Service)
54 r.restarts.ClearServiceHistory(v.Service.ID)
55 delete(r.reconcileServices, v.Service.ID)
56 case api.EventCreateService:
57 if !orchestrator.IsReplicatedService(v.Service) {
60 r.reconcileServices[v.Service.ID] = v.Service
61 case api.EventUpdateService:
62 if !orchestrator.IsReplicatedService(v.Service) {
65 r.reconcileServices[v.Service.ID] = v.Service
69 func (r *Orchestrator) tickServices(ctx context.Context) {
70 if len(r.reconcileServices) > 0 {
71 for _, s := range r.reconcileServices {
74 r.reconcileServices = make(map[string]*api.Service)
78 func (r *Orchestrator) resolveService(ctx context.Context, task *api.Task) *api.Service {
79 if task.ServiceID == "" {
82 var service *api.Service
83 r.store.View(func(tx store.ReadTx) {
84 service = store.GetService(tx, task.ServiceID)
89 func (r *Orchestrator) reconcile(ctx context.Context, service *api.Service) {
90 runningSlots, deadSlots, err := orchestrator.GetRunnableAndDeadSlots(r.store, service.ID)
92 log.G(ctx).WithError(err).Errorf("reconcile failed finding tasks")
96 numSlots := len(runningSlots)
98 slotsSlice := make([]orchestrator.Slot, 0, numSlots)
99 for _, slot := range runningSlots {
100 slotsSlice = append(slotsSlice, slot)
103 deploy := service.Spec.GetMode().(*api.ServiceSpec_Replicated)
104 specifiedSlots := deploy.Replicated.Replicas
107 case specifiedSlots > uint64(numSlots):
108 log.G(ctx).Debugf("Service %s was scaled up from %d to %d instances", service.ID, numSlots, specifiedSlots)
109 // Update all current tasks then add missing tasks
110 r.updater.Update(ctx, r.cluster, service, slotsSlice)
111 err = r.store.Batch(func(batch *store.Batch) error {
112 r.addTasks(ctx, batch, service, runningSlots, deadSlots, specifiedSlots-uint64(numSlots))
113 r.deleteTasksMap(ctx, batch, deadSlots)
117 log.G(ctx).WithError(err).Errorf("reconcile batch failed")
120 case specifiedSlots < uint64(numSlots):
121 // Update up to N tasks then remove the extra
122 log.G(ctx).Debugf("Service %s was scaled down from %d to %d instances", service.ID, numSlots, specifiedSlots)
124 // Preferentially remove tasks on the nodes that have the most
125 // copies of this service, to leave a more balanced result.
127 // First sort tasks such that tasks which are currently running
128 // (in terms of observed state) appear before non-running tasks.
129 // This will cause us to prefer to remove non-running tasks, all
130 // other things being equal in terms of node balance.
132 sort.Sort(slotsByRunningState(slotsSlice))
134 // Assign each task an index that counts it as the nth copy of
135 // of the service on its node (1, 2, 3, ...), and sort the
136 // tasks by this counter value.
138 slotsByNode := make(map[string]int)
139 slotsWithIndices := make(slotsByIndex, 0, numSlots)
141 for _, slot := range slotsSlice {
142 if len(slot) == 1 && slot[0].NodeID != "" {
143 slotsByNode[slot[0].NodeID]++
144 slotsWithIndices = append(slotsWithIndices, slotWithIndex{slot: slot, index: slotsByNode[slot[0].NodeID]})
146 slotsWithIndices = append(slotsWithIndices, slotWithIndex{slot: slot, index: -1})
150 sort.Sort(slotsWithIndices)
152 sortedSlots := make([]orchestrator.Slot, 0, numSlots)
153 for _, slot := range slotsWithIndices {
154 sortedSlots = append(sortedSlots, slot.slot)
157 r.updater.Update(ctx, r.cluster, service, sortedSlots[:specifiedSlots])
158 err = r.store.Batch(func(batch *store.Batch) error {
159 r.deleteTasksMap(ctx, batch, deadSlots)
160 r.deleteTasks(ctx, batch, sortedSlots[specifiedSlots:])
164 log.G(ctx).WithError(err).Errorf("reconcile batch failed")
167 case specifiedSlots == uint64(numSlots):
168 err = r.store.Batch(func(batch *store.Batch) error {
169 r.deleteTasksMap(ctx, batch, deadSlots)
173 log.G(ctx).WithError(err).Errorf("reconcile batch failed")
175 // Simple update, no scaling - update all tasks.
176 r.updater.Update(ctx, r.cluster, service, slotsSlice)
180 func (r *Orchestrator) addTasks(ctx context.Context, batch *store.Batch, service *api.Service, runningSlots map[uint64]orchestrator.Slot, deadSlots map[uint64]orchestrator.Slot, count uint64) {
182 for i := uint64(0); i < count; i++ {
183 // Find a slot number that is missing a running task
186 if _, ok := runningSlots[slot]; !ok {
191 delete(deadSlots, slot)
192 err := batch.Update(func(tx store.Tx) error {
193 return store.CreateTask(tx, orchestrator.NewTask(r.cluster, service, slot, ""))
196 log.G(ctx).Errorf("Failed to create task: %v", err)
201 func (r *Orchestrator) deleteTasks(ctx context.Context, batch *store.Batch, slots []orchestrator.Slot) {
202 for _, slot := range slots {
203 for _, t := range slot {
204 r.deleteTask(ctx, batch, t)
209 func (r *Orchestrator) deleteTasksMap(ctx context.Context, batch *store.Batch, slots map[uint64]orchestrator.Slot) {
210 for _, slot := range slots {
211 for _, t := range slot {
212 r.deleteTask(ctx, batch, t)
217 func (r *Orchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) {
218 err := batch.Update(func(tx store.Tx) error {
219 return store.DeleteTask(tx, t.ID)
222 log.G(ctx).WithError(err).Errorf("deleting task %s failed", t.ID)
226 // IsRelatedService returns true if the service should be governed by this orchestrator
227 func (r *Orchestrator) IsRelatedService(service *api.Service) bool {
228 return orchestrator.IsReplicatedService(service)