2 * Copyright (C) 2015-2017 YouView TV Ltd
3 * Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
5 * gstipcpipelinecomm.c:
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
31 /* ssize_t is not available, so match return value of read()/write() on MSVC */
36 #include <gst/base/gstbytewriter.h>
37 #include <gst/gstprotection.h>
38 #include "gstipcpipelinecomm.h"
40 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
41 #define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug
43 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
56 COMM_REQUEST_TYPE_BUFFER,
57 COMM_REQUEST_TYPE_EVENT,
58 COMM_REQUEST_TYPE_QUERY,
59 COMM_REQUEST_TYPE_STATE_CHANGE,
60 COMM_REQUEST_TYPE_MESSAGE,
74 static const gchar *comm_request_ret_get_name (CommRequestType type,
76 static guint32 comm_request_ret_get_failure_value (CommRequestType type);
/* Allocates and initialises a CommRequest for the given id, type and
   (optional) query. The visible initialisation sets up the condition
   variable used to wait for the reply, clears comm_error, and presets ret
   to the failure value matching the request type. */
79 comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
83 req = g_malloc (sizeof (CommRequest));
85 g_cond_init (&req->cond);
87 req->comm_error = FALSE;
89 req->ret = comm_request_ret_get_failure_value (type);
/* Waits for the reply to a pending request.
   Loops until req->replied is set, sleeping on req->cond together with
   comm->mutex. With ACK_TYPE_TIMED the wait is bounded by a deadline of
   comm->ack_time from now on the monotonic clock; otherwise end_time is
   G_MAXUINT64 and the unbounded g_cond_wait() is used.
   The local result starts out as the failure value for the request type.
   The visible timeout handling flags req->comm_error and logs an error.
   The write functions in this file call into the sync path with
   comm->mutex locked, which g_cond_wait() relies on. */
96 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
99 guint32 ret = comm_request_ret_get_failure_value (req->type);
102 if (ack_type == ACK_TYPE_TIMED)
103 end_time = g_get_monotonic_time () + comm->ack_time;
105 end_time = G_MAXUINT64;
107 GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
109 while (!req->replied) {
110 if (ack_type == ACK_TYPE_TIMED) {
111 if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time))
114 g_cond_wait (&req->cond, &comm->mutex);
119 GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
120 req->id, ret, comm_request_ret_get_name (req->type, ret));
122 req->comm_error = TRUE;
123 GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
/* Releases a CommRequest. The visible part clears the condition variable
   that comm_request_new() initialised. */
131 comm_request_free (CommRequest * req)
133 g_cond_clear (&req->cond);
/* Returns a printable name for a reply value, interpreted according to the
   request type: buffers use the GstFlowReturn name, events/queries/messages
   are treated as booleans ("TRUE"/"FALSE"), and state changes use the
   GstStateChangeReturn name. Any other type hits g_assert_not_reached(). */
138 comm_request_ret_get_name (CommRequestType type, guint32 ret)
141 case COMM_REQUEST_TYPE_BUFFER:
142 return gst_flow_get_name (ret);
143 case COMM_REQUEST_TYPE_EVENT:
144 case COMM_REQUEST_TYPE_QUERY:
145 case COMM_REQUEST_TYPE_MESSAGE:
146 return ret ? "TRUE" : "FALSE";
147 case COMM_REQUEST_TYPE_STATE_CHANGE:
148 return gst_element_state_change_return_get_name (ret);
150 g_assert_not_reached ();
/* Returns the value that stands for "failed" for a given request type:
   GST_FLOW_COMM_ERROR for buffers and GST_STATE_CHANGE_FAILURE for state
   changes. Events, messages and queries share one case.
   NOTE(review): presumably that shared case yields FALSE, matching the
   boolean naming in comm_request_ret_get_name() - confirm.
   Any other type hits g_assert_not_reached(). */
155 comm_request_ret_get_failure_value (CommRequestType type)
158 case COMM_REQUEST_TYPE_BUFFER:
159 return GST_FLOW_COMM_ERROR;
160 case COMM_REQUEST_TYPE_EVENT:
161 case COMM_REQUEST_TYPE_MESSAGE:
162 case COMM_REQUEST_TYPE_QUERY:
164 case COMM_REQUEST_TYPE_STATE_CHANGE:
165 return GST_STATE_CHANGE_FAILURE;
167 g_assert_not_reached ();
/* Maps a GstIpcPipelineCommDataType (the payload type of a packet) to a
   constant string naming it, e.g. "QUERY_RESULT" or "STATE_CHANGE". */
172 gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
175 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
177 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
178 return "QUERY_RESULT";
179 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
181 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
183 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
184 return "SINK_MESSAGE_EVENT";
185 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
187 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
188 return "STATE_CHANGE";
189 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
191 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
193 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
194 return "GERROR_MESSAGE";
/* Registers a pending request for `id` and waits for the peer's reply.
   ACK_TYPE_NONE is special-cased before any request is created.
   Otherwise a CommRequest is created, stored in comm->waiting_ids keyed by
   the id, waited on via comm_request_wait() (whose result is stored in
   *ret), and then removed from the table again. req->comm_error is copied
   out before the removal.
   A reference on the hash table is taken for the duration of the call.
   NOTE(review): presumably so the table stays alive if comm->waiting_ids is
   replaced while this thread is blocked - confirm. */
201 gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
202 GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
206 GHashTable *waiting_ids;
208 if (ack_type == ACK_TYPE_NONE)
211 req = comm_request_new (id, type, query);
212 waiting_ids = g_hash_table_ref (comm->waiting_ids);
213 g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
214 *ret = comm_request_wait (comm, req, ack_type);
215 comm_error = req->comm_error;
216 g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
217 g_hash_table_unref (waiting_ids);
/* Writes `size` bytes from `data` to comm->fdout using write(), starting
   at data + offset. EAGAIN and EINTR are singled out from other errors;
   any other failure is logged with GST_ERROR_OBJECT.
   NOTE(review): presumably EAGAIN/EINTR retry the write and short writes
   advance offset/size - confirm. */
222 write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
228 GST_TRACE_OBJECT (comm->element, "Writing %u bytes to fdout",
232 write (comm->fdout, (const unsigned char *) data + offset, size);
234 if (errno == EAGAIN || errno == EINTR)
236 GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
/* Sends everything accumulated in a GstByteWriter to the fd.
   The size is read first because gst_byte_writer_reset_and_get_data()
   resets the writer while handing back its data pointer; the data is then
   passed to write_to_fd_raw(). */
250 write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
256 size = gst_byte_writer_get_size (bw);
257 data = gst_byte_writer_reset_and_get_data (bw);
260 ret = write_to_fd_raw (comm, data, size);
/* Sends an ACK packet for request `id` carrying the reply value `ret`.
   `type` is only used here to pretty-print `ret` in the trace output.
   Packet as written below, under comm->mutex: uint8 payload type (ACK),
   little-endian uint32 id, little-endian uint32 size, little-endian
   uint32 ret. The byte writer is reset after the mutex is released, and a
   RESOURCE/WRITE element error is posted on the failure path. */
266 gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
267 guint32 ret, CommRequestType type)
269 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
273 g_mutex_lock (&comm->mutex);
275 GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
276 comm_request_ret_get_name (type, ret), ret);
277 gst_byte_writer_init (&bw);
278 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
280 if (!gst_byte_writer_put_uint32_le (&bw, id))
283 if (!gst_byte_writer_put_uint32_le (&bw, size))
285 if (!gst_byte_writer_put_uint32_le (&bw, ret))
288 if (!write_byte_writer_to_fd (comm, &bw))
292 g_mutex_unlock (&comm->mutex);
293 gst_byte_writer_reset (&bw);
297 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
298 ("Failed to write to socket"));
/* Acknowledges a buffer request: forwards the GstFlowReturn, cast to
   guint32, to gst_ipc_pipeline_comm_write_ack_to_fd() as a BUFFER reply. */
303 gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
304 guint32 id, GstFlowReturn ret)
306 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
307 COMM_REQUEST_TYPE_BUFFER);
/* Acknowledges a request with a boolean result: forwards it, cast to
   guint32, to gst_ipc_pipeline_comm_write_ack_to_fd(). The EVENT request
   type is passed; in the ACK writer the type only selects how the value
   is printed, and EVENT prints as TRUE/FALSE. */
311 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
312 guint32 id, gboolean ret)
314 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
315 COMM_REQUEST_TYPE_EVENT);
/* Acknowledges a state change request: forwards the GstStateChangeReturn,
   cast to guint32, to gst_ipc_pipeline_comm_write_ack_to_fd() as a
   STATE_CHANGE reply. */
