documentation: fixed a heap o' typos
[platform/upstream/gstreamer.git] / sys / ipcpipeline / gstipcpipelinesink.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2006 Thomas Vander Stichele <thomas at apestaart dot org>
5  *                    2014 Tim-Philipp Müller <tim centricular com>
6  *               2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
7  *
8  * gstipcpipelinesink.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25 /**
26  * SECTION:element-ipcpipelinesink
27  * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
28  *
29  * Communicates with an ipcpipelinesrc element in another process via a socket.
30  *
31  * This element, together with ipcpipelinesrc and ipcslavepipeline form a
32  * mechanism that allows splitting a single pipeline in different processes.
33  * The main use-case for it is a playback pipeline split in two parts, where the
34  * first part contains the networking, parsing and demuxing and the second part
35  * contains the decoding and display. The intention of this split is to improve
36  * security of an application, by letting the networking, parsing and demuxing
37  * parts run in a less privileged process than the process that accesses the
38  * decoder and display.
39  *
40  * Once the pipelines in those different processes have been created, the
41  * playback can be controlled entirely from the first pipeline, which is the
42  * one that contains ipcpipelinesink. We call this pipeline the “master”.
43  * All relevant events and queries sent from the application are sent to
44  * the master pipeline and messages to the application are sent from the master
45  * pipeline. The second pipeline, in the other process, is transparently slaved.
46  *
47  * ipcpipelinesink can work only in push mode and does not synchronize buffers
48  * to the clock. Synchronization is meant to happen either at the real sink at
49  * the end of the remote slave pipeline, or not to happen at all, if the
50  * pipeline is live.
51  *
52  * A master pipeline may contain more than one ipcpipelinesink elements, which
53  * can be connected either to the same slave pipeline or to different ones.
54  *
55  * Communication with ipcpipelinesrc on the slave happens via a socket, using a
56  * custom protocol. Each buffer, event, query, message or state change is
57  * serialized in a "packet" and sent over the socket. The sender then
58  * performs a blocking wait for a reply, if a return code is needed.
59  *
60  * All objects that contain a GstStructure (messages, queries, events) are
61  * serialized by serializing the GstStructure to a string
62  * (gst_structure_to_string). This implies some limitations, of course.
63  * All fields of this structures that are not serializable to strings (ex.
64  * object pointers) are ignored, except for some cases where custom
65  * serialization may occur (ex error/warning/info messages that contain a
66  * GError are serialized differently).
67  *
68  * Buffers are transported by writing their content directly on the socket.
69  * More efficient ways for memory sharing could be implemented in the future.
70  */
71
72 #ifdef HAVE_CONFIG_H
73 #  include "config.h"
74 #endif
75
76 #include "gstipcpipelinesink.h"
77
78 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
79     GST_PAD_SINK,
80     GST_PAD_ALWAYS,
81     GST_STATIC_CAPS_ANY);
82
83 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
84 #define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
85
86 enum
87 {
88   SIGNAL_DISCONNECT,
89   /* FILL ME */
90   LAST_SIGNAL
91 };
92 static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
93
94 enum
95 {
96   PROP_0,
97   PROP_FDIN,
98   PROP_FDOUT,
99   PROP_READ_CHUNK_SIZE,
100   PROP_ACK_TIME,
101 };
102
103
104 #define DEFAULT_READ_CHUNK_SIZE 4096
105 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
106
107 #define _do_init \
108     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
109 #define gst_ipc_pipeline_sink_parent_class parent_class
110 G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
111     GST_TYPE_ELEMENT, _do_init);
112
113 static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
114     const GValue * value, GParamSpec * pspec);
115 static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
116     GValue * value, GParamSpec * pspec);
117 static void gst_ipc_pipeline_sink_dispose (GObject * obj);
118 static void gst_ipc_pipeline_sink_finalize (GObject * obj);
119 static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
120     sink);
121 static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
122     sink);
123
124 static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
125     element, GstStateChange transition);
126
127 static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
128     GstObject * parent, GstBuffer * buffer);
129 static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
130     GstEvent * event);
131 static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
132     GstQuery * query);
133 static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
134     GstEvent * event);
135 static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
136     GstQuery * query);
137 static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
138     GstObject * parent, GstPadMode mode, gboolean active);
139
140
141 static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
142 static void pusher (gpointer data, gpointer user_data);
143
144
145 static void
146 gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
147 {
148   GObjectClass *gobject_class;
149   GstElementClass *gstelement_class;
150
151   gobject_class = G_OBJECT_CLASS (klass);
152   gstelement_class = GST_ELEMENT_CLASS (klass);
153
154   gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
155   gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
156   gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
157   gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
158
159   g_object_class_install_property (gobject_class, PROP_FDIN,
160       g_param_spec_int ("fdin", "Input file descriptor",
161           "File descriptor to received data from",
162           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
163   g_object_class_install_property (gobject_class, PROP_FDOUT,
164       g_param_spec_int ("fdout", "Output file descriptor",
165           "File descriptor to send data through",
166           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167   g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
168       g_param_spec_uint ("read-chunk-size", "Read chunk size",
169           "Read chunk size",
170           1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
171           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172   g_object_class_install_property (gobject_class, PROP_ACK_TIME,
173       g_param_spec_uint64 ("ack-time", "Ack time",
174           "Maximum time to wait for a response to a message",
175           0, G_MAXUINT64, DEFAULT_ACK_TIME,
176           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177
178   gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
179       g_signal_new ("disconnect",
180       G_TYPE_FROM_CLASS (klass),
181       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
182       G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
183       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
184
185   gst_element_class_set_static_metadata (gstelement_class,
186       "Inter-process Pipeline Sink",
187       "Sink",
188       "Allows splitting and continuing a pipeline in another process",
189       "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
190   gst_element_class_add_pad_template (gstelement_class,
191       gst_static_pad_template_get (&sinktemplate));
192
193   gstelement_class->change_state =
194       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
195   gstelement_class->query =
196       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
197   gstelement_class->send_event =
198       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
199
200   klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
201 }
202
203 static void
204 gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
205 {
206   GstPadTemplate *pad_template;
207
208   GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
209
210   gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
211   sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
212   sink->comm.ack_time = DEFAULT_ACK_TIME;
213   sink->comm.fdin = -1;
214   sink->comm.fdout = -1;
215   sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
216   gst_ipc_pipeline_sink_start_reader_thread (sink);
217
218   pad_template =
219       gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
220   g_return_if_fail (pad_template != NULL);
221
222   sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
223
224   gst_pad_set_activatemode_function (sink->sinkpad,
225       gst_ipc_pipeline_sink_pad_activate_mode);
226   gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
227   gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
228   gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
229   gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
230
231 }
232
233 static void
234 gst_ipc_pipeline_sink_dispose (GObject * obj)
235 {
236   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
237
238   gst_ipc_pipeline_sink_stop_reader_thread (sink);
239   gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
240
241   G_OBJECT_CLASS (parent_class)->dispose (obj);
242 }
243
244 static void
245 gst_ipc_pipeline_sink_finalize (GObject * obj)
246 {
247   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
248
249   gst_ipc_pipeline_comm_clear (&sink->comm);
250   g_thread_pool_free (sink->threads, TRUE, TRUE);
251
252   G_OBJECT_CLASS (parent_class)->finalize (obj);
253 }
254
255 static void
256 gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
257     const GValue * value, GParamSpec * pspec)
258 {
259   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
260
261   switch (prop_id) {
262     case PROP_FDIN:
263       sink->comm.fdin = g_value_get_int (value);
264       break;
265     case PROP_FDOUT:
266       sink->comm.fdout = g_value_get_int (value);
267       break;
268     case PROP_READ_CHUNK_SIZE:
269       sink->comm.read_chunk_size = g_value_get_uint (value);
270       break;
271     case PROP_ACK_TIME:
272       sink->comm.ack_time = g_value_get_uint64 (value);
273       break;
274     default:
275       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
276       break;
277   }
278 }
279
280 static void
281 gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
282     GValue * value, GParamSpec * pspec)
283 {
284   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
285
286   switch (prop_id) {
287     case PROP_FDIN:
288       g_value_set_int (value, sink->comm.fdin);
289       break;
290     case PROP_FDOUT:
291       g_value_set_int (value, sink->comm.fdout);
292       break;
293     case PROP_READ_CHUNK_SIZE:
294       g_value_set_uint (value, sink->comm.read_chunk_size);
295       break;
296     case PROP_ACK_TIME:
297       g_value_set_uint64 (value, sink->comm.ack_time);
298       break;
299     default:
300       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
301       break;
302   }
303 }
304
305 static gboolean
306 gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
307 {
308   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
309   gboolean ret;
310
311   GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
312       event, gst_event_type_get_name (event->type), event->type);
313
314   ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
315   gst_event_unref (event);
316   return ret;
317 }
318
319 static GstFlowReturn
320 gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
321     GstBuffer * buffer)
322 {
323   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
324   GstFlowReturn ret;
325
326   GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
327
328   ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
329   if (ret != GST_FLOW_OK)
330     GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
331
332   gst_buffer_unref (buffer);
333   return ret;
334 }
335
336 static gboolean
337 gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
338 {
339   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
340   gboolean ret;
341
342   GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
343       GST_QUERY_TYPE_NAME (query), query);
344
345   switch (GST_QUERY_TYPE (query)) {
346     case GST_QUERY_ALLOCATION:
347       GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
348       return FALSE;
349     case GST_QUERY_CAPS:
350     {
351       /* caps queries occur even while linking the pipeline.
352        * It is possible that the ipcpipelinesrc may not be connected at this
353        * point, so let's avoid a couple of errors... */
354       GstState state;
355       GST_OBJECT_LOCK (sink);
356       state = GST_STATE (sink);
357       GST_OBJECT_UNLOCK (sink);
358       if (state == GST_STATE_NULL)
359         return FALSE;
360     }
361     default:
362       break;
363   }
364   ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
365
366   return ret;
367 }
368
369 static gboolean
370 gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
371 {
372   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
373   gboolean ret;
374
375   GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
376       GST_QUERY_TYPE_NAME (query), query);
377
378   ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
379   GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
380   return ret;
381 }
382
383 static gboolean
384 gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
385 {
386   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
387   gboolean ret;
388
389   GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
390       GST_EVENT_TYPE_NAME (event), event);
391
392   ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
393   GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
394
395   gst_event_unref (event);
396   return ret;
397 }
398
399
400 static gboolean
401 gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
402     GstObject * parent, GstPadMode mode, gboolean active)
403 {
404   if (mode == GST_PAD_MODE_PULL)
405     return FALSE;
406   return TRUE;
407 }
408
409 static void
410 on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
411 {
412   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
413   GST_ERROR_OBJECT (sink,
414       "Got buffer id %u! I never knew buffers could go upstream...", id);
415   gst_buffer_unref (buffer);
416 }
417
418 static void
419 pusher (gpointer data, gpointer user_data)
420 {
421   GstIpcPipelineSink *sink = user_data;
422   gboolean ret;
423   guint32 id;
424
425   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
426           QUARK_ID));
427
428   if (GST_IS_EVENT (data)) {
429     GstEvent *event = GST_EVENT (data);
430     GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
431     ret = gst_pad_push_event (sink->sinkpad, event);
432     GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
433     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
434   } else if (GST_IS_QUERY (data)) {
435     GstQuery *query = GST_QUERY (data);
436     GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
437     ret = gst_pad_peer_query (sink->sinkpad, query);
438     GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
439     gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
440         query);
441     gst_query_unref (query);
442   } else {
443     GST_ERROR_OBJECT (sink, "Unsupported object type");
444   }
445   gst_object_unref (sink);
446 }
447
448 static void
449 on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
450 {
451   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
452
453   if (!upstream) {
454     GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
455         id);
456     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
457     gst_event_unref (event);
458     return;
459   }
460
461   GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
462   gst_object_ref (sink);
463   g_thread_pool_push (sink->threads, event, NULL);
464 }
465
466 static void
467 on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
468 {
469   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
470
471   if (!upstream) {
472     GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
473         id);
474     gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
475         query);
476     gst_query_unref (query);
477     return;
478   }
479
480   GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
481   gst_object_ref (sink);
482   g_thread_pool_push (sink->threads, query, NULL);
483 }
484
485 static void
486 on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
487 {
488   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
489   GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
490 }
491
492 static void
493 on_state_lost (gpointer user_data)
494 {
495   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
496
497   GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
498
499   GST_OBJECT_LOCK (sink);
500   sink->pass_next_async_done = TRUE;
501   GST_OBJECT_UNLOCK (sink);
502
503   gst_element_lost_state (GST_ELEMENT (sink));
504 }
505
506 static void
507 do_async_done (GstElement * element, gpointer user_data)
508 {
509   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
510   GstMessage *message = user_data;
511
512   GST_STATE_LOCK (sink);
513   GST_OBJECT_LOCK (sink);
514   if (sink->pass_next_async_done) {
515     sink->pass_next_async_done = FALSE;
516     GST_OBJECT_UNLOCK (sink);
517     gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
518     GST_STATE_UNLOCK (sink);
519     gst_element_post_message (element, gst_message_ref (message));
520
521   } else {
522     GST_OBJECT_UNLOCK (sink);
523     GST_STATE_UNLOCK (sink);
524   }
525 }
526
527 static void
528 on_message (guint32 id, GstMessage * message, gpointer user_data)
529 {
530   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
531
532   GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
533
534   switch (GST_MESSAGE_TYPE (message)) {
535     case GST_MESSAGE_ASYNC_DONE:
536       GST_OBJECT_LOCK (sink);
537       if (sink->pass_next_async_done) {
538         GST_OBJECT_UNLOCK (sink);
539         gst_element_call_async (GST_ELEMENT (sink), do_async_done,
540             message, (GDestroyNotify) gst_message_unref);
541       } else {
542         GST_OBJECT_UNLOCK (sink);
543         gst_message_unref (message);
544       }
545       return;
546     default:
547       break;
548   }
549
550   gst_element_post_message (GST_ELEMENT (sink), message);
551 }
552
553 static gboolean
554 gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
555 {
556   if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
557           on_event, on_query, on_state_change, on_state_lost, on_message,
558           sink)) {
559     GST_ERROR_OBJECT (sink, "Failed to start reader thread");
560     return FALSE;
561   }
562   return TRUE;
563 }
564
565 static void
566 gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
567 {
568   gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
569 }
570
571
572 static void
573 gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
574 {
575   GST_DEBUG_OBJECT (sink, "Disconnecting");
576   gst_ipc_pipeline_sink_stop_reader_thread (sink);
577   sink->comm.fdin = -1;
578   sink->comm.fdout = -1;
579   gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
580   gst_ipc_pipeline_sink_start_reader_thread (sink);
581 }
582
583 static GstStateChangeReturn
584 gst_ipc_pipeline_sink_change_state (GstElement * element,
585     GstStateChange transition)
586 {
587   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
588   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
589   GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
590   gboolean async = FALSE;
591   gboolean down = FALSE;
592
593   GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
594       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
595       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
596
597   switch (transition) {
598     case GST_STATE_CHANGE_NULL_TO_READY:
599       if (sink->comm.fdin < 0) {
600         GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
601         return GST_STATE_CHANGE_FAILURE;
602       }
603       if (sink->comm.fdout < 0) {
604         GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
605         return GST_STATE_CHANGE_FAILURE;
606       }
607       if (!sink->comm.reader_thread) {
608         GST_ERROR_OBJECT (element, "Failed to start reader thread");
609         return GST_STATE_CHANGE_FAILURE;
610       }
611       break;
612     case GST_STATE_CHANGE_READY_TO_PAUSED:
613     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
614     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
615       /* In these transitions, it is possible that the peer returns ASYNC.
616        * We don't know that in advance, but we post async-start anyway because
617        * it needs to be delivered *before* async-done, and async-done may
618        * arrive at any point in time after we've set the state of the peer.
619        * In case the peer doesn't return ASYNC, we just post async-done
620        * ourselves and the parent GstBin takes care of matching and deleting
621        * them, so the app never gets any of these. */
622       async = TRUE;
623       break;
624     default:
625       break;
626   }
627
628   /* downwards state change */
629   down = (GST_STATE_TRANSITION_CURRENT (transition) >=
630       GST_STATE_TRANSITION_NEXT (transition));
631
632   if (async) {
633     GST_DEBUG_OBJECT (sink,
634         "Posting async-start for %s, will need state-change-done",
635         gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
636
637     gst_element_post_message (GST_ELEMENT (sink),
638         gst_message_new_async_start (GST_OBJECT (sink)));
639
640     GST_OBJECT_LOCK (sink);
641     sink->pass_next_async_done = TRUE;
642     GST_OBJECT_UNLOCK (sink);
643   }
644
645   /* change the state of the peer first */
646   /* If the fd out is -1, we do not actually call the peer. This will happen
647      when we explicitly disconnected, and in that case we want to be able
648      to bring the element down to NULL, so it can be restarted with a new
649      slave pipeline. */
650   if (sink->comm.fdout >= 0) {
651     GST_DEBUG_OBJECT (sink, "Calling peer with state change");
652     peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
653         transition);
654     if (peer_ret == GST_STATE_CHANGE_FAILURE && down) {
655       GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
656           "but ignoring because we are going down");
657       peer_ret = GST_STATE_CHANGE_SUCCESS;
658     }
659   } else {
660     if (down) {
661       GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
662           sink->comm.fdout);
663       peer_ret = GST_STATE_CHANGE_SUCCESS;
664     } else {
665       GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
666           sink->comm.fdout);
667       peer_ret = GST_STATE_CHANGE_FAILURE;
668     }
669   }
670
671   /* chain up to the parent class to change our state, if the peer succeeded */
672   if (peer_ret != GST_STATE_CHANGE_FAILURE) {
673     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
674
675     if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
676       GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
677           "but ignoring because we are going down");
678       ret = GST_STATE_CHANGE_SUCCESS;
679     }
680   }
681
682   GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
683       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
684       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
685       gst_element_state_change_return_get_name (peer_ret),
686       gst_element_state_change_return_get_name (ret));
687
688   /* now interpret the return codes */
689   if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
690     GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
691         gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
692
693     GST_OBJECT_LOCK (sink);
694     sink->pass_next_async_done = FALSE;
695     GST_OBJECT_UNLOCK (sink);
696
697     gst_element_post_message (GST_ELEMENT (sink),
698         gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
699   } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
700     GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
701     peer_ret = GST_STATE_CHANGE_SUCCESS;
702   }
703
704   if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
705     if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
706       /* only the parent's ret was FAILURE - revert remote changes */
707       GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
708           "returned failure");
709       gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
710           GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
711               GST_STATE_TRANSITION_CURRENT (transition)));
712     }
713     return GST_STATE_CHANGE_FAILURE;
714   }
715
716   /* the parent's (GstElement) state change func won't return ASYNC or
717    * NO_PREROLL, so unless it has returned FAILURE, which we have caught above,
718    * we are not interested in its return code... just return the peer's */
719   return peer_ret;
720 }