kate: Initialize debug categories
[platform/upstream/gstreamer.git] / ext / kate / gstkateparse.c
1 /* GStreamer
2  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
3  * Copyright (C) 2006 Andy Wingo <wingo@pobox.com>
4  * Copyright (C) 2008 Vincent Penquerc'h <ogg.k.ogg.k@googlemail.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:element-kateparse
24  * @title: kateparse
25  * @short_description: parses kate streams
26  * @see_also: katedec, vorbisparse, oggdemux, theoraparse
27  *
28  * The kateparse element will parse the header packets of the Kate
29  * stream and put them as the streamheader in the caps. This is used in the
30  * multifdsink case where you want to stream live kate streams to multiple
31  * clients, each client has to receive the streamheaders first before they can
32  * consume the kate packets.
33  *
34  * This element also makes sure that the buffers that it pushes out are properly
35  * timestamped and that their offset and offset_end are set. The buffers that
36  * kateparse outputs have all of the metadata that oggmux expects to receive,
37  * which allows you to (for example) remux an ogg/kate file.
38  *
39  * ## Example pipelines
40  *
41  * |[
42  * gst-launch-1.0 -v filesrc location=kate.ogg ! oggdemux ! kateparse ! fakesink
43  * ]|
44  * This pipeline shows that the streamheader is set in the caps, and that each
45  * buffer has the timestamp, duration, offset, and offset_end set.
46  *
47  * |[
48  * gst-launch-1.0 filesrc location=kate.ogg ! oggdemux ! kateparse \
49  *            ! oggmux ! filesink location=kate-remuxed.ogg
50  * ]|
51  * This pipeline shows remuxing. kate-remuxed.ogg might not be exactly the same
52  * as kate.ogg, but they should produce exactly the same decoded data.
53  *
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #  include "config.h"
58 #endif
59
60 #include "gstkateelements.h"
61 #include "gstkateutil.h"
62 #include "gstkateparse.h"
63
64 GST_DEBUG_CATEGORY_EXTERN (gst_kateparse_debug);
65 #define GST_CAT_DEFAULT gst_kateparse_debug
66
67 static GstStaticPadTemplate gst_kate_parse_sink_factory =
68     GST_STATIC_PAD_TEMPLATE ("sink",
69     GST_PAD_SINK,
70     GST_PAD_ALWAYS,
71     GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
72     );
73
74 static GstStaticPadTemplate gst_kate_parse_src_factory =
75     GST_STATIC_PAD_TEMPLATE ("src",
76     GST_PAD_SRC,
77     GST_PAD_ALWAYS,
78     GST_STATIC_CAPS ("subtitle/x-kate; application/x-kate")
79     );
80
81 GST_DEBUG_CATEGORY (gst_kateparse_debug);
82
83 #define gst_kate_parse_parent_class parent_class
84 G_DEFINE_TYPE (GstKateParse, gst_kate_parse, GST_TYPE_ELEMENT);
85 #define _do_init \
86   kate_element_init (plugin); \
87   GST_DEBUG_CATEGORY_INIT (gst_kateparse_debug, "kateparse", 0, "Kate parser");
88 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (kateparse, "kateparse", GST_RANK_NONE,
89     GST_TYPE_KATE_PARSE, _do_init);
90
91 static GstFlowReturn gst_kate_parse_chain (GstPad * pad, GstObject * parent,
92     GstBuffer * buffer);
93 static GstStateChangeReturn gst_kate_parse_change_state (GstElement * element,
94     GstStateChange transition);
95 static gboolean gst_kate_parse_sink_event (GstPad * pad, GstObject * parent,
96     GstEvent * event);
97 static gboolean gst_kate_parse_src_query (GstPad * pad, GstObject * parent,
98     GstQuery * query);
99 #if 0
100 static gboolean gst_kate_parse_convert (GstPad * pad,
101     GstFormat src_format, gint64 src_value,
102     GstFormat * dest_format, gint64 * dest_value);
103 #endif
104 static GstFlowReturn gst_kate_parse_parse_packet (GstKateParse * parse,
105     GstBuffer * buf);
106
107 static void
108 gst_kate_parse_class_init (GstKateParseClass * klass)
109 {
110   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
111
112   gstelement_class->change_state = gst_kate_parse_change_state;
113
114   gst_element_class_add_static_pad_template (gstelement_class,
115       &gst_kate_parse_src_factory);
116   gst_element_class_add_static_pad_template (gstelement_class,
117       &gst_kate_parse_sink_factory);
118
119   gst_element_class_set_static_metadata (gstelement_class, "Kate stream parser",
120       "Codec/Parser/Subtitle",
121       "parse raw kate streams",
122       "Vincent Penquerc'h <ogg.k.ogg.k at googlemail dot com>");
123
124   klass->parse_packet = GST_DEBUG_FUNCPTR (gst_kate_parse_parse_packet);
125 }
126
127 static void
128 gst_kate_parse_init (GstKateParse * parse)
129 {
130   parse->sinkpad =
131       gst_pad_new_from_static_template (&gst_kate_parse_sink_factory, "sink");
132   gst_pad_set_chain_function (parse->sinkpad,
133       GST_DEBUG_FUNCPTR (gst_kate_parse_chain));
134   gst_pad_set_event_function (parse->sinkpad,
135       GST_DEBUG_FUNCPTR (gst_kate_parse_sink_event));
136   gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
137
138   parse->srcpad =
139       gst_pad_new_from_static_template (&gst_kate_parse_src_factory, "src");
140   gst_pad_set_query_function (parse->srcpad,
141       GST_DEBUG_FUNCPTR (gst_kate_parse_src_query));
142   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
143 }
144
145 static void
146 gst_kate_parse_drain_event_queue (GstKateParse * parse)
147 {
148   while (parse->event_queue->length) {
149     GstEvent *event;
150
151     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
152     gst_pad_event_default (parse->sinkpad, NULL, event);
153   }
154 }
155
156 static GstFlowReturn
157 gst_kate_parse_push_headers (GstKateParse * parse)
158 {
159   /* mark and put on caps */
160   GstCaps *caps;
161   GstBuffer *outbuf;
162   kate_packet packet;
163   GList *headers, *outbuf_list = NULL;
164   int ret;
165   gboolean res;
166
167   /* get the headers into the caps, passing them to kate as we go */
168   caps =
169       gst_kate_util_set_header_on_caps (&parse->element,
170       gst_pad_get_current_caps (parse->sinkpad), parse->streamheader);
171
172   if (G_UNLIKELY (!caps)) {
173     GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
174         ("Failed to set headers on caps"));
175     return GST_FLOW_ERROR;
176   }
177
178   GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps);
179   res = gst_pad_set_caps (parse->srcpad, caps);
180   gst_caps_unref (caps);
181   if (G_UNLIKELY (!res)) {
182     GST_WARNING_OBJECT (parse->srcpad, "Failed to set caps on source pad");
183     return GST_FLOW_NOT_NEGOTIATED;
184   }
185
186   headers = parse->streamheader;
187   while (headers) {
188     GstMapInfo info;
189
190     outbuf = GST_BUFFER_CAST (headers->data);
191
192     if (!gst_buffer_map (outbuf, &info, GST_MAP_READ)) {
193       GST_WARNING_OBJECT (outbuf, "Failed to map buffer");
194       continue;
195     }
196
197     kate_packet_wrap (&packet, info.size, info.data);
198     ret = kate_decode_headerin (&parse->ki, &parse->kc, &packet);
199     if (G_UNLIKELY (ret < 0)) {
200       GST_WARNING_OBJECT (parse, "Failed to decode header: %s",
201           gst_kate_util_get_error_message (ret));
202     }
203     gst_buffer_unmap (outbuf, &info);
204     /* takes ownership of outbuf, which was previously in parse->streamheader */
205     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_HEADER);
206     outbuf_list = g_list_append (outbuf_list, outbuf);
207     headers = headers->next;
208   }
209
210   /* first process queued events */
211   gst_kate_parse_drain_event_queue (parse);
212
213   /* push out buffers, ignoring return value... */
214   headers = outbuf_list;
215   while (headers) {
216     outbuf = GST_BUFFER_CAST (headers->data);
217     gst_pad_push (parse->srcpad, outbuf);
218     headers = headers->next;
219   }
220
221   g_list_free (outbuf_list);
222   g_list_free (parse->streamheader);
223   parse->streamheader = NULL;
224
225   parse->streamheader_sent = TRUE;
226
227   return GST_FLOW_OK;
228 }
229
230 static void
231 gst_kate_parse_clear_queue (GstKateParse * parse)
232 {
233   GST_DEBUG_OBJECT (parse, "Clearing queue");
234   while (parse->buffer_queue->length) {
235     GstBuffer *buf;
236
237     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
238     gst_buffer_unref (buf);
239   }
240   while (parse->event_queue->length) {
241     GstEvent *event;
242
243     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
244     gst_event_unref (event);
245   }
246 }
247
248 static GstFlowReturn
249 gst_kate_parse_push_buffer (GstKateParse * parse, GstBuffer * buf,
250     gint64 granulepos)
251 {
252   GST_LOG_OBJECT (parse, "granulepos %16" G_GINT64_MODIFIER "x", granulepos);
253   if (granulepos < 0) {
254     /* packets coming not from Ogg won't have a granpos in the offset end,
255        so we have to synthesize one here - only problem is we don't know
256        the backlink - pretend there's none for now */
257     GST_INFO_OBJECT (parse, "No granulepos on buffer, synthesizing one");
258     granulepos =
259         kate_duration_granule (&parse->ki,
260         GST_BUFFER_TIMESTAMP (buf) /
261         (double) GST_SECOND) << kate_granule_shift (&parse->ki);
262   }
263   GST_BUFFER_OFFSET (buf) =
264       kate_granule_time (&parse->ki, granulepos) * GST_SECOND;
265   GST_BUFFER_OFFSET_END (buf) = granulepos;
266   GST_BUFFER_TIMESTAMP (buf) = GST_BUFFER_OFFSET (buf);
267
268   return gst_pad_push (parse->srcpad, buf);
269 }
270
271 static GstFlowReturn
272 gst_kate_parse_drain_queue_prematurely (GstKateParse * parse)
273 {
274   GstFlowReturn ret = GST_FLOW_OK;
275
276   /* got an EOS event, make sure to push out any buffers that were in the queue
277    * -- won't normally be the case, but this catches the
278    * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous
279    * stream. */
280
281   /* if we got EOS before any buffers came, go ahead and push the other events
282    * first */
283   gst_kate_parse_drain_event_queue (parse);
284
285   while (!g_queue_is_empty (parse->buffer_queue)) {
286     GstBuffer *buf;
287     gint64 granpos;
288
289     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
290
291     granpos = GST_BUFFER_OFFSET_END (buf);
292     ret = gst_kate_parse_push_buffer (parse, buf, granpos);
293
294     if (ret != GST_FLOW_OK)
295       goto done;
296   }
297
298   g_assert (g_queue_is_empty (parse->buffer_queue));
299
300 done:
301   return ret;
302 }
303
304 static GstFlowReturn
305 gst_kate_parse_drain_queue (GstKateParse * parse, gint64 granulepos)
306 {
307   GstFlowReturn ret = GST_FLOW_OK;
308
309   if (!g_queue_is_empty (parse->buffer_queue)) {
310     GstBuffer *buf;
311     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
312     ret = gst_kate_parse_push_buffer (parse, buf, granulepos);
313
314     if (ret != GST_FLOW_OK)
315       goto done;
316   }
317   g_assert (g_queue_is_empty (parse->buffer_queue));
318
319 done:
320   return ret;
321 }
322
323 static GstFlowReturn
324 gst_kate_parse_queue_buffer (GstKateParse * parse, GstBuffer * buf)
325 {
326   GstFlowReturn ret = GST_FLOW_OK;
327   gint64 granpos;
328
329   buf = gst_buffer_make_writable (buf);
330
331   /* oggdemux stores the granule pos in the offset end */
332   granpos = GST_BUFFER_OFFSET_END (buf);
333   GST_LOG_OBJECT (parse, "granpos %16" G_GINT64_MODIFIER "x", granpos);
334   g_queue_push_tail (parse->buffer_queue, buf);
335
336 #if 1
337   /* if getting buffers from matroska, we won't have a granpos here... */
338   //if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
339   ret = gst_kate_parse_drain_queue (parse, granpos);
340   //}
341 #else
342   if (granpos >= 0) {
343     ret = gst_kate_parse_drain_queue (parse, granpos);
344   } else {
345     GST_ELEMENT_ERROR (parse, STREAM, DECODE, (NULL),
346         ("Bad granulepos %" G_GINT64_FORMAT, granpos));
347     ret = GST_FLOW_ERROR;
348   }
349 #endif
350
351   return ret;
352 }
353
354 static GstFlowReturn
355 gst_kate_parse_parse_packet (GstKateParse * parse, GstBuffer * buf)
356 {
357   GstFlowReturn ret = GST_FLOW_OK;
358   guint8 header[1];
359   gsize size;
360
361   g_assert (parse);
362
363   parse->packetno++;
364
365   size = gst_buffer_extract (buf, 0, header, 1);
366
367   GST_LOG_OBJECT (parse, "Got packet %02x, %" G_GSIZE_FORMAT " bytes",
368       size ? header[0] : -1, gst_buffer_get_size (buf));
369
370   if (size > 0 && header[0] & 0x80) {
371     GST_DEBUG_OBJECT (parse, "Found header %02x", header[0]);
372     /* if 0x80 is set, it's streamheader,
373      * so put it on the streamheader list and return */
374     parse->streamheader = g_list_append (parse->streamheader, buf);
375     ret = GST_FLOW_OK;
376   } else {
377     if (!parse->streamheader_sent) {
378       GST_DEBUG_OBJECT (parse, "Found non header, pushing headers seen so far");
379       ret = gst_kate_parse_push_headers (parse);
380     }
381
382     if (ret == GST_FLOW_OK) {
383       ret = gst_kate_parse_queue_buffer (parse, buf);
384     }
385   }
386
387   return ret;
388 }
389
390 static GstFlowReturn
391 gst_kate_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
392 {
393   GstKateParseClass *klass;
394   GstKateParse *parse;
395
396   parse = GST_KATE_PARSE (parent);
397   klass = GST_KATE_PARSE_CLASS (G_OBJECT_GET_CLASS (parse));
398
399   g_assert (klass->parse_packet != NULL);
400
401   if (G_UNLIKELY (!gst_pad_has_current_caps (pad)))
402     return GST_FLOW_NOT_NEGOTIATED;
403
404   return klass->parse_packet (parse, buffer);
405 }
406
407 static gboolean
408 gst_kate_parse_queue_event (GstKateParse * parse, GstEvent * event)
409 {
410   GstFlowReturn ret = TRUE;
411
412   g_queue_push_tail (parse->event_queue, event);
413
414   return ret;
415 }
416
417 static gboolean
418 gst_kate_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
419 {
420   gboolean ret;
421   GstKateParse *parse;
422
423   parse = GST_KATE_PARSE (parent);
424
425   switch (GST_EVENT_TYPE (event)) {
426     case GST_EVENT_FLUSH_STOP:
427       gst_kate_parse_clear_queue (parse);
428       ret = gst_pad_event_default (pad, parent, event);
429       break;
430     case GST_EVENT_EOS:
431       if (!parse->streamheader_sent) {
432         GST_DEBUG_OBJECT (parse, "Got EOS, pushing headers seen so far");
433         ret = gst_kate_parse_push_headers (parse);
434         if (ret != GST_FLOW_OK)
435           break;
436       }
437       gst_kate_parse_drain_queue_prematurely (parse);
438       ret = gst_pad_event_default (pad, parent, event);
439       break;
440     default:
441       if (!parse->streamheader_sent && GST_EVENT_IS_SERIALIZED (event)
442           && GST_EVENT_TYPE (event) > GST_EVENT_CAPS)
443         ret = gst_kate_parse_queue_event (parse, event);
444       else
445         ret = gst_pad_event_default (pad, parent, event);
446       break;
447   }
448
449   return ret;
450 }
451
452 #if 0
453 static gboolean
454 gst_kate_parse_convert (GstPad * pad,
455     GstFormat src_format, gint64 src_value,
456     GstFormat * dest_format, gint64 * dest_value)
457 {
458   gboolean res = TRUE;
459   GstKateParse *parse;
460
461   parse = GST_KATE_PARSE (GST_PAD_PARENT (pad));
462
463   /* fixme: assumes atomic access to lots of instance variables modified from
464    * the streaming thread, including 64-bit variables */
465
466   if (!parse->streamheader_sent)
467     return FALSE;
468
469   if (src_format == *dest_format) {
470     *dest_value = src_value;
471     return TRUE;
472   }
473
474   if (parse->sinkpad == pad &&
475       (src_format == GST_FORMAT_BYTES || *dest_format == GST_FORMAT_BYTES))
476     return FALSE;
477
478   switch (src_format) {
479     case GST_FORMAT_TIME:
480       switch (*dest_format) {
481         default:
482           res = FALSE;
483       }
484       break;
485     case GST_FORMAT_DEFAULT:
486       switch (*dest_format) {
487         case GST_FORMAT_TIME:
488           *dest_value = kate_granule_time (&parse->ki, src_value) * GST_SECOND;
489           break;
490         default:
491           res = FALSE;
492       }
493       break;
494     default:
495       res = FALSE;
496   }
497
498   return res;
499 }
500 #endif
501
502 static gboolean
503 gst_kate_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
504 {
505 #if 1
506   // TODO
507   GST_WARNING ("gst_kate_parse_src_query");
508   return FALSE;
509 #else
510   gint64 granulepos;
511   GstKateParse *parse;
512   gboolean res = FALSE;
513
514   parse = GST_KATE_PARSE (parent);
515
516   switch (GST_QUERY_TYPE (query)) {
517     case GST_QUERY_POSITION:
518     {
519       GstFormat format;
520       gint64 value;
521
522       granulepos = parse->prev_granulepos;
523
524       gst_query_parse_position (query, &format, NULL);
525
526       /* and convert to the final format */
527       if (!(res =
528               gst_kate_parse_convert (pad, GST_FORMAT_DEFAULT, granulepos,
529                   &format, &value)))
530         goto error;
531
532       /* fixme: support segments
533          value = (value - parse->segment_start) + parse->segment_time;
534        */
535
536       gst_query_set_position (query, format, value);
537
538       GST_LOG_OBJECT (parse, "query %p: peer returned granulepos: %"
539           G_GUINT64_FORMAT " - we return %" G_GUINT64_FORMAT " (format %u)",
540           query, granulepos, value, format);
541
542       break;
543     }
544     case GST_QUERY_DURATION:
545     {
546       /* fixme: not threadsafe */
547       /* query peer for total length */
548       if (!gst_pad_is_linked (parse->sinkpad)) {
549         GST_WARNING_OBJECT (parse, "sink pad %" GST_PTR_FORMAT " is not linked",
550             parse->sinkpad);
551         goto error;
552       }
553       if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query)))
554         goto error;
555       break;
556     }
557     case GST_QUERY_CONVERT:
558     {
559       GstFormat src_fmt, dest_fmt;
560       gint64 src_val, dest_val;
561
562       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
563       if (!(res =
564               gst_kate_parse_convert (pad, src_fmt, src_val, &dest_fmt,
565                   &dest_val)))
566         goto error;
567       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
568       break;
569     }
570     default:
571       res = gst_pad_query_default (pad, query);
572       break;
573   }
574   return res;
575
576 error:
577   {
578     GST_WARNING_OBJECT (parse, "error handling query");
579     return res;
580   }
581 #endif
582 }
583
584 static void
585 gst_kate_parse_free_stream_headers (GstKateParse * parse)
586 {
587   while (parse->streamheader != NULL) {
588     gst_buffer_unref (GST_BUFFER (parse->streamheader->data));
589     parse->streamheader = g_list_delete_link (parse->streamheader,
590         parse->streamheader);
591   }
592 }
593
594 static GstStateChangeReturn
595 gst_kate_parse_change_state (GstElement * element, GstStateChange transition)
596 {
597   GstKateParse *parse = GST_KATE_PARSE (element);
598   GstStateChangeReturn ret;
599
600   switch (transition) {
601     case GST_STATE_CHANGE_READY_TO_PAUSED:
602       kate_info_init (&parse->ki);
603       kate_comment_init (&parse->kc);
604       parse->packetno = 0;
605       parse->streamheader_sent = FALSE;
606       parse->streamheader = NULL;
607       parse->buffer_queue = g_queue_new ();
608       parse->event_queue = g_queue_new ();
609       break;
610     default:
611       break;
612   }
613
614   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
615
616   switch (transition) {
617     case GST_STATE_CHANGE_PAUSED_TO_READY:
618       kate_info_clear (&parse->ki);
619       kate_comment_clear (&parse->kc);
620
621       gst_kate_parse_clear_queue (parse);
622       g_queue_free (parse->buffer_queue);
623       parse->buffer_queue = NULL;
624       g_queue_free (parse->event_queue);
625       parse->event_queue = NULL;
626       gst_kate_parse_free_stream_headers (parse);
627       break;
628
629     default:
630       break;
631   }
632
633   return ret;
634 }