baseparse: implement leftover draining in pull mode
[platform/upstream/gst-plugins-good.git] / gst / audioparsers / gstbaseparse.c
1 /* GStreamer
2  * Copyright (C) 2008 Nokia Corporation. All rights reserved.
3  *   Contact: Stefan Kost <stefan.kost@nokia.com>
4  * Copyright (C) 2008 Sebastian Dröge <sebastian.droege@collabora.co.uk>.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbaseparse
24  * @short_description: Base class for stream parsers
25  * @see_also: #GstBaseTransform
26  *
27  * This base class is for parser elements that process data and splits it 
28  * into separate audio/video/whatever frames.
29  *
30  * It provides for:
31  * <itemizedlist>
32  *   <listitem><para>One sinkpad and one srcpad</para></listitem>
33  *   <listitem><para>Handles state changes</para></listitem>
34  *   <listitem><para>Does flushing</para></listitem>
35  *   <listitem><para>Push mode</para></listitem>
36  *   <listitem><para>Pull mode</para></listitem>
37  *   <listitem><para>Handles events (NEWSEGMENT/EOS/FLUSH)</para></listitem>
38  *   <listitem><para>Handles seeking in both modes</para></listitem>
39  *   <listitem><para>
40  *        Handles POSITION/DURATION/SEEKING/FORMAT/CONVERT queries
41  *   </para></listitem>
42  * </itemizedlist>
43  *
44  * The purpose of this base class is to provide a basic functionality of
45  * a parser and share a lot of rather complex code.
46  *
47  * Description of the parsing mechanism:
48  * <orderedlist>
49  * <listitem>
50  *   <itemizedlist><title>Set-up phase</title>
51  *   <listitem><para>
52  *     GstBaseParse class calls @set_sink_caps to inform the subclass about
53  *     incoming sinkpad caps. Subclass should set the srcpad caps accordingly.
54  *   </para></listitem>
55  *   <listitem><para>
56  *     GstBaseParse calls @start to inform subclass that data processing is
57  *     about to start now.
58  *   </para></listitem>
59  *   <listitem><para>
60  *      At least in this point subclass needs to tell the GstBaseParse class
61  *      how big data chunks it wants to receive (min_frame_size). It can do 
62  *      this with @gst_base_parse_set_min_frame_size.
63  *   </para></listitem>
64  *   <listitem><para>
65  *      GstBaseParse class sets up appropriate data passing mode (pull/push)
66  *      and starts to process the data.
67  *   </para></listitem>
68  *   </itemizedlist>
69  * </listitem>
70  * <listitem>
71  *   <itemizedlist>
72  *   <title>Parsing phase</title>
73  *     <listitem><para>
74  *       GstBaseParse gathers at least min_frame_size bytes of data either 
75  *       by pulling it from upstream or collecting buffers into internal
76  *       #GstAdapter.
77  *     </para></listitem>
78  *     <listitem><para>
79  *       A buffer of min_frame_size bytes is passed to subclass with
80  *       @check_valid_frame. Subclass checks the contents and returns TRUE
81  *       if the buffer contains a valid frame. It also needs to set the
82  *       @framesize according to the detected frame size. If buffer didn't
83  *       contain a valid frame, this call must return FALSE and optionally
84  *       set the @skipsize value to inform base class that how many bytes
85  *       it needs to skip in order to find a valid frame. The passed buffer
86  *       is read-only.  Note that @check_valid_frame might receive any small
87  *       amount of input data when leftover data is being drained (e.g. at EOS).
88  *     </para></listitem>
89  *     <listitem><para>
90  *       After valid frame is found, it will be passed again to subclass with
91  *       @parse_frame call. Now subclass is responsible for parsing the
92  *       frame contents and setting the buffer timestamp, duration and caps.
93  *     </para></listitem>
94  *     <listitem><para>
95  *       Finally the buffer can be pushed downstream and parsing loop starts
96  *       over again.
97  *     </para></listitem>
98  *     <listitem><para>
99  *       During the parsing process GstBaseParseClass will handle both srcpad and
100  *       sinkpad events. They will be passed to subclass if @event or
101  *       @src_event callbacks have been provided.
102  *     </para></listitem>
103  *   </itemizedlist>
104  * </listitem>
105  * <listitem>
106  *   <itemizedlist><title>Shutdown phase</title>
107  *   <listitem><para>
108  *     GstBaseParse class calls @stop to inform the subclass that data
109  *     parsing will be stopped.
110  *   </para></listitem>
111  *   </itemizedlist>
112  * </listitem>
113  * </orderedlist>
114  *
115  * Subclass is responsible for providing pad template caps for
116  * source and sink pads. The pads need to be named "sink" and "src". It also 
117  * needs to set the fixed caps on srcpad, when the format is ensured (e.g. 
118  * when base class calls subclass' @set_sink_caps function).
119  *
120  * This base class uses GST_FORMAT_DEFAULT as a meaning of frames. So,
121  * subclass conversion routine needs to know that conversion from
122  * GST_FORMAT_TIME to GST_FORMAT_DEFAULT must return the
123  * frame number that can be found from the given byte position.
124  *
125  * GstBaseParse uses subclasses conversion methods also for seeking. If
126  * subclass doesn't provide @convert function, seeking will get disabled.
127  *
128  * Subclass @start and @stop functions will be called to inform the beginning
129  * and end of data processing.
130  *
131  * Things that subclass need to take care of:
132  * <itemizedlist>
133  *   <listitem><para>Provide pad templates</para></listitem>
134  *   <listitem><para>
135  *      Fixate the source pad caps when appropriate
136  *   </para></listitem>
137  *   <listitem><para>
138  *      Inform base class how big data chunks should be retrieved. This is
139  *      done with @gst_base_parse_set_min_frame_size function.
140  *   </para></listitem>
141  *   <listitem><para>
142  *      Examine data chunks passed to subclass with @check_valid_frame
143  *      and tell if they contain a valid frame
144  *   </para></listitem>
145  *   <listitem><para>
146  *      Set the caps and timestamp to frame that is passed to subclass with
147  *      @parse_frame function.
148  *   </para></listitem>
149  *   <listitem><para>Provide conversion functions</para></listitem>
150  *   <listitem><para>
151  *      Update the duration information with @gst_base_parse_set_duration
152  *   </para></listitem>
153  *   <listitem><para>
154  *      Alternatively, parsing (or specs) might yield a frames per seconds rate
155  *      which can be provided to GstBaseParse to enable it to cater for
156  *      buffer time metadata (which will be taken from upstream as much as possible).
157  *      Internally keeping track of frames and respective
158  *      sizes that have been pushed provides GstBaseParse with a bytes per frame
159  *      rate.  A default @convert (used if not overriden) will then use these
160  *      rates to perform obvious conversions.  These rates are also used to update
161  *      (estimated) duration at regular frame intervals.
162  *      If no (fixed) frames per second rate applies, default conversion will be
163  *      based on (estimated) bytes per second (but no default buffer metadata
164  *      can be provided in this case).
165  *   </para></listitem>
166  * </itemizedlist>
167  *
168  */
169
170 /* TODO:
171  *  - Better segment handling:
172  *    - NEWSEGMENT for gaps
173  *    - Not NEWSEGMENT starting at 0 but at first frame timestamp
174  *  - GstIndex support
175  *  - Seek table generation and subclass seek entry injection
176  *  - Accurate seeking
177  *  - In push mode provide a queue of adapter-"queued" buffers for upstream
178  *    buffer metadata
179  *  - Queue buffers/events until caps are set
180  *  - Let subclass decide if frames outside the segment should be dropped
181  *  - Send queries upstream
182  */
183
184 #ifdef HAVE_CONFIG_H
185 #  include "config.h"
186 #endif
187
188 #include <stdlib.h>
189 #include <string.h>
190
191 #include "gstbaseparse.h"
192
193 GST_DEBUG_CATEGORY_STATIC (gst_base_parse_debug);
194 #define GST_CAT_DEFAULT gst_base_parse_debug
195
196 /* Supported formats */
197 static GstFormat fmtlist[] = {
198   GST_FORMAT_DEFAULT,
199   GST_FORMAT_BYTES,
200   GST_FORMAT_TIME,
201   0
202 };
203
204 #define GST_BASE_PARSE_GET_PRIVATE(obj)  \
205     (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_PARSE, GstBaseParsePrivate))
206
207 struct _GstBaseParsePrivate
208 {
209   GstActivateMode pad_mode;
210
211   gint64 duration;
212   GstFormat duration_fmt;
213   gint64 estimated_duration;
214
215   guint min_frame_size;
216   gboolean passthrough;
217   guint fps_num, fps_den;
218   guint update_interval;
219
220   gboolean discont;
221   gboolean flushing;
222   gboolean drain;
223
224   gint64 offset;
225   GstClockTime next_ts;
226   GstClockTime prev_ts;
227   GstClockTime frame_duration;
228
229   guint64 framecount;
230   guint64 bytecount;
231   guint64 acc_duration;
232
233   GList *pending_events;
234
235   GstBuffer *cache;
236 };
237
238 struct _GstBaseParseClassPrivate
239 {
240   gpointer _padding;
241 };
242
243 static GstElementClass *parent_class = NULL;
244
245 static void gst_base_parse_base_init (gpointer g_class);
246 static void gst_base_parse_base_finalize (gpointer g_class);
247 static void gst_base_parse_class_init (GstBaseParseClass * klass);
248 static void gst_base_parse_init (GstBaseParse * parse,
249     GstBaseParseClass * klass);
250
251 GType
252 gst_base_parse_get_type (void)
253 {
254   static GType base_parse_type = 0;
255
256   if (!base_parse_type) {
257     static const GTypeInfo base_parse_info = {
258       sizeof (GstBaseParseClass),
259       (GBaseInitFunc) gst_base_parse_base_init,
260       (GBaseFinalizeFunc) gst_base_parse_base_finalize,
261       (GClassInitFunc) gst_base_parse_class_init,
262       NULL,
263       NULL,
264       sizeof (GstBaseParse),
265       0,
266       (GInstanceInitFunc) gst_base_parse_init,
267     };
268
269     base_parse_type = g_type_register_static (GST_TYPE_ELEMENT,
270         "GstBaseParse", &base_parse_info, G_TYPE_FLAG_ABSTRACT);
271   }
272   return base_parse_type;
273 }
274
275 static void gst_base_parse_finalize (GObject * object);
276
277 static gboolean gst_base_parse_sink_activate (GstPad * sinkpad);
278 static gboolean gst_base_parse_sink_activate_push (GstPad * pad,
279     gboolean active);
280 static gboolean gst_base_parse_sink_activate_pull (GstPad * pad,
281     gboolean active);
282 static gboolean gst_base_parse_handle_seek (GstBaseParse * parse,
283     GstEvent * event);
284
285 static gboolean gst_base_parse_src_event (GstPad * pad, GstEvent * event);
286 static gboolean gst_base_parse_sink_event (GstPad * pad, GstEvent * event);
287 static gboolean gst_base_parse_query (GstPad * pad, GstQuery * query);
288 static gboolean gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps);
289 static const GstQueryType *gst_base_parse_get_querytypes (GstPad * pad);
290
291 static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstBuffer * buffer);
292 static void gst_base_parse_loop (GstPad * pad);
293
294 static gboolean gst_base_parse_check_frame (GstBaseParse * parse,
295     GstBuffer * buffer, guint * framesize, gint * skipsize);
296
297 static GstFlowReturn gst_base_parse_parse_frame (GstBaseParse * parse,
298     GstBuffer * buffer);
299
300 static gboolean gst_base_parse_sink_eventfunc (GstBaseParse * parse,
301     GstEvent * event);
302
303 static gboolean gst_base_parse_src_eventfunc (GstBaseParse * parse,
304     GstEvent * event);
305
306 static gboolean gst_base_parse_is_seekable (GstBaseParse * parse);
307
308 gboolean gst_base_parse_convert (GstBaseParse * parse, GstFormat src_format,
309     gint64 src_value, GstFormat dest_format, gint64 * dest_value);
310
311 static void gst_base_parse_drain (GstBaseParse * parse);
312
313 static void
314 gst_base_parse_base_init (gpointer g_class)
315 {
316   GstBaseParseClass *klass = GST_BASE_PARSE_CLASS (g_class);
317   GstBaseParseClassPrivate *priv;
318
319   GST_DEBUG_CATEGORY_INIT (gst_base_parse_debug, "baseparse", 0,
320       "baseparse element");
321
322   /* TODO: Remove this once GObject supports class private data */
323   priv = g_slice_new0 (GstBaseParseClassPrivate);
324   if (klass->priv)
325     memcpy (priv, klass->priv, sizeof (GstBaseParseClassPrivate));
326   klass->priv = priv;
327 }
328
329 static void
330 gst_base_parse_base_finalize (gpointer g_class)
331 {
332   GstBaseParseClass *klass = GST_BASE_PARSE_CLASS (g_class);
333
334   g_slice_free (GstBaseParseClassPrivate, klass->priv);
335   klass->priv = NULL;
336 }
337
338 static void
339 gst_base_parse_finalize (GObject * object)
340 {
341   GstBaseParse *parse = GST_BASE_PARSE (object);
342   GstEvent **p_ev;
343
344   g_mutex_free (parse->parse_lock);
345   g_object_unref (parse->adapter);
346
347   if (parse->pending_segment) {
348     p_ev = &parse->pending_segment;
349     gst_event_replace (p_ev, NULL);
350   }
351   if (parse->close_segment) {
352     p_ev = &parse->close_segment;
353     gst_event_replace (p_ev, NULL);
354   }
355
356   if (parse->priv->cache) {
357     gst_buffer_unref (parse->priv->cache);
358     parse->priv->cache = NULL;
359   }
360
361   g_list_foreach (parse->priv->pending_events, (GFunc) gst_mini_object_unref,
362       NULL);
363   g_list_free (parse->priv->pending_events);
364   parse->priv->pending_events = NULL;
365
366   G_OBJECT_CLASS (parent_class)->finalize (object);
367 }
368
369 static void
370 gst_base_parse_class_init (GstBaseParseClass * klass)
371 {
372   GObjectClass *gobject_class;
373
374   gobject_class = G_OBJECT_CLASS (klass);
375   g_type_class_add_private (klass, sizeof (GstBaseParsePrivate));
376   parent_class = g_type_class_peek_parent (klass);
377   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_parse_finalize);
378
379   /* Default handlers */
380   klass->check_valid_frame = gst_base_parse_check_frame;
381   klass->parse_frame = gst_base_parse_parse_frame;
382   klass->event = gst_base_parse_sink_eventfunc;
383   klass->src_event = gst_base_parse_src_eventfunc;
384   klass->is_seekable = gst_base_parse_is_seekable;
385   klass->convert = gst_base_parse_convert;
386 }
387
388 static void
389 gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass)
390 {
391   GstPadTemplate *pad_template;
392
393   GST_DEBUG_OBJECT (parse, "gst_base_parse_init");
394
395   parse->priv = GST_BASE_PARSE_GET_PRIVATE (parse);
396
397   pad_template =
398       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
399   g_return_if_fail (pad_template != NULL);
400   parse->sinkpad = gst_pad_new_from_template (pad_template, "sink");
401   gst_pad_set_event_function (parse->sinkpad,
402       GST_DEBUG_FUNCPTR (gst_base_parse_sink_event));
403   gst_pad_set_setcaps_function (parse->sinkpad,
404       GST_DEBUG_FUNCPTR (gst_base_parse_sink_setcaps));
405   gst_pad_set_chain_function (parse->sinkpad,
406       GST_DEBUG_FUNCPTR (gst_base_parse_chain));
407   gst_pad_set_activate_function (parse->sinkpad,
408       GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate));
409   gst_pad_set_activatepush_function (parse->sinkpad,
410       GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_push));
411   gst_pad_set_activatepull_function (parse->sinkpad,
412       GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_pull));
413   gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
414
415   GST_DEBUG_OBJECT (parse, "sinkpad created");
416
417   pad_template =
418       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
419   g_return_if_fail (pad_template != NULL);
420   parse->srcpad = gst_pad_new_from_template (pad_template, "src");
421   gst_pad_set_event_function (parse->srcpad,
422       GST_DEBUG_FUNCPTR (gst_base_parse_src_event));
423   gst_pad_set_query_type_function (parse->srcpad,
424       GST_DEBUG_FUNCPTR (gst_base_parse_get_querytypes));
425   gst_pad_set_query_function (parse->srcpad,
426       GST_DEBUG_FUNCPTR (gst_base_parse_query));
427   gst_pad_use_fixed_caps (parse->srcpad);
428   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
429   GST_DEBUG_OBJECT (parse, "src created");
430
431   parse->parse_lock = g_mutex_new ();
432   parse->adapter = gst_adapter_new ();
433   parse->pending_segment = NULL;
434   parse->close_segment = NULL;
435
436   parse->priv->pad_mode = GST_ACTIVATE_NONE;
437   parse->priv->duration = -1;
438   parse->priv->min_frame_size = 1;
439   parse->priv->passthrough = FALSE;
440   parse->priv->discont = FALSE;
441   parse->priv->flushing = FALSE;
442   parse->priv->offset = 0;
443   GST_DEBUG_OBJECT (parse, "init ok");
444 }
445
446
447
448 /**
449  * gst_base_parse_check_frame:
450  * @parse: #GstBaseParse.
451  * @buffer: GstBuffer.
452  * @framesize: This will be set to tell the found frame size in bytes.
453  * @skipsize: Output parameter that tells how much data needs to be skipped
454  *            in order to find the following frame header.
455  *
456  * Default callback for check_valid_frame.
457  * 
458  * Returns: Always TRUE.
459  */
460 static gboolean
461 gst_base_parse_check_frame (GstBaseParse * parse,
462     GstBuffer * buffer, guint * framesize, gint * skipsize)
463 {
464   *framesize = GST_BUFFER_SIZE (buffer);
465   *skipsize = 0;
466   return TRUE;
467 }
468
469
470 /**
471  * gst_base_parse_parse_frame:
472  * @parse: #GstBaseParse.
473  * @buffer: #GstBuffer.
474  *
475  * Default callback for parse_frame.
476  */
477 static GstFlowReturn
478 gst_base_parse_parse_frame (GstBaseParse * parse, GstBuffer * buffer)
479 {
480   if (!GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
481       GST_CLOCK_TIME_IS_VALID (parse->priv->next_ts)) {
482     GST_BUFFER_TIMESTAMP (buffer) = parse->priv->next_ts;
483   }
484   if (!GST_BUFFER_DURATION_IS_VALID (buffer) &&
485       GST_CLOCK_TIME_IS_VALID (parse->priv->frame_duration)) {
486     GST_BUFFER_DURATION (buffer) = parse->priv->frame_duration;
487   }
488   return GST_FLOW_OK;
489 }
490
491
492 /**
493  * gst_base_parse_bytepos_to_time:
494  * @parse: #GstBaseParse.
495  * @bytepos: Position (in bytes) to be converted.
496  * @pos_in_time: #GstClockTime pointer where the result is set.
497  *
498  * Convert given byte position into #GstClockTime format.
499  * 
500  * Returns: TRUE if conversion succeeded.
501  */
502 static gboolean
503 gst_base_parse_bytepos_to_time (GstBaseParse * parse, gint64 bytepos,
504     GstClockTime * pos_in_time)
505 {
506   GstBaseParseClass *klass;
507   gboolean res = FALSE;
508
509   klass = GST_BASE_PARSE_GET_CLASS (parse);
510
511   if (klass->convert) {
512     res = klass->convert (parse, GST_FORMAT_BYTES, bytepos,
513         GST_FORMAT_TIME, (gint64 *) pos_in_time);
514   }
515   return res;
516 }
517
518
519 /**
520  * gst_base_parse_sink_event:
521  * @pad: #GstPad that received the event.
522  * @event: #GstEvent to be handled.
523  *
524  * Handler for sink pad events.
525  *
526  * Returns: TRUE if the event was handled.
527  */
528 static gboolean
529 gst_base_parse_sink_event (GstPad * pad, GstEvent * event)
530 {
531   GstBaseParse *parse;
532   GstBaseParseClass *bclass;
533   gboolean handled = FALSE;
534   gboolean ret = TRUE;
535
536
537   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
538   bclass = GST_BASE_PARSE_GET_CLASS (parse);
539
540   GST_DEBUG_OBJECT (parse, "handling event %d", GST_EVENT_TYPE (event));
541
542   /* Cache all events except EOS, NEWSEGMENT and FLUSH_STOP if we have a
543    * pending segment */
544   if (parse->pending_segment && GST_EVENT_TYPE (event) != GST_EVENT_EOS
545       && GST_EVENT_TYPE (event) != GST_EVENT_NEWSEGMENT
546       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_START
547       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
548     parse->priv->pending_events =
549         g_list_append (parse->priv->pending_events, event);
550     ret = TRUE;
551   } else {
552
553     if (bclass->event)
554       handled = bclass->event (parse, event);
555
556     if (!handled)
557       ret = gst_pad_event_default (pad, event);
558   }
559
560   gst_object_unref (parse);
561   GST_DEBUG_OBJECT (parse, "event handled");
562   return ret;
563 }
564
565
566 /**
567  * gst_base_parse_sink_eventfunc:
568  * @parse: #GstBaseParse.
569  * @event: #GstEvent to be handled.
570  *
571  * Element-level event handler function.
572  *
573  * Returns: TRUE if the event was handled and not need forwarding.
574  */
575 static gboolean
576 gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
577 {
578   gboolean handled = FALSE;
579   GstEvent **eventp;
580
581   switch (GST_EVENT_TYPE (event)) {
582     case GST_EVENT_NEWSEGMENT:
583     {
584       gdouble rate, applied_rate;
585       GstFormat format;
586       gint64 start, stop, pos, offset = 0;
587       gboolean update;
588
589       gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
590           &format, &start, &stop, &pos);
591
592
593       if (format == GST_FORMAT_BYTES) {
594         GstClockTime seg_start, seg_stop, seg_pos;
595
596         /* stop time is allowed to be open-ended, but not start & pos */
597         seg_stop = GST_CLOCK_TIME_NONE;
598         offset = pos;
599
600         if (gst_base_parse_bytepos_to_time (parse, start, &seg_start) &&
601             gst_base_parse_bytepos_to_time (parse, pos, &seg_pos)) {
602           gst_event_unref (event);
603           event = gst_event_new_new_segment_full (update, rate, applied_rate,
604               GST_FORMAT_TIME, seg_start, seg_stop, seg_pos);
605           format = GST_FORMAT_TIME;
606           GST_DEBUG_OBJECT (parse, "Converted incoming segment to TIME. "
607               "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
608               ", pos = %" GST_TIME_FORMAT, GST_TIME_ARGS (seg_start),
609               GST_TIME_ARGS (seg_stop), GST_TIME_ARGS (seg_pos));
610         }
611       }
612
613       if (format != GST_FORMAT_TIME) {
614         /* Unknown incoming segment format. Output a default open-ended 
615          * TIME segment */
616         gst_event_unref (event);
617         event = gst_event_new_new_segment_full (update, rate, applied_rate,
618             GST_FORMAT_TIME, 0, GST_CLOCK_TIME_NONE, 0);
619       }
620
621       gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
622           &format, &start, &stop, &pos);
623
624       gst_segment_set_newsegment_full (&parse->segment, update, rate,
625           applied_rate, format, start, stop, pos);
626
627       GST_DEBUG_OBJECT (parse, "Created newseg rate %g, applied rate %g, "
628           "format %d, start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
629           ", pos = %" GST_TIME_FORMAT, rate, applied_rate, format,
630           GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (pos));
631
632       /* save the segment for later, right before we push a new buffer so that
633        * the caps are fixed and the next linked element can receive
634        * the segment. */
635       eventp = &parse->pending_segment;
636       gst_event_replace (eventp, event);
637       gst_event_unref (event);
638       handled = TRUE;
639
640       /* but finish the current segment */
641       GST_DEBUG_OBJECT (parse, "draining current segment");
642       gst_base_parse_drain (parse);
643       gst_adapter_clear (parse->adapter);
644       parse->priv->offset = offset;
645       parse->priv->next_ts = start;
646       break;
647     }
648
649     case GST_EVENT_FLUSH_START:
650       parse->priv->flushing = TRUE;
651       handled = gst_pad_push_event (parse->srcpad, event);
652       /* Wait for _chain() to exit by taking the srcpad STREAM_LOCK */
653       GST_PAD_STREAM_LOCK (parse->srcpad);
654       GST_PAD_STREAM_UNLOCK (parse->srcpad);
655
656       break;
657
658     case GST_EVENT_FLUSH_STOP:
659       gst_adapter_clear (parse->adapter);
660       parse->priv->flushing = FALSE;
661       parse->priv->discont = TRUE;
662       break;
663
664     case GST_EVENT_EOS:
665       gst_base_parse_drain (parse);
666       break;
667
668     default:
669       break;
670   }
671
672   return handled;
673 }
674
675
676 /**
677  * gst_base_parse_src_event:
678  * @pad: #GstPad that received the event.
679  * @event: #GstEvent that was received.
680  *
681  * Handler for source pad events.
682  *
683  * Returns: TRUE if the event was handled.
684  */
685 static gboolean
686 gst_base_parse_src_event (GstPad * pad, GstEvent * event)
687 {
688   GstBaseParse *parse;
689   GstBaseParseClass *bclass;
690   gboolean handled = FALSE;
691   gboolean ret = TRUE;
692
693   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
694   bclass = GST_BASE_PARSE_GET_CLASS (parse);
695
696   GST_DEBUG_OBJECT (parse, "event %d, %s", GST_EVENT_TYPE (event),
697       GST_EVENT_TYPE_NAME (event));
698
699   if (bclass->src_event)
700     handled = bclass->src_event (parse, event);
701
702   if (!handled)
703     ret = gst_pad_event_default (pad, event);
704
705   gst_object_unref (parse);
706   return ret;
707 }
708
709
710 /**
711  * gst_base_parse_src_eventfunc:
712  * @parse: #GstBaseParse.
713  * @event: #GstEvent that was received.
714  *
715  * Default srcpad event handler.
716  *
717  * Returns: TRUE if the event was handled and can be dropped.
718  */
719 static gboolean
720 gst_base_parse_src_eventfunc (GstBaseParse * parse, GstEvent * event)
721 {
722   gboolean handled = FALSE;
723   GstBaseParseClass *bclass;
724
725   bclass = GST_BASE_PARSE_GET_CLASS (parse);
726
727   switch (GST_EVENT_TYPE (event)) {
728     case GST_EVENT_SEEK:
729     {
730       if (bclass->is_seekable (parse)) {
731         handled = gst_base_parse_handle_seek (parse, event);
732         gst_event_unref (event);
733       }
734       break;
735     }
736     default:
737       break;
738   }
739   return handled;
740 }
741
742
743 /**
744  * gst_base_parse_is_seekable:
745  * @parse: #GstBaseParse.
746  *
747  * Default handler for is_seekable.
748  *
749  * Returns: Always TRUE.
750  */
751 static gboolean
752 gst_base_parse_is_seekable (GstBaseParse * parse)
753 {
754   return TRUE;
755 }
756
757 /**
758  * gst_base_parse_convert:
759  * @parse: #GstBaseParse.
760  * @src_format: #GstFormat describing the source format.
761  * @src_value: Source value to be converted.
762  * @dest_format: #GstFormat defining the converted format.
763  * @dest_value: Pointer where the conversion result will be put.
764  *
765  * Implementation of "convert" vmethod in #GstBaseParse class.
766  *
767  * Returns: TRUE if conversion was successful.
768  */
769 gboolean
770 gst_base_parse_convert (GstBaseParse * parse,
771     GstFormat src_format,
772     gint64 src_value, GstFormat dest_format, gint64 * dest_value)
773 {
774   gboolean ret = FALSE;
775   guint64 bytes, duration;
776
777   if (G_UNLIKELY (src_format == dest_format)) {
778     *dest_value = src_value;
779     return TRUE;
780   }
781
782   if (G_UNLIKELY (src_value == -1)) {
783     *dest_value = -1;
784     return TRUE;
785   }
786
787   /* need at least some frames */
788   if (!parse->priv->framecount)
789     return FALSE;
790
791   /* either frame info (having num means den also ok) or use average bitrate */
792   if (parse->priv->fps_num) {
793     duration = parse->priv->framecount * parse->priv->fps_den * 1000;
794     bytes = parse->priv->bytecount * parse->priv->fps_num;
795   } else {
796     duration = parse->priv->acc_duration / GST_MSECOND;
797     bytes = parse->priv->bytecount;
798   }
799
800   if (G_UNLIKELY (!duration || !bytes))
801     return FALSE;
802
803   if (src_format == GST_FORMAT_BYTES) {
804     if (dest_format == GST_FORMAT_TIME) {
805       /* BYTES -> TIME conversion */
806       GST_DEBUG_OBJECT (parse, "converting bytes -> time");
807
808       *dest_value = gst_util_uint64_scale (src_value, duration, bytes);
809       *dest_value *= GST_MSECOND;
810       GST_DEBUG_OBJECT (parse, "conversion result: %" G_GINT64_FORMAT " ms",
811           *dest_value / GST_MSECOND);
812       ret = TRUE;
813     }
814   } else if (src_format == GST_FORMAT_TIME) {
815     GST_DEBUG_OBJECT (parse, "converting time -> bytes");
816     if (dest_format == GST_FORMAT_BYTES) {
817       *dest_value = gst_util_uint64_scale (src_value / GST_MSECOND, bytes,
818           duration);
819       GST_DEBUG_OBJECT (parse,
820           "time %" G_GINT64_FORMAT " ms in bytes = %" G_GINT64_FORMAT,
821           src_value / GST_MSECOND, *dest_value);
822       ret = TRUE;
823     }
824   } else if (src_format == GST_FORMAT_DEFAULT) {
825     /* DEFAULT == frame-based */
826     if (dest_format == GST_FORMAT_TIME) {
827       if (parse->priv->fps_den) {
828         *dest_value = gst_util_uint64_scale (src_value,
829             GST_SECOND * parse->priv->fps_den, parse->priv->fps_num);
830         ret = TRUE;
831       }
832     } else if (dest_format == GST_FORMAT_BYTES) {
833     }
834   }
835
836   return ret;
837 }
838
839 /**
840  * gst_base_parse_update_duration:
841  * @parse: #GstBaseParse.
842  *
843  */
844 static void
845 gst_base_parse_update_duration (GstBaseParse * aacparse)
846 {
847   GstPad *peer;
848   GstBaseParse *parse;
849   GstBaseParseClass *klass;
850
851   parse = GST_BASE_PARSE (aacparse);
852   klass = GST_BASE_PARSE_GET_CLASS (parse);
853
854   /* must be able to convert */
855   if (!klass->convert)
856     return;
857
858   peer = gst_pad_get_peer (parse->sinkpad);
859   if (peer) {
860     GstFormat pformat = GST_FORMAT_BYTES;
861     gboolean qres = FALSE;
862     gint64 ptot, dest_value;
863
864     qres = gst_pad_query_duration (peer, &pformat, &ptot);
865     gst_object_unref (GST_OBJECT (peer));
866     if (qres) {
867       if (klass->convert (parse, pformat, ptot, GST_FORMAT_TIME, &dest_value))
868         parse->priv->estimated_duration = dest_value;
869     }
870   }
871 }
872
873 /**
874  * gst_base_parse_handle_and_push_buffer:
875  * @parse: #GstBaseParse.
876  * @klass: #GstBaseParseClass.
877  * @buffer: #GstBuffer.
878  *
879  * Parses the frame from given buffer and pushes it forward. Also performs
880  * timestamp handling and checks the segment limits.
881  *
882  * This is called with srcpad STREAM_LOCK held.
883  *
884  * Returns: #GstFlowReturn
885  */
886 static GstFlowReturn
887 gst_base_parse_handle_and_push_buffer (GstBaseParse * parse,
888     GstBaseParseClass * klass, GstBuffer * buffer)
889 {
890   GstFlowReturn ret;
891
892   if (parse->priv->discont) {
893     GST_DEBUG_OBJECT (parse, "marking DISCONT");
894     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
895     parse->priv->discont = FALSE;
896   }
897
898   ret = klass->parse_frame (parse, buffer);
899
900   /* re-use default handler to add missing metadata as-much-as-possible */
901   gst_base_parse_parse_frame (parse, buffer);
902   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
903       GST_BUFFER_DURATION_IS_VALID (buffer)) {
904     parse->priv->next_ts =
905         GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer);
906   } else {
907     /* we lost track, do not produce bogus time next time around
908      * (probably means parser subclass has given up on parsing as well) */
909     GST_DEBUG_OBJECT (parse, "no next fallback timestamp");
910     parse->priv->next_ts = GST_CLOCK_TIME_NONE;
911   }
912
913   /* First buffers are dropped, this means that the subclass needs more
914    * frames to decide on the format and queues them internally */
915   if (ret == GST_BASE_PARSE_FLOW_DROPPED && !GST_PAD_CAPS (parse->srcpad)) {
916     gst_buffer_unref (buffer);
917     return GST_FLOW_OK;
918   }
919
920   /* convert internal flow to OK and mark discont for the next buffer. */
921   if (ret == GST_BASE_PARSE_FLOW_DROPPED) {
922     parse->priv->discont = TRUE;
923     ret = GST_FLOW_OK;
924
925     gst_buffer_unref (buffer);
926
927     return ret;
928   } else if (ret != GST_FLOW_OK) {
929     return ret;
930   }
931
932   return gst_base_parse_push_buffer (parse, buffer);
933 }
934
935 /**
936  * gst_base_parse_push_buffer:
937  * @parse: #GstBaseParse.
938  * @buffer: #GstBuffer.
939  *
940  * Pushes the buffer downstream, sends any pending events and
941  * does some timestamp and segment handling.
942  *
943  * This must be called with srcpad STREAM_LOCK held.
944  *
945  * Returns: #GstFlowReturn
946  */
947 GstFlowReturn
948 gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer)
949 {
950   GstFlowReturn ret = GST_FLOW_OK;
951   GstClockTime last_stop = GST_CLOCK_TIME_NONE;
952
953   GST_LOG_OBJECT (parse,
954       "processing buffer of size %d with ts %" GST_TIME_FORMAT
955       ", duration %" GST_TIME_FORMAT, GST_BUFFER_SIZE (buffer),
956       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
957       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
958
959   /* update stats */
960   parse->priv->bytecount += GST_BUFFER_SIZE (buffer);
961   if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BASE_PARSE_BUFFER_FLAG_NO_FRAME)) {
962     parse->priv->framecount++;
963     if (GST_BUFFER_DURATION_IS_VALID (buffer)) {
964       parse->priv->acc_duration += GST_BUFFER_DURATION (buffer);
965     }
966   }
967   GST_BUFFER_FLAG_UNSET (buffer, GST_BASE_PARSE_BUFFER_FLAG_NO_FRAME);
968   if (parse->priv->update_interval &&
969       (parse->priv->framecount % parse->priv->update_interval) == 0)
970     gst_base_parse_update_duration (parse);
971
972   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
973     last_stop = GST_BUFFER_TIMESTAMP (buffer);
974   if (last_stop != GST_CLOCK_TIME_NONE && GST_BUFFER_DURATION_IS_VALID (buffer))
975     last_stop += GST_BUFFER_DURATION (buffer);
976
977   /* should have caps by now */
978   g_return_val_if_fail (GST_PAD_CAPS (parse->srcpad), GST_FLOW_ERROR);
979
980   gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad));
981
982   /* and should then also be linked downstream, so safe to send some events */
983   if (parse->priv->pad_mode == GST_ACTIVATE_PULL) {
984     if (G_UNLIKELY (parse->close_segment)) {
985       GST_DEBUG_OBJECT (parse, "loop sending close segment");
986       gst_pad_push_event (parse->srcpad, parse->close_segment);
987       parse->close_segment = NULL;
988     }
989
990     if (G_UNLIKELY (parse->pending_segment)) {
991       GST_DEBUG_OBJECT (parse, "loop push pending segment");
992       gst_pad_push_event (parse->srcpad, parse->pending_segment);
993       parse->pending_segment = NULL;
994     }
995   } else {
996     if (G_UNLIKELY (parse->pending_segment)) {
997       GST_DEBUG_OBJECT (parse, "chain pushing a pending segment");
998       gst_pad_push_event (parse->srcpad, parse->pending_segment);
999       parse->pending_segment = NULL;
1000     }
1001   }
1002
1003   if (G_UNLIKELY (parse->priv->pending_events)) {
1004     GList *l;
1005
1006     for (l = parse->priv->pending_events; l != NULL; l = l->next) {
1007       gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
1008     }
1009     g_list_free (parse->priv->pending_events);
1010     parse->priv->pending_events = NULL;
1011   }
1012
1013   /* TODO: Add to seek table */
1014
1015   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
1016       GST_CLOCK_TIME_IS_VALID (parse->segment.stop) &&
1017       GST_BUFFER_TIMESTAMP (buffer) > parse->segment.stop) {
1018     GST_LOG_OBJECT (parse, "Dropped frame, after segment");
1019     gst_buffer_unref (buffer);
1020   } else if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
1021       GST_BUFFER_DURATION_IS_VALID (buffer) &&
1022       GST_CLOCK_TIME_IS_VALID (parse->segment.start) &&
1023       GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer)
1024       < parse->segment.start) {
1025     /* FIXME: subclass needs way to override the start as downstream might
1026      * need frames before for proper decoding */
1027     GST_LOG_OBJECT (parse, "Dropped frame, before segment");
1028     gst_buffer_unref (buffer);
1029   } else {
1030     ret = gst_pad_push (parse->srcpad, buffer);
1031     GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %d",
1032         GST_BUFFER_SIZE (buffer), ret);
1033   }
1034
1035   /* Update current running segment position */
1036   if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE)
1037     gst_segment_set_last_stop (&parse->segment, GST_FORMAT_TIME, last_stop);
1038
1039   return ret;
1040 }
1041
1042
1043 /**
1044  * gst_base_parse_drain:
1045  * @parse: #GstBaseParse.
1046  *
1047  * Drains the adapter until it is empty. It decreases the min_frame_size to
1048  * match the current adapter size and calls chain method until the adapter
1049  * is emptied or chain returns with error.
1050  */
1051 static void
1052 gst_base_parse_drain (GstBaseParse * parse)
1053 {
1054   guint avail;
1055
1056   GST_DEBUG_OBJECT (parse, "draining");
1057   parse->priv->drain = TRUE;
1058
1059   for (;;) {
1060     avail = gst_adapter_available (parse->adapter);
1061     if (!avail)
1062       break;
1063
1064     if (gst_base_parse_chain (parse->sinkpad, NULL) != GST_FLOW_OK) {
1065       break;
1066     }
1067
1068     /* nothing changed, maybe due to truncated frame; break infinite loop */
1069     if (avail == gst_adapter_available (parse->adapter)) {
1070       GST_DEBUG_OBJECT (parse, "no change during draining; flushing");
1071       gst_adapter_clear (parse->adapter);
1072     }
1073   }
1074
1075   parse->priv->drain = FALSE;
1076 }
1077
1078
1079 /**
1080  * gst_base_parse_chain:
1081  * @pad: #GstPad.
1082  * @buffer: #GstBuffer.
1083  *
1084  * Returns: #GstFlowReturn.
1085  */
1086 static GstFlowReturn
1087 gst_base_parse_chain (GstPad * pad, GstBuffer * buffer)
1088 {
1089   GstBaseParseClass *bclass;
1090   GstBaseParse *parse;
1091   GstFlowReturn ret = GST_FLOW_OK;
1092   GstBuffer *outbuf = NULL;
1093   GstBuffer *tmpbuf = NULL;
1094   guint fsize = 0;
1095   gint skip = -1;
1096   const guint8 *data;
1097   guint min_size;
1098   GstClockTime timestamp;
1099
1100   parse = GST_BASE_PARSE (GST_OBJECT_PARENT (pad));
1101   bclass = GST_BASE_PARSE_GET_CLASS (parse);
1102
1103   if (G_LIKELY (buffer)) {
1104     GST_LOG_OBJECT (parse, "buffer size: %d, offset = %" G_GINT64_FORMAT,
1105         GST_BUFFER_SIZE (buffer), GST_BUFFER_OFFSET (buffer));
1106     if (G_UNLIKELY (parse->priv->passthrough)) {
1107       buffer = gst_buffer_make_metadata_writable (buffer);
1108       return gst_base_parse_push_buffer (parse, buffer);
1109     } else
1110       gst_adapter_push (parse->adapter, buffer);
1111   }
1112
1113   /* Parse and push as many frames as possible */
1114   /* Stop either when adapter is empty or we are flushing */
1115   while (!parse->priv->flushing) {
1116     tmpbuf = gst_buffer_new ();
1117
1118     /* Synchronization loop */
1119     for (;;) {
1120       GST_BASE_PARSE_LOCK (parse);
1121       min_size = parse->priv->min_frame_size;
1122       GST_BASE_PARSE_UNLOCK (parse);
1123
1124       if (G_UNLIKELY (parse->priv->drain)) {
1125         min_size = gst_adapter_available (parse->adapter);
1126         GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size);
1127         if (G_UNLIKELY (!min_size)) {
1128           gst_buffer_unref (tmpbuf);
1129           goto done;
1130         }
1131       }
1132
1133       /* Collect at least min_frame_size bytes */
1134       if (gst_adapter_available (parse->adapter) < min_size) {
1135         GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)",
1136             gst_adapter_available (parse->adapter));
1137         gst_buffer_unref (tmpbuf);
1138         goto done;
1139       }
1140
1141       data = gst_adapter_peek (parse->adapter, min_size);
1142       GST_BUFFER_DATA (tmpbuf) = (guint8 *) data;
1143       GST_BUFFER_SIZE (tmpbuf) = min_size;
1144       GST_BUFFER_OFFSET (tmpbuf) = parse->priv->offset;
1145       GST_BUFFER_FLAG_SET (tmpbuf, GST_MINI_OBJECT_FLAG_READONLY);
1146
1147       if (parse->priv->discont) {
1148         GST_DEBUG_OBJECT (parse, "marking DISCONT");
1149         GST_BUFFER_FLAG_SET (tmpbuf, GST_BUFFER_FLAG_DISCONT);
1150       }
1151
1152       skip = -1;
1153       if (bclass->check_valid_frame (parse, tmpbuf, &fsize, &skip)) {
1154         if (gst_adapter_available (parse->adapter) < fsize) {
1155           GST_DEBUG_OBJECT (parse,
1156               "found valid frame but not enough data available (only %d bytes)",
1157               gst_adapter_available (parse->adapter));
1158           gst_buffer_unref (tmpbuf);
1159           goto done;
1160         }
1161         break;
1162       }
1163       if (skip > 0) {
1164         GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
1165         gst_adapter_flush (parse->adapter, skip);
1166         parse->priv->offset += skip;
1167         parse->priv->discont = TRUE;
1168       } else if (skip == -1) {
1169         /* subclass didn't touch this value. By default we skip 1 byte */
1170         GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
1171         gst_adapter_flush (parse->adapter, 1);
1172         parse->priv->offset++;
1173         parse->priv->discont = TRUE;
1174       }
1175       /* There is a possibility that subclass set the skip value to zero.
1176          This means that it has probably found a frame but wants to ask
1177          more data (by increasing the min_size) to be sure of this. */
1178     }
1179     gst_buffer_unref (tmpbuf);
1180     tmpbuf = NULL;
1181
1182     if (skip > 0) {
1183       /* Subclass found the sync, but still wants to skip some data */
1184       GST_LOG_OBJECT (parse, "skipping %d bytes", skip);
1185       gst_adapter_flush (parse->adapter, skip);
1186       parse->priv->offset += skip;
1187     }
1188
1189     /* Grab lock to prevent a race with FLUSH_START handler */
1190     GST_PAD_STREAM_LOCK (parse->srcpad);
1191
1192     /* FLUSH_START event causes the "flushing" flag to be set. In this
1193      * case we can leave the frame pushing loop */
1194     if (parse->priv->flushing) {
1195       GST_PAD_STREAM_UNLOCK (parse->srcpad);
1196       break;
1197     }
1198
1199     /* FIXME: Would it be more efficient to make a subbuffer instead? */
1200     outbuf = gst_adapter_take_buffer (parse->adapter, fsize);
1201     outbuf = gst_buffer_make_metadata_writable (outbuf);
1202
1203     /* Subclass may want to know the data offset */
1204     GST_BUFFER_OFFSET (outbuf) = parse->priv->offset;
1205     parse->priv->offset += fsize;
1206
1207     /* move along with upstream timestamp (if any),
1208      * but interpolate in between */
1209     timestamp = gst_adapter_prev_timestamp (parse->adapter, NULL);
1210     if (GST_CLOCK_TIME_IS_VALID (timestamp) &&
1211         (parse->priv->prev_ts != timestamp)) {
1212       parse->priv->prev_ts = parse->priv->next_ts = timestamp;
1213     }
1214
1215     ret = gst_base_parse_handle_and_push_buffer (parse, bclass, outbuf);
1216     GST_PAD_STREAM_UNLOCK (parse->srcpad);
1217
1218     if (ret != GST_FLOW_OK) {
1219       GST_LOG_OBJECT (parse, "push returned %d", ret);
1220       break;
1221     }
1222   }
1223
1224 done:
1225   GST_LOG_OBJECT (parse, "chain leaving");
1226   return ret;
1227 }
1228
1229 /* pull @size bytes at current offset,
1230  * i.e. at least try to and possibly return a shorter buffer if near the end */
1231 static GstFlowReturn
1232 gst_base_parse_pull_range (GstBaseParse * parse, guint size,
1233     GstBuffer ** buffer)
1234 {
1235   GstFlowReturn ret = GST_FLOW_OK;
1236
1237   g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
1238
1239   /* Caching here actually makes much less difference than one would expect.
1240    * We do it mainly to avoid pulling buffers of 1 byte all the time */
1241   if (parse->priv->cache) {
1242     guint64 cache_offset = GST_BUFFER_OFFSET (parse->priv->cache);
1243     guint cache_size = GST_BUFFER_SIZE (parse->priv->cache);
1244
1245     if (cache_offset <= parse->priv->offset &&
1246         (parse->priv->offset + size) <= (cache_offset + cache_size)) {
1247       *buffer = gst_buffer_create_sub (parse->priv->cache,
1248           parse->priv->offset - cache_offset, size);
1249       GST_BUFFER_OFFSET (*buffer) = parse->priv->offset;
1250       return GST_FLOW_OK;
1251     }
1252     /* not enough data in the cache, free cache and get a new one */
1253     gst_buffer_unref (parse->priv->cache);
1254     parse->priv->cache = NULL;
1255   }
1256
1257   /* refill the cache */
1258   ret =
1259       gst_pad_pull_range (parse->sinkpad, parse->priv->offset, MAX (size,
1260           64 * 1024), &parse->priv->cache);
1261   if (ret != GST_FLOW_OK) {
1262     parse->priv->cache = NULL;
1263     return ret;
1264   }
1265
1266   if (GST_BUFFER_SIZE (parse->priv->cache) >= size) {
1267     *buffer = gst_buffer_create_sub (parse->priv->cache, 0, size);
1268     GST_BUFFER_OFFSET (*buffer) = parse->priv->offset;
1269     return GST_FLOW_OK;
1270   }
1271
1272   /* Not possible to get enough data, try a last time with
1273    * requesting exactly the size we need */
1274   gst_buffer_unref (parse->priv->cache);
1275   parse->priv->cache = NULL;
1276
1277   ret = gst_pad_pull_range (parse->sinkpad, parse->priv->offset, size,
1278       &parse->priv->cache);
1279
1280   if (ret != GST_FLOW_OK) {
1281     GST_DEBUG_OBJECT (parse, "pull_range returned %d", ret);
1282     *buffer = NULL;
1283     return ret;
1284   }
1285
1286   if (GST_BUFFER_SIZE (parse->priv->cache) < size) {
1287     GST_DEBUG_OBJECT (parse, "Returning short buffer at offset %"
1288         G_GUINT64_FORMAT ": wanted %u bytes, got %u bytes", parse->priv->offset,
1289         size, GST_BUFFER_SIZE (parse->priv->cache));
1290
1291     *buffer = parse->priv->cache;
1292     parse->priv->cache = NULL;
1293
1294     return GST_FLOW_OK;
1295   }
1296
1297   *buffer = gst_buffer_create_sub (parse->priv->cache, 0, size);
1298   GST_BUFFER_OFFSET (*buffer) = parse->priv->offset;
1299
1300   return GST_FLOW_OK;
1301 }
1302
1303 /**
1304  * gst_base_parse_loop:
1305  * @pad: GstPad
1306  *
1307  * Loop that is used in pull mode to retrieve data from upstream.
1308  */
1309 static void
1310 gst_base_parse_loop (GstPad * pad)
1311 {
1312   GstBaseParse *parse;
1313   GstBaseParseClass *klass;
1314   GstBuffer *buffer, *outbuf;
1315   gboolean ret = FALSE;
1316   guint fsize = 0, min_size;
1317   gint skip = 0;
1318
1319   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
1320   klass = GST_BASE_PARSE_GET_CLASS (parse);
1321
1322   /* TODO: Check if we reach segment stop limit */
1323
1324   while (TRUE) {
1325
1326     GST_BASE_PARSE_LOCK (parse);
1327     min_size = parse->priv->min_frame_size;
1328     GST_BASE_PARSE_UNLOCK (parse);
1329
1330     ret = gst_base_parse_pull_range (parse, min_size, &buffer);
1331
1332     if (ret == GST_FLOW_UNEXPECTED)
1333       goto eos;
1334     else if (ret != GST_FLOW_OK)
1335       goto need_pause;
1336
1337     if (parse->priv->discont) {
1338       GST_DEBUG_OBJECT (parse, "marking DISCONT");
1339       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1340     }
1341
1342     /* if we got a short read, inform subclass we are draining leftover
1343      * and no more is to be expected */
1344     if (GST_BUFFER_SIZE (buffer) < min_size)
1345       parse->priv->drain = TRUE;
1346
1347     skip = -1;
1348     if (klass->check_valid_frame (parse, buffer, &fsize, &skip)) {
1349       parse->priv->drain = FALSE;
1350       break;
1351     }
1352     parse->priv->drain = FALSE;
1353     if (skip > 0) {
1354       GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
1355       parse->priv->offset += skip;
1356       parse->priv->discont = TRUE;
1357     } else if (skip == -1) {
1358       GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
1359       parse->priv->offset++;
1360       parse->priv->discont = TRUE;
1361     }
1362     GST_DEBUG_OBJECT (parse, "finding sync...");
1363     gst_buffer_unref (buffer);
1364   }
1365
1366   if (fsize <= GST_BUFFER_SIZE (buffer)) {
1367     outbuf = gst_buffer_create_sub (buffer, 0, fsize);
1368     GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer);
1369     gst_buffer_unref (buffer);
1370   } else {
1371     gst_buffer_unref (buffer);
1372     ret = gst_base_parse_pull_range (parse, fsize, &outbuf);
1373
1374     if (ret == GST_FLOW_UNEXPECTED)
1375       goto eos;
1376     else if (ret != GST_FLOW_OK)
1377       goto need_pause;
1378     if (GST_BUFFER_SIZE (outbuf) < fsize)
1379       goto eos;
1380   }
1381
1382   parse->priv->offset += fsize;
1383
1384   /* Does the subclass want to skip too? */
1385   if (skip > 0)
1386     parse->priv->offset += skip;
1387
1388   /* This always unrefs the outbuf, even if error occurs */
1389   ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf);
1390
1391   if (ret != GST_FLOW_OK) {
1392     GST_DEBUG_OBJECT (parse, "flow: %s", gst_flow_get_name (ret));
1393     if (GST_FLOW_IS_FATAL (ret)) {
1394       GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
1395           ("streaming task paused, reason: %s", gst_flow_get_name (ret)));
1396       gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
1397     }
1398     goto need_pause;
1399   }
1400
1401   gst_object_unref (parse);
1402   return;
1403
1404 need_pause:
1405   {
1406     GST_LOG_OBJECT (parse, "pausing task");
1407     gst_pad_pause_task (pad);
1408     gst_object_unref (parse);
1409     return;
1410   }
1411 eos:
1412   {
1413     GST_LOG_OBJECT (parse, "pausing task %d", ret);
1414     gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
1415     gst_pad_pause_task (pad);
1416     gst_object_unref (parse);
1417     return;
1418   }
1419 }
1420
1421
1422 /**
1423  * gst_base_parse_sink_activate:
1424  * @sinkpad: #GstPad to be activated.
1425  *
1426  * Returns: TRUE if activation succeeded.
1427  */
1428 static gboolean
1429 gst_base_parse_sink_activate (GstPad * sinkpad)
1430 {
1431   GstBaseParse *parse;
1432   gboolean result = TRUE;
1433
1434   parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad));
1435
1436   GST_DEBUG_OBJECT (parse, "sink activate");
1437
1438   if (gst_pad_check_pull_range (sinkpad)) {
1439     GST_DEBUG_OBJECT (parse, "trying to activate in pull mode");
1440     result = gst_pad_activate_pull (sinkpad, TRUE);
1441   } else {
1442     GST_DEBUG_OBJECT (parse, "trying to activate in push mode");
1443     result = gst_pad_activate_push (sinkpad, TRUE);
1444   }
1445
1446   GST_DEBUG_OBJECT (parse, "sink activate return %d", result);
1447   gst_object_unref (parse);
1448   return result;
1449 }
1450
1451
1452 /**
1453  * gst_base_parse_activate:
1454  * @parse: #GstBaseParse.
1455  * @active: TRUE if element will be activated, FALSE if disactivated.
1456  *
1457  * Returns: TRUE if the operation succeeded.
1458  */
1459 static gboolean
1460 gst_base_parse_activate (GstBaseParse * parse, gboolean active)
1461 {
1462   GstBaseParseClass *klass;
1463   gboolean result = FALSE;
1464
1465   GST_DEBUG_OBJECT (parse, "activate");
1466
1467   klass = GST_BASE_PARSE_GET_CLASS (parse);
1468
1469   if (active) {
1470     if (parse->priv->pad_mode == GST_ACTIVATE_NONE && klass->start)
1471       result = klass->start (parse);
1472
1473     GST_OBJECT_LOCK (parse);
1474     gst_segment_init (&parse->segment, GST_FORMAT_TIME);
1475     parse->priv->duration = -1;
1476     parse->priv->discont = TRUE;
1477     parse->priv->flushing = FALSE;
1478     parse->priv->offset = 0;
1479     parse->priv->update_interval = 0;
1480     parse->priv->fps_num = parse->priv->fps_den = 0;
1481     parse->priv->frame_duration = GST_CLOCK_TIME_NONE;
1482     parse->priv->framecount = 0;
1483     parse->priv->bytecount = 0;
1484     parse->priv->acc_duration = 0;
1485     parse->priv->estimated_duration = -1;
1486     parse->priv->next_ts = 0;
1487     parse->priv->passthrough = FALSE;
1488
1489     if (parse->pending_segment)
1490       gst_event_unref (parse->pending_segment);
1491
1492     parse->pending_segment =
1493         gst_event_new_new_segment (FALSE, parse->segment.rate,
1494         parse->segment.format,
1495         parse->segment.start, parse->segment.stop, parse->segment.last_stop);
1496
1497     GST_OBJECT_UNLOCK (parse);
1498   } else {
1499     /* We must make sure streaming has finished before resetting things
1500      * and calling the ::stop vfunc */
1501     GST_PAD_STREAM_LOCK (parse->sinkpad);
1502     GST_PAD_STREAM_UNLOCK (parse->sinkpad);
1503
1504     if (parse->priv->pad_mode != GST_ACTIVATE_NONE && klass->stop)
1505       result = klass->stop (parse);
1506
1507     g_list_foreach (parse->priv->pending_events, (GFunc) gst_mini_object_unref,
1508         NULL);
1509     g_list_free (parse->priv->pending_events);
1510     parse->priv->pending_events = NULL;
1511
1512     if (parse->priv->cache) {
1513       gst_buffer_unref (parse->priv->cache);
1514       parse->priv->cache = NULL;
1515     }
1516
1517     parse->priv->pad_mode = GST_ACTIVATE_NONE;
1518   }
1519   GST_DEBUG_OBJECT (parse, "activate: %d", result);
1520   return result;
1521 }
1522
1523
1524 /**
1525  * gst_base_parse_sink_activate_push:
1526  * @pad: #GstPad to be (de)activated.
1527  * @active: TRUE when activating, FALSE when deactivating.
1528  *
1529  * Returns: TRUE if (de)activation succeeded.
1530  */
1531 static gboolean
1532 gst_base_parse_sink_activate_push (GstPad * pad, gboolean active)
1533 {
1534   gboolean result = TRUE;
1535   GstBaseParse *parse;
1536
1537   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
1538
1539   GST_DEBUG_OBJECT (parse, "sink activate push");
1540
1541   result = gst_base_parse_activate (parse, active);
1542
1543   if (result)
1544     parse->priv->pad_mode = active ? GST_ACTIVATE_PUSH : GST_ACTIVATE_NONE;
1545
1546   GST_DEBUG_OBJECT (parse, "sink activate push: %d", result);
1547
1548   gst_object_unref (parse);
1549   return result;
1550 }
1551
1552
1553 /**
1554  * gst_base_parse_sink_activate_pull:
1555  * @sinkpad: #GstPad to be (de)activated.
1556  * @active: TRUE when activating, FALSE when deactivating.
1557  *
1558  * Returns: TRUE if (de)activation succeeded.
1559  */
1560 static gboolean
1561 gst_base_parse_sink_activate_pull (GstPad * sinkpad, gboolean active)
1562 {
1563   gboolean result = FALSE;
1564   GstBaseParse *parse;
1565
1566   parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad));
1567
1568   GST_DEBUG_OBJECT (parse, "activate pull");
1569
1570   result = gst_base_parse_activate (parse, active);
1571
1572   if (result) {
1573     if (active) {
1574       result &= gst_pad_start_task (sinkpad,
1575           (GstTaskFunction) gst_base_parse_loop, sinkpad);
1576     } else {
1577       result &= gst_pad_stop_task (sinkpad);
1578     }
1579   }
1580
1581   if (result)
1582     parse->priv->pad_mode = active ? GST_ACTIVATE_PULL : GST_ACTIVATE_NONE;
1583
1584   GST_DEBUG_OBJECT (parse, "sink activate pull: %d", result);
1585
1586   gst_object_unref (parse);
1587   return result;
1588 }
1589
1590
1591 /**
1592  * gst_base_parse_set_duration:
1593  * @parse: #GstBaseParse.
1594  * @fmt: #GstFormat.
1595  * @duration: duration value.
1596  *
1597  * Sets the duration of the currently playing media. Subclass can use this
1598  * when it notices a change in the media duration.
1599  */
1600 void
1601 gst_base_parse_set_duration (GstBaseParse * parse,
1602     GstFormat fmt, gint64 duration)
1603 {
1604   g_return_if_fail (parse != NULL);
1605
1606   GST_BASE_PARSE_LOCK (parse);
1607   if (duration != parse->priv->duration) {
1608     GstMessage *m;
1609
1610     m = gst_message_new_duration (GST_OBJECT (parse), fmt, duration);
1611     gst_element_post_message (GST_ELEMENT (parse), m);
1612
1613     /* TODO: what about duration tag? */
1614   }
1615   parse->priv->duration = duration;
1616   parse->priv->duration_fmt = fmt;
1617   GST_DEBUG_OBJECT (parse, "set duration: %" G_GINT64_FORMAT, duration);
1618   GST_BASE_PARSE_UNLOCK (parse);
1619 }
1620
1621
1622 /**
1623  * gst_base_parse_set_min_frame_size:
1624  * @parse: #GstBaseParse.
1625  * @min_size: Minimum size of the data that this base class should give to
1626  *            subclass.
1627  *
1628  * Subclass can use this function to tell the base class that it needs to
1629  * give at least #min_size buffers.
1630  */
1631 void
1632 gst_base_parse_set_min_frame_size (GstBaseParse * parse, guint min_size)
1633 {
1634   g_return_if_fail (parse != NULL);
1635
1636   GST_BASE_PARSE_LOCK (parse);
1637   parse->priv->min_frame_size = min_size;
1638   GST_LOG_OBJECT (parse, "set frame_min_size: %d", min_size);
1639   GST_BASE_PARSE_UNLOCK (parse);
1640 }
1641
1642 /**
1643  * gst_base_transform_set_passthrough:
1644  * @trans: the #GstBaseParse to set
1645  * @passthrough: boolean indicating passthrough mode.
1646  *
1647  * Set passthrough mode for this parser.  If operating in passthrough,
1648  * incoming buffers are pushed through unmodified.
1649  */
1650 void
1651 gst_base_parse_set_passthrough (GstBaseParse * parse, gboolean passthrough)
1652 {
1653   g_return_if_fail (parse != NULL);
1654
1655   GST_BASE_PARSE_LOCK (parse);
1656   parse->priv->passthrough = passthrough;
1657   GST_LOG_OBJECT (parse, "set passthrough: %d", passthrough);
1658   GST_BASE_PARSE_UNLOCK (parse);
1659 }
1660
1661 /**
1662  * gst_base_transform_set_frame_props:
1663  * @parse: the #GstBaseParse to set
1664  * @fps_num: frames per second (numerator).
1665  * @fps_den: frames per second (denominator).
1666  * @interval: duration update interval in frames.
1667  *
1668  * If frames per second is configured, parser can provide for default @convert
1669  * between GST_FORMAT_TIME and GST_FORMAT_BYTES, as well as buffer duration
1670  * and timestamping.  However, even if this frame information is provided,
1671  * subclass can still choose to provide for a @convert and set buffer metadata.
1672  * If #interval is non-zero (default), then stream duration is determined
1673  * based on frame and byte counts, and updated every #interval frames.
1674  */
1675 void
1676 gst_base_parse_set_frame_props (GstBaseParse * parse, guint fps_num,
1677     guint fps_den, gint interval)
1678 {
1679   g_return_if_fail (parse != NULL);
1680
1681   GST_BASE_PARSE_LOCK (parse);
1682   parse->priv->fps_num = fps_num;
1683   parse->priv->fps_den = fps_den;
1684   parse->priv->update_interval = interval;
1685   if (!fps_num || !fps_den) {
1686     GST_DEBUG_OBJECT (parse, "invalid fps (%d/%d), ignoring parameters",
1687         fps_num, fps_den);
1688     fps_num = fps_den = 0;
1689     interval = 0;
1690     parse->priv->frame_duration = GST_CLOCK_TIME_NONE;
1691   } else {
1692     parse->priv->frame_duration =
1693         gst_util_uint64_scale (GST_SECOND, parse->priv->fps_den,
1694         parse->priv->fps_num);
1695   }
1696   GST_LOG_OBJECT (parse, "set fps: %d/%d => duration: %" G_GINT64_FORMAT " ms",
1697       fps_num, fps_den, parse->priv->frame_duration / GST_MSECOND);
1698   GST_LOG_OBJECT (parse, "set update interval: %d", interval);
1699   GST_BASE_PARSE_UNLOCK (parse);
1700 }
1701
1702 /**
1703  * gst_base_transform_get_sync:
1704  * @parse: the #GstBaseParse to query
1705  *
1706  * Returns TRUE if parser is considered 'in sync'.  That is, frames have been
1707  * continuously successfully parsed and pushed.
1708  */
1709 gboolean
1710 gst_base_parse_get_sync (GstBaseParse * parse)
1711 {
1712   gboolean ret;
1713
1714   g_return_val_if_fail (parse != NULL, FALSE);
1715
1716   GST_BASE_PARSE_LOCK (parse);
1717   /* losing sync is pretty much a discont (and vice versa), no ? */
1718   ret = !parse->priv->discont;
1719   GST_BASE_PARSE_UNLOCK (parse);
1720
1721   GST_DEBUG_OBJECT (parse, "sync: %d", ret);
1722   return ret;
1723 }
1724
1725 /**
1726  * gst_base_transform_get_drain:
1727  * @parse: the #GstBaseParse to query
1728  *
1729  * Returns TRUE if parser is currently 'draining'.  That is, leftover data
1730  * (e.g. in FLUSH or EOS situation) is being parsed.
1731  */
1732 gboolean
1733 gst_base_parse_get_drain (GstBaseParse * parse)
1734 {
1735   gboolean ret;
1736
1737   g_return_val_if_fail (parse != NULL, FALSE);
1738
1739   GST_BASE_PARSE_LOCK (parse);
1740   /* losing sync is pretty much a discont (and vice versa), no ? */
1741   ret = parse->priv->drain;
1742   GST_BASE_PARSE_UNLOCK (parse);
1743
1744   GST_DEBUG_OBJECT (parse, "drain: %d", ret);
1745   return ret;
1746 }
1747
1748 /**
1749  * gst_base_parse_get_querytypes:
1750  * @pad: GstPad
1751  *
1752  * Returns: A table of #GstQueryType items describing supported query types.
1753  */
1754 static const GstQueryType *
1755 gst_base_parse_get_querytypes (GstPad * pad)
1756 {
1757   static const GstQueryType list[] = {
1758     GST_QUERY_POSITION,
1759     GST_QUERY_DURATION,
1760     GST_QUERY_FORMATS,
1761     GST_QUERY_SEEKING,
1762     GST_QUERY_CONVERT,
1763     0
1764   };
1765
1766   return list;
1767 }
1768
1769
1770 /**
1771  * gst_base_parse_query:
1772  * @pad: #GstPad.
1773  * @query: #GstQuery.
1774  *
1775  * Returns: TRUE on success.
1776  */
1777 static gboolean
1778 gst_base_parse_query (GstPad * pad, GstQuery * query)
1779 {
1780   GstBaseParse *parse;
1781   GstBaseParseClass *klass;
1782   gboolean res = FALSE;
1783
1784   parse = GST_BASE_PARSE (GST_PAD_PARENT (pad));
1785   klass = GST_BASE_PARSE_GET_CLASS (parse);
1786
1787   /* If subclass doesn't provide conversion function we can't reply
1788      to the query either */
1789   if (!klass->convert) {
1790     return FALSE;
1791   }
1792
1793   switch (GST_QUERY_TYPE (query)) {
1794     case GST_QUERY_POSITION:
1795     {
1796       gint64 dest_value;
1797       GstFormat format;
1798
1799       GST_DEBUG_OBJECT (parse, "position query");
1800
1801       gst_query_parse_position (query, &format, NULL);
1802
1803       g_mutex_lock (parse->parse_lock);
1804
1805       if (format == GST_FORMAT_BYTES) {
1806         dest_value = parse->priv->offset;
1807         res = TRUE;
1808       } else if (format == parse->segment.format &&
1809           GST_CLOCK_TIME_IS_VALID (parse->segment.last_stop)) {
1810         dest_value = parse->segment.last_stop;
1811         res = TRUE;
1812       } else {
1813         /* priv->offset is updated in both PUSH/PULL modes */
1814         res = klass->convert (parse, GST_FORMAT_BYTES, parse->priv->offset,
1815             format, &dest_value);
1816       }
1817       g_mutex_unlock (parse->parse_lock);
1818
1819       if (res)
1820         gst_query_set_position (query, format, dest_value);
1821       else
1822         res = gst_pad_query_default (pad, query);
1823
1824       break;
1825     }
1826     case GST_QUERY_DURATION:
1827     {
1828       GstFormat format;
1829       gint64 dest_value;
1830
1831       GST_DEBUG_OBJECT (parse, "duration query");
1832
1833       gst_query_parse_duration (query, &format, NULL);
1834
1835       g_mutex_lock (parse->parse_lock);
1836
1837       if (format == GST_FORMAT_BYTES) {
1838         res = gst_pad_query_peer_duration (parse->sinkpad, &format,
1839             &dest_value);
1840       } else if (parse->priv->duration != -1 &&
1841           format == parse->priv->duration_fmt) {
1842         dest_value = parse->priv->duration;
1843         res = TRUE;
1844       } else if (parse->priv->duration != -1) {
1845         res = klass->convert (parse, parse->priv->duration_fmt,
1846             parse->priv->duration, format, &dest_value);
1847       } else if (parse->priv->estimated_duration != -1) {
1848         dest_value = parse->priv->estimated_duration;
1849         res = TRUE;
1850       }
1851
1852       g_mutex_unlock (parse->parse_lock);
1853
1854       if (res)
1855         gst_query_set_duration (query, format, dest_value);
1856       else
1857         res = gst_pad_query_default (pad, query);
1858       break;
1859     }
1860     case GST_QUERY_SEEKING:
1861     {
1862       GstFormat fmt;
1863       gboolean seekable = FALSE;
1864
1865       GST_DEBUG_OBJECT (parse, "seeking query");
1866
1867       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1868
1869       if (fmt != GST_FORMAT_TIME) {
1870         return gst_pad_query_default (pad, query);
1871       }
1872
1873       seekable = klass->is_seekable (parse);
1874
1875       /* TODO: could this duration be calculated/converted if subclass
1876          hasn't given it? */
1877       gst_query_set_seeking (query, GST_FORMAT_TIME, seekable, 0,
1878           (parse->priv->duration == -1) ?
1879           GST_CLOCK_TIME_NONE : parse->priv->duration);
1880
1881       GST_DEBUG_OBJECT (parse, "seekable: %d", seekable);
1882       res = TRUE;
1883       break;
1884     }
1885     case GST_QUERY_FORMATS:
1886       gst_query_set_formatsv (query, 3, fmtlist);
1887       res = TRUE;
1888       break;
1889
1890     case GST_QUERY_CONVERT:
1891     {
1892       GstFormat src_format, dest_format;
1893       gint64 src_value, dest_value;
1894
1895       gst_query_parse_convert (query, &src_format, &src_value,
1896           &dest_format, &dest_value);
1897
1898       /* FIXME: hm? doesn't make sense 
1899        * We require all those values to be given
1900        if (src_format && src_value && dest_format && dest_value ) { */
1901       res = klass->convert (parse, src_format, src_value,
1902           dest_format, &dest_value);
1903       if (res) {
1904         gst_query_set_convert (query, src_format, src_value,
1905             dest_format, dest_value);
1906       }
1907       /*} */
1908       break;
1909     }
1910     default:
1911       res = gst_pad_query_default (pad, query);
1912       break;
1913   }
1914   return res;
1915 }
1916
1917
1918 /**
1919  * gst_base_parse_handle_seek:
1920  * @parse: #GstBaseParse.
1921  * @event: #GstEvent.
1922  *
1923  * Returns: TRUE if seek succeeded.
1924  */
1925 static gboolean
1926 gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
1927 {
1928   GstBaseParseClass *klass;
1929   gdouble rate;
1930   GstFormat format;
1931   GstSeekFlags flags;
1932   GstSeekType cur_type = GST_SEEK_TYPE_NONE, stop_type;
1933   gboolean flush, update, res = TRUE;
1934   gint64 cur, stop, seekpos;
1935   GstSegment seeksegment = { 0, };
1936   GstFormat dstformat;
1937
1938   klass = GST_BASE_PARSE_GET_CLASS (parse);
1939
1940   gst_event_parse_seek (event, &rate, &format, &flags,
1941       &cur_type, &cur, &stop_type, &stop);
1942
1943   /* no negative rates yet */
1944   if (rate < 0.0)
1945     goto negative_rate;
1946
1947   if (cur_type != GST_SEEK_TYPE_SET)
1948     goto wrong_type;
1949
1950   /* For any format other than TIME, see if upstream handles
1951    * it directly or fail. For TIME, try upstream, but do it ourselves if
1952    * it fails upstream */
1953   if (format != GST_FORMAT_TIME) {
1954     return gst_pad_push_event (parse->sinkpad, event);
1955   } else {
1956     gst_event_ref (event);
1957     if (gst_pad_push_event (parse->sinkpad, event)) {
1958       gst_event_unref (event);
1959       return TRUE;
1960     }
1961   }
1962
1963   /* get flush flag */
1964   flush = flags & GST_SEEK_FLAG_FLUSH;
1965
1966   dstformat = GST_FORMAT_BYTES;
1967   if (!gst_pad_query_convert (parse->srcpad, format, cur, &dstformat, &seekpos)) {
1968     GST_DEBUG_OBJECT (parse, "conversion failed");
1969     return FALSE;
1970   }
1971
1972   GST_DEBUG_OBJECT (parse,
1973       "seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur,
1974       seekpos);
1975
1976   if (parse->priv->pad_mode == GST_ACTIVATE_PULL) {
1977     gint64 last_stop;
1978
1979     GST_DEBUG_OBJECT (parse, "seek in PULL mode");
1980
1981     if (flush) {
1982       if (parse->srcpad) {
1983         GST_DEBUG_OBJECT (parse, "sending flush start");
1984         gst_pad_push_event (parse->srcpad, gst_event_new_flush_start ());
1985       }
1986     } else {
1987       gst_pad_pause_task (parse->sinkpad);
1988     }
1989
1990     /* we should now be able to grab the streaming thread because we stopped it
1991      * with the above flush/pause code */
1992     GST_PAD_STREAM_LOCK (parse->sinkpad);
1993
1994     /* save current position */
1995     last_stop = parse->segment.last_stop;
1996     GST_DEBUG_OBJECT (parse, "stopped streaming at %" G_GINT64_FORMAT,
1997         last_stop);
1998
1999     /* copy segment, we need this because we still need the old
2000      * segment when we close the current segment. */
2001     memcpy (&seeksegment, &parse->segment, sizeof (GstSegment));
2002
2003     GST_DEBUG_OBJECT (parse, "configuring seek");
2004     gst_segment_set_seek (&seeksegment, rate, format, flags,
2005         cur_type, cur, stop_type, stop, &update);
2006
2007     /* figure out the last position we need to play. If it's configured (stop !=
2008      * -1), use that, else we play until the total duration of the file */
2009     if ((stop = seeksegment.stop) == -1)
2010       stop = seeksegment.duration;
2011
2012     parse->priv->offset = seekpos;
2013
2014     /* prepare for streaming again */
2015     if (flush) {
2016       GST_DEBUG_OBJECT (parse, "sending flush stop");
2017       gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop ());
2018     } else {
2019       if (parse->close_segment)
2020         gst_event_unref (parse->close_segment);
2021
2022       parse->close_segment = gst_event_new_new_segment (TRUE,
2023           parse->segment.rate, parse->segment.format,
2024           parse->segment.accum, parse->segment.last_stop, parse->segment.accum);
2025
2026       /* keep track of our last_stop */
2027       seeksegment.accum = parse->segment.last_stop;
2028
2029       GST_DEBUG_OBJECT (parse, "Created close seg format %d, "
2030           "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
2031           ", pos = %" GST_TIME_FORMAT, format,
2032           GST_TIME_ARGS (parse->segment.accum),
2033           GST_TIME_ARGS (parse->segment.last_stop),
2034           GST_TIME_ARGS (parse->segment.accum));
2035     }
2036
2037     memcpy (&parse->segment, &seeksegment, sizeof (GstSegment));
2038
2039     /* store the newsegment event so it can be sent from the streaming thread. */
2040     if (parse->pending_segment)
2041       gst_event_unref (parse->pending_segment);
2042
2043     /* This will be sent later in _loop() */
2044     parse->pending_segment =
2045         gst_event_new_new_segment (FALSE, parse->segment.rate,
2046         parse->segment.format,
2047         parse->segment.last_stop, stop, parse->segment.last_stop);
2048
2049     GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
2050         "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
2051         ", pos = %" GST_TIME_FORMAT, format,
2052         GST_TIME_ARGS (parse->segment.last_stop),
2053         GST_TIME_ARGS (stop), GST_TIME_ARGS (parse->segment.last_stop));
2054
2055     /* mark discont if we are going to stream from another position. */
2056     if (last_stop != parse->segment.last_stop) {
2057       GST_DEBUG_OBJECT (parse,
2058           "mark DISCONT, we did a seek to another position");
2059       parse->priv->discont = TRUE;
2060       parse->priv->next_ts = parse->segment.last_stop;
2061     }
2062
2063     /* Start streaming thread if paused */
2064     gst_pad_start_task (parse->sinkpad,
2065         (GstTaskFunction) gst_base_parse_loop, parse->sinkpad);
2066
2067     GST_PAD_STREAM_UNLOCK (parse->sinkpad);
2068   } else {
2069     GstEvent *new_event;
2070     /* The only thing we need to do in PUSH-mode is to send the
2071        seek event (in bytes) to upstream. Segment / flush handling happens
2072        in corresponding src event handlers */
2073     GST_DEBUG_OBJECT (parse, "seek in PUSH mode");
2074     new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flush,
2075         GST_SEEK_TYPE_SET, seekpos, stop_type, stop);
2076
2077     res = gst_pad_push_event (parse->sinkpad, new_event);
2078   }
2079
2080 done:
2081   return res;
2082
2083   /* ERRORS */
2084 negative_rate:
2085   {
2086     GST_DEBUG_OBJECT (parse, "negative playback rates are not supported yet.");
2087     res = FALSE;
2088     goto done;
2089   }
2090 wrong_type:
2091   {
2092     GST_DEBUG_OBJECT (parse, "unsupported seek type.");
2093     res = FALSE;
2094     goto done;
2095   }
2096 }
2097
2098
2099 /**
2100  * gst_base_parse_sink_setcaps:
2101  * @pad: #GstPad.
2102  * @caps: #GstCaps.
2103  *
2104  * Returns: TRUE if caps were accepted.
2105  */
2106 static gboolean
2107 gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps)
2108 {
2109   GstBaseParse *parse;
2110   GstBaseParseClass *klass;
2111   gboolean res = TRUE;
2112
2113   parse = GST_BASE_PARSE (GST_PAD_PARENT (pad));
2114   klass = GST_BASE_PARSE_GET_CLASS (parse);
2115
2116   GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps);
2117
2118   if (klass->set_sink_caps)
2119     res = klass->set_sink_caps (parse, caps);
2120
2121   parse->negotiated = res;
2122   return res && gst_pad_set_caps (pad, caps);
2123 }