baseparse: streamline query handling
[platform/upstream/gstreamer.git] / gst / audioparsers / gstbaseparse.c
1 /* GStreamer
2  * Copyright (C) 2008 Nokia Corporation. All rights reserved.
3  *   Contact: Stefan Kost <stefan.kost@nokia.com>
4  * Copyright (C) 2008 Sebastian Dröge <sebastian.droege@collabora.co.uk>.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbaseparse
24  * @short_description: Base class for stream parsers
25  * @see_also: #GstBaseTransform
26  *
27  * This base class is for parser elements that process data and splits it 
28  * into separate audio/video/whatever frames.
29  *
30  * It provides for:
31  * <itemizedlist>
32  *   <listitem><para>One sinkpad and one srcpad</para></listitem>
33  *   <listitem><para>Handles state changes</para></listitem>
34  *   <listitem><para>Does flushing</para></listitem>
35  *   <listitem><para>Push mode</para></listitem>
36  *   <listitem><para>Pull mode</para></listitem>
37  *   <listitem><para>Handles events (NEWSEGMENT/EOS/FLUSH)</para></listitem>
38  *   <listitem><para>Handles seeking in both modes</para></listitem>
39  *   <listitem><para>
40  *        Handles POSITION/DURATION/SEEKING/FORMAT/CONVERT queries
41  *   </para></listitem>
42  * </itemizedlist>
43  *
44  * The purpose of this base class is to provide a basic functionality of
45  * a parser and share a lot of rather complex code.
46  *
47  * Description of the parsing mechanism:
48  * <orderedlist>
49  * <listitem>
50  *   <itemizedlist><title>Set-up phase</title>
51  *   <listitem><para>
52  *     GstBaseParse class calls @set_sink_caps to inform the subclass about
53  *     incoming sinkpad caps. Subclass should set the srcpad caps accordingly.
54  *   </para></listitem>
55  *   <listitem><para>
56  *     GstBaseParse calls @start to inform subclass that data processing is
57  *     about to start now.
58  *   </para></listitem>
59  *   <listitem><para>
60  *      At least in this point subclass needs to tell the GstBaseParse class
61  *      how big data chunks it wants to receive (min_frame_size). It can do 
62  *      this with @gst_base_parse_set_min_frame_size.
63  *   </para></listitem>
64  *   <listitem><para>
65  *      GstBaseParse class sets up appropriate data passing mode (pull/push)
66  *      and starts to process the data.
67  *   </para></listitem>
68  *   </itemizedlist>
69  * </listitem>
70  * <listitem>
71  *   <itemizedlist>
72  *   <title>Parsing phase</title>
73  *     <listitem><para>
74  *       GstBaseParse gathers at least min_frame_size bytes of data either 
75  *       by pulling it from upstream or collecting buffers into internal
76  *       #GstAdapter.
77  *     </para></listitem>
78  *     <listitem><para>
79  *       A buffer of min_frame_size bytes is passed to subclass with
80  *       @check_valid_frame. Subclass checks the contents and returns TRUE
81  *       if the buffer contains a valid frame. It also needs to set the
82  *       @framesize according to the detected frame size. If buffer didn't
83  *       contain a valid frame, this call must return FALSE and optionally
84  *       set the @skipsize value to inform base class that how many bytes
85  *       it needs to skip in order to find a valid frame. The passed buffer
86  *       is read-only.  Note that @check_valid_frame might receive any small
87  *       amount of input data when leftover data is being drained (e.g. at EOS).
88  *     </para></listitem>
89  *     <listitem><para>
90  *       After valid frame is found, it will be passed again to subclass with
91  *       @parse_frame call. Now subclass is responsible for parsing the
92  *       frame contents and setting the buffer timestamp, duration and caps.
93  *     </para></listitem>
94  *     <listitem><para>
95  *       Finally the buffer can be pushed downstream and parsing loop starts
96  *       over again.
97  *     </para></listitem>
98  *     <listitem><para>
99  *       During the parsing process GstBaseParseClass will handle both srcpad and
100  *       sinkpad events. They will be passed to subclass if @event or
101  *       @src_event callbacks have been provided.
102  *     </para></listitem>
103  *   </itemizedlist>
104  * </listitem>
105  * <listitem>
106  *   <itemizedlist><title>Shutdown phase</title>
107  *   <listitem><para>
108  *     GstBaseParse class calls @stop to inform the subclass that data
109  *     parsing will be stopped.
110  *   </para></listitem>
111  *   </itemizedlist>
112  * </listitem>
113  * </orderedlist>
114  *
115  * Subclass is responsible for providing pad template caps for
116  * source and sink pads. The pads need to be named "sink" and "src". It also 
117  * needs to set the fixed caps on srcpad, when the format is ensured (e.g. 
118  * when base class calls subclass' @set_sink_caps function).
119  *
120  * This base class uses GST_FORMAT_DEFAULT as a meaning of frames. So,
121  * subclass conversion routine needs to know that conversion from
122  * GST_FORMAT_TIME to GST_FORMAT_DEFAULT must return the
123  * frame number that can be found from the given byte position.
124  *
125  * GstBaseParse uses subclasses conversion methods also for seeking. If
126  * subclass doesn't provide @convert function, seeking will get disabled.
127  *
128  * Subclass @start and @stop functions will be called to inform the beginning
129  * and end of data processing.
130  *
131  * Things that subclass need to take care of:
132  * <itemizedlist>
133  *   <listitem><para>Provide pad templates</para></listitem>
134  *   <listitem><para>
135  *      Fixate the source pad caps when appropriate
136  *   </para></listitem>
137  *   <listitem><para>
138  *      Inform base class how big data chunks should be retrieved. This is
139  *      done with @gst_base_parse_set_min_frame_size function.
140  *   </para></listitem>
141  *   <listitem><para>
142  *      Examine data chunks passed to subclass with @check_valid_frame
143  *      and tell if they contain a valid frame
144  *   </para></listitem>
145  *   <listitem><para>
146  *      Set the caps and timestamp to frame that is passed to subclass with
147  *      @parse_frame function.
148  *   </para></listitem>
149  *   <listitem><para>Provide conversion functions</para></listitem>
150  *   <listitem><para>
151  *      Update the duration information with @gst_base_parse_set_duration
152  *   </para></listitem>
153  *   <listitem><para>
154  *      Alternatively, parsing (or specs) might yield a frames per seconds rate
155  *      which can be provided to GstBaseParse to enable it to cater for
156  *      buffer time metadata (which will be taken from upstream as much as possible).
157  *      Internally keeping track of frames and respective
158  *      sizes that have been pushed provides GstBaseParse with a bytes per frame
159  *      rate.  A default @convert (used if not overriden) will then use these
160  *      rates to perform obvious conversions.  These rates are also used to update
161  *      (estimated) duration at regular frame intervals.
162  *      If no (fixed) frames per second rate applies, default conversion will be
163  *      based on (estimated) bytes per second (but no default buffer metadata
164  *      can be provided in this case).
165  *   </para></listitem>
166  * </itemizedlist>
167  *
168  */
169
170 /* TODO:
171  *  - Better segment handling:
172  *    - NEWSEGMENT for gaps
173  *    - Not NEWSEGMENT starting at 0 but at first frame timestamp
174  *  - GstIndex support
175  *  - Seek table generation and subclass seek entry injection
176  *  - Accurate seeking
177  *  - In push mode provide a queue of adapter-"queued" buffers for upstream
178  *    buffer metadata
179  *  - Queue buffers/events until caps are set
180  *  - Let subclass decide if frames outside the segment should be dropped
181  *  - Send queries upstream
182  */
183
184 #ifdef HAVE_CONFIG_H
185 #  include "config.h"
186 #endif
187
188 #include <stdlib.h>
189 #include <string.h>
190
191 #include "gstbaseparse.h"
192
193 #define MIN_FRAMES_TO_POST_BITRATE 10
194
195 GST_DEBUG_CATEGORY_STATIC (gst_base_parse_debug);
196 #define GST_CAT_DEFAULT gst_base_parse_debug
197
198 /* Supported formats */
199 static GstFormat fmtlist[] = {
200   GST_FORMAT_DEFAULT,
201   GST_FORMAT_BYTES,
202   GST_FORMAT_TIME,
203   0
204 };
205
206 #define GST_BASE_PARSE_GET_PRIVATE(obj)  \
207     (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_PARSE, GstBaseParsePrivate))
208
209 struct _GstBaseParsePrivate
210 {
211   GstActivateMode pad_mode;
212
213   gint64 duration;
214   GstFormat duration_fmt;
215   gint64 estimated_duration;
216
217   guint min_frame_size;
218   gboolean passthrough;
219   guint fps_num, fps_den;
220   guint update_interval;
221
222   gboolean discont;
223   gboolean flushing;
224   gboolean drain;
225
226   gint64 offset;
227   GstClockTime next_ts;
228   GstClockTime prev_ts;
229   GstClockTime frame_duration;
230
231   guint64 framecount;
232   guint64 bytecount;
233   guint64 data_bytecount;
234   guint64 acc_duration;
235
236   gboolean post_min_bitrate;
237   gboolean post_avg_bitrate;
238   gboolean post_max_bitrate;
239   guint min_bitrate;
240   guint avg_bitrate;
241   guint max_bitrate;
242
243   GList *pending_events;
244
245   GstBuffer *cache;
246 };
247
248 static GstElementClass *parent_class = NULL;
249
250 static void gst_base_parse_class_init (GstBaseParseClass * klass);
251 static void gst_base_parse_init (GstBaseParse * parse,
252     GstBaseParseClass * klass);
253
254 GType
255 gst_base_parse_get_type (void)
256 {
257   static GType base_parse_type = 0;
258
259   if (!base_parse_type) {
260     static const GTypeInfo base_parse_info = {
261       sizeof (GstBaseParseClass),
262       (GBaseInitFunc) NULL,
263       (GBaseFinalizeFunc) NULL,
264       (GClassInitFunc) gst_base_parse_class_init,
265       NULL,
266       NULL,
267       sizeof (GstBaseParse),
268       0,
269       (GInstanceInitFunc) gst_base_parse_init,
270     };
271
272     base_parse_type = g_type_register_static (GST_TYPE_ELEMENT,
273         "GstAudioBaseParseBad", &base_parse_info, G_TYPE_FLAG_ABSTRACT);
274   }
275   return base_parse_type;
276 }
277
278 static void gst_base_parse_finalize (GObject * object);
279
280 static gboolean gst_base_parse_sink_activate (GstPad * sinkpad);
281 static gboolean gst_base_parse_sink_activate_push (GstPad * pad,
282     gboolean active);
283 static gboolean gst_base_parse_sink_activate_pull (GstPad * pad,
284     gboolean active);
285 static gboolean gst_base_parse_handle_seek (GstBaseParse * parse,
286     GstEvent * event);
287 static void gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event);
288
289 static gboolean gst_base_parse_src_event (GstPad * pad, GstEvent * event);
290 static gboolean gst_base_parse_sink_event (GstPad * pad, GstEvent * event);
291 static gboolean gst_base_parse_query (GstPad * pad, GstQuery * query);
292 static gboolean gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps);
293 static const GstQueryType *gst_base_parse_get_querytypes (GstPad * pad);
294
295 static GstFlowReturn gst_base_parse_chain (GstPad * pad, GstBuffer * buffer);
296 static void gst_base_parse_loop (GstPad * pad);
297
298 static gboolean gst_base_parse_check_frame (GstBaseParse * parse,
299     GstBuffer * buffer, guint * framesize, gint * skipsize);
300
301 static GstFlowReturn gst_base_parse_parse_frame (GstBaseParse * parse,
302     GstBuffer * buffer);
303
304 static gboolean gst_base_parse_sink_eventfunc (GstBaseParse * parse,
305     GstEvent * event);
306
307 static gboolean gst_base_parse_src_eventfunc (GstBaseParse * parse,
308     GstEvent * event);
309
310 static gboolean gst_base_parse_is_seekable (GstBaseParse * parse);
311
312 gboolean gst_base_parse_convert (GstBaseParse * parse, GstFormat src_format,
313     gint64 src_value, GstFormat dest_format, gint64 * dest_value);
314
315 static void gst_base_parse_drain (GstBaseParse * parse);
316
317 static void gst_base_parse_post_bitrates (GstBaseParse * parse,
318     gboolean post_min, gboolean post_avg, gboolean post_max);
319
320 static void
321 gst_base_parse_finalize (GObject * object)
322 {
323   GstBaseParse *parse = GST_BASE_PARSE (object);
324   GstEvent **p_ev;
325
326   g_mutex_free (parse->parse_lock);
327   g_object_unref (parse->adapter);
328
329   if (parse->pending_segment) {
330     p_ev = &parse->pending_segment;
331     gst_event_replace (p_ev, NULL);
332   }
333   if (parse->close_segment) {
334     p_ev = &parse->close_segment;
335     gst_event_replace (p_ev, NULL);
336   }
337
338   if (parse->priv->cache) {
339     gst_buffer_unref (parse->priv->cache);
340     parse->priv->cache = NULL;
341   }
342
343   g_list_foreach (parse->priv->pending_events, (GFunc) gst_mini_object_unref,
344       NULL);
345   g_list_free (parse->priv->pending_events);
346   parse->priv->pending_events = NULL;
347
348   G_OBJECT_CLASS (parent_class)->finalize (object);
349 }
350
351 static void
352 gst_base_parse_class_init (GstBaseParseClass * klass)
353 {
354   GObjectClass *gobject_class;
355
356   gobject_class = G_OBJECT_CLASS (klass);
357   g_type_class_add_private (klass, sizeof (GstBaseParsePrivate));
358   parent_class = g_type_class_peek_parent (klass);
359   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_parse_finalize);
360
361   /* Default handlers */
362   klass->check_valid_frame = gst_base_parse_check_frame;
363   klass->parse_frame = gst_base_parse_parse_frame;
364   klass->src_event = gst_base_parse_src_eventfunc;
365   klass->is_seekable = gst_base_parse_is_seekable;
366   klass->convert = gst_base_parse_convert;
367
368   GST_DEBUG_CATEGORY_INIT (gst_base_parse_debug, "baseparse", 0,
369       "baseparse element");
370 }
371
372 static void
373 gst_base_parse_init (GstBaseParse * parse, GstBaseParseClass * bclass)
374 {
375   GstPadTemplate *pad_template;
376
377   GST_DEBUG_OBJECT (parse, "gst_base_parse_init");
378
379   parse->priv = GST_BASE_PARSE_GET_PRIVATE (parse);
380
381   pad_template =
382       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
383   g_return_if_fail (pad_template != NULL);
384   parse->sinkpad = gst_pad_new_from_template (pad_template, "sink");
385   gst_pad_set_event_function (parse->sinkpad,
386       GST_DEBUG_FUNCPTR (gst_base_parse_sink_event));
387   gst_pad_set_setcaps_function (parse->sinkpad,
388       GST_DEBUG_FUNCPTR (gst_base_parse_sink_setcaps));
389   gst_pad_set_chain_function (parse->sinkpad,
390       GST_DEBUG_FUNCPTR (gst_base_parse_chain));
391   gst_pad_set_activate_function (parse->sinkpad,
392       GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate));
393   gst_pad_set_activatepush_function (parse->sinkpad,
394       GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_push));
395   gst_pad_set_activatepull_function (parse->sinkpad,
396       GST_DEBUG_FUNCPTR (gst_base_parse_sink_activate_pull));
397   gst_element_add_pad (GST_ELEMENT (parse), parse->sinkpad);
398
399   GST_DEBUG_OBJECT (parse, "sinkpad created");
400
401   pad_template =
402       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
403   g_return_if_fail (pad_template != NULL);
404   parse->srcpad = gst_pad_new_from_template (pad_template, "src");
405   gst_pad_set_event_function (parse->srcpad,
406       GST_DEBUG_FUNCPTR (gst_base_parse_src_event));
407   gst_pad_set_query_type_function (parse->srcpad,
408       GST_DEBUG_FUNCPTR (gst_base_parse_get_querytypes));
409   gst_pad_set_query_function (parse->srcpad,
410       GST_DEBUG_FUNCPTR (gst_base_parse_query));
411   gst_pad_use_fixed_caps (parse->srcpad);
412   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
413   GST_DEBUG_OBJECT (parse, "src created");
414
415   parse->parse_lock = g_mutex_new ();
416   parse->adapter = gst_adapter_new ();
417   parse->pending_segment = NULL;
418   parse->close_segment = NULL;
419
420   parse->priv->pad_mode = GST_ACTIVATE_NONE;
421   parse->priv->duration = -1;
422   parse->priv->min_frame_size = 1;
423   parse->priv->passthrough = FALSE;
424   parse->priv->discont = FALSE;
425   parse->priv->flushing = FALSE;
426   parse->priv->offset = 0;
427   GST_DEBUG_OBJECT (parse, "init ok");
428 }
429
430
431
432 /**
433  * gst_base_parse_check_frame:
434  * @parse: #GstBaseParse.
435  * @buffer: GstBuffer.
436  * @framesize: This will be set to tell the found frame size in bytes.
437  * @skipsize: Output parameter that tells how much data needs to be skipped
438  *            in order to find the following frame header.
439  *
440  * Default callback for check_valid_frame.
441  * 
442  * Returns: Always TRUE.
443  */
444 static gboolean
445 gst_base_parse_check_frame (GstBaseParse * parse,
446     GstBuffer * buffer, guint * framesize, gint * skipsize)
447 {
448   *framesize = GST_BUFFER_SIZE (buffer);
449   *skipsize = 0;
450   return TRUE;
451 }
452
453
454 /**
455  * gst_base_parse_parse_frame:
456  * @parse: #GstBaseParse.
457  * @buffer: #GstBuffer.
458  *
459  * Default callback for parse_frame.
460  */
461 static GstFlowReturn
462 gst_base_parse_parse_frame (GstBaseParse * parse, GstBuffer * buffer)
463 {
464   if (!GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
465       GST_CLOCK_TIME_IS_VALID (parse->priv->next_ts)) {
466     GST_BUFFER_TIMESTAMP (buffer) = parse->priv->next_ts;
467   }
468   if (!GST_BUFFER_DURATION_IS_VALID (buffer) &&
469       GST_CLOCK_TIME_IS_VALID (parse->priv->frame_duration)) {
470     GST_BUFFER_DURATION (buffer) = parse->priv->frame_duration;
471   }
472   return GST_FLOW_OK;
473 }
474
475
476 /**
477  * gst_base_parse_bytepos_to_time:
478  * @parse: #GstBaseParse.
479  * @bytepos: Position (in bytes) to be converted.
480  * @pos_in_time: #GstClockTime pointer where the result is set.
481  *
482  * Convert given byte position into #GstClockTime format.
483  * 
484  * Returns: TRUE if conversion succeeded.
485  */
486 static gboolean
487 gst_base_parse_bytepos_to_time (GstBaseParse * parse, gint64 bytepos,
488     GstClockTime * pos_in_time)
489 {
490   GstBaseParseClass *klass;
491   gboolean res = FALSE;
492
493   klass = GST_BASE_PARSE_GET_CLASS (parse);
494
495   if (klass->convert) {
496     res = klass->convert (parse, GST_FORMAT_BYTES, bytepos,
497         GST_FORMAT_TIME, (gint64 *) pos_in_time);
498   }
499   return res;
500 }
501
502
503 /**
504  * gst_base_parse_sink_event:
505  * @pad: #GstPad that received the event.
506  * @event: #GstEvent to be handled.
507  *
508  * Handler for sink pad events.
509  *
510  * Returns: TRUE if the event was handled.
511  */
512 static gboolean
513 gst_base_parse_sink_event (GstPad * pad, GstEvent * event)
514 {
515   GstBaseParse *parse;
516   GstBaseParseClass *bclass;
517   gboolean handled = FALSE;
518   gboolean ret = TRUE;
519
520
521   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
522   bclass = GST_BASE_PARSE_GET_CLASS (parse);
523
524   GST_DEBUG_OBJECT (parse, "handling event %d", GST_EVENT_TYPE (event));
525
526   /* Cache all events except EOS, NEWSEGMENT and FLUSH_STOP if we have a
527    * pending segment */
528   if (parse->pending_segment && GST_EVENT_TYPE (event) != GST_EVENT_EOS
529       && GST_EVENT_TYPE (event) != GST_EVENT_NEWSEGMENT
530       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_START
531       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
532
533     if (GST_EVENT_TYPE (event) == GST_EVENT_TAG)
534       /* See if any bitrate tags were posted */
535       gst_base_parse_handle_tag (parse, event);
536
537     parse->priv->pending_events =
538         g_list_append (parse->priv->pending_events, event);
539     ret = TRUE;
540   } else {
541
542     if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
543         parse->priv->framecount < MIN_FRAMES_TO_POST_BITRATE)
544       /* We've not posted bitrate tags yet - do so now */
545       gst_base_parse_post_bitrates (parse, TRUE, TRUE, TRUE);
546
547     if (bclass->event)
548       handled = bclass->event (parse, event);
549
550     if (!handled)
551       handled = gst_base_parse_sink_eventfunc (parse, event);
552
553     if (!handled)
554       ret = gst_pad_event_default (pad, event);
555   }
556
557   gst_object_unref (parse);
558   GST_DEBUG_OBJECT (parse, "event handled");
559   return ret;
560 }
561
562
563 /**
564  * gst_base_parse_sink_eventfunc:
565  * @parse: #GstBaseParse.
566  * @event: #GstEvent to be handled.
567  *
568  * Element-level event handler function.
569  *
570  * Returns: TRUE if the event was handled and not need forwarding.
571  */
572 static gboolean
573 gst_base_parse_sink_eventfunc (GstBaseParse * parse, GstEvent * event)
574 {
575   gboolean handled = FALSE;
576   GstEvent **eventp;
577
578   switch (GST_EVENT_TYPE (event)) {
579     case GST_EVENT_NEWSEGMENT:
580     {
581       gdouble rate, applied_rate;
582       GstFormat format;
583       gint64 start, stop, pos, offset = 0;
584       gboolean update;
585
586       gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
587           &format, &start, &stop, &pos);
588
589
590       if (format == GST_FORMAT_BYTES) {
591         GstClockTime seg_start, seg_stop, seg_pos;
592
593         /* stop time is allowed to be open-ended, but not start & pos */
594         seg_stop = GST_CLOCK_TIME_NONE;
595         offset = pos;
596
597         if (gst_base_parse_bytepos_to_time (parse, start, &seg_start) &&
598             gst_base_parse_bytepos_to_time (parse, pos, &seg_pos)) {
599           gst_event_unref (event);
600           event = gst_event_new_new_segment_full (update, rate, applied_rate,
601               GST_FORMAT_TIME, seg_start, seg_stop, seg_pos);
602           format = GST_FORMAT_TIME;
603           GST_DEBUG_OBJECT (parse, "Converted incoming segment to TIME. "
604               "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
605               ", pos = %" GST_TIME_FORMAT, GST_TIME_ARGS (seg_start),
606               GST_TIME_ARGS (seg_stop), GST_TIME_ARGS (seg_pos));
607         }
608       }
609
610       if (format != GST_FORMAT_TIME) {
611         /* Unknown incoming segment format. Output a default open-ended 
612          * TIME segment */
613         gst_event_unref (event);
614         event = gst_event_new_new_segment_full (update, rate, applied_rate,
615             GST_FORMAT_TIME, 0, GST_CLOCK_TIME_NONE, 0);
616       }
617
618       gst_event_parse_new_segment_full (event, &update, &rate, &applied_rate,
619           &format, &start, &stop, &pos);
620
621       gst_segment_set_newsegment_full (&parse->segment, update, rate,
622           applied_rate, format, start, stop, pos);
623
624       GST_DEBUG_OBJECT (parse, "Created newseg rate %g, applied rate %g, "
625           "format %d, start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
626           ", pos = %" GST_TIME_FORMAT, rate, applied_rate, format,
627           GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (pos));
628
629       /* save the segment for later, right before we push a new buffer so that
630        * the caps are fixed and the next linked element can receive
631        * the segment. */
632       eventp = &parse->pending_segment;
633       gst_event_replace (eventp, event);
634       gst_event_unref (event);
635       handled = TRUE;
636
637       /* but finish the current segment */
638       GST_DEBUG_OBJECT (parse, "draining current segment");
639       gst_base_parse_drain (parse);
640       gst_adapter_clear (parse->adapter);
641       parse->priv->offset = offset;
642       parse->priv->next_ts = start;
643       break;
644     }
645
646     case GST_EVENT_FLUSH_START:
647       parse->priv->flushing = TRUE;
648       handled = gst_pad_push_event (parse->srcpad, event);
649       /* Wait for _chain() to exit by taking the srcpad STREAM_LOCK */
650       GST_PAD_STREAM_LOCK (parse->srcpad);
651       GST_PAD_STREAM_UNLOCK (parse->srcpad);
652
653       break;
654
655     case GST_EVENT_FLUSH_STOP:
656       gst_adapter_clear (parse->adapter);
657       parse->priv->flushing = FALSE;
658       parse->priv->discont = TRUE;
659       break;
660
661     case GST_EVENT_EOS:
662       gst_base_parse_drain (parse);
663       break;
664
665     default:
666       break;
667   }
668
669   return handled;
670 }
671
672
673 /**
674  * gst_base_parse_src_event:
675  * @pad: #GstPad that received the event.
676  * @event: #GstEvent that was received.
677  *
678  * Handler for source pad events.
679  *
680  * Returns: TRUE if the event was handled.
681  */
682 static gboolean
683 gst_base_parse_src_event (GstPad * pad, GstEvent * event)
684 {
685   GstBaseParse *parse;
686   GstBaseParseClass *bclass;
687   gboolean handled = FALSE;
688   gboolean ret = TRUE;
689
690   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
691   bclass = GST_BASE_PARSE_GET_CLASS (parse);
692
693   GST_DEBUG_OBJECT (parse, "event %d, %s", GST_EVENT_TYPE (event),
694       GST_EVENT_TYPE_NAME (event));
695
696   if (bclass->src_event)
697     handled = bclass->src_event (parse, event);
698
699   if (!handled)
700     ret = gst_pad_event_default (pad, event);
701   else
702     gst_event_unref (event);
703
704   gst_object_unref (parse);
705   return ret;
706 }
707
708
709 /**
710  * gst_base_parse_src_eventfunc:
711  * @parse: #GstBaseParse.
712  * @event: #GstEvent that was received.
713  *
714  * Default srcpad event handler.
715  *
716  * Returns: TRUE if the event was handled and can be dropped.
717  */
718 static gboolean
719 gst_base_parse_src_eventfunc (GstBaseParse * parse, GstEvent * event)
720 {
721   gboolean handled = FALSE;
722   GstBaseParseClass *bclass;
723
724   bclass = GST_BASE_PARSE_GET_CLASS (parse);
725
726   switch (GST_EVENT_TYPE (event)) {
727     case GST_EVENT_SEEK:
728     {
729       if (bclass->is_seekable (parse)) {
730         handled = gst_base_parse_handle_seek (parse, event);
731       }
732       break;
733     }
734     default:
735       break;
736   }
737   return handled;
738 }
739
740
741 /**
742  * gst_base_parse_is_seekable:
743  * @parse: #GstBaseParse.
744  *
745  * Default handler for is_seekable.
746  *
747  * Returns: Always TRUE.
748  */
749 static gboolean
750 gst_base_parse_is_seekable (GstBaseParse * parse)
751 {
752   return TRUE;
753 }
754
755 /**
756  * gst_base_parse_convert:
757  * @parse: #GstBaseParse.
758  * @src_format: #GstFormat describing the source format.
759  * @src_value: Source value to be converted.
760  * @dest_format: #GstFormat defining the converted format.
761  * @dest_value: Pointer where the conversion result will be put.
762  *
763  * Implementation of "convert" vmethod in #GstBaseParse class.
764  *
765  * Returns: TRUE if conversion was successful.
766  */
767 gboolean
768 gst_base_parse_convert (GstBaseParse * parse,
769     GstFormat src_format,
770     gint64 src_value, GstFormat dest_format, gint64 * dest_value)
771 {
772   gboolean ret = FALSE;
773   guint64 bytes, duration;
774
775   if (G_UNLIKELY (src_format == dest_format)) {
776     *dest_value = src_value;
777     return TRUE;
778   }
779
780   if (G_UNLIKELY (src_value == -1)) {
781     *dest_value = -1;
782     return TRUE;
783   }
784
785   /* need at least some frames */
786   if (!parse->priv->framecount)
787     return FALSE;
788
789   /* either frame info (having num means den also ok) or use average bitrate */
790   if (parse->priv->fps_num) {
791     duration = parse->priv->framecount * parse->priv->fps_den * 1000;
792     bytes = parse->priv->bytecount * parse->priv->fps_num;
793   } else {
794     duration = parse->priv->acc_duration / GST_MSECOND;
795     bytes = parse->priv->bytecount;
796   }
797
798   if (G_UNLIKELY (!duration || !bytes))
799     return FALSE;
800
801   if (src_format == GST_FORMAT_BYTES) {
802     if (dest_format == GST_FORMAT_TIME) {
803       /* BYTES -> TIME conversion */
804       GST_DEBUG_OBJECT (parse, "converting bytes -> time");
805       *dest_value = gst_util_uint64_scale (src_value, duration, bytes);
806       *dest_value *= GST_MSECOND;
807       GST_DEBUG_OBJECT (parse, "conversion result: %" G_GINT64_FORMAT " ms",
808           *dest_value / GST_MSECOND);
809       ret = TRUE;
810     }
811   } else if (src_format == GST_FORMAT_TIME) {
812     if (dest_format == GST_FORMAT_BYTES) {
813       GST_DEBUG_OBJECT (parse, "converting time -> bytes");
814       *dest_value = gst_util_uint64_scale (src_value / GST_MSECOND, bytes,
815           duration);
816       GST_DEBUG_OBJECT (parse,
817           "time %" G_GINT64_FORMAT " ms in bytes = %" G_GINT64_FORMAT,
818           src_value / GST_MSECOND, *dest_value);
819       ret = TRUE;
820     }
821   } else if (src_format == GST_FORMAT_DEFAULT) {
822     /* DEFAULT == frame-based */
823     if (dest_format == GST_FORMAT_TIME) {
824       if (parse->priv->fps_den) {
825         *dest_value = gst_util_uint64_scale (src_value,
826             GST_SECOND * parse->priv->fps_den, parse->priv->fps_num);
827         ret = TRUE;
828       }
829     } else if (dest_format == GST_FORMAT_BYTES) {
830     }
831   }
832
833   return ret;
834 }
835
836 /**
837  * gst_base_parse_update_duration:
838  * @parse: #GstBaseParse.
839  *
840  */
841 static void
842 gst_base_parse_update_duration (GstBaseParse * aacparse)
843 {
844   GstPad *peer;
845   GstBaseParse *parse;
846   GstBaseParseClass *klass;
847
848   parse = GST_BASE_PARSE (aacparse);
849   klass = GST_BASE_PARSE_GET_CLASS (parse);
850
851   /* must be able to convert */
852   if (!klass->convert)
853     return;
854
855   peer = gst_pad_get_peer (parse->sinkpad);
856   if (peer) {
857     GstFormat pformat = GST_FORMAT_BYTES;
858     gboolean qres = FALSE;
859     gint64 ptot, dest_value;
860
861     qres = gst_pad_query_duration (peer, &pformat, &ptot);
862     gst_object_unref (GST_OBJECT (peer));
863     if (qres) {
864       if (klass->convert (parse, pformat, ptot, GST_FORMAT_TIME, &dest_value))
865         parse->priv->estimated_duration = dest_value;
866     }
867   }
868 }
869
870 static void
871 gst_base_parse_post_bitrates (GstBaseParse * parse, gboolean post_min,
872     gboolean post_avg, gboolean post_max)
873 {
874   GstTagList *taglist = gst_tag_list_new ();
875
876   if (post_min && parse->priv->post_min_bitrate)
877     gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE,
878         GST_TAG_MINIMUM_BITRATE, parse->priv->min_bitrate, NULL);
879
880   if (post_avg && parse->priv->post_min_bitrate)
881     gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE, GST_TAG_BITRATE,
882         parse->priv->avg_bitrate, NULL);
883
884   if (post_max && parse->priv->post_max_bitrate)
885     gst_tag_list_add (taglist, GST_TAG_MERGE_REPLACE,
886         GST_TAG_MAXIMUM_BITRATE, parse->priv->max_bitrate, NULL);
887
888   GST_DEBUG_OBJECT (parse, "Updated bitrates. Min: %u, Avg: %u, Max: %u",
889       parse->priv->min_bitrate, parse->priv->avg_bitrate,
890       parse->priv->max_bitrate);
891
892   gst_element_found_tags_for_pad (GST_ELEMENT (parse), parse->srcpad, taglist);
893 }
894
895 /**
896  * gst_base_parse_update_bitrates:
897  * @parse: #GstBaseParse.
898  * @buffer: Current frame as a #GstBuffer
899  *
900  * Keeps track of the minimum and maximum bitrates, and also maintains a
901  * running average bitrate of the stream so far.
902  */
903 static void
904 gst_base_parse_update_bitrates (GstBaseParse * parse, GstBuffer * buffer)
905 {
906   /* Only update the tag on a 10 kbps delta */
907   static const gint update_threshold = 10000;
908
909   GstBaseParseClass *klass;
910   guint64 data_len, frame_dur;
911   gint overhead = 0, frame_bitrate, old_avg_bitrate = parse->priv->avg_bitrate;
912   gboolean update_min = FALSE, update_avg = FALSE, update_max = FALSE;
913
914   klass = GST_BASE_PARSE_GET_CLASS (parse);
915
916   if (klass->get_frame_overhead) {
917     overhead = klass->get_frame_overhead (parse, buffer);
918     if (overhead == -1)
919       return;
920   }
921
922   data_len = GST_BUFFER_SIZE (buffer) - overhead;
923   parse->priv->data_bytecount += data_len;
924
925   if (parse->priv->fps_num) {
926     /* Calculate duration of a frame from frame properties */
927     frame_dur = (GST_SECOND * parse->priv->fps_den) / parse->priv->fps_num;
928     parse->priv->avg_bitrate = (8 * parse->priv->data_bytecount * GST_SECOND) /
929         (parse->priv->framecount * frame_dur);
930
931   } else if (GST_BUFFER_DURATION_IS_VALID (buffer)) {
932     /* Calculate duration of a frame from buffer properties */
933     frame_dur = GST_BUFFER_DURATION (buffer);
934     parse->priv->avg_bitrate = (8 * parse->priv->data_bytecount * GST_SECOND) /
935         parse->priv->acc_duration;
936
937   } else {
938     /* No way to figure out frame duration (is this even possible?) */
939     return;
940   }
941
942   frame_bitrate = (8 * data_len * GST_SECOND) / frame_dur;
943
944   if (frame_bitrate < parse->priv->min_bitrate) {
945     parse->priv->min_bitrate = frame_bitrate;
946     update_min = TRUE;
947   }
948
949   if (frame_bitrate > parse->priv->max_bitrate) {
950     parse->priv->max_bitrate = frame_bitrate;
951     update_max = TRUE;
952   }
953
954   if (old_avg_bitrate / update_threshold !=
955       parse->priv->avg_bitrate / update_threshold)
956     update_avg = TRUE;
957
958   if (parse->priv->framecount >= MIN_FRAMES_TO_POST_BITRATE &&
959       (update_min || update_avg || update_max))
960     gst_base_parse_post_bitrates (parse, update_min, update_avg, update_max);
961 }
962
963 /**
964  * gst_base_parse_handle_and_push_buffer:
965  * @parse: #GstBaseParse.
966  * @klass: #GstBaseParseClass.
967  * @buffer: #GstBuffer.
968  *
969  * Parses the frame from given buffer and pushes it forward. Also performs
970  * timestamp handling and checks the segment limits.
971  *
972  * This is called with srcpad STREAM_LOCK held.
973  *
974  * Returns: #GstFlowReturn
975  */
976 static GstFlowReturn
977 gst_base_parse_handle_and_push_buffer (GstBaseParse * parse,
978     GstBaseParseClass * klass, GstBuffer * buffer)
979 {
980   GstFlowReturn ret;
981
982   if (parse->priv->discont) {
983     GST_DEBUG_OBJECT (parse, "marking DISCONT");
984     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
985     parse->priv->discont = FALSE;
986   }
987
988   GST_LOG_OBJECT (parse,
989       "parsing frame at offset %" G_GUINT64_FORMAT
990       " (%#" G_GINT64_MODIFIER "x) of size %d",
991       GST_BUFFER_OFFSET (buffer), GST_BUFFER_OFFSET (buffer),
992       GST_BUFFER_SIZE (buffer));
993
994   ret = klass->parse_frame (parse, buffer);
995
996   /* re-use default handler to add missing metadata as-much-as-possible */
997   gst_base_parse_parse_frame (parse, buffer);
998   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
999       GST_BUFFER_DURATION_IS_VALID (buffer)) {
1000     parse->priv->next_ts =
1001         GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer);
1002   } else {
1003     /* we lost track, do not produce bogus time next time around
1004      * (probably means parser subclass has given up on parsing as well) */
1005     GST_DEBUG_OBJECT (parse, "no next fallback timestamp");
1006     parse->priv->next_ts = GST_CLOCK_TIME_NONE;
1007   }
1008
1009   /* First buffers are dropped, this means that the subclass needs more
1010    * frames to decide on the format and queues them internally */
1011   /* convert internal flow to OK and mark discont for the next buffer. */
1012   if (ret == GST_BASE_PARSE_FLOW_DROPPED) {
1013     gst_buffer_unref (buffer);
1014     return GST_FLOW_OK;
1015   } else if (ret != GST_FLOW_OK) {
1016     return ret;
1017   }
1018
1019   return gst_base_parse_push_buffer (parse, buffer);
1020 }
1021
1022 /**
1023  * gst_base_parse_push_buffer:
1024  * @parse: #GstBaseParse.
1025  * @buffer: #GstBuffer.
1026  *
1027  * Pushes the buffer downstream, sends any pending events and
1028  * does some timestamp and segment handling.
1029  *
1030  * This must be called with srcpad STREAM_LOCK held.
1031  *
1032  * Returns: #GstFlowReturn
1033  */
1034 GstFlowReturn
1035 gst_base_parse_push_buffer (GstBaseParse * parse, GstBuffer * buffer)
1036 {
1037   GstFlowReturn ret = GST_FLOW_OK;
1038   GstClockTime last_start = GST_CLOCK_TIME_NONE;
1039   GstClockTime last_stop = GST_CLOCK_TIME_NONE;
1040
1041   GST_LOG_OBJECT (parse,
1042       "processing buffer of size %d with ts %" GST_TIME_FORMAT
1043       ", duration %" GST_TIME_FORMAT, GST_BUFFER_SIZE (buffer),
1044       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
1045       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
1046
1047   /* update stats */
1048   parse->priv->bytecount += GST_BUFFER_SIZE (buffer);
1049   if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BASE_PARSE_BUFFER_FLAG_NO_FRAME)) {
1050     parse->priv->framecount++;
1051     if (GST_BUFFER_DURATION_IS_VALID (buffer)) {
1052       parse->priv->acc_duration += GST_BUFFER_DURATION (buffer);
1053     }
1054   }
1055   GST_BUFFER_FLAG_UNSET (buffer, GST_BASE_PARSE_BUFFER_FLAG_NO_FRAME);
1056   if (parse->priv->update_interval &&
1057       (parse->priv->framecount % parse->priv->update_interval) == 0)
1058     gst_base_parse_update_duration (parse);
1059
1060   gst_base_parse_update_bitrates (parse, buffer);
1061
1062   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
1063     last_start = last_stop = GST_BUFFER_TIMESTAMP (buffer);
1064   if (last_start != GST_CLOCK_TIME_NONE
1065       && GST_BUFFER_DURATION_IS_VALID (buffer))
1066     last_stop = last_start + GST_BUFFER_DURATION (buffer);
1067
1068   /* should have caps by now */
1069   g_return_val_if_fail (GST_PAD_CAPS (parse->srcpad), GST_FLOW_ERROR);
1070
1071   gst_buffer_set_caps (buffer, GST_PAD_CAPS (parse->srcpad));
1072
1073   /* segment times are typically estimates,
1074    * actual frame data might lead subclass to different timestamps,
1075    * so override segment start from what is supplied there */
1076   if (G_UNLIKELY (parse->pending_segment && !parse->priv->passthrough &&
1077           GST_CLOCK_TIME_IS_VALID (last_start))) {
1078     gst_event_unref (parse->pending_segment);
1079     /* stop time possibly lost this way,
1080      * but unlikely and not really supported */
1081     parse->pending_segment =
1082         gst_event_new_new_segment (FALSE, parse->segment.rate,
1083         parse->segment.format, last_start, -1, last_start);
1084   }
1085
1086   /* and should then also be linked downstream, so safe to send some events */
1087   if (parse->priv->pad_mode == GST_ACTIVATE_PULL) {
1088     if (G_UNLIKELY (parse->close_segment)) {
1089       GST_DEBUG_OBJECT (parse, "loop sending close segment");
1090       gst_pad_push_event (parse->srcpad, parse->close_segment);
1091       parse->close_segment = NULL;
1092     }
1093
1094     if (G_UNLIKELY (parse->pending_segment)) {
1095       GST_DEBUG_OBJECT (parse, "loop push pending segment");
1096       gst_pad_push_event (parse->srcpad, parse->pending_segment);
1097       parse->pending_segment = NULL;
1098     }
1099   } else {
1100     if (G_UNLIKELY (parse->pending_segment)) {
1101       GST_DEBUG_OBJECT (parse, "chain pushing a pending segment");
1102       gst_pad_push_event (parse->srcpad, parse->pending_segment);
1103       parse->pending_segment = NULL;
1104     }
1105   }
1106
1107   if (G_UNLIKELY (parse->priv->pending_events)) {
1108     GList *l;
1109
1110     for (l = parse->priv->pending_events; l != NULL; l = l->next) {
1111       gst_pad_push_event (parse->srcpad, GST_EVENT (l->data));
1112     }
1113     g_list_free (parse->priv->pending_events);
1114     parse->priv->pending_events = NULL;
1115   }
1116
1117   /* TODO: Add to seek table */
1118
1119   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
1120       GST_CLOCK_TIME_IS_VALID (parse->segment.stop) &&
1121       GST_BUFFER_TIMESTAMP (buffer) > parse->segment.stop) {
1122     GST_LOG_OBJECT (parse, "Dropped frame, after segment");
1123     gst_buffer_unref (buffer);
1124   } else if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer) &&
1125       GST_BUFFER_DURATION_IS_VALID (buffer) &&
1126       GST_CLOCK_TIME_IS_VALID (parse->segment.start) &&
1127       GST_BUFFER_TIMESTAMP (buffer) + GST_BUFFER_DURATION (buffer)
1128       < parse->segment.start) {
1129     /* FIXME: subclass needs way to override the start as downstream might
1130      * need frames before for proper decoding */
1131     GST_LOG_OBJECT (parse, "Dropped frame, before segment");
1132     gst_buffer_unref (buffer);
1133   } else {
1134     ret = gst_pad_push (parse->srcpad, buffer);
1135     GST_LOG_OBJECT (parse, "frame (%d bytes) pushed: %d",
1136         GST_BUFFER_SIZE (buffer), ret);
1137   }
1138
1139   /* Update current running segment position */
1140   if (ret == GST_FLOW_OK && last_stop != GST_CLOCK_TIME_NONE)
1141     gst_segment_set_last_stop (&parse->segment, GST_FORMAT_TIME, last_stop);
1142
1143   return ret;
1144 }
1145
1146
1147 /**
1148  * gst_base_parse_drain:
1149  * @parse: #GstBaseParse.
1150  *
1151  * Drains the adapter until it is empty. It decreases the min_frame_size to
1152  * match the current adapter size and calls chain method until the adapter
1153  * is emptied or chain returns with error.
1154  */
1155 static void
1156 gst_base_parse_drain (GstBaseParse * parse)
1157 {
1158   guint avail;
1159
1160   GST_DEBUG_OBJECT (parse, "draining");
1161   parse->priv->drain = TRUE;
1162
1163   for (;;) {
1164     avail = gst_adapter_available (parse->adapter);
1165     if (!avail)
1166       break;
1167
1168     if (gst_base_parse_chain (parse->sinkpad, NULL) != GST_FLOW_OK) {
1169       break;
1170     }
1171
1172     /* nothing changed, maybe due to truncated frame; break infinite loop */
1173     if (avail == gst_adapter_available (parse->adapter)) {
1174       GST_DEBUG_OBJECT (parse, "no change during draining; flushing");
1175       gst_adapter_clear (parse->adapter);
1176     }
1177   }
1178
1179   parse->priv->drain = FALSE;
1180 }
1181
1182
1183 /**
1184  * gst_base_parse_chain:
1185  * @pad: #GstPad.
1186  * @buffer: #GstBuffer.
1187  *
1188  * Returns: #GstFlowReturn.
1189  */
1190 static GstFlowReturn
1191 gst_base_parse_chain (GstPad * pad, GstBuffer * buffer)
1192 {
1193   GstBaseParseClass *bclass;
1194   GstBaseParse *parse;
1195   GstFlowReturn ret = GST_FLOW_OK;
1196   GstBuffer *outbuf = NULL;
1197   GstBuffer *tmpbuf = NULL;
1198   guint fsize = 0;
1199   gint skip = -1;
1200   const guint8 *data;
1201   guint min_size;
1202   GstClockTime timestamp;
1203
1204   parse = GST_BASE_PARSE (GST_OBJECT_PARENT (pad));
1205   bclass = GST_BASE_PARSE_GET_CLASS (parse);
1206
1207   if (G_LIKELY (buffer)) {
1208     GST_LOG_OBJECT (parse, "buffer size: %d, offset = %" G_GINT64_FORMAT,
1209         GST_BUFFER_SIZE (buffer), GST_BUFFER_OFFSET (buffer));
1210     if (G_UNLIKELY (parse->priv->passthrough)) {
1211       buffer = gst_buffer_make_metadata_writable (buffer);
1212       return gst_base_parse_push_buffer (parse, buffer);
1213     } else
1214       gst_adapter_push (parse->adapter, buffer);
1215   }
1216
1217   /* Parse and push as many frames as possible */
1218   /* Stop either when adapter is empty or we are flushing */
1219   while (!parse->priv->flushing) {
1220     tmpbuf = gst_buffer_new ();
1221
1222     /* Synchronization loop */
1223     for (;;) {
1224       GST_BASE_PARSE_LOCK (parse);
1225       min_size = parse->priv->min_frame_size;
1226       GST_BASE_PARSE_UNLOCK (parse);
1227
1228       if (G_UNLIKELY (parse->priv->drain)) {
1229         min_size = gst_adapter_available (parse->adapter);
1230         GST_DEBUG_OBJECT (parse, "draining, data left: %d", min_size);
1231         if (G_UNLIKELY (!min_size)) {
1232           gst_buffer_unref (tmpbuf);
1233           goto done;
1234         }
1235       }
1236
1237       /* Collect at least min_frame_size bytes */
1238       if (gst_adapter_available (parse->adapter) < min_size) {
1239         GST_DEBUG_OBJECT (parse, "not enough data available (only %d bytes)",
1240             gst_adapter_available (parse->adapter));
1241         gst_buffer_unref (tmpbuf);
1242         goto done;
1243       }
1244
1245       data = gst_adapter_peek (parse->adapter, min_size);
1246       GST_BUFFER_DATA (tmpbuf) = (guint8 *) data;
1247       GST_BUFFER_SIZE (tmpbuf) = min_size;
1248       GST_BUFFER_OFFSET (tmpbuf) = parse->priv->offset;
1249       GST_BUFFER_FLAG_SET (tmpbuf, GST_MINI_OBJECT_FLAG_READONLY);
1250
1251       if (parse->priv->discont) {
1252         GST_DEBUG_OBJECT (parse, "marking DISCONT");
1253         GST_BUFFER_FLAG_SET (tmpbuf, GST_BUFFER_FLAG_DISCONT);
1254       }
1255
1256       skip = -1;
1257       if (bclass->check_valid_frame (parse, tmpbuf, &fsize, &skip)) {
1258         if (gst_adapter_available (parse->adapter) < fsize) {
1259           GST_DEBUG_OBJECT (parse,
1260               "found valid frame but not enough data available (only %d bytes)",
1261               gst_adapter_available (parse->adapter));
1262           gst_buffer_unref (tmpbuf);
1263           goto done;
1264         }
1265         break;
1266       }
1267       if (skip > 0) {
1268         GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
1269         gst_adapter_flush (parse->adapter, skip);
1270         parse->priv->offset += skip;
1271         parse->priv->discont = TRUE;
1272       } else if (skip == -1) {
1273         /* subclass didn't touch this value. By default we skip 1 byte */
1274         GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
1275         gst_adapter_flush (parse->adapter, 1);
1276         parse->priv->offset++;
1277         parse->priv->discont = TRUE;
1278       }
1279       /* There is a possibility that subclass set the skip value to zero.
1280          This means that it has probably found a frame but wants to ask
1281          more data (by increasing the min_size) to be sure of this. */
1282     }
1283     gst_buffer_unref (tmpbuf);
1284     tmpbuf = NULL;
1285
1286     if (skip > 0) {
1287       /* Subclass found the sync, but still wants to skip some data */
1288       GST_LOG_OBJECT (parse, "skipping %d bytes", skip);
1289       gst_adapter_flush (parse->adapter, skip);
1290       parse->priv->offset += skip;
1291     }
1292
1293     /* Grab lock to prevent a race with FLUSH_START handler */
1294     GST_PAD_STREAM_LOCK (parse->srcpad);
1295
1296     /* FLUSH_START event causes the "flushing" flag to be set. In this
1297      * case we can leave the frame pushing loop */
1298     if (parse->priv->flushing) {
1299       GST_PAD_STREAM_UNLOCK (parse->srcpad);
1300       break;
1301     }
1302
1303     /* FIXME: Would it be more efficient to make a subbuffer instead? */
1304     outbuf = gst_adapter_take_buffer (parse->adapter, fsize);
1305     outbuf = gst_buffer_make_metadata_writable (outbuf);
1306
1307     /* Subclass may want to know the data offset */
1308     GST_BUFFER_OFFSET (outbuf) = parse->priv->offset;
1309     parse->priv->offset += fsize;
1310
1311     /* move along with upstream timestamp (if any),
1312      * but interpolate in between */
1313     timestamp = gst_adapter_prev_timestamp (parse->adapter, NULL);
1314     if (GST_CLOCK_TIME_IS_VALID (timestamp) &&
1315         (parse->priv->prev_ts != timestamp)) {
1316       parse->priv->prev_ts = parse->priv->next_ts = timestamp;
1317     }
1318
1319     ret = gst_base_parse_handle_and_push_buffer (parse, bclass, outbuf);
1320     GST_PAD_STREAM_UNLOCK (parse->srcpad);
1321
1322     if (ret != GST_FLOW_OK) {
1323       GST_LOG_OBJECT (parse, "push returned %d", ret);
1324       break;
1325     }
1326   }
1327
1328 done:
1329   GST_LOG_OBJECT (parse, "chain leaving");
1330   return ret;
1331 }
1332
1333 /* pull @size bytes at current offset,
1334  * i.e. at least try to and possibly return a shorter buffer if near the end */
1335 static GstFlowReturn
1336 gst_base_parse_pull_range (GstBaseParse * parse, guint size,
1337     GstBuffer ** buffer)
1338 {
1339   GstFlowReturn ret = GST_FLOW_OK;
1340
1341   g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
1342
1343   /* Caching here actually makes much less difference than one would expect.
1344    * We do it mainly to avoid pulling buffers of 1 byte all the time */
1345   if (parse->priv->cache) {
1346     gint64 cache_offset = GST_BUFFER_OFFSET (parse->priv->cache);
1347     gint cache_size = GST_BUFFER_SIZE (parse->priv->cache);
1348
1349     if (cache_offset <= parse->priv->offset &&
1350         (parse->priv->offset + size) <= (cache_offset + cache_size)) {
1351       *buffer = gst_buffer_create_sub (parse->priv->cache,
1352           parse->priv->offset - cache_offset, size);
1353       GST_BUFFER_OFFSET (*buffer) = parse->priv->offset;
1354       return GST_FLOW_OK;
1355     }
1356     /* not enough data in the cache, free cache and get a new one */
1357     gst_buffer_unref (parse->priv->cache);
1358     parse->priv->cache = NULL;
1359   }
1360
1361   /* refill the cache */
1362   ret =
1363       gst_pad_pull_range (parse->sinkpad, parse->priv->offset, MAX (size,
1364           64 * 1024), &parse->priv->cache);
1365   if (ret != GST_FLOW_OK) {
1366     parse->priv->cache = NULL;
1367     return ret;
1368   }
1369
1370   if (GST_BUFFER_SIZE (parse->priv->cache) >= size) {
1371     *buffer = gst_buffer_create_sub (parse->priv->cache, 0, size);
1372     GST_BUFFER_OFFSET (*buffer) = parse->priv->offset;
1373     return GST_FLOW_OK;
1374   }
1375
1376   /* Not possible to get enough data, try a last time with
1377    * requesting exactly the size we need */
1378   gst_buffer_unref (parse->priv->cache);
1379   parse->priv->cache = NULL;
1380
1381   ret = gst_pad_pull_range (parse->sinkpad, parse->priv->offset, size,
1382       &parse->priv->cache);
1383
1384   if (ret != GST_FLOW_OK) {
1385     GST_DEBUG_OBJECT (parse, "pull_range returned %d", ret);
1386     *buffer = NULL;
1387     return ret;
1388   }
1389
1390   if (GST_BUFFER_SIZE (parse->priv->cache) < size) {
1391     GST_DEBUG_OBJECT (parse, "Returning short buffer at offset %"
1392         G_GUINT64_FORMAT ": wanted %u bytes, got %u bytes", parse->priv->offset,
1393         size, GST_BUFFER_SIZE (parse->priv->cache));
1394
1395     *buffer = parse->priv->cache;
1396     parse->priv->cache = NULL;
1397
1398     return GST_FLOW_OK;
1399   }
1400
1401   *buffer = gst_buffer_create_sub (parse->priv->cache, 0, size);
1402   GST_BUFFER_OFFSET (*buffer) = parse->priv->offset;
1403
1404   return GST_FLOW_OK;
1405 }
1406
1407 /**
1408  * gst_base_parse_loop:
1409  * @pad: GstPad
1410  *
1411  * Loop that is used in pull mode to retrieve data from upstream.
1412  */
1413 static void
1414 gst_base_parse_loop (GstPad * pad)
1415 {
1416   GstBaseParse *parse;
1417   GstBaseParseClass *klass;
1418   GstBuffer *buffer, *outbuf;
1419   gboolean ret = FALSE;
1420   guint fsize = 0, min_size;
1421   gint skip = 0;
1422
1423   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
1424   klass = GST_BASE_PARSE_GET_CLASS (parse);
1425
1426   /* TODO: Check if we reach segment stop limit */
1427
1428   while (TRUE) {
1429
1430     GST_BASE_PARSE_LOCK (parse);
1431     min_size = parse->priv->min_frame_size;
1432     GST_BASE_PARSE_UNLOCK (parse);
1433
1434     ret = gst_base_parse_pull_range (parse, min_size, &buffer);
1435
1436     if (ret == GST_FLOW_UNEXPECTED)
1437       goto eos;
1438     else if (ret != GST_FLOW_OK)
1439       goto need_pause;
1440
1441     if (parse->priv->discont) {
1442       GST_DEBUG_OBJECT (parse, "marking DISCONT");
1443       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1444     }
1445
1446     /* if we got a short read, inform subclass we are draining leftover
1447      * and no more is to be expected */
1448     if (GST_BUFFER_SIZE (buffer) < min_size)
1449       parse->priv->drain = TRUE;
1450
1451     skip = -1;
1452     if (klass->check_valid_frame (parse, buffer, &fsize, &skip)) {
1453       parse->priv->drain = FALSE;
1454       break;
1455     }
1456     parse->priv->drain = FALSE;
1457     if (skip > 0) {
1458       GST_LOG_OBJECT (parse, "finding sync, skipping %d bytes", skip);
1459       parse->priv->offset += skip;
1460       parse->priv->discont = TRUE;
1461     } else if (skip == -1) {
1462       GST_LOG_OBJECT (parse, "finding sync, skipping 1 byte");
1463       parse->priv->offset++;
1464       parse->priv->discont = TRUE;
1465     }
1466     GST_DEBUG_OBJECT (parse, "finding sync...");
1467     gst_buffer_unref (buffer);
1468   }
1469
1470   if (fsize <= GST_BUFFER_SIZE (buffer)) {
1471     outbuf = gst_buffer_create_sub (buffer, 0, fsize);
1472     GST_BUFFER_OFFSET (outbuf) = GST_BUFFER_OFFSET (buffer);
1473     gst_buffer_unref (buffer);
1474   } else {
1475     gst_buffer_unref (buffer);
1476     ret = gst_base_parse_pull_range (parse, fsize, &outbuf);
1477
1478     if (ret == GST_FLOW_UNEXPECTED)
1479       goto eos;
1480     else if (ret != GST_FLOW_OK)
1481       goto need_pause;
1482     if (GST_BUFFER_SIZE (outbuf) < fsize)
1483       goto eos;
1484   }
1485
1486   parse->priv->offset += fsize;
1487
1488   /* Does the subclass want to skip too? */
1489   if (skip > 0)
1490     parse->priv->offset += skip;
1491
1492   /* This always unrefs the outbuf, even if error occurs */
1493   ret = gst_base_parse_handle_and_push_buffer (parse, klass, outbuf);
1494
1495   if (ret != GST_FLOW_OK) {
1496     GST_DEBUG_OBJECT (parse, "flow: %s", gst_flow_get_name (ret));
1497     if (ret == GST_FLOW_UNEXPECTED) {
1498       gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
1499     } else if (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED) {
1500       GST_ELEMENT_ERROR (parse, STREAM, FAILED, (NULL),
1501           ("streaming task paused, reason: %s", gst_flow_get_name (ret)));
1502       gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
1503     }
1504     goto need_pause;
1505   }
1506
1507   gst_object_unref (parse);
1508   return;
1509
1510 need_pause:
1511   {
1512     GST_LOG_OBJECT (parse, "pausing task %d", ret);
1513     gst_pad_pause_task (pad);
1514     gst_object_unref (parse);
1515     return;
1516   }
1517 eos:
1518   {
1519     GST_LOG_OBJECT (parse, "sending eos");
1520     gst_pad_push_event (parse->srcpad, gst_event_new_eos ());
1521     goto need_pause;
1522   }
1523 }
1524
1525
1526 /**
1527  * gst_base_parse_sink_activate:
1528  * @sinkpad: #GstPad to be activated.
1529  *
1530  * Returns: TRUE if activation succeeded.
1531  */
1532 static gboolean
1533 gst_base_parse_sink_activate (GstPad * sinkpad)
1534 {
1535   GstBaseParse *parse;
1536   gboolean result = TRUE;
1537
1538   parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad));
1539
1540   GST_DEBUG_OBJECT (parse, "sink activate");
1541
1542   if (gst_pad_check_pull_range (sinkpad)) {
1543     GST_DEBUG_OBJECT (parse, "trying to activate in pull mode");
1544     result = gst_pad_activate_pull (sinkpad, TRUE);
1545   } else {
1546     GST_DEBUG_OBJECT (parse, "trying to activate in push mode");
1547     result = gst_pad_activate_push (sinkpad, TRUE);
1548   }
1549
1550   GST_DEBUG_OBJECT (parse, "sink activate return %d", result);
1551   gst_object_unref (parse);
1552   return result;
1553 }
1554
1555
1556 /**
1557  * gst_base_parse_activate:
1558  * @parse: #GstBaseParse.
1559  * @active: TRUE if element will be activated, FALSE if disactivated.
1560  *
1561  * Returns: TRUE if the operation succeeded.
1562  */
1563 static gboolean
1564 gst_base_parse_activate (GstBaseParse * parse, gboolean active)
1565 {
1566   GstBaseParseClass *klass;
1567   gboolean result = FALSE;
1568
1569   GST_DEBUG_OBJECT (parse, "activate");
1570
1571   klass = GST_BASE_PARSE_GET_CLASS (parse);
1572
1573   if (active) {
1574     if (parse->priv->pad_mode == GST_ACTIVATE_NONE && klass->start)
1575       result = klass->start (parse);
1576
1577     GST_OBJECT_LOCK (parse);
1578     gst_segment_init (&parse->segment, GST_FORMAT_TIME);
1579     parse->priv->duration = -1;
1580     parse->priv->discont = TRUE;
1581     parse->priv->flushing = FALSE;
1582     parse->priv->offset = 0;
1583     parse->priv->update_interval = 0;
1584     parse->priv->fps_num = parse->priv->fps_den = 0;
1585     parse->priv->frame_duration = GST_CLOCK_TIME_NONE;
1586     parse->priv->framecount = 0;
1587     parse->priv->bytecount = 0;
1588     parse->priv->acc_duration = 0;
1589     parse->priv->estimated_duration = -1;
1590     parse->priv->next_ts = 0;
1591     parse->priv->passthrough = FALSE;
1592     parse->priv->post_min_bitrate = TRUE;
1593     parse->priv->post_avg_bitrate = TRUE;
1594     parse->priv->post_max_bitrate = TRUE;
1595     parse->priv->min_bitrate = G_MAXUINT;
1596     parse->priv->max_bitrate = 0;
1597     parse->priv->max_bitrate = 0;
1598
1599     if (parse->pending_segment)
1600       gst_event_unref (parse->pending_segment);
1601
1602     parse->pending_segment =
1603         gst_event_new_new_segment (FALSE, parse->segment.rate,
1604         parse->segment.format,
1605         parse->segment.start, parse->segment.stop, parse->segment.last_stop);
1606
1607     GST_OBJECT_UNLOCK (parse);
1608   } else {
1609     /* We must make sure streaming has finished before resetting things
1610      * and calling the ::stop vfunc */
1611     GST_PAD_STREAM_LOCK (parse->sinkpad);
1612     GST_PAD_STREAM_UNLOCK (parse->sinkpad);
1613
1614     if (parse->priv->pad_mode != GST_ACTIVATE_NONE && klass->stop)
1615       result = klass->stop (parse);
1616
1617     g_list_foreach (parse->priv->pending_events, (GFunc) gst_mini_object_unref,
1618         NULL);
1619     g_list_free (parse->priv->pending_events);
1620     parse->priv->pending_events = NULL;
1621
1622     if (parse->priv->cache) {
1623       gst_buffer_unref (parse->priv->cache);
1624       parse->priv->cache = NULL;
1625     }
1626
1627     parse->priv->pad_mode = GST_ACTIVATE_NONE;
1628   }
1629   GST_DEBUG_OBJECT (parse, "activate: %d", result);
1630   return result;
1631 }
1632
1633
1634 /**
1635  * gst_base_parse_sink_activate_push:
1636  * @pad: #GstPad to be (de)activated.
1637  * @active: TRUE when activating, FALSE when deactivating.
1638  *
1639  * Returns: TRUE if (de)activation succeeded.
1640  */
1641 static gboolean
1642 gst_base_parse_sink_activate_push (GstPad * pad, gboolean active)
1643 {
1644   gboolean result = TRUE;
1645   GstBaseParse *parse;
1646
1647   parse = GST_BASE_PARSE (gst_pad_get_parent (pad));
1648
1649   GST_DEBUG_OBJECT (parse, "sink activate push");
1650
1651   result = gst_base_parse_activate (parse, active);
1652
1653   if (result)
1654     parse->priv->pad_mode = active ? GST_ACTIVATE_PUSH : GST_ACTIVATE_NONE;
1655
1656   GST_DEBUG_OBJECT (parse, "sink activate push: %d", result);
1657
1658   gst_object_unref (parse);
1659   return result;
1660 }
1661
1662
1663 /**
1664  * gst_base_parse_sink_activate_pull:
1665  * @sinkpad: #GstPad to be (de)activated.
1666  * @active: TRUE when activating, FALSE when deactivating.
1667  *
1668  * Returns: TRUE if (de)activation succeeded.
1669  */
1670 static gboolean
1671 gst_base_parse_sink_activate_pull (GstPad * sinkpad, gboolean active)
1672 {
1673   gboolean result = FALSE;
1674   GstBaseParse *parse;
1675
1676   parse = GST_BASE_PARSE (gst_pad_get_parent (sinkpad));
1677
1678   GST_DEBUG_OBJECT (parse, "activate pull");
1679
1680   result = gst_base_parse_activate (parse, active);
1681
1682   if (result) {
1683     if (active) {
1684       result &= gst_pad_start_task (sinkpad,
1685           (GstTaskFunction) gst_base_parse_loop, sinkpad);
1686     } else {
1687       result &= gst_pad_stop_task (sinkpad);
1688     }
1689   }
1690
1691   if (result)
1692     parse->priv->pad_mode = active ? GST_ACTIVATE_PULL : GST_ACTIVATE_NONE;
1693
1694   GST_DEBUG_OBJECT (parse, "sink activate pull: %d", result);
1695
1696   gst_object_unref (parse);
1697   return result;
1698 }
1699
1700
1701 /**
1702  * gst_base_parse_set_duration:
1703  * @parse: #GstBaseParse.
1704  * @fmt: #GstFormat.
1705  * @duration: duration value.
1706  *
1707  * Sets the duration of the currently playing media. Subclass can use this
1708  * when it notices a change in the media duration.
1709  */
1710 void
1711 gst_base_parse_set_duration (GstBaseParse * parse,
1712     GstFormat fmt, gint64 duration)
1713 {
1714   g_return_if_fail (parse != NULL);
1715
1716   GST_BASE_PARSE_LOCK (parse);
1717   if (duration != parse->priv->duration) {
1718     GstMessage *m;
1719
1720     m = gst_message_new_duration (GST_OBJECT (parse), fmt, duration);
1721     gst_element_post_message (GST_ELEMENT (parse), m);
1722
1723     /* TODO: what about duration tag? */
1724   }
1725   parse->priv->duration = duration;
1726   parse->priv->duration_fmt = fmt;
1727   GST_DEBUG_OBJECT (parse, "set duration: %" G_GINT64_FORMAT, duration);
1728   GST_BASE_PARSE_UNLOCK (parse);
1729 }
1730
1731
1732 /**
1733  * gst_base_parse_set_min_frame_size:
1734  * @parse: #GstBaseParse.
1735  * @min_size: Minimum size of the data that this base class should give to
1736  *            subclass.
1737  *
1738  * Subclass can use this function to tell the base class that it needs to
1739  * give at least #min_size buffers.
1740  */
1741 void
1742 gst_base_parse_set_min_frame_size (GstBaseParse * parse, guint min_size)
1743 {
1744   g_return_if_fail (parse != NULL);
1745
1746   GST_BASE_PARSE_LOCK (parse);
1747   parse->priv->min_frame_size = min_size;
1748   GST_LOG_OBJECT (parse, "set frame_min_size: %d", min_size);
1749   GST_BASE_PARSE_UNLOCK (parse);
1750 }
1751
1752 /**
1753  * gst_base_transform_set_passthrough:
1754  * @trans: the #GstBaseParse to set
1755  * @passthrough: boolean indicating passthrough mode.
1756  *
1757  * Set passthrough mode for this parser.  If operating in passthrough,
1758  * incoming buffers are pushed through unmodified.
1759  */
1760 void
1761 gst_base_parse_set_passthrough (GstBaseParse * parse, gboolean passthrough)
1762 {
1763   g_return_if_fail (parse != NULL);
1764
1765   GST_BASE_PARSE_LOCK (parse);
1766   parse->priv->passthrough = passthrough;
1767   GST_LOG_OBJECT (parse, "set passthrough: %d", passthrough);
1768   GST_BASE_PARSE_UNLOCK (parse);
1769 }
1770
1771 /**
1772  * gst_base_transform_set_frame_props:
1773  * @parse: the #GstBaseParse to set
1774  * @fps_num: frames per second (numerator).
1775  * @fps_den: frames per second (denominator).
1776  * @interval: duration update interval in frames.
1777  *
1778  * If frames per second is configured, parser can provide for default @convert
1779  * between GST_FORMAT_TIME and GST_FORMAT_BYTES, as well as buffer duration
1780  * and timestamping.  However, even if this frame information is provided,
1781  * subclass can still choose to provide for a @convert and set buffer metadata.
1782  * If #interval is non-zero (default), then stream duration is determined
1783  * based on frame and byte counts, and updated every #interval frames.
1784  */
1785 void
1786 gst_base_parse_set_frame_props (GstBaseParse * parse, guint fps_num,
1787     guint fps_den, gint interval)
1788 {
1789   g_return_if_fail (parse != NULL);
1790
1791   GST_BASE_PARSE_LOCK (parse);
1792   parse->priv->fps_num = fps_num;
1793   parse->priv->fps_den = fps_den;
1794   parse->priv->update_interval = interval;
1795   if (!fps_num || !fps_den) {
1796     GST_DEBUG_OBJECT (parse, "invalid fps (%d/%d), ignoring parameters",
1797         fps_num, fps_den);
1798     fps_num = fps_den = 0;
1799     interval = 0;
1800     parse->priv->frame_duration = GST_CLOCK_TIME_NONE;
1801   } else {
1802     parse->priv->frame_duration =
1803         gst_util_uint64_scale (GST_SECOND, parse->priv->fps_den,
1804         parse->priv->fps_num);
1805   }
1806   GST_LOG_OBJECT (parse, "set fps: %d/%d => duration: %" G_GINT64_FORMAT " ms",
1807       fps_num, fps_den, parse->priv->frame_duration / GST_MSECOND);
1808   GST_LOG_OBJECT (parse, "set update interval: %d", interval);
1809   GST_BASE_PARSE_UNLOCK (parse);
1810 }
1811
1812 /**
1813  * gst_base_transform_get_sync:
1814  * @parse: the #GstBaseParse to query
1815  *
1816  * Returns: TRUE if parser is considered 'in sync'.  That is, frames have been
1817  * continuously successfully parsed and pushed.
1818  */
1819 gboolean
1820 gst_base_parse_get_sync (GstBaseParse * parse)
1821 {
1822   gboolean ret;
1823
1824   g_return_val_if_fail (parse != NULL, FALSE);
1825
1826   GST_BASE_PARSE_LOCK (parse);
1827   /* losing sync is pretty much a discont (and vice versa), no ? */
1828   ret = !parse->priv->discont;
1829   GST_BASE_PARSE_UNLOCK (parse);
1830
1831   GST_DEBUG_OBJECT (parse, "sync: %d", ret);
1832   return ret;
1833 }
1834
1835 /**
1836  * gst_base_transform_get_drain:
1837  * @parse: the #GstBaseParse to query
1838  *
1839  * Returns: TRUE if parser is currently 'draining'.  That is, leftover data
1840  * (e.g. in FLUSH or EOS situation) is being parsed.
1841  */
1842 gboolean
1843 gst_base_parse_get_drain (GstBaseParse * parse)
1844 {
1845   gboolean ret;
1846
1847   g_return_val_if_fail (parse != NULL, FALSE);
1848
1849   GST_BASE_PARSE_LOCK (parse);
1850   /* losing sync is pretty much a discont (and vice versa), no ? */
1851   ret = parse->priv->drain;
1852   GST_BASE_PARSE_UNLOCK (parse);
1853
1854   GST_DEBUG_OBJECT (parse, "drain: %d", ret);
1855   return ret;
1856 }
1857
1858 static gboolean
1859 gst_base_parse_get_duration (GstBaseParse * parse, GstFormat format,
1860     GstClockTime * duration)
1861 {
1862   GstBaseParseClass *klass = GST_BASE_PARSE_GET_CLASS (parse);
1863   gboolean res = FALSE;
1864
1865   g_return_val_if_fail (duration != NULL, FALSE);
1866
1867   *duration = GST_CLOCK_TIME_NONE;
1868   if (parse->priv->duration != -1 && format == parse->priv->duration_fmt) {
1869     GST_LOG_OBJECT (parse, "using provided duration");
1870     *duration = parse->priv->duration;
1871     res = TRUE;
1872   } else if (parse->priv->duration != -1) {
1873     GST_LOG_OBJECT (parse, "converting provided duration");
1874     res = klass->convert (parse, parse->priv->duration_fmt,
1875         parse->priv->duration, format, (gint64 *) duration);
1876   } else if (format == GST_FORMAT_TIME && parse->priv->estimated_duration != -1) {
1877     GST_LOG_OBJECT (parse, "using estimated duration");
1878     *duration = parse->priv->estimated_duration;
1879     res = TRUE;
1880   }
1881
1882   GST_LOG_OBJECT (parse, "res: %d, duration %" GST_TIME_FORMAT, res,
1883       GST_TIME_ARGS (*duration));
1884   return res;
1885 }
1886
1887 /**
1888  * gst_base_parse_get_querytypes:
1889  * @pad: GstPad
1890  *
1891  * Returns: A table of #GstQueryType items describing supported query types.
1892  */
1893 static const GstQueryType *
1894 gst_base_parse_get_querytypes (GstPad * pad)
1895 {
1896   static const GstQueryType list[] = {
1897     GST_QUERY_POSITION,
1898     GST_QUERY_DURATION,
1899     GST_QUERY_FORMATS,
1900     GST_QUERY_SEEKING,
1901     GST_QUERY_CONVERT,
1902     0
1903   };
1904
1905   return list;
1906 }
1907
1908
1909 /**
1910  * gst_base_parse_query:
1911  * @pad: #GstPad.
1912  * @query: #GstQuery.
1913  *
1914  * Returns: TRUE on success.
1915  */
1916 static gboolean
1917 gst_base_parse_query (GstPad * pad, GstQuery * query)
1918 {
1919   GstBaseParse *parse;
1920   GstBaseParseClass *klass;
1921   gboolean res = FALSE;
1922
1923   parse = GST_BASE_PARSE (GST_PAD_PARENT (pad));
1924   klass = GST_BASE_PARSE_GET_CLASS (parse);
1925
1926   /* If subclass doesn't provide conversion function we can't reply
1927      to the query either */
1928   if (!klass->convert) {
1929     return FALSE;
1930   }
1931
1932   GST_LOG_OBJECT (parse, "handling query: %" GST_PTR_FORMAT, query);
1933
1934   switch (GST_QUERY_TYPE (query)) {
1935     case GST_QUERY_POSITION:
1936     {
1937       gint64 dest_value;
1938       GstFormat format;
1939
1940       GST_DEBUG_OBJECT (parse, "position query");
1941       gst_query_parse_position (query, &format, NULL);
1942
1943       g_mutex_lock (parse->parse_lock);
1944       if (format == GST_FORMAT_BYTES) {
1945         dest_value = parse->priv->offset;
1946         res = TRUE;
1947       } else if (format == parse->segment.format &&
1948           GST_CLOCK_TIME_IS_VALID (parse->segment.last_stop)) {
1949         dest_value = parse->segment.last_stop;
1950         res = TRUE;
1951       }
1952       g_mutex_unlock (parse->parse_lock);
1953
1954       if (res)
1955         gst_query_set_position (query, format, dest_value);
1956       else {
1957         res = gst_pad_query_default (pad, query);
1958         if (!res) {
1959           /* no precise result, upstream no idea either, then best estimate */
1960           /* priv->offset is updated in both PUSH/PULL modes */
1961           g_mutex_lock (parse->parse_lock);
1962           res = klass->convert (parse, GST_FORMAT_BYTES, parse->priv->offset,
1963               format, &dest_value);
1964           g_mutex_unlock (parse->parse_lock);
1965         }
1966       }
1967       break;
1968     }
1969     case GST_QUERY_DURATION:
1970     {
1971       GstFormat format;
1972       GstClockTime duration;
1973
1974       GST_DEBUG_OBJECT (parse, "duration query");
1975       gst_query_parse_duration (query, &format, NULL);
1976
1977       /* consult upstream */
1978       res = gst_pad_query_default (pad, query);
1979
1980       /* otherwise best estimate from us */
1981       if (!res) {
1982         g_mutex_lock (parse->parse_lock);
1983         res = gst_base_parse_get_duration (parse, format, &duration);
1984         g_mutex_unlock (parse->parse_lock);
1985         if (res)
1986           gst_query_set_duration (query, format, duration);
1987       }
1988       break;
1989     }
1990     case GST_QUERY_SEEKING:
1991     {
1992       GstFormat fmt;
1993       GstClockTime duration = GST_CLOCK_TIME_NONE;
1994       gboolean seekable = FALSE;
1995
1996       GST_DEBUG_OBJECT (parse, "seeking query");
1997       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1998
1999       /* consult upstream */
2000       res = gst_pad_query_default (pad, query);
2001
2002       /* we may be able to help if in TIME */
2003       if (fmt == GST_FORMAT_TIME && klass->is_seekable (parse)) {
2004         gst_query_parse_seeking (query, &fmt, &seekable, NULL, NULL);
2005         /* already OK if upstream takes care */
2006         GST_LOG_OBJECT (parse, "upstream handled %d, seekable %d",
2007             res, seekable);
2008         if (!(res && seekable)) {
2009           /* TODO maybe also check upstream provides proper duration ? */
2010           seekable = TRUE;
2011           if (!gst_base_parse_get_duration (parse, GST_FORMAT_TIME, &duration)
2012               || duration == -1) {
2013             seekable = FALSE;
2014           } else {
2015             GstQuery *q;
2016
2017             q = gst_query_new_seeking (GST_FORMAT_BYTES);
2018             if (!gst_pad_peer_query (parse->sinkpad, q)) {
2019               seekable = FALSE;
2020             } else {
2021               gst_query_parse_seeking (q, &fmt, &seekable, NULL, NULL);
2022             }
2023             GST_LOG_OBJECT (parse, "upstream BYTE handled %d, seekable %d",
2024                 res, seekable);
2025             gst_query_unref (q);
2026           }
2027           gst_query_set_seeking (query, GST_FORMAT_TIME, seekable, 0, duration);
2028           res = TRUE;
2029         }
2030       }
2031       break;
2032     }
2033     case GST_QUERY_FORMATS:
2034       gst_query_set_formatsv (query, 3, fmtlist);
2035       res = TRUE;
2036       break;
2037     case GST_QUERY_CONVERT:
2038     {
2039       GstFormat src_format, dest_format;
2040       gint64 src_value, dest_value;
2041
2042       gst_query_parse_convert (query, &src_format, &src_value,
2043           &dest_format, &dest_value);
2044
2045       res = klass->convert (parse, src_format, src_value,
2046           dest_format, &dest_value);
2047       if (res) {
2048         gst_query_set_convert (query, src_format, src_value,
2049             dest_format, dest_value);
2050       }
2051       break;
2052     }
2053     default:
2054       res = gst_pad_query_default (pad, query);
2055       break;
2056   }
2057   return res;
2058 }
2059
2060
2061 /**
2062  * gst_base_parse_handle_seek:
2063  * @parse: #GstBaseParse.
2064  * @event: #GstEvent.
2065  *
2066  * Returns: TRUE if seek succeeded.
2067  */
2068 static gboolean
2069 gst_base_parse_handle_seek (GstBaseParse * parse, GstEvent * event)
2070 {
2071   GstBaseParseClass *klass;
2072   gdouble rate;
2073   GstFormat format;
2074   GstSeekFlags flags;
2075   GstSeekType cur_type = GST_SEEK_TYPE_NONE, stop_type;
2076   gboolean flush, update, res = TRUE;
2077   gint64 cur, stop, seekpos;
2078   GstSegment seeksegment = { 0, };
2079   GstFormat dstformat;
2080
2081   klass = GST_BASE_PARSE_GET_CLASS (parse);
2082
2083   gst_event_parse_seek (event, &rate, &format, &flags,
2084       &cur_type, &cur, &stop_type, &stop);
2085
2086   GST_DEBUG_OBJECT (parse, "seek to format %s, "
2087       "start type %d at %" GST_TIME_FORMAT ", end type %d at %"
2088       GST_TIME_FORMAT, gst_format_get_name (format),
2089       cur_type, GST_TIME_ARGS (cur), stop_type, GST_TIME_ARGS (stop));
2090
2091   /* no negative rates yet */
2092   if (rate < 0.0)
2093     goto negative_rate;
2094
2095   if (cur_type != GST_SEEK_TYPE_SET)
2096     goto wrong_type;
2097
2098   /* For any format other than TIME, see if upstream handles
2099    * it directly or fail. For TIME, try upstream, but do it ourselves if
2100    * it fails upstream */
2101   if (format != GST_FORMAT_TIME) {
2102     /* default action delegates to upstream */
2103     return FALSE;
2104   } else {
2105     gst_event_ref (event);
2106     if (gst_pad_push_event (parse->sinkpad, event)) {
2107       return TRUE;
2108     }
2109   }
2110
2111   /* too much estimating going on to support this sensibly,
2112    * and no eos/end-of-segment loop handling either ... */
2113   if ((stop_type == GST_SEEK_TYPE_SET && stop != GST_CLOCK_TIME_NONE) ||
2114       (stop_type != GST_SEEK_TYPE_NONE && stop_type != GST_SEEK_TYPE_SET) ||
2115       (flags & GST_SEEK_FLAG_SEGMENT))
2116     goto wrong_type;
2117   stop = -1;
2118
2119   /* get flush flag */
2120   flush = flags & GST_SEEK_FLAG_FLUSH;
2121
2122   /* copy segment, we need this because we still need the old
2123    * segment when we close the current segment. */
2124   memcpy (&seeksegment, &parse->segment, sizeof (GstSegment));
2125
2126   GST_DEBUG_OBJECT (parse, "configuring seek");
2127   gst_segment_set_seek (&seeksegment, rate, format, flags,
2128       cur_type, cur, stop_type, stop, &update);
2129
2130   /* figure out the last position we need to play. If it's configured (stop !=
2131    * -1), use that, else we play until the total duration of the file */
2132   if ((stop = seeksegment.stop) == -1)
2133     stop = seeksegment.duration;
2134
2135   dstformat = GST_FORMAT_BYTES;
2136   if (!gst_pad_query_convert (parse->srcpad, format, seeksegment.last_stop,
2137           &dstformat, &seekpos)) {
2138     GST_DEBUG_OBJECT (parse, "conversion failed");
2139     return FALSE;
2140   }
2141
2142   GST_DEBUG_OBJECT (parse,
2143       "seek position %" G_GINT64_FORMAT " in bytes: %" G_GINT64_FORMAT, cur,
2144       seekpos);
2145
2146   if (parse->priv->pad_mode == GST_ACTIVATE_PULL) {
2147     gint64 last_stop;
2148
2149     GST_DEBUG_OBJECT (parse, "seek in PULL mode");
2150
2151     if (flush) {
2152       if (parse->srcpad) {
2153         GST_DEBUG_OBJECT (parse, "sending flush start");
2154         gst_pad_push_event (parse->srcpad, gst_event_new_flush_start ());
2155       }
2156     } else {
2157       gst_pad_pause_task (parse->sinkpad);
2158     }
2159
2160     /* we should now be able to grab the streaming thread because we stopped it
2161      * with the above flush/pause code */
2162     GST_PAD_STREAM_LOCK (parse->sinkpad);
2163
2164     /* save current position */
2165     last_stop = parse->segment.last_stop;
2166     GST_DEBUG_OBJECT (parse, "stopped streaming at %" G_GINT64_FORMAT,
2167         last_stop);
2168
2169     /* now commit to new position */
2170     parse->priv->offset = seekpos;
2171
2172     /* prepare for streaming again */
2173     if (flush) {
2174       GST_DEBUG_OBJECT (parse, "sending flush stop");
2175       gst_pad_push_event (parse->srcpad, gst_event_new_flush_stop ());
2176     } else {
2177       if (parse->close_segment)
2178         gst_event_unref (parse->close_segment);
2179
2180       parse->close_segment = gst_event_new_new_segment (TRUE,
2181           parse->segment.rate, parse->segment.format,
2182           parse->segment.accum, parse->segment.last_stop, parse->segment.accum);
2183
2184       /* keep track of our last_stop */
2185       seeksegment.accum = parse->segment.last_stop;
2186
2187       GST_DEBUG_OBJECT (parse, "Created close seg format %d, "
2188           "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
2189           ", pos = %" GST_TIME_FORMAT, format,
2190           GST_TIME_ARGS (parse->segment.accum),
2191           GST_TIME_ARGS (parse->segment.last_stop),
2192           GST_TIME_ARGS (parse->segment.accum));
2193     }
2194
2195     memcpy (&parse->segment, &seeksegment, sizeof (GstSegment));
2196
2197     /* store the newsegment event so it can be sent from the streaming thread. */
2198     if (parse->pending_segment)
2199       gst_event_unref (parse->pending_segment);
2200
2201     /* This will be sent later in _loop() */
2202     parse->pending_segment =
2203         gst_event_new_new_segment (FALSE, parse->segment.rate,
2204         parse->segment.format,
2205         parse->segment.last_stop, stop, parse->segment.last_stop);
2206
2207     GST_DEBUG_OBJECT (parse, "Created newseg format %d, "
2208         "start = %" GST_TIME_FORMAT ", stop = %" GST_TIME_FORMAT
2209         ", pos = %" GST_TIME_FORMAT, format,
2210         GST_TIME_ARGS (parse->segment.last_stop),
2211         GST_TIME_ARGS (stop), GST_TIME_ARGS (parse->segment.last_stop));
2212
2213     /* mark discont if we are going to stream from another position. */
2214     if (last_stop != parse->segment.last_stop) {
2215       GST_DEBUG_OBJECT (parse,
2216           "mark DISCONT, we did a seek to another position");
2217       parse->priv->discont = TRUE;
2218       parse->priv->next_ts = parse->segment.last_stop;
2219     }
2220
2221     /* Start streaming thread if paused */
2222     gst_pad_start_task (parse->sinkpad,
2223         (GstTaskFunction) gst_base_parse_loop, parse->sinkpad);
2224
2225     GST_PAD_STREAM_UNLOCK (parse->sinkpad);
2226   } else {
2227     GstEvent *new_event;
2228     /* The only thing we need to do in PUSH-mode is to send the
2229        seek event (in bytes) to upstream. Segment / flush handling happens
2230        in corresponding src event handlers */
2231     GST_DEBUG_OBJECT (parse, "seek in PUSH mode");
2232     new_event = gst_event_new_seek (rate, GST_FORMAT_BYTES, flush,
2233         GST_SEEK_TYPE_SET, seekpos, stop_type, stop);
2234
2235     res = gst_pad_push_event (parse->sinkpad, new_event);
2236   }
2237
2238 done:
2239   return res;
2240
2241   /* ERRORS */
2242 negative_rate:
2243   {
2244     GST_DEBUG_OBJECT (parse, "negative playback rates are not supported yet.");
2245     res = FALSE;
2246     goto done;
2247   }
2248 wrong_type:
2249   {
2250     GST_DEBUG_OBJECT (parse, "unsupported seek type.");
2251     res = FALSE;
2252     goto done;
2253   }
2254 }
2255
2256 /**
2257  * gst_base_parse_handle_tag:
2258  * @parse: #GstBaseParse.
2259  * @event: #GstEvent.
2260  *
2261  * Checks if bitrates are available from upstream tags so that we don't
2262  * override them later
2263  */
2264 static void
2265 gst_base_parse_handle_tag (GstBaseParse * parse, GstEvent * event)
2266 {
2267   GstTagList *taglist = NULL;
2268   guint tmp;
2269
2270   gst_event_parse_tag (event, &taglist);
2271
2272   if (gst_tag_list_get_uint (taglist, GST_TAG_MINIMUM_BITRATE, &tmp))
2273     parse->priv->post_min_bitrate = FALSE;
2274   if (gst_tag_list_get_uint (taglist, GST_TAG_BITRATE, &tmp))
2275     parse->priv->post_avg_bitrate = FALSE;
2276   if (gst_tag_list_get_uint (taglist, GST_TAG_MAXIMUM_BITRATE, &tmp))
2277     parse->priv->post_max_bitrate = FALSE;
2278 }
2279
2280 /**
2281  * gst_base_parse_sink_setcaps:
2282  * @pad: #GstPad.
2283  * @caps: #GstCaps.
2284  *
2285  * Returns: TRUE if caps were accepted.
2286  */
2287 static gboolean
2288 gst_base_parse_sink_setcaps (GstPad * pad, GstCaps * caps)
2289 {
2290   GstBaseParse *parse;
2291   GstBaseParseClass *klass;
2292   gboolean res = TRUE;
2293
2294   parse = GST_BASE_PARSE (GST_PAD_PARENT (pad));
2295   klass = GST_BASE_PARSE_GET_CLASS (parse);
2296
2297   GST_DEBUG_OBJECT (parse, "caps: %" GST_PTR_FORMAT, caps);
2298
2299   if (klass->set_sink_caps)
2300     res = klass->set_sink_caps (parse, caps);
2301
2302   return res && gst_pad_set_caps (pad, caps);
2303 }