319 gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
320 guint32 id, GstStateChangeReturn ret)
322 gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
323 COMM_REQUEST_TYPE_STATE_CHANGE);
/* Sends the answer to a query back to the peer, including the (possibly
   modified) query contents.
   Packet as written below, under comm->mutex: uint8 payload type
   (QUERY_RESULT), little-endian uint32 id, little-endian uint32 size
   (1 + sizeof (guint32) + len + 1), uint8 result, little-endian uint32
   query type, then either the query structure serialised as a string
   including its NUL terminator, or a single 0 byte.
   NOTE(review): presumably the single 0 byte is the no-structure case -
   confirm.
   A RESOURCE/WRITE element error is posted on the failure path. */
327 gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
328 guint32 id, gboolean result, GstQuery * query)
330 const unsigned char payload_type =
331 GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
332 guint8 result8 = result;
337 const GstStructure *structure;
340 g_mutex_lock (&comm->mutex);
342 GST_TRACE_OBJECT (comm->element,
343 "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
344 gst_byte_writer_init (&bw);
345 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
347 if (!gst_byte_writer_put_uint32_le (&bw, id))
349 structure = gst_query_get_structure (query);
351 str = gst_structure_to_string (structure);
357 size = 1 + sizeof (guint32) + len + 1;
358 if (!gst_byte_writer_put_uint32_le (&bw, size))
360 if (!gst_byte_writer_put_uint8 (&bw, result8))
362 type = GST_QUERY_TYPE (query);
363 if (!gst_byte_writer_put_uint32_le (&bw, type))
366 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
369 if (!gst_byte_writer_put_uint8 (&bw, 0))
373 if (!write_byte_writer_to_fd (comm, &bw))
377 g_mutex_unlock (&comm->mutex);
378 gst_byte_writer_reset (&bw);
383 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
384 ("Failed to write to socket"));
/* Parses a QUERY_RESULT payload of `size` bytes from comm->adapter and
   rebuilds the query into *query.
   Preconditions (checked with g_return_val_if_fail): the adapter holds at
   least `size` bytes and `size` covers the fixed part of the payload
   (1 byte + one guint32).
   The query type is copied out with memcpy, the remaining bytes are
   expected to end in a NUL byte (payload[size - 1] is tested) and are
   parsed with gst_structure_from_string(); the query is then created with
   gst_query_new_custom(). The whole payload (mapped_size, i.e. the original
   size) is unmapped and flushed from the adapter at the end. */
389 gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
390 guint32 size, GstQuery ** query)
393 GstStructure *structure;
396 const guint8 *payload = NULL;
397 guint32 mapped_size = size;
399 /* this should not be called if we don't have enough yet */
401 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
402 g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
404 payload = gst_adapter_map (comm->adapter, mapped_size);
408 memcpy (&type, payload, sizeof (type));
409 payload += sizeof (type);
411 size -= 1 + sizeof (guint32);
415 if (payload[size - 1]) {
420 structure = gst_structure_from_string ((const char *) payload, &end);
429 *query = gst_query_new_custom (type, structure);
432 gst_adapter_unmap (comm->adapter);
433 gst_adapter_flush (comm->adapter, mapped_size);
449 GstIpcPipelineComm *comm;
453 } MetaListRepresentation;
/* gst_buffer_foreach_meta() callback that records one GstMeta into the
   MetaListRepresentation passed as user_data.
   The info array is resized with g_realloc to hold n_meta entries and the
   last entry (index n_meta - 1) is filled in with the serialised byte
   count, the meta flags, the API GType, the info size and an optional
   string. Only GstProtectionMeta is serialised: its info structure is
   turned into a string whose length (plus NUL) is added to the entry's
   byte count. Any other meta type is logged as ignored and keeps
   str == NULL. The entry's byte count is finally added to total_bytes. */
456 build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
458 MetaListRepresentation *repr = user_data;
461 repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
462 repr->info[repr->n_meta - 1].bytes =
465 /* 4 byte GstMetaFlags */
467 /* GstMetaInfo::api */
468 + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
469 /* GstMetaInfo::size */
474 repr->info[repr->n_meta - 1].flags = (*meta)->flags;
475 repr->info[repr->n_meta - 1].api = (*meta)->info->api;
476 repr->info[repr->n_meta - 1].size = (*meta)->info->size;
477 repr->info[repr->n_meta - 1].str = NULL;
479 /* GstMeta is a base class, and actual useful classes are all different...
480 So we list a few of them we know we want and ignore the open ended rest */
481 if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
482 GstProtectionMeta *m = (GstProtectionMeta *) * meta;
483 repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
484 repr->info[repr->n_meta - 1].bytes +=
485 strlen (repr->info[repr->n_meta - 1].str) + 1;
486 GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
487 g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
489 GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
490 g_type_name ((*meta)->info->api));
492 repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
504 } CommBufferMetadata;
/* Serialises a buffer, sends it to the peer and blocks until it is
   acknowledged (ACK_TYPE_BLOCKING), all under comm->mutex.
   Data is written in three steps:
   1. header: uint8 payload type (BUFFER), little-endian uint32
      comm->send_id, little-endian uint32 total size (buffer size +
      sizeof (guint32) + sizeof (CommBufferMetadata) + repr.total_bytes),
      the CommBufferMetadata struct as raw bytes, little-endian uint32
      buffer size;
   2. the buffer contents, written straight from the mapped buffer;
   3. the meta list: little-endian uint32 n_meta, then per meta its byte
      count, flags, API type name (length-prefixed, NUL included), uint64
      size and a length-prefixed string (length 0 when there is none).
   CommBufferMetadata is copied in its in-memory layout, and the reader in
   this file memcpy()s it back, so both ends have to agree on that layout.
   Failure paths: write and wait failures post a RESOURCE/WRITE element
   error and yield GST_FLOW_COMM_ERROR; a failed buffer map posts a
   RESOURCE/READ element error and yields GST_FLOW_ERROR. */
507 gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
510 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
512 guint32 ret32 = GST_FLOW_OK;
514 CommBufferMetadata meta;
516 MetaListRepresentation repr = { comm, 0, 4, NULL }; /* starts at 4 for n_meta */
519 g_mutex_lock (&comm->mutex);
522 GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
523 comm->send_id, buffer);
525 gst_byte_writer_init (&bw);
527 meta.pts = GST_BUFFER_PTS (buffer);
528 meta.dts = GST_BUFFER_DTS (buffer);
529 meta.duration = GST_BUFFER_DURATION (buffer);
530 meta.offset = GST_BUFFER_OFFSET (buffer);
531 meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
532 meta.flags = GST_BUFFER_FLAGS (buffer);
534 /* work out meta size */
535 gst_buffer_foreach_meta (buffer, build_meta, &repr);
537 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
539 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
542 gst_buffer_get_size (buffer) + sizeof (guint32) +
543 sizeof (CommBufferMetadata) + repr.total_bytes;
544 if (!gst_byte_writer_put_uint32_le (&bw, size))
546 if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
548 size = gst_buffer_get_size (buffer);
549 if (!gst_byte_writer_put_uint32_le (&bw, size))
551 if (!write_byte_writer_to_fd (comm, &bw))
/* The payload bytes bypass the byte writer and go out directly from the
   mapped buffer. */
554 if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
556 ret = write_to_fd_raw (comm, map.data, map.size);
557 gst_buffer_unmap (buffer, &map);
/* The writer was reset when the header was sent, so it is initialised
   again for the meta list. */
