update for _get_caps() -> _query_caps()
[platform/upstream/gstreamer.git] / ext / theora / gsttheoraparse.c
1 /* GStreamer
2  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
3  * Copyright (C) 2006 Andy Wingo <wingo@pobox.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-theoraparse
23  * @see_also: theoradec, oggdemux, vorbisparse
24  *
25  * The theoraparse element will parse the header packets of the Theora
26  * stream and put them as the streamheader in the caps. This is used in the
27  * multifdsink case where you want to stream live theora streams to multiple
28  * clients, each client has to receive the streamheaders first before they can
29  * consume the theora packets.
30  *
31  * This element also makes sure that the buffers that it pushes out are properly
32  * timestamped and that their offset and offset_end are set. The buffers that
33  * theoraparse outputs have all of the metadata that oggmux expects to receive,
34  * which allows you to (for example) remux an ogg/theora file.
35  *
36  * In addition, this element allows you to fix badly synchronized streams. You
37  * pass in an array of (granule time, buffer time) synchronization points via
38  * the synchronization-points GValueArray property, and this element will adjust
39  * the granulepos values that it outputs. The adjustment will be made by
40  * offsetting all buffers that it outputs by a specified amount, and updating
41  * that offset from the value array whenever a keyframe is processed.
42  *
43  * <refsect2>
44  * <title>Example pipelines</title>
45  * |[
46  * gst-launch -v filesrc location=video.ogg ! oggdemux ! theoraparse ! fakesink
47  * ]| This pipeline shows that the streamheader is set in the caps, and that each
48  * buffer has the timestamp, duration, offset, and offset_end set.
49  * |[
50  * gst-launch filesrc location=video.ogg ! oggdemux ! theoraparse \
51  *            ! oggmux ! filesink location=video-remuxed.ogg
52  * ]| This pipeline shows remuxing. video-remuxed.ogg might not be exactly the same
53  * as video.ogg, but they should produce exactly the same decoded data.
54  * </refsect2>
55  *
56  * Last reviewed on 2008-05-28 (0.10.20)
57  */
58
59 #ifdef HAVE_CONFIG_H
60 #  include "config.h"
61 #endif
62
63 #include "gsttheoraparse.h"
64
65 #define GST_CAT_DEFAULT theoraparse_debug
66 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
67
68 static GstStaticPadTemplate theora_parse_sink_factory =
69 GST_STATIC_PAD_TEMPLATE ("sink",
70     GST_PAD_SINK,
71     GST_PAD_ALWAYS,
72     GST_STATIC_CAPS ("video/x-theora")
73     );
74
75 static GstStaticPadTemplate theora_parse_src_factory =
76 GST_STATIC_PAD_TEMPLATE ("src",
77     GST_PAD_SRC,
78     GST_PAD_ALWAYS,
79     GST_STATIC_CAPS ("video/x-theora")
80     );
81
82 enum
83 {
84   PROP_0,
85   PROP_SYNCHRONIZATION_POINTS
86 };
87
88 #define gst_theora_parse_parent_class parent_class
89 G_DEFINE_TYPE (GstTheoraParse, gst_theora_parse, GST_TYPE_ELEMENT);
90
91 static void theora_parse_dispose (GObject * object);
92 static void theora_parse_get_property (GObject * object, guint prop_id,
93     GValue * value, GParamSpec * pspec);
94 static void theora_parse_set_property (GObject * object, guint prop_id,
95     const GValue * value, GParamSpec * pspec);
96
97 static GstFlowReturn theora_parse_chain (GstPad * pad, GstBuffer * buffer);
98 static GstStateChangeReturn theora_parse_change_state (GstElement * element,
99     GstStateChange transition);
100 static gboolean theora_parse_sink_event (GstPad * pad, GstEvent * event);
101 static gboolean theora_parse_src_query (GstPad * pad, GstQuery * query);
102
103 static void
104 gst_theora_parse_class_init (GstTheoraParseClass * klass)
105 {
106   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
107   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
108
109   gobject_class->dispose = theora_parse_dispose;
110   gobject_class->get_property = theora_parse_get_property;
111   gobject_class->set_property = theora_parse_set_property;
112
113   /**
114    * GstTheoraParse:sychronization-points
115    *
116    * An array of (granuletime, buffertime) pairs
117    *
118    * Since: 0.10.10
119    */
120   g_object_class_install_property (gobject_class, PROP_SYNCHRONIZATION_POINTS,
121       g_param_spec_value_array ("synchronization-points",
122           "Synchronization points",
123           "An array of (granuletime, buffertime) pairs",
124           g_param_spec_uint64 ("time", "Time",
125               "Time (either granuletime or buffertime)", 0, G_MAXUINT64, 0,
126               G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS),
127           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128
129
130   gst_element_class_add_pad_template (gstelement_class,
131       gst_static_pad_template_get (&theora_parse_src_factory));
132   gst_element_class_add_pad_template (gstelement_class,
133       gst_static_pad_template_get (&theora_parse_sink_factory));
134   gst_element_class_set_details_simple (gstelement_class,
135       "Theora video parser", "Codec/Parser/Video",
136       "parse raw theora streams", "Andy Wingo <wingo@pobox.com>");
137
138   gstelement_class->change_state = theora_parse_change_state;
139
140   GST_DEBUG_CATEGORY_INIT (theoraparse_debug, "theoraparse", 0,
141       "Theora parser");
142 }
143
144 static void
145 gst_theora_parse_init (GstTheoraParse * parse)
146 {
147   parse->sinkpad =
148       gst_pad_new_from_static_template (&theora_parse_sink_factory, "sink");
149   gst_pad_set_chain_function (parse->sinkpad, theora_parse_chain);
150   gst_pad_set_event_function (parse->sinkpad, theora_parse_sink_event);
151   gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
152
153   parse->srcpad =
154       gst_pad_new_from_static_template (&theora_parse_src_factory, "src");
155   gst_pad_set_query_function (parse->srcpad, theora_parse_src_query);
156   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
157 }
158
159 static void
160 theora_parse_dispose (GObject * object)
161 {
162   GstTheoraParse *parse = GST_THEORA_PARSE (object);
163
164   g_free (parse->times);
165   parse->times = NULL;
166
167   G_OBJECT_CLASS (parent_class)->dispose (object);
168 }
169
170 static void
171 theora_parse_set_property (GObject * object, guint prop_id,
172     const GValue * value, GParamSpec * pspec)
173 {
174   GstTheoraParse *parse = GST_THEORA_PARSE (object);
175
176   switch (prop_id) {
177     case PROP_SYNCHRONIZATION_POINTS:
178     {
179       GValueArray *array;
180       guint i;
181
182       array = g_value_get_boxed (value);
183
184       if (array) {
185         if (array->n_values % 2)
186           goto odd_values;
187
188         g_free (parse->times);
189         parse->times = g_new (GstClockTime, array->n_values);
190         parse->npairs = array->n_values / 2;
191         for (i = 0; i < array->n_values; i++)
192           parse->times[i] = g_value_get_uint64 (&array->values[i]);
193       } else {
194         g_free (parse->times);
195         parse->npairs = 0;
196       }
197     }
198       break;
199     default:
200       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
201       break;
202   }
203
204   return;
205
206 odd_values:
207   {
208     g_critical ("expected an even number of time values for "
209         "synchronization-points");
210     return;
211   }
212 }
213
214 static void
215 theora_parse_get_property (GObject * object, guint prop_id,
216     GValue * value, GParamSpec * pspec)
217 {
218   GstTheoraParse *parse = GST_THEORA_PARSE (object);
219
220   switch (prop_id) {
221     case PROP_SYNCHRONIZATION_POINTS:
222     {
223       GValueArray *array = NULL;
224       guint i;
225
226       array = g_value_array_new (parse->npairs * 2);
227
228       for (i = 0; i < parse->npairs; i++) {
229         GValue v = { 0, };
230
231         g_value_init (&v, G_TYPE_UINT64);
232         g_value_set_uint64 (&v, parse->times[i * 2]);
233         g_value_array_append (array, &v);
234         g_value_set_uint64 (&v, parse->times[i * 2 + 1]);
235         g_value_array_append (array, &v);
236         g_value_unset (&v);
237       }
238
239       g_value_set_boxed (value, array);
240     }
241       break;
242     default:
243       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
244       break;
245   }
246 }
247
248 static void
249 theora_parse_set_header_on_caps (GstTheoraParse * parse, GstCaps * caps)
250 {
251   GstBuffer **bufs;
252   GstStructure *structure;
253   gint i;
254   GValue array = { 0 };
255   GValue value = { 0 };
256
257   bufs = parse->streamheader;
258   structure = gst_caps_get_structure (caps, 0);
259   g_value_init (&array, GST_TYPE_ARRAY);
260
261   for (i = 0; i < 3; i++) {
262     if (bufs[i] == NULL)
263       continue;
264
265     bufs[i] = gst_buffer_make_writable (bufs[i]);
266     GST_BUFFER_FLAG_SET (bufs[i], GST_BUFFER_FLAG_IN_CAPS);
267
268     g_value_init (&value, GST_TYPE_BUFFER);
269     gst_value_set_buffer (&value, bufs[i]);
270     gst_value_array_append_value (&array, &value);
271     g_value_unset (&value);
272   }
273
274   gst_structure_set_value (structure, "streamheader", &array);
275   g_value_unset (&array);
276 }
277
278 /* two tasks to do here: set the streamheader on the caps, and use libtheora to
279    parse the headers */
280 static void
281 theora_parse_set_streamheader (GstTheoraParse * parse)
282 {
283   GstCaps *caps;
284   gint i;
285   guint32 bitstream_version;
286   th_setup_info *setup = NULL;
287
288   g_assert (!parse->streamheader_received);
289
290   caps = gst_caps_make_writable (gst_pad_query_caps (parse->srcpad, NULL));
291   theora_parse_set_header_on_caps (parse, caps);
292   GST_DEBUG_OBJECT (parse, "here are the caps: %" GST_PTR_FORMAT, caps);
293   gst_pad_set_caps (parse->srcpad, caps);
294   gst_caps_unref (caps);
295
296   for (i = 0; i < 3; i++) {
297     ogg_packet packet;
298     GstBuffer *buf;
299     int ret;
300     gsize size;
301
302     buf = parse->streamheader[i];
303     if (buf == NULL)
304       continue;
305
306     packet.packet = gst_buffer_map (buf, &size, NULL, GST_MAP_READ);
307     packet.bytes = size;
308     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
309     packet.packetno = i + 1;
310     packet.e_o_s = 0;
311     packet.b_o_s = (i == 0);
312     ret = th_decode_headerin (&parse->info, &parse->comment, &setup, &packet);
313     gst_buffer_unmap (buf, packet.packet, size);
314     if (ret < 0) {
315       GST_WARNING_OBJECT (parse, "Failed to decode Theora header %d: %d\n",
316           i + 1, ret);
317     }
318   }
319   if (setup) {
320     th_setup_free (setup);
321   }
322
323   parse->fps_n = parse->info.fps_numerator;
324   parse->fps_d = parse->info.fps_denominator;
325   parse->shift = parse->info.keyframe_granule_shift;
326
327   /* With libtheora-1.0beta1 the granulepos scheme was changed:
328    * where earlier the granulepos refered to the index/beginning
329    * of a frame, it now refers to the end, which matches the use
330    * in vorbis/speex. We check the bitstream version from the header so
331    * we know which way to interpret the incoming granuepos
332    */
333   bitstream_version = (parse->info.version_major << 16) |
334       (parse->info.version_minor << 8) | parse->info.version_subminor;
335   parse->is_old_bitstream = (bitstream_version <= 0x00030200);
336
337   parse->streamheader_received = TRUE;
338 }
339
340 static void
341 theora_parse_drain_event_queue (GstTheoraParse * parse)
342 {
343   while (parse->event_queue->length) {
344     GstEvent *event;
345
346     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
347     gst_pad_event_default (parse->sinkpad, event);
348   }
349 }
350
351 static void
352 theora_parse_push_headers (GstTheoraParse * parse)
353 {
354   gint i;
355
356   theora_parse_drain_event_queue (parse);
357
358   if (!parse->streamheader_received)
359     theora_parse_set_streamheader (parse);
360
361   /* ignore return values, we pass along the result of pushing data packets only
362    */
363   for (i = 0; i < 3; i++) {
364     GstBuffer *buf;
365
366     if ((buf = parse->streamheader[i])) {
367       gst_pad_push (parse->srcpad, buf);
368       parse->streamheader[i] = NULL;
369     }
370   }
371 }
372
373 static void
374 theora_parse_clear_queue (GstTheoraParse * parse)
375 {
376   while (parse->buffer_queue->length) {
377     GstBuffer *buf;
378
379     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
380     gst_buffer_unref (buf);
381   }
382   while (parse->event_queue->length) {
383     GstEvent *event;
384
385     event = GST_EVENT_CAST (g_queue_pop_head (parse->event_queue));
386     gst_event_unref (event);
387   }
388 }
389
390 static gint64
391 make_granulepos (GstTheoraParse * parse, gint64 keyframe, gint64 frame)
392 {
393   gint64 iframe;
394
395   if (keyframe == -1)
396     keyframe = 0;
397   /* If using newer theora, offset the granulepos by +1, see comment in
398    * theora_parse_set_streamheader.
399    * 
400    * We don't increment keyframe directly, as internally we always index frames
401    * starting from 0 and we do some sanity checking below. */
402   if (!parse->is_old_bitstream)
403     iframe = keyframe + 1;
404   else
405     iframe = keyframe;
406
407   g_return_val_if_fail (frame >= keyframe, -1);
408   g_return_val_if_fail (frame - keyframe < 1 << parse->shift, -1);
409
410   return (iframe << parse->shift) + (frame - keyframe);
411 }
412
413 static void
414 parse_granulepos (GstTheoraParse * parse, gint64 granulepos,
415     gint64 * keyframe, gint64 * frame)
416 {
417   gint64 kf;
418
419   kf = granulepos >> parse->shift;
420   /* If using newer theora, offset the granulepos by -1, see comment
421    * in theora_parse_set_streamheader */
422   if (!parse->is_old_bitstream)
423     kf -= 1;
424   if (keyframe)
425     *keyframe = kf;
426   if (frame)
427     *frame = kf + (granulepos & ((1 << parse->shift) - 1));
428 }
429
430 static gboolean
431 is_keyframe (GstBuffer * buf)
432 {
433   gsize size;
434   guint8 data[1];
435
436   size = gst_buffer_get_size (buf);
437   if (size == 0)
438     return FALSE;
439
440   gst_buffer_extract (buf, 0, data, 1);
441
442   return ((data[0] & 0x40) == 0);
443 }
444
445 static void
446 theora_parse_munge_granulepos (GstTheoraParse * parse, GstBuffer * buf,
447     gint64 keyframe, gint64 frame)
448 {
449   gint64 frames_diff;
450   GstClockTimeDiff time_diff;
451
452   if (keyframe == frame) {
453     gint i;
454
455     /* update granule_offset */
456     for (i = 0; i < parse->npairs; i++) {
457       if (parse->times[i * 2] >= GST_BUFFER_OFFSET (buf))
458         break;
459     }
460     if (i > 0) {
461       /* time_diff gets reset below */
462       time_diff = parse->times[i * 2 - 1] - parse->times[i * 2 - 2];
463       parse->granule_offset = gst_util_uint64_scale (time_diff,
464           parse->fps_n, parse->fps_d * GST_SECOND);
465       parse->granule_offset <<= parse->shift;
466     }
467   }
468
469   frames_diff = parse->granule_offset >> parse->shift;
470   time_diff = gst_util_uint64_scale_int (GST_SECOND * frames_diff,
471       parse->fps_d, parse->fps_n);
472
473   GST_DEBUG_OBJECT (parse, "offsetting theora stream by %" G_GINT64_FORMAT
474       " frames (%" GST_TIME_FORMAT ")", frames_diff, GST_TIME_ARGS (time_diff));
475
476   GST_BUFFER_OFFSET_END (buf) += parse->granule_offset;
477   GST_BUFFER_OFFSET (buf) += time_diff;
478   GST_BUFFER_TIMESTAMP (buf) += time_diff;
479 }
480
481 static GstFlowReturn
482 theora_parse_push_buffer (GstTheoraParse * parse, GstBuffer * buf,
483     gint64 keyframe, gint64 frame)
484 {
485
486   GstClockTime this_time, next_time;
487
488   this_time = gst_util_uint64_scale_int (GST_SECOND * frame,
489       parse->fps_d, parse->fps_n);
490
491   next_time = gst_util_uint64_scale_int (GST_SECOND * (frame + 1),
492       parse->fps_d, parse->fps_n);
493
494   GST_BUFFER_OFFSET_END (buf) = make_granulepos (parse, keyframe, frame);
495   GST_BUFFER_OFFSET (buf) = this_time;
496   GST_BUFFER_TIMESTAMP (buf) = this_time;
497   GST_BUFFER_DURATION (buf) = next_time - this_time;
498
499   if (parse->times)
500     theora_parse_munge_granulepos (parse, buf, keyframe, frame);
501
502   GST_DEBUG_OBJECT (parse, "pushing buffer with granulepos %" G_GINT64_FORMAT
503       "|%" G_GINT64_FORMAT, keyframe, frame - keyframe);
504
505   return gst_pad_push (parse->srcpad, buf);
506 }
507
508 static GstFlowReturn
509 theora_parse_drain_queue_prematurely (GstTheoraParse * parse)
510 {
511   GstFlowReturn ret = GST_FLOW_OK;
512
513   /* got an EOS event, make sure to push out any buffers that were in the queue
514    * -- won't normally be the case, but this catches the
515    * didn't-get-a-granulepos-on-the-last-packet case. Assuming a continuous
516    * stream. */
517
518   GST_DEBUG_OBJECT (parse, "got EOS, draining queue");
519
520   /* if we get an eos before pushing the streamheaders, drain our events before
521    * eos */
522   theora_parse_drain_event_queue (parse);
523
524   while (!g_queue_is_empty (parse->buffer_queue)) {
525     GstBuffer *buf;
526
527     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
528
529     parse->prev_frame++;
530
531     if (is_keyframe (buf))
532       /* we have a keyframe */
533       parse->prev_keyframe = parse->prev_frame;
534     else
535       GST_BUFFER_FLAGS (buf) |= GST_BUFFER_FLAG_DELTA_UNIT;
536
537     if (parse->prev_keyframe < 0) {
538       if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
539         parse_granulepos (parse, GST_BUFFER_OFFSET_END (buf),
540             &parse->prev_keyframe, NULL);
541       } else {
542         /* No previous keyframe known; can't extract one from this frame. That
543          * means we can't do any valid output for this frame, just continue to
544          * the next frame.
545          */
546         gst_buffer_unref (buf);
547         continue;
548       }
549     }
550
551     ret = theora_parse_push_buffer (parse, buf, parse->prev_keyframe,
552         parse->prev_frame);
553
554     if (ret != GST_FLOW_OK)
555       goto done;
556   }
557
558 done:
559   return ret;
560 }
561
562 static GstFlowReturn
563 theora_parse_drain_queue (GstTheoraParse * parse, gint64 granulepos)
564 {
565   GstFlowReturn ret = GST_FLOW_OK;
566   gint64 keyframe, prev_frame, frame;
567
568   parse_granulepos (parse, granulepos, &keyframe, &frame);
569
570   GST_DEBUG ("draining queue of length %d",
571       g_queue_get_length (parse->buffer_queue));
572
573   GST_LOG_OBJECT (parse, "gp %" G_GINT64_FORMAT ", kf %" G_GINT64_FORMAT
574       ", frame %" G_GINT64_FORMAT, granulepos, keyframe, frame);
575
576   prev_frame = frame - g_queue_get_length (parse->buffer_queue);
577
578   GST_LOG_OBJECT (parse,
579       "new prev %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT, prev_frame,
580       parse->prev_frame);
581
582   if (prev_frame < parse->prev_frame) {
583     GST_WARNING ("jumped %" G_GINT64_FORMAT
584         " frames backwards! not sure what to do here",
585         parse->prev_frame - prev_frame);
586     parse->prev_frame = prev_frame;
587   } else if (prev_frame > parse->prev_frame) {
588     GST_INFO ("discontinuity detected (%" G_GINT64_FORMAT
589         " frames)", prev_frame - parse->prev_frame);
590     if (keyframe <= prev_frame && keyframe > parse->prev_keyframe)
591       parse->prev_keyframe = keyframe;
592     parse->prev_frame = prev_frame;
593   }
594
595   while (!g_queue_is_empty (parse->buffer_queue)) {
596     GstBuffer *buf;
597
598     parse->prev_frame++;
599     g_assert (parse->prev_frame >= 0);
600
601     buf = GST_BUFFER_CAST (g_queue_pop_head (parse->buffer_queue));
602
603     if (is_keyframe (buf))
604       /* we have a keyframe */
605       parse->prev_keyframe = parse->prev_frame;
606     else
607       GST_BUFFER_FLAGS (buf) |= GST_BUFFER_FLAG_DELTA_UNIT;
608
609     ret = theora_parse_push_buffer (parse, buf, parse->prev_keyframe,
610         parse->prev_frame);
611
612     if (ret != GST_FLOW_OK)
613       goto done;
614   }
615
616 done:
617   return ret;
618 }
619
620 static GstFlowReturn
621 theora_parse_queue_buffer (GstTheoraParse * parse, GstBuffer * buf)
622 {
623   GstFlowReturn ret = GST_FLOW_OK;
624
625   buf = gst_buffer_make_writable (buf);
626
627   g_queue_push_tail (parse->buffer_queue, buf);
628
629   if (GST_BUFFER_OFFSET_END_IS_VALID (buf)) {
630     if (parse->prev_keyframe < 0) {
631       parse_granulepos (parse, GST_BUFFER_OFFSET_END (buf),
632           &parse->prev_keyframe, NULL);
633     }
634     ret = theora_parse_drain_queue (parse, GST_BUFFER_OFFSET_END (buf));
635   }
636
637   return ret;
638 }
639
640 static GstFlowReturn
641 theora_parse_chain (GstPad * pad, GstBuffer * buffer)
642 {
643   GstFlowReturn ret;
644   GstTheoraParse *parse;
645   guint8 *data, header;
646   gsize size;
647   gboolean have_header;
648
649   parse = GST_THEORA_PARSE (gst_pad_get_parent (pad));
650
651   have_header = FALSE;
652
653   data = gst_buffer_map (buffer, &size, NULL, GST_MAP_READ);
654   header = data[0];
655   gst_buffer_unmap (buffer, data, size);
656
657   if (size >= 1) {
658     if (header & 0x80)
659       have_header = TRUE;
660   }
661
662   if (have_header) {
663     if (parse->send_streamheader) {
664       /* we need to collect the headers still */
665       /* so put it on the streamheader list and return */
666       if (header >= 0x80 && header <= 0x82)
667         parse->streamheader[header - 0x80] = buffer;
668     }
669     ret = GST_FLOW_OK;
670   } else {
671     /* data packet, push the headers we collected before */
672     if (parse->send_streamheader) {
673       theora_parse_push_headers (parse);
674       parse->send_streamheader = FALSE;
675     }
676
677     ret = theora_parse_queue_buffer (parse, buffer);
678   }
679
680   gst_object_unref (parse);
681
682   return ret;
683 }
684
685 static gboolean
686 theora_parse_queue_event (GstTheoraParse * parse, GstEvent * event)
687 {
688   g_queue_push_tail (parse->event_queue, event);
689   return TRUE;
690 }
691
692 static gboolean
693 theora_parse_sink_event (GstPad * pad, GstEvent * event)
694 {
695   gboolean ret;
696   GstTheoraParse *parse;
697
698   parse = GST_THEORA_PARSE (gst_pad_get_parent (pad));
699
700   switch (GST_EVENT_TYPE (event)) {
701     case GST_EVENT_FLUSH_STOP:
702       theora_parse_clear_queue (parse);
703       parse->prev_keyframe = -1;
704       parse->prev_frame = -1;
705       ret = gst_pad_event_default (pad, event);
706       break;
707     case GST_EVENT_EOS:
708       theora_parse_drain_queue_prematurely (parse);
709       ret = gst_pad_event_default (pad, event);
710       break;
711     default:
712       if (parse->send_streamheader && GST_EVENT_IS_SERIALIZED (event))
713         ret = theora_parse_queue_event (parse, event);
714       else
715         ret = gst_pad_event_default (pad, event);
716       break;
717   }
718
719   gst_object_unref (parse);
720
721   return ret;
722 }
723
724 static gboolean
725 theora_parse_src_convert (GstPad * pad,
726     GstFormat src_format, gint64 src_value,
727     GstFormat * dest_format, gint64 * dest_value)
728 {
729   gboolean res = TRUE;
730   GstTheoraParse *parse;
731   guint64 scale = 1;
732
733   if (src_format == *dest_format) {
734     *dest_value = src_value;
735     return TRUE;
736   }
737
738   parse = GST_THEORA_PARSE (gst_pad_get_parent (pad));
739
740   /* we need the info part before we can done something */
741   if (!parse->streamheader_received)
742     goto no_header;
743
744   switch (src_format) {
745     case GST_FORMAT_BYTES:
746       switch (*dest_format) {
747         case GST_FORMAT_DEFAULT:
748           *dest_value = gst_util_uint64_scale_int (src_value, 2,
749               parse->info.pic_height * parse->info.pic_width * 3);
750           break;
751         case GST_FORMAT_TIME:
752           /* seems like a rather silly conversion, implement me if you like */
753         default:
754           res = FALSE;
755       }
756       break;
757     case GST_FORMAT_TIME:
758       switch (*dest_format) {
759         case GST_FORMAT_BYTES:
760           scale = 3 * (parse->info.pic_width * parse->info.pic_height) / 2;
761         case GST_FORMAT_DEFAULT:
762           *dest_value = scale * gst_util_uint64_scale (src_value,
763               parse->info.fps_numerator,
764               parse->info.fps_denominator * GST_SECOND);
765           break;
766         default:
767           GST_DEBUG_OBJECT (parse, "cannot convert to format %s",
768               gst_format_get_name (*dest_format));
769           res = FALSE;
770       }
771       break;
772     case GST_FORMAT_DEFAULT:
773       switch (*dest_format) {
774         case GST_FORMAT_TIME:
775           *dest_value = gst_util_uint64_scale (src_value,
776               GST_SECOND * parse->info.fps_denominator,
777               parse->info.fps_numerator);
778           break;
779         case GST_FORMAT_BYTES:
780           *dest_value = gst_util_uint64_scale_int (src_value,
781               3 * parse->info.pic_width * parse->info.pic_height, 2);
782           break;
783         default:
784           res = FALSE;
785       }
786       break;
787     default:
788       res = FALSE;
789   }
790 done:
791   gst_object_unref (parse);
792   return res;
793
794   /* ERRORS */
795 no_header:
796   {
797     GST_DEBUG_OBJECT (parse, "no header yet, cannot convert");
798     res = FALSE;
799     goto done;
800   }
801 }
802
803 static gboolean
804 theora_parse_src_query (GstPad * pad, GstQuery * query)
805 {
806   GstTheoraParse *parse;
807
808   gboolean res = FALSE;
809
810   parse = GST_THEORA_PARSE (gst_pad_get_parent (pad));
811
812   switch (GST_QUERY_TYPE (query)) {
813     case GST_QUERY_POSITION:
814     {
815       gint64 frame, value;
816       GstFormat my_format, format;
817       gint64 time;
818
819       frame = parse->prev_frame;
820
821       GST_LOG_OBJECT (parse,
822           "query %p: we have current frame: %" G_GINT64_FORMAT, query, frame);
823
824       /* parse format */
825       gst_query_parse_position (query, &format, NULL);
826
827       /* and convert to the final format in two steps with time as the 
828        * intermediate step */
829       my_format = GST_FORMAT_TIME;
830       if (!(res =
831               theora_parse_src_convert (parse->sinkpad, GST_FORMAT_DEFAULT,
832                   frame, &my_format, &time)))
833         goto error;
834
835       /* fixme: handle segments
836          time = (time - parse->segment.start) + parse->segment.time;
837        */
838
839       GST_LOG_OBJECT (parse,
840           "query %p: our time: %" GST_TIME_FORMAT " (conv to %s)",
841           query, GST_TIME_ARGS (time), gst_format_get_name (format));
842
843       if (!(res =
844               theora_parse_src_convert (pad, my_format, time, &format, &value)))
845         goto error;
846
847       gst_query_set_position (query, format, value);
848
849       GST_LOG_OBJECT (parse,
850           "query %p: we return %" G_GINT64_FORMAT " (format %u)", query, value,
851           format);
852
853       break;
854     }
855     case GST_QUERY_DURATION:
856       /* forward to peer for total */
857       if (!(res = gst_pad_query (GST_PAD_PEER (parse->sinkpad), query)))
858         goto error;
859       break;
860     case GST_QUERY_CONVERT:
861     {
862       GstFormat src_fmt, dest_fmt;
863       gint64 src_val, dest_val;
864
865       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
866       if (!(res =
867               theora_parse_src_convert (pad, src_fmt, src_val, &dest_fmt,
868                   &dest_val)))
869         goto error;
870
871       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
872       break;
873     }
874     default:
875       res = gst_pad_query_default (pad, query);
876       break;
877   }
878 done:
879   gst_object_unref (parse);
880
881   return res;
882
883   /* ERRORS */
884 error:
885   {
886     GST_DEBUG_OBJECT (parse, "query failed");
887     goto done;
888   }
889 }
890
891 static GstStateChangeReturn
892 theora_parse_change_state (GstElement * element, GstStateChange transition)
893 {
894   GstTheoraParse *parse = GST_THEORA_PARSE (element);
895   GstStateChangeReturn ret;
896   gint i;
897
898   switch (transition) {
899     case GST_STATE_CHANGE_READY_TO_PAUSED:
900       th_info_init (&parse->info);
901       th_comment_init (&parse->comment);
902       parse->send_streamheader = TRUE;
903       parse->buffer_queue = g_queue_new ();
904       parse->event_queue = g_queue_new ();
905       parse->prev_keyframe = -1;
906       parse->prev_frame = -1;
907       parse->granule_offset = 0;
908       break;
909     default:
910       break;
911   }
912
913   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
914
915   switch (transition) {
916     case GST_STATE_CHANGE_PAUSED_TO_READY:
917       th_info_clear (&parse->info);
918       th_comment_clear (&parse->comment);
919       theora_parse_clear_queue (parse);
920       g_queue_free (parse->buffer_queue);
921       g_queue_free (parse->event_queue);
922       parse->buffer_queue = NULL;
923       for (i = 0; i < 3; i++) {
924         if (parse->streamheader[i]) {
925           gst_buffer_unref (parse->streamheader[i]);
926           parse->streamheader[i] = NULL;
927         }
928       }
929       parse->streamheader_received = FALSE;
930       break;
931     default:
932       break;
933   }
934
935   return ret;
936 }