eef7acf020245f21140ab3e9c63fa9bae0a01d8e
[platform/upstream/gstreamer.git] / ext / kate / gstkateparse.c
1 /* GStreamer
2  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
3  * Copyright (C) 2006 Andy Wingo <wingo@pobox.com>
4  * Copyright (C) 2008 Vincent Penquerc'h <ogg.k.ogg.k@googlemail.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:element-kateparse
24  * @short_description: parses kate streams
25  * @see_also: katedec, vorbisparse, oggdemux, theoraparse
26  *
27  * <refsect2>
28  * <para>
29  * The kateparse element will parse the header packets of the Kate
30  * stream and put them as the streamheader in the caps. This is used in the
31  * multifdsink case where you want to stream live kate streams to multiple
32  * clients, each client has to receive the streamheaders first before they can
33  * consume the kate packets.
34  * </para>
35  * <para>
36  * This element also makes sure that the buffers that it pushes out are properly
37  * timestamped and that their offset and offset_end are set. The buffers that
38  * kateparse outputs have all of the metadata that oggmux expects to receive,
39  * which allows you to (for example) remux an ogg/kate file.
40  * </para>
41  * <title>Example pipelines</title>
42  * <para>
43  * <programlisting>
44  * gst-launch -v filesrc location=kate.ogg ! oggdemux ! kateparse ! fakesink
45  * </programlisting>
46  * This pipeline shows that the streamheader is set in the caps, and that each
47  * buffer has the timestamp, duration, offset, and offset_end set.
48  * </para>
49  * <para>
50  * <programlisting>
51  * gst-launch filesrc location=kate.ogg ! oggdemux ! kateparse \
52  *            ! oggmux ! filesink location=kate-remuxed.ogg
53  * </programlisting>
54  * This pipeline shows remuxing. kate-remuxed.ogg might not be exactly the same
55  * as kate.ogg, but they should produce exactly the same decoded data.
56  * </para>
57  * </refsect2>
58  *
59  */
60
61 #ifdef HAVE_CONFIG_H
62 #  include "config.h"
63 #endif
64
65 #include "gstkate.h"
66 #include "gstkateutil.h"
67 #include "gstkateparse.h"
68
69 GST_DEBUG_CATEGORY_EXTERN (gst_kateparse_debug);
70 #define GST_CAT_DEFAULT gst_kateparse_debug
71
72 static GstStaticPadTemplate gst_kate_parse_sink_factory =
73     GST_STATIC_PAD_TEMPLATE ("sink",
74     GST_PAD_SINK,
75     GST_PAD_ALWAYS,
76     GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
77     );
78
79 static GstStaticPadTemplate gst_kate_parse_src_factory =
80     GST_STATIC_PAD_TEMPLATE ("src",
81     GST_PAD_SRC,
82     GST_PAD_ALWAYS,
83     GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
84     );
85
86 #define gst_kate_parse_parent_class parent_class
87 G_DEFINE_TYPE (GstKateParse, gst_kate_parse, GST_TYPE_ELEMENT);
88
89 static GstFlowReturn gst_kate_parse_chain (GstPad * pad, GstObject * parent,
90     GstBuffer * buffer);
91 static GstStateChangeReturn gst_kate_parse_change_state (GstElement * element,
92     GstStateChange transition);
93 static gboolean gst_kate_parse_sink_event (GstPad * pad, GstObject * parent,
94     GstEvent * event);
95 static gboolean gst_kate_parse_src_query (GstPad * pad, GstObject * parent,
96     GstQuery * query);
97 #if 0
98 static gboolean gst_kate_parse_convert (GstPad * pad,
99     GstFormat src_format, gint64 src_value,
100     GstFormat * dest_format, gint64 * dest_value);
101 #endif
102 static GstFlowReturn gst_kate_parse_parse_packet (GstKateParse * parse,
103     GstBuffer * buf);
104
105 static void
106 gst_kate_parse_class_init (GstKateParseClass * klass)
107 {
108   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
109
110   gstelement_class->change_state = gst_kate_parse_change_state;
111
112   gst_element_class_add_pad_template (gstelement_class,
113       gst_static_pad_template_get (&gst_kate_parse_src_factory));
114   gst_element_class_add_pad_template (gstelement_class,
115       gst_static_pad_template_get (&gst_kate_parse_sink_factory));
116
117   gst_element_class_set_static_metadata (gstelement_class, "Kate stream parser",
118       "Codec/Parser/Subtitle",
119       "parse raw kate streams",
120       "Vincent Penquerc'h <ogg.k.ogg.k at googlemail dot com>");
121
122   klass->parse_packet = GST_DEBUG_FUNCPTR (gst_kate_parse_parse_packet);
123 }
124
125 static void
126 gst_kate_parse_init (GstKateParse * parse)
127 {
128   parse->sinkpad =
129       gst_pad_new_from_static_template (&gst_kate_parse_sink_factory, "sink");
130   gst_pad_set_chain_function (parse->sinkpad,
131       GST_DEBUG_FUNCPTR (gst_kate_parse_chain));
132   gst_pad_set_event_function (parse->sinkpad,
133       GST_DEBUG_FUNCPTR (gst_kate_parse_sink_event));
134   gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
135
136   parse->srcpad =
137       gst_pad_new_from_static_template (&gst_kate_parse_src_factory, "src");
138   gst_pad_set_query_function (parse->srcpad,
139       GST_DEBUG_FUNCPTR (gst_kate_parse_src_query));
140   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
141 }
142
143 static void
144 gst_kate_parse_drain_event_queue (GstKateParse * parse)
145 {
146   while (parse->event_queue->length) {
147     GstEvent *event;
148
149     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
150     gst_pad_event_default (parse->sinkpad, NULL, event);
151   }
152 }
153
154 static GstFlowReturn
155 gst_kate_parse_push_headers (GstKateParse * parse)
156 {
157   /* mark and put on caps */
158   GstCaps *caps;
159   GstBuffer *outbuf;
160   kate_packet packet;
161   GList *headers, *outbuf_list = NULL;
162   int ret;
163   gboolean res;
164
165   /* get the headers into the caps, passing them to kate as we go */
166   caps =
167       gst_kate_util_set_header_on_caps (&parse->element,
168       gst_pad_get_current_caps (parse->sinkpad), parse->streamheader);
169
170   if (G_UNLIKELY (!caps)) {
171     GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
172         ("Failed to set headers on caps"));
173     return GST_FLOW_ERROR;
174   }
175
176   GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps);
177   res = gst_pad_set_caps (parse->srcpad, caps);
178   gst_caps_unref (caps);
179   if (G_UNLIKELY (!res)) {
180     GST_WARNING_OBJECT (parse->srcpad, "Failed to set caps on source pad");
181     return GST_FLOW_NOT_NEGOTIATED;
182   }
183
184   headers = parse->streamheader;
185   while (headers) {
186     GstMapInfo info;
187
188     outbuf = GST_BUFFER_CAST (headers->data);
189
190     if (!gst_buffer_map (outbuf, &info, GST_MAP_READ)) {
191       GST_WARNING_OBJECT (outbuf, "Failed to map buffer");
192       continue;
193     }
194
195     kate_packet_wrap (&packet, info.size, info.data);
196     ret = kate_decode_headerin (&parse->ki, &parse->kc, &packet);
197     if (G_UNLIKELY (ret < 0)) {
198       GST_WARNING_OBJECT (parse, "Failed to decode header: %s",
199           gst_kate_util_get_error_message (ret));
200     }
201     gst_buffer_unmap (outbuf, &info);
202     /* takes ownership of outbuf, which was previously in parse->streamheader */
203     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_HEADER);
204     outbuf_list = g_list_append (outbuf_list, outbuf);
205     headers = headers->next;
206   }
207
208   /* first process queued events */
209   gst_kate_parse_drain_event_queue (parse);
210
211   /* push out buffers, ignoring return value... */
212   headers = outbuf_list;
213   while (headers) {
214     outbuf = GST_BUFFER_CAST (headers->data);
215     gst_pad_push (parse->srcpad, outbuf);
216     headers = headers->next;
217   }
218
219   g_list_free (outbuf_list);
220   g_list_free (parse->streamheader);
221   parse->streamheader = NULL;
222
223   parse->streamheader_sent = TRUE;
224
225   return GST_FLOW_OK;
226 }
227
228 static void
229 gst_kate_parse_clear_queue (GstKateParse * parse)
230 {
231   GST_DEBUG_OBJECT (parse, "Clearing queue");
232   while (parse->buffer_queue->length) {
233     GstBuffer *buf;
234
235     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
236     gst_buffer_unref (buf);
237   }
238   while (parse->event_queue->length) {
239     GstEvent *event;
240
241     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
242     gst_event_unref (event);
243   }
244 }
245
246 static GstFlowReturn
247 gst_kate_parse_push_buffer (GstKateParse * parse, GstBuffer * buf,
248     gint64 granulepos)
249 {
250   GST_LOG_OBJECT (parse, "granulepos %16" G_GINT64_MODIFIER "x", granulepos);
251   if (granulepos < 0) {
252     /* packets coming not from Ogg won't have a granpos in the offset end,
253        so we have to synthesize one here - only problem is we don't know
254        the backlink - pretend there's none for now */
255     GST_INFO_OBJECT (parse, "No granulepos on buffer, synthesizing one");
256     granulepos =
257         kate_duration_granule (&parse->ki,
258         GST_BUFFER_TIMESTAMP (buf) /
259         (double) GST_SECOND) << kate_granule_shift (&parse->ki);
260   }
261   GST_BUFFER_OFFSET (buf) =
262       kate_granule_time (&parse->ki, granulepos) * GST_SECOND;
263   GST_BUFFER_OFFSET_END (buf) = granulepos;
264   GST_BUFFER_TIMESTAMP (buf) = GST_BUFFER_OFFSET (buf);
265
266   return gst_pad_push (parse->srcpad, buf);
267 }
268
269 static GstFlowReturn
270 gst_kate_parse_drain_queue_prematurely (GstKateParse * parse)
271 {
272   GstFlowReturn ret = GST_FLOW_OK;
273
274   /* got an EOS event, make sure to push out any buffers that were in the queue
275    * -- won't normally be the case, but this catches the
276    * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous
277    * stream. */
278
279   /* if we got EOS before any buffers came, go ahead and push the other events
280    * first */
281   gst_kate_parse_drain_event_queue (parse);
282
283   while (!g_queue_is_empty (parse->buffer_queue)) {
284     GstBuffer *buf;
285     gint64 granpos;
286
287     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
288
289     granpos = GST_BUFFER_OFFSET_END (buf);
290     ret = gst_kate_parse_push_buffer (parse, buf, granpos);
291
292     if (ret != GST_FLOW_OK)
293       goto done;
294   }
295
296   g_assert (g_queue_is_empty (parse->buffer_queue));
297
298 done:
299   return ret;
300 }
301
302 static GstFlowReturn
303 gst_kate_parse_drain_queue (GstKateParse * parse, gint64 granulepos)
304 {
305   GstFlowReturn ret = GST_FLOW_OK;
306
307   if (!g_queue_is_empty (parse->buffer_queue)) {
308     GstBuffer *buf;
309     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
310     ret = gst_kate_parse_push_buffer (parse, buf, granulepos);
311
312     if (ret != GST_FLOW_OK)
313       goto done;
314   }
315   g_assert (g_queue_is_empty (parse->buffer_queue));
316
317 done:
318   return ret;
319 }
320
321 static GstFlowReturn
322 gst_kate_parse_queue_buffer (GstKateParse * parse, GstBuffer * buf)
323 {
324   GstFlowReturn ret = GST_FLOW_OK;
325   gint64 granpos;
326
327   buf = gst_buffer_make_writable (buf);
328
329   /* oggdemux stores the granule pos in the offset end */
330   granpos = GST_BUFFER_OFFSET_END (buf);
331   GST_LOG_OBJECT (parse, "granpos %16" G_GINT64_MODIFIER "x", granpos);
332   g_queue_push_tail (parse->buffer_queue, buf);
333
334 #if 1
335   /* if getting buffers from matroska, we won't have a granpos here... */
336   //if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
337   ret = gst_kate_parse_drain_queue (parse, granpos);
338   //}
339 #else
340   if (granpos >= 0) {
341     ret = gst_kate_parse_drain_queue (parse, granpos);
342   } else {
343     GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
344         ("Bad granulepos %" G_GINT64_FORMAT, granpos));
345     ret = GST_FLOW_ERROR;
346   }
347 #endif
348
349   return ret;
350 }
351
352 static GstFlowReturn
353 gst_kate_parse_parse_packet (GstKateParse * parse, GstBuffer * buf)
354 {
355   GstFlowReturn ret = GST_FLOW_OK;
356   guint8 header[1];
357   gsize size;
358
359   g_assert (parse);
360
361   parse->packetno++;
362
363   size = gst_buffer_extract (buf, 0, header, 1);
364
365   GST_LOG_OBJECT (parse, "Got packet %02x, %zu bytes",
366       size ? header[0] : -1, gst_buffer_get_size (buf));
367
368   if (size > 0 && header[0] & 0x80) {
369     GST_DEBUG_OBJECT (parse, "Found header %02x", header[0]);
370     /* if 0x80 is set, it's streamheader,
371      * so put it on the streamheader list and return */
372     parse->streamheader = g_list_append (parse->streamheader, buf);
373     ret = GST_FLOW_OK;
374   } else {
375     if (!parse->streamheader_sent) {
376       GST_DEBUG_OBJECT (parse, "Found non header, pushing headers seen so far");
377       ret = gst_kate_parse_push_headers (parse);
378     }
379
380     if (ret == GST_FLOW_OK) {
381       ret = gst_kate_parse_queue_buffer (parse, buf);
382     }
383   }
384
385   return ret;
386 }
387
388 static GstFlowReturn
389 gst_kate_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
390 {
391   GstKateParseClass *klass;
392   GstKateParse *parse;
393
394   parse = GST_KATE_PARSE (GST_PAD_PARENT (pad));
395   klass = GST_KATE_PARSE_CLASS (G_OBJECT_GET_CLASS (parse));
396
397   g_assert (klass->parse_packet != NULL);
398
399   if (G_UNLIKELY (!gst_pad_has_current_caps (pad)))
400     return GST_FLOW_NOT_NEGOTIATED;
401
402   return klass->parse_packet (parse, buffer);
403 }
404
405 static gboolean
406 gst_kate_parse_queue_event (GstKateParse * parse, GstEvent * event)
407 {
408   GstFlowReturn ret = TRUE;
409
410   g_queue_push_tail (parse->event_queue, event);
411
412   return ret;
413 }
414
415 static gboolean
416 gst_kate_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
417 {
418   gboolean ret;
419   GstKateParse *parse;
420
421   parse = GST_KATE_PARSE (parent);
422
423   switch (GST_EVENT_TYPE (event)) {
424     case GST_EVENT_FLUSH_START:
425       gst_kate_parse_clear_queue (parse);
426       ret = gst_pad_event_default (pad, parent, event);
427       break;
428     case GST_EVENT_EOS:
429       if (!parse->streamheader_sent) {
430         GST_DEBUG_OBJECT (parse, "Got EOS, pushing headers seen so far");
431         ret = gst_kate_parse_push_headers (parse);
432         if (ret != GST_FLOW_OK)
433           break;
434       }
435       gst_kate_parse_drain_queue_prematurely (parse);
436       ret = gst_pad_event_default (pad, parent, event);
437       break;
438     default:
439       if (!parse->streamheader_sent && GST_EVENT_IS_SERIALIZED (event))
440         ret = gst_kate_parse_queue_event (parse, event);
441       else
442         ret = gst_pad_event_default (pad, parent, event);
443       break;
444   }
445
446   return ret;
447 }
448
449 #if 0
450 static gboolean
451 gst_kate_parse_convert (GstPad * pad,
452     GstFormat src_format, gint64 src_value,
453     GstFormat * dest_format, gint64 * dest_value)
454 {
455   gboolean res = TRUE;
456   GstKateParse *parse;
457
458   parse = GST_KATE_PARSE (GST_PAD_PARENT (pad));
459
460   /* fixme: assumes atomic access to lots of instance variables modified from
461    * the streaming thread, including 64-bit variables */
462
463   if (!parse->streamheader_sent)
464     return FALSE;
465
466   if (src_format == *dest_format) {
467     *dest_value = src_value;
468     return TRUE;
469   }
470
471   if (parse->sinkpad == pad &&
472       (src_format == GST_FORMAT_BYTES || *dest_format == GST_FORMAT_BYTES))
473     return FALSE;
474
475   switch (src_format) {
476     case GST_FORMAT_TIME:
477       switch (*dest_format) {
478         default:
479           res = FALSE;
480       }
481       break;
482     case GST_FORMAT_DEFAULT:
483       switch (*dest_format) {
484         case GST_FORMAT_TIME:
485           *dest_value = kate_granule_time (&parse->ki, src_value) * GST_SECOND;
486           break;
487         default:
488           res = FALSE;
489       }
490       break;
491     default:
492       res = FALSE;
493   }
494
495   return res;
496 }
497 #endif
498
499 static gboolean
500 gst_kate_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
501 {
502 #if 1
503   // TODO
504   GST_WARNING ("gst_kate_parse_src_query");
505   return FALSE;
506 #else
507   gint64 granulepos;
508   GstKateParse *parse;
509   gboolean res = FALSE;
510
511   parse = GST_KATE_PARSE (parent);
512
513   switch (GST_QUERY_TYPE (query)) {
514     case GST_QUERY_POSITION:
515     {
516       GstFormat format;
517       gint64 value;
518
519       granulepos = parse->prev_granulepos;
520
521       gst_query_parse_position (query, &format, NULL);
522
523       /* and convert to the final format */
524       if (!(res =
525               gst_kate_parse_convert (pad, GST_FORMAT_DEFAULT, granulepos,
526                   &format, &value)))
527         goto error;
528
529       /* fixme: support segments
530          value = (value - parse->segment_start) + parse->segment_time;
531        */
532
533       gst_query_set_position (query, format, value);
534
535       GST_LOG_OBJECT (parse, "query %p: peer returned granulepos: %"
536           G_GUINT64_FORMAT " - we return %" G_GUINT64_FORMAT " (format %u)",
537           query, granulepos, value, format);
538
539       break;
540     }
541     case GST_QUERY_DURATION:
542     {
543       /* fixme: not threadsafe */
544       /* query peer for total length */
545       if (!gst_pad_is_linked (parse->sinkpad)) {
546         GST_WARNING_OBJECT (parse, "sink pad %" GST_PTR_FORMAT " is not linked",
547             parse->sinkpad);
548         goto error;
549       }
550       if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query)))
551         goto error;
552       break;
553     }
554     case GST_QUERY_CONVERT:
555     {
556       GstFormat src_fmt, dest_fmt;
557       gint64 src_val, dest_val;
558
559       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
560       if (!(res =
561               gst_kate_parse_convert (pad, src_fmt, src_val, &dest_fmt,
562                   &dest_val)))
563         goto error;
564       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
565       break;
566     }
567     default:
568       res = gst_pad_query_default (pad, query);
569       break;
570   }
571   return res;
572
573 error:
574   {
575     GST_WARNING_OBJECT (parse, "error handling query");
576     return res;
577   }
578 #endif
579 }
580
581 static void
582 gst_kate_parse_free_stream_headers (GstKateParse * parse)
583 {
584   while (parse->streamheader != NULL) {
585     gst_buffer_unref (GST_BUFFER (parse->streamheader->data));
586     parse->streamheader = g_list_delete_link (parse->streamheader,
587         parse->streamheader);
588   }
589 }
590
591 static GstStateChangeReturn
592 gst_kate_parse_change_state (GstElement * element, GstStateChange transition)
593 {
594   GstKateParse *parse = GST_KATE_PARSE (element);
595   GstStateChangeReturn ret;
596
597   switch (transition) {
598     case GST_STATE_CHANGE_READY_TO_PAUSED:
599       kate_info_init (&parse->ki);
600       kate_comment_init (&parse->kc);
601       parse->packetno = 0;
602       parse->streamheader_sent = FALSE;
603       parse->streamheader = NULL;
604       parse->buffer_queue = g_queue_new ();
605       parse->event_queue = g_queue_new ();
606       break;
607     default:
608       break;
609   }
610
611   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
612
613   switch (transition) {
614     case GST_STATE_CHANGE_PAUSED_TO_READY:
615       kate_info_clear (&parse->ki);
616       kate_comment_clear (&parse->kc);
617
618       gst_kate_parse_clear_queue (parse);
619       g_queue_free (parse->buffer_queue);
620       parse->buffer_queue = NULL;
621       g_queue_free (parse->event_queue);
622       parse->event_queue = NULL;
623       gst_kate_parse_free_stream_headers (parse);
624       break;
625
626     default:
627       break;
628   }
629
630   return ret;
631 }