9176f496243fd2266d30440057e3d286343cd179
[platform/upstream/gstreamer.git] / sys / ipcpipeline / gstipcpipelinecomm.c
1 /* GStreamer
2  * Copyright (C) 2015-2017 YouView TV Ltd
3  *   Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
4  *
5  * gstipcpipelinecomm.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #ifdef HAVE_UNISTD_H
28 #  include <unistd.h>
29 #endif
30 #ifdef _MSC_VER
31 /* ssize_t is not available, so match return value of read()/write() on MSVC */
32 #define ssize_t int
33 #endif
34 #include <errno.h>
35 #include <string.h>
36 #include <gst/base/gstbytewriter.h>
37 #include <gst/gstprotection.h>
38 #include "gstipcpipelinecomm.h"
39
40 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
41 #define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug
42
43 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
44
45 GQuark QUARK_ID;
46
47 typedef enum
48 {
49   ACK_TYPE_NONE,
50   ACK_TYPE_TIMED,
51   ACK_TYPE_BLOCKING
52 } AckType;
53
54 typedef enum
55 {
56   COMM_REQUEST_TYPE_BUFFER,
57   COMM_REQUEST_TYPE_EVENT,
58   COMM_REQUEST_TYPE_QUERY,
59   COMM_REQUEST_TYPE_STATE_CHANGE,
60   COMM_REQUEST_TYPE_MESSAGE,
61 } CommRequestType;
62
63 typedef struct
64 {
65   guint32 id;
66   gboolean replied;
67   gboolean comm_error;
68   guint32 ret;
69   GstQuery *query;
70   CommRequestType type;
71   GCond cond;
72 } CommRequest;
73
74 static const gchar *comm_request_ret_get_name (CommRequestType type,
75     guint32 ret);
76 static guint32 comm_request_ret_get_failure_value (CommRequestType type);
77
78 static CommRequest *
79 comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
80 {
81   CommRequest *req;
82
83   req = g_malloc (sizeof (CommRequest));
84   req->id = id;
85   g_cond_init (&req->cond);
86   req->replied = FALSE;
87   req->comm_error = FALSE;
88   req->query = query;
89   req->ret = comm_request_ret_get_failure_value (type);
90   req->type = type;
91
92   return req;
93 }
94
95 static guint32
96 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
97     AckType ack_type)
98 {
99   guint32 ret = comm_request_ret_get_failure_value (req->type);
100   guint64 end_time;
101
102   if (ack_type == ACK_TYPE_TIMED)
103     end_time = g_get_monotonic_time () + comm->ack_time;
104   else
105     end_time = G_MAXUINT64;
106
107   GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
108       req->id);
109   while (!req->replied) {
110     if (ack_type == ACK_TYPE_TIMED) {
111       if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time))
112         break;
113     } else
114       g_cond_wait (&req->cond, &comm->mutex);
115   }
116
117   if (req->replied) {
118     ret = req->ret;
119     GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
120         req->id, ret, comm_request_ret_get_name (req->type, ret));
121   } else {
122     req->comm_error = TRUE;
123     GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
124         req->id);
125   }
126
127   return ret;
128 }
129
130 static void
131 comm_request_free (CommRequest * req)
132 {
133   g_cond_clear (&req->cond);
134   g_free (req);
135 }
136
137 static const gchar *
138 comm_request_ret_get_name (CommRequestType type, guint32 ret)
139 {
140   switch (type) {
141     case COMM_REQUEST_TYPE_BUFFER:
142       return gst_flow_get_name (ret);
143     case COMM_REQUEST_TYPE_EVENT:
144     case COMM_REQUEST_TYPE_QUERY:
145     case COMM_REQUEST_TYPE_MESSAGE:
146       return ret ? "TRUE" : "FALSE";
147     case COMM_REQUEST_TYPE_STATE_CHANGE:
148       return gst_element_state_change_return_get_name (ret);
149     default:
150       g_assert_not_reached ();
151   }
152 }
153
154 static guint32
155 comm_request_ret_get_failure_value (CommRequestType type)
156 {
157   switch (type) {
158     case COMM_REQUEST_TYPE_BUFFER:
159       return GST_FLOW_COMM_ERROR;
160     case COMM_REQUEST_TYPE_EVENT:
161     case COMM_REQUEST_TYPE_MESSAGE:
162     case COMM_REQUEST_TYPE_QUERY:
163       return FALSE;
164     case COMM_REQUEST_TYPE_STATE_CHANGE:
165       return GST_STATE_CHANGE_FAILURE;
166     default:
167       g_assert_not_reached ();
168   }
169 }
170
171 static const gchar *
172 gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
173 {
174   switch (type) {
175     case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
176       return "ACK";
177     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
178       return "QUERY_RESULT";
179     case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
180       return "BUFFER";
181     case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
182       return "EVENT";
183     case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
184       return "SINK_MESSAGE_EVENT";
185     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
186       return "QUERY";
187     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
188       return "STATE_CHANGE";
189     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
190       return "STATE_LOST";
191     case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
192       return "MESSAGE";
193     case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
194       return "GERROR_MESSAGE";
195     default:
196       return "UNKNOWN";
197   }
198 }
199
200 static gboolean
201 gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
202     GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
203 {
204   CommRequest *req;
205   gboolean comm_error;
206   GHashTable *waiting_ids;
207
208   if (ack_type == ACK_TYPE_NONE)
209     return TRUE;
210
211   req = comm_request_new (id, type, query);
212   waiting_ids = g_hash_table_ref (comm->waiting_ids);
213   g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
214   *ret = comm_request_wait (comm, req, ack_type);
215   comm_error = req->comm_error;
216   g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
217   g_hash_table_unref (waiting_ids);
218   return !comm_error;
219 }
220
221 static gboolean
222 write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
223 {
224   size_t offset;
225   gboolean ret = TRUE;
226
227   offset = 0;
228   GST_TRACE_OBJECT (comm->element, "Writing %u bytes to fdout",
229       (unsigned) size);
230   while (size) {
231     ssize_t written =
232         write (comm->fdout, (const unsigned char *) data + offset, size);
233     if (written < 0) {
234       if (errno == EAGAIN || errno == EINTR)
235         continue;
236       GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
237           strerror (errno));
238       ret = FALSE;
239       goto done;
240     }
241     size -= written;
242     offset += written;
243   }
244
245 done:
246   return ret;
247 }
248
249 static gboolean
250 write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
251 {
252   guint8 *data;
253   gboolean ret;
254   guint size;
255
256   size = gst_byte_writer_get_size (bw);
257   data = gst_byte_writer_reset_and_get_data (bw);
258   if (!data)
259     return FALSE;
260   ret = write_to_fd_raw (comm, data, size);
261   g_free (data);
262   return ret;
263 }
264
265 static void
266 gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
267     guint32 ret, CommRequestType type)
268 {
269   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
270   guint32 size;
271   GstByteWriter bw;
272
273   g_mutex_lock (&comm->mutex);
274
275   GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
276       comm_request_ret_get_name (type, ret), ret);
277   gst_byte_writer_init (&bw);
278   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
279     goto write_failed;
280   if (!gst_byte_writer_put_uint32_le (&bw, id))
281     goto write_failed;
282   size = sizeof (ret);
283   if (!gst_byte_writer_put_uint32_le (&bw, size))
284     goto write_failed;
285   if (!gst_byte_writer_put_uint32_le (&bw, ret))
286     goto write_failed;
287
288   if (!write_byte_writer_to_fd (comm, &bw))
289     goto write_failed;
290
291 done:
292   g_mutex_unlock (&comm->mutex);
293   gst_byte_writer_reset (&bw);
294   return;
295
296 write_failed:
297   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
298       ("Failed to write to socket"));
299   goto done;
300 }
301
302 void
303 gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
304     guint32 id, GstFlowReturn ret)
305 {
306   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
307       COMM_REQUEST_TYPE_BUFFER);
308 }
309
310 void
311 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
312     guint32 id, gboolean ret)
313 {
314   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
315       COMM_REQUEST_TYPE_EVENT);
316 }
317
318 void
319 gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
320     guint32 id, GstStateChangeReturn ret)
321 {
322   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
323       COMM_REQUEST_TYPE_STATE_CHANGE);
324 }
325
326 void
327 gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
328     guint32 id, gboolean result, GstQuery * query)
329 {
330   const unsigned char payload_type =
331       GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
332   guint8 result8 = result;
333   guint32 size;
334   size_t len;
335   char *str = NULL;
336   guint32 type;
337   const GstStructure *structure;
338   GstByteWriter bw;
339
340   g_mutex_lock (&comm->mutex);
341
342   GST_TRACE_OBJECT (comm->element,
343       "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
344   gst_byte_writer_init (&bw);
345   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
346     goto write_failed;
347   if (!gst_byte_writer_put_uint32_le (&bw, id))
348     goto write_failed;
349   structure = gst_query_get_structure (query);
350   if (structure) {
351     str = gst_structure_to_string (structure);
352     len = strlen (str);
353   } else {
354     str = NULL;
355     len = 0;
356   }
357   size = 1 + sizeof (guint32) + len + 1;
358   if (!gst_byte_writer_put_uint32_le (&bw, size))
359     goto write_failed;
360   if (!gst_byte_writer_put_uint8 (&bw, result8))
361     goto write_failed;
362   type = GST_QUERY_TYPE (query);
363   if (!gst_byte_writer_put_uint32_le (&bw, type))
364     goto write_failed;
365   if (str) {
366     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
367       goto write_failed;
368   } else {
369     if (!gst_byte_writer_put_uint8 (&bw, 0))
370       goto write_failed;
371   }
372
373   if (!write_byte_writer_to_fd (comm, &bw))
374     goto write_failed;
375
376 done:
377   g_mutex_unlock (&comm->mutex);
378   gst_byte_writer_reset (&bw);
379   g_free (str);
380   return;
381
382 write_failed:
383   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
384       ("Failed to write to socket"));
385   goto done;
386 }
387
388 static gboolean
389 gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
390     guint32 size, GstQuery ** query)
391 {
392   gchar *end = NULL;
393   GstStructure *structure;
394   guint8 result;
395   guint32 type;
396   const guint8 *payload = NULL;
397   guint32 mapped_size = size;
398
399   /* this should not be called if we don't have enough yet */
400   *query = NULL;
401   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
402   g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
403
404   payload = gst_adapter_map (comm->adapter, mapped_size);
405   if (!payload)
406     return FALSE;
407   result = *payload++;
408   memcpy (&type, payload, sizeof (type));
409   payload += sizeof (type);
410
411   size -= 1 + sizeof (guint32);
412   if (size == 0)
413     goto done;
414
415   if (payload[size - 1]) {
416     result = FALSE;
417     goto done;
418   }
419   if (*payload) {
420     structure = gst_structure_from_string ((const char *) payload, &end);
421   } else {
422     structure = NULL;
423   }
424   if (!structure) {
425     result = FALSE;
426     goto done;
427   }
428
429   *query = gst_query_new_custom (type, structure);
430
431 done:
432   gst_adapter_unmap (comm->adapter);
433   gst_adapter_flush (comm->adapter, mapped_size);
434   return result;
435 }
436
437 typedef struct
438 {
439   guint32 bytes;
440
441   guint64 size;
442   guint32 flags;
443   guint64 api;
444   char *str;
445 } MetaBuildInfo;
446
447 typedef struct
448 {
449   GstIpcPipelineComm *comm;
450   guint32 n_meta;
451   guint32 total_bytes;
452   MetaBuildInfo *info;
453 } MetaListRepresentation;
454
455 static gboolean
456 build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
457 {
458   MetaListRepresentation *repr = user_data;
459
460   repr->n_meta++;
461   repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
462   repr->info[repr->n_meta - 1].bytes =
463       /* 4 byte bytes */
464       4
465       /* 4 byte GstMetaFlags */
466       + 4
467       /* GstMetaInfo::api */
468       + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
469       /* GstMetaInfo::size */
470       + 8
471       /* str length */
472       + 4;
473
474   repr->info[repr->n_meta - 1].flags = (*meta)->flags;
475   repr->info[repr->n_meta - 1].api = (*meta)->info->api;
476   repr->info[repr->n_meta - 1].size = (*meta)->info->size;
477   repr->info[repr->n_meta - 1].str = NULL;
478
479   /* GstMeta is a base class, and actual useful classes are all different...
480      So we list a few of them we know we want and ignore the open ended rest */
481   if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
482     GstProtectionMeta *m = (GstProtectionMeta *) * meta;
483     repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
484     repr->info[repr->n_meta - 1].bytes +=
485         strlen (repr->info[repr->n_meta - 1].str) + 1;
486     GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
487         g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
488   } else {
489     GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
490         g_type_name ((*meta)->info->api));
491   }
492   repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
493   return TRUE;
494 }
495
496 typedef struct
497 {
498   guint64 pts;
499   guint64 dts;
500   guint64 duration;
501   guint64 offset;
502   guint64 offset_end;
503   guint64 flags;
504 } CommBufferMetadata;
505
506 GstFlowReturn
507 gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
508     GstBuffer * buffer)
509 {
510   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
511   GstMapInfo map;
512   guint32 ret32 = GST_FLOW_OK;
513   guint32 size, n;
514   CommBufferMetadata meta;
515   GstFlowReturn ret;
516   MetaListRepresentation repr = { comm, 0, 4, NULL };   /* starts a 4 for n_meta */
517   GstByteWriter bw;
518
519   g_mutex_lock (&comm->mutex);
520   ++comm->send_id;
521
522   GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
523       comm->send_id, buffer);
524
525   gst_byte_writer_init (&bw);
526
527   meta.pts = GST_BUFFER_PTS (buffer);
528   meta.dts = GST_BUFFER_DTS (buffer);
529   meta.duration = GST_BUFFER_DURATION (buffer);
530   meta.offset = GST_BUFFER_OFFSET (buffer);
531   meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
532   meta.flags = GST_BUFFER_FLAGS (buffer);
533
534   /* work out meta size */
535   gst_buffer_foreach_meta (buffer, build_meta, &repr);
536
537   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
538     goto write_failed;
539   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
540     goto write_failed;
541   size =
542       gst_buffer_get_size (buffer) + sizeof (guint32) +
543       sizeof (CommBufferMetadata) + repr.total_bytes;
544   if (!gst_byte_writer_put_uint32_le (&bw, size))
545     goto write_failed;
546   if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
547     goto write_failed;
548   size = gst_buffer_get_size (buffer);
549   if (!gst_byte_writer_put_uint32_le (&bw, size))
550     goto write_failed;
551   if (!write_byte_writer_to_fd (comm, &bw))
552     goto write_failed;
553
554   if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
555     goto map_failed;
556   ret = write_to_fd_raw (comm, map.data, map.size);
557   gst_buffer_unmap (buffer, &map);
558   if (!ret)
559     goto write_failed;
560
561   /* meta */
562   gst_byte_writer_init (&bw);
563   if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
564     goto write_failed;
565   for (n = 0; n < repr.n_meta; ++n) {
566     const MetaBuildInfo *info = repr.info + n;
567     guint32 len;
568     const char *s;
569
570     if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
571       goto write_failed;
572
573     if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
574       goto write_failed;
575
576     s = g_type_name (info->api);
577     len = strlen (s) + 1;
578     if (!gst_byte_writer_put_uint32_le (&bw, len))
579       goto write_failed;
580     if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
581       goto write_failed;
582
583     if (!gst_byte_writer_put_uint64_le (&bw, info->size))
584       goto write_failed;
585
586     s = info->str;
587     len = s ? (strlen (s) + 1) : 0;
588     if (!gst_byte_writer_put_uint32_le (&bw, len))
589       goto write_failed;
590     if (len)
591       if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
592         goto write_failed;
593   }
594
595   if (!write_byte_writer_to_fd (comm, &bw))
596     goto write_failed;
597
598   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
599           ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
600     goto wait_failed;
601   ret = ret32;
602
603 done:
604   g_mutex_unlock (&comm->mutex);
605   gst_byte_writer_reset (&bw);
606   for (n = 0; n < repr.n_meta; ++n)
607     g_free (repr.info[n].str);
608   g_free (repr.info);
609   return ret;
610
611 write_failed:
612   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
613       ("Failed to write to socket"));
614   ret = GST_FLOW_COMM_ERROR;
615   goto done;
616
617 wait_failed:
618   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
619       ("Failed to wait for reply on socket"));
620   ret = GST_FLOW_COMM_ERROR;
621   goto done;
622
623 map_failed:
624   GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
625       ("Failed to map buffer"));
626   ret = GST_FLOW_ERROR;
627   goto done;
628 }
629
630 static GstBuffer *
631 gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
632 {
633   GstBuffer *buffer;
634   CommBufferMetadata meta;
635   guint32 n_meta, n;
636   const guint8 *payload = NULL;
637   guint32 mapped_size, buffer_data_size;
638
639   /* this should not be called if we don't have enough yet */
640   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
641   g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
642
643   mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
644   payload = gst_adapter_map (comm->adapter, mapped_size);
645   if (!payload)
646     return NULL;
647   memcpy (&meta, payload, sizeof (CommBufferMetadata));
648   payload += sizeof (CommBufferMetadata);
649   memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
650   size -= mapped_size;
651   gst_adapter_unmap (comm->adapter);
652   gst_adapter_flush (comm->adapter, mapped_size);
653
654   if (buffer_data_size == 0) {
655     buffer = gst_buffer_new ();
656   } else {
657     buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
658     gst_adapter_flush (comm->adapter, buffer_data_size);
659   }
660   size -= buffer_data_size;
661
662   GST_BUFFER_PTS (buffer) = meta.pts;
663   GST_BUFFER_DTS (buffer) = meta.dts;
664   GST_BUFFER_DURATION (buffer) = meta.duration;
665   GST_BUFFER_OFFSET (buffer) = meta.offset;
666   GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
667   GST_BUFFER_FLAGS (buffer) = meta.flags;
668
669   /* If you don't call that, the GType isn't yet known at the
670      g_type_from_name below */
671   gst_protection_meta_get_info ();
672
673   mapped_size = size;
674   payload = gst_adapter_map (comm->adapter, mapped_size);
675   if (!payload) {
676     gst_buffer_unref (buffer);
677     return NULL;
678   }
679   memcpy (&n_meta, payload, sizeof (n_meta));
680   payload += sizeof (n_meta);
681
682   for (n = 0; n < n_meta; ++n) {
683     guint32 flags, len, bytes;
684     guint64 msize;
685     GType api;
686     GstMeta *meta;
687     GstStructure *structure = NULL;
688
689     memcpy (&bytes, payload, sizeof (bytes));
690     payload += sizeof (bytes);
691
692 #define READ_FIELD(f) do { \
693     memcpy (&f, payload, sizeof (f)); \
694     payload += sizeof(f); \
695     } while(0)
696
697     READ_FIELD (flags);
698     READ_FIELD (len);
699     api = g_type_from_name ((const char *) payload);
700     payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
701     READ_FIELD (msize);
702     READ_FIELD (len);
703     if (len) {
704       structure = gst_structure_new_from_string ((const char *) payload);
705       payload += len + 1;
706     }
707
708     /* Seems we can add a meta from the api nor type ? */
709     if (api == GST_PROTECTION_META_API_TYPE) {
710       meta =
711           gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
712       ((GstProtectionMeta *) meta)->info = structure;
713     } else {
714       GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
715           g_type_name (api));
716       if (structure)
717         gst_structure_free (structure);
718     }
719
720 #undef READ_FIELD
721
722   }
723
724   gst_adapter_unmap (comm->adapter);
725   gst_adapter_flush (comm->adapter, mapped_size);
726
727   return buffer;
728 }
729
730 static gboolean
731 gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
732     GstEvent * event)
733 {
734   const unsigned char payload_type =
735       GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
736   gboolean ret;
737   guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
738   char *str = NULL;
739   const GstStructure *structure;
740   GstMessage *message = NULL;
741   const char *name;
742   GstByteWriter bw;
743
744   g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
745       FALSE);
746
747   g_mutex_lock (&comm->mutex);
748   ++comm->send_id;
749
750   GST_TRACE_OBJECT (comm->element,
751       "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
752
753   gst_byte_writer_init (&bw);
754   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
755     goto write_failed;
756   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
757     goto write_failed;
758   name = gst_structure_get_name (gst_event_get_structure (event));
759   slen = strlen (name) + 1;
760   gst_event_parse_sink_message (event, &message);
761   structure = gst_message_get_structure (message);
762   if (structure) {
763     str = gst_structure_to_string (structure);
764     structure_slen = strlen (str);
765   } else {
766     str = NULL;
767     structure_slen = 0;
768   }
769   size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
770       strlen (name) + 1 + structure_slen + 1;
771   if (!gst_byte_writer_put_uint32_le (&bw, size))
772     goto write_failed;
773
774   type = GST_MESSAGE_TYPE (message);
775   if (!gst_byte_writer_put_uint32_le (&bw, type))
776     goto write_failed;
777   size -= sizeof (type);
778
779   eseqnum = GST_EVENT_SEQNUM (event);
780   if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
781     goto write_failed;
782   size -= sizeof (eseqnum);
783
784   mseqnum = GST_MESSAGE_SEQNUM (message);
785   if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
786     goto write_failed;
787   size -= sizeof (mseqnum);
788
789   if (!gst_byte_writer_put_uint32_le (&bw, slen))
790     goto write_failed;
791   size -= sizeof (slen);
792
793   if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
794     goto write_failed;
795   size -= slen;
796
797   if (str) {
798     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
799       goto write_failed;
800   } else {
801     if (!gst_byte_writer_put_uint8 (&bw, 0))
802       goto write_failed;
803   }
804
805   if (!write_byte_writer_to_fd (comm, &bw))
806     goto write_failed;
807
808   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
809           GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
810           COMM_REQUEST_TYPE_EVENT))
811     goto write_failed;
812
813   ret = ret32;
814
815 done:
816   g_mutex_unlock (&comm->mutex);
817   gst_byte_writer_reset (&bw);
818   g_free (str);
819   if (message)
820     gst_message_unref (message);
821   return ret;
822
823 write_failed:
824   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
825       ("Failed to write to socket"));
826   ret = FALSE;
827   goto done;
828 }
829
830 static GstEvent *
831 gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
832     guint32 size)
833 {
834   GstMessage *message;
835   GstEvent *event = NULL;
836   gchar *end = NULL;
837   GstStructure *structure;
838   guint32 type, eseqnum, mseqnum, slen;
839   const char *name;
840   guint32 mapped_size = size;
841   const guint8 *payload;
842
843   /* this should not be called if we don't have enough yet */
844   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
845   g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
846
847   payload = gst_adapter_map (comm->adapter, mapped_size);
848   if (!payload)
849     return NULL;
850   memcpy (&type, payload, sizeof (type));
851   payload += sizeof (type);
852   size -= sizeof (type);
853   if (size == 0)
854     goto done;
855
856   memcpy (&eseqnum, payload, sizeof (eseqnum));
857   payload += sizeof (eseqnum);
858   size -= sizeof (eseqnum);
859   if (size == 0)
860     goto done;
861
862   memcpy (&mseqnum, payload, sizeof (mseqnum));
863   payload += sizeof (mseqnum);
864   size -= sizeof (mseqnum);
865   if (size == 0)
866     goto done;
867
868   memcpy (&slen, payload, sizeof (slen));
869   payload += sizeof (slen);
870   size -= sizeof (slen);
871   if (size == 0)
872     goto done;
873
874   if (payload[slen - 1])
875     goto done;
876   name = (const char *) payload;
877   payload += slen;
878   size -= slen;
879
880   if ((payload)[size - 1]) {
881     goto done;
882   }
883   if (*payload) {
884     structure = gst_structure_from_string ((const char *) payload, &end);
885   } else {
886     structure = NULL;
887   }
888
889   message =
890       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
891   gst_message_set_seqnum (message, mseqnum);
892   event = gst_event_new_sink_message (name, message);
893   gst_event_set_seqnum (event, eseqnum);
894   gst_message_unref (message);
895
896 done:
897   gst_adapter_unmap (comm->adapter);
898   gst_adapter_flush (comm->adapter, mapped_size);
899   return event;
900 }
901
902 gboolean
903 gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
904     gboolean upstream, GstEvent * event)
905 {
906   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
907   gboolean ret;
908   guint32 type, size, ret32 = TRUE, seqnum, slen;
909   char *str = NULL;
910   const GstStructure *structure;
911   GstByteWriter bw;
912
913   /* we special case sink-message event as gst can't serialize/de-serialize it */
914   if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
915     return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
916
917   g_mutex_lock (&comm->mutex);
918   ++comm->send_id;
919
920   GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
921       comm->send_id, event);
922
923   gst_byte_writer_init (&bw);
924   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
925     goto write_failed;
926   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
927     goto write_failed;
928   structure = gst_event_get_structure (event);
929   if (structure) {
930
931     if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
932       GstStructure *s = gst_structure_copy (structure);
933       gst_structure_remove_field (s, "stream");
934       str = gst_structure_to_string (s);
935       gst_structure_free (s);
936     } else {
937       str = gst_structure_to_string (structure);
938     }
939
940     slen = strlen (str);
941   } else {
942     str = NULL;
943     slen = 0;
944   }
945   size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
946   if (!gst_byte_writer_put_uint32_le (&bw, size))
947     goto write_failed;
948
949   type = GST_EVENT_TYPE (event);
950   if (!gst_byte_writer_put_uint32_le (&bw, type))
951     goto write_failed;
952
953   seqnum = GST_EVENT_SEQNUM (event);
954   if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
955     goto write_failed;
956
957   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
958     goto write_failed;
959
960   if (str) {
961     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
962       goto write_failed;
963   } else {
964     if (!gst_byte_writer_put_uint8 (&bw, 0))
965       goto write_failed;
966   }
967
968   if (!write_byte_writer_to_fd (comm, &bw))
969     goto write_failed;
970
971   /* Upstream events get serialized, this is required to send seeks only
972    * one at a time. */
973   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
974           (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
975           ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
976     goto write_failed;
977   ret = ret32;
978
979 done:
980   g_mutex_unlock (&comm->mutex);
981   g_free (str);
982   gst_byte_writer_reset (&bw);
983   return ret;
984
985 write_failed:
986   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
987       ("Failed to write to socket"));
988   ret = FALSE;
989   goto done;
990 }
991
992 static GstEvent *
993 gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
994     gboolean * upstream)
995 {
996   GstEvent *event = NULL;
997   gchar *end = NULL;
998   GstStructure *structure;
999   guint32 type, seqnum;
1000   guint32 mapped_size = size;
1001   const guint8 *payload;
1002
1003   /* this should not be called if we don't have enough yet */
1004   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1005   g_return_val_if_fail (size >= sizeof (type), NULL);
1006
1007   payload = gst_adapter_map (comm->adapter, mapped_size);
1008   if (!payload)
1009     return NULL;
1010
1011   memcpy (&type, payload, sizeof (type));
1012   payload += sizeof (type);
1013   size -= sizeof (type);
1014   if (size == 0)
1015     goto done;
1016
1017   memcpy (&seqnum, payload, sizeof (seqnum));
1018   payload += sizeof (seqnum);
1019   size -= sizeof (seqnum);
1020   if (size == 0)
1021     goto done;
1022
1023   *upstream = (*payload) ? TRUE : FALSE;
1024   payload += 1;
1025   size -= 1;
1026   if (size == 0)
1027     goto done;
1028
1029   if (payload[size - 1])
1030     goto done;
1031   if (*payload) {
1032     structure = gst_structure_from_string ((const char *) payload, &end);
1033   } else {
1034     structure = NULL;
1035   }
1036
1037   event = gst_event_new_custom (type, structure);
1038   gst_event_set_seqnum (event, seqnum);
1039
1040 done:
1041   gst_adapter_unmap (comm->adapter);
1042   gst_adapter_flush (comm->adapter, mapped_size);
1043   return event;
1044 }
1045
1046 gboolean
1047 gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
1048     gboolean upstream, GstQuery * query)
1049 {
1050   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
1051   gboolean ret;
1052   guint32 type, size, ret32 = TRUE, slen;
1053   char *str = NULL;
1054   const GstStructure *structure;
1055   GstByteWriter bw;
1056
1057   g_mutex_lock (&comm->mutex);
1058   ++comm->send_id;
1059
1060   GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
1061       comm->send_id, query);
1062
1063   gst_byte_writer_init (&bw);
1064   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1065     goto write_failed;
1066   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1067     goto write_failed;
1068   structure = gst_query_get_structure (query);
1069   if (structure) {
1070     str = gst_structure_to_string (structure);
1071     slen = strlen (str);
1072   } else {
1073     str = NULL;
1074     slen = 0;
1075   }
1076   size = sizeof (type) + 1 + slen + 1;
1077   if (!gst_byte_writer_put_uint32_le (&bw, size))
1078     goto write_failed;
1079
1080   type = GST_QUERY_TYPE (query);
1081   if (!gst_byte_writer_put_uint32_le (&bw, type))
1082     goto write_failed;
1083
1084   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
1085     goto write_failed;
1086
1087   if (str) {
1088     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
1089       goto write_failed;
1090   } else {
1091     if (!gst_byte_writer_put_uint8 (&bw, 0))
1092       goto write_failed;
1093   }
1094
1095   if (!write_byte_writer_to_fd (comm, &bw))
1096     goto write_failed;
1097
1098   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
1099           GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
1100           COMM_REQUEST_TYPE_QUERY))
1101     goto write_failed;
1102
1103   ret = ret32;
1104
1105 done:
1106   g_mutex_unlock (&comm->mutex);
1107   g_free (str);
1108   gst_byte_writer_reset (&bw);
1109   return ret;
1110
1111 write_failed:
1112   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1113       ("Failed to write to socket"));
1114   ret = FALSE;
1115   goto done;
1116 }
1117
1118 static GstQuery *
1119 gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
1120     gboolean * upstream)
1121 {
1122   GstQuery *query = NULL;
1123   gchar *end = NULL;
1124   GstStructure *structure;
1125   guint32 type;
1126   guint32 mapped_size = size;
1127   const guint8 *payload;
1128
1129   /* this should not be called if we don't have enough yet */
1130   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1131   g_return_val_if_fail (size >= sizeof (type), NULL);
1132
1133   payload = gst_adapter_map (comm->adapter, mapped_size);
1134   if (!payload)
1135     return NULL;
1136
1137   memcpy (&type, payload, sizeof (type));
1138   payload += sizeof (type);
1139   size -= sizeof (type);
1140   if (size == 0)
1141     goto done;
1142
1143   *upstream = (*payload) ? TRUE : FALSE;
1144   payload += 1;
1145   size -= 1;
1146   if (size == 0)
1147     goto done;
1148
1149   if (payload[size - 1])
1150     goto done;
1151   if (*payload) {
1152     structure = gst_structure_from_string ((const char *) payload, &end);
1153   } else {
1154     structure = NULL;
1155   }
1156
1157   query = gst_query_new_custom (type, structure);
1158
1159   /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
1160      This does not play well with the serialization/deserialization system,
1161      which will give us a non-NULL GstCaps which has a value of NULL. This
1162      in turn wreaks havoc with any code that tests whether filter is NULL
1163      (which basically means, am I being given an optional GstCaps ?).
1164      So we look for non-NULL GstCaps which have NULL contents, and replace
1165      them with NULL instead. */
1166   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1167     GstCaps *filter;
1168     gst_query_parse_caps (query, &filter);
1169     if (filter
1170         && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
1171             "NULL")) {
1172       gst_query_unref (query);
1173       query = gst_query_new_caps (NULL);
1174     }
1175   }
1176
1177 done:
1178   gst_adapter_unmap (comm->adapter);
1179   gst_adapter_flush (comm->adapter, mapped_size);
1180   return query;
1181 }
1182
1183 GstStateChangeReturn
1184 gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
1185     GstStateChange transition)
1186 {
1187   const unsigned char payload_type =
1188       GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
1189   GstStateChangeReturn ret;
1190   guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
1191   GstByteWriter bw;
1192
1193   g_mutex_lock (&comm->mutex);
1194   ++comm->send_id;
1195
1196   GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
1197       comm->send_id,
1198       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
1199       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
1200
1201   gst_byte_writer_init (&bw);
1202   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1203     goto write_failed;
1204   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1205     goto write_failed;
1206   size = sizeof (transition);
1207   if (!gst_byte_writer_put_uint32_le (&bw, size))
1208     goto write_failed;
1209   if (!gst_byte_writer_put_uint32_le (&bw, transition))
1210     goto write_failed;
1211
1212   if (!write_byte_writer_to_fd (comm, &bw))
1213     goto write_failed;
1214
1215   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1216           ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
1217     goto write_failed;
1218   ret = ret32;
1219
1220 done:
1221   g_mutex_unlock (&comm->mutex);
1222   gst_byte_writer_reset (&bw);
1223   return ret;
1224
1225 write_failed:
1226   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1227       ("Failed to write to socket"));
1228   ret = GST_STATE_CHANGE_FAILURE;
1229   goto done;
1230 }
1231
1232 static gboolean
1233 is_valid_state_change (GstStateChange transition)
1234 {
1235   if (transition == GST_STATE_CHANGE_NULL_TO_READY)
1236     return TRUE;
1237   if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
1238     return TRUE;
1239   if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
1240     return TRUE;
1241   if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
1242     return TRUE;
1243   if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
1244     return TRUE;
1245   if (transition == GST_STATE_CHANGE_READY_TO_NULL)
1246     return TRUE;
1247   if (GST_STATE_TRANSITION_CURRENT (transition) ==
1248       GST_STATE_TRANSITION_NEXT (transition))
1249     return TRUE;
1250   return FALSE;
1251 }
1252
1253 static gboolean
1254 gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
1255     guint32 size, guint32 * transition)
1256 {
1257   guint32 mapped_size = size;
1258   const guint8 *payload;
1259
1260   /* this should not be called if we don't have enough yet */
1261   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
1262   g_return_val_if_fail (size >= sizeof (*transition), FALSE);
1263
1264   payload = gst_adapter_map (comm->adapter, size);
1265   if (!payload)
1266     return FALSE;
1267   memcpy (transition, payload, sizeof (*transition));
1268   gst_adapter_unmap (comm->adapter);
1269   gst_adapter_flush (comm->adapter, mapped_size);
1270   return is_valid_state_change (*transition);
1271 }
1272
1273 void
1274 gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
1275 {
1276   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
1277   guint32 size;
1278   GstByteWriter bw;
1279
1280   g_mutex_lock (&comm->mutex);
1281   ++comm->send_id;
1282
1283   GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
1284   gst_byte_writer_init (&bw);
1285   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1286     goto write_failed;
1287   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1288     goto write_failed;
1289   size = 0;
1290   if (!gst_byte_writer_put_uint32_le (&bw, size))
1291     goto write_failed;
1292
1293   if (!write_byte_writer_to_fd (comm, &bw))
1294     goto write_failed;
1295
1296 done:
1297   g_mutex_unlock (&comm->mutex);
1298   gst_byte_writer_reset (&bw);
1299   return;
1300
1301 write_failed:
1302   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1303       ("Failed to write to socket"));
1304   goto done;
1305 }
1306
1307 static gboolean
1308 gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
1309 {
1310   /* no payload */
1311   return TRUE;
1312 }
1313
1314 static gboolean
1315 gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
1316     GstMessage * message)
1317 {
1318   const unsigned char payload_type =
1319       GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
1320   gboolean ret;
1321   guint32 code, size, ret32 = TRUE;
1322   char *str = NULL;
1323   GError *error;
1324   char *extra_message;
1325   const char *domain_string;
1326   unsigned char msgtype;
1327   GstByteWriter bw;
1328
1329   g_mutex_lock (&comm->mutex);
1330   ++comm->send_id;
1331
1332   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
1333     gst_message_parse_error (message, &error, &extra_message);
1334     msgtype = 2;
1335   } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
1336     gst_message_parse_warning (message, &error, &extra_message);
1337     msgtype = 1;
1338   } else {
1339     gst_message_parse_info (message, &error, &extra_message);
1340     msgtype = 0;
1341   }
1342   code = error->code;
1343   domain_string = g_quark_to_string (error->domain);
1344   GST_TRACE_OBJECT (comm->element,
1345       "Writing error %u: domain %s, code %u, message %s, extra message %s",
1346       comm->send_id, domain_string, error->code, error->message, extra_message);
1347
1348   gst_byte_writer_init (&bw);
1349   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1350     goto write_failed;
1351   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1352     goto write_failed;
1353
1354   size = sizeof (size);
1355   size += 1;
1356   size += strlen (domain_string) + 1;
1357   size += sizeof (code);
1358   size += sizeof (size);
1359   size += error->message ? strlen (error->message) + 1 : 0;
1360   size += sizeof (size);
1361   size += extra_message ? strlen (extra_message) + 1 : 0;
1362
1363   if (!gst_byte_writer_put_uint32_le (&bw, size))
1364     goto write_failed;
1365
1366   if (!gst_byte_writer_put_uint8 (&bw, msgtype))
1367     goto write_failed;
1368   size = strlen (domain_string) + 1;
1369   if (!gst_byte_writer_put_uint32_le (&bw, size))
1370     goto write_failed;
1371   if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
1372     goto write_failed;
1373   if (!gst_byte_writer_put_uint32_le (&bw, code))
1374     goto write_failed;
1375   size = error->message ? strlen (error->message) + 1 : 0;
1376   if (!gst_byte_writer_put_uint32_le (&bw, size))
1377     goto write_failed;
1378   if (error->message) {
1379     if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
1380       goto write_failed;
1381   }
1382   size = extra_message ? strlen (extra_message) + 1 : 0;
1383   if (!gst_byte_writer_put_uint32_le (&bw, size))
1384     goto write_failed;
1385   if (extra_message) {
1386     if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
1387       goto write_failed;
1388   }
1389
1390   if (!write_byte_writer_to_fd (comm, &bw))
1391     goto write_failed;
1392
1393   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1394           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1395     goto write_failed;
1396
1397   ret = ret32;
1398
1399 done:
1400   g_mutex_unlock (&comm->mutex);
1401   g_free (str);
1402   g_error_free (error);
1403   g_free (extra_message);
1404   gst_byte_writer_reset (&bw);
1405   return ret;
1406
1407 write_failed:
1408   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1409       ("Failed to write to socket"));
1410   ret = FALSE;
1411   goto done;
1412 }
1413
1414 static GstMessage *
1415 gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
1416     guint32 size)
1417 {
1418   GstMessage *message = NULL;
1419   guint32 code;
1420   GQuark domain;
1421   const char *msg, *extra_message;
1422   GError *error;
1423   unsigned char msgtype;
1424   guint32 mapped_size = size;
1425   const guint8 *payload;
1426
1427   /* this should not be called if we don't have enough yet */
1428   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1429   g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
1430       NULL);
1431
1432   payload = gst_adapter_map (comm->adapter, mapped_size);
1433   if (!payload)
1434     return NULL;
1435   msgtype = *payload++;
1436   memcpy (&size, payload, sizeof (size));
1437   payload += sizeof (size);
1438   if (payload[size - 1])
1439     goto done;
1440   domain = g_quark_from_string ((const char *) payload);
1441   payload += size;
1442
1443   memcpy (&code, payload, sizeof (code));
1444   payload += sizeof (code);
1445
1446   memcpy (&size, payload, sizeof (size));
1447   payload += sizeof (size);
1448   if (size) {
1449     if (payload[size - 1])
1450       goto done;
1451     msg = (const char *) payload;
1452   } else {
1453     msg = NULL;
1454   }
1455   payload += size;
1456
1457   memcpy (&size, payload, sizeof (size));
1458   payload += sizeof (size);
1459   if (size) {
1460     if (payload[size - 1])
1461       goto done;
1462     extra_message = (const char *) payload;
1463   } else {
1464     extra_message = NULL;
1465   }
1466   payload += size;
1467
1468   error = g_error_new (domain, code, "%s", msg);
1469   if (msgtype == 2)
1470     message =
1471         gst_message_new_error (GST_OBJECT (comm->element), error,
1472         extra_message);
1473   else if (msgtype == 1)
1474     message =
1475         gst_message_new_warning (GST_OBJECT (comm->element), error,
1476         extra_message);
1477   else
1478     message =
1479         gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
1480   g_error_free (error);
1481
1482 done:
1483   gst_adapter_unmap (comm->adapter);
1484   gst_adapter_flush (comm->adapter, mapped_size);
1485
1486   return message;
1487 }
1488
1489 gboolean
1490 gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
1491     GstMessage * message)
1492 {
1493   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
1494   gboolean ret;
1495   guint32 type, size, ret32 = TRUE, slen;
1496   char *str = NULL;
1497   const GstStructure *structure;
1498   GstByteWriter bw;
1499
1500   /* we special case error as gst can't serialize/de-serialize it */
1501   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
1502       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
1503       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
1504     return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
1505
1506   g_mutex_lock (&comm->mutex);
1507   ++comm->send_id;
1508
1509   GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
1510       comm->send_id, message);
1511
1512   gst_byte_writer_init (&bw);
1513   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1514     goto write_failed;
1515   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1516     goto write_failed;
1517   structure = gst_message_get_structure (message);
1518   if (structure) {
1519     str = gst_structure_to_string (structure);
1520     slen = strlen (str);
1521   } else {
1522     str = NULL;
1523     slen = 0;
1524   }
1525   size = sizeof (type) + slen + 1;
1526   if (!gst_byte_writer_put_uint32_le (&bw, size))
1527     goto write_failed;
1528
1529   type = GST_MESSAGE_TYPE (message);
1530   if (!gst_byte_writer_put_uint32_le (&bw, type))
1531     goto write_failed;
1532   size -= sizeof (type);
1533   if (str) {
1534     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
1535       goto write_failed;
1536   } else {
1537     if (!gst_byte_writer_put_uint8 (&bw, 0))
1538       goto write_failed;
1539   }
1540
1541   if (!write_byte_writer_to_fd (comm, &bw))
1542     goto write_failed;
1543
1544   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1545           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1546     goto write_failed;
1547
1548   ret = ret32;
1549
1550 done:
1551   g_mutex_unlock (&comm->mutex);
1552   g_free (str);
1553   gst_byte_writer_reset (&bw);
1554   return ret;
1555
1556 write_failed:
1557   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1558       ("Failed to write to socket"));
1559   ret = FALSE;
1560   goto done;
1561 }
1562
1563 static GstMessage *
1564 gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
1565 {
1566   GstMessage *message = NULL;
1567   gchar *end = NULL;
1568   GstStructure *structure;
1569   guint32 type;
1570   guint32 mapped_size = size;
1571   const guint8 *payload;
1572
1573   /* this should not be called if we don't have enough yet */
1574   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1575   g_return_val_if_fail (size >= sizeof (type), NULL);
1576
1577   payload = gst_adapter_map (comm->adapter, mapped_size);
1578   if (!payload)
1579     return NULL;
1580   memcpy (&type, payload, sizeof (type));
1581   payload += sizeof (type);
1582   size -= sizeof (type);
1583   if (size == 0)
1584     goto done;
1585
1586   if (payload[size - 1])
1587     goto done;
1588   if (*payload) {
1589     structure = gst_structure_from_string ((const char *) payload, &end);
1590   } else {
1591     structure = NULL;
1592   }
1593
1594   message =
1595       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
1596
1597 done:
1598   gst_adapter_unmap (comm->adapter);
1599   gst_adapter_flush (comm->adapter, mapped_size);
1600
1601   return message;
1602 }
1603
1604 void
1605 gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
1606 {
1607   g_mutex_init (&comm->mutex);
1608   comm->element = element;
1609   comm->fdin = comm->fdout = -1;
1610   comm->ack_time = DEFAULT_ACK_TIME;
1611   comm->waiting_ids =
1612       g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1613       (GDestroyNotify) comm_request_free);
1614   comm->adapter = gst_adapter_new ();
1615   comm->poll = gst_poll_new (TRUE);
1616   gst_poll_fd_init (&comm->pollFDin);
1617 }
1618
1619 void
1620 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
1621 {
1622   g_hash_table_destroy (comm->waiting_ids);
1623   gst_object_unref (comm->adapter);
1624   gst_poll_free (comm->poll);
1625   g_mutex_clear (&comm->mutex);
1626 }
1627
1628 static void
1629 cancel_request (gpointer key, gpointer value, gpointer user_data,
1630     GstFlowReturn fret)
1631 {
1632   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
1633   guint32 id = GPOINTER_TO_INT (key);
1634   CommRequest *req = (CommRequest *) value;
1635
1636   GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
1637       req->type);
1638   req->ret = fret;
1639   req->replied = TRUE;
1640   g_cond_signal (&req->cond);
1641 }
1642
1643 static void
1644 cancel_request_error (gpointer key, gpointer value, gpointer user_data)
1645 {
1646   CommRequest *req = (CommRequest *) value;
1647   GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
1648
1649   cancel_request (key, value, user_data, fret);
1650 }
1651
1652 void
1653 gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
1654 {
1655   g_mutex_lock (&comm->mutex);
1656   g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
1657   if (cleanup) {
1658     g_hash_table_unref (comm->waiting_ids);
1659     comm->waiting_ids =
1660         g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1661         (GDestroyNotify) comm_request_free);
1662   }
1663   g_mutex_unlock (&comm->mutex);
1664 }
1665
1666 static gboolean
1667 set_field (GQuark field_id, const GValue * value, gpointer user_data)
1668 {
1669   GstStructure *structure = user_data;
1670
1671   gst_structure_id_set_value (structure, field_id, value);
1672
1673   return TRUE;
1674 }
1675
1676 static gboolean
1677 gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
1678     GstFlowReturn ret, GstQuery * query)
1679 {
1680   CommRequest *req;
1681
1682   req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
1683   if (!req) {
1684     GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
1685     return FALSE;
1686   }
1687
1688   GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
1689       comm_request_ret_get_name (req->type, ret), req->id);
1690   req->replied = TRUE;
1691   req->ret = ret;
1692   if (query) {
1693     if (req->query) {
1694       /* We need to update the original query in place, as the caller
1695          will expect the object to be the same */
1696       GstStructure *structure = gst_query_writable_structure (req->query);
1697       gst_structure_remove_all_fields (structure);
1698       gst_structure_foreach (gst_query_get_structure (query), set_field,
1699           structure);
1700     } else {
1701       GST_WARNING_OBJECT (comm->element,
1702           "Got query reply, but no query was in the request");
1703     }
1704   }
1705   g_cond_signal (&req->cond);
1706   return TRUE;
1707 }
1708
1709 static gint
1710 update_adapter (GstIpcPipelineComm * comm)
1711 {
1712   GstMemory *mem = NULL;
1713   GstBuffer *buf;
1714   GstMapInfo map;
1715   ssize_t sz;
1716   gint ret = 0;
1717
1718 again:
1719   /* update pollFDin if necessary (fdin changed or we lost our parent).
1720    * we do not allow a parent-less element to communicate with its peer
1721    * in order to avoid race conditions where the slave tries to change
1722    * the state of its parent pipeline while it is not yet added in that
1723    * pipeline. */
1724   if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
1725     if (comm->pollFDin.fd != -1) {
1726       GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
1727           comm->pollFDin.fd);
1728       gst_poll_remove_fd (comm->poll, &comm->pollFDin);
1729       gst_poll_fd_init (&comm->pollFDin);
1730     }
1731     if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
1732       GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
1733       comm->pollFDin.fd = comm->fdin;
1734       gst_poll_add_fd (comm->poll, &comm->pollFDin);
1735       gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
1736     }
1737   }
1738
1739   /* wait for activity on fdin or a flush */
1740   if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
1741     if (errno == EAGAIN)
1742       goto again;
1743     /* error out, unless interrupted or flushing */
1744     if (errno != EINTR)
1745       ret = (errno == EBUSY) ? 2 : 1;
1746   }
1747
1748   /* read from fdin if possible and push data to our adapter */
1749   if (comm->pollFDin.fd >= 0
1750       && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
1751     if (!mem)
1752       mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
1753
1754     gst_memory_map (mem, &map, GST_MAP_WRITE);
1755     sz = read (comm->pollFDin.fd, map.data, map.size);
1756     gst_memory_unmap (mem, &map);
1757
1758     if (sz <= 0) {
1759       if (errno == EAGAIN)
1760         goto again;
1761       /* error out, unless interrupted */
1762       if (errno != EINTR)
1763         ret = 1;
1764     } else {
1765       gst_memory_resize (mem, 0, sz);
1766       buf = gst_buffer_new ();
1767       gst_buffer_append_memory (buf, mem);
1768       mem = NULL;
1769       GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
1770       gst_adapter_push (comm->adapter, buf);
1771     }
1772   }
1773
1774   if (mem)
1775     gst_memory_unref (mem);
1776
1777   return ret;
1778 }
1779
1780 static gboolean
1781 read_many (GstIpcPipelineComm * comm)
1782 {
1783   gboolean ret = TRUE;
1784   gsize available;
1785   const guint8 *payload;
1786
1787   while (1)
1788     switch (comm->state) {
1789       case GST_IPC_PIPELINE_COMM_STATE_TYPE:
1790       {
1791         guint8 type;
1792         guint32 mapped_size;
1793
1794         available = gst_adapter_available (comm->adapter);
1795         mapped_size = 1 + sizeof (gint32) * 2;
1796         if (available < mapped_size)
1797           goto done;
1798
1799         payload = gst_adapter_map (comm->adapter, mapped_size);
1800         type = *payload++;
1801         g_mutex_lock (&comm->mutex);
1802         memcpy (&comm->id, payload, sizeof (guint32));
1803         memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
1804         g_mutex_unlock (&comm->mutex);
1805         gst_adapter_unmap (comm->adapter);
1806         gst_adapter_flush (comm->adapter, mapped_size);
1807         GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
1808             comm->id, type, comm->payload_length);
1809         switch (type) {
1810           case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1811           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1812           case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1813           case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1814           case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1815           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1816           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1817           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
1818           case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
1819           case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
1820             GST_TRACE_OBJECT (comm->element, "switching to state %s",
1821                 gst_ipc_pipeline_comm_data_type_get_name (type));
1822             comm->state = type;
1823             break;
1824           default:
1825             goto out_of_sync;
1826         }
1827         break;
1828       }
1829       case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1830       {
1831         const guint8 *rets;
1832         guint32 ret32;
1833
1834         available = gst_adapter_available (comm->adapter);
1835         if (available < comm->payload_length)
1836           goto done;
1837
1838         if (available < sizeof (guint32))
1839           goto ack_failed;
1840
1841         rets = gst_adapter_map (comm->adapter, sizeof (guint32));
1842         memcpy (&ret32, rets, sizeof (ret32));
1843         gst_adapter_unmap (comm->adapter);
1844         gst_adapter_flush (comm->adapter, sizeof (guint32));
1845         GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
1846             gst_flow_get_name (ret32), comm->id);
1847
1848         g_mutex_lock (&comm->mutex);
1849         gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
1850         g_mutex_unlock (&comm->mutex);
1851
1852         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1853         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1854         break;
1855       }
1856       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1857       {
1858         GstQuery *query = NULL;
1859         gboolean qret;
1860
1861         available = gst_adapter_available (comm->adapter);
1862         if (available < comm->payload_length)
1863           goto done;
1864
1865         qret =
1866             gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
1867             &query);
1868
1869         GST_TRACE_OBJECT (comm->element,
1870             "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
1871             query);
1872
1873         g_mutex_lock (&comm->mutex);
1874         gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
1875         g_mutex_unlock (&comm->mutex);
1876
1877         gst_query_unref (query);
1878
1879         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1880         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1881         break;
1882       }
1883       case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1884       {
1885         GstBuffer *buf;
1886
1887         available = gst_adapter_available (comm->adapter);
1888         if (available < comm->payload_length)
1889           goto done;
1890
1891         buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
1892         if (!buf)
1893           goto buffer_failed;
1894
1895         /* set caps and push */
1896         GST_TRACE_OBJECT (comm->element,
1897             "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
1898             ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
1899             ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
1900             ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1901             GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
1902             GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
1903             GST_BUFFER_FLAGS (buf));
1904
1905         gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
1906             GINT_TO_POINTER (comm->id), NULL);
1907
1908         if (comm->on_buffer)
1909           (*comm->on_buffer) (comm->id, buf, comm->user_data);
1910
1911         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1912         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1913         break;
1914       }
1915       case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1916       {
1917         GstEvent *event;
1918         gboolean upstream;
1919
1920         available = gst_adapter_available (comm->adapter);
1921         if (available < comm->payload_length)
1922           goto done;
1923
1924         event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
1925             &upstream);
1926         if (!event)
1927           goto event_failed;
1928
1929         GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
1930             event, gst_event_type_get_name (event->type));
1931
1932         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1933             GINT_TO_POINTER (comm->id), NULL);
1934
1935         if (comm->on_event)
1936           (*comm->on_event) (comm->id, event, upstream, comm->user_data);
1937
1938         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1939         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1940         break;
1941       }
1942       case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1943       {
1944         GstEvent *event;
1945
1946         available = gst_adapter_available (comm->adapter);
1947         if (available < comm->payload_length)
1948           goto done;
1949
1950         event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
1951             comm->payload_length);
1952         if (!event)
1953           goto event_failed;
1954
1955         GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
1956             event);
1957
1958         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1959             GINT_TO_POINTER (comm->id), NULL);
1960
1961         if (comm->on_event)
1962           (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
1963
1964         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1965         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1966         break;
1967       }
1968       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1969       {
1970         GstQuery *query;
1971         gboolean upstream;
1972
1973         available = gst_adapter_available (comm->adapter);
1974         if (available < comm->payload_length)
1975           goto done;
1976
1977         query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
1978             &upstream);
1979         if (!query)
1980           goto query_failed;
1981
1982         GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
1983             query, gst_query_type_get_name (query->type));
1984
1985         gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
1986             GINT_TO_POINTER (comm->id), NULL);
1987
1988         if (comm->on_query)
1989           (*comm->on_query) (comm->id, query, upstream, comm->user_data);
1990
1991         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1992         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1993         break;
1994       }
1995       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1996       {
1997         guint32 transition;
1998
1999         available = gst_adapter_available (comm->adapter);
2000         if (available < comm->payload_length)
2001           goto done;
2002
2003         if (!gst_ipc_pipeline_comm_read_state_change (comm,
2004                 comm->payload_length, &transition))
2005           goto state_change_failed;
2006
2007         GST_TRACE_OBJECT (comm->element,
2008             "deserialized state change request: %s -> %s",
2009             gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
2010                 (transition)),
2011             gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
2012                 (transition)));
2013
2014         if (comm->on_state_change)
2015           (*comm->on_state_change) (comm->id, transition, comm->user_data);
2016
2017         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2018         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2019         break;
2020       }
2021       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
2022       {
2023         available = gst_adapter_available (comm->adapter);
2024         if (available < comm->payload_length)
2025           goto done;
2026
2027         if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
2028           goto event_failed;
2029
2030         GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
2031
2032         if (comm->on_state_lost)
2033           (*comm->on_state_lost) (comm->user_data);
2034
2035         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2036         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2037         break;
2038       }
2039       case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
2040       {
2041         GstMessage *message;
2042
2043         available = gst_adapter_available (comm->adapter);
2044         if (available < comm->payload_length)
2045           goto done;
2046
2047         message = gst_ipc_pipeline_comm_read_message (comm,
2048             comm->payload_length);
2049         if (!message)
2050           goto message_failed;
2051
2052         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2053             message, gst_message_type_get_name (message->type));
2054
2055         if (comm->on_message)
2056           (*comm->on_message) (comm->id, message, comm->user_data);
2057
2058         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2059         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2060         break;
2061       }
2062       case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
2063       {
2064         GstMessage *message;
2065
2066         available = gst_adapter_available (comm->adapter);
2067         if (available < comm->payload_length)
2068           goto done;
2069
2070         message = gst_ipc_pipeline_comm_read_gerror_message (comm,
2071             comm->payload_length);
2072         if (!message)
2073           goto message_failed;
2074
2075         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2076             message, gst_message_type_get_name (message->type));
2077
2078         if (comm->on_message)
2079           (*comm->on_message) (comm->id, message, comm->user_data);
2080
2081         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2082         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2083         break;
2084       }
2085     }
2086
2087 done:
2088   return ret;
2089
2090   /* ERRORS */
2091 out_of_sync:
2092   {
2093     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2094         ("Socket out of sync"));
2095     ret = FALSE;
2096     goto done;
2097   }
2098 state_change_failed:
2099   {
2100     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2101         ("could not read state change from fd"));
2102     ret = FALSE;
2103     goto done;
2104   }
2105 ack_failed:
2106   {
2107     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2108         ("could not read ack from fd"));
2109     ret = FALSE;
2110     goto done;
2111   }
2112 buffer_failed:
2113   {
2114     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2115         ("could not read buffer from fd"));
2116     ret = FALSE;
2117     goto done;
2118   }
2119 event_failed:
2120   {
2121     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2122         ("could not read event from fd"));
2123     ret = FALSE;
2124     goto done;
2125   }
2126 message_failed:
2127   {
2128     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2129         ("could not read message from fd"));
2130     ret = FALSE;
2131     goto done;
2132   }
2133 query_failed:
2134   {
2135     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2136         ("could not read query from fd"));
2137     ret = FALSE;
2138     goto done;
2139   }
2140 }
2141
2142 static gpointer
2143 reader_thread (gpointer data)
2144 {
2145   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
2146   gboolean running = TRUE;
2147   gint ret = 0;
2148
2149   while (running) {
2150     ret = update_adapter (comm);
2151     switch (ret) {
2152       case 1:
2153         GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
2154             ("Failed to read from socket"));
2155         running = FALSE;
2156         break;
2157       case 2:
2158         GST_INFO_OBJECT (comm->element, "We're stopping, all good");
2159         running = FALSE;
2160         break;
2161       default:
2162         read_many (comm);
2163         break;
2164     }
2165   }
2166
2167   GST_INFO_OBJECT (comm->element, "Reader thread ending");
2168   return NULL;
2169 }
2170
2171 gboolean
2172 gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
2173     void (*on_buffer) (guint32, GstBuffer *, gpointer),
2174     void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
2175     void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
2176     void (*on_state_change) (guint32, GstStateChange, gpointer),
2177     void (*on_state_lost) (gpointer),
2178     void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
2179 {
2180   if (comm->reader_thread)
2181     return FALSE;
2182
2183   comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2184   comm->on_buffer = on_buffer;
2185   comm->on_event = on_event;
2186   comm->on_query = on_query;
2187   comm->on_state_change = on_state_change;
2188   comm->on_state_lost = on_state_lost;
2189   comm->on_message = on_message;
2190   comm->user_data = user_data;
2191   gst_poll_set_flushing (comm->poll, FALSE);
2192   comm->reader_thread =
2193       g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
2194   return TRUE;
2195 }
2196
2197 void
2198 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
2199 {
2200   if (!comm->reader_thread)
2201     return;
2202
2203   gst_poll_set_flushing (comm->poll, TRUE);
2204   g_thread_join (comm->reader_thread);
2205   comm->reader_thread = NULL;
2206 }
2207
2208 static gchar *
2209 gst_value_serialize_event (const GValue * value)
2210 {
2211   const GstStructure *structure;
2212   GstEvent *ev;
2213   gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
2214   GValue val = G_VALUE_INIT;
2215
2216   ev = g_value_get_boxed (value);
2217
2218   g_value_init (&val, gst_event_type_get_type ());
2219   g_value_set_enum (&val, ev->type);
2220   type = gst_value_serialize (&val);
2221   g_value_unset (&val);
2222
2223   g_value_init (&val, G_TYPE_UINT64);
2224   g_value_set_uint64 (&val, ev->timestamp);
2225   ts = gst_value_serialize (&val);
2226   g_value_unset (&val);
2227
2228   g_value_init (&val, G_TYPE_UINT);
2229   g_value_set_uint (&val, ev->seqnum);
2230   seqnum = gst_value_serialize (&val);
2231   g_value_unset (&val);
2232
2233   g_value_init (&val, G_TYPE_INT64);
2234   g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
2235   rt_offset = gst_value_serialize (&val);
2236   g_value_unset (&val);
2237
2238   structure = gst_event_get_structure (ev);
2239   str = gst_structure_to_string (structure);
2240   str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
2241   g_strdelimit (str64, "=", '_');
2242   g_free (str);
2243
2244   s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
2245       NULL);
2246
2247   g_free (type);
2248   g_free (ts);
2249   g_free (seqnum);
2250   g_free (rt_offset);
2251   g_free (str64);
2252
2253   return s;
2254 }
2255
2256 static gboolean
2257 gst_value_deserialize_event (GValue * dest, const gchar * s)
2258 {
2259   GstEvent *ev = NULL;
2260   GValue val = G_VALUE_INIT;
2261   gboolean ret = FALSE;
2262   gchar **fields;
2263   gsize len;
2264
2265   fields = g_strsplit (s, ":", -1);
2266   if (g_strv_length (fields) != 5)
2267     goto wrong_length;
2268
2269   g_strdelimit (fields[4], "_", '=');
2270   g_base64_decode_inplace (fields[4], &len);
2271
2272   g_value_init (&val, gst_event_type_get_type ());
2273   if (!gst_value_deserialize (&val, fields[0]))
2274     goto fail;
2275   ev = gst_event_new_custom (g_value_get_enum (&val),
2276       gst_structure_new_from_string (fields[4]));
2277
2278   g_value_unset (&val);
2279   g_value_init (&val, G_TYPE_UINT64);
2280   if (!gst_value_deserialize (&val, fields[1]))
2281     goto fail;
2282   ev->timestamp = g_value_get_uint64 (&val);
2283
2284   g_value_unset (&val);
2285   g_value_init (&val, G_TYPE_UINT);
2286   if (!gst_value_deserialize (&val, fields[2]))
2287     goto fail;
2288   ev->seqnum = g_value_get_uint (&val);
2289
2290   g_value_unset (&val);
2291   g_value_init (&val, G_TYPE_INT64);
2292   if (!gst_value_deserialize (&val, fields[3]))
2293     goto fail;
2294   gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
2295
2296   g_value_take_boxed (dest, ev);
2297   ev = NULL;
2298   ret = TRUE;
2299
2300 fail:
2301   g_clear_pointer (&ev, gst_event_unref);
2302   g_value_unset (&val);
2303
2304 wrong_length:
2305   g_strfreev (fields);
2306   return ret;
2307 }
2308
2309 #define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type)                \
2310 G_STMT_START {                                                          \
2311   static GstValueTable gst_value =                                      \
2312     { 0, NULL,                                             \
2313     gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type };    \
2314   gst_value.type = _gtype;                                              \
2315   gst_value_register (&gst_value);                                      \
2316 } G_STMT_END
2317
2318 void
2319 gst_ipc_pipeline_comm_plugin_init (void)
2320 {
2321   static volatile gsize once = 0;
2322
2323   if (g_once_init_enter (&once)) {
2324     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
2325         "ipc pipeline comm");
2326     QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
2327     REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
2328     g_once_init_leave (&once, (gsize) 1);
2329   }
2330 }