wasapi: try to satisfy both mingw and msvc
[platform/upstream/gstreamer.git] / sys / ipcpipeline / gstipcpipelinecomm.c
1 /* GStreamer
2  * Copyright (C) 2015-2017 YouView TV Ltd
3  *   Author: Vincent Penquerch <vincent.penquerch@collabora.co.uk>
4  *
5  * gstipcpipelinecomm.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include <unistd.h>
28 #include <errno.h>
29 #include <string.h>
30 #include <gst/base/gstbytewriter.h>
31 #include <gst/gstprotection.h>
32 #include "gstipcpipelinecomm.h"
33
34 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug);
35 #define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug
36
37 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
38
39 GQuark QUARK_ID;
40
41 typedef enum
42 {
43   ACK_TYPE_NONE,
44   ACK_TYPE_TIMED,
45   ACK_TYPE_BLOCKING
46 } AckType;
47
48 typedef enum
49 {
50   COMM_REQUEST_TYPE_BUFFER,
51   COMM_REQUEST_TYPE_EVENT,
52   COMM_REQUEST_TYPE_QUERY,
53   COMM_REQUEST_TYPE_STATE_CHANGE,
54   COMM_REQUEST_TYPE_MESSAGE,
55 } CommRequestType;
56
57 typedef struct
58 {
59   guint32 id;
60   gboolean replied;
61   gboolean comm_error;
62   guint32 ret;
63   GstQuery *query;
64   CommRequestType type;
65   GCond cond;
66 } CommRequest;
67
68 static const gchar *comm_request_ret_get_name (CommRequestType type,
69     guint32 ret);
70 static guint32 comm_request_ret_get_failure_value (CommRequestType type);
71
72 static CommRequest *
73 comm_request_new (guint32 id, CommRequestType type, GstQuery * query)
74 {
75   CommRequest *req;
76
77   req = g_malloc (sizeof (CommRequest));
78   req->id = id;
79   g_cond_init (&req->cond);
80   req->replied = FALSE;
81   req->comm_error = FALSE;
82   req->query = query;
83   req->ret = comm_request_ret_get_failure_value (type);
84   req->type = type;
85
86   return req;
87 }
88
89 static guint32
90 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req,
91     AckType ack_type)
92 {
93   guint32 ret = comm_request_ret_get_failure_value (req->type);
94   guint64 end_time;
95
96   if (ack_type == ACK_TYPE_TIMED)
97     end_time = g_get_monotonic_time () + comm->ack_time;
98   else
99     end_time = G_MAXUINT64;
100
101   GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u",
102       req->id);
103   while (!req->replied) {
104     if (ack_type == ACK_TYPE_TIMED) {
105       if (!g_cond_wait_until (&req->cond, &comm->mutex, end_time))
106         break;
107     } else
108       g_cond_wait (&req->cond, &comm->mutex);
109   }
110
111   if (req->replied) {
112     ret = req->ret;
113     GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)",
114         req->id, ret, comm_request_ret_get_name (req->type, ret));
115   } else {
116     req->comm_error = TRUE;
117     GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u",
118         req->id);
119   }
120
121   return ret;
122 }
123
124 static void
125 comm_request_free (CommRequest * req)
126 {
127   g_cond_clear (&req->cond);
128   g_free (req);
129 }
130
131 static const gchar *
132 comm_request_ret_get_name (CommRequestType type, guint32 ret)
133 {
134   switch (type) {
135     case COMM_REQUEST_TYPE_BUFFER:
136       return gst_flow_get_name (ret);
137     case COMM_REQUEST_TYPE_EVENT:
138     case COMM_REQUEST_TYPE_QUERY:
139     case COMM_REQUEST_TYPE_MESSAGE:
140       return ret ? "TRUE" : "FALSE";
141     case COMM_REQUEST_TYPE_STATE_CHANGE:
142       return gst_element_state_change_return_get_name (ret);
143     default:
144       g_assert_not_reached ();
145   }
146 }
147
148 static guint32
149 comm_request_ret_get_failure_value (CommRequestType type)
150 {
151   switch (type) {
152     case COMM_REQUEST_TYPE_BUFFER:
153       return GST_FLOW_COMM_ERROR;
154     case COMM_REQUEST_TYPE_EVENT:
155     case COMM_REQUEST_TYPE_MESSAGE:
156     case COMM_REQUEST_TYPE_QUERY:
157       return FALSE;
158     case COMM_REQUEST_TYPE_STATE_CHANGE:
159       return GST_STATE_CHANGE_FAILURE;
160     default:
161       g_assert_not_reached ();
162   }
163 }
164
165 static const gchar *
166 gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type)
167 {
168   switch (type) {
169     case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
170       return "ACK";
171     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
172       return "QUERY_RESULT";
173     case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
174       return "BUFFER";
175     case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
176       return "EVENT";
177     case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
178       return "SINK_MESSAGE_EVENT";
179     case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
180       return "QUERY";
181     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
182       return "STATE_CHANGE";
183     case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
184       return "STATE_LOST";
185     case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
186       return "MESSAGE";
187     case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
188       return "GERROR_MESSAGE";
189     default:
190       return "UNKNOWN";
191   }
192 }
193
194 static gboolean
195 gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id,
196     GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type)
197 {
198   CommRequest *req;
199   gboolean comm_error;
200   GHashTable *waiting_ids;
201
202   if (ack_type == ACK_TYPE_NONE)
203     return TRUE;
204
205   req = comm_request_new (id, type, query);
206   waiting_ids = g_hash_table_ref (comm->waiting_ids);
207   g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req);
208   *ret = comm_request_wait (comm, req, ack_type);
209   comm_error = req->comm_error;
210   g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id));
211   g_hash_table_unref (waiting_ids);
212   return !comm_error;
213 }
214
215 static gboolean
216 write_to_fd_raw (GstIpcPipelineComm * comm, const void *data, size_t size)
217 {
218   size_t offset;
219   gboolean ret = TRUE;
220
221   offset = 0;
222   GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size);
223   while (size) {
224     ssize_t written =
225         write (comm->fdout, (const unsigned char *) data + offset, size);
226     if (written < 0) {
227       if (errno == EAGAIN || errno == EINTR)
228         continue;
229       GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s",
230           strerror (errno));
231       ret = FALSE;
232       goto done;
233     }
234     size -= written;
235     offset += written;
236   }
237
238 done:
239   return ret;
240 }
241
242 static gboolean
243 write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw)
244 {
245   guint8 *data;
246   gboolean ret;
247   guint size;
248
249   size = gst_byte_writer_get_size (bw);
250   data = gst_byte_writer_reset_and_get_data (bw);
251   if (!data)
252     return FALSE;
253   ret = write_to_fd_raw (comm, data, size);
254   g_free (data);
255   return ret;
256 }
257
258 static void
259 gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id,
260     guint32 ret, CommRequestType type)
261 {
262   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK;
263   guint32 size;
264   GstByteWriter bw;
265
266   g_mutex_lock (&comm->mutex);
267
268   GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id,
269       comm_request_ret_get_name (type, ret), ret);
270   gst_byte_writer_init (&bw);
271   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
272     goto write_failed;
273   if (!gst_byte_writer_put_uint32_le (&bw, id))
274     goto write_failed;
275   size = sizeof (ret);
276   if (!gst_byte_writer_put_uint32_le (&bw, size))
277     goto write_failed;
278   if (!gst_byte_writer_put_uint32_le (&bw, ret))
279     goto write_failed;
280
281   if (!write_byte_writer_to_fd (comm, &bw))
282     goto write_failed;
283
284 done:
285   g_mutex_unlock (&comm->mutex);
286   gst_byte_writer_reset (&bw);
287   return;
288
289 write_failed:
290   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
291       ("Failed to write to socket"));
292   goto done;
293 }
294
295 void
296 gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm,
297     guint32 id, GstFlowReturn ret)
298 {
299   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
300       COMM_REQUEST_TYPE_BUFFER);
301 }
302
303 void
304 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm,
305     guint32 id, gboolean ret)
306 {
307   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
308       COMM_REQUEST_TYPE_EVENT);
309 }
310
311 void
312 gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm,
313     guint32 id, GstStateChangeReturn ret)
314 {
315   gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret,
316       COMM_REQUEST_TYPE_STATE_CHANGE);
317 }
318
319 void
320 gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm,
321     guint32 id, gboolean result, GstQuery * query)
322 {
323   const unsigned char payload_type =
324       GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT;
325   guint8 result8 = result;
326   guint32 size;
327   size_t len;
328   char *str = NULL;
329   guint32 type;
330   const GstStructure *structure;
331   GstByteWriter bw;
332
333   g_mutex_lock (&comm->mutex);
334
335   GST_TRACE_OBJECT (comm->element,
336       "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query);
337   gst_byte_writer_init (&bw);
338   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
339     goto write_failed;
340   if (!gst_byte_writer_put_uint32_le (&bw, id))
341     goto write_failed;
342   structure = gst_query_get_structure (query);
343   if (structure) {
344     str = gst_structure_to_string (structure);
345     len = strlen (str);
346   } else {
347     str = NULL;
348     len = 0;
349   }
350   size = 1 + sizeof (guint32) + len + 1;
351   if (!gst_byte_writer_put_uint32_le (&bw, size))
352     goto write_failed;
353   if (!gst_byte_writer_put_uint8 (&bw, result8))
354     goto write_failed;
355   type = GST_QUERY_TYPE (query);
356   if (!gst_byte_writer_put_uint32_le (&bw, type))
357     goto write_failed;
358   if (str) {
359     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1))
360       goto write_failed;
361   } else {
362     if (!gst_byte_writer_put_uint8 (&bw, 0))
363       goto write_failed;
364   }
365
366   if (!write_byte_writer_to_fd (comm, &bw))
367     goto write_failed;
368
369 done:
370   g_mutex_unlock (&comm->mutex);
371   gst_byte_writer_reset (&bw);
372   g_free (str);
373   return;
374
375 write_failed:
376   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
377       ("Failed to write to socket"));
378   goto done;
379 }
380
381 static gboolean
382 gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * comm,
383     guint32 size, GstQuery ** query)
384 {
385   gchar *end = NULL;
386   GstStructure *structure;
387   guint8 result;
388   guint32 type;
389   const guint8 *payload = NULL;
390   guint32 mapped_size = size;
391
392   /* this should not be called if we don't have enough yet */
393   *query = NULL;
394   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
395   g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);
396
397   payload = gst_adapter_map (comm->adapter, mapped_size);
398   if (!payload)
399     return FALSE;
400   result = *payload++;
401   memcpy (&type, payload, sizeof (type));
402   payload += sizeof (type);
403
404   size -= 1 + sizeof (guint32);
405   if (size == 0)
406     goto done;
407
408   if (payload[size - 1]) {
409     result = FALSE;
410     goto done;
411   }
412   if (*payload) {
413     structure = gst_structure_from_string ((const char *) payload, &end);
414   } else {
415     structure = NULL;
416   }
417   if (!structure) {
418     result = FALSE;
419     goto done;
420   }
421
422   *query = gst_query_new_custom (type, structure);
423
424 done:
425   gst_adapter_unmap (comm->adapter);
426   gst_adapter_flush (comm->adapter, mapped_size);
427   return result;
428 }
429
430 typedef struct
431 {
432   guint32 bytes;
433
434   guint64 size;
435   guint32 flags;
436   guint64 api;
437   char *str;
438 } MetaBuildInfo;
439
440 typedef struct
441 {
442   GstIpcPipelineComm *comm;
443   guint32 n_meta;
444   guint32 total_bytes;
445   MetaBuildInfo *info;
446 } MetaListRepresentation;
447
448 static gboolean
449 build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
450 {
451   MetaListRepresentation *repr = user_data;
452
453   repr->n_meta++;
454   repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
455   repr->info[repr->n_meta - 1].bytes =
456       /* 4 byte bytes */
457       4
458       /* 4 byte GstMetaFlags */
459       + 4
460       /* GstMetaInfo::api */
461       + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
462       /* GstMetaInfo::size */
463       + 8
464       /* str length */
465       + 4;
466
467   repr->info[repr->n_meta - 1].flags = (*meta)->flags;
468   repr->info[repr->n_meta - 1].api = (*meta)->info->api;
469   repr->info[repr->n_meta - 1].size = (*meta)->info->size;
470   repr->info[repr->n_meta - 1].str = NULL;
471
472   /* GstMeta is a base class, and actual useful classes are all different...
473      So we list a few of them we know we want and ignore the open ended rest */
474   if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
475     GstProtectionMeta *m = (GstProtectionMeta *) * meta;
476     repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
477     repr->info[repr->n_meta - 1].bytes +=
478         strlen (repr->info[repr->n_meta - 1].str) + 1;
479     GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
480         g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
481   } else {
482     GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
483         g_type_name ((*meta)->info->api));
484   }
485   repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
486   return TRUE;
487 }
488
489 typedef struct
490 {
491   guint64 pts;
492   guint64 dts;
493   guint64 duration;
494   guint64 offset;
495   guint64 offset_end;
496   guint64 flags;
497 } CommBufferMetadata;
498
499 GstFlowReturn
500 gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
501     GstBuffer * buffer)
502 {
503   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
504   GstMapInfo map;
505   guint32 ret32 = GST_FLOW_OK;
506   guint32 size, n;
507   CommBufferMetadata meta;
508   GstFlowReturn ret;
509   MetaListRepresentation repr = { comm, 0, 4, NULL };   /* starts a 4 for n_meta */
510   GstByteWriter bw;
511
512   g_mutex_lock (&comm->mutex);
513   ++comm->send_id;
514
515   GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
516       comm->send_id, buffer);
517
518   gst_byte_writer_init (&bw);
519
520   meta.pts = GST_BUFFER_PTS (buffer);
521   meta.dts = GST_BUFFER_DTS (buffer);
522   meta.duration = GST_BUFFER_DURATION (buffer);
523   meta.offset = GST_BUFFER_OFFSET (buffer);
524   meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
525   meta.flags = GST_BUFFER_FLAGS (buffer);
526
527   /* work out meta size */
528   gst_buffer_foreach_meta (buffer, build_meta, &repr);
529
530   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
531     goto write_failed;
532   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
533     goto write_failed;
534   size =
535       gst_buffer_get_size (buffer) + sizeof (guint32) +
536       sizeof (CommBufferMetadata) + repr.total_bytes;
537   if (!gst_byte_writer_put_uint32_le (&bw, size))
538     goto write_failed;
539   if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
540     goto write_failed;
541   size = gst_buffer_get_size (buffer);
542   if (!gst_byte_writer_put_uint32_le (&bw, size))
543     goto write_failed;
544   if (!write_byte_writer_to_fd (comm, &bw))
545     goto write_failed;
546
547   if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
548     goto map_failed;
549   ret = write_to_fd_raw (comm, map.data, map.size);
550   gst_buffer_unmap (buffer, &map);
551   if (!ret)
552     goto write_failed;
553
554   /* meta */
555   gst_byte_writer_init (&bw);
556   if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
557     goto write_failed;
558   for (n = 0; n < repr.n_meta; ++n) {
559     const MetaBuildInfo *info = repr.info + n;
560     guint32 len;
561     const char *s;
562
563     if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
564       goto write_failed;
565
566     if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
567       goto write_failed;
568
569     s = g_type_name (info->api);
570     len = strlen (s) + 1;
571     if (!gst_byte_writer_put_uint32_le (&bw, len))
572       goto write_failed;
573     if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
574       goto write_failed;
575
576     if (!gst_byte_writer_put_uint64_le (&bw, info->size))
577       goto write_failed;
578
579     s = info->str;
580     len = s ? (strlen (s) + 1) : 0;
581     if (!gst_byte_writer_put_uint32_le (&bw, len))
582       goto write_failed;
583     if (len)
584       if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
585         goto write_failed;
586   }
587
588   if (!write_byte_writer_to_fd (comm, &bw))
589     goto write_failed;
590
591   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
592           ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER))
593     goto wait_failed;
594   ret = ret32;
595
596 done:
597   g_mutex_unlock (&comm->mutex);
598   gst_byte_writer_reset (&bw);
599   for (n = 0; n < repr.n_meta; ++n)
600     g_free (repr.info[n].str);
601   g_free (repr.info);
602   return ret;
603
604 write_failed:
605   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
606       ("Failed to write to socket"));
607   ret = GST_FLOW_COMM_ERROR;
608   goto done;
609
610 wait_failed:
611   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
612       ("Failed to wait for reply on socket"));
613   ret = GST_FLOW_COMM_ERROR;
614   goto done;
615
616 map_failed:
617   GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
618       ("Failed to map buffer"));
619   ret = GST_FLOW_ERROR;
620   goto done;
621 }
622
623 static GstBuffer *
624 gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size)
625 {
626   GstBuffer *buffer;
627   CommBufferMetadata meta;
628   guint32 n_meta, n;
629   const guint8 *payload = NULL;
630   guint32 mapped_size, buffer_data_size;
631
632   /* this should not be called if we don't have enough yet */
633   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
634   g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL);
635
636   mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size);
637   payload = gst_adapter_map (comm->adapter, mapped_size);
638   if (!payload)
639     return NULL;
640   memcpy (&meta, payload, sizeof (CommBufferMetadata));
641   payload += sizeof (CommBufferMetadata);
642   memcpy (&buffer_data_size, payload, sizeof (buffer_data_size));
643   size -= mapped_size;
644   gst_adapter_unmap (comm->adapter);
645   gst_adapter_flush (comm->adapter, mapped_size);
646
647   if (buffer_data_size == 0) {
648     buffer = gst_buffer_new ();
649   } else {
650     buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size);
651     gst_adapter_flush (comm->adapter, buffer_data_size);
652   }
653   size -= buffer_data_size;
654
655   GST_BUFFER_PTS (buffer) = meta.pts;
656   GST_BUFFER_DTS (buffer) = meta.dts;
657   GST_BUFFER_DURATION (buffer) = meta.duration;
658   GST_BUFFER_OFFSET (buffer) = meta.offset;
659   GST_BUFFER_OFFSET_END (buffer) = meta.offset_end;
660   GST_BUFFER_FLAGS (buffer) = meta.flags;
661
662   /* If you don't call that, the GType isn't yet known at the
663      g_type_from_name below */
664   gst_protection_meta_get_info ();
665
666   mapped_size = size;
667   payload = gst_adapter_map (comm->adapter, mapped_size);
668   if (!payload) {
669     gst_buffer_unref (buffer);
670     return NULL;
671   }
672   memcpy (&n_meta, payload, sizeof (n_meta));
673   payload += sizeof (n_meta);
674
675   for (n = 0; n < n_meta; ++n) {
676     guint32 flags, len, bytes;
677     guint64 msize;
678     GType api;
679     GstMeta *meta;
680     GstStructure *structure = NULL;
681
682     memcpy (&bytes, payload, sizeof (bytes));
683     payload += sizeof (bytes);
684
685 #define READ_FIELD(f) do { \
686     memcpy (&f, payload, sizeof (f)); \
687     payload += sizeof(f); \
688     } while(0)
689
690     READ_FIELD (flags);
691     READ_FIELD (len);
692     api = g_type_from_name ((const char *) payload);
693     payload = (const guint8 *) strchr ((const char *) payload, 0) + 1;
694     READ_FIELD (msize);
695     READ_FIELD (len);
696     if (len) {
697       structure = gst_structure_new_from_string ((const char *) payload);
698       payload += len + 1;
699     }
700
701     /* Seems we can add a meta from the api nor type ? */
702     if (api == GST_PROTECTION_META_API_TYPE) {
703       meta =
704           gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
705       ((GstProtectionMeta *) meta)->info = structure;
706     } else {
707       GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
708           g_type_name (api));
709       if (structure)
710         gst_structure_free (structure);
711     }
712
713 #undef READ_FIELD
714
715   }
716
717   gst_adapter_unmap (comm->adapter);
718   gst_adapter_flush (comm->adapter, mapped_size);
719
720   return buffer;
721 }
722
723 static gboolean
724 gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
725     GstEvent * event)
726 {
727   const unsigned char payload_type =
728       GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
729   gboolean ret;
730   guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
731   char *str = NULL;
732   const GstStructure *structure;
733   GstMessage *message = NULL;
734   const char *name;
735   GstByteWriter bw;
736
737   g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
738       FALSE);
739
740   g_mutex_lock (&comm->mutex);
741   ++comm->send_id;
742
743   GST_TRACE_OBJECT (comm->element,
744       "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);
745
746   gst_byte_writer_init (&bw);
747   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
748     goto write_failed;
749   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
750     goto write_failed;
751   name = gst_structure_get_name (gst_event_get_structure (event));
752   slen = strlen (name) + 1;
753   gst_event_parse_sink_message (event, &message);
754   structure = gst_message_get_structure (message);
755   if (structure) {
756     str = gst_structure_to_string (structure);
757     structure_slen = strlen (str);
758   } else {
759     str = NULL;
760     structure_slen = 0;
761   }
762   size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
763       strlen (name) + 1 + structure_slen + 1;
764   if (!gst_byte_writer_put_uint32_le (&bw, size))
765     goto write_failed;
766
767   type = GST_MESSAGE_TYPE (message);
768   if (!gst_byte_writer_put_uint32_le (&bw, type))
769     goto write_failed;
770   size -= sizeof (type);
771
772   eseqnum = GST_EVENT_SEQNUM (event);
773   if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
774     goto write_failed;
775   size -= sizeof (eseqnum);
776
777   mseqnum = GST_MESSAGE_SEQNUM (message);
778   if (!gst_byte_writer_put_uint32_le (&bw, mseqnum))
779     goto write_failed;
780   size -= sizeof (mseqnum);
781
782   if (!gst_byte_writer_put_uint32_le (&bw, slen))
783     goto write_failed;
784   size -= sizeof (slen);
785
786   if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
787     goto write_failed;
788   size -= slen;
789
790   if (str) {
791     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
792       goto write_failed;
793   } else {
794     if (!gst_byte_writer_put_uint8 (&bw, 0))
795       goto write_failed;
796   }
797
798   if (!write_byte_writer_to_fd (comm, &bw))
799     goto write_failed;
800
801   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
802           GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
803           COMM_REQUEST_TYPE_EVENT))
804     goto write_failed;
805
806   ret = ret32;
807
808 done:
809   g_mutex_unlock (&comm->mutex);
810   gst_byte_writer_reset (&bw);
811   g_free (str);
812   if (message)
813     gst_message_unref (message);
814   return ret;
815
816 write_failed:
817   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
818       ("Failed to write to socket"));
819   ret = FALSE;
820   goto done;
821 }
822
823 static GstEvent *
824 gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
825     guint32 size)
826 {
827   GstMessage *message;
828   GstEvent *event = NULL;
829   gchar *end = NULL;
830   GstStructure *structure;
831   guint32 type, eseqnum, mseqnum, slen;
832   const char *name;
833   guint32 mapped_size = size;
834   const guint8 *payload;
835
836   /* this should not be called if we don't have enough yet */
837   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
838   g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);
839
840   payload = gst_adapter_map (comm->adapter, mapped_size);
841   if (!payload)
842     return NULL;
843   memcpy (&type, payload, sizeof (type));
844   payload += sizeof (type);
845   size -= sizeof (type);
846   if (size == 0)
847     goto done;
848
849   memcpy (&eseqnum, payload, sizeof (eseqnum));
850   payload += sizeof (eseqnum);
851   size -= sizeof (eseqnum);
852   if (size == 0)
853     goto done;
854
855   memcpy (&mseqnum, payload, sizeof (mseqnum));
856   payload += sizeof (mseqnum);
857   size -= sizeof (mseqnum);
858   if (size == 0)
859     goto done;
860
861   memcpy (&slen, payload, sizeof (slen));
862   payload += sizeof (slen);
863   size -= sizeof (slen);
864   if (size == 0)
865     goto done;
866
867   if (payload[slen - 1])
868     goto done;
869   name = (const char *) payload;
870   payload += slen;
871   size -= slen;
872
873   if ((payload)[size - 1]) {
874     goto done;
875   }
876   if (*payload) {
877     structure = gst_structure_from_string ((const char *) payload, &end);
878   } else {
879     structure = NULL;
880   }
881
882   message =
883       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
884   gst_message_set_seqnum (message, mseqnum);
885   event = gst_event_new_sink_message (name, message);
886   gst_event_set_seqnum (event, eseqnum);
887   gst_message_unref (message);
888
889 done:
890   gst_adapter_unmap (comm->adapter);
891   gst_adapter_flush (comm->adapter, mapped_size);
892   return event;
893 }
894
895 gboolean
896 gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm,
897     gboolean upstream, GstEvent * event)
898 {
899   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT;
900   gboolean ret;
901   guint32 type, size, ret32 = TRUE, seqnum, slen;
902   char *str = NULL;
903   const GstStructure *structure;
904   GstByteWriter bw;
905
906   /* we special case sink-message event as gst can't serialize/de-serialize it */
907   if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE)
908     return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event);
909
910   g_mutex_lock (&comm->mutex);
911   ++comm->send_id;
912
913   GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT,
914       comm->send_id, event);
915
916   gst_byte_writer_init (&bw);
917   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
918     goto write_failed;
919   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
920     goto write_failed;
921   structure = gst_event_get_structure (event);
922   if (structure) {
923
924     if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) {
925       GstStructure *s = gst_structure_copy (structure);
926       gst_structure_remove_field (s, "stream");
927       str = gst_structure_to_string (s);
928       gst_structure_free (s);
929     } else {
930       str = gst_structure_to_string (structure);
931     }
932
933     slen = strlen (str);
934   } else {
935     str = NULL;
936     slen = 0;
937   }
938   size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1;
939   if (!gst_byte_writer_put_uint32_le (&bw, size))
940     goto write_failed;
941
942   type = GST_EVENT_TYPE (event);
943   if (!gst_byte_writer_put_uint32_le (&bw, type))
944     goto write_failed;
945
946   seqnum = GST_EVENT_SEQNUM (event);
947   if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
948     goto write_failed;
949
950   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
951     goto write_failed;
952
953   if (str) {
954     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
955       goto write_failed;
956   } else {
957     if (!gst_byte_writer_put_uint8 (&bw, 0))
958       goto write_failed;
959   }
960
961   if (!write_byte_writer_to_fd (comm, &bw))
962     goto write_failed;
963
964   /* Upstream events get serialized, this is required to send seeks only
965    * one at a time. */
966   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
967           (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
968           ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
969     goto write_failed;
970   ret = ret32;
971
972 done:
973   g_mutex_unlock (&comm->mutex);
974   g_free (str);
975   gst_byte_writer_reset (&bw);
976   return ret;
977
978 write_failed:
979   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
980       ("Failed to write to socket"));
981   ret = FALSE;
982   goto done;
983 }
984
985 static GstEvent *
986 gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
987     gboolean * upstream)
988 {
989   GstEvent *event = NULL;
990   gchar *end = NULL;
991   GstStructure *structure;
992   guint32 type, seqnum;
993   guint32 mapped_size = size;
994   const guint8 *payload;
995
996   /* this should not be called if we don't have enough yet */
997   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
998   g_return_val_if_fail (size >= sizeof (type), NULL);
999
1000   payload = gst_adapter_map (comm->adapter, mapped_size);
1001   if (!payload)
1002     return NULL;
1003
1004   memcpy (&type, payload, sizeof (type));
1005   payload += sizeof (type);
1006   size -= sizeof (type);
1007   if (size == 0)
1008     goto done;
1009
1010   memcpy (&seqnum, payload, sizeof (seqnum));
1011   payload += sizeof (seqnum);
1012   size -= sizeof (seqnum);
1013   if (size == 0)
1014     goto done;
1015
1016   *upstream = (*payload) ? TRUE : FALSE;
1017   payload += 1;
1018   size -= 1;
1019   if (size == 0)
1020     goto done;
1021
1022   if (payload[size - 1])
1023     goto done;
1024   if (*payload) {
1025     structure = gst_structure_from_string ((const char *) payload, &end);
1026   } else {
1027     structure = NULL;
1028   }
1029
1030   event = gst_event_new_custom (type, structure);
1031   gst_event_set_seqnum (event, seqnum);
1032
1033 done:
1034   gst_adapter_unmap (comm->adapter);
1035   gst_adapter_flush (comm->adapter, mapped_size);
1036   return event;
1037 }
1038
1039 gboolean
1040 gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
1041     gboolean upstream, GstQuery * query)
1042 {
1043   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
1044   gboolean ret;
1045   guint32 type, size, ret32 = TRUE, slen;
1046   char *str = NULL;
1047   const GstStructure *structure;
1048   GstByteWriter bw;
1049
1050   g_mutex_lock (&comm->mutex);
1051   ++comm->send_id;
1052
1053   GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
1054       comm->send_id, query);
1055
1056   gst_byte_writer_init (&bw);
1057   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1058     goto write_failed;
1059   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1060     goto write_failed;
1061   structure = gst_query_get_structure (query);
1062   if (structure) {
1063     str = gst_structure_to_string (structure);
1064     slen = strlen (str);
1065   } else {
1066     str = NULL;
1067     slen = 0;
1068   }
1069   size = sizeof (type) + 1 + slen + 1;
1070   if (!gst_byte_writer_put_uint32_le (&bw, size))
1071     goto write_failed;
1072
1073   type = GST_QUERY_TYPE (query);
1074   if (!gst_byte_writer_put_uint32_le (&bw, type))
1075     goto write_failed;
1076
1077   if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
1078     goto write_failed;
1079
1080   if (str) {
1081     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
1082       goto write_failed;
1083   } else {
1084     if (!gst_byte_writer_put_uint8 (&bw, 0))
1085       goto write_failed;
1086   }
1087
1088   if (!write_byte_writer_to_fd (comm, &bw))
1089     goto write_failed;
1090
1091   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
1092           GST_QUERY_IS_SERIALIZED (query) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
1093           COMM_REQUEST_TYPE_QUERY))
1094     goto write_failed;
1095
1096   ret = ret32;
1097
1098 done:
1099   g_mutex_unlock (&comm->mutex);
1100   g_free (str);
1101   gst_byte_writer_reset (&bw);
1102   return ret;
1103
1104 write_failed:
1105   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1106       ("Failed to write to socket"));
1107   ret = FALSE;
1108   goto done;
1109 }
1110
1111 static GstQuery *
1112 gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
1113     gboolean * upstream)
1114 {
1115   GstQuery *query = NULL;
1116   gchar *end = NULL;
1117   GstStructure *structure;
1118   guint32 type;
1119   guint32 mapped_size = size;
1120   const guint8 *payload;
1121
1122   /* this should not be called if we don't have enough yet */
1123   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1124   g_return_val_if_fail (size >= sizeof (type), NULL);
1125
1126   payload = gst_adapter_map (comm->adapter, mapped_size);
1127   if (!payload)
1128     return NULL;
1129
1130   memcpy (&type, payload, sizeof (type));
1131   payload += sizeof (type);
1132   size -= sizeof (type);
1133   if (size == 0)
1134     goto done;
1135
1136   *upstream = (*payload) ? TRUE : FALSE;
1137   payload += 1;
1138   size -= 1;
1139   if (size == 0)
1140     goto done;
1141
1142   if (payload[size - 1])
1143     goto done;
1144   if (*payload) {
1145     structure = gst_structure_from_string ((const char *) payload, &end);
1146   } else {
1147     structure = NULL;
1148   }
1149
1150   query = gst_query_new_custom (type, structure);
1151
1152   /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
1153      This does not play well with the serialization/deserialization system,
1154      which will give us a non-NULL GstCaps which has a value of NULL. This
1155      in turn wreaks havoc with any code that tests whether filter is NULL
1156      (which basically means, am I being given an optional GstCaps ?).
1157      So we look for non-NULL GstCaps which have NULL contents, and replace
1158      them with NULL instead. */
1159   if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) {
1160     GstCaps *filter;
1161     gst_query_parse_caps (query, &filter);
1162     if (filter
1163         && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)),
1164             "NULL")) {
1165       gst_query_unref (query);
1166       query = gst_query_new_caps (NULL);
1167     }
1168   }
1169
1170 done:
1171   gst_adapter_unmap (comm->adapter);
1172   gst_adapter_flush (comm->adapter, mapped_size);
1173   return query;
1174 }
1175
1176 GstStateChangeReturn
1177 gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm,
1178     GstStateChange transition)
1179 {
1180   const unsigned char payload_type =
1181       GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE;
1182   GstStateChangeReturn ret;
1183   guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS;
1184   GstByteWriter bw;
1185
1186   g_mutex_lock (&comm->mutex);
1187   ++comm->send_id;
1188
1189   GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s",
1190       comm->send_id,
1191       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
1192       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
1193
1194   gst_byte_writer_init (&bw);
1195   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1196     goto write_failed;
1197   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1198     goto write_failed;
1199   size = sizeof (transition);
1200   if (!gst_byte_writer_put_uint32_le (&bw, size))
1201     goto write_failed;
1202   if (!gst_byte_writer_put_uint32_le (&bw, transition))
1203     goto write_failed;
1204
1205   if (!write_byte_writer_to_fd (comm, &bw))
1206     goto write_failed;
1207
1208   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1209           ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE))
1210     goto write_failed;
1211   ret = ret32;
1212
1213 done:
1214   g_mutex_unlock (&comm->mutex);
1215   gst_byte_writer_reset (&bw);
1216   return ret;
1217
1218 write_failed:
1219   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1220       ("Failed to write to socket"));
1221   ret = GST_STATE_CHANGE_FAILURE;
1222   goto done;
1223 }
1224
1225 static gboolean
1226 is_valid_state_change (GstStateChange transition)
1227 {
1228   if (transition == GST_STATE_CHANGE_NULL_TO_READY)
1229     return TRUE;
1230   if (transition == GST_STATE_CHANGE_READY_TO_PAUSED)
1231     return TRUE;
1232   if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING)
1233     return TRUE;
1234   if (transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
1235     return TRUE;
1236   if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
1237     return TRUE;
1238   if (transition == GST_STATE_CHANGE_READY_TO_NULL)
1239     return TRUE;
1240   if (GST_STATE_TRANSITION_CURRENT (transition) ==
1241       GST_STATE_TRANSITION_NEXT (transition))
1242     return TRUE;
1243   return FALSE;
1244 }
1245
1246 static gboolean
1247 gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
1248     guint32 size, guint32 * transition)
1249 {
1250   guint32 mapped_size = size;
1251   const guint8 *payload;
1252
1253   /* this should not be called if we don't have enough yet */
1254   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
1255   g_return_val_if_fail (size >= sizeof (*transition), FALSE);
1256
1257   payload = gst_adapter_map (comm->adapter, size);
1258   if (!payload)
1259     return FALSE;
1260   memcpy (transition, payload, sizeof (*transition));
1261   gst_adapter_unmap (comm->adapter);
1262   gst_adapter_flush (comm->adapter, mapped_size);
1263   return is_valid_state_change (*transition);
1264 }
1265
1266 void
1267 gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
1268 {
1269   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
1270   guint32 size;
1271   GstByteWriter bw;
1272
1273   g_mutex_lock (&comm->mutex);
1274   ++comm->send_id;
1275
1276   GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
1277   gst_byte_writer_init (&bw);
1278   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1279     goto write_failed;
1280   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1281     goto write_failed;
1282   size = 0;
1283   if (!gst_byte_writer_put_uint32_le (&bw, size))
1284     goto write_failed;
1285
1286   if (!write_byte_writer_to_fd (comm, &bw))
1287     goto write_failed;
1288
1289 done:
1290   g_mutex_unlock (&comm->mutex);
1291   gst_byte_writer_reset (&bw);
1292   return;
1293
1294 write_failed:
1295   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1296       ("Failed to write to socket"));
1297   goto done;
1298 }
1299
1300 static gboolean
1301 gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
1302 {
1303   /* no payload */
1304   return TRUE;
1305 }
1306
1307 static gboolean
1308 gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
1309     GstMessage * message)
1310 {
1311   const unsigned char payload_type =
1312       GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE;
1313   gboolean ret;
1314   guint32 code, size, ret32 = TRUE;
1315   char *str = NULL;
1316   GError *error;
1317   char *extra_message;
1318   const char *domain_string;
1319   unsigned char msgtype;
1320   GstByteWriter bw;
1321
1322   g_mutex_lock (&comm->mutex);
1323   ++comm->send_id;
1324
1325   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
1326     gst_message_parse_error (message, &error, &extra_message);
1327     msgtype = 2;
1328   } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
1329     gst_message_parse_warning (message, &error, &extra_message);
1330     msgtype = 1;
1331   } else {
1332     gst_message_parse_info (message, &error, &extra_message);
1333     msgtype = 0;
1334   }
1335   code = error->code;
1336   domain_string = g_quark_to_string (error->domain);
1337   GST_TRACE_OBJECT (comm->element,
1338       "Writing error %u: domain %s, code %u, message %s, extra message %s",
1339       comm->send_id, domain_string, error->code, error->message, extra_message);
1340
1341   gst_byte_writer_init (&bw);
1342   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1343     goto write_failed;
1344   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1345     goto write_failed;
1346
1347   size = sizeof (size);
1348   size += 1;
1349   size += strlen (domain_string) + 1;
1350   size += sizeof (code);
1351   size += sizeof (size);
1352   size += error->message ? strlen (error->message) + 1 : 0;
1353   size += sizeof (size);
1354   size += extra_message ? strlen (extra_message) + 1 : 0;
1355
1356   if (!gst_byte_writer_put_uint32_le (&bw, size))
1357     goto write_failed;
1358
1359   if (!gst_byte_writer_put_uint8 (&bw, msgtype))
1360     goto write_failed;
1361   size = strlen (domain_string) + 1;
1362   if (!gst_byte_writer_put_uint32_le (&bw, size))
1363     goto write_failed;
1364   if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size))
1365     goto write_failed;
1366   if (!gst_byte_writer_put_uint32_le (&bw, code))
1367     goto write_failed;
1368   size = error->message ? strlen (error->message) + 1 : 0;
1369   if (!gst_byte_writer_put_uint32_le (&bw, size))
1370     goto write_failed;
1371   if (error->message) {
1372     if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size))
1373       goto write_failed;
1374   }
1375   size = extra_message ? strlen (extra_message) + 1 : 0;
1376   if (!gst_byte_writer_put_uint32_le (&bw, size))
1377     goto write_failed;
1378   if (extra_message) {
1379     if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size))
1380       goto write_failed;
1381   }
1382
1383   if (!write_byte_writer_to_fd (comm, &bw))
1384     goto write_failed;
1385
1386   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1387           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1388     goto write_failed;
1389
1390   ret = ret32;
1391
1392 done:
1393   g_mutex_unlock (&comm->mutex);
1394   g_free (str);
1395   g_error_free (error);
1396   g_free (extra_message);
1397   gst_byte_writer_reset (&bw);
1398   return ret;
1399
1400 write_failed:
1401   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1402       ("Failed to write to socket"));
1403   ret = FALSE;
1404   goto done;
1405 }
1406
1407 static GstMessage *
1408 gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm,
1409     guint32 size)
1410 {
1411   GstMessage *message = NULL;
1412   guint32 code;
1413   GQuark domain;
1414   const char *msg, *extra_message;
1415   GError *error;
1416   unsigned char msgtype;
1417   guint32 mapped_size = size;
1418   const guint8 *payload;
1419
1420   /* this should not be called if we don't have enough yet */
1421   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1422   g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1,
1423       NULL);
1424
1425   payload = gst_adapter_map (comm->adapter, mapped_size);
1426   if (!payload)
1427     return NULL;
1428   msgtype = *payload++;
1429   memcpy (&size, payload, sizeof (size));
1430   payload += sizeof (size);
1431   if (payload[size - 1])
1432     goto done;
1433   domain = g_quark_from_string ((const char *) payload);
1434   payload += size;
1435
1436   memcpy (&code, payload, sizeof (code));
1437   payload += sizeof (code);
1438
1439   memcpy (&size, payload, sizeof (size));
1440   payload += sizeof (size);
1441   if (size) {
1442     if (payload[size - 1])
1443       goto done;
1444     msg = (const char *) payload;
1445   } else {
1446     msg = NULL;
1447   }
1448   payload += size;
1449
1450   memcpy (&size, payload, sizeof (size));
1451   payload += sizeof (size);
1452   if (size) {
1453     if (payload[size - 1])
1454       goto done;
1455     extra_message = (const char *) payload;
1456   } else {
1457     extra_message = NULL;
1458   }
1459   payload += size;
1460
1461   error = g_error_new (domain, code, "%s", msg);
1462   if (msgtype == 2)
1463     message =
1464         gst_message_new_error (GST_OBJECT (comm->element), error,
1465         extra_message);
1466   else if (msgtype == 1)
1467     message =
1468         gst_message_new_warning (GST_OBJECT (comm->element), error,
1469         extra_message);
1470   else
1471     message =
1472         gst_message_new_info (GST_OBJECT (comm->element), error, extra_message);
1473   g_error_free (error);
1474
1475 done:
1476   gst_adapter_unmap (comm->adapter);
1477   gst_adapter_flush (comm->adapter, mapped_size);
1478
1479   return message;
1480 }
1481
1482 gboolean
1483 gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm,
1484     GstMessage * message)
1485 {
1486   const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE;
1487   gboolean ret;
1488   guint32 type, size, ret32 = TRUE, slen;
1489   char *str = NULL;
1490   const GstStructure *structure;
1491   GstByteWriter bw;
1492
1493   /* we special case error as gst can't serialize/de-serialize it */
1494   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
1495       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
1496       || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO)
1497     return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message);
1498
1499   g_mutex_lock (&comm->mutex);
1500   ++comm->send_id;
1501
1502   GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT,
1503       comm->send_id, message);
1504
1505   gst_byte_writer_init (&bw);
1506   if (!gst_byte_writer_put_uint8 (&bw, payload_type))
1507     goto write_failed;
1508   if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
1509     goto write_failed;
1510   structure = gst_message_get_structure (message);
1511   if (structure) {
1512     str = gst_structure_to_string (structure);
1513     slen = strlen (str);
1514   } else {
1515     str = NULL;
1516     slen = 0;
1517   }
1518   size = sizeof (type) + slen + 1;
1519   if (!gst_byte_writer_put_uint32_le (&bw, size))
1520     goto write_failed;
1521
1522   type = GST_MESSAGE_TYPE (message);
1523   if (!gst_byte_writer_put_uint32_le (&bw, type))
1524     goto write_failed;
1525   size -= sizeof (type);
1526   if (str) {
1527     if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
1528       goto write_failed;
1529   } else {
1530     if (!gst_byte_writer_put_uint8 (&bw, 0))
1531       goto write_failed;
1532   }
1533
1534   if (!write_byte_writer_to_fd (comm, &bw))
1535     goto write_failed;
1536
1537   if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
1538           ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE))
1539     goto write_failed;
1540
1541   ret = ret32;
1542
1543 done:
1544   g_mutex_unlock (&comm->mutex);
1545   g_free (str);
1546   gst_byte_writer_reset (&bw);
1547   return ret;
1548
1549 write_failed:
1550   GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
1551       ("Failed to write to socket"));
1552   ret = FALSE;
1553   goto done;
1554 }
1555
1556 static GstMessage *
1557 gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size)
1558 {
1559   GstMessage *message = NULL;
1560   gchar *end = NULL;
1561   GstStructure *structure;
1562   guint32 type;
1563   guint32 mapped_size = size;
1564   const guint8 *payload;
1565
1566   /* this should not be called if we don't have enough yet */
1567   g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
1568   g_return_val_if_fail (size >= sizeof (type), NULL);
1569
1570   payload = gst_adapter_map (comm->adapter, mapped_size);
1571   if (!payload)
1572     return NULL;
1573   memcpy (&type, payload, sizeof (type));
1574   payload += sizeof (type);
1575   size -= sizeof (type);
1576   if (size == 0)
1577     goto done;
1578
1579   if (payload[size - 1])
1580     goto done;
1581   if (*payload) {
1582     structure = gst_structure_from_string ((const char *) payload, &end);
1583   } else {
1584     structure = NULL;
1585   }
1586
1587   message =
1588       gst_message_new_custom (type, GST_OBJECT (comm->element), structure);
1589
1590 done:
1591   gst_adapter_unmap (comm->adapter);
1592   gst_adapter_flush (comm->adapter, mapped_size);
1593
1594   return message;
1595 }
1596
1597 void
1598 gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
1599 {
1600   g_mutex_init (&comm->mutex);
1601   comm->element = element;
1602   comm->fdin = comm->fdout = -1;
1603   comm->ack_time = DEFAULT_ACK_TIME;
1604   comm->waiting_ids =
1605       g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1606       (GDestroyNotify) comm_request_free);
1607   comm->adapter = gst_adapter_new ();
1608   comm->poll = gst_poll_new (TRUE);
1609   gst_poll_fd_init (&comm->pollFDin);
1610 }
1611
1612 void
1613 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
1614 {
1615   g_hash_table_destroy (comm->waiting_ids);
1616   gst_object_unref (comm->adapter);
1617   gst_poll_free (comm->poll);
1618   g_mutex_clear (&comm->mutex);
1619 }
1620
1621 static void
1622 cancel_request (gpointer key, gpointer value, gpointer user_data,
1623     GstFlowReturn fret)
1624 {
1625   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data;
1626   guint32 id = GPOINTER_TO_INT (key);
1627   CommRequest *req = (CommRequest *) value;
1628
1629   GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id,
1630       req->type);
1631   req->ret = fret;
1632   req->replied = TRUE;
1633   g_cond_signal (&req->cond);
1634 }
1635
1636 static void
1637 cancel_request_error (gpointer key, gpointer value, gpointer user_data)
1638 {
1639   CommRequest *req = (CommRequest *) value;
1640   GstFlowReturn fret = comm_request_ret_get_failure_value (req->type);
1641
1642   cancel_request (key, value, user_data, fret);
1643 }
1644
1645 void
1646 gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup)
1647 {
1648   g_mutex_lock (&comm->mutex);
1649   g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm);
1650   if (cleanup) {
1651     g_hash_table_unref (comm->waiting_ids);
1652     comm->waiting_ids =
1653         g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
1654         (GDestroyNotify) comm_request_free);
1655   }
1656   g_mutex_unlock (&comm->mutex);
1657 }
1658
1659 static gboolean
1660 set_field (GQuark field_id, const GValue * value, gpointer user_data)
1661 {
1662   GstStructure *structure = user_data;
1663
1664   gst_structure_id_set_value (structure, field_id, value);
1665
1666   return TRUE;
1667 }
1668
1669 static gboolean
1670 gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
1671     GstFlowReturn ret, GstQuery * query)
1672 {
1673   CommRequest *req;
1674
1675   req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id));
1676   if (!req) {
1677     GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id);
1678     return FALSE;
1679   }
1680
1681   GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret,
1682       comm_request_ret_get_name (req->type, ret), req->id);
1683   req->replied = TRUE;
1684   req->ret = ret;
1685   if (query) {
1686     if (req->query) {
1687       /* We need to update the original query in place, as the caller
1688          will expect the object to be the same */
1689       GstStructure *structure = gst_query_writable_structure (req->query);
1690       gst_structure_remove_all_fields (structure);
1691       gst_structure_foreach (gst_query_get_structure (query), set_field,
1692           structure);
1693     } else {
1694       GST_WARNING_OBJECT (comm->element,
1695           "Got query reply, but no query was in the request");
1696     }
1697   }
1698   g_cond_signal (&req->cond);
1699   return TRUE;
1700 }
1701
1702 static gint
1703 update_adapter (GstIpcPipelineComm * comm)
1704 {
1705   GstMemory *mem = NULL;
1706   GstBuffer *buf;
1707   GstMapInfo map;
1708   ssize_t sz;
1709   gint ret = 0;
1710
1711 again:
1712   /* update pollFDin if necessary (fdin changed or we lost our parent).
1713    * we do not allow a parent-less element to communicate with its peer
1714    * in order to avoid race conditions where the slave tries to change
1715    * the state of its parent pipeline while it is not yet added in that
1716    * pipeline. */
1717   if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
1718     if (comm->pollFDin.fd != -1) {
1719       GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
1720           comm->pollFDin.fd);
1721       gst_poll_remove_fd (comm->poll, &comm->pollFDin);
1722       gst_poll_fd_init (&comm->pollFDin);
1723     }
1724     if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
1725       GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
1726       comm->pollFDin.fd = comm->fdin;
1727       gst_poll_add_fd (comm->poll, &comm->pollFDin);
1728       gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
1729     }
1730   }
1731
1732   /* wait for activity on fdin or a flush */
1733   if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
1734     if (errno == EAGAIN)
1735       goto again;
1736     /* error out, unless interrupted or flushing */
1737     if (errno != EINTR)
1738       ret = (errno == EBUSY) ? 2 : 1;
1739   }
1740
1741   /* read from fdin if possible and push data to our adapter */
1742   if (comm->pollFDin.fd >= 0
1743       && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
1744     if (!mem)
1745       mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
1746
1747     gst_memory_map (mem, &map, GST_MAP_WRITE);
1748     sz = read (comm->pollFDin.fd, map.data, map.size);
1749     gst_memory_unmap (mem, &map);
1750
1751     if (sz <= 0) {
1752       if (errno == EAGAIN)
1753         goto again;
1754       /* error out, unless interrupted */
1755       if (errno != EINTR)
1756         ret = 1;
1757     } else {
1758       gst_memory_resize (mem, 0, sz);
1759       buf = gst_buffer_new ();
1760       gst_buffer_append_memory (buf, mem);
1761       mem = NULL;
1762       GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
1763       gst_adapter_push (comm->adapter, buf);
1764     }
1765   }
1766
1767   if (mem)
1768     gst_memory_unref (mem);
1769
1770   return ret;
1771 }
1772
1773 static gboolean
1774 read_many (GstIpcPipelineComm * comm)
1775 {
1776   gboolean ret = TRUE;
1777   gsize available;
1778   const guint8 *payload;
1779
1780   while (1)
1781     switch (comm->state) {
1782       case GST_IPC_PIPELINE_COMM_STATE_TYPE:
1783       {
1784         guint8 type;
1785         guint32 mapped_size;
1786
1787         available = gst_adapter_available (comm->adapter);
1788         mapped_size = 1 + sizeof (gint32) * 2;
1789         if (available < mapped_size)
1790           goto done;
1791
1792         payload = gst_adapter_map (comm->adapter, mapped_size);
1793         type = *payload++;
1794         g_mutex_lock (&comm->mutex);
1795         memcpy (&comm->id, payload, sizeof (guint32));
1796         memcpy (&comm->payload_length, payload + 4, sizeof (guint32));
1797         g_mutex_unlock (&comm->mutex);
1798         gst_adapter_unmap (comm->adapter);
1799         gst_adapter_flush (comm->adapter, mapped_size);
1800         GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u",
1801             comm->id, type, comm->payload_length);
1802         switch (type) {
1803           case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1804           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1805           case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1806           case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1807           case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1808           case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1809           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1810           case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
1811           case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
1812           case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
1813             GST_TRACE_OBJECT (comm->element, "switching to state %s",
1814                 gst_ipc_pipeline_comm_data_type_get_name (type));
1815             comm->state = type;
1816             break;
1817           default:
1818             goto out_of_sync;
1819         }
1820         break;
1821       }
1822       case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK:
1823       {
1824         const guint8 *rets;
1825         guint32 ret32;
1826
1827         available = gst_adapter_available (comm->adapter);
1828         if (available < comm->payload_length)
1829           goto done;
1830
1831         if (available < sizeof (guint32))
1832           goto ack_failed;
1833
1834         rets = gst_adapter_map (comm->adapter, sizeof (guint32));
1835         memcpy (&ret32, rets, sizeof (ret32));
1836         gst_adapter_unmap (comm->adapter);
1837         gst_adapter_flush (comm->adapter, sizeof (guint32));
1838         GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u",
1839             gst_flow_get_name (ret32), comm->id);
1840
1841         g_mutex_lock (&comm->mutex);
1842         gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL);
1843         g_mutex_unlock (&comm->mutex);
1844
1845         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1846         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1847         break;
1848       }
1849       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT:
1850       {
1851         GstQuery *query = NULL;
1852         gboolean qret;
1853
1854         available = gst_adapter_available (comm->adapter);
1855         if (available < comm->payload_length)
1856           goto done;
1857
1858         qret =
1859             gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length,
1860             &query);
1861
1862         GST_TRACE_OBJECT (comm->element,
1863             "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret,
1864             query);
1865
1866         g_mutex_lock (&comm->mutex);
1867         gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query);
1868         g_mutex_unlock (&comm->mutex);
1869
1870         gst_query_unref (query);
1871
1872         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1873         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1874         break;
1875       }
1876       case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER:
1877       {
1878         GstBuffer *buf;
1879
1880         available = gst_adapter_available (comm->adapter);
1881         if (available < comm->payload_length)
1882           goto done;
1883
1884         buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length);
1885         if (!buf)
1886           goto buffer_failed;
1887
1888         /* set caps and push */
1889         GST_TRACE_OBJECT (comm->element,
1890             "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT
1891             ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT
1892             ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT
1893             ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
1894             GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf),
1895             GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf),
1896             GST_BUFFER_FLAGS (buf));
1897
1898         gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID,
1899             GINT_TO_POINTER (comm->id), NULL);
1900
1901         if (comm->on_buffer)
1902           (*comm->on_buffer) (comm->id, buf, comm->user_data);
1903
1904         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1905         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1906         break;
1907       }
1908       case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT:
1909       {
1910         GstEvent *event;
1911         gboolean upstream;
1912
1913         available = gst_adapter_available (comm->adapter);
1914         if (available < comm->payload_length)
1915           goto done;
1916
1917         event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length,
1918             &upstream);
1919         if (!event)
1920           goto event_failed;
1921
1922         GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s",
1923             event, gst_event_type_get_name (event->type));
1924
1925         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1926             GINT_TO_POINTER (comm->id), NULL);
1927
1928         if (comm->on_event)
1929           (*comm->on_event) (comm->id, event, upstream, comm->user_data);
1930
1931         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1932         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1933         break;
1934       }
1935       case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT:
1936       {
1937         GstEvent *event;
1938
1939         available = gst_adapter_available (comm->adapter);
1940         if (available < comm->payload_length)
1941           goto done;
1942
1943         event = gst_ipc_pipeline_comm_read_sink_message_event (comm,
1944             comm->payload_length);
1945         if (!event)
1946           goto event_failed;
1947
1948         GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p",
1949             event);
1950
1951         gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID,
1952             GINT_TO_POINTER (comm->id), NULL);
1953
1954         if (comm->on_event)
1955           (*comm->on_event) (comm->id, event, FALSE, comm->user_data);
1956
1957         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1958         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1959         break;
1960       }
1961       case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY:
1962       {
1963         GstQuery *query;
1964         gboolean upstream;
1965
1966         available = gst_adapter_available (comm->adapter);
1967         if (available < comm->payload_length)
1968           goto done;
1969
1970         query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length,
1971             &upstream);
1972         if (!query)
1973           goto query_failed;
1974
1975         GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s",
1976             query, gst_query_type_get_name (query->type));
1977
1978         gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID,
1979             GINT_TO_POINTER (comm->id), NULL);
1980
1981         if (comm->on_query)
1982           (*comm->on_query) (comm->id, query, upstream, comm->user_data);
1983
1984         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
1985         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
1986         break;
1987       }
1988       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE:
1989       {
1990         guint32 transition;
1991
1992         available = gst_adapter_available (comm->adapter);
1993         if (available < comm->payload_length)
1994           goto done;
1995
1996         if (!gst_ipc_pipeline_comm_read_state_change (comm,
1997                 comm->payload_length, &transition))
1998           goto state_change_failed;
1999
2000         GST_TRACE_OBJECT (comm->element,
2001             "deserialized state change request: %s -> %s",
2002             gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT
2003                 (transition)),
2004             gst_element_state_get_name (GST_STATE_TRANSITION_NEXT
2005                 (transition)));
2006
2007         if (comm->on_state_change)
2008           (*comm->on_state_change) (comm->id, transition, comm->user_data);
2009
2010         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2011         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2012         break;
2013       }
2014       case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST:
2015       {
2016         available = gst_adapter_available (comm->adapter);
2017         if (available < comm->payload_length)
2018           goto done;
2019
2020         if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length))
2021           goto event_failed;
2022
2023         GST_TRACE_OBJECT (comm->element, "deserialized state-lost");
2024
2025         if (comm->on_state_lost)
2026           (*comm->on_state_lost) (comm->user_data);
2027
2028         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2029         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2030         break;
2031       }
2032       case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE:
2033       {
2034         GstMessage *message;
2035
2036         available = gst_adapter_available (comm->adapter);
2037         if (available < comm->payload_length)
2038           goto done;
2039
2040         message = gst_ipc_pipeline_comm_read_message (comm,
2041             comm->payload_length);
2042         if (!message)
2043           goto message_failed;
2044
2045         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2046             message, gst_message_type_get_name (message->type));
2047
2048         if (comm->on_message)
2049           (*comm->on_message) (comm->id, message, comm->user_data);
2050
2051         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2052         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2053         break;
2054       }
2055       case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE:
2056       {
2057         GstMessage *message;
2058
2059         available = gst_adapter_available (comm->adapter);
2060         if (available < comm->payload_length)
2061           goto done;
2062
2063         message = gst_ipc_pipeline_comm_read_gerror_message (comm,
2064             comm->payload_length);
2065         if (!message)
2066           goto message_failed;
2067
2068         GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s",
2069             message, gst_message_type_get_name (message->type));
2070
2071         if (comm->on_message)
2072           (*comm->on_message) (comm->id, message, comm->user_data);
2073
2074         GST_TRACE_OBJECT (comm->element, "switching to state TYPE");
2075         comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2076         break;
2077       }
2078     }
2079
2080 done:
2081   return ret;
2082
2083   /* ERRORS */
2084 out_of_sync:
2085   {
2086     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2087         ("Socket out of sync"));
2088     ret = FALSE;
2089     goto done;
2090   }
2091 state_change_failed:
2092   {
2093     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2094         ("could not read state change from fd"));
2095     ret = FALSE;
2096     goto done;
2097   }
2098 ack_failed:
2099   {
2100     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2101         ("could not read ack from fd"));
2102     ret = FALSE;
2103     goto done;
2104   }
2105 buffer_failed:
2106   {
2107     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2108         ("could not read buffer from fd"));
2109     ret = FALSE;
2110     goto done;
2111   }
2112 event_failed:
2113   {
2114     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2115         ("could not read event from fd"));
2116     ret = FALSE;
2117     goto done;
2118   }
2119 message_failed:
2120   {
2121     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2122         ("could not read message from fd"));
2123     ret = FALSE;
2124     goto done;
2125   }
2126 query_failed:
2127   {
2128     GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL),
2129         ("could not read query from fd"));
2130     ret = FALSE;
2131     goto done;
2132   }
2133 }
2134
2135 static gpointer
2136 reader_thread (gpointer data)
2137 {
2138   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
2139   gboolean running = TRUE;
2140   gint ret = 0;
2141
2142   while (running) {
2143     ret = update_adapter (comm);
2144     switch (ret) {
2145       case 1:
2146         GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
2147             ("Failed to read from socket"));
2148         running = FALSE;
2149         break;
2150       case 2:
2151         GST_INFO_OBJECT (comm->element, "We're stopping, all good");
2152         running = FALSE;
2153         break;
2154       default:
2155         read_many (comm);
2156         break;
2157     }
2158   }
2159
2160   GST_INFO_OBJECT (comm->element, "Reader thread ending");
2161   return NULL;
2162 }
2163
2164 gboolean
2165 gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
2166     void (*on_buffer) (guint32, GstBuffer *, gpointer),
2167     void (*on_event) (guint32, GstEvent *, gboolean, gpointer),
2168     void (*on_query) (guint32, GstQuery *, gboolean, gpointer),
2169     void (*on_state_change) (guint32, GstStateChange, gpointer),
2170     void (*on_state_lost) (gpointer),
2171     void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data)
2172 {
2173   if (comm->reader_thread)
2174     return FALSE;
2175
2176   comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
2177   comm->on_buffer = on_buffer;
2178   comm->on_event = on_event;
2179   comm->on_query = on_query;
2180   comm->on_state_change = on_state_change;
2181   comm->on_state_lost = on_state_lost;
2182   comm->on_message = on_message;
2183   comm->user_data = user_data;
2184   gst_poll_set_flushing (comm->poll, FALSE);
2185   comm->reader_thread =
2186       g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
2187   return TRUE;
2188 }
2189
2190 void
2191 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
2192 {
2193   if (!comm->reader_thread)
2194     return;
2195
2196   gst_poll_set_flushing (comm->poll, TRUE);
2197   g_thread_join (comm->reader_thread);
2198   comm->reader_thread = NULL;
2199 }
2200
2201 static gchar *
2202 gst_value_serialize_event (const GValue * value)
2203 {
2204   const GstStructure *structure;
2205   GstEvent *ev;
2206   gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s;
2207   GValue val = G_VALUE_INIT;
2208
2209   ev = g_value_get_boxed (value);
2210
2211   g_value_init (&val, gst_event_type_get_type ());
2212   g_value_set_enum (&val, ev->type);
2213   type = gst_value_serialize (&val);
2214   g_value_unset (&val);
2215
2216   g_value_init (&val, G_TYPE_UINT64);
2217   g_value_set_uint64 (&val, ev->timestamp);
2218   ts = gst_value_serialize (&val);
2219   g_value_unset (&val);
2220
2221   g_value_init (&val, G_TYPE_UINT);
2222   g_value_set_uint (&val, ev->seqnum);
2223   seqnum = gst_value_serialize (&val);
2224   g_value_unset (&val);
2225
2226   g_value_init (&val, G_TYPE_INT64);
2227   g_value_set_int64 (&val, gst_event_get_running_time_offset (ev));
2228   rt_offset = gst_value_serialize (&val);
2229   g_value_unset (&val);
2230
2231   structure = gst_event_get_structure (ev);
2232   str = gst_structure_to_string (structure);
2233   str64 = g_base64_encode ((guchar *) str, strlen (str) + 1);
2234   g_strdelimit (str64, "=", '_');
2235   g_free (str);
2236
2237   s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64,
2238       NULL);
2239
2240   g_free (type);
2241   g_free (ts);
2242   g_free (seqnum);
2243   g_free (rt_offset);
2244   g_free (str64);
2245
2246   return s;
2247 }
2248
2249 static gboolean
2250 gst_value_deserialize_event (GValue * dest, const gchar * s)
2251 {
2252   GstEvent *ev = NULL;
2253   GValue val = G_VALUE_INIT;
2254   gboolean ret = FALSE;
2255   gchar **fields;
2256   gsize len;
2257
2258   fields = g_strsplit (s, ":", -1);
2259   if (g_strv_length (fields) != 5)
2260     goto wrong_length;
2261
2262   g_strdelimit (fields[4], "_", '=');
2263   g_base64_decode_inplace (fields[4], &len);
2264
2265   g_value_init (&val, gst_event_type_get_type ());
2266   if (!gst_value_deserialize (&val, fields[0]))
2267     goto fail;
2268   ev = gst_event_new_custom (g_value_get_enum (&val),
2269       gst_structure_new_from_string (fields[4]));
2270
2271   g_value_unset (&val);
2272   g_value_init (&val, G_TYPE_UINT64);
2273   if (!gst_value_deserialize (&val, fields[1]))
2274     goto fail;
2275   ev->timestamp = g_value_get_uint64 (&val);
2276
2277   g_value_unset (&val);
2278   g_value_init (&val, G_TYPE_UINT);
2279   if (!gst_value_deserialize (&val, fields[2]))
2280     goto fail;
2281   ev->seqnum = g_value_get_uint (&val);
2282
2283   g_value_unset (&val);
2284   g_value_init (&val, G_TYPE_INT64);
2285   if (!gst_value_deserialize (&val, fields[3]))
2286     goto fail;
2287   gst_event_set_running_time_offset (ev, g_value_get_int64 (&val));
2288
2289   g_value_take_boxed (dest, ev);
2290   ev = NULL;
2291   ret = TRUE;
2292
2293 fail:
2294   g_clear_pointer (&ev, gst_event_unref);
2295   g_value_unset (&val);
2296
2297 wrong_length:
2298   g_strfreev (fields);
2299   return ret;
2300 }
2301
2302 #define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type)                \
2303 G_STMT_START {                                                          \
2304   static GstValueTable gst_value =                                      \
2305     { 0, NULL,                                             \
2306     gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type };    \
2307   gst_value.type = _gtype;                                              \
2308   gst_value_register (&gst_value);                                      \
2309 } G_STMT_END
2310
2311 void
2312 gst_ipc_pipeline_comm_plugin_init (void)
2313 {
2314   static volatile gsize once = 0;
2315
2316   if (g_once_init_enter (&once)) {
2317     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, "ipcpipelinecomm", 0,
2318         "ipc pipeline comm");
2319     QUARK_ID = g_quark_from_static_string ("ipcpipeline-id");
2320     REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event);
2321     g_once_init_leave (&once, (gsize) 1);
2322   }
2323 }