562 gst_byte_writer_init (&bw);
563 if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
565 for (n = 0; n < repr.n_meta; ++n) {
566 const MetaBuildInfo *info = repr.info + n;
570 if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
573 if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
576 s = g_type_name (info->api);
577 len = strlen (s) + 1;
578 if (!gst_byte_writer_put_uint32_le (&bw, len))
580 if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
583 if (!gst_byte_writer_put_uint64_le (&bw, info->size))
587 len = s ? (strlen (s) + 1) : 0;
588 if (!gst_byte_writer_put_uint32_le (&bw, len))
591 if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
595 if (!write_byte_writer_to_fd (comm, &bw))
598 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
599 ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
604 g_mutex_unlock (&comm->mutex);
605 gst_byte_writer_reset (&bw);
/* Free the per-meta strings allocated by build_meta(). */
606 for (n = 0; n < repr.n_meta; ++n)
607 g_free (repr.info[n].str);
612 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
613 ("Failed to write to socket"));
614 ret = GST_FLOW_COMM_ERROR;
618 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
619 ("Failed to wait for reply on socket"));
620 ret = GST_FLOW_COMM_ERROR;
624 GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
625 ("Failed to map buffer"));
626 ret = GST_FLOW_ERROR;
/* Rebuilds a GstBuffer from a BUFFER payload of `size` bytes queued in
   comm->adapter.
   Steps, as visible below:
   - map sizeof (CommBufferMetadata) + sizeof (guint32) bytes, copy out the
     metadata struct and the buffer data size, then flush those bytes;
   - create an empty buffer when the data size is 0, otherwise take that
     many bytes from the adapter as the buffer and flush them;
   - restore PTS, DTS, duration, offsets and flags from the metadata;
   - map the rest and walk the meta list: n_meta entries, each with a byte
     count, further fields read via READ_FIELD, the API type name as a
     NUL-terminated string and an optional serialised structure.
   Only GstProtectionMeta is re-attached to the buffer; for anything else a
   warning is logged and gst_structure_free() is called on the structure.
   NOTE(review): n_meta and the per-meta fields come straight from the
   stream; no check against the mapped size is visible in this view -
   confirm the bounds are validated. */
631 gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
634 CommBufferMetadata meta;
636 const guint8 *payload = NULL;
637 guint32 mapped_size, buffer_data_size;
639 /* this should not be called if we don't have enough yet */
640 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
641 g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
643 mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
644 payload = gst_adapter_map (comm->adapter, mapped_size);
647 memcpy (&meta, payload, sizeof (CommBufferMetadata));
648 payload += sizeof (CommBufferMetadata);
649 memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
651 gst_adapter_unmap (comm->adapter);
652 gst_adapter_flush (comm->adapter, mapped_size);
654 if (buffer_data_size == 0) {
655 buffer = gst_buffer_new ();
657 buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
658 gst_adapter_flush (comm->adapter, buffer_data_size);
660 size -= buffer_data_size;
662 GST_BUFFER_PTS (buffer) = meta.pts;
663 GST_BUFFER_DTS (buffer) = meta.dts;
664 GST_BUFFER_DURATION (buffer) = meta.duration;
665 GST_BUFFER_OFFSET (buffer) = meta.offset;
666 GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
667 GST_BUFFER_FLAGS (buffer) = meta.flags;
669 /* If you don't call that, the GType isn't yet known at the
670 g_type_from_name below */
671 gst_protection_meta_get_info ();
674 payload = gst_adapter_map (comm->adapter, mapped_size);
676 gst_buffer_unref (buffer);
679 memcpy (&n_meta, payload, sizeof (n_meta));
680 payload += sizeof (n_meta);
682 for (n = 0; n < n_meta; ++n) {
683 guint32 flags, len, bytes;
687 GstStructure *structure = NULL;
689 memcpy (&bytes, payload, sizeof (bytes));
690 payload += sizeof (bytes);
692 #define READ_FIELD(f) do { \
693 memcpy (&f, payload, sizeof (f)); \
694 payload += sizeof(f); \
699 api = g_type_from_name ((const char *) payload);
700 payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
704 structure = gst_structure_new_from_string ((const char *) payload);
708 /* Seems we can't add a meta from the api nor from the type? */
709 if (api == GST_PROTECTION_META_API_TYPE) {
711 gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
712 ((GstProtectionMeta *) meta)->info = structure;
714 GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
717 gst_structure_free (structure);
724 gst_adapter_unmap (comm->adapter);
725 gst_adapter_flush (comm->adapter, mapped_size);
/* Sends a sink-message event to the peer and waits for its ACK, all under
   comm->mutex. Only GST_EVENT_SINK_MESSAGE events are accepted (checked
   with g_return_val_if_fail before the mutex is taken).
   Packet as written below: uint8 payload type (SINK_MESSAGE_EVENT),
   little-endian uint32 comm->send_id, little-endian uint32 size, then
   message type, event seqnum, message seqnum and name length (all
   little-endian uint32), the event structure name including its NUL, and
   finally either the serialised message structure or a single 0 byte.
   `size` is decremented as each fixed field is written, so it holds the
   count of bytes still to be sent.
   The wait is blocking for serialized events and timed otherwise.
   The message obtained from gst_event_parse_sink_message() is unreffed
   during cleanup. */
731 gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
734 const unsigned char payload_type =
735 GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
737 guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
739 const GstStructure *structure;
740 GstMessage *message = NULL;
744 g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
747 g_mutex_lock (&comm->mutex);
750 GST_TRACE_OBJECT (comm->element,
751 "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
753 gst_byte_writer_init (&bw);
754 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
756 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
758 name = gst_structure_get_name (gst_event_get_structure (event));
759 slen = strlen (name) + 1;
760 gst_event_parse_sink_message (event, &message);
761 structure = gst_message_get_structure (message);
763 str = gst_structure_to_string (structure);
764 structure_slen = strlen (str);
769 size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
770 strlen (name) + 1 + structure_slen + 1;
771 if (!gst_byte_writer_put_uint32_le (&bw, size))
774 type = GST_MESSAGE_TYPE (message);
775 if (!gst_byte_writer_put_uint32_le (&bw, type))
777 size -= sizeof (type);
779 eseqnum = GST_EVENT_SEQNUM (event);
780 if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
782 size -= sizeof (eseqnum);
784 mseqnum = GST_MESSAGE_SEQNUM (message);
785 if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
787 size -= sizeof (mseqnum);
789 if (!gst_byte_writer_put_uint32_le (&bw, slen))
791 size -= sizeof (slen);
793 if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
798 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
801 if (!gst_byte_writer_put_uint8 (&bw, 0))
805 if (!write_byte_writer_to_fd (comm, &bw))
808 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
809 GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
810 COMM_REQUEST_TYPE_EVENT))
816 g_mutex_unlock (&comm->mutex);
817 gst_byte_writer_reset (&bw);
820 gst_message_unref (message);
824 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
825 ("Failed to write to socket"));
/* Parses a SINK_MESSAGE_EVENT payload of `size` bytes from comm->adapter
   and rebuilds the sink-message event.
   The fixed fields are copied out with memcpy in the order message type,
   event seqnum, message seqnum, name length; `size` shrinks with each one.
   The name is then taken in place from the payload after checking that its
   last byte (payload[slen - 1]) is NUL, and the remaining bytes are checked
   for a trailing NUL and parsed as a GstStructure.
   A custom message is created with comm->element as source, both seqnums
   are restored, and the local message reference is dropped once the event
   has been created from it. The whole payload is flushed from the adapter
   at the end.
   NOTE(review): slen comes from the stream; no check of slen against the
   remaining size (or against 0) is visible in this view - confirm. */
831 gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
835 GstEvent *event = NULL;
837 GstStructure *structure;
838 guint32 type, eseqnum, mseqnum, slen;
840 guint32 mapped_size = size;
841 const guint8 *payload;
843 /* this should not be called if we don't have enough yet */
844 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
845 g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
847 payload = gst_adapter_map (comm->adapter, mapped_size);
850 memcpy (&type, payload, sizeof (type));
851 payload += sizeof (type);
852 size -= sizeof (type);
856 memcpy (&eseqnum, payload, sizeof (eseqnum));
857 payload += sizeof (eseqnum);
858 size -= sizeof (eseqnum);
862 memcpy (&mseqnum, payload, sizeof (mseqnum));
863 payload += sizeof (mseqnum);
864 size -= sizeof (mseqnum);
868 memcpy (&slen, payload, sizeof (slen));
869 payload += sizeof (slen);
870 size -= sizeof (slen);
874 if (payload[slen - 1])
876 name = (const char *) payload;
880 if ((payload)[size - 1]) {
884 structure = gst_structure_from_string ((const char *) payload, &end);
890 gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
891 gst_message_set_seqnum (message, mseqnum);
892 event = gst_event_new_sink_message (name, message);
893 gst_event_set_seqnum (event, eseqnum);
894 gst_message_unref (message);
897 gst_adapter_unmap (comm->adapter);
898 gst_adapter_flush (comm->adapter, mapped_size);
/* Sends an event to the peer and, depending on the event, waits for its
   ACK. Sink-message events are delegated to
   gst_ipc_pipeline_comm_write_sink_message_event_to_fd() before the mutex
   is taken; everything else is serialised here under comm->mutex.
   Packet as written below: uint8 payload type (EVENT), little-endian
   uint32 comm->send_id, little-endian uint32 size (sizeof (type) +
   sizeof (seqnum) + 1 + slen + 1), little-endian uint32 event type,
   little-endian uint32 seqnum, uint8 upstream flag (1 or 0), then either
   the serialised event structure including its NUL or a single 0 byte.
   The wait is blocking for serialized or upstream events; for all other
   events no ACK is awaited (ACK_TYPE_NONE). */
903 gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
904 gboolean upstream, GstEvent * event)
906 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
908 guint32 type, size, ret32 = TRUE, seqnum, slen;
910 const GstStructure *structure;
913 /* we special case sink-message event as gst can't serialize/de-serialize it */
914 if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
915 return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
917 g_mutex_lock (&comm->mutex);
920 GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
921 comm->send_id, event);
923 gst_byte_writer_init (&bw);
924 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
926 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
928 structure = gst_event_get_structure (event);
/* For stream-start events a copy of the structure without its "stream"
   field is serialised instead of the original.
   NOTE(review): presumably because that field holds an object that
   cannot be sent as a string - confirm. */
