2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2005 Wim Taymans <wim@fluendo.com>
4 * 2006 Thomas Vander Stichele <thomas at apestaart dot org>
5 * 2014 Tim-Philipp Müller <tim centricular com>
6 * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
8 * gstipcpipelinesink.c:
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
26 * SECTION:element-ipcpipelinesink
27 * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
29 * Communicates with an ipcpipelinesrc element in another process via a socket.
31 * This element, together with ipcpipelinesrc and ipcslavepipeline form a
32 * mechanism that allows splitting a single pipeline in different processes.
33 * The main use-case for it is a playback pipeline split in two parts, where the
34 * first part contains the networking, parsing and demuxing and the second part
35 * contains the decoding and display. The intention of this split is to improve
36 * security of an application, by letting the networking, parsing and demuxing
37 * parts run in a less privileged process than the process that accesses the
38 * decoder and display.
40 * Once the pipelines in those different processes have been created, the
41 * playback can be controlled entirely from the first pipeline, which is the
42 * one that contains ipcpipelinesink. We call this pipeline the “master”.
43 * All relevant events and queries sent from the application are sent to
44 * the master pipeline and messages to the application are sent from the master
45 * pipeline. The second pipeline, in the other process, is transparently slaved.
47 * ipcpipelinesink can work only in push mode and does not synchronize buffers
48 * to the clock. Synchronization is meant to happen either at the real sink at
49 * the end of the remote slave pipeline, or not to happen at all, if the
50 * pipeline is live.
52 * A master pipeline may contain more than one ipcpipelinesink elements, which
53 * can be connected either to the same slave pipeline or to different ones.
55 * Communication with ipcpipelinesrc on the slave happens via a socket, using a
56 * custom protocol. Each buffer, event, query, message or state change is
57 * serialized in a "packet" and sent over the socket. The sender then
58 * performs a blocking wait for a reply, if a return code is needed.
60 * All objects that contain a GstStructure (messages, queries, events) are
61 * serialized by serializing the GstStructure to a string
62 * (gst_structure_to_string). This implies some limitations, of course.
63 * All fields of these structures that are not serializable to strings (ex.
64 * object pointers) are ignored, except for some cases where custom
65 * serialization may occur (ex error/warning/info messages that contain a
66 * GError are serialized differently).
68 * Buffers are transported by writing their content directly on the socket.
69 * More efficient ways for memory sharing could be implemented in the future.
76 #include "gstipcpipelinesink.h"
/* Static pad template named "sink"; it is registered on the element class and
 * looked up by name when the instance creates its pad. */
78 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
/* Debug category used as the default for the GST_*_OBJECT logging macros in
 * this file. */
83 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
84 #define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
/* Signal ids; the SIGNAL_DISCONNECT slot is filled in by class_init. */
92 static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
/* Defaults for the "read-chunk-size" and "ack-time" properties. The ack time
 * is 10 seconds expressed in GLib time-span units (G_TIME_SPAN_SECOND). */
104 #define DEFAULT_READ_CHUNK_SIZE 4096
105 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
108 GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
109 #define gst_ipc_pipeline_sink_parent_class parent_class
110 G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
111 GST_TYPE_ELEMENT, _do_init);
/* Forward declarations: GObject property/dispose/finalize handlers, reader
 * thread helpers, GstElement vfuncs, sink pad functions, the "disconnect"
 * action handler and the thread-pool worker. */
113 static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
114 const GValue * value, GParamSpec * pspec);
115 static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
116 GValue * value, GParamSpec * pspec);
117 static void gst_ipc_pipeline_sink_dispose (GObject * obj);
118 static void gst_ipc_pipeline_sink_finalize (GObject * obj);
119 static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
121 static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
124 static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
125 element, GstStateChange transition);
127 static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
128 GstObject * parent, GstBuffer * buffer);
129 static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
131 static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
133 static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
135 static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
137 static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
138 GstObject * parent, GstPadMode mode, gboolean active);
141 static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
142 static void pusher (gpointer data, gpointer user_data);
/* Class initializer for GstIpcPipelineSink.
 *
 * Hooks up the GObject property accessors and the dispose/finalize handlers,
 * installs the "fdin", "fdout", "read-chunk-size" and "ack-time" properties,
 * registers the "disconnect" action signal, sets the element metadata, adds
 * the sink pad template, and overrides the GstElement change_state, query and
 * send_event vfuncs.
 *
 * @klass: the class structure being initialized. */
