12 "github.com/Sirupsen/logrus"
13 "github.com/containerd/containerd/runtime"
17 defaultBufferSize = 2048 // size of queue in eventloop
20 // New returns an initialized Process supervisor.
21 func New(stateDir string, runtimeName, shimName string, runtimeArgs []string, timeout time.Duration, retainCount int) (*Supervisor, error) {
// Small buffer so API callers can queue a handful of container starts
// without blocking; start workers drain this channel.
22 startTasks := make(chan *startTask, 10)
23 machine, err := CollectMachineInformation()
27 monitor, err := NewMonitor()
// NOTE(review): this view of the file is gap-sampled; the error checks for
// the two calls above and the Supervisor literal opener are on elided lines.
33 containers: make(map[string]*containerInfo),
34 startTasks: startTasks,
36 subscribers: make(map[chan Event]struct{}),
37 tasks: make(chan Task, defaultBufferSize),
40 runtimeArgs: runtimeArgs,
43 containerExecSync: make(map[string]map[string]chan struct{}),
// Set up the event journal before restoring state so events produced while
// restoring containers are captured.
45 if err := setupEventLog(s, retainCount); err != nil {
// Reload any containers persisted in stateDir from a previous run.
50 if err := s.restore(); err != nil {
// containerInfo is the supervisor's per-container bookkeeping entry.
56 type containerInfo struct {
// container is the runtime-level handle for the managed container.
57 container runtime.Container
// setupEventLog replays the persisted event journal into s.eventLog and then
// starts the journal writer that appends live events to events.log.
60 func setupEventLog(s *Supervisor, retainCount int) error {
61 if err := readEventLog(s); err != nil {
64 logrus.WithField("count", len(s.eventLog)).Debug("containerd: read past events")
// Subscribe from the zero time for all container ids; storedOnly=false so
// the channel keeps delivering live events.
65 events := s.Events(time.Time{}, false, "")
66 return eventLogger(s, filepath.Join(s.stateDir, "events.log"), events, retainCount)
// eventLogger encodes each Event received on events into the journal file at
// path, truncating the journal back to the newest retainCount-1 entries
// whenever it grows past retainCount.
69 func eventLogger(s *Supervisor, path string, events chan Event, retainCount int) error {
// NOTE(review): O_APPEND together with O_TRUNC truncates on open and then
// appends every write; 0755 is an unusual mode for a data file (0644 would
// be conventional) — confirm intent.
70 f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755)
76 count = len(s.eventLog)
77 enc = json.NewEncoder(f)
79 for e := range events {
80 // if we have a specified retain count make sure we truncate the event
81 // log if it grows past the specified number of events to keep.
83 if count > retainCount {
84 logrus.Debug("truncating event log")
// Keep retainCount-1 past entries so the event appended below brings the
// log back to exactly retainCount.
89 slice := retainCount - 1
95 s.eventLog = s.eventLog[len(s.eventLog)-slice:]
// Reopen with O_TRUNC to rewrite the journal from the retained tail.
97 if f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND|os.O_TRUNC, 0755); err != nil {
98 logrus.WithField("error", err).Error("containerd: open event to journal")
101 enc = json.NewEncoder(f)
// Re-encode the retained events into the fresh file. Write failures are
// logged but do not stop the logger.
103 for _, le := range s.eventLog {
104 if err := enc.Encode(le); err != nil {
105 logrus.WithField("error", err).Error("containerd: write event to journal")
// Record the new event in memory and append it to the journal.
111 s.eventLog = append(s.eventLog, e)
114 if err := enc.Encode(e); err != nil {
115 logrus.WithField("error", err).Error("containerd: write event to journal")
// readEventLog loads previously journaled events from events.log in the
// state directory into s.eventLog. A missing journal is not an error
// (first run).
122 func readEventLog(s *Supervisor) error {
123 f, err := os.Open(filepath.Join(s.stateDir, "events.log"))
125 if os.IsNotExist(err) {
131 dec := json.NewDecoder(f)
134 if err := dec.Decode(&e); err != nil {
141 // We need to take care of -1 Status for backward compatibility
// Old entries stored Status as a signed int; after the unsigned conversion
// anything out of range (e.g. -1) is clamped to UnknownStatus.
143 ev.Status = uint32(e.Status)
144 if ev.Status > runtime.UnknownStatus {
145 ev.Status = runtime.UnknownStatus
147 s.eventLog = append(s.eventLog, ev)
152 // Supervisor represents a container supervisor
153 type Supervisor struct {
154 // stateDir is the directory on the system to store container runtime state information.
156 // name of the OCI compatible runtime used to execute containers
// containers tracks every container the supervisor manages, keyed by id.
160 containers map[string]*containerInfo
// startTasks feeds queued container starts to the start workers.
161 startTasks chan *startTask
162 // we need a lock around the subscribers map only because additions and deletions from
163 // the map are via the API so we cannot really control the concurrency
164 subscriberLock sync.RWMutex
165 subscribers map[chan Event]struct{}
// timeout bounds shim/runtime operations (passed to runtime.Load in restore).
171 timeout time.Duration
172 // This is used to ensure that exec process death events are sent
173 // before the init process death
174 containerExecSyncLock sync.Mutex
175 containerExecSync map[string]map[string]chan struct{}
178 // Stop closes all startTasks and sends a SIGTERM to each container's pid1 then waits for them to
179 // terminate. After it has handled all the SIGCHLD events it will close the signals chan
180 // and exit. Stop is a non-blocking call and will return after the containers have been signaled
181 func (s *Supervisor) Stop() {
182 // Close the startTasks channel so that no new containers get started
186 // Close closes any open files in the supervisor but expects that Stop has been
187 // called so that no more containers are started.
188 func (s *Supervisor) Close() error {
192 // Event represents a container event
// Entries are journaled to events.log and streamed to Events() subscribers.
194 ID string `json:"id"`
195 Type string `json:"type"`
196 Timestamp time.Time `json:"timestamp"`
197 PID string `json:"pid,omitempty"`
198 Status uint32 `json:"status,omitempty"`
// eventV1 is the legacy journal entry shape in which Status was a signed
// int (it could be -1); readEventLog converts it to the current Event form.
201 type eventV1 struct {
203 Status int `json:"status,omitempty"`
206 // Events returns an event channel that external consumers can use to receive updates
207 // on container events
208 func (s *Supervisor) Events(from time.Time, storedOnly bool, id string) chan Event {
209 c := make(chan Event, defaultBufferSize)
// storedOnly consumers only want the replayed history, so drop the
// subscription again once this call returns.
211 defer s.Unsubscribe(c)
213 s.subscriberLock.Lock()
214 defer s.subscriberLock.Unlock()
// NOTE(review): s.eventLog[:] aliases the backing array rather than copying
// it — safe only because the replay happens under subscriberLock; confirm
// appenders also serialize against this.
218 past := s.eventLog[:]
// Replay stored events newer than `from`, optionally filtered by id.
220 for _, e := range past {
221 if e.Timestamp.After(from) {
222 if id == "" || e.ID == id {
231 EventSubscriberCounter.Inc(1)
232 s.subscribers[c] = struct{}{}
237 // Unsubscribe removes the provided channel from receiving any more events
238 func (s *Supervisor) Unsubscribe(sub chan Event) {
239 s.subscriberLock.Lock()
240 defer s.subscriberLock.Unlock()
// Only decrement the counter if the channel was actually subscribed, so a
// double Unsubscribe does not skew the metric.
241 if _, ok := s.subscribers[sub]; ok {
242 delete(s.subscribers, sub)
244 EventSubscriberCounter.Dec(1)
248 // notifySubscribers will send the provided event to the external subscribers
249 // of the events channel
250 func (s *Supervisor) notifySubscribers(e Event) {
251 s.subscriberLock.RLock()
252 defer s.subscriberLock.RUnlock()
253 for sub := range s.subscribers {
254 // do a non-blocking send for the channel
// A subscriber with a full buffer misses the event; it is logged, not queued.
258 logrus.WithField("event", e.Type).Warn("containerd: event not sent to subscriber")
263 // Start is a non-blocking call that runs the supervisor for monitoring container processes and
264 // executing new containers.
266 // This event loop is the only thing that is allowed to modify state of containers and processes
267 // therefore it is safe to do operations in the handlers that modify state of the system or
268 // state of the Supervisor
269 func (s *Supervisor) Start() error {
270 logrus.WithFields(logrus.Fields{
271 "stateDir": s.stateDir,
272 "runtime": s.runtime,
273 "runtimeArgs": s.runtimeArgs,
274 "memory": s.machine.Memory,
275 "cpus": s.machine.Cpus,
276 }).Debug("containerd: supervisor running")
// Drain the task queue; each task is dispatched by handleTask.
278 for i := range s.tasks {
285 // Machine returns the machine information for which the
286 // supervisor is executing on.
287 func (s *Supervisor) Machine() Machine {
291 // SendTask sends the provided event to the supervisor's main event loop
292 func (s *Supervisor) SendTask(evt Task) {
297 func (s *Supervisor) exitHandler() {
298 for p := range s.monitor.Exits() {
306 func (s *Supervisor) oomHandler() {
307 for id := range s.monitor.OOMs() {
315 func (s *Supervisor) monitorProcess(p runtime.Process) error {
316 return s.monitor.Monitor(p)
319 func (s *Supervisor) restore() error {
320 dirs, err := ioutil.ReadDir(s.stateDir)
324 for _, d := range dirs {
329 container, err := runtime.Load(s.stateDir, id, s.shim, s.timeout)
331 logrus.WithFields(logrus.Fields{"error": err, "id": id}).Warnf("containerd: failed to load container,removing state directory.")
332 os.RemoveAll(filepath.Join(s.stateDir, id))
335 processes, err := container.Processes()
336 if err != nil || len(processes) == 0 {
337 logrus.WithFields(logrus.Fields{"error": err, "id": id}).Warnf("containerd: container has no process running,removing state directory.")
338 os.RemoveAll(filepath.Join(s.stateDir, id))
342 ContainersCounter.Inc(1)
343 s.containers[id] = &containerInfo{
344 container: container,
346 if err := s.monitor.MonitorOOM(container); err != nil && err != runtime.ErrContainerExited {
347 logrus.WithField("error", err).Error("containerd: notify OOM events")
350 s.newExecSyncMap(container.ID())
352 logrus.WithField("id", id).Debug("containerd: container restored")
353 var exitedProcesses []runtime.Process
354 for _, p := range processes {
355 if p.State() == runtime.Running {
356 if err := s.monitorProcess(p); err != nil {
360 exitedProcesses = append(exitedProcesses, p)
362 if p.ID() != runtime.InitProcessID {
363 s.newExecSyncChannel(container.ID(), p.ID())
366 if len(exitedProcesses) > 0 {
367 // sort processes so that init is fired last because that is how the kernel sends the
369 sortProcesses(exitedProcesses)
370 for _, p := range exitedProcesses {
381 func (s *Supervisor) handleTask(i Task) {
383 switch t := i.(type) {
384 case *AddProcessTask:
385 err = s.addProcess(t)
386 case *CreateCheckpointTask:
387 err = s.createCheckpoint(t)
388 case *DeleteCheckpointTask:
389 err = s.deleteCheckpoint(t)
396 case *GetContainersTask:
397 err = s.getContainers(t)
403 err = s.updateContainer(t)
404 case *UpdateProcessTask:
405 err = s.updateProcess(t)
411 if err != errDeferredResponse {
417 func (s *Supervisor) newExecSyncMap(containerID string) {
418 s.containerExecSyncLock.Lock()
419 s.containerExecSync[containerID] = make(map[string]chan struct{})
420 s.containerExecSyncLock.Unlock()
423 func (s *Supervisor) newExecSyncChannel(containerID, pid string) {
424 s.containerExecSyncLock.Lock()
425 s.containerExecSync[containerID][pid] = make(chan struct{})
426 s.containerExecSyncLock.Unlock()
429 func (s *Supervisor) deleteExecSyncChannel(containerID, pid string) {
430 s.containerExecSyncLock.Lock()
431 delete(s.containerExecSync[containerID], pid)
432 s.containerExecSyncLock.Unlock()
435 func (s *Supervisor) getExecSyncChannel(containerID, pid string) chan struct{} {
436 s.containerExecSyncLock.Lock()
437 ch := s.containerExecSync[containerID][pid]
438 s.containerExecSyncLock.Unlock()
442 func (s *Supervisor) getExecSyncMap(containerID string) map[string]chan struct{} {
443 s.containerExecSyncLock.Lock()
444 defer s.containerExecSyncLock.Unlock()
445 return s.containerExecSync[containerID]
448 func (s *Supervisor) deleteExecSyncMap(containerID string) {
449 s.containerExecSyncLock.Lock()
450 defer s.containerExecSyncLock.Unlock()
451 delete(s.containerExecSync, containerID)