931 if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
932 GstStructure *s = gst_structure_copy (structure);
933 gst_structure_remove_field (s, "stream");
934 str = gst_structure_to_string (s);
935 gst_structure_free (s);
937 str = gst_structure_to_string (structure);
945 size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
946 if (!gst_byte_writer_put_uint32_le (&bw, size))
949 type = GST_EVENT_TYPE (event);
950 if (!gst_byte_writer_put_uint32_le (&bw, type))
953 seqnum = GST_EVENT_SEQNUM (event);
954 if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
957 if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
961 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
964 if (!gst_byte_writer_put_uint8 (&bw, 0))
968 if (!write_byte_writer_to_fd (comm, &bw))
971 /* Upstream events get serialized, this is required to send seeks only
973 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
974 (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
975 ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
980 g_mutex_unlock (&comm->mutex);
982 gst_byte_writer_reset (&bw);
986 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
987 ("Failed to write to socket"));
993 gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
996 GstEvent *event = NULL;
998 GstStructure *structure;
999 guint32 type, seqnum;
1000 guint32 mapped_size = size;
1001 const guint8 *payload;
1003 /* this should not be called if we don't have enough yet */
1004 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1005 g_return_val_if_fail (size >= sizeof (type), NULL);
/* Parse an EVENT payload of `size` bytes from the adapter and rebuild the
   event: the event type and seqnum are copied out with memcpy (shrinking
   `size` each time), one byte gives the direction stored in *upstream,
   and the remaining bytes must end in a NUL (payload[size - 1] is tested)
   before being parsed as the event structure. The event is created with
   gst_event_new_custom() and gets the transmitted seqnum. The whole
   payload (mapped_size) is flushed from the adapter at the end. */
1007 payload = gst_adapter_map (comm->adapter, mapped_size);
1011 memcpy (&type, payload, sizeof (type));
1012 payload += sizeof (type);
1013 size -= sizeof (type);
1017 memcpy (&seqnum, payload, sizeof (seqnum));
1018 payload += sizeof (seqnum);
1019 size -= sizeof (seqnum);
1023 *upstream = (*payload) ? TRUE : FALSE;
1029 if (payload[size - 1])
1032 structure = gst_structure_from_string ((const char *) payload, &end);
1037 event = gst_event_new_custom (type, structure);
1038 gst_event_set_seqnum (event, seqnum);
1041 gst_adapter_unmap (comm->adapter);
1042 gst_adapter_flush (comm->adapter, mapped_size);
/* Sends a query to the peer and waits for the result, all under
   comm->mutex.
   Packet as written below: uint8 payload type (QUERY), little-endian
   uint32 comm->send_id, little-endian uint32 size (sizeof (type) + 1 +
   slen + 1), little-endian uint32 query type, uint8 upstream flag (1 or
   0), then either the serialised query structure including its NUL or a
   single 0 byte.
   The query itself is handed to gst_ipc_pipeline_comm_sync_fd(), which
   passes it to comm_request_new().
   NOTE(review): presumably so the reply can be applied to it - confirm.
   The wait is blocking for serialized queries and timed otherwise. */
1047 gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
1048 gboolean upstream, GstQuery * query)
1050 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
1052 guint32 type, size, ret32 = TRUE, slen;
1054 const GstStructure *structure;
1057 g_mutex_lock (&comm->mutex);
1060 GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
1061 comm->send_id, query);
1063 gst_byte_writer_init (&bw);
1064 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1066 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1068 structure = gst_query_get_structure (query);
1070 str = gst_structure_to_string (structure);
1071 slen = strlen (str);
1076 size = sizeof (type) + 1 + slen + 1;
1077 if (!gst_byte_writer_put_uint32_le (&bw, size))
1080 type = GST_QUERY_TYPE (query);
1081 if (!gst_byte_writer_put_uint32_le (&bw, type))
1084 if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
1088 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
1091 if (!gst_byte_writer_put_uint8 (&bw, 0))
1095 if (!write_byte_writer_to_fd (comm, &bw))
1098 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
1099 GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
1100 COMM_REQUEST_TYPE_QUERY))
1106 g_mutex_unlock (&comm->mutex);
1108 gst_byte_writer_reset (&bw);
1112 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1113 ("Failed to write to socket"));
/* Parses a QUERY payload of `size` bytes from comm->adapter and rebuilds
   the query. The query type is copied out with memcpy, one byte gives the
   direction stored in *upstream, and the remaining bytes must end in a NUL
   (payload[size - 1] is tested) before being parsed as the query
   structure. The query is created with gst_query_new_custom(); CAPS
   queries get the extra fix-up explained further down. The whole payload
   (mapped_size) is flushed from the adapter at the end. */
1119 gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
1120 gboolean * upstream)
1122 GstQuery *query = NULL;
1124 GstStructure *structure;
1126 guint32 mapped_size = size;
1127 const guint8 *payload;
1129 /* this should not be called if we don't have enough yet */
1130 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1131 g_return_val_if_fail (size >= sizeof (type), NULL);
1133 payload = gst_adapter_map (comm->adapter, mapped_size);
1137 memcpy (&type, payload, sizeof (type));
1138 payload += sizeof (type);
1139 size -= sizeof (type);
1143 *upstream = (*payload) ? TRUE : FALSE;
1149 if (payload[size - 1])
1152 structure = gst_structure_from_string ((const char *) payload, &end);
1157 query = gst_query_new_custom (type, structure);
1159 /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
1160 This does not play well with the serialization/deserialization system,
1161 which will give us a non-NULL GstCaps which has a value of NULL. This
1162 in turn wreaks havoc with any code that tests whether filter is NULL
1163 (which basically means, am I being given an optional GstCaps ?).
1164 So we look for non-NULL GstCaps which have NULL contents, and replace
1165 them with NULL instead. */
1166 if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1168 gst_query_parse_caps (query, &filter);
1170 && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
1172 gst_query_unref (query);
1173 query = gst_query_new_caps (NULL);
1178 gst_adapter_unmap (comm->adapter);
1179 gst_adapter_flush (comm->adapter, mapped_size);
/* Sends a state change request to the peer and waits (with a timeout,
   ACK_TYPE_TIMED) for its reply, all under comm->mutex.
   Packet as written below: uint8 payload type (STATE_CHANGE),
   little-endian uint32 comm->send_id, little-endian uint32 size
   (sizeof (transition)), little-endian uint32 transition.
   ret32 starts out as GST_STATE_CHANGE_SUCCESS. A write failure posts a
   RESOURCE/WRITE element error and yields GST_STATE_CHANGE_FAILURE. */
1183 GstStateChangeReturn
1184 gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
1185 GstStateChange transition)
1187 const unsigned char payload_type =
1188 GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
1189 GstStateChangeReturn ret;
1190 guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
1193 g_mutex_lock (&comm->mutex);
1196 GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
1198 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
1199 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
1201 gst_byte_writer_init (&bw);
1202 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1204 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1206 size = sizeof (transition);
1207 if (!gst_byte_writer_put_uint32_le (&bw, size))
1209 if (!gst_byte_writer_put_uint32_le (&bw, transition))
1212 if (!write_byte_writer_to_fd (comm, &bw))
1215 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1216 ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
1221 g_mutex_unlock (&comm->mutex);
1222 gst_byte_writer_reset (&bw);
1226 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1227 ("Failed to write to socket"));
1228 ret = GST_STATE_CHANGE_FAILURE;
/* Checks a transition value against the six single-step state changes
   (NULL<->READY, READY<->PAUSED, PAUSED<->PLAYING) and against transitions
   whose current and next state are the same. It is used by
   gst_ipc_pipeline_comm_read_state_change() on the value read from the
   peer.
   NOTE(review): presumably returns TRUE for the listed cases and FALSE
   otherwise - confirm. */
