gio: Remove unused function
[platform/upstream/gstreamer.git] / ext / ogg / gstoggmux.c
1 /* OGG muxer plugin for GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * SECTION:element-oggmux
23  * @see_also: <link linkend="gst-plugins-base-plugins-deoggmux">oggdemux</link>
24  *
25  * This element merges streams (audio and video) into ogg files.
26  *
27  * <refsect2>
28  * <title>Example pipelines</title>
29  * |[
30  * gst-launch v4l2src num-buffers=500 ! video/x-raw-yuv,width=320,height=240 ! ffmpegcolorspace ! theoraenc ! oggmux ! filesink location=video.ogg
31  * ]| Encodes a video stream captured from a v4l2-compatible camera to Ogg/Theora
32  * (the encoding will stop automatically after 500 frames)
33  * </refsect2>
34  *
35  * Last reviewed on 2008-02-06 (0.10.17)
36  */
37
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41
42 #include <gst/gst.h>
43 #include <gst/base/gstcollectpads.h>
44
45 #include "gstoggmux.h"
46
47 /* memcpy - if someone knows a way to get rid of it, please speak up
48  * note: the ogg docs even say you need this... */
49 #include <string.h>
50 #include <time.h>
51 #include <stdlib.h>             /* rand, srand, atoi */
52
53 GST_DEBUG_CATEGORY_STATIC (gst_ogg_mux_debug);
54 #define GST_CAT_DEFAULT gst_ogg_mux_debug
55
56 /* This isn't generally what you'd want with an end-time macro, because
57    technically the end time of a buffer with invalid duration is invalid. But
58    for sorting ogg pages this is what we want. */
59 #define GST_BUFFER_END_TIME(buf) \
60     (GST_BUFFER_DURATION_IS_VALID (buf) \
61     ? GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf) \
62     : GST_BUFFER_TIMESTAMP (buf))
63
64 #define GST_GP_FORMAT "[gp %8" G_GINT64_FORMAT "]"
65 #define GST_GP_CAST(_gp) ((gint64) _gp)
66
67 typedef enum
68 {
69   GST_OGG_FLAG_BOS = GST_ELEMENT_FLAG_LAST,
70   GST_OGG_FLAG_EOS
71 }
72 GstOggFlag;
73
74 /* elementfactory information */
75 static const GstElementDetails gst_ogg_mux_details =
76 GST_ELEMENT_DETAILS ("Ogg muxer",
77     "Codec/Muxer",
78     "mux ogg streams (info about ogg: http://xiph.org)",
79     "Wim Taymans <wim@fluendo.com>");
80
81 /* OggMux signals and args */
82 enum
83 {
84   /* FILL ME */
85   LAST_SIGNAL
86 };
87
88 /* set to 0.5 seconds by default */
89 #define DEFAULT_MAX_DELAY       G_GINT64_CONSTANT(500000000)
90 #define DEFAULT_MAX_PAGE_DELAY  G_GINT64_CONSTANT(500000000)
91 enum
92 {
93   ARG_0,
94   ARG_MAX_DELAY,
95   ARG_MAX_PAGE_DELAY,
96 };
97
98 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
99     GST_PAD_SRC,
100     GST_PAD_ALWAYS,
101     GST_STATIC_CAPS ("application/ogg")
102     );
103
104 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%d",
105     GST_PAD_SINK,
106     GST_PAD_REQUEST,
107     GST_STATIC_CAPS ("video/x-theora; "
108         "audio/x-vorbis; audio/x-flac; audio/x-speex; audio/x-celt; "
109         "application/x-ogm-video; application/x-ogm-audio; video/x-dirac; "
110         "video/x-smoke; text/x-cmml, encoded = (boolean) TRUE; "
111         "subtitle/x-kate; application/x-kate")
112     );
113
114 static void gst_ogg_mux_base_init (gpointer g_class);
115 static void gst_ogg_mux_class_init (GstOggMuxClass * klass);
116 static void gst_ogg_mux_init (GstOggMux * ogg_mux);
117 static void gst_ogg_mux_finalize (GObject * object);
118
119 static GstFlowReturn
120 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux);
121 static gboolean gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event);
122 static GstPad *gst_ogg_mux_request_new_pad (GstElement * element,
123     GstPadTemplate * templ, const gchar * name);
124 static void gst_ogg_mux_release_pad (GstElement * element, GstPad * pad);
125
126 static void gst_ogg_mux_set_property (GObject * object,
127     guint prop_id, const GValue * value, GParamSpec * pspec);
128 static void gst_ogg_mux_get_property (GObject * object,
129     guint prop_id, GValue * value, GParamSpec * pspec);
130 static GstStateChangeReturn gst_ogg_mux_change_state (GstElement * element,
131     GstStateChange transition);
132
133 static GstElementClass *parent_class = NULL;
134
135 /*static guint gst_ogg_mux_signals[LAST_SIGNAL] = { 0 }; */
136
137 GType
138 gst_ogg_mux_get_type (void)
139 {
140   static GType ogg_mux_type = 0;
141
142   if (G_UNLIKELY (ogg_mux_type == 0)) {
143     static const GTypeInfo ogg_mux_info = {
144       sizeof (GstOggMuxClass),
145       gst_ogg_mux_base_init,
146       NULL,
147       (GClassInitFunc) gst_ogg_mux_class_init,
148       NULL,
149       NULL,
150       sizeof (GstOggMux),
151       0,
152       (GInstanceInitFunc) gst_ogg_mux_init,
153     };
154     static const GInterfaceInfo preset_info = {
155       NULL,
156       NULL,
157       NULL
158     };
159
160     ogg_mux_type =
161         g_type_register_static (GST_TYPE_ELEMENT, "GstOggMux", &ogg_mux_info,
162         0);
163
164     g_type_add_interface_static (ogg_mux_type, GST_TYPE_PRESET, &preset_info);
165   }
166   return ogg_mux_type;
167 }
168
169 static void
170 gst_ogg_mux_base_init (gpointer g_class)
171 {
172   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
173
174   gst_element_class_add_pad_template (element_class,
175       gst_static_pad_template_get (&src_factory));
176   gst_element_class_add_pad_template (element_class,
177       gst_static_pad_template_get (&sink_factory));
178
179   gst_element_class_set_details (element_class, &gst_ogg_mux_details);
180 }
181
182 static void
183 gst_ogg_mux_class_init (GstOggMuxClass * klass)
184 {
185   GObjectClass *gobject_class;
186   GstElementClass *gstelement_class;
187
188   gobject_class = (GObjectClass *) klass;
189   gstelement_class = (GstElementClass *) klass;
190
191   parent_class = g_type_class_peek_parent (klass);
192
193   gobject_class->finalize = gst_ogg_mux_finalize;
194   gobject_class->get_property = gst_ogg_mux_get_property;
195   gobject_class->set_property = gst_ogg_mux_set_property;
196
197   gstelement_class->request_new_pad = gst_ogg_mux_request_new_pad;
198   gstelement_class->release_pad = gst_ogg_mux_release_pad;
199
200   g_object_class_install_property (gobject_class, ARG_MAX_DELAY,
201       g_param_spec_uint64 ("max-delay", "Max delay",
202           "Maximum delay in multiplexing streams", 0, G_MAXUINT64,
203           DEFAULT_MAX_DELAY,
204           (GParamFlags) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
205   g_object_class_install_property (gobject_class, ARG_MAX_PAGE_DELAY,
206       g_param_spec_uint64 ("max-page-delay", "Max page delay",
207           "Maximum delay for sending out a page", 0, G_MAXUINT64,
208           DEFAULT_MAX_PAGE_DELAY,
209           (GParamFlags) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210
211   gstelement_class->change_state = gst_ogg_mux_change_state;
212
213 }
214
215 #if 0
216 static const GstEventMask *
217 gst_ogg_mux_get_sink_event_masks (GstPad * pad)
218 {
219   static const GstEventMask gst_ogg_mux_sink_event_masks[] = {
220     {GST_EVENT_EOS, 0},
221     {GST_EVENT_DISCONTINUOUS, 0},
222     {0,}
223   };
224
225   return gst_ogg_mux_sink_event_masks;
226 }
227 #endif
228
229 static void
230 gst_ogg_mux_clear (GstOggMux * ogg_mux)
231 {
232   ogg_mux->pulling = NULL;
233   ogg_mux->need_headers = TRUE;
234   ogg_mux->delta_pad = NULL;
235   ogg_mux->offset = 0;
236   ogg_mux->next_ts = 0;
237   ogg_mux->last_ts = GST_CLOCK_TIME_NONE;
238 }
239
240 static void
241 gst_ogg_mux_init (GstOggMux * ogg_mux)
242 {
243   GstElementClass *klass = GST_ELEMENT_GET_CLASS (ogg_mux);
244
245   ogg_mux->srcpad =
246       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
247           "src"), "src");
248   gst_pad_set_event_function (ogg_mux->srcpad, gst_ogg_mux_handle_src_event);
249   gst_element_add_pad (GST_ELEMENT (ogg_mux), ogg_mux->srcpad);
250
251   GST_OBJECT_FLAG_SET (GST_ELEMENT (ogg_mux), GST_OGG_FLAG_BOS);
252
253   /* seed random number generator for creation of serial numbers */
254   srand (time (NULL));
255
256   ogg_mux->collect = gst_collect_pads_new ();
257   gst_collect_pads_set_function (ogg_mux->collect,
258       (GstCollectPadsFunction) GST_DEBUG_FUNCPTR (gst_ogg_mux_collected),
259       ogg_mux);
260
261   ogg_mux->max_delay = DEFAULT_MAX_DELAY;
262   ogg_mux->max_page_delay = DEFAULT_MAX_PAGE_DELAY;
263
264   gst_ogg_mux_clear (ogg_mux);
265 }
266
267 static void
268 gst_ogg_mux_finalize (GObject * object)
269 {
270   GstOggMux *ogg_mux;
271
272   ogg_mux = GST_OGG_MUX (object);
273
274   if (ogg_mux->collect) {
275     gst_object_unref (ogg_mux->collect);
276     ogg_mux->collect = NULL;
277   }
278
279   G_OBJECT_CLASS (parent_class)->finalize (object);
280 }
281
282 static void
283 gst_ogg_mux_ogg_pad_destroy_notify (GstCollectData * data)
284 {
285   GstOggPad *oggpad = (GstOggPad *) data;
286   GstBuffer *buf;
287
288   ogg_stream_clear (&oggpad->stream);
289
290   if (oggpad->pagebuffers) {
291     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
292       gst_buffer_unref (buf);
293     }
294     g_queue_free (oggpad->pagebuffers);
295     oggpad->pagebuffers = NULL;
296   }
297 }
298
299 static GstPadLinkReturn
300 gst_ogg_mux_sinkconnect (GstPad * pad, GstPad * peer)
301 {
302   GstOggMux *ogg_mux;
303
304   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
305
306   GST_DEBUG_OBJECT (ogg_mux, "sinkconnect triggered on %s", GST_PAD_NAME (pad));
307
308   gst_object_unref (ogg_mux);
309
310   return GST_PAD_LINK_OK;
311 }
312
313 static gboolean
314 gst_ogg_mux_sink_event (GstPad * pad, GstEvent * event)
315 {
316   GstOggMux *ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
317   GstOggPad *ogg_pad = (GstOggPad *) gst_pad_get_element_private (pad);
318   gboolean ret;
319
320   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
321       GST_DEBUG_PAD_NAME (pad));
322
323   switch (GST_EVENT_TYPE (event)) {
324     case GST_EVENT_NEWSEGMENT:
325       /* We don't support NEWSEGMENT events */
326       gst_event_unref (event);
327       ret = FALSE;
328       break;
329     default:
330       ret = TRUE;
331       break;
332   }
333
334   /* now GstCollectPads can take care of the rest, e.g. EOS */
335   if (ret)
336     ret = ogg_pad->collect_event (pad, event);
337
338   gst_object_unref (ogg_mux);
339   return ret;
340 }
341
342 static GstPad *
343 gst_ogg_mux_request_new_pad (GstElement * element,
344     GstPadTemplate * templ, const gchar * req_name)
345 {
346   GstOggMux *ogg_mux;
347   GstPad *newpad;
348   GstElementClass *klass;
349
350   g_return_val_if_fail (templ != NULL, NULL);
351
352   if (templ->direction != GST_PAD_SINK)
353     goto wrong_direction;
354
355   g_return_val_if_fail (GST_IS_OGG_MUX (element), NULL);
356   ogg_mux = GST_OGG_MUX (element);
357
358   klass = GST_ELEMENT_GET_CLASS (element);
359
360   if (templ != gst_element_class_get_pad_template (klass, "sink_%d"))
361     goto wrong_template;
362
363   {
364     gint serial;
365     gchar *name;
366
367     if (req_name == NULL || strlen (req_name) < 6) {
368       /* no name given when requesting the pad, use random serial number */
369       serial = rand ();
370     } else {
371       /* parse serial number from requested padname */
372       serial = atoi (&req_name[5]);
373     }
374     /* create new pad with the name */
375     GST_DEBUG_OBJECT (ogg_mux, "Creating new pad for serial %d", serial);
376     name = g_strdup_printf ("sink_%d", serial);
377     newpad = gst_pad_new_from_template (templ, name);
378     g_free (name);
379
380     /* construct our own wrapper data structure for the pad to
381      * keep track of its status */
382     {
383       GstOggPad *oggpad;
384
385       oggpad = (GstOggPad *)
386           gst_collect_pads_add_pad_full (ogg_mux->collect, newpad,
387           sizeof (GstOggPad), gst_ogg_mux_ogg_pad_destroy_notify);
388       ogg_mux->active_pads++;
389
390       oggpad->serial = serial;
391       ogg_stream_init (&oggpad->stream, serial);
392       oggpad->packetno = 0;
393       oggpad->pageno = 0;
394       oggpad->eos = FALSE;
395       /* we assume there will be some control data first for this pad */
396       oggpad->state = GST_OGG_PAD_STATE_CONTROL;
397       oggpad->new_page = TRUE;
398       oggpad->first_delta = FALSE;
399       oggpad->prev_delta = FALSE;
400       oggpad->pagebuffers = g_queue_new ();
401
402       oggpad->collect_event = (GstPadEventFunction) GST_PAD_EVENTFUNC (newpad);
403       gst_pad_set_event_function (newpad,
404           GST_DEBUG_FUNCPTR (gst_ogg_mux_sink_event));
405     }
406   }
407
408   /* setup some pad functions */
409   gst_pad_set_link_function (newpad, gst_ogg_mux_sinkconnect);
410
411   /* dd the pad to the element */
412   gst_element_add_pad (element, newpad);
413
414   return newpad;
415
416   /* ERRORS */
417 wrong_direction:
418   {
419     g_warning ("ogg_mux: request pad that is not a SINK pad\n");
420     return NULL;
421   }
422 wrong_template:
423   {
424     g_warning ("ogg_mux: this is not our template!\n");
425     return NULL;
426   }
427 }
428
429 static void
430 gst_ogg_mux_release_pad (GstElement * element, GstPad * pad)
431 {
432   GstOggMux *ogg_mux;
433
434   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
435
436   gst_collect_pads_remove_pad (ogg_mux->collect, pad);
437   gst_element_remove_pad (element, pad);
438
439   gst_object_unref (ogg_mux);
440 }
441
442 /* handle events */
443 static gboolean
444 gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event)
445 {
446   GstEventType type;
447
448   type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
449
450   switch (type) {
451     case GST_EVENT_SEEK:
452       /* disable seeking for now */
453       return FALSE;
454     default:
455       break;
456   }
457
458   return gst_pad_event_default (pad, event);
459 }
460
461 static GstBuffer *
462 gst_ogg_mux_buffer_from_page (GstOggMux * mux, ogg_page * page, gboolean delta)
463 {
464   GstBuffer *buffer;
465
466   /* allocate space for header and body */
467   buffer = gst_buffer_new_and_alloc (page->header_len + page->body_len);
468   memcpy (GST_BUFFER_DATA (buffer), page->header, page->header_len);
469   memcpy (GST_BUFFER_DATA (buffer) + page->header_len,
470       page->body, page->body_len);
471
472   /* Here we set granulepos as our OFFSET_END to give easy direct access to
473    * this value later. Before we push it, we reset this to OFFSET + SIZE
474    * (see gst_ogg_mux_push_buffer). */
475   GST_BUFFER_OFFSET_END (buffer) = ogg_page_granulepos (page);
476   if (delta)
477     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
478
479   GST_LOG_OBJECT (mux, GST_GP_FORMAT
480       " created buffer %p from ogg page",
481       GST_GP_CAST (ogg_page_granulepos (page)), buffer);
482
483   return buffer;
484 }
485
486 static GstFlowReturn
487 gst_ogg_mux_push_buffer (GstOggMux * mux, GstBuffer * buffer)
488 {
489   GstCaps *caps;
490
491   /* fix up OFFSET and OFFSET_END again */
492   GST_BUFFER_OFFSET (buffer) = mux->offset;
493   mux->offset += GST_BUFFER_SIZE (buffer);
494   GST_BUFFER_OFFSET_END (buffer) = mux->offset;
495
496   /* Ensure we have monotonically increasing timestamps in the output. */
497   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
498     if (mux->last_ts != GST_CLOCK_TIME_NONE &&
499         GST_BUFFER_TIMESTAMP (buffer) < mux->last_ts)
500       GST_BUFFER_TIMESTAMP (buffer) = mux->last_ts;
501     else
502       mux->last_ts = GST_BUFFER_TIMESTAMP (buffer);
503   }
504
505   caps = gst_pad_get_negotiated_caps (mux->srcpad);
506   gst_buffer_set_caps (buffer, caps);
507   if (caps)
508     gst_caps_unref (caps);
509
510   return gst_pad_push (mux->srcpad, buffer);
511 }
512
513 /* if all queues have at least one page, dequeue the page with the lowest
514  * timestamp */
515 static gboolean
516 gst_ogg_mux_dequeue_page (GstOggMux * mux, GstFlowReturn * flowret)
517 {
518   GSList *walk;
519   GstOggPad *opad = NULL;       /* "oldest" pad */
520   GstClockTime oldest = GST_CLOCK_TIME_NONE;
521   GstBuffer *buf = NULL;
522   gboolean ret = FALSE;
523
524   *flowret = GST_FLOW_OK;
525
526   walk = mux->collect->data;
527   while (walk) {
528     GstOggPad *pad = (GstOggPad *) walk->data;
529
530     /* We need each queue to either be at EOS, or have one or more pages
531      * available with a set granulepos (i.e. not -1), otherwise we don't have
532      * enough data yet to determine which stream needs to go next for correct
533      * time ordering. */
534     if (pad->pagebuffers->length == 0) {
535       if (pad->eos) {
536         GST_LOG_OBJECT (pad->collect.pad,
537             "pad is EOS, skipping for dequeue decision");
538       } else {
539         GST_LOG_OBJECT (pad->collect.pad,
540             "no pages in this queue, can't dequeue");
541         return FALSE;
542       }
543     } else {
544       /* We then need to check for a non-negative granulepos */
545       int i;
546       gboolean valid = FALSE;
547
548       for (i = 0; i < pad->pagebuffers->length; i++) {
549         buf = g_queue_peek_nth (pad->pagebuffers, i);
550         /* Here we check the OFFSET_END, which is actually temporarily the
551          * granulepos value for this buffer */
552         if (GST_BUFFER_OFFSET_END (buf) != -1) {
553           valid = TRUE;
554           break;
555         }
556       }
557       if (!valid) {
558         GST_LOG_OBJECT (pad->collect.pad,
559             "No page timestamps in queue, can't dequeue");
560         return FALSE;
561       }
562     }
563
564     walk = g_slist_next (walk);
565   }
566
567   walk = mux->collect->data;
568   while (walk) {
569     GstOggPad *pad = (GstOggPad *) walk->data;
570
571     /* any page with a granulepos of -1 can be pushed immediately.
572      * TODO: it CAN be, but it seems silly to do so? */
573     buf = g_queue_peek_head (pad->pagebuffers);
574     while (buf && GST_BUFFER_OFFSET_END (buf) == -1) {
575       GST_LOG_OBJECT (pad->collect.pad, "[gp        -1] pushing page");
576       g_queue_pop_head (pad->pagebuffers);
577       *flowret = gst_ogg_mux_push_buffer (mux, buf);
578       buf = g_queue_peek_head (pad->pagebuffers);
579       ret = TRUE;
580     }
581
582     if (buf) {
583       /* if no oldest buffer yet, take this one */
584       if (oldest == GST_CLOCK_TIME_NONE) {
585         GST_LOG_OBJECT (mux, "no oldest yet, taking buffer %p from pad %"
586             GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
587             buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
588         oldest = GST_BUFFER_OFFSET (buf);
589         opad = pad;
590       } else {
591         /* if we have an oldest, compare with this one */
592         if (GST_BUFFER_OFFSET (buf) < oldest) {
593           GST_LOG_OBJECT (mux, "older buffer %p, taking from pad %"
594               GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
595               buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
596           oldest = GST_BUFFER_OFFSET (buf);
597           opad = pad;
598         }
599       }
600     }
601     walk = g_slist_next (walk);
602   }
603
604   if (oldest != GST_CLOCK_TIME_NONE) {
605     g_assert (opad);
606     buf = g_queue_pop_head (opad->pagebuffers);
607     GST_LOG_OBJECT (opad->collect.pad,
608         GST_GP_FORMAT " pushing oldest page buffer %p (granulepos time %"
609         GST_TIME_FORMAT ")", GST_BUFFER_OFFSET_END (buf), buf,
610         GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
611     *flowret = gst_ogg_mux_push_buffer (mux, buf);
612     ret = TRUE;
613   }
614
615   return ret;
616 }
617
618 /* put the given ogg page on a per-pad queue, timestamping it correctly.
619  * after that, dequeue and push as many pages as possible.
620  * Caller should make sure:
621  * pad->timestamp     was set with the timestamp of the first packet put
622  *                    on the page
623  * pad->timestamp_end was set with the timestamp + duration of the last packet
624  *                    put on the page
625  * pad->gp_time       was set with the time matching the gp of the last
626  *                    packet put on the page
627  *
628  * will also reset timestamp and timestamp_end, so caller func can restart
629  * counting.
630  */
631 static GstFlowReturn
632 gst_ogg_mux_pad_queue_page (GstOggMux * mux, GstOggPad * pad, ogg_page * page,
633     gboolean delta)
634 {
635   GstFlowReturn ret;
636   GstBuffer *buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
637
638   /* take the timestamp of the first packet on this page */
639   GST_BUFFER_TIMESTAMP (buffer) = pad->timestamp;
640   GST_BUFFER_DURATION (buffer) = pad->timestamp_end - pad->timestamp;
641   /* take the gp time of the last completed packet on this page */
642   GST_BUFFER_OFFSET (buffer) = pad->gp_time;
643
644   /* the next page will start where the current page's end time leaves off */
645   pad->timestamp = pad->timestamp_end;
646
647   g_queue_push_tail (pad->pagebuffers, buffer);
648   GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
649       " queued buffer page %p (gp time %"
650       GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT
651       "), %d page buffers queued", GST_GP_CAST (ogg_page_granulepos (page)),
652       buffer, GST_TIME_ARGS (GST_BUFFER_OFFSET (buffer)),
653       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
654       g_queue_get_length (pad->pagebuffers));
655
656   while (gst_ogg_mux_dequeue_page (mux, &ret)) {
657     if (ret != GST_FLOW_OK)
658       break;
659   }
660
661   return ret;
662 }
663
664 /*
665  * Given two pads, compare the buffers queued on it.
666  * Returns:
667  *  0 if they have an equal priority
668  * -1 if the first is better
669  *  1 if the second is better
670  * Priority decided by: a) validity, b) older timestamp, c) smaller number
671  * of muxed pages
672  */
673 static gint
674 gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPad * first,
675     GstOggPad * second)
676 {
677   guint64 firsttime, secondtime;
678
679   /* if the first pad doesn't contain anything or is even NULL, return
680    * the second pad as best candidate and vice versa */
681   if (first == NULL || (first->buffer == NULL && first->next_buffer == NULL))
682     return 1;
683   if (second == NULL || (second->buffer == NULL && second->next_buffer == NULL))
684     return -1;
685
686   /* no timestamp on first buffer, it must go first */
687   if (first->buffer)
688     firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
689   else
690     firsttime = GST_BUFFER_TIMESTAMP (first->next_buffer);
691   if (firsttime == GST_CLOCK_TIME_NONE)
692     return -1;
693
694   /* no timestamp on second buffer, it must go first */
695   if (second->buffer)
696     secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
697   else
698     secondtime = GST_BUFFER_TIMESTAMP (second->next_buffer);
699   if (secondtime == GST_CLOCK_TIME_NONE)
700     return 1;
701
702   /* first buffer has higher timestamp, second one should go first */
703   if (secondtime < firsttime)
704     return 1;
705   /* second buffer has higher timestamp, first one should go first */
706   else if (secondtime > firsttime)
707     return -1;
708   else {
709     /* buffers with equal timestamps, prefer the pad that has the
710      * least number of pages muxed */
711     if (second->pageno < first->pageno)
712       return 1;
713     else if (second->pageno > first->pageno)
714       return -1;
715   }
716
717   /* same priority if all of the above failed */
718   return 0;
719 }
720
721 /* make sure at least one buffer is queued on all pads, two if possible
722  * 
723  * if pad->buffer == NULL, pad->next_buffer !=  NULL, then
724  *   we do not know if the buffer is the last or not
725  * if pad->buffer != NULL, pad->next_buffer != NULL, then
726  *   pad->buffer is not the last buffer for the pad
727  * if pad->buffer != NULL, pad->next_buffer == NULL, then
728  *   pad->buffer if the last buffer for the pad
729  * 
730  * returns a pointer to an oggpad that holds the best buffer, or
731  * NULL when no pad was usable. "best" means the buffer marked
732  * with the lowest timestamp. If best->buffer == NULL then nothing
733  * should be done until more data arrives */
734 static GstOggPad *
735 gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
736 {
737   GstOggPad *bestpad = NULL, *still_hungry = NULL;
738   GSList *walk;
739
740   /* try to make sure we have a buffer from each usable pad first */
741   walk = ogg_mux->collect->data;
742   while (walk) {
743     GstOggPad *pad;
744     GstCollectData *data;
745
746     data = (GstCollectData *) walk->data;
747     pad = (GstOggPad *) data;
748
749     walk = g_slist_next (walk);
750
751     GST_LOG_OBJECT (data->pad, "looking at pad for buffer");
752
753     /* try to get a new buffer for this pad if needed and possible */
754     if (pad->buffer == NULL) {
755       GstBuffer *buf;
756       gboolean incaps;
757
758       /* shift the buffer along if needed (it's okay if next_buffer is NULL) */
759       if (pad->buffer == NULL) {
760         GST_LOG_OBJECT (data->pad, "shifting buffer %" GST_PTR_FORMAT,
761             pad->next_buffer);
762         pad->buffer = pad->next_buffer;
763         pad->next_buffer = NULL;
764       }
765
766       buf = gst_collect_pads_pop (ogg_mux->collect, data);
767       GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf);
768
769       /* On EOS we get a NULL buffer */
770       if (buf != NULL) {
771         if (ogg_mux->delta_pad == NULL &&
772             GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))
773           ogg_mux->delta_pad = pad;
774
775         incaps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
776         /* if we need headers */
777         if (pad->state == GST_OGG_PAD_STATE_CONTROL) {
778           /* and we have one */
779           if (incaps) {
780             GST_DEBUG_OBJECT (ogg_mux,
781                 "got incaps buffer in control state, ignoring");
782             /* just ignore */
783             gst_buffer_unref (buf);
784             buf = NULL;
785           } else {
786             GST_DEBUG_OBJECT (ogg_mux,
787                 "got data buffer in control state, switching " "to data mode");
788             /* this is a data buffer so switch to data state */
789             pad->state = GST_OGG_PAD_STATE_DATA;
790           }
791         }
792       } else {
793         GST_DEBUG_OBJECT (data->pad, "EOS on pad");
794         if (!pad->eos) {
795           ogg_page page;
796           GstFlowReturn ret;
797
798           /* it's no longer active */
799           ogg_mux->active_pads--;
800
801           /* Just gone to EOS. Flush existing page(s) */
802           pad->eos = TRUE;
803
804           while (ogg_stream_flush (&pad->stream, &page)) {
805             /* Place page into the per-pad queue */
806             ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
807                 pad->first_delta);
808             /* increment the page number counter */
809             pad->pageno++;
810             /* mark other pages as delta */
811             pad->first_delta = TRUE;
812           }
813         }
814       }
815
816       pad->next_buffer = buf;
817     }
818
819     /* we should have a buffer now, see if it is the best pad to
820      * pull on */
821     if (pad->buffer || pad->next_buffer) {
822       if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) {
823         GST_LOG_OBJECT (data->pad,
824             "new best pad, with buffers %" GST_PTR_FORMAT
825             " and %" GST_PTR_FORMAT, pad->buffer, pad->next_buffer);
826
827         bestpad = pad;
828       }
829     } else if (!pad->eos) {
830       GST_LOG_OBJECT (data->pad, "hungry pad");
831       still_hungry = pad;
832     }
833   }
834
835   if (still_hungry)
836     /* drop back into collectpads... */
837     return still_hungry;
838   else
839     return bestpad;
840 }
841
842 static GList *
843 gst_ogg_mux_get_headers (GstOggPad * pad)
844 {
845   GList *res = NULL;
846   GstStructure *structure;
847   GstCaps *caps;
848   GstPad *thepad;
849
850   thepad = pad->collect.pad;
851
852   GST_LOG_OBJECT (thepad, "getting headers");
853
854   caps = gst_pad_get_negotiated_caps (thepad);
855   if (caps != NULL) {
856     const GValue *streamheader;
857
858     structure = gst_caps_get_structure (caps, 0);
859     streamheader = gst_structure_get_value (structure, "streamheader");
860     if (streamheader != NULL) {
861       GST_LOG_OBJECT (thepad, "got header");
862       if (G_VALUE_TYPE (streamheader) == GST_TYPE_ARRAY) {
863         GArray *bufarr = g_value_peek_pointer (streamheader);
864         gint i;
865
866         GST_LOG_OBJECT (thepad, "got fixed list");
867
868         for (i = 0; i < bufarr->len; i++) {
869           GValue *bufval = &g_array_index (bufarr, GValue, i);
870
871           GST_LOG_OBJECT (thepad, "item %d", i);
872           if (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER) {
873             GstBuffer *buf = g_value_peek_pointer (bufval);
874
875             GST_LOG_OBJECT (thepad, "adding item %d to header list", i);
876
877             gst_buffer_ref (buf);
878             res = g_list_append (res, buf);
879           }
880         }
881       } else {
882         GST_LOG_OBJECT (thepad, "streamheader is not fixed list");
883       }
884     } else if (gst_structure_has_name (structure, "video/x-dirac")) {
885       res = g_list_append (res, pad->buffer);
886       pad->buffer = pad->next_buffer;
887       pad->next_buffer = NULL;
888       pad->always_flush_page = TRUE;
889     } else {
890       GST_LOG_OBJECT (thepad, "caps don't have streamheader");
891     }
892     gst_caps_unref (caps);
893   } else {
894     GST_LOG_OBJECT (thepad, "got empty caps as negotiated format");
895   }
896   return res;
897 }
898
899 static GstCaps *
900 gst_ogg_mux_set_header_on_caps (GstCaps * caps, GList * buffers)
901 {
902   GstStructure *structure;
903   GValue array = { 0 };
904   GList *walk = buffers;
905
906   caps = gst_caps_make_writable (caps);
907
908   structure = gst_caps_get_structure (caps, 0);
909
910   /* put buffers in a fixed list */
911   g_value_init (&array, GST_TYPE_ARRAY);
912
913   while (walk) {
914     GstBuffer *buf = GST_BUFFER (walk->data);
915     GstBuffer *copy;
916     GValue value = { 0 };
917
918     walk = walk->next;
919
920     /* mark buffer */
921     GST_LOG ("Setting IN_CAPS on buffer of length %d", GST_BUFFER_SIZE (buf));
922     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
923
924     g_value_init (&value, GST_TYPE_BUFFER);
925     copy = gst_buffer_copy (buf);
926     gst_value_set_buffer (&value, copy);
927     gst_buffer_unref (copy);
928     gst_value_array_append_value (&array, &value);
929     g_value_unset (&value);
930   }
931   gst_structure_set_value (structure, "streamheader", &array);
932   g_value_unset (&array);
933
934   return caps;
935 }
936
937 /*
938  * For each pad we need to write out one (small) header in one
939  * page that allows decoders to identify the type of the stream.
940  * After that we need to write out all extra info for the decoders.
941  * In the case of a codec that also needs data as configuration, we can
942  * find that info in the streamcaps. 
943  * After writing the headers we must start a new page for the data.
944  */
945 static GstFlowReturn
946 gst_ogg_mux_send_headers (GstOggMux * mux)
947 {
948   GSList *walk;
949   GList *hbufs, *hwalk;
950   GstCaps *caps;
951   GstFlowReturn ret;
952
953   hbufs = NULL;
954   ret = GST_FLOW_OK;
955
956   GST_LOG_OBJECT (mux, "collecting headers");
957
958   walk = mux->collect->data;
959   while (walk) {
960     GstOggPad *pad;
961     GstPad *thepad;
962
963     pad = (GstOggPad *) walk->data;
964     thepad = pad->collect.pad;
965
966     walk = g_slist_next (walk);
967
968     GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad));
969
970     /* if the pad has no buffer, we don't care */
971     if (pad->buffer == NULL && pad->next_buffer == NULL)
972       continue;
973
974     /* now figure out the headers */
975     pad->headers = gst_ogg_mux_get_headers (pad);
976   }
977
978   GST_LOG_OBJECT (mux, "creating BOS pages");
979   walk = mux->collect->data;
980   while (walk) {
981     GstOggPad *pad;
982     GstBuffer *buf;
983     ogg_packet packet;
984     ogg_page page;
985     GstPad *thepad;
986     GstCaps *caps;
987     GstStructure *structure;
988     GstBuffer *hbuf;
989
990     pad = (GstOggPad *) walk->data;
991     thepad = pad->collect.pad;
992     caps = gst_pad_get_negotiated_caps (thepad);
993     structure = gst_caps_get_structure (caps, 0);
994
995     walk = walk->next;
996
997     pad->packetno = 0;
998
999     GST_LOG_OBJECT (thepad, "looping over headers");
1000
1001     if (pad->headers) {
1002       buf = GST_BUFFER (pad->headers->data);
1003       pad->headers = g_list_remove (pad->headers, buf);
1004     } else if (pad->buffer) {
1005       buf = pad->buffer;
1006       gst_buffer_ref (buf);
1007     } else if (pad->next_buffer) {
1008       buf = pad->next_buffer;
1009       gst_buffer_ref (buf);
1010     } else {
1011       /* fixme -- should be caught in the previous list traversal. */
1012       GST_OBJECT_LOCK (pad);
1013       g_critical ("No headers or buffers on pad %s:%s",
1014           GST_DEBUG_PAD_NAME (pad));
1015       GST_OBJECT_UNLOCK (pad);
1016       continue;
1017     }
1018
1019     /* create a packet from the buffer */
1020     packet.packet = GST_BUFFER_DATA (buf);
1021     packet.bytes = GST_BUFFER_SIZE (buf);
1022     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1023     if (packet.granulepos == -1)
1024       packet.granulepos = 0;
1025     /* mark BOS and packet number */
1026     packet.b_o_s = (pad->packetno == 0);
1027     packet.packetno = pad->packetno++;
1028     /* mark EOS */
1029     packet.e_o_s = 0;
1030
1031     /* swap the packet in */
1032     ogg_stream_packetin (&pad->stream, &packet);
1033     gst_buffer_unref (buf);
1034
1035     GST_LOG_OBJECT (thepad, "flushing out BOS page");
1036     if (!ogg_stream_flush (&pad->stream, &page))
1037       g_critical ("Could not flush BOS page");
1038
1039     hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1040
1041     GST_LOG_OBJECT (mux, "swapped out page with mime type %s",
1042         gst_structure_get_name (structure));
1043
1044     /* quick hack: put Theora and Dirac video pages at the front.
1045      * Ideally, we would have a settable enum for which Ogg
1046      * profile we work with, and order based on that.
1047      * (FIXME: if there is more than one video stream, shouldn't we only put
1048      * one's BOS into the first page, followed by an audio stream's BOS, and
1049      * only then followed by the remaining video and audio streams?) */
1050     if (gst_structure_has_name (structure, "video/x-theora")) {
1051       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "Theora");
1052       hbufs = g_list_prepend (hbufs, hbuf);
1053     } else if (gst_structure_has_name (structure, "video/x-dirac")) {
1054       GST_DEBUG_OBJECT (thepad, "putting %s page at the front", "Dirac");
1055       hbufs = g_list_prepend (hbufs, hbuf);
1056       pad->always_flush_page = TRUE;
1057     } else {
1058       hbufs = g_list_append (hbufs, hbuf);
1059     }
1060     gst_caps_unref (caps);
1061   }
1062
1063   GST_LOG_OBJECT (mux, "creating next headers");
1064   walk = mux->collect->data;
1065   while (walk) {
1066     GstOggPad *pad;
1067     GstPad *thepad;
1068
1069     pad = (GstOggPad *) walk->data;
1070     thepad = pad->collect.pad;
1071
1072     walk = walk->next;
1073
1074     GST_LOG_OBJECT (mux, "looping over headers for pad %s:%s",
1075         GST_DEBUG_PAD_NAME (thepad));
1076
1077     hwalk = pad->headers;
1078     while (hwalk) {
1079       GstBuffer *buf = GST_BUFFER (hwalk->data);
1080       ogg_packet packet;
1081       ogg_page page;
1082
1083       hwalk = hwalk->next;
1084
1085       /* create a packet from the buffer */
1086       packet.packet = GST_BUFFER_DATA (buf);
1087       packet.bytes = GST_BUFFER_SIZE (buf);
1088       packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1089       if (packet.granulepos == -1)
1090         packet.granulepos = 0;
1091       /* mark BOS and packet number */
1092       packet.b_o_s = (pad->packetno == 0);
1093       packet.packetno = pad->packetno++;
1094       /* mark EOS */
1095       packet.e_o_s = 0;
1096
1097       /* swap the packet in */
1098       ogg_stream_packetin (&pad->stream, &packet);
1099       gst_buffer_unref (buf);
1100
1101       /* if last header, flush page */
1102       if (hwalk == NULL) {
1103         GST_LOG_OBJECT (mux,
1104             "flushing page as packet %" G_GUINT64_FORMAT " is first or "
1105             "last packet", pad->packetno);
1106         while (ogg_stream_flush (&pad->stream, &page)) {
1107           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1108
1109           GST_LOG_OBJECT (mux, "swapped out page");
1110           hbufs = g_list_append (hbufs, hbuf);
1111         }
1112       } else {
1113         GST_LOG_OBJECT (mux, "try to swap out page");
1114         /* just try to swap out a page then */
1115         while (ogg_stream_pageout (&pad->stream, &page) > 0) {
1116           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1117
1118           GST_LOG_OBJECT (mux, "swapped out page");
1119           hbufs = g_list_append (hbufs, hbuf);
1120         }
1121       }
1122     }
1123     g_list_free (pad->headers);
1124     pad->headers = NULL;
1125   }
1126   /* hbufs holds all buffers for the headers now */
1127
1128   /* create caps with the buffers */
1129   caps = gst_pad_get_caps (mux->srcpad);
1130   if (caps) {
1131     caps = gst_ogg_mux_set_header_on_caps (caps, hbufs);
1132     gst_pad_set_caps (mux->srcpad, caps);
1133     gst_caps_unref (caps);
1134   }
1135   /* and send the buffers */
1136   while (hbufs != NULL) {
1137     GstBuffer *buf = GST_BUFFER (hbufs->data);
1138
1139     hbufs = g_list_delete_link (hbufs, hbufs);
1140
1141     if ((ret = gst_ogg_mux_push_buffer (mux, buf)) != GST_FLOW_OK)
1142       break;
1143   }
1144   /* free any remaining nodes/buffers in case we couldn't push them */
1145   g_list_foreach (hbufs, (GFunc) gst_mini_object_unref, NULL);
1146   g_list_free (hbufs);
1147
1148   return ret;
1149 }
1150
1151 /* this function is called to process data on the best pending pad.
1152  *
1153  * basic idea:
1154  *
1155  * 1) store the selected pad and keep on pulling until we fill a
1156  *    complete ogg page or the ogg page is filled above the max-delay
1157  *    threshold. This is needed because the ogg spec says that
1158  *    you should fill a complete page with data from the same logical
1159  *    stream. When the page is filled, go back to 1).
1160  * 2) before filling a page, read ahead one more buffer to see if this
1161  *    packet is the last of the stream. We need to do this because the ogg
1162  *    spec mandates that the last packet should have the EOS flag set before
1163  *    sending it to ogg. if pad->buffer is NULL we need to wait to find out
1164  *    whether there are any more buffers.
1165  * 3) pages get queued on a per-pad queue. Every time a page is queued, a
1166  *    dequeue is called, which will dequeue the oldest page on any pad, provided
1167  *    that ALL pads have at least one marked page in the queue (or remaining
1168  *    pads are at EOS)
1169  */
1170 static GstFlowReturn
1171 gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPad * best)
1172 {
1173   GstFlowReturn ret = GST_FLOW_OK;
1174   gboolean delta_unit;
1175   gint64 granulepos = 0;
1176   GstClockTime timestamp, gp_time;
1177
1178   GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT
1179       ", currently pulling from %" GST_PTR_FORMAT, best->collect.pad,
1180       ogg_mux->pulling);
1181
1182   /* best->buffer is non-NULL, either the pad is EOS's or there is a next 
1183    * buffer */
1184   if (best->next_buffer == NULL && !best->eos) {
1185     GST_WARNING_OBJECT (ogg_mux, "no subsequent buffer and EOS not reached");
1186     return GST_FLOW_WRONG_STATE;
1187   }
1188
1189   /* if we were already pulling from one pad, but the new "best" buffer is
1190    * from another pad, we need to check if we have reason to flush a page
1191    * for the pad we were pulling from before */
1192   if (ogg_mux->pulling && best &&
1193       ogg_mux->pulling != best && ogg_mux->pulling->buffer) {
1194     GstOggPad *pad = ogg_mux->pulling;
1195
1196     GstClockTime last_ts = GST_BUFFER_END_TIME (pad->buffer);
1197
1198     /* if the next packet in the current page is going to make the page
1199      * too long, we need to flush */
1200     if (last_ts > ogg_mux->next_ts + ogg_mux->max_delay) {
1201       ogg_page page;
1202
1203       GST_LOG_OBJECT (pad->collect.pad,
1204           GST_GP_FORMAT " stored packet %" G_GINT64_FORMAT
1205           " will make page too long, flushing",
1206           GST_BUFFER_OFFSET_END (pad->buffer), (gint64) pad->stream.packetno);
1207
1208       while (ogg_stream_flush (&pad->stream, &page)) {
1209         /* end time of this page is the timestamp of the next buffer */
1210         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1211         /* Place page into the per-pad queue */
1212         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1213             pad->first_delta);
1214         /* increment the page number counter */
1215         pad->pageno++;
1216         /* mark other pages as delta */
1217         pad->first_delta = TRUE;
1218       }
1219       pad->new_page = TRUE;
1220       ogg_mux->pulling = NULL;
1221     }
1222   }
1223
1224   /* if we don't know which pad to pull on, use the best one */
1225   if (ogg_mux->pulling == NULL) {
1226     ogg_mux->pulling = best;
1227     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from best pad");
1228
1229     /* remember timestamp and gp time of first buffer for this new pad */
1230     if (ogg_mux->pulling != NULL) {
1231       ogg_mux->next_ts = GST_BUFFER_TIMESTAMP (ogg_mux->pulling->buffer);
1232       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "updated times, next ts %"
1233           GST_TIME_FORMAT, GST_TIME_ARGS (ogg_mux->next_ts));
1234     } else {
1235       /* no pad to pull on, send EOS */
1236       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1237       return GST_FLOW_WRONG_STATE;
1238     }
1239   }
1240
1241   if (ogg_mux->need_headers) {
1242     ret = gst_ogg_mux_send_headers (ogg_mux);
1243     ogg_mux->need_headers = FALSE;
1244   }
1245
1246   /* we are pulling from a pad, continue to do so until a page
1247    * has been filled and queued */
1248   if (ogg_mux->pulling != NULL) {
1249     ogg_packet packet;
1250     ogg_page page;
1251     GstBuffer *buf, *tmpbuf;
1252     GstOggPad *pad = ogg_mux->pulling;
1253     gint64 duration;
1254     gboolean force_flush;
1255
1256     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from pad");
1257
1258     /* now see if we have a buffer */
1259     buf = pad->buffer;
1260     if (buf == NULL) {
1261       GST_DEBUG_OBJECT (ogg_mux, "pad was EOS");
1262       ogg_mux->pulling = NULL;
1263       return GST_FLOW_OK;
1264     }
1265
1266     delta_unit = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1267     duration = GST_BUFFER_DURATION (buf);
1268
1269     /* if the current "next timestamp" on the pad is unset, then this is the
1270      * first packet on the new page.  Update our pad's page timestamp */
1271     if (ogg_mux->pulling->timestamp == GST_CLOCK_TIME_NONE) {
1272       ogg_mux->pulling->timestamp = GST_BUFFER_TIMESTAMP (buf);
1273       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad,
1274           "updated pad timestamp to %" GST_TIME_FORMAT,
1275           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
1276     }
1277     /* create a packet from the buffer */
1278     packet.packet = GST_BUFFER_DATA (buf);
1279     packet.bytes = GST_BUFFER_SIZE (buf);
1280     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1281     if (packet.granulepos == -1)
1282       packet.granulepos = 0;
1283     /* mark BOS and packet number */
1284     packet.b_o_s = (pad->packetno == 0);
1285     packet.packetno = pad->packetno++;
1286     GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
1287         " packet %" G_GINT64_FORMAT " (%ld bytes) created from buffer",
1288         GST_GP_CAST (packet.granulepos), (gint64) packet.packetno,
1289         packet.bytes);
1290
1291     packet.e_o_s = (pad->eos ? 1 : 0);
1292     tmpbuf = NULL;
1293
1294     /* we flush when we see a new keyframe */
1295     force_flush = (pad->prev_delta && !delta_unit) || pad->always_flush_page;
1296     if (duration != -1) {
1297       pad->duration += duration;
1298       /* if page duration exceeds max, flush page */
1299       if (pad->duration > ogg_mux->max_page_delay) {
1300         force_flush = TRUE;
1301         pad->duration = 0;
1302       }
1303     }
1304
1305     if (GST_BUFFER_IS_DISCONT (buf)) {
1306       GST_LOG_OBJECT (pad->collect.pad, "got discont");
1307       packet.packetno++;
1308       /* No public API for this; hack things in */
1309       pad->stream.pageno++;
1310       force_flush = TRUE;
1311     }
1312
1313     /* flush the currently built page if necessary */
1314     if (force_flush) {
1315       GST_LOG_OBJECT (pad->collect.pad,
1316           GST_GP_FORMAT " forced flush of page before this packet",
1317           GST_BUFFER_OFFSET_END (pad->buffer));
1318       while (ogg_stream_flush (&pad->stream, &page)) {
1319         /* end time of this page is the timestamp of the next buffer */
1320         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1321         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1322             pad->first_delta);
1323
1324         /* increment the page number counter */
1325         pad->pageno++;
1326         /* mark other pages as delta */
1327         pad->first_delta = TRUE;
1328       }
1329       pad->new_page = TRUE;
1330     }
1331
1332     /* if this is the first packet of a new page figure out the delta flag */
1333     if (pad->new_page) {
1334       if (delta_unit) {
1335         /* mark the page as delta */
1336         pad->first_delta = TRUE;
1337       } else {
1338         /* got a keyframe */
1339         if (ogg_mux->delta_pad == pad) {
1340           /* if we get it on the pad with deltaunits,
1341            * we mark the page as non delta */
1342           pad->first_delta = FALSE;
1343         } else if (ogg_mux->delta_pad != NULL) {
1344           /* if there are pads with delta frames, we
1345            * must mark this one as delta */
1346           pad->first_delta = TRUE;
1347         } else {
1348           pad->first_delta = FALSE;
1349         }
1350       }
1351       pad->new_page = FALSE;
1352     }
1353
1354     /* save key unit to track delta->key unit transitions */
1355     pad->prev_delta = delta_unit;
1356
1357     /* swap the packet in */
1358     if (packet.e_o_s == 1)
1359       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in EOS packet");
1360     if (packet.b_o_s == 1)
1361       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in BOS packet");
1362
1363     ogg_stream_packetin (&pad->stream, &packet);
1364
1365     gp_time = GST_BUFFER_OFFSET (pad->buffer);
1366     granulepos = GST_BUFFER_OFFSET_END (pad->buffer);
1367     timestamp = GST_BUFFER_TIMESTAMP (pad->buffer);
1368
1369     GST_LOG_OBJECT (pad->collect.pad,
1370         GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", gp time %"
1371         GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT " packetin'd",
1372         granulepos, (gint64) packet.packetno, GST_TIME_ARGS (gp_time),
1373         GST_TIME_ARGS (timestamp));
1374     /* don't need the old buffer anymore */
1375     gst_buffer_unref (pad->buffer);
1376     /* store new readahead buffer */
1377     pad->buffer = tmpbuf;
1378
1379     /* let ogg write out the pages now. The packet we got could end
1380      * up in more than one page so we need to write them all */
1381     if (ogg_stream_pageout (&pad->stream, &page) > 0) {
1382       /* we have a new page, so we need to timestamp it correctly.
1383        * if this fresh packet ends on this page, then the page's granulepos
1384        * comes from that packet, and we should set this buffer's timestamp */
1385
1386       GST_LOG_OBJECT (pad->collect.pad,
1387           GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", time %"
1388           GST_TIME_FORMAT ") caused new page",
1389           granulepos, (gint64) packet.packetno, GST_TIME_ARGS (timestamp));
1390       GST_LOG_OBJECT (pad->collect.pad,
1391           GST_GP_FORMAT " new page %ld",
1392           GST_GP_CAST (ogg_page_granulepos (&page)), pad->stream.pageno);
1393
1394       if (ogg_page_granulepos (&page) == granulepos) {
1395         /* the packet we streamed in finishes on the current page,
1396          * because the page's granulepos is the granulepos of the last
1397          * packet completed on that page,
1398          * so update the timestamp that we will give to the page */
1399         GST_LOG_OBJECT (pad->collect.pad,
1400             GST_GP_FORMAT
1401             " packet finishes on current page, updating gp time to %"
1402             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (gp_time));
1403         pad->gp_time = gp_time;
1404       } else {
1405         GST_LOG_OBJECT (pad->collect.pad,
1406             GST_GP_FORMAT
1407             " packet spans beyond current page, keeping old gp time %"
1408             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (pad->gp_time));
1409       }
1410
1411       /* push the page */
1412       /* end time of this page is the timestamp of the next buffer */
1413       pad->timestamp_end = timestamp;
1414       ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
1415       pad->pageno++;
1416       /* mark next pages as delta */
1417       pad->first_delta = TRUE;
1418
1419       /* use an inner loop here to flush the remaining pages and
1420        * mark them as delta frames as well */
1421       while (ogg_stream_pageout (&pad->stream, &page) > 0) {
1422         if (ogg_page_granulepos (&page) == granulepos) {
1423           /* the page has taken up the new packet completely, which means
1424            * the packet ends the page and we can update the gp time
1425            * before pushing out */
1426           pad->gp_time = gp_time;
1427         }
1428
1429         /* we have a complete page now, we can push the page
1430          * and make sure to pull on a new pad the next time around */
1431         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1432             pad->first_delta);
1433         /* increment the page number counter */
1434         pad->pageno++;
1435       }
1436       /* need a new page as well */
1437       pad->new_page = TRUE;
1438       pad->duration = 0;
1439       /* we're done pulling on this pad, make sure to choose a new
1440        * pad for pulling in the next iteration */
1441       ogg_mux->pulling = NULL;
1442     }
1443
1444     /* Update the gp time, if necessary, since any future page will have at
1445      * least this gp time.
1446      */
1447     if (pad->gp_time < gp_time) {
1448       pad->gp_time = gp_time;
1449       GST_LOG_OBJECT (pad->collect.pad,
1450           "Updated running gp time of pad %" GST_PTR_FORMAT
1451           " to %" GST_TIME_FORMAT, pad->collect.pad, GST_TIME_ARGS (gp_time));
1452     }
1453   }
1454
1455   return ret;
1456 }
1457
1458 /* all_pads_eos:
1459  *
1460  * Checks if all pads are EOS'd by peeking.
1461  *
1462  * Returns TRUE if all pads are EOS.
1463  */
1464 static gboolean
1465 all_pads_eos (GstCollectPads * pads)
1466 {
1467   GSList *walk;
1468   gboolean alleos = TRUE;
1469
1470   walk = pads->data;
1471   while (walk) {
1472     GstBuffer *buf;
1473     GstCollectData *data = (GstCollectData *) walk->data;
1474
1475     buf = gst_collect_pads_peek (pads, data);
1476     if (buf) {
1477       alleos = FALSE;
1478       gst_buffer_unref (buf);
1479       goto beach;
1480     }
1481     walk = walk->next;
1482   }
1483 beach:
1484   return alleos;
1485 }
1486
1487 /* This function is called when there is data on all pads.
1488  * 
1489  * It finds a pad to pull on, this is done by looking at the buffers
1490  * to decide which one to use, and using the 'oldest' one first. It then calls
1491  * gst_ogg_mux_process_best_pad() to process as much data as possible.
1492  * 
1493  * If all the pads have received EOS, it flushes out all data by continually
1494  * getting the best pad and calling gst_ogg_mux_process_best_pad() until they
1495  * are all empty, and then sends EOS.
1496  */
1497 static GstFlowReturn
1498 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
1499 {
1500   GstOggPad *best;
1501   GstFlowReturn ret;
1502   gint activebefore;
1503
1504   GST_LOG_OBJECT (ogg_mux, "collected");
1505
1506   activebefore = ogg_mux->active_pads;
1507
1508   /* queue buffers on all pads; find a buffer with the lowest timestamp */
1509   best = gst_ogg_mux_queue_pads (ogg_mux);
1510   if (best && !best->buffer) {
1511     GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
1512     return GST_FLOW_OK;
1513   }
1514
1515   if (!best) {
1516     return GST_FLOW_WRONG_STATE;
1517   }
1518
1519   ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1520
1521   if (ogg_mux->active_pads < activebefore) {
1522     /* If the active pad count went down, this mean at least one pad has gone
1523      * EOS. Since CollectPads only calls _collected() once when all pads are
1524      * EOS, and our code doesn't _pop() from all pads we need to check that by
1525      * peeking on all pads, else we won't be called again and the muxing will
1526      * not terminate (push out EOS). */
1527
1528     /* if all the pads have been removed, flush all pending data */
1529     if ((ret == GST_FLOW_OK) && all_pads_eos (pads)) {
1530       GST_LOG_OBJECT (ogg_mux, "no pads remaining, flushing data");
1531
1532       do {
1533         best = gst_ogg_mux_queue_pads (ogg_mux);
1534         if (best)
1535           ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1536       } while ((ret == GST_FLOW_OK) && (best != NULL));
1537
1538       GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
1539       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1540     }
1541   }
1542
1543   return ret;
1544 }
1545
1546 static void
1547 gst_ogg_mux_get_property (GObject * object,
1548     guint prop_id, GValue * value, GParamSpec * pspec)
1549 {
1550   GstOggMux *ogg_mux;
1551
1552   ogg_mux = GST_OGG_MUX (object);
1553
1554   switch (prop_id) {
1555     case ARG_MAX_DELAY:
1556       g_value_set_uint64 (value, ogg_mux->max_delay);
1557       break;
1558     case ARG_MAX_PAGE_DELAY:
1559       g_value_set_uint64 (value, ogg_mux->max_page_delay);
1560       break;
1561     default:
1562       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1563       break;
1564   }
1565 }
1566
1567 static void
1568 gst_ogg_mux_set_property (GObject * object,
1569     guint prop_id, const GValue * value, GParamSpec * pspec)
1570 {
1571   GstOggMux *ogg_mux;
1572
1573   ogg_mux = GST_OGG_MUX (object);
1574
1575   switch (prop_id) {
1576     case ARG_MAX_DELAY:
1577       ogg_mux->max_delay = g_value_get_uint64 (value);
1578       break;
1579     case ARG_MAX_PAGE_DELAY:
1580       ogg_mux->max_page_delay = g_value_get_uint64 (value);
1581       break;
1582     default:
1583       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1584       break;
1585   }
1586 }
1587
1588 /* reset all variables in the ogg pads. */
1589 static void
1590 gst_ogg_mux_init_collectpads (GstCollectPads * collect)
1591 {
1592   GSList *walk;
1593
1594   walk = collect->data;
1595   while (walk) {
1596     GstOggPad *oggpad = (GstOggPad *) walk->data;
1597
1598     ogg_stream_init (&oggpad->stream, oggpad->serial);
1599     oggpad->packetno = 0;
1600     oggpad->pageno = 0;
1601     oggpad->eos = FALSE;
1602     /* we assume there will be some control data first for this pad */
1603     oggpad->state = GST_OGG_PAD_STATE_CONTROL;
1604     oggpad->new_page = TRUE;
1605     oggpad->first_delta = FALSE;
1606     oggpad->prev_delta = FALSE;
1607     oggpad->pagebuffers = g_queue_new ();
1608
1609     walk = g_slist_next (walk);
1610   }
1611 }
1612
1613 /* Clear all buffers from the collectpads object */
1614 static void
1615 gst_ogg_mux_clear_collectpads (GstCollectPads * collect)
1616 {
1617   GSList *walk;
1618
1619   for (walk = collect->data; walk; walk = g_slist_next (walk)) {
1620     GstOggPad *oggpad = (GstOggPad *) walk->data;
1621     GstBuffer *buf;
1622
1623     ogg_stream_clear (&oggpad->stream);
1624
1625     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
1626       gst_buffer_unref (buf);
1627     }
1628     g_queue_free (oggpad->pagebuffers);
1629     oggpad->pagebuffers = NULL;
1630
1631     if (oggpad->buffer) {
1632       gst_buffer_unref (oggpad->buffer);
1633       oggpad->buffer = NULL;
1634     }
1635     if (oggpad->next_buffer) {
1636       gst_buffer_unref (oggpad->next_buffer);
1637       oggpad->next_buffer = NULL;
1638     }
1639   }
1640 }
1641
1642 static GstStateChangeReturn
1643 gst_ogg_mux_change_state (GstElement * element, GstStateChange transition)
1644 {
1645   GstOggMux *ogg_mux;
1646   GstStateChangeReturn ret;
1647
1648   ogg_mux = GST_OGG_MUX (element);
1649
1650   switch (transition) {
1651     case GST_STATE_CHANGE_NULL_TO_READY:
1652       break;
1653     case GST_STATE_CHANGE_READY_TO_PAUSED:
1654       gst_ogg_mux_clear (ogg_mux);
1655       gst_ogg_mux_init_collectpads (ogg_mux->collect);
1656       gst_collect_pads_start (ogg_mux->collect);
1657       break;
1658     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1659       break;
1660     case GST_STATE_CHANGE_PAUSED_TO_READY:
1661       gst_collect_pads_stop (ogg_mux->collect);
1662       break;
1663     default:
1664       break;
1665   }
1666
1667   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1668
1669   switch (transition) {
1670     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1671       break;
1672     case GST_STATE_CHANGE_PAUSED_TO_READY:
1673       gst_ogg_mux_clear_collectpads (ogg_mux->collect);
1674       break;
1675     case GST_STATE_CHANGE_READY_TO_NULL:
1676       break;
1677     default:
1678       break;
1679   }
1680
1681   return ret;
1682 }
1683
1684 gboolean
1685 gst_ogg_mux_plugin_init (GstPlugin * plugin)
1686 {
1687   GST_DEBUG_CATEGORY_INIT (gst_ogg_mux_debug, "oggmux", 0, "ogg muxer");
1688
1689   return gst_element_register (plugin, "oggmux", GST_RANK_PRIMARY,
1690       GST_TYPE_OGG_MUX);
1691 }