f4984774815ed21d40c01b9301d6f3ea5b9caa97
[platform/upstream/gstreamer.git] / gst / gdp / gstgdppay.c
1 /* GStreamer
2  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gdppay
22  *
23  * <refsect2>
24  * <para>
25  * This element payloads GStreamer buffers and events using the
26  * GStreamer Data Protocol.
27  * </para>
28  * </refsect2>
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include <gst/dataprotocol/dataprotocol.h>
36
37 #include "gstgdppay.h"
38
39 /* elementfactory information */
40 static const GstElementDetails gdp_pay_details =
41 GST_ELEMENT_DETAILS ("GDP Payloader",
42     "GDP/Payloader",
43     "Payloads GStreamer Data Protocol buffers",
44     "Thomas Vander Stichele <thomas at apestaart dot org>");
45
46 enum
47 {
48   PROP_0,
49   /* FILL ME */
50 };
51
52 static GstStaticPadTemplate gdp_pay_sink_template =
53 GST_STATIC_PAD_TEMPLATE ("sink",
54     GST_PAD_SINK,
55     GST_PAD_ALWAYS,
56     GST_STATIC_CAPS_ANY);
57
58 static GstStaticPadTemplate gdp_pay_src_template =
59 GST_STATIC_PAD_TEMPLATE ("src",
60     GST_PAD_SRC,
61     GST_PAD_ALWAYS,
62     GST_STATIC_CAPS ("application/x-gdp"));
63
64 GST_DEBUG_CATEGORY (gst_gdp_pay_debug);
65 #define GST_CAT_DEFAULT gst_gdp_pay_debug
66
67 #define _do_init(x) \
68     GST_DEBUG_CATEGORY_INIT (gst_gdp_pay_debug, "gdppay", 0, \
69     "GDP payloader");
70
71 GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement,
72     GST_TYPE_ELEMENT, _do_init);
73
74 static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer);
75 static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event);
76 static GstStateChangeReturn gst_gdp_pay_change_state (GstElement *
77     element, GstStateChange transition);
78
79 static void gst_gdp_pay_dispose (GObject * gobject);
80
81 static void
82 gst_gdp_pay_base_init (gpointer g_class)
83 {
84   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
85
86   gst_element_class_set_details (element_class, &gdp_pay_details);
87
88   gst_element_class_add_pad_template (element_class,
89       gst_static_pad_template_get (&gdp_pay_sink_template));
90   gst_element_class_add_pad_template (element_class,
91       gst_static_pad_template_get (&gdp_pay_src_template));
92 }
93
94 static void
95 gst_gdp_pay_class_init (GstGDPPayClass * klass)
96 {
97   GObjectClass *gobject_class;
98   GstElementClass *gstelement_class;
99
100   gobject_class = (GObjectClass *) klass;
101   gstelement_class = (GstElementClass *) klass;
102
103   parent_class = g_type_class_peek_parent (klass);
104
105   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_gdp_pay_dispose);
106   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
107 }
108
109 static void
110 gst_gdp_pay_init (GstGDPPay * gdppay, GstGDPPayClass * g_class)
111 {
112   gdppay->sinkpad =
113       gst_pad_new_from_static_template (&gdp_pay_sink_template, "sink");
114   gst_pad_set_chain_function (gdppay->sinkpad,
115       GST_DEBUG_FUNCPTR (gst_gdp_pay_chain));
116   gst_pad_set_event_function (gdppay->sinkpad,
117       GST_DEBUG_FUNCPTR (gst_gdp_pay_sink_event));
118   gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->sinkpad);
119
120   gdppay->srcpad =
121       gst_pad_new_from_static_template (&gdp_pay_src_template, "src");
122   gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad);
123
124   gdppay->offset = 0;
125 }
126
127 static void
128 gst_gdp_pay_dispose (GObject * gobject)
129 {
130   GstGDPPay *this = GST_GDP_PAY (gobject);
131
132   if (this->caps_buf) {
133     gst_buffer_unref (this->caps_buf);
134     this->caps_buf = NULL;
135   }
136   if (this->new_segment_buf) {
137     gst_buffer_unref (this->new_segment_buf);
138     this->new_segment_buf = NULL;
139   }
140   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (gobject));
141 }
142
143 /* set OFFSET and OFFSET_END with running count */
144 static void
145 gst_gdp_stamp_buffer (GstGDPPay * this, GstBuffer * buffer)
146 {
147   GST_BUFFER_OFFSET (buffer) = this->offset;
148   GST_BUFFER_OFFSET_END (buffer) = this->offset + GST_BUFFER_SIZE (buffer);
149   this->offset = GST_BUFFER_OFFSET_END (buffer);
150 }
151
152 static GstBuffer *
153 gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
154 {
155   GstBuffer *headerbuf;
156   GstBuffer *payloadbuf;
157   guint8 *header, *payload;
158   guint len;
159
160   if (!gst_dp_packet_from_caps (caps, 0, &len, &header, &payload)) {
161     GST_WARNING_OBJECT (this, "could not create GDP header from caps");
162     return NULL;
163   }
164
165   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps");
166   headerbuf = gst_buffer_new ();
167   gst_buffer_set_data (headerbuf, header, len);
168   GST_BUFFER_MALLOCDATA (headerbuf) = header;
169
170   payloadbuf = gst_buffer_new ();
171   gst_buffer_set_data (payloadbuf, payload,
172       gst_dp_header_payload_length (header));
173   GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
174
175   return gst_buffer_join (headerbuf, payloadbuf);
176 }
177
178 static GstBuffer *
179 gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
180 {
181   GstBuffer *headerbuf;
182   guint8 *header;
183   guint len;
184
185   if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
186     GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
187     return NULL;
188   }
189
190   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer");
191   headerbuf = gst_buffer_new ();
192   gst_buffer_set_data (headerbuf, header, len);
193   GST_BUFFER_MALLOCDATA (headerbuf) = header;
194
195   /* we do not want to lose the ref on the incoming buffer */
196   gst_buffer_ref (buffer);
197   return gst_buffer_join (headerbuf, buffer);
198 }
199
200 static GstBuffer *
201 gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
202 {
203   GstBuffer *headerbuf;
204   GstBuffer *payloadbuf;
205   guint8 *header, *payload;
206   guint len;
207
208   if (!gst_dp_packet_from_event (event, 0, &len, &header, &payload)) {
209     GST_WARNING_OBJECT (this, "could not create GDP header from event");
210     return NULL;
211   }
212
213   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event");
214   headerbuf = gst_buffer_new ();
215   gst_buffer_set_data (headerbuf, header, len);
216   GST_BUFFER_MALLOCDATA (headerbuf) = header;
217
218   payloadbuf = gst_buffer_new ();
219   gst_buffer_set_data (payloadbuf, payload,
220       gst_dp_header_payload_length (header));
221   GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
222
223   return gst_buffer_join (headerbuf, payloadbuf);
224 }
225
226
227 /* set our caps with streamheader, based on the latest newsegment and caps,
228  * and (possibly) GDP-serialized buffers of the streamheaders on the src pad */
229 static GstFlowReturn
230 gst_gdp_pay_reset_streamheader (GstGDPPay * this)
231 {
232   GstCaps *caps;
233   GstStructure *structure;
234   GstBuffer *new_segment_buf, *caps_buf;
235   GstFlowReturn r = GST_FLOW_OK;
236
237   GValue array = { 0 };
238   GValue value = { 0 };
239
240   /* we need both new segment and caps before we can set streamheader */
241   if (!this->new_segment_buf || !this->caps_buf)
242     return GST_FLOW_OK;
243
244   /* we copy to avoid circular refcounts */
245   new_segment_buf = gst_buffer_copy (this->new_segment_buf);
246   caps_buf = gst_buffer_copy (this->caps_buf);
247
248   /* put copies of the buffers in a fixed list */
249   g_value_init (&array, GST_TYPE_ARRAY);
250
251   g_value_init (&value, GST_TYPE_BUFFER);
252   gst_value_set_buffer (&value, new_segment_buf);
253   gst_value_array_append_value (&array, &value);
254   g_value_unset (&value);
255
256   g_value_init (&value, GST_TYPE_BUFFER);
257   gst_value_set_buffer (&value, caps_buf);
258   gst_value_array_append_value (&array, &value);
259   g_value_unset (&value);
260
261   /* we also need to add GDP serializations of the streamheaders of the
262    * incoming caps */
263   structure = gst_caps_get_structure (this->caps, 0);
264   if (gst_structure_has_field (structure, "streamheader")) {
265     const GValue *sh;
266     GArray *buffers;
267     GstBuffer *buffer;
268     int i;
269
270     sh = gst_structure_get_value (structure, "streamheader");
271     buffers = g_value_peek_pointer (sh);
272     GST_DEBUG_OBJECT (this,
273         "Need to serialize %d incoming streamheader buffers on our streamheader",
274         buffers->len);
275     for (i = 0; i < buffers->len; ++i) {
276       GValue *bufval;
277       GstBuffer *outbuffer;
278
279       bufval = &g_array_index (buffers, GValue, i);
280       buffer = g_value_peek_pointer (bufval);
281       outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
282       if (outbuffer) {
283         g_value_init (&value, GST_TYPE_BUFFER);
284         gst_value_set_buffer (&value, outbuffer);
285         gst_value_array_append_value (&array, &value);
286         g_value_unset (&value);
287       }
288       /* FIXME: if one or more in this loop fail to produce and outbuffer,
289        * should we error out ? Once ? Every time ? */
290     }
291   }
292
293   caps = gst_caps_from_string ("application/x-gdp");
294   structure = gst_caps_get_structure (caps, 0);
295
296   gst_structure_set_value (structure, "streamheader", &array);
297   g_value_unset (&array);
298
299   /* Unref our copies */
300   gst_buffer_unref (new_segment_buf);
301   gst_buffer_unref (caps_buf);
302
303   GST_DEBUG_OBJECT (this, "Setting caps on src pad %" GST_PTR_FORMAT, caps);
304   gst_pad_set_caps (this->srcpad, caps);
305   gst_buffer_set_caps (this->caps_buf, caps);
306   gst_buffer_set_caps (this->new_segment_buf, caps);
307
308   /* if these are our first ever buffers, send out new_segment first */
309   if (!this->sent_streamheader) {
310     GstEvent *event =
311         gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_BYTES, 0, -1, 0);
312     GST_DEBUG_OBJECT (this, "Sending out new_segment event %p", event);
313     if (!gst_pad_push_event (this->srcpad, event)) {
314       GST_WARNING_OBJECT (this, "pushing new segment failed");
315       return GST_FLOW_ERROR;
316     }
317   }
318
319   /* push out these streamheader buffers, then flush our internal queue */
320   GST_DEBUG_OBJECT (this, "Pushing GDP new_segment buffer %p",
321       this->new_segment_buf);
322   /* we stored these bufs with refcount 1, so make sure we keep a ref */
323   r = gst_pad_push (this->srcpad, gst_buffer_ref (this->new_segment_buf));
324   if (r != GST_FLOW_OK) {
325     GST_WARNING_OBJECT (this, "pushing GDP newsegment buffer returned %d", r);
326     return r;
327   }
328   GST_DEBUG_OBJECT (this, "Pushing GDP caps buffer %p", this->new_segment_buf);
329   r = gst_pad_push (this->srcpad, gst_buffer_ref (this->caps_buf));
330   if (r != GST_FLOW_OK) {
331     GST_WARNING_OBJECT (this, "pushing GDP caps buffer returned %d", r);
332     return r;
333   }
334   this->sent_streamheader = TRUE;
335   GST_DEBUG_OBJECT (this, "need to push %d queued buffers",
336       g_list_length (this->queue));
337   if (this->queue) {
338     GList *l;
339
340     for (l = this->queue; l; l = g_list_next (l)) {
341       GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", l->data);
342       gst_buffer_set_caps (l->data, caps);
343       r = gst_pad_push (this->srcpad, l->data);
344       if (r != GST_FLOW_OK) {
345         GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
346         return r;
347       }
348     }
349   }
350
351   return r;
352 }
353
354 /* queue a buffer internally if we haven't sent streamheader buffers yet;
355  * otherwise, just push on */
356 static GstFlowReturn
357 gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer)
358 {
359   if (this->sent_streamheader) {
360     GST_LOG_OBJECT (this, "Pushing GDP buffer %p", buffer);
361     GST_LOG_OBJECT (this, "set caps %" GST_PTR_FORMAT, this->caps);
362     return gst_pad_push (this->srcpad, buffer);
363   }
364
365   /* store it on an internal queue */
366   this->queue = g_list_append (this->queue, buffer);
367   GST_DEBUG_OBJECT (this, "queued buffer %p, now %d buffers queued",
368       buffer, g_list_length (this->queue));
369   return GST_FLOW_OK;
370 }
371
372 static GstFlowReturn
373 gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
374 {
375   GstGDPPay *this;
376   GstCaps *caps;
377   GstBuffer *outbuffer;
378   GstFlowReturn ret;
379
380   this = GST_GDP_PAY (gst_pad_get_parent (pad));
381
382   /* we should have received a new_segment before, otherwise it's a bug.
383    * fake one in that case */
384   if (!this->new_segment_buf) {
385     GstEvent *event;
386
387     GST_WARNING_OBJECT (this,
388         "did not receive new-segment before first buffer");
389     event = gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_BYTES, 0, -1, 0);
390     outbuffer = gst_gdp_buffer_from_event (this, event);
391     gst_event_unref (event);
392
393     if (!outbuffer) {
394       GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
395           ("Could not create GDP buffer from new segment event"));
396       ret = GST_FLOW_ERROR;
397       goto done;
398     }
399
400     gst_gdp_stamp_buffer (this, outbuffer);
401     GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
402     GST_BUFFER_DURATION (outbuffer) = 0;
403     this->new_segment_buf = outbuffer;
404   }
405
406   /* make sure we've received caps before */
407   caps = gst_buffer_get_caps (buffer);
408   if (!this->caps && !caps) {
409     GST_WARNING_OBJECT (this, "first received buffer does not have caps set");
410     if (caps)
411       gst_caps_unref (caps);
412     ret = GST_FLOW_NOT_NEGOTIATED;
413     goto done;
414   }
415
416   /* if the caps have changed, process caps first */
417   if (caps && !gst_caps_is_equal (this->caps, caps)) {
418     GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps);
419     gst_caps_replace (&(this->caps), caps);
420     outbuffer = gst_gdp_buffer_from_caps (this, caps);
421     if (!outbuffer) {
422       GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
423           ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
424       gst_caps_unref (caps);
425       ret = GST_FLOW_ERROR;
426       goto done;
427     }
428
429     gst_gdp_stamp_buffer (this, outbuffer);
430     GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
431     GST_BUFFER_DURATION (outbuffer) = 0;
432     GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_IN_CAPS);
433     this->caps_buf = outbuffer;
434     gst_gdp_pay_reset_streamheader (this);
435   }
436
437   /* create a GDP header packet,
438    * then create a GST buffer of the header packet and the buffer contents */
439   outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
440   if (!outbuffer) {
441     GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
442         ("Could not create GDP buffer from buffer"));
443     ret = GST_FLOW_ERROR;
444     goto done;
445   }
446
447   gst_gdp_stamp_buffer (this, outbuffer);
448   GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
449   GST_BUFFER_DURATION (outbuffer) = GST_BUFFER_DURATION (buffer);
450
451   ret = gst_gdp_queue_buffer (this, outbuffer);
452
453 done:
454   gst_buffer_unref (buffer);
455   gst_object_unref (this);
456   return ret;
457 }
458
459 static gboolean
460 gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
461 {
462   GstBuffer *outbuffer;
463   GstGDPPay *this = GST_GDP_PAY (gst_pad_get_parent (pad));
464   GstFlowReturn flowret;
465   gboolean ret = TRUE;
466
467   /* now turn the event into a buffer */
468   outbuffer = gst_gdp_buffer_from_event (this, event);
469   if (!outbuffer) {
470     GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
471         ("Could not create GDP buffer from event"));
472     ret = FALSE;
473     goto done;
474   }
475   gst_gdp_stamp_buffer (this, outbuffer);
476   GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event);
477   GST_BUFFER_DURATION (outbuffer) = 0;
478
479   /* if we got a new segment, we should put it on our streamheader,
480    * and not send it on */
481   if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) {
482     if (this->new_segment_buf) {
483       gst_buffer_unref (this->new_segment_buf);
484     }
485     this->new_segment_buf = outbuffer;
486     gst_gdp_pay_reset_streamheader (this);
487   } else {
488     flowret = gst_gdp_queue_buffer (this, outbuffer);
489     if (flowret != GST_FLOW_OK) {
490       GST_WARNING_OBJECT (this, "queueing GDP caps buffer returned %d",
491           flowret);
492       ret = FALSE;
493       goto done;
494     }
495   }
496
497   /* if we have EOS, we should send on EOS ourselves */
498   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
499     GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event);
500     return gst_pad_push_event (this->srcpad, event);
501   };
502
503 done:
504   gst_object_unref (this);
505   gst_event_unref (event);
506   return ret;
507 }
508
509 static GstStateChangeReturn
510 gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
511 {
512   GstStateChangeReturn ret;
513   GstGDPPay *this = GST_GDP_PAY (element);
514
515   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
516
517   switch (transition) {
518     case GST_STATE_CHANGE_READY_TO_NULL:
519       if (this->caps) {
520         gst_caps_unref (this->caps);
521         this->caps = NULL;
522       }
523       break;
524     default:
525       break;
526   }
527
528   return ret;
529 }
530
531 gboolean
532 gst_gdp_pay_plugin_init (GstPlugin * plugin)
533 {
534   if (!gst_element_register (plugin, "gdppay", GST_RANK_NONE, GST_TYPE_GDP_PAY))
535     return FALSE;
536
537   return TRUE;
538 }