1233 is_valid_state_change (GstStateChange transition)
1235 if (transition == GST_STATE_CHANGE_NULL_TO_READY)
1237 if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
1239 if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
1241 if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
1243 if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
1245 if (transition == GST_STATE_CHANGE_READY_TO_NULL)
1247 if (GST_STATE_TRANSITION_CURRENT (transition) ==
1248 GST_STATE_TRANSITION_NEXT (transition))
/* Reads a STATE_CHANGE payload of `size` bytes from comm->adapter into
   *transition. The value is copied with memcpy (sizeof (*transition)
   bytes), the whole payload is flushed from the adapter, and the return
   value is the result of is_valid_state_change() on the value received. */
1254 gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
1255 guint32 size, guint32 * transition)
1257 guint32 mapped_size = size;
1258 const guint8 *payload;
1260 /* this should not be called if we don't have enough yet */
1261 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
1262 g_return_val_if_fail (size >= sizeof (*transition), FALSE);
1264 payload = gst_adapter_map (comm->adapter, size);
1267 memcpy (transition, payload, sizeof (*transition));
1268 gst_adapter_unmap (comm->adapter);
1269 gst_adapter_flush (comm->adapter, mapped_size);
1270 return is_valid_state_change (*transition);
/* Sends a STATE_LOST notification to the peer under comm->mutex.
   Packet as written below: uint8 payload type (STATE_LOST), little-endian
   uint32 comm->send_id, little-endian uint32 size. No wait for an ACK is
   visible in this function. A RESOURCE/WRITE element error is posted on
   the failure path. */
1274 gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
1276 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
1280 g_mutex_lock (&comm->mutex);
1283 GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
1284 gst_byte_writer_init (&bw);
1285 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1287 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1290 if (!gst_byte_writer_put_uint32_le (&bw, size))
1293 if (!write_byte_writer_to_fd (comm, &bw))
1297 g_mutex_unlock (&comm->mutex);
1298 gst_byte_writer_reset (&bw);
1302 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1303 ("Failed to write to socket"));
1308 gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
/* Sends an error, warning or info message to the peer under comm->mutex.
   These carry a GError, which is parsed out of the message with the
   matching gst_message_parse_*() call and sent field by field.
   Packet as written below: uint8 payload type (GERROR_MESSAGE),
   little-endian uint32 comm->send_id, little-endian uint32 total size,
   uint8 msgtype, then the error domain as a length-prefixed string
   (NUL included), little-endian uint32 code, and the error message and
   the extra (debug) message, each as a little-endian uint32 length
   followed by the string including its NUL; a NULL string is sent as
   length 0 with no data.
   `size` is reused: first as the running total, later as the length of
   each individual string.
   No ACK is awaited (ACK_TYPE_NONE). The parsed GError and extra message
   are freed during cleanup. */
1315 gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
1316 GstMessage * message)
1318 const unsigned char payload_type =
1319 GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
1321 guint32 code, size, ret32 = TRUE;
1324 char *extra_message;
1325 const char *domain_string;
1326 unsigned char msgtype;
1329 g_mutex_lock (&comm->mutex);
1332 if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
1333 gst_message_parse_error (message, &error, &extra_message);
1335 } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
1336 gst_message_parse_warning (message, &error, &extra_message);
1339 gst_message_parse_info (message, &error, &extra_message);
1343 domain_string = g_quark_to_string (error->domain);
1344 GST_TRACE_OBJECT (comm->element,
1345 "Writing error %u: domain %s, code %u, message %s, extra message %s",
1346 comm->send_id, domain_string, error->code, error->message, extra_message);
1348 gst_byte_writer_init (&bw);
1349 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1351 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1354 size = sizeof (size);
1356 size += strlen (domain_string) + 1;
1357 size += sizeof (code);
1358 size += sizeof (size);
1359 size += error->message ? strlen (error->message) + 1 : 0;
1360 size += sizeof (size);
1361 size += extra_message ? strlen (extra_message) + 1 : 0;
1363 if (!gst_byte_writer_put_uint32_le (&bw, size))
1366 if (!gst_byte_writer_put_uint8 (&bw, msgtype))
1368 size = strlen (domain_string) + 1;
1369 if (!gst_byte_writer_put_uint32_le (&bw, size))
1371 if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
1373 if (!gst_byte_writer_put_uint32_le (&bw, code))
1375 size = error->message ? strlen (error->message) + 1 : 0;
1376 if (!gst_byte_writer_put_uint32_le (&bw, size))
1378 if (error->message) {
1379 if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
1382 size = extra_message ? strlen (extra_message) + 1 : 0;
1383 if (!gst_byte_writer_put_uint32_le (&bw, size))
1385 if (extra_message) {
1386 if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
1390 if (!write_byte_writer_to_fd (comm, &bw))
1393 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1394 ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1400 g_mutex_unlock (&comm->mutex);
1402 g_error_free (error);
1403 g_free (extra_message);
1404 gst_byte_writer_reset (&bw);
1408 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1409 ("Failed to write to socket"));
/* Parses a GERROR_MESSAGE payload of `size` bytes from comm->adapter and
   rebuilds the error, warning or info message with comm->element as its
   source.
   Fields are read in the order msgtype byte, length-prefixed domain
   string, error code, length-prefixed message, length-prefixed extra
   message (which may end up NULL). Each string is used in place from the
   payload after testing that its last byte is NUL. A temporary GError is
   built from domain, code and message and freed once the GstMessage has
   been created; msgtype == 1 selects a warning.
   The `size` parameter is overwritten with each string length as parsing
   proceeds; mapped_size keeps the original value for the final flush.
   NOTE(review): the string lengths come from the stream. No check against
   the remaining payload is visible in this view, and payload[size - 1]
   with a length of 0 would read before the field unless guarded on lines
   not shown here - confirm. */
1415 gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
1418 GstMessage *message = NULL;
1421 const char *msg, *extra_message;
1423 unsigned char msgtype;
1424 guint32 mapped_size = size;
1425 const guint8 *payload;
1427 /* this should not be called if we don't have enough yet */
1428 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1429 g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
1432 payload = gst_adapter_map (comm->adapter, mapped_size);
1435 msgtype = *payload++;
1436 memcpy (&size, payload, sizeof (size));
1437 payload += sizeof (size);
1438 if (payload[size - 1])
1440 domain = g_quark_from_string ((const char *) payload);
1443 memcpy (&code, payload, sizeof (code));
1444 payload += sizeof (code);
1446 memcpy (&size, payload, sizeof (size));
1447 payload += sizeof (size);
1449 if (payload[size - 1])
1451 msg = (const char *) payload;
1457 memcpy (&size, payload, sizeof (size));
1458 payload += sizeof (size);
1460 if (payload[size - 1])
1462 extra_message = (const char *) payload;
1464 extra_message = NULL;
1468 error = g_error_new (domain, code, "%s", msg);
1471 gst_message_new_error (GST_OBJECT (comm->element), error,
1473 else if (msgtype == 1)
1475 gst_message_new_warning (GST_OBJECT (comm->element), error,
1479 gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
1480 g_error_free (error);
1483 gst_adapter_unmap (comm->adapter);
1484 gst_adapter_flush (comm->adapter, mapped_size);
/* Serialize `message` and hand it to write_byte_writer_to_fd().
 *
 * Error, warning and info messages are delegated to
 * gst_ipc_pipeline_comm_write_gerror_message_to_fd(). For every other
 * message the visible code emits, in order: the payload-type byte
 * (GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE), comm->send_id, a 32-bit size,
 * the 32-bit message type, and the message structure as a string.
 *
 * comm->mutex is taken before serializing and released in the visible
 * tail of the function.
 *
 * NOTE(review): this excerpt omits some lines (failure jumps, branches and
 * the return), so control flow between the visible statements is not
 * described here. */
1490 gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
1491 GstMessage * message)
1493 const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
1495 guint32 type, size, ret32 = TRUE, slen;
1497 const GstStructure *structure;
1500 /* we special case error as gst can't serialize/de-serialize it */
1501 if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
1502 || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
1503 || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
1504 return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
1506 g_mutex_lock (&comm->mutex);
1509 GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
1510 comm->send_id, message);
/* header: payload type byte followed by the request id */
1512 gst_byte_writer_init (&bw);
1513 if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1515 if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1517 structure = gst_message_get_structure (message);
1519 str = gst_structure_to_string (structure);
1520 slen = strlen (str);
/* payload size = message type field + string bytes + terminating NUL */
1525 size = sizeof (type) + slen + 1;
1526 if (!gst_byte_writer_put_uint32_le (&bw, size))
1529 type = GST_MESSAGE_TYPE (message);
1530 if (!gst_byte_writer_put_uint32_le (&bw, type))
/* from here on `size` only counts the string part of the payload */
1532 size -= sizeof (type);
/* NOTE(review): the conditions (if any) selecting between writing the
 * string and writing a single 0 byte are not visible in this excerpt */
1534 if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
1537 if (!gst_byte_writer_put_uint8 (&bw, 0))
1541 if (!write_byte_writer_to_fd (comm, &bw))
/* ACK_TYPE_NONE is passed as the ack type for messages */
1544 if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1545 ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1551 g_mutex_unlock (&comm->mutex);
1553 gst_byte_writer_reset (&bw);
/* failure report posted on the element (the label/branch leading here is
 * not visible in this excerpt) */
1557 GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1558 ("Failed to write to socket"));
/* Rebuild a generic GstMessage from `size` bytes at the head of
 * comm->adapter: a 32-bit message type followed by the message structure
 * serialized as a NUL-terminated string. The message is created with
 * comm->element as its source, and the mapped bytes are unmapped and
 * flushed from the adapter in the visible tail of the function.
 *
 * NOTE(review): this excerpt omits some lines (branches and the return), so
 * only the visible statements are described. */