146 gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
148 GObjectClass *gobject_class;
149 GstElementClass *gstelement_class;
151 gobject_class = G_OBJECT_CLASS (klass);
152 gstelement_class = GST_ELEMENT_CLASS (klass);
154 gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
155 gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
156 gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
157 gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
/* Both fd properties range from -1 to 0xffff and default to -1; a negative
 * value is rejected as invalid on the NULL->READY state change. */
159 g_object_class_install_property (gobject_class, PROP_FDIN,
160 g_param_spec_int ("fdin", "Input file descriptor",
161 "File descriptor to received data from",
162 -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
163 g_object_class_install_property (gobject_class, PROP_FDOUT,
164 g_param_spec_int ("fdout", "Output file descriptor",
165 "File descriptor to send data through",
166 -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167 g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
168 g_param_spec_uint ("read-chunk-size", "Read chunk size",
170 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
171 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172 g_object_class_install_property (gobject_class, PROP_ACK_TIME,
173 g_param_spec_uint64 ("ack-time", "Ack time",
174 "Maximum time to wait for a response to a message",
175 0, G_MAXUINT64, DEFAULT_ACK_TIME,
176 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* "disconnect" is an action signal (G_SIGNAL_ACTION) with no arguments and no
 * return value; its class handler is the `disconnect` slot assigned at the
 * end of this function. */
178 gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
179 g_signal_new ("disconnect",
180 G_TYPE_FROM_CLASS (klass),
181 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
182 G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
183 NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
185 gst_element_class_set_static_metadata (gstelement_class,
186 "Inter-process Pipeline Sink",
188 "Allows splitting and continuing a pipeline in another process",
189 "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
190 gst_element_class_add_pad_template (gstelement_class,
191 gst_static_pad_template_get (&sinktemplate));
193 gstelement_class->change_state =
194 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
195 gstelement_class->query =
196 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
197 gstelement_class->send_event =
198 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
200 klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
/* Instance initializer.
 *
 * Flags the element as a sink, initializes the comm state with the default
 * chunk size / ack time and both file descriptors unset (-1), creates the
 * worker thread pool, starts the reader thread, and finally creates the
 * "sink" pad from the class template and installs its activate-mode, query,
 * event and chain functions.
 *
 * @sink: the instance being initialized. */
204 gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
206 GstPadTemplate *pad_template;
208 GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
210 gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
211 sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
212 sink->comm.ack_time = DEFAULT_ACK_TIME;
213 sink->comm.fdin = -1;
214 sink->comm.fdout = -1;
/* Pool that runs pusher() with the sink as user data; max_threads is -1
 * (no limit) and the pool is non-exclusive. */
215 sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
/* The return value is not checked here; the NULL->READY state change fails
 * later if comm.reader_thread is not set. */
216 gst_ipc_pipeline_sink_start_reader_thread (sink);
219 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
220 g_return_if_fail (pad_template != NULL);
222 sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
224 gst_pad_set_activatemode_function (sink->sinkpad,
225 gst_ipc_pipeline_sink_pad_activate_mode);
226 gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
227 gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
228 gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
229 gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
/* GObject dispose handler: stops the reader thread, cancels the comm object
 * (second argument TRUE; its exact meaning is defined by the comm helper and
 * is not visible here), then chains up to the parent class.
 *
 * @obj: the GstIpcPipelineSink being disposed. */
234 gst_ipc_pipeline_sink_dispose (GObject * obj)
236 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
238 gst_ipc_pipeline_sink_stop_reader_thread (sink);
239 gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
241 G_OBJECT_CLASS (parent_class)->dispose (obj);
/* GObject finalize handler: clears the comm state, frees the worker thread
 * pool, then chains up to the parent class.
 *
 * @obj: the GstIpcPipelineSink being finalized. */
245 gst_ipc_pipeline_sink_finalize (GObject * obj)
247 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
249 gst_ipc_pipeline_comm_clear (&sink->comm);
/* immediate = TRUE: tasks still queued are not processed;
 * wait = TRUE: block until currently running tasks have finished. */
250 g_thread_pool_free (sink->threads, TRUE, TRUE);
252 G_OBJECT_CLASS (parent_class)->finalize (obj);
/* GObject property setter. Each supported property is stored directly in the
 * element's comm structure (fdin, fdout, read_chunk_size, ack_time); unknown
 * property ids trigger the standard invalid-property warning.
 *
 * @object:  the GstIpcPipelineSink instance.
 * @prop_id: id of the property being set.
 * @value:   new value, of the type declared for the property.
 * @pspec:   the property's GParamSpec (used for the warning only). */
256 gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
257 const GValue * value, GParamSpec * pspec)
259 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
263 sink->comm.fdin = g_value_get_int (value);
266 sink->comm.fdout = g_value_get_int (value);
268 case PROP_READ_CHUNK_SIZE:
269 sink->comm.read_chunk_size = g_value_get_uint (value);
272 sink->comm.ack_time = g_value_get_uint64 (value);
275 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter. Reads the requested value back from the element's
 * comm structure (fdin, fdout, read_chunk_size, ack_time); unknown property
 * ids trigger the standard invalid-property warning.
 *
 * @object:  the GstIpcPipelineSink instance.
 * @prop_id: id of the property being read.
 * @value:   GValue that receives the current value.
 * @pspec:   the property's GParamSpec (used for the warning only). */
281 gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
282 GValue * value, GParamSpec * pspec)
284 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
288 g_value_set_int (value, sink->comm.fdin);
291 g_value_set_int (value, sink->comm.fdout);
293 case PROP_READ_CHUNK_SIZE:
294 g_value_set_uint (value, sink->comm.read_chunk_size);
297 g_value_set_uint64 (value, sink->comm.ack_time);
300 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Sink pad event function: logs the event, writes it to the comm fd and then
 * drops the reference this function was handed.
 *
 * The FALSE argument to the write helper presumably marks the event as not
 * travelling upstream (the element-level send_event passes TRUE) - TODO
 * confirm against the comm helper.
 *
 * @pad:    the sink pad.
 * @parent: the GstIpcPipelineSink owning the pad.
 * @event:  the event to forward; its reference is released here. */
306 gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
308 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
311 GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
312 event, gst_event_type_get_name (event->type), event->type);
314 ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
315 gst_event_unref (event);
/* Sink pad chain function: writes the buffer to the comm fd, logs the result
 * when it is anything other than GST_FLOW_OK, and releases the buffer
 * reference.
 *
 * @pad:    the sink pad.
 * @parent: the GstIpcPipelineSink owning the pad. */
320 gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
323 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
326 GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
328 ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
329 if (ret != GST_FLOW_OK)
330 GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
332 gst_buffer_unref (buffer);
/* Sink pad query function.
 *
 * ALLOCATION queries are rejected locally. For the query type discussed in
 * the comment below (caps queries), the element state is sampled under the
 * object lock and the NULL state is special-cased so that an unconnected peer
 * is not contacted. Other queries are written to the comm fd (with FALSE as
 * the second argument) and the helper's result is stored in `ret`.
 *
 * @pad:    the sink pad.
 * @parent: the GstIpcPipelineSink owning the pad.
 * @query:  the query to handle. */
337 gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
339 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
342 GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
343 GST_QUERY_TYPE_NAME (query), query);
345 switch (GST_QUERY_TYPE (query)) {
346 case GST_QUERY_ALLOCATION:
347 GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
351 /* caps queries occur even while linking the pipeline.
352 * It is possible that the ipcpipelinesrc may not be connected at this
353 * point, so let's avoid a couple of errors... */
355 GST_OBJECT_LOCK (sink);
356 state = GST_STATE (sink);
357 GST_OBJECT_UNLOCK (sink);
358 if (state == GST_STATE_NULL)
364 ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
/* GstElement::query implementation: forwards an element-level query to the
 * peer through the comm fd (second argument TRUE, unlike the pad query
 * function) and logs the reply.
 *
 * @element: the GstIpcPipelineSink.
 * @query:   the query to forward; logged again after the write returns. */
370 gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
372 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
375 GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
376 GST_QUERY_TYPE_NAME (query), query);
378 ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
379 GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
/* GstElement::send_event implementation: forwards an element-level event to
 * the peer through the comm fd (second argument TRUE, unlike the pad event
 * function), logs the reply and releases the event reference.
 *
 * @element: the GstIpcPipelineSink.
 * @event:   the event to forward; its reference is released here. */
384 gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
386 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
389 GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
390 GST_EVENT_TYPE_NAME (event), event);
392 ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
393 GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
395 gst_event_unref (event);
/* Pad activate-mode function. Pull mode is special-cased; per the element
 * documentation the sink works in push mode only, so pull activation is
 * presumably refused here - TODO confirm.
 *
 * @pad:    the sink pad.
 * @parent: the GstIpcPipelineSink owning the pad.
 * @mode:   the scheduling mode being (de)activated.
 * @active: whether the mode is being activated or deactivated. */
401 gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
402 GstObject * parent, GstPadMode mode, gboolean active)
404 if (mode == GST_PAD_MODE_PULL)
/* Reader-thread callback for a buffer received from the peer. A sink never
 * expects buffers from downstream, so this logs an error and drops the
 * buffer.
 *
 * @id:        packet id of the received buffer.
 * @buffer:    the received buffer; its reference is released here.
 * @user_data: the GstIpcPipelineSink. */
410 on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
412 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
413 GST_ERROR_OBJECT (sink,
414 "Got buffer id %u! I never knew buffers could go upstream...", id);
415 gst_buffer_unref (buffer);
/* Thread-pool worker that handles events and queries queued by on_event()
 * and on_query().
 *
 * The packet id is read back from qdata attached to the mini object. Events
 * are pushed on the sink pad and answered with a boolean ack; queries are run
 * against the sink pad's peer and answered with a query result, after which
 * the query is unreffed (gst_pad_push_event() takes ownership of the event,
 * so there is no matching event unref). Any other object type is logged as an
 * error. The sink reference taken by the queuing callback is dropped at the
 * end.
 *
 * @data:      the GstEvent or GstQuery to process.
 * @user_data: the GstIpcPipelineSink (pool user data). */
419 pusher (gpointer data, gpointer user_data)
421 GstIpcPipelineSink *sink = user_data;
425 id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
428 if (GST_IS_EVENT (data)) {
429 GstEvent *event = GST_EVENT (data);
430 GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
431 ret = gst_pad_push_event (sink->sinkpad, event);
432 GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
433 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
434 } else if (GST_IS_QUERY (data)) {
435 GstQuery *query = GST_QUERY (data);
436 GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
437 ret = gst_pad_peer_query (sink->sinkpad, query);
438 GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
439 gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
441 gst_query_unref (query);
443 GST_ERROR_OBJECT (sink, "Unsupported object type");
445 gst_object_unref (sink);
/* Reader-thread callback for an event received from the peer.
 *
 * An unexpected downstream event (presumably when @upstream is FALSE - TODO
 * confirm) is logged as an error, answered with a FALSE ack and dropped.
 * Otherwise a reference on the sink is taken and the event is queued on the
 * thread pool, where pusher() handles it and releases that reference.
 *
 * @id:        packet id of the event.
 * @event:     the received event.
 * @upstream:  direction flag supplied by the comm layer.
 * @user_data: the GstIpcPipelineSink. */
449 on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
451 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
454 GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
456 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
457 gst_event_unref (event);
461 GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
462 gst_object_ref (sink);
463 g_thread_pool_push (sink->threads, event, NULL);
/* Reader-thread callback for a query received from the peer.
 *
 * An unexpected downstream query (presumably when @upstream is FALSE - TODO
 * confirm) is logged as an error, answered with a FALSE query result and
 * dropped. Otherwise a reference on the sink is taken and the query is queued
 * on the thread pool, where pusher() handles it and releases that reference.
 *
 * @id:        packet id of the query.
 * @query:     the received query.
 * @upstream:  direction flag supplied by the comm layer.
 * @user_data: the GstIpcPipelineSink. */
467 on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
469 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
472 GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
474 gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
476 gst_query_unref (query);
480 GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
481 gst_object_ref (sink);
482 g_thread_pool_push (sink->threads, query, NULL);
/* Reader-thread callback for a state-change request from the peer. The sink
 * is the side that issues state changes, so receiving one is only logged as
 * an error.
 *
 * @id:         packet id of the request.
 * @transition: the requested transition (unused in the visible code).
 * @user_data:  the GstIpcPipelineSink. */
486 on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
488 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
489 GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
/* Reader-thread callback for a "state lost" notification from the peer.
 * Arms pass_next_async_done (under the object lock) so that the next
 * ASYNC_DONE message from the peer is acted upon, then marks this element as
 * having lost its state.
 *
 * @user_data: the GstIpcPipelineSink. */
493 on_state_lost (gpointer user_data)
495 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
497 GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
499 GST_OBJECT_LOCK (sink);
500 sink->pass_next_async_done = TRUE;
501 GST_OBJECT_UNLOCK (sink);
503 gst_element_lost_state (GST_ELEMENT (sink));
/* Deferred ASYNC_DONE handling, scheduled from on_message() through
 * gst_element_call_async().
 *
 * Takes the state lock and then the object lock. If pass_next_async_done is
 * still set, it is cleared, the pending state change is continued with
 * SUCCESS, and an extra reference of the message is posted on the element
 * (the original reference is released by the destroy notify passed to
 * gst_element_call_async()). Otherwise both locks are simply released.
 *
 * @element:   the GstIpcPipelineSink.
 * @user_data: the ASYNC_DONE GstMessage received from the peer. */
507 do_async_done (GstElement * element, gpointer user_data)
509 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
510 GstMessage *message = user_data;
512 GST_STATE_LOCK (sink);
513 GST_OBJECT_LOCK (sink);
514 if (sink->pass_next_async_done) {
515 sink->pass_next_async_done = FALSE;
516 GST_OBJECT_UNLOCK (sink);
517 gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
518 GST_STATE_UNLOCK (sink);
519 gst_element_post_message (element, gst_message_ref (message));
522 GST_OBJECT_UNLOCK (sink);
523 GST_STATE_UNLOCK (sink);
/* Reader-thread callback for a bus message received from the peer.
 *
 * ASYNC_DONE is handled specially: when pass_next_async_done is set, the
 * message is handed to do_async_done() via gst_element_call_async(), with
 * gst_message_unref as its destroy notify; the other visible path releases
 * the object lock and unrefs the message. Messages reaching the end of the
 * function are posted on this element's bus.
 *
 * @id:        packet id of the message.
 * @message:   the received message.
 * @user_data: the GstIpcPipelineSink. */
528 on_message (guint32 id, GstMessage * message, gpointer user_data)
530 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
532 GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
534 switch (GST_MESSAGE_TYPE (message)) {
535 case GST_MESSAGE_ASYNC_DONE:
536 GST_OBJECT_LOCK (sink);
537 if (sink->pass_next_async_done) {
538 GST_OBJECT_UNLOCK (sink);
539 gst_element_call_async (GST_ELEMENT (sink), do_async_done,
540 message, (GDestroyNotify) gst_message_unref);
542 GST_OBJECT_UNLOCK (sink);
543 gst_message_unref (message);
550 gst_element_post_message (GST_ELEMENT (sink), message);
/* Starts the comm reader thread, registering this file's on_buffer, on_event,
 * on_query, on_state_change, on_state_lost and on_message callbacks. Logs an
 * error if the comm layer reports that the thread could not be started.
 *
 * @sink: the GstIpcPipelineSink. */
554 gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
556 if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
557 on_event, on_query, on_state_change, on_state_lost, on_message,
559 GST_ERROR_OBJECT (sink, "Failed to start reader thread");
/* Stops the comm reader thread; thin wrapper over the comm helper.
 *
 * @sink: the GstIpcPipelineSink. */
566 gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
568 gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
/* Class handler for the "disconnect" action signal.
 *
 * Stops the reader thread, resets both file descriptors to -1, cancels the
 * comm object (second argument FALSE, whereas dispose passes TRUE) and starts
 * the reader thread again. With fdout at -1 the state-change code no longer
 * calls the peer.
 *
 * @sink: the GstIpcPipelineSink. */
573 gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
575 GST_DEBUG_OBJECT (sink, "Disconnecting");
576 gst_ipc_pipeline_sink_stop_reader_thread (sink);
577 sink->comm.fdin = -1;
578 sink->comm.fdout = -1;
579 gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
580 gst_ipc_pipeline_sink_start_reader_thread (sink);
583 static GstStateChangeReturn
584 gst_ipc_pipeline_sink_change_state (GstElement * element,
585 GstStateChange transition)
587 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
588 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
589 GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
590 gboolean async = FALSE;
591 gboolean down = FALSE;
593 GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
594 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
595 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
597 switch (transition) {
598 case GST_STATE_CHANGE_NULL_TO_READY:
599 if (sink->comm.fdin < 0) {
600 GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
601 return GST_STATE_CHANGE_FAILURE;
603 if (sink->comm.fdout < 0) {
604 GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
605 return GST_STATE_CHANGE_FAILURE;
607 if (!sink->comm.reader_thread) {
608 GST_ERROR_OBJECT (element, "Failed to start reader thread");
609 return GST_STATE_CHANGE_FAILURE;
612 case GST_STATE_CHANGE_READY_TO_PAUSED:
613 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
614 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
615 /* In these transitions, it is possible that the peer returns ASYNC.
616 * We don't know that in advance, but we post async-start anyway because
617 * it needs to be delivered *before* async-done, and async-done may
618 * arrive at any point in time after we've set the state of the peer.
619 * In case the peer doesn't return ASYNC, we just post async-done
620 * ourselves and the parent GstBin takes care of matching and deleting
621 * them, so the app never gets any of these. */
628 /* downwards state change */
629 down = (GST_STATE_TRANSITION_CURRENT (transition) >=
630 GST_STATE_TRANSITION_NEXT (transition));
633 GST_DEBUG_OBJECT (sink,
634 "Posting async-start for %s, will need state-change-done",
635 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
637 gst_element_post_message (GST_ELEMENT (sink),
638 gst_message_new_async_start (GST_OBJECT (sink)));
640 GST_OBJECT_LOCK (sink);
641 sink->pass_next_async_done = TRUE;
642 GST_OBJECT_UNLOCK (sink);
645 /* change the state of the peer first */
646 /* If the fd out is -1, we do not actually call the peer. This will happen
647 when we explicitly disconnected, and in that case we want to be able
648 to bring the element down to NULL, so it can be restarted with a new
650 if (sink->comm.fdout >= 0) {
651 GST_DEBUG_OBJECT (sink, "Calling peer with state change");
652 peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
654 if (peer_ret == GST_STATE_CHANGE_FAILURE && down) {
655 GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
656 "but ignoring because we are going down");
657 peer_ret = GST_STATE_CHANGE_SUCCESS;
661 GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
663 peer_ret = GST_STATE_CHANGE_SUCCESS;
665 GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
667 peer_ret = GST_STATE_CHANGE_FAILURE;
671 /* chain up to the parent class to change our state, if the peer succeeded */
672 if (peer_ret != GST_STATE_CHANGE_FAILURE) {
673 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
675 if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
676 GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
677 "but ignoring because we are going down");
678 ret = GST_STATE_CHANGE_SUCCESS;
682 GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
683 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
684 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
685 gst_element_state_change_return_get_name (peer_ret),
686 gst_element_state_change_return_get_name (ret));
688 /* now interpret the return codes */
689 if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
690 GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
691 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
693 GST_OBJECT_LOCK (sink);
694 sink->pass_next_async_done = FALSE;
695 GST_OBJECT_UNLOCK (sink);
697 gst_element_post_message (GST_ELEMENT (sink),
698 gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
699 } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
700 GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
701 peer_ret = GST_STATE_CHANGE_SUCCESS;
704 if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
705 if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
706 /* only the parent's ret was FAILURE - revert remote changes */
707 GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
709 gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
710 GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
711 GST_STATE_TRANSITION_CURRENT (transition)));
713 return GST_STATE_CHANGE_FAILURE;
716 /* the parent's (GstElement) state change func won't return ASYNC or
717 * NO_PREROLL, so unless it has returned FAILURE, which we have caught above,
718 * we are not interested in its return code... just return the peer's */