99509e8fc2252aa6ad5d871359efa8ebff8bbe90
[platform/upstream/gstreamer.git] / gst / gdp / gstgdppay.c
1 /* GStreamer
2  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 /**
21  * SECTION:element-gdppay
22  * @see_also: gdpdepay
23  *
24  * This element payloads GStreamer buffers and events using the
25  * GStreamer Data Protocol.
26  *
27  * <refsect2>
28  * |[
29  * gst-launch -v -m videotestsrc num-buffers=50 ! gdppay ! filesink location=test.gdp
30  * ]| This pipeline creates a serialized video stream that can be played back
31  * with the example shown in gdpdepay.
32  * </refsect2>
33  */
34
35 #ifdef HAVE_CONFIG_H
36 #include "config.h"
37 #endif
38
39 #include <gst/dataprotocol/dataprotocol.h>
40
41 #include "gstgdppay.h"
42
43 /* elementfactory information */
44 static const GstElementDetails gdp_pay_details =
45 GST_ELEMENT_DETAILS ("GDP Payloader",
46     "GDP/Payloader",
47     "Payloads GStreamer Data Protocol buffers",
48     "Thomas Vander Stichele <thomas at apestaart dot org>");
49
50 static GstStaticPadTemplate gdp_pay_sink_template =
51 GST_STATIC_PAD_TEMPLATE ("sink",
52     GST_PAD_SINK,
53     GST_PAD_ALWAYS,
54     GST_STATIC_CAPS_ANY);
55
56 static GstStaticPadTemplate gdp_pay_src_template =
57 GST_STATIC_PAD_TEMPLATE ("src",
58     GST_PAD_SRC,
59     GST_PAD_ALWAYS,
60     GST_STATIC_CAPS ("application/x-gdp"));
61
62 GST_DEBUG_CATEGORY_STATIC (gst_gdp_pay_debug);
63 #define GST_CAT_DEFAULT gst_gdp_pay_debug
64
65 #define DEFAULT_CRC_HEADER TRUE
66 #define DEFAULT_CRC_PAYLOAD FALSE
67 #define DEFAULT_VERSION GST_DP_VERSION_1_0
68
69 enum
70 {
71   PROP_0,
72   PROP_CRC_HEADER,
73   PROP_CRC_PAYLOAD,
74   PROP_VERSION,
75 };
76
77 #define _do_init(x) \
78     GST_DEBUG_CATEGORY_INIT (gst_gdp_pay_debug, "gdppay", 0, \
79     "GDP payloader");
80
81 GST_BOILERPLATE_FULL (GstGDPPay, gst_gdp_pay, GstElement,
82     GST_TYPE_ELEMENT, _do_init);
83
84 static void gst_gdp_pay_reset (GstGDPPay * this);
85
86 static GstFlowReturn gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer);
87
88 static gboolean gst_gdp_pay_src_event (GstPad * pad, GstEvent * event);
89
90 static gboolean gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event);
91
92 static GstStateChangeReturn gst_gdp_pay_change_state (GstElement *
93     element, GstStateChange transition);
94
95 static void gst_gdp_pay_set_property (GObject * object, guint prop_id,
96     const GValue * value, GParamSpec * pspec);
97 static void gst_gdp_pay_get_property (GObject * object, guint prop_id,
98     GValue * value, GParamSpec * pspec);
99
100 static void gst_gdp_pay_finalize (GObject * gobject);
101
102 static void
103 gst_gdp_pay_base_init (gpointer g_class)
104 {
105   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
106
107   gst_element_class_set_details (element_class, &gdp_pay_details);
108
109   gst_element_class_add_pad_template (element_class,
110       gst_static_pad_template_get (&gdp_pay_sink_template));
111   gst_element_class_add_pad_template (element_class,
112       gst_static_pad_template_get (&gdp_pay_src_template));
113 }
114
115 static void
116 gst_gdp_pay_class_init (GstGDPPayClass * klass)
117 {
118   GObjectClass *gobject_class;
119
120   GstElementClass *gstelement_class;
121
122   gobject_class = (GObjectClass *) klass;
123   gstelement_class = (GstElementClass *) klass;
124
125   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_set_property);
126   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_gdp_pay_get_property);
127   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_gdp_pay_finalize);
128
129   g_object_class_install_property (gobject_class, PROP_CRC_HEADER,
130       g_param_spec_boolean ("crc-header", "CRC Header",
131           "Calculate and store a CRC checksum on the header",
132           DEFAULT_CRC_HEADER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133   g_object_class_install_property (gobject_class, PROP_CRC_PAYLOAD,
134       g_param_spec_boolean ("crc-payload", "CRC Payload",
135           "Calculate and store a CRC checksum on the payload",
136           DEFAULT_CRC_PAYLOAD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
137   g_object_class_install_property (gobject_class, PROP_VERSION,
138       g_param_spec_enum ("version", "Version",
139           "Version of the GStreamer Data Protocol",
140           GST_TYPE_DP_VERSION, DEFAULT_VERSION,
141           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
142
143   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_gdp_pay_change_state);
144 }
145
146 static void
147 gst_gdp_pay_init (GstGDPPay * gdppay, GstGDPPayClass * g_class)
148 {
149   gdppay->sinkpad =
150       gst_pad_new_from_static_template (&gdp_pay_sink_template, "sink");
151   gst_pad_set_chain_function (gdppay->sinkpad,
152       GST_DEBUG_FUNCPTR (gst_gdp_pay_chain));
153   gst_pad_set_event_function (gdppay->sinkpad,
154       GST_DEBUG_FUNCPTR (gst_gdp_pay_sink_event));
155   gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->sinkpad);
156
157   gdppay->srcpad =
158       gst_pad_new_from_static_template (&gdp_pay_src_template, "src");
159   gst_pad_set_event_function (gdppay->srcpad,
160       GST_DEBUG_FUNCPTR (gst_gdp_pay_src_event));
161   gst_element_add_pad (GST_ELEMENT (gdppay), gdppay->srcpad);
162
163   gdppay->crc_header = DEFAULT_CRC_HEADER;
164   gdppay->crc_payload = DEFAULT_CRC_PAYLOAD;
165   gdppay->header_flag = gdppay->crc_header | gdppay->crc_payload;
166   gdppay->version = DEFAULT_VERSION;
167   gdppay->offset = 0;
168
169   gdppay->packetizer = gst_dp_packetizer_new (gdppay->version);
170 }
171
172 static void
173 gst_gdp_pay_finalize (GObject * gobject)
174 {
175   GstGDPPay *this = GST_GDP_PAY (gobject);
176
177   gst_gdp_pay_reset (this);
178   gst_dp_packetizer_free (this->packetizer);
179
180   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (gobject));
181 }
182
183 static void
184 gst_gdp_pay_reset (GstGDPPay * this)
185 {
186   GST_DEBUG_OBJECT (this, "Resetting GDP object");
187   /* clear the queued buffers */
188   while (this->queue) {
189     GstBuffer *buffer;
190
191     buffer = GST_BUFFER_CAST (this->queue->data);
192
193     /* delete buffer from queue now */
194     this->queue = g_list_delete_link (this->queue, this->queue);
195
196     gst_buffer_unref (buffer);
197   }
198   if (this->caps) {
199     gst_caps_unref (this->caps);
200     this->caps = NULL;
201   }
202   if (this->caps_buf) {
203     gst_buffer_unref (this->caps_buf);
204     this->caps_buf = NULL;
205   }
206   if (this->tag_buf) {
207     gst_buffer_unref (this->tag_buf);
208     this->tag_buf = NULL;
209   }
210   if (this->new_segment_buf) {
211     gst_buffer_unref (this->new_segment_buf);
212     this->new_segment_buf = NULL;
213   }
214   this->sent_streamheader = FALSE;
215   this->offset = 0;
216 }
217
218 /* set OFFSET and OFFSET_END with running count */
219 static void
220 gst_gdp_stamp_buffer (GstGDPPay * this, GstBuffer * buffer)
221 {
222   GST_BUFFER_OFFSET (buffer) = this->offset;
223   GST_BUFFER_OFFSET_END (buffer) = this->offset + GST_BUFFER_SIZE (buffer);
224   this->offset = GST_BUFFER_OFFSET_END (buffer);
225 }
226
227 static GstBuffer *
228 gst_gdp_buffer_from_caps (GstGDPPay * this, GstCaps * caps)
229 {
230   GstBuffer *headerbuf;
231
232   GstBuffer *payloadbuf;
233
234   guint8 *header, *payload;
235
236   guint len;
237
238   if (!this->packetizer->packet_from_caps (caps, this->header_flag, &len,
239           &header, &payload))
240     goto packet_failed;
241
242   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from caps");
243   headerbuf = gst_buffer_new ();
244   gst_buffer_set_data (headerbuf, header, len);
245   GST_BUFFER_MALLOCDATA (headerbuf) = header;
246
247   payloadbuf = gst_buffer_new ();
248   gst_buffer_set_data (payloadbuf, payload,
249       gst_dp_header_payload_length (header));
250   GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
251
252   return gst_buffer_join (headerbuf, payloadbuf);
253
254   /* ERRORS */
255 packet_failed:
256   {
257     GST_WARNING_OBJECT (this, "could not create GDP header from caps");
258     return NULL;
259   }
260 }
261
262 static GstBuffer *
263 gst_gdp_pay_buffer_from_buffer (GstGDPPay * this, GstBuffer * buffer)
264 {
265   GstBuffer *headerbuf;
266
267   guint8 *header;
268
269   guint len;
270
271   if (!this->packetizer->header_from_buffer (buffer, this->header_flag, &len,
272           &header))
273     goto no_buffer;
274
275   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from buffer");
276   headerbuf = gst_buffer_new ();
277   gst_buffer_set_data (headerbuf, header, len);
278   GST_BUFFER_MALLOCDATA (headerbuf) = header;
279
280   /* we do not want to lose the ref on the incoming buffer */
281   gst_buffer_ref (buffer);
282
283   return gst_buffer_join (headerbuf, buffer);
284
285   /* ERRORS */
286 no_buffer:
287   {
288     GST_WARNING_OBJECT (this, "could not create GDP header from buffer");
289     return NULL;
290   }
291 }
292
293 static GstBuffer *
294 gst_gdp_buffer_from_event (GstGDPPay * this, GstEvent * event)
295 {
296   GstBuffer *headerbuf;
297
298   GstBuffer *payloadbuf;
299
300   guint8 *header, *payload;
301
302   guint len;
303
304   gboolean ret;
305
306   ret =
307       this->packetizer->packet_from_event (event, this->header_flag, &len,
308       &header, &payload);
309   if (!ret)
310     goto no_event;
311
312   GST_LOG_OBJECT (this, "creating GDP header and payload buffer from event");
313   headerbuf = gst_buffer_new ();
314   gst_buffer_set_data (headerbuf, header, len);
315   GST_BUFFER_MALLOCDATA (headerbuf) = header;
316
317   payloadbuf = gst_buffer_new ();
318   gst_buffer_set_data (payloadbuf, payload,
319       gst_dp_header_payload_length (header));
320   GST_BUFFER_MALLOCDATA (payloadbuf) = payload;
321
322   return gst_buffer_join (headerbuf, payloadbuf);
323
324   /* ERRORS */
325 no_event:
326   {
327     GST_WARNING_OBJECT (this, "could not create GDP header from event %s (%d)",
328         gst_event_type_get_name (event->type), event->type);
329     return NULL;
330   }
331 }
332
333
334 /* set our caps with streamheader, based on the latest newsegment and caps,
335  * and (possibly) GDP-serialized buffers of the streamheaders on the src pad */
336 static GstFlowReturn
337 gst_gdp_pay_reset_streamheader (GstGDPPay * this)
338 {
339   GstCaps *caps;
340
341   /* We use copies of these to avoid circular refcounts */
342   GstBuffer *new_segment_buf, *caps_buf, *tag_buf;
343
344   GstStructure *structure;
345
346   GstFlowReturn r = GST_FLOW_OK;
347
348   gboolean version_one_zero = TRUE;
349
350   GValue array = { 0 };
351   GValue value = { 0 };
352
353   GST_DEBUG_OBJECT (this, "start");
354   /* In version 0.2, we didn't need or send new segment or tags */
355   if (this->version == GST_DP_VERSION_0_2)
356     version_one_zero = FALSE;
357
358   if (version_one_zero) {
359     if (!this->new_segment_buf || !this->caps_buf) {
360       GST_DEBUG_OBJECT (this, "1.0, missing new_segment or caps, returning");
361       return GST_FLOW_OK;
362     }
363   } else {
364     if (!this->caps_buf) {
365       GST_DEBUG_OBJECT (this, "0.2, missing caps, returning");
366       return GST_FLOW_OK;
367     }
368   }
369
370   /* put copies of the buffers in a fixed list
371    * Stamp the buffers with offset and offset_end as well.
372    * We do this here so the offsets match the order the buffers go out in */
373   g_value_init (&array, GST_TYPE_ARRAY);
374
375   if (version_one_zero) {
376     gst_gdp_stamp_buffer (this, this->new_segment_buf);
377     GST_DEBUG_OBJECT (this, "1.0, appending copy of new segment buffer %p",
378         this->new_segment_buf);
379     new_segment_buf = gst_buffer_copy (this->new_segment_buf);
380     gst_buffer_set_caps (new_segment_buf, NULL);
381     g_value_init (&value, GST_TYPE_BUFFER);
382     gst_value_set_buffer (&value, new_segment_buf);
383     gst_value_array_append_value (&array, &value);
384     g_value_unset (&value);
385     gst_buffer_unref (new_segment_buf);
386
387     if (this->tag_buf) {
388       gst_gdp_stamp_buffer (this, this->tag_buf);
389       GST_DEBUG_OBJECT (this, "1.0, appending copy of tag buffer %p",
390           this->tag_buf);
391       tag_buf = gst_buffer_copy (this->tag_buf);
392       gst_buffer_set_caps (tag_buf, NULL);
393       g_value_init (&value, GST_TYPE_BUFFER);
394       gst_value_set_buffer (&value, tag_buf);
395       gst_value_array_append_value (&array, &value);
396       g_value_unset (&value);
397       gst_buffer_unref (tag_buf);
398     }
399   }
400
401   gst_gdp_stamp_buffer (this, this->caps_buf);
402   GST_DEBUG_OBJECT (this, "appending copy of caps buffer %p", this->caps_buf);
403   caps_buf = gst_buffer_copy (this->caps_buf);
404   gst_buffer_set_caps (caps_buf, NULL);
405   g_value_init (&value, GST_TYPE_BUFFER);
406   gst_value_set_buffer (&value, caps_buf);
407   gst_value_array_append_value (&array, &value);
408   g_value_unset (&value);
409   gst_buffer_unref (caps_buf);
410
411   /* we also need to add GDP serializations of the streamheaders of the
412    * incoming caps */
413   structure = gst_caps_get_structure (this->caps, 0);
414   if (gst_structure_has_field (structure, "streamheader")) {
415     const GValue *sh;
416
417     GArray *buffers;
418
419     GstBuffer *buffer;
420
421     int i;
422
423     sh = gst_structure_get_value (structure, "streamheader");
424     buffers = g_value_peek_pointer (sh);
425     GST_DEBUG_OBJECT (this,
426         "Need to serialize %d incoming streamheader buffers on ours",
427         buffers->len);
428     for (i = 0; i < buffers->len; ++i) {
429       GValue *bufval;
430
431       GstBuffer *outbuffer;
432
433       bufval = &g_array_index (buffers, GValue, i);
434       buffer = g_value_peek_pointer (bufval);
435       outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
436       if (!outbuffer) {
437         g_value_unset (&array);
438         goto no_buffer;
439       }
440
441       g_value_init (&value, GST_TYPE_BUFFER);
442       gst_value_set_buffer (&value, outbuffer);
443       gst_value_array_append_value (&array, &value);
444       g_value_unset (&value);
445
446       gst_buffer_unref (outbuffer);
447     }
448   } else {
449     GST_DEBUG_OBJECT (this, "no streamheader to serialize");
450   }
451
452   GST_DEBUG_OBJECT (this, "%d serialized buffers on streamheaders",
453       gst_value_array_get_size (&array));
454   caps = gst_caps_from_string ("application/x-gdp");
455   structure = gst_caps_get_structure (caps, 0);
456
457   gst_structure_set_value (structure, "streamheader", &array);
458   g_value_unset (&array);
459
460   GST_DEBUG_OBJECT (this, "Setting caps on src pad %" GST_PTR_FORMAT, caps);
461   gst_pad_set_caps (this->srcpad, caps);
462   gst_buffer_set_caps (this->caps_buf, caps);
463   gst_buffer_set_caps (this->new_segment_buf, caps);
464
465   /* if these are our first ever buffers, send out new_segment first */
466   if (!this->sent_streamheader) {
467     GstEvent *event =
468         gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_BYTES, 0, -1, 0);
469     GST_DEBUG_OBJECT (this, "Sending out new_segment event %p", event);
470     if (!gst_pad_push_event (this->srcpad, event)) {
471       GST_WARNING_OBJECT (this, "pushing new segment failed");
472       r = GST_FLOW_ERROR;
473       goto done;
474     }
475   }
476
477   /* push out these streamheader buffers, then flush our internal queue */
478   GST_DEBUG_OBJECT (this, "Pushing GDP new_segment buffer %p with offset %"
479       G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, this->new_segment_buf,
480       GST_BUFFER_OFFSET (this->new_segment_buf),
481       GST_BUFFER_OFFSET_END (this->new_segment_buf));
482   /* we stored these bufs with refcount 1, so make sure we keep a ref */
483   r = gst_pad_push (this->srcpad, gst_buffer_ref (this->new_segment_buf));
484   if (r != GST_FLOW_OK) {
485     GST_WARNING_OBJECT (this, "pushing GDP newsegment buffer returned %d", r);
486     goto done;
487   }
488   if (this->tag_buf) {
489     GST_DEBUG_OBJECT (this, "Pushing GDP tag buffer %p", this->tag_buf);
490     /* we stored these bufs with refcount 1, so make sure we keep a ref */
491     r = gst_pad_push (this->srcpad, gst_buffer_ref (this->tag_buf));
492     if (r != GST_FLOW_OK) {
493       GST_WARNING_OBJECT (this, "pushing GDP tag buffer returned %d", r);
494       goto done;
495     }
496   }
497   GST_DEBUG_OBJECT (this, "Pushing GDP caps buffer %p", this->caps_buf);
498   r = gst_pad_push (this->srcpad, gst_buffer_ref (this->caps_buf));
499   if (r != GST_FLOW_OK) {
500     GST_WARNING_OBJECT (this, "pushing GDP caps buffer returned %d", r);
501     goto done;
502   }
503   this->sent_streamheader = TRUE;
504   GST_DEBUG_OBJECT (this, "need to push %d queued buffers",
505       g_list_length (this->queue));
506   while (this->queue) {
507     GstBuffer *buffer;
508
509     buffer = GST_BUFFER_CAST (this->queue->data);
510     GST_DEBUG_OBJECT (this, "Pushing queued GDP buffer %p", buffer);
511
512     /* delete buffer from queue now */
513     this->queue = g_list_delete_link (this->queue, this->queue);
514
515     /* set caps and push */
516     gst_buffer_set_caps (buffer, caps);
517     r = gst_pad_push (this->srcpad, buffer);
518     if (r != GST_FLOW_OK) {
519       GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r);
520       goto done;
521     }
522   }
523
524 done:
525   gst_caps_unref (caps);
526   GST_DEBUG_OBJECT (this, "stop");
527   return r;
528
529   /* ERRORS */
530 no_buffer:
531   {
532     GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
533         ("failed to create GDP buffer from streamheader"));
534     return GST_FLOW_ERROR;
535   }
536 }
537
538 /* queue a buffer internally if we haven't sent streamheader buffers yet;
539  * otherwise, just push on, this takes ownership of the buffer. */
540 static GstFlowReturn
541 gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer)
542 {
543   if (this->sent_streamheader) {
544     GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT,
545         buffer, this->caps);
546     return gst_pad_push (this->srcpad, buffer);
547   }
548
549   /* store it on an internal queue. buffer remains reffed. */
550   this->queue = g_list_append (this->queue, buffer);
551   GST_DEBUG_OBJECT (this, "streamheader not sent yet, "
552       "queued buffer %p, now %d buffers queued",
553       buffer, g_list_length (this->queue));
554
555   return GST_FLOW_OK;
556 }
557
558 static GstFlowReturn
559 gst_gdp_pay_chain (GstPad * pad, GstBuffer * buffer)
560 {
561   GstGDPPay *this;
562
563   GstCaps *caps;
564
565   GstBuffer *outbuffer;
566
567   GstFlowReturn ret;
568
569   this = GST_GDP_PAY (gst_pad_get_parent (pad));
570
571   /* we should have received a new_segment before, otherwise it's a bug.
572    * fake one in that case */
573   if (!this->new_segment_buf) {
574     GstEvent *event;
575
576     GST_WARNING_OBJECT (this,
577         "did not receive new-segment before first buffer");
578     event = gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_BYTES, 0, -1, 0);
579     outbuffer = gst_gdp_buffer_from_event (this, event);
580     gst_event_unref (event);
581
582     /* GDP 0.2 doesn't know about new-segment, so this is not fatal */
583     if (!outbuffer) {
584       GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
585           ("Could not create GDP buffer from new segment event"));
586     } else {
587       GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
588       GST_BUFFER_DURATION (outbuffer) = 0;
589       GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_IN_CAPS);
590       GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf",
591           outbuffer);
592       this->new_segment_buf = outbuffer;
593     }
594   }
595
596   /* make sure we've received caps before */
597   caps = gst_buffer_get_caps (buffer);
598   if (!this->caps && !caps)
599     goto no_caps;
600
601   /* if the caps have changed, process caps first */
602   if (caps && !gst_caps_is_equal (this->caps, caps)) {
603     GST_LOG_OBJECT (this, "caps changed to %p, %" GST_PTR_FORMAT, caps, caps);
604     gst_caps_replace (&(this->caps), caps);
605     outbuffer = gst_gdp_buffer_from_caps (this, caps);
606     if (!outbuffer)
607       goto no_caps_buffer;
608
609     GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
610     GST_BUFFER_DURATION (outbuffer) = 0;
611     GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_IN_CAPS);
612
613     if (this->caps_buf)
614       gst_buffer_unref (this->caps_buf);
615     this->caps_buf = outbuffer;
616     gst_gdp_pay_reset_streamheader (this);
617   }
618
619   if (caps)
620     gst_caps_unref (caps);
621
622   /* create a GDP header packet,
623    * then create a GST buffer of the header packet and the buffer contents */
624   outbuffer = gst_gdp_pay_buffer_from_buffer (this, buffer);
625   if (!outbuffer)
626     goto no_buffer;
627
628   /* If the incoming buffer is IN_CAPS, that means we have it on the caps
629    * as streamheader, and we have serialized a GDP version of it and put it
630    * on our caps */
631   if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
632     GST_DEBUG_OBJECT (this, "Setting IN_CAPS flag on outgoing buffer %p",
633         outbuffer);
634     GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_IN_CAPS);
635   }
636
637   gst_gdp_stamp_buffer (this, outbuffer);
638   GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer);
639   GST_BUFFER_DURATION (outbuffer) = GST_BUFFER_DURATION (buffer);
640
641   ret = gst_gdp_queue_buffer (this, outbuffer);
642
643 done:
644   gst_buffer_unref (buffer);
645   gst_object_unref (this);
646   return ret;
647
648   /* ERRORS */
649 no_caps:
650   {
651     /* when returning a fatal error as a GstFlowReturn we must post an error
652      * message */
653     GST_ELEMENT_ERROR (this, STREAM, FORMAT, (NULL),
654         ("first received buffer does not have caps set"));
655     if (caps)
656       gst_caps_unref (caps);
657     ret = GST_FLOW_NOT_NEGOTIATED;
658     goto done;
659   }
660 no_caps_buffer:
661   {
662     GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
663         ("Could not create GDP buffer from caps %" GST_PTR_FORMAT, caps));
664     gst_caps_unref (caps);
665     ret = GST_FLOW_ERROR;
666     goto done;
667   }
668 no_buffer:
669   {
670     GST_ELEMENT_ERROR (this, STREAM, ENCODE, (NULL),
671         ("Could not create GDP buffer from buffer"));
672     ret = GST_FLOW_ERROR;
673     goto done;
674   }
675 }
676
677 static gboolean
678 gst_gdp_pay_sink_event (GstPad * pad, GstEvent * event)
679 {
680   GstBuffer *outbuffer;
681
682   GstGDPPay *this = GST_GDP_PAY (gst_pad_get_parent (pad));
683
684   GstFlowReturn flowret;
685
686   gboolean ret = TRUE;
687
688   GST_DEBUG_OBJECT (this, "received event %p of type %s (%d)",
689       event, gst_event_type_get_name (event->type), event->type);
690
691   /* now turn the event into a buffer */
692   outbuffer = gst_gdp_buffer_from_event (this, event);
693   if (!outbuffer)
694     goto no_outbuffer;
695
696   GST_BUFFER_TIMESTAMP (outbuffer) = GST_EVENT_TIMESTAMP (event);
697   GST_BUFFER_DURATION (outbuffer) = 0;
698
699   /* if we got a new segment or tag event, we should put it on our streamheader,
700    * and not send it on */
701   switch (GST_EVENT_TYPE (event)) {
702     case GST_EVENT_NEWSEGMENT:
703       GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as new_segment_buf",
704           outbuffer);
705
706       if (this->new_segment_buf)
707         gst_buffer_unref (this->new_segment_buf);
708       this->new_segment_buf = outbuffer;
709
710       GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_IN_CAPS);
711       gst_gdp_pay_reset_streamheader (this);
712       break;
713     case GST_EVENT_TAG:
714       GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as tag_buf",
715           outbuffer);
716
717       if (this->tag_buf)
718         gst_buffer_unref (this->tag_buf);
719       this->tag_buf = outbuffer;
720
721       GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_IN_CAPS);
722       gst_gdp_pay_reset_streamheader (this);
723       break;
724     default:
725       GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer,
726           event);
727       flowret = gst_gdp_queue_buffer (this, outbuffer);
728       if (flowret != GST_FLOW_OK)
729         goto push_error;
730       break;
731   }
732
733   /* if we have EOS, we should send on EOS ourselves */
734   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
735     GST_DEBUG_OBJECT (this, "Sending on EOS event %p", event);
736     /* ref, we unref later again */
737     ret = gst_pad_push_event (this->srcpad, gst_event_ref (event));
738   }
739
740 done:
741   gst_event_unref (event);
742   gst_object_unref (this);
743
744   return ret;
745
746   /* ERRORS */
747 no_outbuffer:
748   {
749     GST_ELEMENT_WARNING (this, STREAM, ENCODE, (NULL),
750         ("Could not create GDP buffer from received event (type %s)",
751             gst_event_type_get_name (event->type)));
752     ret = FALSE;
753     goto done;
754   }
755 push_error:
756   {
757     GST_WARNING_OBJECT (this, "queueing GDP event buffer returned %d", flowret);
758     ret = FALSE;
759     goto done;
760   }
761 }
762
763 static gboolean
764 gst_gdp_pay_src_event (GstPad * pad, GstEvent * event)
765 {
766   GstGDPPay *this;
767
768   gboolean res = TRUE;
769
770   this = GST_GDP_PAY (gst_pad_get_parent (pad));
771
772   switch (GST_EVENT_TYPE (event)) {
773     case GST_EVENT_SEEK:
774       /* we refuse seek for now. */
775       gst_event_unref (event);
776       res = FALSE;
777       break;
778     case GST_EVENT_QOS:
779     case GST_EVENT_NAVIGATION:
780     default:
781       /* everything else is passed */
782       res = gst_pad_push_event (this->sinkpad, event);
783       break;
784   }
785   gst_object_unref (this);
786
787   return res;
788 }
789
790 static void
791 gst_gdp_pay_set_property (GObject * object, guint prop_id,
792     const GValue * value, GParamSpec * pspec)
793 {
794   GstGDPPay *this;
795
796   g_return_if_fail (GST_IS_GDP_PAY (object));
797   this = GST_GDP_PAY (object);
798
799   switch (prop_id) {
800     case PROP_CRC_HEADER:
801       this->crc_header =
802           g_value_get_boolean (value) ? GST_DP_HEADER_FLAG_CRC_HEADER : 0;
803       this->header_flag = this->crc_header | this->crc_payload;
804       break;
805     case PROP_CRC_PAYLOAD:
806       this->crc_payload =
807           g_value_get_boolean (value) ? GST_DP_HEADER_FLAG_CRC_PAYLOAD : 0;
808       this->header_flag = this->crc_header | this->crc_payload;
809       break;
810     case PROP_VERSION:
811       this->version = g_value_get_enum (value);
812       break;
813     default:
814       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
815       break;
816   }
817 }
818
819 static void
820 gst_gdp_pay_get_property (GObject * object, guint prop_id,
821     GValue * value, GParamSpec * pspec)
822 {
823   GstGDPPay *this;
824
825   g_return_if_fail (GST_IS_GDP_PAY (object));
826   this = GST_GDP_PAY (object);
827
828   switch (prop_id) {
829     case PROP_CRC_HEADER:
830       g_value_set_boolean (value, this->crc_header);
831       break;
832     case PROP_CRC_PAYLOAD:
833       g_value_set_boolean (value, this->crc_payload);
834       break;
835     case PROP_VERSION:
836       g_value_set_enum (value, this->version);
837       break;
838     default:
839       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
840       break;
841   }
842 }
843
844 static GstStateChangeReturn
845 gst_gdp_pay_change_state (GstElement * element, GstStateChange transition)
846 {
847   GstStateChangeReturn ret;
848
849   GstGDPPay *this = GST_GDP_PAY (element);
850
851   switch (transition) {
852     case GST_STATE_CHANGE_READY_TO_PAUSED:
853       break;
854     default:
855       break;
856   }
857
858   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
859
860   switch (transition) {
861     case GST_STATE_CHANGE_PAUSED_TO_READY:
862       gst_gdp_pay_reset (this);
863       break;
864     default:
865       break;
866   }
867
868   return ret;
869 }
870
871 gboolean
872 gst_gdp_pay_plugin_init (GstPlugin * plugin)
873 {
874   if (!gst_element_register (plugin, "gdppay", GST_RANK_NONE, GST_TYPE_GDP_PAY))
875     return FALSE;
876
877   return TRUE;
878 }