1564 gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
1566 GstMessage *message = NULL;
1568 GstStructure *structure;
/* `size` is decremented below; mapped_size keeps the full payload length
 * for the final flush */
1570 guint32 mapped_size = size;
1571 const guint8 *payload;
1573 /* this should not be called if we don't have enough yet */
1574 g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1575 g_return_val_if_fail (size >= sizeof (type), NULL);
1577 payload = gst_adapter_map (comm->adapter, mapped_size);
/* the type is copied with memcpy, i.e. in host byte order */
1580 memcpy (&type, payload, sizeof (type));
1581 payload += sizeof (type);
1582 size -= sizeof (type);
/* the serialized structure must end with a NUL byte */
1586 if (payload[size - 1])
1589 structure = gst_structure_from_string ((const char *) payload, &end);
1595 gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
/* release the mapping and drop the whole payload from the adapter */
1598 gst_adapter_unmap (comm->adapter);
1599 gst_adapter_flush (comm->adapter, mapped_size);
/* Initialize `comm` for use by `element`: set up the mutex, mark both file
 * descriptors as unset (-1), apply the default ack timeout
 * (DEFAULT_ACK_TIME), and create the pending-request table, the input
 * adapter and the poll set. */
1605 gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
1607 g_mutex_init (&comm->mutex);
/* the element pointer is stored as-is; no reference is taken here */
1608 comm->element = element;
1609 comm->fdin = comm->fdout = -1;
1610 comm->ack_time = DEFAULT_ACK_TIME;
/* table keyed by integer id stored directly in the key pointer; values are
 * released with comm_request_free when removed or when the table is
 * destroyed (the assignment target is on a line not shown in this excerpt,
 * presumably comm->waiting_ids) */
1612 g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1613 (GDestroyNotify) comm_request_free);
1614 comm->adapter = gst_adapter_new ();
1615 comm->poll = gst_poll_new (TRUE);
/* pollFDin starts out unset; it gets bound to fdin in update_adapter() */
1616 gst_poll_fd_init (&comm->pollFDin);
/* Release the resources owned by `comm`: the pending-request table, the
 * adapter, the poll set and the mutex. The file descriptors and the element
 * pointer are not touched by the visible code. */
1620 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
1622 g_hash_table_destroy (comm->waiting_ids);
1623 gst_object_unref (comm->adapter);
1624 gst_poll_free (comm->poll);
1625 g_mutex_clear (&comm->mutex);
/* Cancel one pending request: mark it as replied and wake up whoever is
 * waiting on its condition variable.
 *
 * key:       request id packed into a pointer
 * value:     the CommRequest
 * user_data: the GstIpcPipelineComm
 * A fourth parameter (declared on a line not shown in this excerpt) receives
 * the value passed by cancel_request_error(); how it is stored is not
 * visible here. */
1629 cancel_request (gpointer key, gpointer value, gpointer user_data,
1632 GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
1633 guint32 id = GPOINTER_TO_INT (key);
1634 CommRequest *req = (CommRequest *) value;
1636 GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
1639 req->replied = TRUE;
1640 g_cond_signal (&req->cond);
/* Hash-table foreach callback: cancel the request in `value` using the
 * failure value that matches its request type.
 * NOTE(review): comm_request_ret_get_failure_value() returns a guint32 that
 * is stored in a GstFlowReturn here regardless of the request type. */
1644 cancel_request_error (gpointer key, gpointer value, gpointer user_data)
1646 CommRequest *req = (CommRequest *) value;
1647 GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
1649 cancel_request (key, value, user_data, fret);
/* Cancel every pending request while holding comm->mutex: each entry of
 * comm->waiting_ids is passed to cancel_request_error().
 * The visible code also drops the table and creates a fresh one.
 * NOTE(review): the use of `cleanup` is on lines not shown in this excerpt;
 * presumably it gates the table replacement — confirm against the full
 * file. */
1653 gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
1655 g_mutex_lock (&comm->mutex);
1656 g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
/* dropping the table releases its entries through comm_request_free */
1658 g_hash_table_unref (comm->waiting_ids);
1660 g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1661 (GDestroyNotify) comm_request_free);
1663 g_mutex_unlock (&comm->mutex);
/* gst_structure_foreach() callback: copy one field (id and value) into the
 * GstStructure passed as user_data. The return statement is not visible in
 * this excerpt. */
1667 set_field (GQuark field_id, const GValue * value, gpointer user_data)
1669 GstStructure *structure = user_data;
1671 gst_structure_id_set_value (structure, field_id, value);
/* Deliver a reply to the pending request `id`: look it up in
 * comm->waiting_ids, mark it as replied and wake up the waiter. When a
 * query result accompanies the reply, its fields are copied into the query
 * stored in the request.
 *
 * The callers visible in this file take comm->mutex around the call.
 *
 * NOTE(review): this excerpt omits some lines (branch conditions and, likely,
 * the store of `ret` into the request), so only visible statements are
 * described. */
