Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / docker / swarmkit / agent / task.go
1 package agent
2
3 import (
4         "sync"
5         "time"
6
7         "github.com/docker/swarmkit/agent/exec"
8         "github.com/docker/swarmkit/api"
9         "github.com/docker/swarmkit/api/equality"
10         "github.com/docker/swarmkit/log"
11         "golang.org/x/net/context"
12 )
13
// taskManager manages all aspects of task execution and reporting for an agent
// through state management.
//
// Fields:
//   - task: the manager's own copy of the task (copied in newTaskManager and
//     replaced by the run loop when an update is accepted).
//   - ctlr: the controller the run loop uses to update and execute the task.
//   - reporter: receives task status reports produced by operations.
//   - updateq: unbuffered channel carrying task updates from Update to the
//     run loop.
//   - shutdown / shutdownOnce: shutdown is closed, at most once, by Close to
//     ask the run loop to stop.
//   - closed / closeOnce: closed is closed, at most once, by the run loop to
//     signal that the manager has stopped; Close and Update wait on it.
type taskManager struct {
	task     *api.Task
	ctlr     exec.Controller
	reporter StatusReporter

	updateq chan *api.Task

	shutdown     chan struct{}
	shutdownOnce sync.Once
	closed       chan struct{}
	closeOnce    sync.Once
}
28
29 func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {
30         t := &taskManager{
31                 task:     task.Copy(),
32                 ctlr:     ctlr,
33                 reporter: reporter,
34                 updateq:  make(chan *api.Task),
35                 shutdown: make(chan struct{}),
36                 closed:   make(chan struct{}),
37         }
38         go t.run(ctx)
39         return t
40 }
41
42 // Update the task data.
43 func (tm *taskManager) Update(ctx context.Context, task *api.Task) error {
44         select {
45         case tm.updateq <- task:
46                 return nil
47         case <-tm.closed:
48                 return ErrClosed
49         case <-ctx.Done():
50                 return ctx.Err()
51         }
52 }
53
54 // Close shuts down the task manager, blocking until it is closed.
55 func (tm *taskManager) Close() error {
56         tm.shutdownOnce.Do(func() {
57                 close(tm.shutdown)
58         })
59
60         <-tm.closed
61
62         return nil
63 }
64
65 func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
66         ctx = log.WithModule(ctx, "taskmanager")
67
68         logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
69         if !ok {
70                 return // no logs available
71         }
72         if err := logCtlr.Logs(ctx, publisher, options); err != nil {
73                 log.G(ctx).WithError(err).Errorf("logs call failed")
74         }
75 }
76
77 func (tm *taskManager) run(ctx context.Context) {
78         ctx, cancelAll := context.WithCancel(ctx)
79         defer cancelAll() // cancel all child operations on exit.
80
81         ctx = log.WithModule(ctx, "taskmanager")
82
83         var (
84                 opctx    context.Context
85                 cancel   context.CancelFunc
86                 run      = make(chan struct{}, 1)
87                 statusq  = make(chan *api.TaskStatus)
88                 errs     = make(chan error)
89                 shutdown = tm.shutdown
90                 updated  bool // true if the task was updated.
91         )
92
93         defer func() {
94                 // closure  picks up current value of cancel.
95                 if cancel != nil {
96                         cancel()
97                 }
98         }()
99
100         run <- struct{}{} // prime the pump
101         for {
102                 select {
103                 case <-run:
104                         // always check for shutdown before running.
105                         select {
106                         case <-tm.shutdown:
107                                 shutdown = tm.shutdown // a little questionable
108                                 continue               // ignore run request and handle shutdown
109                         case <-tm.closed:
110                                 continue
111                         default:
112                         }
113
114                         opctx, cancel = context.WithCancel(ctx)
115
116                         // Several variables need to be snapshotted for the closure below.
117                         opcancel := cancel        // fork for the closure
118                         running := tm.task.Copy() // clone the task before dispatch
119                         statusqLocal := statusq
120                         updatedLocal := updated // capture state of update for goroutine
121                         updated = false
122                         go runctx(ctx, tm.closed, errs, func(ctx context.Context) error {
123                                 defer opcancel()
124
125                                 if updatedLocal {
126                                         // before we do anything, update the task for the controller.
127                                         // always update the controller before running.
128                                         if err := tm.ctlr.Update(opctx, running); err != nil {
129                                                 log.G(ctx).WithError(err).Error("updating task controller failed")
130                                                 return err
131                                         }
132                                 }
133
134                                 status, err := exec.Do(opctx, running, tm.ctlr)
135                                 if status != nil {
136                                         // always report the status if we get one back. This
137                                         // returns to the manager loop, then reports the status
138                                         // upstream.
139                                         select {
140                                         case statusqLocal <- status:
141                                         case <-ctx.Done(): // not opctx, since that may have been cancelled.
142                                         }
143
144                                         if err := tm.reporter.UpdateTaskStatus(ctx, running.ID, status); err != nil {
145                                                 log.G(ctx).WithError(err).Error("task manager failed to report status to agent")
146                                         }
147                                 }
148
149                                 return err
150                         })
151                 case err := <-errs:
152                         // This branch is always executed when an operations completes. The
153                         // goal is to decide whether or not we re-dispatch the operation.
154                         cancel = nil
155
156                         select {
157                         case <-tm.shutdown:
158                                 shutdown = tm.shutdown // re-enable the shutdown branch
159                                 continue               // no dispatch if we are in shutdown.
160                         default:
161                         }
162
163                         switch err {
164                         case exec.ErrTaskNoop:
165                                 if !updated {
166                                         continue // wait till getting pumped via update.
167                                 }
168                         case exec.ErrTaskRetry:
169                                 // TODO(stevvooe): Add exponential backoff with random jitter
170                                 // here. For now, this backoff is enough to keep the task
171                                 // manager from running away with the CPU.
172                                 time.AfterFunc(time.Second, func() {
173                                         errs <- nil // repump this branch, with no err
174                                 })
175                                 continue
176                         case nil, context.Canceled, context.DeadlineExceeded:
177                                 // no log in this case
178                         default:
179                                 log.G(ctx).WithError(err).Error("task operation failed")
180                         }
181
182                         select {
183                         case run <- struct{}{}:
184                         default:
185                         }
186                 case status := <-statusq:
187                         tm.task.Status = *status
188                 case task := <-tm.updateq:
189                         if equality.TasksEqualStable(task, tm.task) {
190                                 continue // ignore the update
191                         }
192
193                         if task.ID != tm.task.ID {
194                                 log.G(ctx).WithField("task.update.id", task.ID).Error("received update for incorrect task")
195                                 continue
196                         }
197
198                         if task.DesiredState < tm.task.DesiredState {
199                                 log.G(ctx).WithField("task.update.desiredstate", task.DesiredState).
200                                         Error("ignoring task update with invalid desired state")
201                                 continue
202                         }
203
204                         task = task.Copy()
205                         task.Status = tm.task.Status // overwrite our status, as it is canonical.
206                         tm.task = task
207                         updated = true
208
209                         // we have accepted the task update
210                         if cancel != nil {
211                                 cancel() // cancel outstanding if necessary.
212                         } else {
213                                 // If this channel op fails, it means there is already a
214                                 // message on the run queue.
215                                 select {
216                                 case run <- struct{}{}:
217                                 default:
218                                 }
219                         }
220                 case <-shutdown:
221                         if cancel != nil {
222                                 // cancel outstanding operation.
223                                 cancel()
224
225                                 // subtle: after a cancellation, we want to avoid busy wait
226                                 // here. this gets renabled in the errs branch and we'll come
227                                 // back around and try shutdown again.
228                                 shutdown = nil // turn off this branch until op proceeds
229                                 continue       // wait until operation actually exits.
230                         }
231
232                         // disable everything, and prepare for closing.
233                         statusq = nil
234                         errs = nil
235                         shutdown = nil
236                         tm.closeOnce.Do(func() {
237                                 close(tm.closed)
238                         })
239                 case <-tm.closed:
240                         return
241                 case <-ctx.Done():
242                         tm.closeOnce.Do(func() {
243                                 close(tm.closed)
244                         })
245                         return
246                 }
247         }
248 }