subparse: Send timestamp map custom event for HLS webvtt
[platform/upstream/gstreamer.git] / ext / theora / gsttheoraparse.c
1 /* GStreamer
2  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
3  * Copyright (C) 2006 Andy Wingo <wingo@pobox.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * SECTION:element-theoraparse
23  * @title: theoraparse
24  * @see_also: theoradec, oggdemux, vorbisparse
25  *
26  * The theoraparse element will parse the header packets of the Theora
27  * stream and put them as the streamheader in the caps. This is used in the
28  * multifdsink case where you want to stream live theora streams to multiple
29  * clients, each client has to receive the streamheaders first before they can
30  * consume the theora packets.
31  *
32  * This element also makes sure that the buffers that it pushes out are properly
33  * timestamped and that their offset and offset_end are set. The buffers that
34  * theoraparse outputs have all of the metadata that oggmux expects to receive,
35  * which allows you to (for example) remux an ogg/theora file.
36  *
37  * In addition, this element allows you to fix badly synchronized streams. You
38  * pass in an array of (granule time, buffer time) synchronization points via
39  * the synchronization-points GValueArray property, and this element will adjust
40  * the granulepos values that it outputs. The adjustment will be made by
41  * offsetting all buffers that it outputs by a specified amount, and updating
42  * that offset from the value array whenever a keyframe is processed.
43  *
44  * ## Example pipelines
45  * |[
46  * gst-launch-1.0 -v filesrc location=video.ogg ! oggdemux ! theoraparse ! fakesink
47  * ]|
48  *  This pipeline shows that the streamheader is set in the caps, and that each
49  * buffer has the timestamp, duration, offset, and offset_end set.
50  * |[
51  * gst-launch-1.0 filesrc location=video.ogg ! oggdemux ! theoraparse \
52  *            ! oggmux ! filesink location=video-remuxed.ogg
53  * ]|
54  *  This pipeline shows remuxing. video-remuxed.ogg might not be exactly the same
55  * as video.ogg, but they should produce exactly the same decoded data.
56  *
57  */
58
59 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
60  * with newer GLib versions (>= 2.31.0) */
61 #define GLIB_DISABLE_DEPRECATION_WARNINGS
62
63 #ifdef HAVE_CONFIG_H
64 #  include "config.h"
65 #endif
66
67 #include "gsttheoraparse.h"
68
69 #define GST_CAT_DEFAULT theoraparse_debug
70 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
71
72 static GstStaticPadTemplate theora_parse_sink_factory =
73 GST_STATIC_PAD_TEMPLATE ("sink",
74     GST_PAD_SINK,
75     GST_PAD_ALWAYS,
76     GST_STATIC_CAPS ("video/x-theora")
77     );
78
79 static GstStaticPadTemplate theora_parse_src_factory =
80 GST_STATIC_PAD_TEMPLATE ("src",
81     GST_PAD_SRC,
82     GST_PAD_ALWAYS,
83     GST_STATIC_CAPS ("video/x-theora")
84     );
85
86 enum
87 {
88   PROP_0,
89   PROP_SYNCHRONIZATION_POINTS
90 };
91
92 #define gst_theora_parse_parent_class parent_class
93 G_DEFINE_TYPE (GstTheoraParse, gst_theora_parse, GST_TYPE_ELEMENT);
94
95 static void theora_parse_dispose (GObject * object);
96
97 #if 0
98 static void theora_parse_get_property (GObject * object, guint prop_id,
99     GValue * value, GParamSpec * pspec);
100 static void theora_parse_set_property (GObject * object, guint prop_id,
101     const GValue * value, GParamSpec * pspec);
102 #endif
103
104 static GstFlowReturn theora_parse_chain (GstPad * pad, GstObject * parent,
105     GstBuffer * buffer);
106 static GstStateChangeReturn theora_parse_change_state (GstElement * element,
107     GstStateChange transition);
108 static gboolean theora_parse_sink_event (GstPad * pad, GstObject * parent,
109     GstEvent * event);
110 static gboolean theora_parse_src_query (GstPad * pad, GstObject * parent,
111     GstQuery * query);
112
113 static void
114 gst_theora_parse_class_init (GstTheoraParseClass * klass)
115 {
116   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
117   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
118
119   gobject_class->dispose = theora_parse_dispose;
120
121 #if 0
122   gobject_class->get_property = theora_parse_get_property;
123   gobject_class->set_property = theora_parse_set_property;
124
125   /**
126    * GstTheoraParse:sychronization-points
127    *
128    * An array of (granuletime, buffertime) pairs
129    */
130   g_object_class_install_property (gobject_class, PROP_SYNCHRONIZATION_POINTS,
131       g_param_spec_value_array ("synchronization-points",
132           "Synchronization points",
133           "An array of (granuletime, buffertime) pairs",
134           g_param_spec_uint64 ("time", "Time",
135               "Time (either granuletime or buffertime)", 0, G_MAXUINT64, 0,
136               G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS),
137           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
138 #endif
139
140   gst_element_class_add_static_pad_template (gstelement_class,
141       &theora_parse_src_factory);
142   gst_element_class_add_static_pad_template (gstelement_class,
143       &theora_parse_sink_factory);
144   gst_element_class_set_static_metadata (gstelement_class,
145       "Theora video parser", "Codec/Parser/Video", "parse raw theora streams",
146       "Andy Wingo <wingo@pobox.com>");
147
148   gstelement_class->change_state = theora_parse_change_state;
149
150   GST_DEBUG_CATEGORY_INIT (theoraparse_debug, "theoraparse", 0,
151       "Theora parser");
152 }
153
154 static void
155 gst_theora_parse_init (GstTheoraParse * parse)
156 {
157   parse->sinkpad =
158       gst_pad_new_from_static_template (&theora_parse_sink_factory, "sink");
159   gst_pad_set_chain_function (parse->sinkpad, theora_parse_chain);
160   gst_pad_set_event_function (parse->sinkpad, theora_parse_sink_event);
161   gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
162
163   parse->srcpad =
164       gst_pad_new_from_static_template (&theora_parse_src_factory, "src");
165   gst_pad_set_query_function (parse->srcpad, theora_parse_src_query);
166   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
167 }
168
169 static void
170 theora_parse_dispose (GObject * object)
171 {
172   GstTheoraParse *parse = GST_THEORA_PARSE (object);
173
174   g_free (parse->times);
175   parse->times = NULL;
176
177   G_OBJECT_CLASS (parent_class)->dispose (object);
178 }
179
180 #if 0
181 static void
182 theora_parse_set_property (GObject * object, guint prop_id,
183     const GValue * value, GParamSpec * pspec)
184 {
185   GstTheoraParse *parse = GST_THEORA_PARSE (object);
186
187   switch (prop_id) {
188     case PROP_SYNCHRONIZATION_POINTS:
189     {
190       GValueArray *array;
191       guint i;
192
193       array = g_value_get_boxed (value);
194
195       if (array) {
196         if (array->n_values % 2)
197           goto odd_values;
198
199         g_free (parse->times);
200         parse->times = g_new (GstClockTime, array->n_values);
201         parse->npairs = array->n_values / 2;
202         for (i = 0; i < array->n_values; i++)
203           parse->times[i] = g_value_get_uint64 (&array->values[i]);
204       } else {
205         g_free (parse->times);
206         parse->npairs = 0;
207       }
208     }
209       break;
210     default:
211       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
212       break;
213   }
214
215   return;
216
217 odd_values:
218   {
219     g_critical ("expected an even number of time values for "
220         "synchronization-points");
221     return;
222   }
223 }
224
225 static void
226 theora_parse_get_property (GObject * object, guint prop_id,
227     GValue * value, GParamSpec * pspec)
228 {
229   GstTheoraParse *parse = GST_THEORA_PARSE (object);
230
231   switch (prop_id) {
232     case PROP_SYNCHRONIZATION_POINTS:
233     {
234       GValueArray *array = NULL;
235       guint i;
236
237       array = g_value_array_new (parse->npairs * 2);
238
239       for (i = 0; i < parse->npairs; i++) {
240         GValue v = { 0, };
241
242         g_value_init (&v, G_TYPE_UINT64);
243         g_value_set_uint64 (&v, parse->times[i * 2]);
244         g_value_array_append (array, &v);
245         g_value_set_uint64 (&v, parse->times[i * 2 + 1]);
246         g_value_array_append (array, &v);
247         g_value_unset (&v);
248       }
249
250       g_value_take_boxed (value, array);
251     }
252       break;
253     default:
254       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
255       break;
256   }
257 }
258 #endif
259
260 static void
261 theora_parse_set_header_on_caps (GstTheoraParse * parse, GstCaps * caps)
262 {
263   GstBuffer **bufs;
264   GstStructure *structure;
265   gint i;
266   GValue array = { 0 };
267   GValue value = { 0 };
268
269   bufs = parse->streamheader;
270   structure = gst_caps_get_structure (caps, 0);
271   g_value_init (&array, GST_TYPE_ARRAY);
272
273   for (i = 0; i < 3; i++) {
274     if (bufs[i] == NULL)
275       continue;
276
277     bufs[i] = gst_buffer_make_writable (bufs[i]);
278     GST_BUFFER_FLAG_SET (bufs[i], GST_BUFFER_FLAG_HEADER);
279
280     g_value_init (&value, GST_TYPE_BUFFER);
281     gst_value_set_buffer (&value, bufs[i]);
282     gst_value_array_append_value (&array, &value);
283     g_value_unset (&value);
284   }
285
286   gst_structure_take_value (structure, "streamheader", &array);
287 }
288
289 /* two tasks to do here: set the streamheader on the caps, and use libtheora to
290    parse the headers */
291 static void
292 theora_parse_set_streamheader (GstTheoraParse * parse)
293 {
294   GstCaps *caps;
295   gint i;
296   guint32 bitstream_version;
297   th_setup_info *setup = NULL;
298
299   g_assert (!parse->streamheader_received);
300
301   caps = gst_caps_make_writable (gst_pad_query_caps (parse->srcpad, NULL));
302   theora_parse_set_header_on_caps (parse, caps);
303   GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps);
304   gst_pad_set_caps (parse->srcpad, caps);
305   gst_caps_unref (caps);
306
307   for (i = 0; i < 3; i++) {
308     ogg_packet packet;
309     GstBuffer *buf;
310     int ret;
311     GstMapInfo map;
312
313     buf = parse->streamheader[i];
314     if (buf == NULL)
315       continue;
316
317     gst_buffer_map (buf, &map, GST_MAP_READ);
318     packet.packet = map.data;
319     packet.bytes = map.size;
320     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
321     packet.packetno = i + 1;
322     packet.e_o_s = 0;
323     packet.b_o_s = (i == 0);
324     ret = th_decode_headerin (&parse->info, &parse->comment, &setup, &packet);
325     gst_buffer_unmap (buf, &map);
326     if (ret < 0) {
327       GST_WARNING_OBJECT (parse, "Failed to decode Theora header %d: %d",
328           i + 1, ret);
329     }
330   }
331   if (setup) {
332     th_setup_free (setup);
333   }
334
335   parse->fps_n = parse->info.fps_numerator;
336   parse->fps_d = parse->info.fps_denominator;
337   parse->shift = parse->info.keyframe_granule_shift;
338
339   /* With libtheora-1.0beta1 the granulepos scheme was changed:
340    * where earlier the granulepos referred to the index/beginning
341    * of a frame, it now refers to the end, which matches the use
342    * in vorbis/speex. We check the bitstream version from the header so
343    * we know which way to interpret the incoming granuepos
344    */
345   bitstream_version = (parse->info.version_major << 16) |
346       (parse->info.version_minor << 8) | parse->info.version_subminor;
347   parse->is_old_bitstream = (bitstream_version <= 0x00030200);
348
349   parse->streamheader_received = TRUE;
350 }
351
352 static void
353 theora_parse_drain_event_queue (GstTheoraParse * parse)
354 {
355   while (parse->event_queue->length) {
356     GstEvent *event;
357
358     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
359     gst_pad_event_default (parse->sinkpad, GST_OBJECT_CAST (parse), event);
360   }
361 }
362
363 static void
364 theora_parse_push_headers (GstTheoraParse * parse)
365 {
366   gint i;
367
368   if (!parse->streamheader_received)
369     theora_parse_set_streamheader (parse);
370
371   theora_parse_drain_event_queue (parse);
372
373   /* ignore return values, we pass along the result of pushing data packets only
374    */
375   for (i = 0; i < 3; i++) {
376     GstBuffer *buf;
377
378     if ((buf = parse->streamheader[i])) {
379       gst_pad_push (parse->srcpad, buf);
380       parse->streamheader[i] = NULL;
381     }
382   }
383 }
384
385 static void
386 theora_parse_clear_queue (GstTheoraParse * parse)
387 {
388   while (parse->buffer_queue->length) {
389     GstBuffer *buf;
390
391     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
392     gst_buffer_unref (buf);
393   }
394   while (parse->event_queue->length) {
395     GstEvent *event;
396
397     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
398     gst_event_unref (event);
399   }
400 }
401
402 static gint64
403 make_granulepos (GstTheoraParse * parse, gint64 keyframe, gint64 frame)
404 {
405   gint64 iframe;
406
407   if (keyframe == -1)
408     keyframe = 0;
409   /* If using newer theora, offset the granulepos by +1, see comment in
410    * theora_parse_set_streamheader.
411    * 
412    * We don't increment keyframe directly, as internally we always index frames
413    * starting from 0 and we do some sanity checking below. */
414   if (!parse->is_old_bitstream)
415     iframe = keyframe + 1;
416   else
417     iframe = keyframe;
418
419   g_return_val_if_fail (frame >= keyframe, -1);
420   g_return_val_if_fail (frame - keyframe < 1 << parse->shift, -1);
421
422   return (iframe << parse->shift) + (frame - keyframe);
423 }
424
425 static void
426 parse_granulepos (GstTheoraParse * parse, gint64 granulepos,
427     gint64 * keyframe, gint64 * frame)
428 {
429   gint64 kf;
430
431   kf = granulepos >> parse->shift;
432   /* If using newer theora, offset the granulepos by -1, see comment
433    * in theora_parse_set_streamheader */
434   if (!parse->is_old_bitstream)
435     kf -= 1;
436   if (keyframe)
437     *keyframe = kf;
438   if (frame)
439     *frame = kf + (granulepos & ((1 << parse->shift) - 1));
440 }
441
442 static gboolean
443 is_keyframe (GstBuffer * buf)
444 {
445   gsize size;
446   guint8 data[1];
447
448   size = gst_buffer_get_size (buf);
449   if (size == 0)
450     return FALSE;
451
452   gst_buffer_extract (buf, 0, data, 1);
453
454   return ((data[0] & 0x40) == 0);
455 }
456
457 static void
458 theora_parse_munge_granulepos (GstTheoraParse * parse, GstBuffer * buf,
459     gint64 keyframe, gint64 frame)
460 {
461   gint64 frames_diff;
462   GstClockTimeDiff time_diff;
463
464   if (keyframe == frame) {
465     gint i;
466
467     /* update granule_offset */
468     for (i = 0; i < parse->npairs; i++) {
469       if (parse->times[i * 2] >= GST_BUFFER_OFFSET (buf))
470         break;
471     }
472     if (i > 0) {
473       /* time_diff gets reset below */
474       time_diff = parse->times[i * 2 - 1] - parse->times[i * 2 - 2];
475       parse->granule_offset = gst_util_uint64_scale (time_diff,
476           parse->fps_n, parse->fps_d * GST_SECOND);
477       parse->granule_offset <<= parse->shift;
478     }
479   }
480
481   frames_diff = parse->granule_offset >> parse->shift;
482   time_diff = gst_util_uint64_scale_int (GST_SECOND * frames_diff,
483       parse->fps_d, parse->fps_n);
484
485   GST_DEBUG_OBJECT (parse, "offsetting theora stream by %" G_GINT64_FORMAT
486       " frames (%" GST_TIME_FORMAT ")", frames_diff, GST_TIME_ARGS (time_diff));
487
488   GST_BUFFER_OFFSET_END (buf) += parse->granule_offset;
489   GST_BUFFER_OFFSET (buf) += time_diff;
490   GST_BUFFER_TIMESTAMP (buf) += time_diff;
491 }
492
493 static GstFlowReturn
494 theora_parse_push_buffer (GstTheoraParse * parse, GstBuffer * buf,
495     gint64 keyframe, gint64 frame)
496 {
497
498   GstClockTime this_time, next_time;
499
500   this_time = gst_util_uint64_scale_int (GST_SECOND * frame,
501       parse->fps_d, parse->fps_n);
502
503   next_time = gst_util_uint64_scale_int (GST_SECOND * (frame + 1),
504       parse->fps_d, parse->fps_n);
505
506   GST_BUFFER_OFFSET_END (buf) = make_granulepos (parse, keyframe, frame);
507   GST_BUFFER_OFFSET (buf) = this_time;
508   GST_BUFFER_TIMESTAMP (buf) = this_time;
509   GST_BUFFER_DURATION (buf) = next_time - this_time;
510
511   if (parse->times)
512     theora_parse_munge_granulepos (parse, buf, keyframe, frame);
513
514   GST_DEBUG_OBJECT (parse, "pushing buffer with granulepos %" G_GINT64_FORMAT
515       "|%" G_GINT64_FORMAT, keyframe, frame - keyframe);
516
517   return gst_pad_push (parse->srcpad, buf);
518 }
519
520 static GstFlowReturn
521 theora_parse_drain_queue_prematurely (GstTheoraParse * parse)
522 {
523   GstFlowReturn ret = GST_FLOW_OK;
524
525   /* got an EOS event, make sure to push out any buffers that were in the queue
526    * -- won't normally be the case, but this catches the
527    * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous
528    * stream. */
529
530   GST_DEBUG_OBJECT (parse, "got EOS, draining queue");
531
532   /* if we get an eos before pushing the streamheaders, drain our events before
533    * eos */
534   theora_parse_drain_event_queue (parse);
535
536   while (!g_queue_is_empty (parse->buffer_queue)) {
537     GstBuffer *buf;
538
539     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
540
541     parse->prev_frame++;
542
543     if (is_keyframe (buf))
544       /* we have a keyframe */
545       parse->prev_keyframe = parse->prev_frame;
546     else
547       GST_BUFFER_FLAGS (buf) |= GST_BUFFER_FLAG_DELTA_UNIT;
548
549     if (parse->prev_keyframe < 0) {
550       if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
551         parse_granulepos (parse, GST_BUFFER_OFFSET_END (buf),
552             &parse->prev_keyframe, NULL);
553       } else {
554         /* No previous keyframe known; can't extract one from this frame. That
555          * means we can't do any valid output for this frame, just continue to
556          * the next frame.
557          */
558         gst_buffer_unref (buf);
559         continue;
560       }
561     }
562
563     ret = theora_parse_push_buffer (parse, buf, parse->prev_keyframe,
564         parse->prev_frame);
565
566     if (ret != GST_FLOW_OK)
567       goto done;
568   }
569
570 done:
571   return ret;
572 }
573
574 static GstFlowReturn
575 theora_parse_drain_queue (GstTheoraParse * parse, gint64 granulepos)
576 {
577   GstFlowReturn ret = GST_FLOW_OK;
578   gint64 keyframe, prev_frame, frame;
579
580   parse_granulepos (parse, granulepos, &keyframe, &frame);
581
582   GST_DEBUG ("draining queue of length %d",
583       g_queue_get_length (parse->buffer_queue));
584
585   GST_LOG_OBJECT (parse, "gp %" G_GINT64_FORMAT ", kf %" G_GINT64_FORMAT
586       ", frame %" G_GINT64_FORMAT, granulepos, keyframe, frame);
587
588   prev_frame = frame - g_queue_get_length (parse->buffer_queue);
589
590   GST_LOG_OBJECT (parse,
591       "new prev %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT, prev_frame,
592       parse->prev_frame);
593
594   if (prev_frame < parse->prev_frame) {
595     GST_WARNING ("jumped %" G_GINT64_FORMAT
596         " frames backwards! not sure what to do here",
597         parse->prev_frame - prev_frame);
598     parse->prev_frame = prev_frame;
599   } else if (prev_frame > parse->prev_frame) {
600     GST_INFO ("discontinuity detected (%" G_GINT64_FORMAT
601         " frames)", prev_frame - parse->prev_frame);
602     if (keyframe <= prev_frame && keyframe > parse->prev_keyframe)
603       parse->prev_keyframe = keyframe;
604     parse->prev_frame = prev_frame;
605   }
606
607   while (!g_queue_is_empty (parse->buffer_queue)) {
608     GstBuffer *buf;
609
610     parse->prev_frame++;
611     g_assert (parse->prev_frame >= 0);
612
613     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
614
615     if (is_keyframe (buf))
616       /* we have a keyframe */
617       parse->prev_keyframe = parse->prev_frame;
618     else
619       GST_BUFFER_FLAGS (buf) |= GST_BUFFER_FLAG_DELTA_UNIT;
620
621     ret = theora_parse_push_buffer (parse, buf, parse->prev_keyframe,
622         parse->prev_frame);
623
624     if (ret != GST_FLOW_OK)
625       goto done;
626   }
627
628 done:
629   return ret;
630 }
631
632 static GstFlowReturn
633 theora_parse_queue_buffer (GstTheoraParse * parse, GstBuffer * buf)
634 {
635   GstFlowReturn ret = GST_FLOW_OK;
636
637   buf = gst_buffer_make_writable (buf);
638
639   g_queue_push_tail (parse->buffer_queue, buf);
640
641   if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
642     if (parse->prev_keyframe < 0) {
643       parse_granulepos (parse, GST_BUFFER_OFFSET_END (buf),
644           &parse->prev_keyframe, NULL);
645     }
646     ret = theora_parse_drain_queue (parse, GST_BUFFER_OFFSET_END (buf));
647   }
648
649   return ret;
650 }
651
652 static GstFlowReturn
653 theora_parse_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
654 {
655   GstFlowReturn ret;
656   GstTheoraParse *parse;
657   GstMapInfo map;
658   guint8 header;
659   gboolean have_header;
660
661   parse = GST_THEORA_PARSE (parent);
662
663   have_header = FALSE;
664
665   gst_buffer_map (buffer, &map, GST_MAP_READ);
666   header = map.data[0];
667   gst_buffer_unmap (buffer, &map);
668
669   if (map.size >= 1) {
670     if (header & 0x80)
671       have_header = TRUE;
672   }
673
674   if (have_header) {
675     if (parse->send_streamheader) {
676       /* we need to collect the headers still */
677       /* so put it on the streamheader list and return */
678       if (header >= 0x80 && header <= 0x82)
679         parse->streamheader[header - 0x80] = buffer;
680     }
681     ret = GST_FLOW_OK;
682   } else {
683     /* data packet, push the headers we collected before */
684     if (parse->send_streamheader) {
685       theora_parse_push_headers (parse);
686       parse->send_streamheader = FALSE;
687     }
688
689     ret = theora_parse_queue_buffer (parse, buffer);
690   }
691
692   return ret;
693 }
694
695 static gboolean
696 theora_parse_queue_event (GstTheoraParse * parse, GstEvent * event)
697 {
698   g_queue_push_tail (parse->event_queue, event);
699   return TRUE;
700 }
701
702 static gboolean
703 theora_parse_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
704 {
705   gboolean ret;
706   GstTheoraParse *parse;
707
708   parse = GST_THEORA_PARSE (parent);
709
710   switch (GST_EVENT_TYPE (event)) {
711     case GST_EVENT_FLUSH_STOP:
712       theora_parse_clear_queue (parse);
713       parse->prev_keyframe = -1;
714       parse->prev_frame = -1;
715       ret = gst_pad_event_default (pad, parent, event);
716       break;
717     case GST_EVENT_EOS:
718       theora_parse_drain_queue_prematurely (parse);
719       ret = gst_pad_event_default (pad, parent, event);
720       break;
721     default:
722       if (parse->send_streamheader && GST_EVENT_IS_SERIALIZED (event)
723           && GST_EVENT_TYPE (event) > GST_EVENT_CAPS)
724         ret = theora_parse_queue_event (parse, event);
725       else
726         ret = gst_pad_event_default (pad, parent, event);
727       break;
728   }
729
730   return ret;
731 }
732
733 static gboolean
734 theora_parse_src_convert (GstPad * pad,
735     GstFormat src_format, gint64 src_value,
736     GstFormat * dest_format, gint64 * dest_value)
737 {
738   gboolean res = TRUE;
739   GstTheoraParse *parse;
740   guint64 scale = 1;
741
742   if (src_format == *dest_format) {
743     *dest_value = src_value;
744     return TRUE;
745   }
746
747   parse = GST_THEORA_PARSE (gst_pad_get_parent (pad));
748
749   /* we need the info part before we can done something */
750   if (!parse->streamheader_received)
751     goto no_header;
752
753   switch (src_format) {
754     case GST_FORMAT_BYTES:
755       switch (*dest_format) {
756         case GST_FORMAT_DEFAULT:
757           *dest_value = gst_util_uint64_scale_int (src_value, 2,
758               parse->info.pic_height * parse->info.pic_width * 3);
759           break;
760         case GST_FORMAT_TIME:
761           /* seems like a rather silly conversion, implement me if you like */
762         default:
763           res = FALSE;
764       }
765       break;
766     case GST_FORMAT_TIME:
767       switch (*dest_format) {
768         case GST_FORMAT_BYTES:
769           scale = 3 * (parse->info.pic_width * parse->info.pic_height) / 2;
770         case GST_FORMAT_DEFAULT:
771           *dest_value = scale * gst_util_uint64_scale (src_value,
772               parse->info.fps_numerator,
773               parse->info.fps_denominator * GST_SECOND);
774           break;
775         default:
776           GST_DEBUG_OBJECT (parse, "cannot convert to format %s",
777               gst_format_get_name (*dest_format));
778           res = FALSE;
779       }
780       break;
781     case GST_FORMAT_DEFAULT:
782       switch (*dest_format) {
783         case GST_FORMAT_TIME:
784           *dest_value = gst_util_uint64_scale (src_value,
785               GST_SECOND * parse->info.fps_denominator,
786               parse->info.fps_numerator);
787           break;
788         case GST_FORMAT_BYTES:
789           *dest_value = gst_util_uint64_scale_int (src_value,
790               3 * parse->info.pic_width * parse->info.pic_height, 2);
791           break;
792         default:
793           res = FALSE;
794       }
795       break;
796     default:
797       res = FALSE;
798   }
799 done:
800   gst_object_unref (parse);
801   return res;
802
803   /* ERRORS */
804 no_header:
805   {
806     GST_DEBUG_OBJECT (parse, "no header yet, cannot convert");
807     res = FALSE;
808     goto done;
809   }
810 }
811
812 static gboolean
813 theora_parse_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
814 {
815   GstTheoraParse *parse;
816   gboolean res = FALSE;
817
818   parse = GST_THEORA_PARSE (parent);
819
820   switch (GST_QUERY_TYPE (query)) {
821     case GST_QUERY_POSITION:
822     {
823       gint64 frame, value;
824       GstFormat my_format, format;
825       gint64 time;
826
827       frame = parse->prev_frame;
828
829       GST_LOG_OBJECT (parse,
830           "query %p: we have current frame: %" G_GINT64_FORMAT, query, frame);
831
832       /* parse format */
833       gst_query_parse_position (query, &format, NULL);
834
835       /* and convert to the final format in two steps with time as the 
836        * intermediate step */
837       my_format = GST_FORMAT_TIME;
838       if (!(res =
839               theora_parse_src_convert (parse->sinkpad, GST_FORMAT_DEFAULT,
840                   frame, &my_format, &time)))
841         goto error;
842
843       /* fixme: handle segments
844          time = (time - parse->segment.start) + parse->segment.time;
845        */
846
847       GST_LOG_OBJECT (parse,
848           "query %p: our time: %" GST_TIME_FORMAT " (conv to %s)",
849           query, GST_TIME_ARGS (time), gst_format_get_name (format));
850
851       if (!(res =
852               theora_parse_src_convert (pad, my_format, time, &format, &value)))
853         goto error;
854
855       gst_query_set_position (query, format, value);
856
857       GST_LOG_OBJECT (parse,
858           "query %p: we return %" G_GINT64_FORMAT " (format %u)", query, value,
859           format);
860
861       break;
862     }
863     case GST_QUERY_DURATION:
864       /* forward to peer for total */
865       if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query)))
866         goto error;
867       break;
868     case GST_QUERY_CONVERT:
869     {
870       GstFormat src_fmt, dest_fmt;
871       gint64 src_val, dest_val;
872
873       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
874       if (!(res =
875               theora_parse_src_convert (pad, src_fmt, src_val, &dest_fmt,
876                   &dest_val)))
877         goto error;
878
879       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
880       break;
881     }
882     default:
883       res = gst_pad_query_default (pad, parent, query);
884       break;
885   }
886 done:
887
888   return res;
889
890   /* ERRORS */
891 error:
892   {
893     GST_DEBUG_OBJECT (parse, "query failed");
894     goto done;
895   }
896 }
897
898 static GstStateChangeReturn
899 theora_parse_change_state (GstElement * element, GstStateChange transition)
900 {
901   GstTheoraParse *parse = GST_THEORA_PARSE (element);
902   GstStateChangeReturn ret;
903   gint i;
904
905   switch (transition) {
906     case GST_STATE_CHANGE_READY_TO_PAUSED:
907       th_info_init (&parse->info);
908       th_comment_init (&parse->comment);
909       parse->send_streamheader = TRUE;
910       parse->buffer_queue = g_queue_new ();
911       parse->event_queue = g_queue_new ();
912       parse->prev_keyframe = -1;
913       parse->prev_frame = -1;
914       parse->granule_offset = 0;
915       break;
916     default:
917       break;
918   }
919
920   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
921
922   switch (transition) {
923     case GST_STATE_CHANGE_PAUSED_TO_READY:
924       th_info_clear (&parse->info);
925       th_comment_clear (&parse->comment);
926       theora_parse_clear_queue (parse);
927       g_queue_free (parse->buffer_queue);
928       g_queue_free (parse->event_queue);
929       parse->buffer_queue = NULL;
930       for (i = 0; i < 3; i++) {
931         if (parse->streamheader[i]) {
932           gst_buffer_unref (parse->streamheader[i]);
933           parse->streamheader[i] = NULL;
934         }
935       }
936       parse->streamheader_received = FALSE;
937       break;
938     default:
939       break;
940   }
941
942   return ret;
943 }