1677 gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
1678 GstFlowReturn ret, GstQuery * query)
1682 req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
1684 GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
1688 GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
1689 comm_request_ret_get_name (req->type, ret), req->id);
1690 req->replied = TRUE;
1694 /* We need to update the original query in place, as the caller
1695 will expect the object to be the same */
/* wipe the stored query's fields, then copy over every field of the reply */
1696 GstStructure *structure = gst_query_writable_structure (req->query);
1697 gst_structure_remove_all_fields (structure);
1698 gst_structure_foreach (gst_query_get_structure (query), set_field,
1701 GST_WARNING_OBJECT (comm->element,
1702 "Got query reply, but no query was in the request");
/* wake up the thread waiting on this request */
1705 g_cond_signal (&req->cond);
/* Wait for input on comm->fdin and append whatever can be read to
 * comm->adapter.
 *
 * Steps visible in this excerpt:
 *  - (re)bind comm->pollFDin to comm->fdin when the fd changed or the
 *    element has no parent;
 *  - wait on the poll set with a timeout of 100 * GST_MSECOND;
 *  - when the fd is readable, read up to comm->read_chunk_size bytes into a
 *    freshly allocated GstMemory, wrap it in a GstBuffer and push it to the
 *    adapter.
 *
 * A failed poll wait sets the result to 2 when errno is EBUSY and to 1
 * otherwise; other return values are on lines not shown here.
 *
 * NOTE(review): this excerpt omits some lines (labels, jumps, the read error
 * test and the return), so control flow between the visible statements is
 * not described. */
1710 update_adapter (GstIpcPipelineComm * comm)
1712 GstMemory *mem = NULL;
1719 /* update pollFDin if necessary (fdin changed or we lost our parent).
1720 * we do not allow a parent-less element to communicate with its peer
1721 * in order to avoid race conditions where the slave tries to change
1722 * the state of its parent pipeline while it is not yet added in that
1724 if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
/* stop watching the previously registered fd, if any */
1725 if (comm->pollFDin.fd != -1) {
1726 GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
1728 gst_poll_remove_fd (comm->poll, &comm->pollFDin);
1729 gst_poll_fd_init (&comm->pollFDin);
/* start watching the current fd, but only while the element has a parent */
1731 if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
1732 GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
1733 comm->pollFDin.fd = comm->fdin;
1734 gst_poll_add_fd (comm->poll, &comm->pollFDin);
1735 gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
1739 /* wait for activity on fdin or a flush */
1740 if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
1741 if (errno == EAGAIN)
1743 /* error out, unless interrupted or flushing */
/* EBUSY is reported separately (2) from any other failure (1) */
1745 ret = (errno == EBUSY) ? 2 : 1;
1748 /* read from fdin if possible and push data to our adapter */
1749 if (comm->pollFDin.fd >= 0
1750 && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
/* NULL allocator and NULL params: default allocator with default settings */
1752 mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
1754 gst_memory_map (mem, &map, GST_MAP_WRITE);
1755 sz = read (comm->pollFDin.fd, map.data, map.size);
1756 gst_memory_unmap (mem, &map);
1759 if (errno == EAGAIN)
1761 /* error out, unless interrupted */
/* shrink the memory to the number of bytes actually read; appending hands
 * the memory over to the buffer, and pushing hands the buffer over to the
 * adapter */
1765 gst_memory_resize (mem, 0, sz);
1766 buf = gst_buffer_new ();
1767 gst_buffer_append_memory (buf, mem);
1769 GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
1770 gst_adapter_push (comm->adapter, buf);
/* NOTE(review): presumably only reached while `mem` is still owned by this
 * function (i.e. not yet appended to a buffer) — the guarding lines are not
 * visible in this excerpt */
1775 gst_memory_unref (mem);
/* Decode as many complete packets as comm->adapter currently holds and
 * dispatch each one.
 *
 * comm->state drives a small state machine:
 *  - GST_IPC_PIPELINE_COMM_STATE_TYPE: a packet header is expected; it is
 *    1 + 2 * sizeof (gint32) bytes long and yields a type, comm->id and
 *    comm->payload_length;
 *  - one state per payload type: once comm->payload_length bytes are
 *    available the payload is deserialized, handed to the matching
 *    callback (or used to answer a pending request), and the state goes
 *    back to GST_IPC_PIPELINE_COMM_STATE_TYPE.
 *
 * Deserialization failures post a STREAM/DECODE element error.
 *
 * NOTE(review): this excerpt omits many lines of the function (loop
 * construct, `break`/`goto` statements, NULL checks, most labels and the
 * return), so the comments below only describe the visible statements. */
1781 read_many (GstIpcPipelineComm * comm)
1783 gboolean ret = TRUE;
1785 const guint8 *payload;
1788 switch (comm->state) {
/* ---- header: type byte, request id, payload length ---- */
1789 case GST_IPC_PIPELINE_COMM_STATE_TYPE:
1792 guint32 mapped_size;
1794 available = gst_adapter_available (comm->adapter);
1795 mapped_size = 1 + sizeof (gint32) * 2;
1796 if (available < mapped_size)
1799 payload = gst_adapter_map (comm->adapter, mapped_size);
/* id and payload length are written under comm->mutex; both are copied
 * with memcpy, i.e. in host byte order (the read of the type byte is on a
 * line not shown in this excerpt) */
1801 g_mutex_lock (&comm->mutex);
1802 memcpy (&comm->id, payload, sizeof (guint32));
1803 memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
1804 g_mutex_unlock (&comm->mutex);
1805 gst_adapter_unmap (comm->adapter);
1806 gst_adapter_flush (comm->adapter, mapped_size);
1807 GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
1808 comm->id, type, comm->payload_length);
/* the payload types listed here share the same handling; the statements
 * that follow the trace are not visible in this excerpt */
1810 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1811 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1812 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1813 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1814 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1815 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1816 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1817 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
1818 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
1819 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
1820 GST_TRACE_OBJECT (comm->element, "switching to state %s",
1821 gst_ipc_pipeline_comm_data_type_get_name (type));
/* ---- ACK: a 32-bit return value answering request comm->id ---- */
1829 case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1834 available = gst_adapter_available (comm->adapter);
1835 if (available < comm->payload_length)
1838 if (available < sizeof (guint32))
/* NOTE(review): exactly sizeof (guint32) bytes are mapped and flushed here,
 * independently of comm->payload_length */
1841 rets = gst_adapter_map (comm->adapter, sizeof (guint32));
1842 memcpy (&ret32, rets, sizeof (ret32));
1843 gst_adapter_unmap (comm->adapter);
1844 gst_adapter_flush (comm->adapter, sizeof (guint32));
1845 GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
1846 gst_flow_get_name (ret32), comm->id);
/* wake up the sender waiting for this id */
1848 g_mutex_lock (&comm->mutex);
1849 gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
1850 g_mutex_unlock (&comm->mutex);
1852 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1853 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- QUERY_RESULT: result value plus the answered query ---- */
1856 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1858 GstQuery *query = NULL;
1861 available = gst_adapter_available (comm->adapter);
1862 if (available < comm->payload_length)
1866 gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
1869 GST_TRACE_OBJECT (comm->element,
1870 "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
/* the reply copies the result into the pending request's query, so the
 * local reference is dropped right after */
1873 g_mutex_lock (&comm->mutex);
1874 gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
1875 g_mutex_unlock (&comm->mutex);
1877 gst_query_unref (query);
1879 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1880 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- BUFFER: deserialize and hand over to on_buffer ---- */
1883 case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1887 available = gst_adapter_available (comm->adapter);
1888 if (available < comm->payload_length)
1891 buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
1895 /* set caps and push */
1896 GST_TRACE_OBJECT (comm->element,
1897 "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
1898 ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
1899 ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
1900 ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1901 GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
1902 GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
1903 GST_BUFFER_FLAGS (buf));
/* tag the object with the request id it arrived under */
1905 gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
1906 GINT_TO_POINTER (comm->id), NULL);
1908 if (comm->on_buffer)
1909 (*comm->on_buffer) (comm->id, buf, comm->user_data);
1911 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1912 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- EVENT: deserialize (with its direction) and hand to on_event ---- */
1915 case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1920 available = gst_adapter_available (comm->adapter);
1921 if (available < comm->payload_length)
1924 event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
1929 GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
1930 event, gst_event_type_get_name (event->type));
1932 gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1933 GINT_TO_POINTER (comm->id), NULL);
1936 (*comm->on_event) (comm->id, event, upstream, comm->user_data);
1938 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1939 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- SINK_MESSAGE_EVENT: like EVENT, always passed with upstream FALSE ---- */
1942 case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1946 available = gst_adapter_available (comm->adapter);
1947 if (available < comm->payload_length)
1950 event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
1951 comm->payload_length);
1955 GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
1958 gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1959 GINT_TO_POINTER (comm->id), NULL);
1962 (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
1964 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1965 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- QUERY: deserialize (with its direction) and hand to on_query ---- */
1968 case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1973 available = gst_adapter_available (comm->adapter);
1974 if (available < comm->payload_length)
1977 query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
1982 GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
1983 query, gst_query_type_get_name (query->type));
1985 gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
1986 GINT_TO_POINTER (comm->id), NULL);
1989 (*comm->on_query) (comm->id, query, upstream, comm->user_data);
1991 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1992 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- STATE_CHANGE: requested transition, handed to on_state_change ---- */
1995 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1999 available = gst_adapter_available (comm->adapter);
2000 if (available < comm->payload_length)
2003 if (!gst_ipc_pipeline_comm_read_state_change (comm,
2004 comm->payload_length, &transition))
2005 goto state_change_failed;
2007 GST_TRACE_OBJECT (comm->element,
2008 "deserialized state change request: %s -> %s",
2009 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
2011 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
2014 if (comm->on_state_change)
2015 (*comm->on_state_change) (comm->id, transition, comm->user_data);
2017 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2018 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- STATE_LOST: notification; the callback receives only user_data ---- */
2021 case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
2023 available = gst_adapter_available (comm->adapter);
2024 if (available < comm->payload_length)
2027 if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
2030 GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
2032 if (comm->on_state_lost)
2033 (*comm->on_state_lost) (comm->user_data);
2035 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2036 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- MESSAGE: generic message, handed to on_message ---- */
2039 case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
2041 GstMessage *message;
2043 available = gst_adapter_available (comm->adapter);
2044 if (available < comm->payload_length)
2047 message = gst_ipc_pipeline_comm_read_message (comm,
2048 comm->payload_length);
2050 goto message_failed;
2052 GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2053 message, gst_message_type_get_name (message->type));
2055 if (comm->on_message)
2056 (*comm->on_message) (comm->id, message, comm->user_data);
2058 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2059 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- GERROR_MESSAGE: error/warning/info message, handed to on_message ---- */
2062 case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
2064 GstMessage *message;
2066 available = gst_adapter_available (comm->adapter);
2067 if (available < comm->payload_length)
2070 message = gst_ipc_pipeline_comm_read_gerror_message (comm,
2071 comm->payload_length);
2073 goto message_failed;
2075 GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2076 message, gst_message_type_get_name (message->type));
2078 if (comm->on_message)
2079 (*comm->on_message) (comm->id, message, comm->user_data);
2081 GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2082 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
/* ---- error exits: each posts a STREAM/DECODE element error; only the
 * state_change_failed label is visible in this excerpt ---- */
