Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / ext / ogg / gstoggmux.c
1 /* OGG muxer plugin for GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-oggmux
23  * @see_also: <link linkend="gst-plugins-base-plugins-oggdemux">oggdemux</link>
24  *
25  * This element merges streams (audio and video) into ogg files.
26  *
27  * <refsect2>
28  * <title>Example pipelines</title>
29  * |[
30  * gst-launch v4l2src num-buffers=500 ! video/x-raw-yuv,width=320,height=240 ! ffmpegcolorspace ! theoraenc ! oggmux ! filesink location=video.ogg
31  * ]| Encodes a video stream captured from a v4l2-compatible camera to Ogg/Theora
32  * (the encoding will stop automatically after 500 frames)
33  * </refsect2>
34  *
35  * Last reviewed on 2008-02-06 (0.10.17)
36  */
37
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41
42 #include <gst/gst.h>
43 #include <gst/base/gstcollectpads.h>
44 #include <gst/tag/tag.h>
45
46 #include "gstoggmux.h"
47
48 /* memcpy - if someone knows a way to get rid of it, please speak up
49  * note: the ogg docs even say you need this... */
50 #include <string.h>
51 #include <time.h>
52 #include <stdlib.h>             /* rand, srand, atoi */
53
54 GST_DEBUG_CATEGORY_STATIC (gst_ogg_mux_debug);
55 #define GST_CAT_DEFAULT gst_ogg_mux_debug
56
57 /* This isn't generally what you'd want with an end-time macro, because
58    technically the end time of a buffer with invalid duration is invalid. But
59    for sorting ogg pages this is what we want. */
60 #define GST_BUFFER_END_TIME(buf) \
61     (GST_BUFFER_DURATION_IS_VALID (buf) \
62     ? GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf) \
63     : GST_BUFFER_TIMESTAMP (buf))
64
65 #define GST_BUFFER_RUNNING_TIME(buf, oggpad) \
66     (GST_BUFFER_DURATION_IS_VALID (buf) \
67     ? gst_segment_to_running_time (&(oggpad)->segment, GST_FORMAT_TIME, \
68     GST_BUFFER_TIMESTAMP (buf)) : 0)
69
70 #define GST_GP_FORMAT "[gp %8" G_GINT64_FORMAT "]"
71 #define GST_GP_CAST(_gp) ((gint64) _gp)
72
73 typedef enum
74 {
75   GST_OGG_FLAG_BOS = GST_ELEMENT_FLAG_LAST,
76   GST_OGG_FLAG_EOS
77 }
78 GstOggFlag;
79
80 /* OggMux signals and args */
81 enum
82 {
83   /* FILL ME */
84   LAST_SIGNAL
85 };
86
87 /* set to 0.5 seconds by default */
88 #define DEFAULT_MAX_DELAY       G_GINT64_CONSTANT(500000000)
89 #define DEFAULT_MAX_PAGE_DELAY  G_GINT64_CONSTANT(500000000)
90 enum
91 {
92   ARG_0,
93   ARG_MAX_DELAY,
94   ARG_MAX_PAGE_DELAY,
95 };
96
97 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
98     GST_PAD_SRC,
99     GST_PAD_ALWAYS,
100     GST_STATIC_CAPS ("application/ogg")
101     );
102
103 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%d",
104     GST_PAD_SINK,
105     GST_PAD_REQUEST,
106     GST_STATIC_CAPS ("video/x-theora; "
107         "audio/x-vorbis; audio/x-flac; audio/x-speex; audio/x-celt; "
108         "application/x-ogm-video; application/x-ogm-audio; video/x-dirac; "
109         "video/x-smoke; video/x-vp8; text/x-cmml, encoded = (boolean) TRUE; "
110         "subtitle/x-kate; application/x-kate")
111     );
112
113 static void gst_ogg_mux_base_init (gpointer g_class);
114 static void gst_ogg_mux_class_init (GstOggMuxClass * klass);
115 static void gst_ogg_mux_init (GstOggMux * ogg_mux);
116 static void gst_ogg_mux_finalize (GObject * object);
117
118 static GstFlowReturn
119 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux);
120 static gboolean gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event);
121 static GstPad *gst_ogg_mux_request_new_pad (GstElement * element,
122     GstPadTemplate * templ, const gchar * name);
123 static void gst_ogg_mux_release_pad (GstElement * element, GstPad * pad);
124
125 static void gst_ogg_mux_set_property (GObject * object,
126     guint prop_id, const GValue * value, GParamSpec * pspec);
127 static void gst_ogg_mux_get_property (GObject * object,
128     guint prop_id, GValue * value, GParamSpec * pspec);
129 static GstStateChangeReturn gst_ogg_mux_change_state (GstElement * element,
130     GstStateChange transition);
131
132 static GstElementClass *parent_class = NULL;
133
134 /*static guint gst_ogg_mux_signals[LAST_SIGNAL] = { 0 }; */
135
136 GType
137 gst_ogg_mux_get_type (void)
138 {
139   static GType ogg_mux_type = 0;
140
141   if (G_UNLIKELY (ogg_mux_type == 0)) {
142     static const GTypeInfo ogg_mux_info = {
143       sizeof (GstOggMuxClass),
144       gst_ogg_mux_base_init,
145       NULL,
146       (GClassInitFunc) gst_ogg_mux_class_init,
147       NULL,
148       NULL,
149       sizeof (GstOggMux),
150       0,
151       (GInstanceInitFunc) gst_ogg_mux_init,
152     };
153     static const GInterfaceInfo preset_info = {
154       NULL,
155       NULL,
156       NULL
157     };
158
159     ogg_mux_type =
160         g_type_register_static (GST_TYPE_ELEMENT, "GstOggMux", &ogg_mux_info,
161         0);
162
163     g_type_add_interface_static (ogg_mux_type, GST_TYPE_PRESET, &preset_info);
164   }
165   return ogg_mux_type;
166 }
167
168 static void
169 gst_ogg_mux_base_init (gpointer g_class)
170 {
171   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
172
173   gst_element_class_add_pad_template (element_class,
174       gst_static_pad_template_get (&src_factory));
175   gst_element_class_add_pad_template (element_class,
176       gst_static_pad_template_get (&sink_factory));
177
178   gst_element_class_set_details_simple (element_class,
179       "Ogg muxer", "Codec/Muxer",
180       "mux ogg streams (info about ogg: http://xiph.org)",
181       "Wim Taymans <wim@fluendo.com>");
182 }
183
184 static void
185 gst_ogg_mux_class_init (GstOggMuxClass * klass)
186 {
187   GObjectClass *gobject_class;
188   GstElementClass *gstelement_class;
189
190   gobject_class = (GObjectClass *) klass;
191   gstelement_class = (GstElementClass *) klass;
192
193   parent_class = g_type_class_peek_parent (klass);
194
195   gobject_class->finalize = gst_ogg_mux_finalize;
196   gobject_class->get_property = gst_ogg_mux_get_property;
197   gobject_class->set_property = gst_ogg_mux_set_property;
198
199   gstelement_class->request_new_pad = gst_ogg_mux_request_new_pad;
200   gstelement_class->release_pad = gst_ogg_mux_release_pad;
201
202   g_object_class_install_property (gobject_class, ARG_MAX_DELAY,
203       g_param_spec_uint64 ("max-delay", "Max delay",
204           "Maximum delay in multiplexing streams", 0, G_MAXUINT64,
205           DEFAULT_MAX_DELAY,
206           (GParamFlags) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
207   g_object_class_install_property (gobject_class, ARG_MAX_PAGE_DELAY,
208       g_param_spec_uint64 ("max-page-delay", "Max page delay",
209           "Maximum delay for sending out a page", 0, G_MAXUINT64,
210           DEFAULT_MAX_PAGE_DELAY,
211           (GParamFlags) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
212
213   gstelement_class->change_state = gst_ogg_mux_change_state;
214
215 }
216
217 #if 0
218 static const GstEventMask *
219 gst_ogg_mux_get_sink_event_masks (GstPad * pad)
220 {
221   static const GstEventMask gst_ogg_mux_sink_event_masks[] = {
222     {GST_EVENT_EOS, 0},
223     {GST_EVENT_DISCONTINUOUS, 0},
224     {0,}
225   };
226
227   return gst_ogg_mux_sink_event_masks;
228 }
229 #endif
230
231 static void
232 gst_ogg_mux_clear (GstOggMux * ogg_mux)
233 {
234   ogg_mux->pulling = NULL;
235   ogg_mux->need_headers = TRUE;
236   ogg_mux->delta_pad = NULL;
237   ogg_mux->offset = 0;
238   ogg_mux->next_ts = 0;
239   ogg_mux->last_ts = GST_CLOCK_TIME_NONE;
240 }
241
242 static void
243 gst_ogg_mux_init (GstOggMux * ogg_mux)
244 {
245   GstElementClass *klass = GST_ELEMENT_GET_CLASS (ogg_mux);
246
247   ogg_mux->srcpad =
248       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
249           "src"), "src");
250   gst_pad_set_event_function (ogg_mux->srcpad, gst_ogg_mux_handle_src_event);
251   gst_element_add_pad (GST_ELEMENT (ogg_mux), ogg_mux->srcpad);
252
253   GST_OBJECT_FLAG_SET (GST_ELEMENT (ogg_mux), GST_OGG_FLAG_BOS);
254
255   /* seed random number generator for creation of serial numbers */
256   srand (time (NULL));
257
258   ogg_mux->collect = gst_collect_pads_new ();
259   gst_collect_pads_set_function (ogg_mux->collect,
260       (GstCollectPadsFunction) GST_DEBUG_FUNCPTR (gst_ogg_mux_collected),
261       ogg_mux);
262
263   ogg_mux->max_delay = DEFAULT_MAX_DELAY;
264   ogg_mux->max_page_delay = DEFAULT_MAX_PAGE_DELAY;
265
266   gst_ogg_mux_clear (ogg_mux);
267 }
268
269 static void
270 gst_ogg_mux_finalize (GObject * object)
271 {
272   GstOggMux *ogg_mux;
273
274   ogg_mux = GST_OGG_MUX (object);
275
276   if (ogg_mux->collect) {
277     gst_object_unref (ogg_mux->collect);
278     ogg_mux->collect = NULL;
279   }
280
281   G_OBJECT_CLASS (parent_class)->finalize (object);
282 }
283
284 static void
285 gst_ogg_mux_ogg_pad_destroy_notify (GstCollectData * data)
286 {
287   GstOggPadData *oggpad = (GstOggPadData *) data;
288   GstBuffer *buf;
289
290   ogg_stream_clear (&oggpad->map.stream);
291   gst_caps_replace (&oggpad->map.caps, NULL);
292
293   if (oggpad->pagebuffers) {
294     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
295       gst_buffer_unref (buf);
296     }
297     g_queue_free (oggpad->pagebuffers);
298     oggpad->pagebuffers = NULL;
299   }
300 }
301
302 static GstPadLinkReturn
303 gst_ogg_mux_sinkconnect (GstPad * pad, GstPad * peer)
304 {
305   GstOggMux *ogg_mux;
306
307   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
308
309   GST_DEBUG_OBJECT (ogg_mux, "sinkconnect triggered on %s", GST_PAD_NAME (pad));
310
311   gst_object_unref (ogg_mux);
312
313   return GST_PAD_LINK_OK;
314 }
315
316 static gboolean
317 gst_ogg_mux_sink_event (GstPad * pad, GstEvent * event)
318 {
319   GstOggMux *ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
320   GstOggPadData *ogg_pad = (GstOggPadData *) gst_pad_get_element_private (pad);
321   gboolean ret = FALSE;
322
323   GST_DEBUG_OBJECT (pad, "Got %s event", GST_EVENT_TYPE_NAME (event));
324
325   switch (GST_EVENT_TYPE (event)) {
326     case GST_EVENT_NEWSEGMENT:{
327       gboolean update;
328       gdouble rate;
329       gdouble applied_rate;
330       GstFormat format;
331       gint64 start, stop, position;
332
333       gst_event_parse_new_segment_full (event, &update, &rate,
334           &applied_rate, &format, &start, &stop, &position);
335
336       /* We don't support non time NEWSEGMENT events */
337       if (format != GST_FORMAT_TIME) {
338         gst_event_unref (event);
339         event = NULL;
340         break;
341       }
342
343       gst_segment_set_newsegment_full (&ogg_pad->segment, update, rate,
344           applied_rate, format, start, stop, position);
345
346       break;
347     }
348     case GST_EVENT_FLUSH_STOP:{
349       gst_segment_init (&ogg_pad->segment, GST_FORMAT_TIME);
350       break;
351     }
352     default:
353       break;
354   }
355
356   /* now GstCollectPads can take care of the rest, e.g. EOS */
357   if (event != NULL)
358     ret = ogg_pad->collect_event (pad, event);
359
360   gst_object_unref (ogg_mux);
361   return ret;
362 }
363
364 static gboolean
365 gst_ogg_mux_is_serialno_present (GstOggMux * ogg_mux, guint32 serialno)
366 {
367   GSList *walk;
368
369   walk = ogg_mux->collect->data;
370   while (walk) {
371     GstOggPadData *pad = (GstOggPadData *) walk->data;
372     if (pad->map.serialno == serialno)
373       return TRUE;
374     walk = walk->next;
375   }
376
377   return FALSE;
378 }
379
380 static guint32
381 gst_ogg_mux_generate_serialno (GstOggMux * ogg_mux)
382 {
383   guint32 serialno;
384
385   do {
386     serialno = g_random_int_range (0, G_MAXINT32);
387   } while (gst_ogg_mux_is_serialno_present (ogg_mux, serialno));
388
389   return serialno;
390 }
391
392 static GstPad *
393 gst_ogg_mux_request_new_pad (GstElement * element,
394     GstPadTemplate * templ, const gchar * req_name)
395 {
396   GstOggMux *ogg_mux;
397   GstPad *newpad;
398   GstElementClass *klass;
399
400   g_return_val_if_fail (templ != NULL, NULL);
401
402   if (templ->direction != GST_PAD_SINK)
403     goto wrong_direction;
404
405   g_return_val_if_fail (GST_IS_OGG_MUX (element), NULL);
406   ogg_mux = GST_OGG_MUX (element);
407
408   klass = GST_ELEMENT_GET_CLASS (element);
409
410   if (templ != gst_element_class_get_pad_template (klass, "sink_%d"))
411     goto wrong_template;
412
413   {
414     gint serial;
415     gchar *name;
416
417     if (req_name == NULL || strlen (req_name) < 6) {
418       /* no name given when requesting the pad, use random serial number */
419       serial = gst_ogg_mux_generate_serialno (ogg_mux);
420     } else {
421       /* parse serial number from requested padname */
422       serial = atoi (&req_name[5]);
423     }
424     /* create new pad with the name */
425     GST_DEBUG_OBJECT (ogg_mux, "Creating new pad for serial %d", serial);
426     name = g_strdup_printf ("sink_%d", serial);
427     newpad = gst_pad_new_from_template (templ, name);
428     g_free (name);
429
430     /* construct our own wrapper data structure for the pad to
431      * keep track of its status */
432     {
433       GstOggPadData *oggpad;
434
435       oggpad = (GstOggPadData *)
436           gst_collect_pads_add_pad_full (ogg_mux->collect, newpad,
437           sizeof (GstOggPadData), gst_ogg_mux_ogg_pad_destroy_notify);
438       ogg_mux->active_pads++;
439
440       oggpad->map.serialno = serial;
441       ogg_stream_init (&oggpad->map.stream, oggpad->map.serialno);
442       oggpad->packetno = 0;
443       oggpad->pageno = 0;
444       oggpad->eos = FALSE;
445       /* we assume there will be some control data first for this pad */
446       oggpad->state = GST_OGG_PAD_STATE_CONTROL;
447       oggpad->new_page = TRUE;
448       oggpad->first_delta = FALSE;
449       oggpad->prev_delta = FALSE;
450       oggpad->data_pushed = FALSE;
451       oggpad->pagebuffers = g_queue_new ();
452       oggpad->map.headers = NULL;
453       oggpad->map.queued = NULL;
454
455       gst_segment_init (&oggpad->segment, GST_FORMAT_TIME);
456
457       oggpad->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (newpad);
458       gst_pad_set_event_function (newpad,
459           GST_DEBUG_FUNCPTR (gst_ogg_mux_sink_event));
460     }
461   }
462
463   /* setup some pad functions */
464   gst_pad_set_link_function (newpad, gst_ogg_mux_sinkconnect);
465
466   /* dd the pad to the element */
467   gst_element_add_pad (element, newpad);
468
469   return newpad;
470
471   /* ERRORS */
472 wrong_direction:
473   {
474     g_warning ("ogg_mux: request pad that is not a SINK pad\n");
475     return NULL;
476   }
477 wrong_template:
478   {
479     g_warning ("ogg_mux: this is not our template!\n");
480     return NULL;
481   }
482 }
483
484 static void
485 gst_ogg_mux_release_pad (GstElement * element, GstPad * pad)
486 {
487   GstOggMux *ogg_mux;
488
489   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
490
491   gst_collect_pads_remove_pad (ogg_mux->collect, pad);
492   gst_element_remove_pad (element, pad);
493
494   gst_object_unref (ogg_mux);
495 }
496
497 /* handle events */
498 static gboolean
499 gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event)
500 {
501   GstEventType type;
502
503   type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
504
505   switch (type) {
506     case GST_EVENT_SEEK:
507       /* disable seeking for now */
508       return FALSE;
509     default:
510       break;
511   }
512
513   return gst_pad_event_default (pad, event);
514 }
515
516 static GstBuffer *
517 gst_ogg_mux_buffer_from_page (GstOggMux * mux, ogg_page * page, gboolean delta)
518 {
519   GstBuffer *buffer;
520
521   /* allocate space for header and body */
522   buffer = gst_buffer_new_and_alloc (page->header_len + page->body_len);
523   gst_buffer_fill (buffer, 0, page->header, page->header_len);
524   gst_buffer_fill (buffer, page->header_len, page->body, page->body_len);
525
526   /* Here we set granulepos as our OFFSET_END to give easy direct access to
527    * this value later. Before we push it, we reset this to OFFSET + SIZE
528    * (see gst_ogg_mux_push_buffer). */
529   GST_BUFFER_OFFSET_END (buffer) = ogg_page_granulepos (page);
530   if (delta)
531     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
532
533   GST_LOG_OBJECT (mux, GST_GP_FORMAT
534       " created buffer %p from ogg page",
535       GST_GP_CAST (ogg_page_granulepos (page)), buffer);
536
537   return buffer;
538 }
539
540 static GstFlowReturn
541 gst_ogg_mux_push_buffer (GstOggMux * mux, GstBuffer * buffer,
542     GstOggPadData * oggpad)
543 {
544   GstCaps *caps;
545
546   /* fix up OFFSET and OFFSET_END again */
547   GST_BUFFER_OFFSET (buffer) = mux->offset;
548   mux->offset += gst_buffer_get_size (buffer);
549   GST_BUFFER_OFFSET_END (buffer) = mux->offset;
550
551   /* Ensure we have monotonically increasing timestamps in the output. */
552   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
553     gint64 run_time = GST_BUFFER_RUNNING_TIME (buffer, oggpad);
554     if (mux->last_ts != GST_CLOCK_TIME_NONE && run_time < mux->last_ts)
555       GST_BUFFER_TIMESTAMP (buffer) = mux->last_ts;
556     else
557       mux->last_ts = run_time;
558   }
559
560   caps = gst_pad_get_negotiated_caps (mux->srcpad);
561   gst_buffer_set_caps (buffer, caps);
562   if (caps)
563     gst_caps_unref (caps);
564
565   return gst_pad_push (mux->srcpad, buffer);
566 }
567
568 /* if all queues have at least one page, dequeue the page with the lowest
569  * timestamp */
570 static gboolean
571 gst_ogg_mux_dequeue_page (GstOggMux * mux, GstFlowReturn * flowret)
572 {
573   GSList *walk;
574   GstOggPadData *opad = NULL;   /* "oldest" pad */
575   GstClockTime oldest = GST_CLOCK_TIME_NONE;
576   GstBuffer *buf = NULL;
577   gboolean ret = FALSE;
578
579   *flowret = GST_FLOW_OK;
580
581   walk = mux->collect->data;
582   while (walk) {
583     GstOggPadData *pad = (GstOggPadData *) walk->data;
584
585     /* We need each queue to either be at EOS, or have one or more pages
586      * available with a set granulepos (i.e. not -1), otherwise we don't have
587      * enough data yet to determine which stream needs to go next for correct
588      * time ordering. */
589     if (pad->pagebuffers->length == 0) {
590       if (pad->eos) {
591         GST_LOG_OBJECT (pad->collect.pad,
592             "pad is EOS, skipping for dequeue decision");
593       } else {
594         GST_LOG_OBJECT (pad->collect.pad,
595             "no pages in this queue, can't dequeue");
596         return FALSE;
597       }
598     } else {
599       /* We then need to check for a non-negative granulepos */
600       int i;
601       gboolean valid = FALSE;
602
603       for (i = 0; i < pad->pagebuffers->length; i++) {
604         buf = g_queue_peek_nth (pad->pagebuffers, i);
605         /* Here we check the OFFSET_END, which is actually temporarily the
606          * granulepos value for this buffer */
607         if (GST_BUFFER_OFFSET_END (buf) != -1) {
608           valid = TRUE;
609           break;
610         }
611       }
612       if (!valid) {
613         GST_LOG_OBJECT (pad->collect.pad,
614             "No page timestamps in queue, can't dequeue");
615         return FALSE;
616       }
617     }
618
619     walk = g_slist_next (walk);
620   }
621
622   walk = mux->collect->data;
623   while (walk) {
624     GstOggPadData *pad = (GstOggPadData *) walk->data;
625
626     /* any page with a granulepos of -1 can be pushed immediately.
627      * TODO: it CAN be, but it seems silly to do so? */
628     buf = g_queue_peek_head (pad->pagebuffers);
629     while (buf && GST_BUFFER_OFFSET_END (buf) == -1) {
630       GST_LOG_OBJECT (pad->collect.pad, "[gp        -1] pushing page");
631       g_queue_pop_head (pad->pagebuffers);
632       *flowret = gst_ogg_mux_push_buffer (mux, buf, pad);
633       buf = g_queue_peek_head (pad->pagebuffers);
634       ret = TRUE;
635     }
636
637     if (buf) {
638       /* if no oldest buffer yet, take this one */
639       if (oldest == GST_CLOCK_TIME_NONE) {
640         GST_LOG_OBJECT (mux, "no oldest yet, taking buffer %p from pad %"
641             GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
642             buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
643         oldest = GST_BUFFER_OFFSET (buf);
644         opad = pad;
645       } else {
646         /* if we have an oldest, compare with this one */
647         if (GST_BUFFER_OFFSET (buf) < oldest) {
648           GST_LOG_OBJECT (mux, "older buffer %p, taking from pad %"
649               GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
650               buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
651           oldest = GST_BUFFER_OFFSET (buf);
652           opad = pad;
653         }
654       }
655     }
656     walk = g_slist_next (walk);
657   }
658
659   if (oldest != GST_CLOCK_TIME_NONE) {
660     g_assert (opad);
661     buf = g_queue_pop_head (opad->pagebuffers);
662     GST_LOG_OBJECT (opad->collect.pad,
663         GST_GP_FORMAT " pushing oldest page buffer %p (granulepos time %"
664         GST_TIME_FORMAT ")", GST_BUFFER_OFFSET_END (buf), buf,
665         GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
666     *flowret = gst_ogg_mux_push_buffer (mux, buf, opad);
667     ret = TRUE;
668   }
669
670   return ret;
671 }
672
673 /* put the given ogg page on a per-pad queue, timestamping it correctly.
674  * after that, dequeue and push as many pages as possible.
675  * Caller should make sure:
676  * pad->timestamp     was set with the timestamp of the first packet put
677  *                    on the page
678  * pad->timestamp_end was set with the timestamp + duration of the last packet
679  *                    put on the page
680  * pad->gp_time       was set with the time matching the gp of the last
681  *                    packet put on the page
682  *
683  * will also reset timestamp and timestamp_end, so caller func can restart
684  * counting.
685  */
686 static GstFlowReturn
687 gst_ogg_mux_pad_queue_page (GstOggMux * mux, GstOggPadData * pad,
688     ogg_page * page, gboolean delta)
689 {
690   GstFlowReturn ret;
691   GstBuffer *buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
692
693   /* take the timestamp of the first packet on this page */
694   GST_BUFFER_TIMESTAMP (buffer) = pad->timestamp;
695   GST_BUFFER_DURATION (buffer) = pad->timestamp_end - pad->timestamp;
696   /* take the gp time of the last completed packet on this page */
697   GST_BUFFER_OFFSET (buffer) = pad->gp_time;
698
699   /* the next page will start where the current page's end time leaves off */
700   pad->timestamp = pad->timestamp_end;
701
702   g_queue_push_tail (pad->pagebuffers, buffer);
703   GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
704       " queued buffer page %p (gp time %"
705       GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT
706       "), %d page buffers queued", GST_GP_CAST (ogg_page_granulepos (page)),
707       buffer, GST_TIME_ARGS (GST_BUFFER_OFFSET (buffer)),
708       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
709       g_queue_get_length (pad->pagebuffers));
710
711   while (gst_ogg_mux_dequeue_page (mux, &ret)) {
712     if (ret != GST_FLOW_OK)
713       break;
714   }
715
716   return ret;
717 }
718
719 /*
720  * Given two pads, compare the buffers queued on it.
721  * Returns:
722  *  0 if they have an equal priority
723  * -1 if the first is better
724  *  1 if the second is better
725  * Priority decided by: a) validity, b) older timestamp, c) smaller number
726  * of muxed pages
727  */
728 static gint
729 gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPadData * first,
730     GstOggPadData * second)
731 {
732   guint64 firsttime, secondtime;
733
734   /* if the first pad doesn't contain anything or is even NULL, return
735    * the second pad as best candidate and vice versa */
736   if (first == NULL || (first->buffer == NULL && first->next_buffer == NULL))
737     return 1;
738   if (second == NULL || (second->buffer == NULL && second->next_buffer == NULL))
739     return -1;
740
741   /* no timestamp on first buffer, it must go first */
742   if (first->buffer)
743     firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
744   else
745     firsttime = GST_BUFFER_TIMESTAMP (first->next_buffer);
746   if (firsttime == GST_CLOCK_TIME_NONE)
747     return -1;
748
749   /* no timestamp on second buffer, it must go first */
750   if (second->buffer)
751     secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
752   else
753     secondtime = GST_BUFFER_TIMESTAMP (second->next_buffer);
754   if (secondtime == GST_CLOCK_TIME_NONE)
755     return 1;
756
757   firsttime = gst_segment_to_running_time (&first->segment, GST_FORMAT_TIME,
758       firsttime);
759   secondtime = gst_segment_to_running_time (&second->segment, GST_FORMAT_TIME,
760       secondtime);
761
762   /* first buffer has higher timestamp, second one should go first */
763   if (secondtime < firsttime)
764     return 1;
765   /* second buffer has higher timestamp, first one should go first */
766   else if (secondtime > firsttime)
767     return -1;
768   else {
769     /* buffers with equal timestamps, prefer the pad that has the
770      * least number of pages muxed */
771     if (second->pageno < first->pageno)
772       return 1;
773     else if (second->pageno > first->pageno)
774       return -1;
775   }
776
777   /* same priority if all of the above failed */
778   return 0;
779 }
780
781 /* make sure at least one buffer is queued on all pads, two if possible
782  * 
783  * if pad->buffer == NULL, pad->next_buffer !=  NULL, then
784  *   we do not know if the buffer is the last or not
785  * if pad->buffer != NULL, pad->next_buffer != NULL, then
786  *   pad->buffer is not the last buffer for the pad
787  * if pad->buffer != NULL, pad->next_buffer == NULL, then
788  *   pad->buffer if the last buffer for the pad
789  * 
790  * returns a pointer to an oggpad that holds the best buffer, or
791  * NULL when no pad was usable. "best" means the buffer marked
792  * with the lowest timestamp. If best->buffer == NULL then nothing
793  * should be done until more data arrives */
794 static GstOggPadData *
795 gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
796 {
797   GstOggPadData *bestpad = NULL, *still_hungry = NULL;
798   GSList *walk;
799
800   /* try to make sure we have a buffer from each usable pad first */
801   walk = ogg_mux->collect->data;
802   while (walk) {
803     GstOggPadData *pad;
804     GstCollectData *data;
805
806     data = (GstCollectData *) walk->data;
807     pad = (GstOggPadData *) data;
808
809     walk = g_slist_next (walk);
810
811     GST_LOG_OBJECT (data->pad, "looking at pad for buffer");
812
813     /* try to get a new buffer for this pad if needed and possible */
814     if (pad->buffer == NULL) {
815       GstBuffer *buf;
816
817       /* shift the buffer along if needed (it's okay if next_buffer is NULL) */
818       if (pad->buffer == NULL) {
819         GST_LOG_OBJECT (data->pad, "shifting buffer %" GST_PTR_FORMAT,
820             pad->next_buffer);
821         pad->buffer = pad->next_buffer;
822         pad->next_buffer = NULL;
823       }
824
825       buf = gst_collect_pads_pop (ogg_mux->collect, data);
826       GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf);
827
828       /* On EOS we get a NULL buffer */
829       if (buf != NULL) {
830         if (ogg_mux->delta_pad == NULL &&
831             GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))
832           ogg_mux->delta_pad = pad;
833
834         /* if we need headers */
835         if (pad->state == GST_OGG_PAD_STATE_CONTROL) {
836           /* and we have one */
837           ogg_packet packet;
838           gboolean is_header;
839           gsize size;
840
841           packet.packet = gst_buffer_map (buf, &size, NULL, GST_MAP_READ);
842           packet.bytes = size;
843
844           if (GST_BUFFER_OFFSET_END_IS_VALID (buf))
845             packet.granulepos = GST_BUFFER_OFFSET_END (buf);
846           else
847             packet.granulepos = 0;
848
849           /* if we're not yet in data mode, ensure we're setup on the first packet */
850           if (!pad->have_type) {
851             pad->have_type = gst_ogg_stream_setup_map (&pad->map, &packet);
852             if (!pad->have_type) {
853               GST_ERROR_OBJECT (pad, "mapper didn't recognise input stream "
854                   "(pad caps: %" GST_PTR_FORMAT ")", GST_PAD_CAPS (pad));
855             } else {
856               GST_DEBUG_OBJECT (pad, "caps detected: %" GST_PTR_FORMAT,
857                   pad->map.caps);
858             }
859           }
860
861           if (pad->have_type)
862             is_header = gst_ogg_stream_packet_is_header (&pad->map, &packet);
863           else                  /* fallback (FIXME 0.11: remove IN_CAPS hack) */
864             is_header = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
865
866           gst_buffer_unmap (buf, packet.packet, size);
867
868           if (is_header) {
869             GST_DEBUG_OBJECT (ogg_mux,
870                 "got header buffer in control state, ignoring");
871             /* just ignore */
872             pad->map.n_header_packets_seen++;
873             gst_buffer_unref (buf);
874             buf = NULL;
875           } else {
876             GST_DEBUG_OBJECT (ogg_mux,
877                 "got data buffer in control state, switching to data mode");
878             /* this is a data buffer so switch to data state */
879             pad->state = GST_OGG_PAD_STATE_DATA;
880           }
881         }
882       } else {
883         GST_DEBUG_OBJECT (data->pad, "EOS on pad");
884         if (!pad->eos) {
885           ogg_page page;
886
887           /* it's no longer active */
888           ogg_mux->active_pads--;
889
890           /* Just gone to EOS. Flush existing page(s) */
891           pad->eos = TRUE;
892
893           while (ogg_stream_flush (&pad->map.stream, &page)) {
894             /* Place page into the per-pad queue */
895             gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
896             /* increment the page number counter */
897             pad->pageno++;
898             /* mark other pages as delta */
899             pad->first_delta = TRUE;
900           }
901         }
902       }
903
904       pad->next_buffer = buf;
905     }
906
907     /* we should have a buffer now, see if it is the best pad to
908      * pull on */
909     if (pad->buffer || pad->next_buffer) {
910       if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) {
911         GST_LOG_OBJECT (data->pad,
912             "new best pad, with buffers %" GST_PTR_FORMAT
913             " and %" GST_PTR_FORMAT, pad->buffer, pad->next_buffer);
914
915         bestpad = pad;
916       }
917     } else if (!pad->eos) {
918       GST_LOG_OBJECT (data->pad, "hungry pad");
919       still_hungry = pad;
920     }
921   }
922
923   if (still_hungry)
924     /* drop back into collectpads... */
925     return still_hungry;
926   else
927     return bestpad;
928 }
929
930 static GList *
931 gst_ogg_mux_get_headers (GstOggPadData * pad)
932 {
933   GList *res = NULL;
934   GstStructure *structure;
935   GstCaps *caps;
936   GstPad *thepad;
937
938   thepad = pad->collect.pad;
939
940   GST_LOG_OBJECT (thepad, "getting headers");
941
942   caps = gst_pad_get_negotiated_caps (thepad);
943   if (caps != NULL) {
944     const GValue *streamheader;
945
946     structure = gst_caps_get_structure (caps, 0);
947     streamheader = gst_structure_get_value (structure, "streamheader");
948     if (streamheader != NULL) {
949       GST_LOG_OBJECT (thepad, "got header");
950       if (G_VALUE_TYPE (streamheader) == GST_TYPE_ARRAY) {
951         GArray *bufarr = g_value_peek_pointer (streamheader);
952         gint i;
953
954         GST_LOG_OBJECT (thepad, "got fixed list");
955
956         for (i = 0; i < bufarr->len; i++) {
957           GValue *bufval = &g_array_index (bufarr, GValue, i);
958
959           GST_LOG_OBJECT (thepad, "item %d", i);
960           if (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER) {
961             GstBuffer *buf = g_value_peek_pointer (bufval);
962
963             GST_LOG_OBJECT (thepad, "adding item %d to header list", i);
964
965             gst_buffer_ref (buf);
966             res = g_list_append (res, buf);
967           }
968         }
969       } else {
970         GST_LOG_OBJECT (thepad, "streamheader is not fixed list");
971       }
972
973       /* Start a new page for every CMML buffer */
974       if (gst_structure_has_name (structure, "text/x-cmml"))
975         pad->always_flush_page = TRUE;
976     } else if (gst_structure_has_name (structure, "video/x-dirac")) {
977       res = g_list_append (res, pad->buffer);
978       pad->buffer = pad->next_buffer;
979       pad->next_buffer = NULL;
980       pad->always_flush_page = TRUE;
981     } else {
982       GST_LOG_OBJECT (thepad, "caps don't have streamheader");
983     }
984     gst_caps_unref (caps);
985   } else {
986     GST_LOG_OBJECT (thepad, "got empty caps as negotiated format");
987   }
988   return res;
989 }
990
991 static GstCaps *
992 gst_ogg_mux_set_header_on_caps (GstCaps * caps, GList * buffers)
993 {
994   GstStructure *structure;
995   GValue array = { 0 };
996   GList *walk = buffers;
997
998   caps = gst_caps_make_writable (caps);
999
1000   structure = gst_caps_get_structure (caps, 0);
1001
1002   /* put buffers in a fixed list */
1003   g_value_init (&array, GST_TYPE_ARRAY);
1004
1005   while (walk) {
1006     GstBuffer *buf = GST_BUFFER (walk->data);
1007     GstBuffer *copy;
1008     GValue value = { 0 };
1009
1010     walk = walk->next;
1011
1012     /* mark buffer */
1013     GST_LOG ("Setting IN_CAPS on buffer of length %d",
1014         gst_buffer_get_size (buf));
1015     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
1016
1017     g_value_init (&value, GST_TYPE_BUFFER);
1018     copy = gst_buffer_copy (buf);
1019     gst_value_set_buffer (&value, copy);
1020     gst_buffer_unref (copy);
1021     gst_value_array_append_value (&array, &value);
1022     g_value_unset (&value);
1023   }
1024   gst_structure_set_value (structure, "streamheader", &array);
1025   g_value_unset (&array);
1026
1027   return caps;
1028 }
1029
1030 /*
1031  * For each pad we need to write out one (small) header in one
1032  * page that allows decoders to identify the type of the stream.
1033  * After that we need to write out all extra info for the decoders.
1034  * In the case of a codec that also needs data as configuration, we can
1035  * find that info in the streamcaps. 
1036  * After writing the headers we must start a new page for the data.
1037  */
1038 static GstFlowReturn
1039 gst_ogg_mux_send_headers (GstOggMux * mux)
1040 {
1041   GSList *walk;
1042   GList *hbufs, *hwalk;
1043   GstCaps *caps;
1044   GstFlowReturn ret;
1045
1046   hbufs = NULL;
1047   ret = GST_FLOW_OK;
1048
1049   GST_LOG_OBJECT (mux, "collecting headers");
1050
1051   walk = mux->collect->data;
1052   while (walk) {
1053     GstOggPadData *pad;
1054     GstPad *thepad;
1055
1056     pad = (GstOggPadData *) walk->data;
1057     thepad = pad->collect.pad;
1058
1059     walk = g_slist_next (walk);
1060
1061     GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad));
1062
1063     /* if the pad has no buffer, we don't care */
1064     if (pad->buffer == NULL && pad->next_buffer == NULL)
1065       continue;
1066
1067     /* now figure out the headers */
1068     pad->map.headers = gst_ogg_mux_get_headers (pad);
1069   }
1070
1071   GST_LOG_OBJECT (mux, "creating BOS pages");
1072   walk = mux->collect->data;
1073   while (walk) {
1074     GstOggPadData *pad;
1075     GstBuffer *buf;
1076     ogg_packet packet;
1077     ogg_page page;
1078     GstPad *thepad;
1079     GstCaps *caps;
1080     GstStructure *structure;
1081     GstBuffer *hbuf;
1082     gsize size;
1083
1084     pad = (GstOggPadData *) walk->data;
1085     thepad = pad->collect.pad;
1086     caps = gst_pad_get_negotiated_caps (thepad);
1087     structure = gst_caps_get_structure (caps, 0);
1088
1089     walk = walk->next;
1090
1091     pad->packetno = 0;
1092
1093     GST_LOG_OBJECT (thepad, "looping over headers");
1094
1095     if (pad->map.headers) {
1096       buf = GST_BUFFER (pad->map.headers->data);
1097       pad->map.headers = g_list_remove (pad->map.headers, buf);
1098     } else if (pad->buffer) {
1099       buf = pad->buffer;
1100       gst_buffer_ref (buf);
1101     } else if (pad->next_buffer) {
1102       buf = pad->next_buffer;
1103       gst_buffer_ref (buf);
1104     } else {
1105       /* fixme -- should be caught in the previous list traversal. */
1106       GST_OBJECT_LOCK (thepad);
1107       g_critical ("No headers or buffers on pad %s:%s",
1108           GST_DEBUG_PAD_NAME (thepad));
1109       GST_OBJECT_UNLOCK (thepad);
1110       continue;
1111     }
1112
1113     /* create a packet from the buffer */
1114     packet.packet = gst_buffer_map (buf, &size, NULL, GST_MAP_READ);
1115     packet.bytes = size;
1116     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1117     if (packet.granulepos == -1)
1118       packet.granulepos = 0;
1119     /* mark BOS and packet number */
1120     packet.b_o_s = (pad->packetno == 0);
1121     packet.packetno = pad->packetno++;
1122     /* mark EOS */
1123     packet.e_o_s = 0;
1124
1125     /* swap the packet in */
1126     ogg_stream_packetin (&pad->map.stream, &packet);
1127
1128     gst_buffer_unmap (buf, packet.packet, size);
1129     gst_buffer_unref (buf);
1130
1131     GST_LOG_OBJECT (thepad, "flushing out BOS page");
1132     if (!ogg_stream_flush (&pad->map.stream, &page))
1133       g_critical ("Could not flush BOS page");
1134
1135     hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1136
1137     GST_LOG_OBJECT (mux, "swapped out page with mime type %s",
1138         gst_structure_get_name (structure));
1139
1140     /* quick hack: put Theora, VP8 and Dirac video pages at the front.
1141      * Ideally, we would have a settable enum for which Ogg
1142      * profile we work with, and order based on that.
1143      * (FIXME: if there is more than one video stream, shouldn't we only put
1144      * one's BOS into the first page, followed by an audio stream's BOS, and
1145      * only then followed by the remaining video and audio streams?) */
1146     if (gst_structure_has_name (structure, "video/x-theora")) {
1147       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "Theora");
1148       hbufs = g_list_prepend (hbufs, hbuf);
1149     } else if (gst_structure_has_name (structure, "video/x-dirac")) {
1150       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "Dirac");
1151       hbufs = g_list_prepend (hbufs, hbuf);
1152       pad->always_flush_page = TRUE;
1153     } else if (gst_structure_has_name (structure, "video/x-vp8")) {
1154       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "VP8");
1155       hbufs = g_list_prepend (hbufs, hbuf);
1156     } else {
1157       hbufs = g_list_append (hbufs, hbuf);
1158     }
1159     gst_caps_unref (caps);
1160   }
1161
1162   GST_LOG_OBJECT (mux, "creating next headers");
1163   walk = mux->collect->data;
1164   while (walk) {
1165     GstOggPadData *pad;
1166     GstPad *thepad;
1167
1168     pad = (GstOggPadData *) walk->data;
1169     thepad = pad->collect.pad;
1170
1171     walk = walk->next;
1172
1173     GST_LOG_OBJECT (mux, "looping over headers for pad %s:%s",
1174         GST_DEBUG_PAD_NAME (thepad));
1175
1176     hwalk = pad->map.headers;
1177     while (hwalk) {
1178       GstBuffer *buf = GST_BUFFER (hwalk->data);
1179       ogg_packet packet;
1180       ogg_page page;
1181       gsize size;
1182
1183       hwalk = hwalk->next;
1184
1185       /* create a packet from the buffer */
1186       packet.packet = gst_buffer_map (buf, &size, NULL, GST_MAP_READ);
1187       packet.bytes = size;
1188       packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1189       if (packet.granulepos == -1)
1190         packet.granulepos = 0;
1191       /* mark BOS and packet number */
1192       packet.b_o_s = (pad->packetno == 0);
1193       packet.packetno = pad->packetno++;
1194       /* mark EOS */
1195       packet.e_o_s = 0;
1196
1197       /* swap the packet in */
1198       ogg_stream_packetin (&pad->map.stream, &packet);
1199       gst_buffer_unmap (buf, packet.packet, size);
1200       gst_buffer_unref (buf);
1201
1202       /* if last header, flush page */
1203       if (hwalk == NULL) {
1204         GST_LOG_OBJECT (mux,
1205             "flushing page as packet %" G_GUINT64_FORMAT " is first or "
1206             "last packet", (guint64) packet.packetno);
1207         while (ogg_stream_flush (&pad->map.stream, &page)) {
1208           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1209
1210           GST_LOG_OBJECT (mux, "swapped out page");
1211           hbufs = g_list_append (hbufs, hbuf);
1212         }
1213       } else {
1214         GST_LOG_OBJECT (mux, "try to swap out page");
1215         /* just try to swap out a page then */
1216         while (ogg_stream_pageout (&pad->map.stream, &page) > 0) {
1217           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1218
1219           GST_LOG_OBJECT (mux, "swapped out page");
1220           hbufs = g_list_append (hbufs, hbuf);
1221         }
1222       }
1223     }
1224     g_list_free (pad->map.headers);
1225     pad->map.headers = NULL;
1226   }
1227   /* hbufs holds all buffers for the headers now */
1228
1229   /* create caps with the buffers */
1230   caps = gst_pad_get_caps (mux->srcpad);
1231   if (caps) {
1232     caps = gst_ogg_mux_set_header_on_caps (caps, hbufs);
1233     gst_pad_set_caps (mux->srcpad, caps);
1234     gst_caps_unref (caps);
1235   }
1236   /* and send the buffers */
1237   while (hbufs != NULL) {
1238     GstBuffer *buf = GST_BUFFER (hbufs->data);
1239
1240     hbufs = g_list_delete_link (hbufs, hbufs);
1241
1242     if ((ret = gst_ogg_mux_push_buffer (mux, buf, NULL)) != GST_FLOW_OK)
1243       break;
1244   }
1245   /* free any remaining nodes/buffers in case we couldn't push them */
1246   g_list_foreach (hbufs, (GFunc) gst_mini_object_unref, NULL);
1247   g_list_free (hbufs);
1248
1249   return ret;
1250 }
1251
1252 /* this function is called to process data on the best pending pad.
1253  *
1254  * basic idea:
1255  *
1256  * 1) store the selected pad and keep on pulling until we fill a
1257  *    complete ogg page or the ogg page is filled above the max-delay
1258  *    threshold. This is needed because the ogg spec says that
1259  *    you should fill a complete page with data from the same logical
1260  *    stream. When the page is filled, go back to 1).
1261  * 2) before filling a page, read ahead one more buffer to see if this
1262  *    packet is the last of the stream. We need to do this because the ogg
1263  *    spec mandates that the last packet should have the EOS flag set before
1264  *    sending it to ogg. if pad->buffer is NULL we need to wait to find out
1265  *    whether there are any more buffers.
1266  * 3) pages get queued on a per-pad queue. Every time a page is queued, a
1267  *    dequeue is called, which will dequeue the oldest page on any pad, provided
1268  *    that ALL pads have at least one marked page in the queue (or remaining
1269  *    pads are at EOS)
1270  */
1271 static GstFlowReturn
1272 gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPadData * best)
1273 {
1274   GstFlowReturn ret = GST_FLOW_OK;
1275   gboolean delta_unit;
1276   gint64 granulepos = 0;
1277   GstClockTime timestamp, gp_time;
1278
1279   GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT
1280       ", currently pulling from %" GST_PTR_FORMAT, best->collect.pad,
1281       ogg_mux->pulling);
1282
1283   /* best->buffer is non-NULL, either the pad is EOS's or there is a next 
1284    * buffer */
1285   if (best->next_buffer == NULL && !best->eos) {
1286     GST_WARNING_OBJECT (ogg_mux, "no subsequent buffer and EOS not reached");
1287     return GST_FLOW_WRONG_STATE;
1288   }
1289
1290   /* if we were already pulling from one pad, but the new "best" buffer is
1291    * from another pad, we need to check if we have reason to flush a page
1292    * for the pad we were pulling from before */
1293   if (ogg_mux->pulling && best &&
1294       ogg_mux->pulling != best && ogg_mux->pulling->buffer) {
1295     GstOggPadData *pad = ogg_mux->pulling;
1296     GstClockTime last_ts = GST_BUFFER_END_TIME (pad->buffer);
1297
1298     /* if the next packet in the current page is going to make the page
1299      * too long, we need to flush */
1300     if (last_ts > ogg_mux->next_ts + ogg_mux->max_delay) {
1301       ogg_page page;
1302
1303       GST_LOG_OBJECT (pad->collect.pad,
1304           GST_GP_FORMAT " stored packet %" G_GINT64_FORMAT
1305           " will make page too long, flushing",
1306           GST_BUFFER_OFFSET_END (pad->buffer),
1307           (gint64) pad->map.stream.packetno);
1308
1309       while (ogg_stream_flush (&pad->map.stream, &page)) {
1310         /* end time of this page is the timestamp of the next buffer */
1311         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1312         /* Place page into the per-pad queue */
1313         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1314             pad->first_delta);
1315         /* increment the page number counter */
1316         pad->pageno++;
1317         /* mark other pages as delta */
1318         pad->first_delta = TRUE;
1319       }
1320       pad->new_page = TRUE;
1321       ogg_mux->pulling = NULL;
1322     }
1323   }
1324
1325   /* if we don't know which pad to pull on, use the best one */
1326   if (ogg_mux->pulling == NULL) {
1327     ogg_mux->pulling = best;
1328     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from best pad");
1329
1330     /* remember timestamp and gp time of first buffer for this new pad */
1331     if (ogg_mux->pulling != NULL) {
1332       ogg_mux->next_ts = GST_BUFFER_TIMESTAMP (ogg_mux->pulling->buffer);
1333       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "updated times, next ts %"
1334           GST_TIME_FORMAT, GST_TIME_ARGS (ogg_mux->next_ts));
1335     } else {
1336       /* no pad to pull on, send EOS */
1337       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1338       return GST_FLOW_WRONG_STATE;
1339     }
1340   }
1341
1342   if (ogg_mux->need_headers) {
1343     ret = gst_ogg_mux_send_headers (ogg_mux);
1344     ogg_mux->need_headers = FALSE;
1345   }
1346
1347   /* we are pulling from a pad, continue to do so until a page
1348    * has been filled and queued */
1349   if (ogg_mux->pulling != NULL) {
1350     ogg_packet packet;
1351     ogg_page page;
1352     GstBuffer *buf, *tmpbuf;
1353     GstOggPadData *pad = ogg_mux->pulling;
1354     gint64 duration;
1355     gboolean force_flush;
1356     gsize size;
1357
1358     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from pad");
1359
1360     /* now see if we have a buffer */
1361     buf = pad->buffer;
1362     if (buf == NULL) {
1363       GST_DEBUG_OBJECT (ogg_mux, "pad was EOS");
1364       ogg_mux->pulling = NULL;
1365       return GST_FLOW_OK;
1366     }
1367
1368     delta_unit = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1369     duration = GST_BUFFER_DURATION (buf);
1370
1371     /* if the current "next timestamp" on the pad is unset, then this is the
1372      * first packet on the new page.  Update our pad's page timestamp */
1373     if (ogg_mux->pulling->timestamp == GST_CLOCK_TIME_NONE) {
1374       ogg_mux->pulling->timestamp = GST_BUFFER_TIMESTAMP (buf);
1375       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad,
1376           "updated pad timestamp to %" GST_TIME_FORMAT,
1377           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
1378     }
1379     /* create a packet from the buffer */
1380     packet.packet = gst_buffer_map (buf, &size, NULL, GST_MAP_READ);
1381     packet.bytes = size;
1382     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1383     if (packet.granulepos == -1)
1384       packet.granulepos = 0;
1385     /* mark BOS and packet number */
1386     packet.b_o_s = (pad->packetno == 0);
1387     packet.packetno = pad->packetno++;
1388     GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
1389         " packet %" G_GINT64_FORMAT " (%ld bytes) created from buffer",
1390         GST_GP_CAST (packet.granulepos), (gint64) packet.packetno,
1391         packet.bytes);
1392
1393     packet.e_o_s = (pad->eos ? 1 : 0);
1394     tmpbuf = NULL;
1395
1396     /* we flush when we see a new keyframe */
1397     force_flush = (pad->prev_delta && !delta_unit) || pad->always_flush_page;
1398     if (duration != -1) {
1399       pad->duration += duration;
1400       /* if page duration exceeds max, flush page */
1401       if (pad->duration > ogg_mux->max_page_delay) {
1402         force_flush = TRUE;
1403         pad->duration = 0;
1404       }
1405     }
1406
1407     if (GST_BUFFER_IS_DISCONT (buf)) {
1408       if (pad->data_pushed) {
1409         GST_LOG_OBJECT (pad->collect.pad, "got discont");
1410         packet.packetno++;
1411         /* No public API for this; hack things in */
1412         pad->map.stream.pageno++;
1413         force_flush = TRUE;
1414       } else {
1415         GST_LOG_OBJECT (pad->collect.pad, "discont at stream start");
1416       }
1417     }
1418
1419     /* flush the currently built page if necessary */
1420     if (force_flush) {
1421       GST_LOG_OBJECT (pad->collect.pad,
1422           GST_GP_FORMAT " forced flush of page before this packet",
1423           GST_BUFFER_OFFSET_END (pad->buffer));
1424       while (ogg_stream_flush (&pad->map.stream, &page)) {
1425         /* end time of this page is the timestamp of the next buffer */
1426         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1427         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1428             pad->first_delta);
1429
1430         /* increment the page number counter */
1431         pad->pageno++;
1432         /* mark other pages as delta */
1433         pad->first_delta = TRUE;
1434       }
1435       pad->new_page = TRUE;
1436     }
1437
1438     /* if this is the first packet of a new page figure out the delta flag */
1439     if (pad->new_page) {
1440       if (delta_unit) {
1441         /* mark the page as delta */
1442         pad->first_delta = TRUE;
1443       } else {
1444         /* got a keyframe */
1445         if (ogg_mux->delta_pad == pad) {
1446           /* if we get it on the pad with deltaunits,
1447            * we mark the page as non delta */
1448           pad->first_delta = FALSE;
1449         } else if (ogg_mux->delta_pad != NULL) {
1450           /* if there are pads with delta frames, we
1451            * must mark this one as delta */
1452           pad->first_delta = TRUE;
1453         } else {
1454           pad->first_delta = FALSE;
1455         }
1456       }
1457       pad->new_page = FALSE;
1458     }
1459
1460     /* save key unit to track delta->key unit transitions */
1461     pad->prev_delta = delta_unit;
1462
1463     /* swap the packet in */
1464     if (packet.e_o_s == 1)
1465       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in EOS packet");
1466     if (packet.b_o_s == 1)
1467       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in BOS packet");
1468
1469     ogg_stream_packetin (&pad->map.stream, &packet);
1470     gst_buffer_unmap (buf, packet.packet, size);
1471     pad->data_pushed = TRUE;
1472
1473     gp_time = GST_BUFFER_OFFSET (pad->buffer);
1474     granulepos = GST_BUFFER_OFFSET_END (pad->buffer);
1475     timestamp = GST_BUFFER_TIMESTAMP (pad->buffer);
1476
1477     GST_LOG_OBJECT (pad->collect.pad,
1478         GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", gp time %"
1479         GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT " packetin'd",
1480         granulepos, (gint64) packet.packetno, GST_TIME_ARGS (gp_time),
1481         GST_TIME_ARGS (timestamp));
1482     /* don't need the old buffer anymore */
1483     gst_buffer_unref (pad->buffer);
1484     /* store new readahead buffer */
1485     pad->buffer = tmpbuf;
1486
1487     /* let ogg write out the pages now. The packet we got could end
1488      * up in more than one page so we need to write them all */
1489     if (ogg_stream_pageout (&pad->map.stream, &page) > 0) {
1490       /* we have a new page, so we need to timestamp it correctly.
1491        * if this fresh packet ends on this page, then the page's granulepos
1492        * comes from that packet, and we should set this buffer's timestamp */
1493
1494       GST_LOG_OBJECT (pad->collect.pad,
1495           GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", time %"
1496           GST_TIME_FORMAT ") caused new page",
1497           granulepos, (gint64) packet.packetno, GST_TIME_ARGS (timestamp));
1498       GST_LOG_OBJECT (pad->collect.pad,
1499           GST_GP_FORMAT " new page %ld",
1500           GST_GP_CAST (ogg_page_granulepos (&page)), pad->map.stream.pageno);
1501
1502       if (ogg_page_granulepos (&page) == granulepos) {
1503         /* the packet we streamed in finishes on the current page,
1504          * because the page's granulepos is the granulepos of the last
1505          * packet completed on that page,
1506          * so update the timestamp that we will give to the page */
1507         GST_LOG_OBJECT (pad->collect.pad,
1508             GST_GP_FORMAT
1509             " packet finishes on current page, updating gp time to %"
1510             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (gp_time));
1511         pad->gp_time = gp_time;
1512       } else {
1513         GST_LOG_OBJECT (pad->collect.pad,
1514             GST_GP_FORMAT
1515             " packet spans beyond current page, keeping old gp time %"
1516             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (pad->gp_time));
1517       }
1518
1519       /* push the page */
1520       /* end time of this page is the timestamp of the next buffer */
1521       pad->timestamp_end = timestamp;
1522       ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
1523       pad->pageno++;
1524       /* mark next pages as delta */
1525       pad->first_delta = TRUE;
1526
1527       /* use an inner loop here to flush the remaining pages and
1528        * mark them as delta frames as well */
1529       while (ogg_stream_pageout (&pad->map.stream, &page) > 0) {
1530         if (ogg_page_granulepos (&page) == granulepos) {
1531           /* the page has taken up the new packet completely, which means
1532            * the packet ends the page and we can update the gp time
1533            * before pushing out */
1534           pad->gp_time = gp_time;
1535         }
1536
1537         /* we have a complete page now, we can push the page
1538          * and make sure to pull on a new pad the next time around */
1539         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1540             pad->first_delta);
1541         /* increment the page number counter */
1542         pad->pageno++;
1543       }
1544       /* need a new page as well */
1545       pad->new_page = TRUE;
1546       pad->duration = 0;
1547       /* we're done pulling on this pad, make sure to choose a new
1548        * pad for pulling in the next iteration */
1549       ogg_mux->pulling = NULL;
1550     }
1551
1552     /* Update the gp time, if necessary, since any future page will have at
1553      * least this gp time.
1554      */
1555     if (pad->gp_time < gp_time) {
1556       pad->gp_time = gp_time;
1557       GST_LOG_OBJECT (pad->collect.pad,
1558           "Updated running gp time of pad %" GST_PTR_FORMAT
1559           " to %" GST_TIME_FORMAT, pad->collect.pad, GST_TIME_ARGS (gp_time));
1560     }
1561   }
1562
1563   return ret;
1564 }
1565
1566 /* all_pads_eos:
1567  *
1568  * Checks if all pads are EOS'd by peeking.
1569  *
1570  * Returns TRUE if all pads are EOS.
1571  */
1572 static gboolean
1573 all_pads_eos (GstCollectPads * pads)
1574 {
1575   GSList *walk;
1576   gboolean alleos = TRUE;
1577
1578   walk = pads->data;
1579   while (walk) {
1580     GstBuffer *buf;
1581     GstCollectData *data = (GstCollectData *) walk->data;
1582
1583     buf = gst_collect_pads_peek (pads, data);
1584     if (buf) {
1585       alleos = FALSE;
1586       gst_buffer_unref (buf);
1587       goto beach;
1588     }
1589     walk = walk->next;
1590   }
1591 beach:
1592   return alleos;
1593 }
1594
1595 /* This function is called when there is data on all pads.
1596  * 
1597  * It finds a pad to pull on, this is done by looking at the buffers
1598  * to decide which one to use, and using the 'oldest' one first. It then calls
1599  * gst_ogg_mux_process_best_pad() to process as much data as possible.
1600  * 
1601  * If all the pads have received EOS, it flushes out all data by continually
1602  * getting the best pad and calling gst_ogg_mux_process_best_pad() until they
1603  * are all empty, and then sends EOS.
1604  */
1605 static GstFlowReturn
1606 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
1607 {
1608   GstOggPadData *best;
1609   GstFlowReturn ret;
1610   gint activebefore;
1611
1612   GST_LOG_OBJECT (ogg_mux, "collected");
1613
1614   activebefore = ogg_mux->active_pads;
1615
1616   /* queue buffers on all pads; find a buffer with the lowest timestamp */
1617   best = gst_ogg_mux_queue_pads (ogg_mux);
1618   if (best && !best->buffer) {
1619     GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
1620     return GST_FLOW_OK;
1621   }
1622
1623   if (!best) {
1624     return GST_FLOW_WRONG_STATE;
1625   }
1626
1627   ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1628
1629   if (ogg_mux->active_pads < activebefore) {
1630     /* If the active pad count went down, this mean at least one pad has gone
1631      * EOS. Since CollectPads only calls _collected() once when all pads are
1632      * EOS, and our code doesn't _pop() from all pads we need to check that by
1633      * peeking on all pads, else we won't be called again and the muxing will
1634      * not terminate (push out EOS). */
1635
1636     /* if all the pads have been removed, flush all pending data */
1637     if ((ret == GST_FLOW_OK) && all_pads_eos (pads)) {
1638       GST_LOG_OBJECT (ogg_mux, "no pads remaining, flushing data");
1639
1640       do {
1641         best = gst_ogg_mux_queue_pads (ogg_mux);
1642         if (best)
1643           ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1644       } while ((ret == GST_FLOW_OK) && (best != NULL));
1645
1646       GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
1647       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1648     }
1649   }
1650
1651   return ret;
1652 }
1653
1654 static void
1655 gst_ogg_mux_get_property (GObject * object,
1656     guint prop_id, GValue * value, GParamSpec * pspec)
1657 {
1658   GstOggMux *ogg_mux;
1659
1660   ogg_mux = GST_OGG_MUX (object);
1661
1662   switch (prop_id) {
1663     case ARG_MAX_DELAY:
1664       g_value_set_uint64 (value, ogg_mux->max_delay);
1665       break;
1666     case ARG_MAX_PAGE_DELAY:
1667       g_value_set_uint64 (value, ogg_mux->max_page_delay);
1668       break;
1669     default:
1670       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1671       break;
1672   }
1673 }
1674
1675 static void
1676 gst_ogg_mux_set_property (GObject * object,
1677     guint prop_id, const GValue * value, GParamSpec * pspec)
1678 {
1679   GstOggMux *ogg_mux;
1680
1681   ogg_mux = GST_OGG_MUX (object);
1682
1683   switch (prop_id) {
1684     case ARG_MAX_DELAY:
1685       ogg_mux->max_delay = g_value_get_uint64 (value);
1686       break;
1687     case ARG_MAX_PAGE_DELAY:
1688       ogg_mux->max_page_delay = g_value_get_uint64 (value);
1689       break;
1690     default:
1691       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1692       break;
1693   }
1694 }
1695
1696 /* reset all variables in the ogg pads. */
1697 static void
1698 gst_ogg_mux_init_collectpads (GstCollectPads * collect)
1699 {
1700   GSList *walk;
1701
1702   walk = collect->data;
1703   while (walk) {
1704     GstOggPadData *oggpad = (GstOggPadData *) walk->data;
1705
1706     ogg_stream_init (&oggpad->map.stream, oggpad->map.serialno);
1707     oggpad->packetno = 0;
1708     oggpad->pageno = 0;
1709     oggpad->eos = FALSE;
1710     /* we assume there will be some control data first for this pad */
1711     oggpad->state = GST_OGG_PAD_STATE_CONTROL;
1712     oggpad->new_page = TRUE;
1713     oggpad->first_delta = FALSE;
1714     oggpad->prev_delta = FALSE;
1715     oggpad->data_pushed = FALSE;
1716     oggpad->pagebuffers = g_queue_new ();
1717
1718     gst_segment_init (&oggpad->segment, GST_FORMAT_TIME);
1719
1720     walk = g_slist_next (walk);
1721   }
1722 }
1723
1724 /* Clear all buffers from the collectpads object */
1725 static void
1726 gst_ogg_mux_clear_collectpads (GstCollectPads * collect)
1727 {
1728   GSList *walk;
1729
1730   for (walk = collect->data; walk; walk = g_slist_next (walk)) {
1731     GstOggPadData *oggpad = (GstOggPadData *) walk->data;
1732     GstBuffer *buf;
1733
1734     ogg_stream_clear (&oggpad->map.stream);
1735
1736     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
1737       gst_buffer_unref (buf);
1738     }
1739     g_queue_free (oggpad->pagebuffers);
1740     oggpad->pagebuffers = NULL;
1741
1742     if (oggpad->buffer) {
1743       gst_buffer_unref (oggpad->buffer);
1744       oggpad->buffer = NULL;
1745     }
1746     if (oggpad->next_buffer) {
1747       gst_buffer_unref (oggpad->next_buffer);
1748       oggpad->next_buffer = NULL;
1749     }
1750
1751     gst_segment_init (&oggpad->segment, GST_FORMAT_TIME);
1752   }
1753 }
1754
1755 static GstStateChangeReturn
1756 gst_ogg_mux_change_state (GstElement * element, GstStateChange transition)
1757 {
1758   GstOggMux *ogg_mux;
1759   GstStateChangeReturn ret;
1760
1761   ogg_mux = GST_OGG_MUX (element);
1762
1763   switch (transition) {
1764     case GST_STATE_CHANGE_NULL_TO_READY:
1765       break;
1766     case GST_STATE_CHANGE_READY_TO_PAUSED:
1767       gst_ogg_mux_clear (ogg_mux);
1768       gst_ogg_mux_init_collectpads (ogg_mux->collect);
1769       gst_collect_pads_start (ogg_mux->collect);
1770       break;
1771     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1772       break;
1773     case GST_STATE_CHANGE_PAUSED_TO_READY:
1774       gst_collect_pads_stop (ogg_mux->collect);
1775       break;
1776     default:
1777       break;
1778   }
1779
1780   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1781
1782   switch (transition) {
1783     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1784       break;
1785     case GST_STATE_CHANGE_PAUSED_TO_READY:
1786       gst_ogg_mux_clear_collectpads (ogg_mux->collect);
1787       break;
1788     case GST_STATE_CHANGE_READY_TO_NULL:
1789       break;
1790     default:
1791       break;
1792   }
1793
1794   return ret;
1795 }
1796
1797 gboolean
1798 gst_ogg_mux_plugin_init (GstPlugin * plugin)
1799 {
1800   GST_DEBUG_CATEGORY_INIT (gst_ogg_mux_debug, "oggmux", 0, "ogg muxer");
1801
1802   return gst_element_register (plugin, "oggmux", GST_RANK_PRIMARY,
1803       GST_TYPE_OGG_MUX);
1804 }