Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / containerd / containerd / supervisor / supervisor.go
1 package supervisor
2
3 import (
4         "encoding/json"
5         "io"
6         "io/ioutil"
7         "os"
8         "path/filepath"
9         "sync"
10         "time"
11
12         "github.com/Sirupsen/logrus"
13         "github.com/containerd/containerd/runtime"
14 )
15
16 const (
17         defaultBufferSize = 2048 // size of queue in eventloop
18 )
19
20 // New returns an initialized Process supervisor.
21 func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
22         startTasks := make(chan *startTask, 10)
23         machine, err := CollectMachineInformation()
24         if err != nil {
25                 return nil, err
26         }
27         monitor, err := NewMonitor()
28         if err != nil {
29                 return nil, err
30         }
31         s := &Supervisor{
32                 stateDir:          stateDir,
33                 containers:        make(map[string]*containerInfo),
34                 startTasks:        startTasks,
35                 machine:           machine,
36                 subscribers:       make(map[chan Event]struct{}),
37                 tasks:             make(chan Task, defaultBufferSize),
38                 monitor:           monitor,
39                 runtime:           runtimeName,
40                 runtimeArgs:       runtimeArgs,
41                 shim:              shimName,
42                 timeout:           timeout,
43                 containerExecSync: make(map[string]map[string]chan struct{}),
44         }
45         if err := setupEventLog(s, retainCount); err != nil {
46                 return nil, err
47         }
48         go s.exitHandler()
49         go s.oomHandler()
50         if err := s.restore(); err != nil {
51                 return nil, err
52         }
53         return s, nil
54 }
55
56 type containerInfo struct {
57         container runtime.Container
58 }
59
60 func setupEventLog(s *Supervisor, retainCount int) error {
61         if err := readEventLog(s); err != nil {
62                 return err
63         }
64         logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
65         events := s.Events(time.Time{}, false, "")
66         return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount)
67 }
68
69 func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
70         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
71         if err != nil {
72                 return err
73         }
74         go func() {
75                 var (
76                         count = len(s.eventLog)
77                         enc   = json.NewEncoder(f)
78                 )
79                 for e := range events {
80                         // if we have a specified retain count make sure the truncate the event
81                         // log if it grows past the specified number of events to keep.
82                         if retainCount > 0 {
83                                 if count > retainCount {
84                                         logrus.Debug("truncating event log")
85                                         // close the log file
86                                         if f != nil {
87                                                 f.Close()
88                                         }
89                                         slice := retainCount - 1
90                                         l := len(s.eventLog)
91                                         if slice >= l {
92                                                 slice = l
93                                         }
94                                         s.eventLock.Lock()
95                                         s.eventLog = s.eventLog[len(s.eventLog)-slice:]
96                                         s.eventLock.Unlock()
97                                         if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
98                                                 logrus.WithField("error", err).Error("containerd: open event to journal")
99                                                 continue
100                                         }
101                                         enc = json.NewEncoder(f)
102                                         count = 0
103                                         for _, le := range s.eventLog {
104                                                 if err := enc.Encode(le); err != nil {
105                                                         logrus.WithField("error", err).Error("containerd: write event to journal")
106                                                 }
107                                         }
108                                 }
109                         }
110                         s.eventLock.Lock()
111                         s.eventLog = append(s.eventLog, e)
112                         s.eventLock.Unlock()
113                         count++
114                         if err := enc.Encode(e); err != nil {
115                                 logrus.WithField("error", err).Error("containerd: write event to journal")
116                         }
117                 }
118         }()
119         return nil
120 }
121
122 func readEventLog(s *Supervisor) error {
123         f, err := os.Open(filepath.Join(s.stateDir, "events.log"))
124         if err != nil {
125                 if os.IsNotExist(err) {
126                         return nil
127                 }
128                 return err
129         }
130         defer f.Close()
131         dec := json.NewDecoder(f)
132         for {
133                 var e eventV1
134                 if err := dec.Decode(&e); err != nil {
135                         if err == io.EOF {
136                                 break
137                         }
138                         return err
139                 }
140
141                 // We need to take care of -1 Status for backward compatibility
142                 ev := e.Event
143                 ev.Status = uint32(e.Status)
144                 if ev.Status > runtime.UnknownStatus {
145                         ev.Status = runtime.UnknownStatus
146                 }
147                 s.eventLog = append(s.eventLog, ev)
148         }
149         return nil
150 }
151
152 // Supervisor represents a container supervisor
153 type Supervisor struct {
154         // stateDir is the directory on the system to store container runtime state information.
155         stateDir string
156         // name of the OCI compatible runtime used to execute containers
157         runtime     string
158         runtimeArgs []string
159         shim        string
160         containers  map[string]*containerInfo
161         startTasks  chan *startTask
162         // we need a lock around the subscribers map only because additions and deletions from
163         // the map are via the API so we cannot really control the concurrency
164         subscriberLock sync.RWMutex
165         subscribers    map[chan Event]struct{}
166         machine        Machine
167         tasks          chan Task
168         monitor        *Monitor
169         eventLog       []Event
170         eventLock      sync.Mutex
171         timeout        time.Duration
172         // This is used to ensure that exec process death events are sent
173         // before the init process death
174         containerExecSyncLock sync.Mutex
175         containerExecSync     map[string]map[string]chan struct{}
176 }
177
178 // Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for they to
179 // terminate.  After it has handled all the SIGCHILD events it will close the signals chan
180 // and exit.  Stop is a non-blocking call and will return after the containers have been signaled
181 func (s *Supervisor) Stop() {
182         // Close the startTasks channel so that no new containers get started
183         close(s.startTasks)
184 }
185
186 // Close closes any open files in the supervisor but expects that Stop has been
187 // callsed so that no more containers are started.
188 func (s *Supervisor) Close() error {
189         return nil
190 }
191
192 // Event represents a container event
193 type Event struct {
194         ID        string    `json:"id"`
195         Type      string    `json:"type"`
196         Timestamp time.Time `json:"timestamp"`
197         PID       string    `json:"pid,omitempty"`
198         Status    uint32    `json:"status,omitempty"`
199 }
200
201 type eventV1 struct {
202         Event
203         Status int `json:"status,omitempty"`
204 }
205
206 // Events returns an event channel that external consumers can use to receive updates
207 // on container events
208 func (s *Supervisor) Events(from time.Time, storedOnly bool, id string) chan Event {
209         c := make(chan Event, defaultBufferSize)
210         if storedOnly {
211                 defer s.Unsubscribe(c)
212         }
213         s.subscriberLock.Lock()
214         defer s.subscriberLock.Unlock()
215         if !from.IsZero() {
216                 // replay old event
217                 s.eventLock.Lock()
218                 past := s.eventLog[:]
219                 s.eventLock.Unlock()
220                 for _, e := range past {
221                         if e.Timestamp.After(from) {
222                                 if id == "" || e.ID == id {
223                                         c <- e
224                                 }
225                         }
226                 }
227         }
228         if storedOnly {
229                 close(c)
230         } else {
231                 EventSubscriberCounter.Inc(1)
232                 s.subscribers[c] = struct{}{}
233         }
234         return c
235 }
236
237 // Unsubscribe removes the provided channel from receiving any more events
238 func (s *Supervisor) Unsubscribe(sub chan Event) {
239         s.subscriberLock.Lock()
240         defer s.subscriberLock.Unlock()
241         if _, ok := s.subscribers[sub]; ok {
242                 delete(s.subscribers, sub)
243                 close(sub)
244                 EventSubscriberCounter.Dec(1)
245         }
246 }
247
248 // notifySubscribers will send the provided event to the external subscribers
249 // of the events channel
250 func (s *Supervisor) notifySubscribers(e Event) {
251         s.subscriberLock.RLock()
252         defer s.subscriberLock.RUnlock()
253         for sub := range s.subscribers {
254                 // do a non-blocking send for the channel
255                 select {
256                 case sub <- e:
257                 default:
258                         logrus.WithField("event", e.Type).Warn("containerd: event not sent to subscriber")
259                 }
260         }
261 }
262
263 // Start is a non-blocking call that runs the supervisor for monitoring contianer processes and
264 // executing new containers.
265 //
266 // This event loop is the only thing that is allowed to modify state of containers and processes
267 // therefore it is save to do operations in the handlers that modify state of the system or
268 // state of the Supervisor
269 func (s *Supervisor) Start() error {
270         logrus.WithFields(logrus.Fields{
271                 "stateDir":    s.stateDir,
272                 "runtime":     s.runtime,
273                 "runtimeArgs": s.runtimeArgs,
274                 "memory":      s.machine.Memory,
275                 "cpus":        s.machine.Cpus,
276         }).Debug("containerd: supervisor running")
277         go func() {
278                 for i := range s.tasks {
279                         s.handleTask(i)
280                 }
281         }()
282         return nil
283 }
284
285 // Machine returns the machine information for which the
286 // supervisor is executing on.
287 func (s *Supervisor) Machine() Machine {
288         return s.machine
289 }
290
291 // SendTask sends the provided event the the supervisors main event loop
292 func (s *Supervisor) SendTask(evt Task) {
293         TasksCounter.Inc(1)
294         s.tasks <- evt
295 }
296
297 func (s *Supervisor) exitHandler() {
298         for p := range s.monitor.Exits() {
299                 e := &ExitTask{
300                         Process: p,
301                 }
302                 s.SendTask(e)
303         }
304 }
305
306 func (s *Supervisor) oomHandler() {
307         for id := range s.monitor.OOMs() {
308                 e := &OOMTask{
309                         ID: id,
310                 }
311                 s.SendTask(e)
312         }
313 }
314
315 func (s *Supervisor) monitorProcess(p runtime.Process) error {
316         return s.monitor.Monitor(p)
317 }
318
319 func (s *Supervisor) restore() error {
320         dirs, err := ioutil.ReadDir(s.stateDir)
321         if err != nil {
322                 return err
323         }
324         for _, d := range dirs {
325                 if !d.IsDir() {
326                         continue
327                 }
328                 id := d.Name()
329                 container, err := runtime.Load(s.stateDir, id, s.shim, s.timeout)
330                 if err != nil {
331                         logrus.WithFields(logrus.Fields{"error": err, "id": id}).Warnf("containerd: failed to load container,removing state directory.")
332                         os.RemoveAll(filepath.Join(s.stateDir, id))
333                         continue
334                 }
335                 processes, err := container.Processes()
336                 if err != nil || len(processes) == 0 {
337                         logrus.WithFields(logrus.Fields{"error": err, "id": id}).Warnf("containerd: container has no process running,removing state directory.")
338                         os.RemoveAll(filepath.Join(s.stateDir, id))
339                         continue
340                 }
341
342                 ContainersCounter.Inc(1)
343                 s.containers[id] = &containerInfo{
344                         container: container,
345                 }
346                 if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited {
347                         logrus.WithField("error", err).Error("containerd: notify OOM events")
348                 }
349
350                 s.newExecSyncMap(container.ID())
351
352                 logrus.WithField("id", id).Debug("containerd: container restored")
353                 var exitedProcesses []runtime.Process
354                 for _, p := range processes {
355                         if p.State() == runtime.Running {
356                                 if err := s.monitorProcess(p); err != nil {
357                                         return err
358                                 }
359                         } else {
360                                 exitedProcesses = append(exitedProcesses, p)
361                         }
362                         if p.ID() != runtime.InitProcessID {
363                                 s.newExecSyncChannel(container.ID(), p.ID())
364                         }
365                 }
366                 if len(exitedProcesses) > 0 {
367                         // sort processes so that init is fired last because that is how the kernel sends the
368                         // exit events
369                         sortProcesses(exitedProcesses)
370                         for _, p := range exitedProcesses {
371                                 e := &ExitTask{
372                                         Process: p,
373                                 }
374                                 s.SendTask(e)
375                         }
376                 }
377         }
378         return nil
379 }
380
381 func (s *Supervisor) handleTask(i Task) {
382         var err error
383         switch t := i.(type) {
384         case *AddProcessTask:
385                 err = s.addProcess(t)
386         case *CreateCheckpointTask:
387                 err = s.createCheckpoint(t)
388         case *DeleteCheckpointTask:
389                 err = s.deleteCheckpoint(t)
390         case *StartTask:
391                 err = s.start(t)
392         case *DeleteTask:
393                 err = s.delete(t)
394         case *ExitTask:
395                 err = s.exit(t)
396         case *GetContainersTask:
397                 err = s.getContainers(t)
398         case *SignalTask:
399                 err = s.signal(t)
400         case *StatsTask:
401                 err = s.stats(t)
402         case *UpdateTask:
403                 err = s.updateContainer(t)
404         case *UpdateProcessTask:
405                 err = s.updateProcess(t)
406         case *OOMTask:
407                 err = s.oom(t)
408         default:
409                 err = ErrUnknownTask
410         }
411         if err != errDeferredResponse {
412                 i.ErrorCh() <- err
413                 close(i.ErrorCh())
414         }
415 }
416
417 func (s *Supervisor) newExecSyncMap(containerID string) {
418         s.containerExecSyncLock.Lock()
419         s.containerExecSync[containerID] = make(map[string]chan struct{})
420         s.containerExecSyncLock.Unlock()
421 }
422
423 func (s *Supervisor) newExecSyncChannel(containerID, pid string) {
424         s.containerExecSyncLock.Lock()
425         s.containerExecSync[containerID][pid] = make(chan struct{})
426         s.containerExecSyncLock.Unlock()
427 }
428
429 func (s *Supervisor) deleteExecSyncChannel(containerID, pid string) {
430         s.containerExecSyncLock.Lock()
431         delete(s.containerExecSync[containerID], pid)
432         s.containerExecSyncLock.Unlock()
433 }
434
435 func (s *Supervisor) getExecSyncChannel(containerID, pid string) chan struct{} {
436         s.containerExecSyncLock.Lock()
437         ch := s.containerExecSync[containerID][pid]
438         s.containerExecSyncLock.Unlock()
439         return ch
440 }
441
442 func (s *Supervisor) getExecSyncMap(containerID string) map[string]chan struct{} {
443         s.containerExecSyncLock.Lock()
444         defer s.containerExecSyncLock.Unlock()
445         return s.containerExecSync[containerID]
446 }
447
448 func (s *Supervisor) deleteExecSyncMap(containerID string) {
449         s.containerExecSyncLock.Lock()
450         defer s.containerExecSyncLock.Unlock()
451         delete(s.containerExecSync, containerID)
452 }