2093 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2094 ("Socket out of sync"));
2098 state_change_failed:
2100 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2101 ("could not read state change from fd"));
2107 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2108 ("could not read ack from fd"));
2114 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2115 ("could not read buffer from fd"));
2121 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2122 ("could not read event from fd"));
2128 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2129 ("could not read message from fd"));
2135 GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2136 ("could not read query from fd"));
/* Body of the reader thread started by
 * gst_ipc_pipeline_comm_start_reader_thread(); `data` is the
 * GstIpcPipelineComm. It calls update_adapter() and reacts to its result:
 * the visible code posts a RESOURCE/READ element error in one branch and
 * logs a normal stop in another.
 * NOTE(review): the loop, the tests on `ret` and the decoding call are on
 * lines not shown in this excerpt. */
2143 reader_thread (gpointer data)
2145 GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
2146 gboolean running = TRUE;
2150 ret = update_adapter (comm);
2153 GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
2154 ("Failed to read from socket"));
2158 GST_INFO_OBJECT (comm->element, "We're stopping, all good");
2167 GST_INFO_OBJECT (comm->element, "Reader thread ending");
/* Start the thread that reads and dispatches incoming packets.
 *
 * The callbacks and user_data are stored in `comm`; read_many() invokes
 * them with the request id (where applicable), the deserialized object and
 * user_data. The decoder state is reset to expect a packet header and the
 * poll set is taken out of flushing mode before the thread is created.
 *
 * An already running reader thread is detected by the first test; what is
 * returned in that case is on a line not shown in this excerpt. */
2172 gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
2173 void (*on_buffer) (guint32, GstBuffer *, gpointer),
2174 void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
2175 void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
2176 void (*on_state_change) (guint32, GstStateChange, gpointer),
2177 void (*on_state_lost) (gpointer),
2178 void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
2180 if (comm->reader_thread)
2183 comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2184 comm->on_buffer = on_buffer;
2185 comm->on_event = on_event;
2186 comm->on_query = on_query;
2187 comm->on_state_change = on_state_change;
2188 comm->on_state_lost = on_state_lost;
2189 comm->on_message = on_message;
2190 comm->user_data = user_data;
/* undo the flushing set by gst_ipc_pipeline_comm_stop_reader_thread() */
2191 gst_poll_set_flushing (comm->poll, FALSE);
2192 comm->reader_thread =
2193 g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
/* Stop the reader thread, if one is running: put the poll set into
 * flushing mode, join the thread, and clear the thread pointer so a new
 * reader can be started later. */
2198 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
2200 if (!comm->reader_thread)
2203 gst_poll_set_flushing (comm->poll, TRUE);
2204 g_thread_join (comm->reader_thread);
2205 comm->reader_thread = NULL;
/* GstValue serialize function for boxed GstEvent values.
 *
 * Produces a colon-separated string:
 *   <type>:<timestamp>:<seqnum>:<running-time-offset>:<structure>
 * The first four fields are rendered with gst_value_serialize(); the
 * structure is turned into a string (terminating NUL included) and base64
 * encoded, with every '=' replaced by '_'.
 *
 * NOTE(review): the frees of the intermediate strings and the return are on
 * lines not shown in this excerpt. */
2209 gst_value_serialize_event (const GValue * value)
2211 const GstStructure *structure;
2213 gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
2214 GValue val = G_VALUE_INIT;
2216 ev = g_value_get_boxed (value);
/* event type, serialized as an enum value */
2218 g_value_init (&val, gst_event_type_get_type ());
2219 g_value_set_enum (&val, ev->type);
2220 type = gst_value_serialize (&val);
2221 g_value_unset (&val);
/* timestamp */
2223 g_value_init (&val, G_TYPE_UINT64);
2224 g_value_set_uint64 (&val, ev->timestamp);
2225 ts = gst_value_serialize (&val);
2226 g_value_unset (&val);
/* sequence number */
2228 g_value_init (&val, G_TYPE_UINT);
2229 g_value_set_uint (&val, ev->seqnum);
2230 seqnum = gst_value_serialize (&val);
2231 g_value_unset (&val);
/* running time offset */
2233 g_value_init (&val, G_TYPE_INT64);
2234 g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
2235 rt_offset = gst_value_serialize (&val);
2236 g_value_unset (&val);
/* structure: stringified, then base64 encoded including the NUL byte;
 * '=' is swapped for '_' in the encoded text */
2238 structure = gst_event_get_structure (ev);
2239 str = gst_structure_to_string (structure);
2240 str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
2241 g_strdelimit (str64, "=", '_');
2244 s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
/* GstValue deserialize function for boxed GstEvent values; the inverse of
 * gst_value_serialize_event().
 *
 * `s` is split on ':' and must yield exactly five fields: type, timestamp,
 * sequence number, running time offset and the base64-encoded structure
 * string (with '_' standing in for '='). On success the new event is handed
 * to `dest` with g_value_take_boxed().
 *
 * NOTE(review): the failure jumps, labels and the return are on lines not
 * shown in this excerpt. */
2257 gst_value_deserialize_event (GValue * dest, const gchar * s)
2259 GstEvent *ev = NULL;
2260 GValue val = G_VALUE_INIT;
2261 gboolean ret = FALSE;
2265 fields = g_strsplit (s, ":", -1);
2266 if (g_strv_length (fields) != 5)
/* restore the base64 padding characters, then decode the structure string
 * in place */
2269 g_strdelimit (fields[4], "_", '=');
2270 g_base64_decode_inplace (fields[4], &len);
/* field 0: event type; the event is created from it and the decoded
 * structure string.
 * NOTE(review): the result of gst_structure_new_from_string() is passed on
 * without a check in the visible code */
2272 g_value_init (&val, gst_event_type_get_type ());
2273 if (!gst_value_deserialize (&val, fields[0]))
2275 ev = gst_event_new_custom (g_value_get_enum (&val),
2276 gst_structure_new_from_string (fields[4]));
/* field 1: timestamp */
2278 g_value_unset (&val);
2279 g_value_init (&val, G_TYPE_UINT64);
2280 if (!gst_value_deserialize (&val, fields[1]))
2282 ev->timestamp = g_value_get_uint64 (&val);
/* field 2: sequence number */
2284 g_value_unset (&val);
2285 g_value_init (&val, G_TYPE_UINT);
2286 if (!gst_value_deserialize (&val, fields[2]))
2288 ev->seqnum = g_value_get_uint (&val);
/* field 3: running time offset */
2290 g_value_unset (&val);
2291 g_value_init (&val, G_TYPE_INT64);
2292 if (!gst_value_deserialize (&val, fields[3]))
2294 gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
/* dest takes over the event */
2296 g_value_take_boxed (dest, ev);
/* cleanup: g_clear_pointer() unrefs the event if `ev` is still set, then
 * the scratch GValue is unset and the split fields are freed */
2301 g_clear_pointer (&ev, gst_event_unref);
2302 g_value_unset (&val);
2305 g_strfreev (fields);
/* Register GstValue (de)serialization for _gtype, using the functions
 * gst_value_serialize_<_type> and gst_value_deserialize_<_type>. The table
 * is a function-local static, and its type is filled in at run time since
 * _gtype is the result of a function call.
 * NOTE(review): the leading table initializers are on a line not shown in
 * this excerpt; judging by the macro name no compare function is set. */
2309 #define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type) \
2311 static GstValueTable gst_value = \
2313 gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type }; \
2314 gst_value.type = _gtype; \
2315 gst_value_register (&gst_value); \
/* One-time, thread-safe setup for this file: initializes the
 * "ipcpipelinecomm" debug category, creates the QUARK_ID quark used to tag
 * mini objects with their request id, and registers the GstEvent
 * (de)serialization functions. The g_once_init_enter/leave pair makes
 * repeated calls no-ops. */
2319 gst_ipc_pipeline_comm_plugin_init (void)
2321 static gsize once = 0;
2323 if (g_once_init_enter (&once)) {
2324 GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
2325 "ipc pipeline comm");
2326 QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
2327 REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
2328 g_once_init_leave (&once, (gsize) 1);