7 "github.com/docker/swarmkit/agent/exec"
8 "github.com/docker/swarmkit/api"
9 "github.com/docker/swarmkit/api/equality"
10 "github.com/docker/swarmkit/log"
11 "golang.org/x/net/context"
14 // taskManager manages all aspects of task execution and reporting for an agent
15 // through state management.
16 type taskManager struct {
19 reporter StatusReporter
21 updateq chan *api.Task
23 shutdown chan struct{}
24 shutdownOnce sync.Once
29 func newTaskManager(ctx context.Context, task *api.Task, ctlr exec.Controller, reporter StatusReporter) *taskManager {
34 updateq: make(chan *api.Task),
35 shutdown: make(chan struct{}),
36 closed: make(chan struct{}),
42 // Update the task data.
43 func (tm *taskManager) Update(ctx context.Context, task *api.Task) error {
45 case tm.updateq <- task:
54 // Close shuts down the task manager, blocking until it is closed.
55 func (tm *taskManager) Close() error {
56 tm.shutdownOnce.Do(func() {
65 func (tm *taskManager) Logs(ctx context.Context, options api.LogSubscriptionOptions, publisher exec.LogPublisher) {
66 ctx = log.WithModule(ctx, "taskmanager")
68 logCtlr, ok := tm.ctlr.(exec.ControllerLogs)
70 return // no logs available
72 if err := logCtlr.Logs(ctx, publisher, options); err != nil {
73 log.G(ctx).WithError(err).Errorf("logs call failed")
77 func (tm *taskManager) run(ctx context.Context) {
78 ctx, cancelAll := context.WithCancel(ctx)
79 defer cancelAll() // cancel all child operations on exit.
81 ctx = log.WithModule(ctx, "taskmanager")
85 cancel context.CancelFunc
86 run = make(chan struct{}, 1)
87 statusq = make(chan *api.TaskStatus)
88 errs = make(chan error)
89 shutdown = tm.shutdown
90 updated bool // true if the task was updated.
94 // closure picks up current value of cancel.
100 run <- struct{}{} // prime the pump
104 // always check for shutdown before running.
107 shutdown = tm.shutdown // a little questionable
108 continue // ignore run request and handle shutdown
114 opctx, cancel = context.WithCancel(ctx)
116 // Several variables need to be snapshotted for the closure below.
117 opcancel := cancel // fork for the closure
118 running := tm.task.Copy() // clone the task before dispatch
119 statusqLocal := statusq
120 updatedLocal := updated // capture state of update for goroutine
122 go runctx(ctx, tm.closed, errs, func(ctx context.Context) error {
126 // before we do anything, update the task for the controller.
127 // always update the controller before running.
128 if err := tm.ctlr.Update(opctx, running); err != nil {
129 log.G(ctx).WithError(err).Error("updating task controller failed")
134 status, err := exec.Do(opctx, running, tm.ctlr)
136 // always report the status if we get one back. This
137 // returns to the manager loop, then reports the status
140 case statusqLocal <- status:
141 case <-ctx.Done(): // not opctx, since that may have been cancelled.
144 if err := tm.reporter.UpdateTaskStatus(ctx, running.ID, status); err != nil {
145 log.G(ctx).WithError(err).Error("task manager failed to report status to agent")
152 // This branch is always executed when an operation completes. The
153 // goal is to decide whether or not we re-dispatch the operation.
158 shutdown = tm.shutdown // re-enable the shutdown branch
159 continue // no dispatch if we are in shutdown.
164 case exec.ErrTaskNoop:
166 continue // wait till getting pumped via update.
168 case exec.ErrTaskRetry:
169 // TODO(stevvooe): Add exponential backoff with random jitter
170 // here. For now, this backoff is enough to keep the task
171 // manager from running away with the CPU.
172 time.AfterFunc(time.Second, func() {
173 errs <- nil // repump this branch, with no err
176 case nil, context.Canceled, context.DeadlineExceeded:
177 // no log in this case
179 log.G(ctx).WithError(err).Error("task operation failed")
183 case run <- struct{}{}:
186 case status := <-statusq:
187 tm.task.Status = *status
188 case task := <-tm.updateq:
189 if equality.TasksEqualStable(task, tm.task) {
190 continue // ignore the update
193 if task.ID != tm.task.ID {
194 log.G(ctx).WithField("task.update.id", task.ID).Error("received update for incorrect task")
198 if task.DesiredState < tm.task.DesiredState {
199 log.G(ctx).WithField("task.update.desiredstate", task.DesiredState).
200 Error("ignoring task update with invalid desired state")
205 task.Status = tm.task.Status // overwrite our status, as it is canonical.
209 // we have accepted the task update
211 cancel() // cancel outstanding if necessary.
213 // If this channel op fails, it means there is already a
214 // message on the run queue.
216 case run <- struct{}{}:
222 // cancel outstanding operation.
225 // subtle: after a cancellation, we want to avoid busy wait
226 // here. this gets re-enabled in the errs branch and we'll come
227 // back around and try shutdown again.
228 shutdown = nil // turn off this branch until op proceeds
229 continue // wait until operation actually exits.
232 // disable everything, and prepare for closing.
236 tm.closeOnce.Do(func() {
242 tm.closeOnce.Do(func() {