debug changes
[platform/upstream/gstreamer.git] / ext / ogg / gstoggmux.c
1 /* OGG muxer plugin for GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <gst/gst.h>
26 #include <gst/base/gstcollectpads.h>
27
28 #include <ogg/ogg.h>
29 /* memcpy - if someone knows a way to get rid of it, please speak up
30  * note: the ogg docs even say you need this... */
31 #include <string.h>
32 #include <time.h>
33 #include <stdlib.h>             /* rand, srand, atoi */
34
35 GST_DEBUG_CATEGORY_STATIC (gst_ogg_mux_debug);
36 #define GST_CAT_DEFAULT gst_ogg_mux_debug
37
38 #define GST_TYPE_OGG_MUX (gst_ogg_mux_get_type())
39 #define GST_OGG_MUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OGG_MUX, GstOggMux))
40 #define GST_OGG_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OGG_MUX, GstOggMux))
41 #define GST_IS_OGG_MUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OGG_MUX))
42 #define GST_IS_OGG_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OGG_MUX))
43
44 /* This isn't generally what you'd want with an end-time macro, because
45    technically the end time of a buffer with invalid duration is invalid. But
46    for sorting ogg pages this is what we want. */
47 #define GST_BUFFER_END_TIME(buf) \
48     (GST_BUFFER_DURATION_IS_VALID (buf) \
49     ? GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf) \
50     : GST_BUFFER_TIMESTAMP (buf))
51
52 #define GST_GP_FORMAT "[gp %8" G_GINT64_FORMAT "]"
53
54 typedef struct _GstOggMux GstOggMux;
55 typedef struct _GstOggMuxClass GstOggMuxClass;
56
57 typedef enum
58 {
59   GST_OGG_PAD_STATE_CONTROL = 0,
60   GST_OGG_PAD_STATE_DATA = 1
61 }
62 GstOggPadState;
63
64 /* all information needed for one ogg stream */
65 typedef struct
66 {
67   GstCollectData collect;       /* we extend the CollectData */
68
69   /* These two buffers make a very simple queue - they enter as 'next_buffer'
70    * and (usually) leave as 'buffer', except at EOS, when buffer will be NULL */
71   GstBuffer *buffer;            /* the first waiting buffer for the pad */
72   GstBuffer *next_buffer;       /* the second waiting buffer for the pad */
73
74   gint serial;
75   ogg_stream_state stream;
76   gint64 packetno;              /* number of next packet */
77   gint64 pageno;                /* number of next page */
78   guint64 duration;             /* duration of current page */
79   gboolean eos;
80   gint64 offset;
81   GstClockTime timestamp;       /* timestamp of the first packet on the next
82                                  * page to be dequeued */
83   GstClockTime timestamp_end;   /* end timestamp of last complete packet on
84                                    the next page to be dequeued */
85   GstClockTime gp_time;         /* time corresponding to the gp value of the
86                                    last complete packet on the next page to be
87                                    dequeued */
88
89   GstOggPadState state;         /* state of the pad */
90
91   GList *headers;
92
93   GQueue *pagebuffers;          /* List of pages in buffers ready for pushing */
94
95   gboolean new_page;            /* starting a new page */
96   gboolean first_delta;         /* was the first packet in the page a delta */
97   gboolean prev_delta;          /* was the previous buffer a delta frame */
98 }
99 GstOggPad;
100
101 struct _GstOggMux
102 {
103   GstElement element;
104
105   /* source pad */
106   GstPad *srcpad;
107
108   /* sinkpads */
109   GstCollectPads *collect;
110
111   /* number of pads which have not received EOS */
112   gint active_pads;
113
114   /* the pad we are currently using to fill a page */
115   GstOggPad *pulling;
116
117   /* next timestamp for the page */
118   GstClockTime next_ts;
119
120   /* Last timestamp actually output on src pad */
121   GstClockTime last_ts;
122
123   /* offset in stream */
124   guint64 offset;
125
126   /* need_headers */
127   gboolean need_headers;
128
129   guint64 max_delay;
130   guint64 max_page_delay;
131
132   GstOggPad *delta_pad;         /* when a delta frame is detected on a stream, we mark
133                                    pages as delta frames up to the page that has the
134                                    keyframe */
135
136 };
137
138 typedef enum
139 {
140   GST_OGG_FLAG_BOS = GST_ELEMENT_FLAG_LAST,
141   GST_OGG_FLAG_EOS
142 }
143 GstOggFlag;
144
145 struct _GstOggMuxClass
146 {
147   GstElementClass parent_class;
148 };
149
150 /* elementfactory information */
151 static const GstElementDetails gst_ogg_mux_details =
152 GST_ELEMENT_DETAILS ("Ogg muxer",
153     "Codec/Muxer",
154     "mux ogg streams (info about ogg: http://xiph.org)",
155     "Wim Taymans <wim@fluendo.com>");
156
157 /* OggMux signals and args */
158 enum
159 {
160   /* FILL ME */
161   LAST_SIGNAL
162 };
163
164 /* set to 0.5 seconds by default */
165 #define DEFAULT_MAX_DELAY       G_GINT64_CONSTANT(500000000)
166 #define DEFAULT_MAX_PAGE_DELAY  G_GINT64_CONSTANT(500000000)
167 enum
168 {
169   ARG_0,
170   ARG_MAX_DELAY,
171   ARG_MAX_PAGE_DELAY,
172 };
173
174 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
175     GST_PAD_SRC,
176     GST_PAD_ALWAYS,
177     GST_STATIC_CAPS ("application/ogg")
178     );
179
180 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%d",
181     GST_PAD_SINK,
182     GST_PAD_REQUEST,
183     GST_STATIC_CAPS ("video/x-theora; "
184         "audio/x-vorbis; audio/x-flac; audio/x-speex; "
185         "application/x-ogm-video; application/x-ogm-audio; video/x-dirac; "
186         "video/x-smoke; text/x-cmml, encoded = (boolean) TRUE")
187     );
188
189 static void gst_ogg_mux_base_init (gpointer g_class);
190 static void gst_ogg_mux_class_init (GstOggMuxClass * klass);
191 static void gst_ogg_mux_init (GstOggMux * ogg_mux);
192 static void gst_ogg_mux_finalize (GObject * object);
193
194 static GstFlowReturn
195 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux);
196 static gboolean gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event);
197 static GstPad *gst_ogg_mux_request_new_pad (GstElement * element,
198     GstPadTemplate * templ, const gchar * name);
199 static void gst_ogg_mux_release_pad (GstElement * element, GstPad * pad);
200
201 static void gst_ogg_mux_set_property (GObject * object,
202     guint prop_id, const GValue * value, GParamSpec * pspec);
203 static void gst_ogg_mux_get_property (GObject * object,
204     guint prop_id, GValue * value, GParamSpec * pspec);
205 static GstStateChangeReturn gst_ogg_mux_change_state (GstElement * element,
206     GstStateChange transition);
207
208 static GstElementClass *parent_class = NULL;
209
210 /*static guint gst_ogg_mux_signals[LAST_SIGNAL] = { 0 }; */
211
212 GType
213 gst_ogg_mux_get_type (void)
214 {
215   static GType ogg_mux_type = 0;
216
217   if (G_UNLIKELY (ogg_mux_type == 0)) {
218     static const GTypeInfo ogg_mux_info = {
219       sizeof (GstOggMuxClass),
220       gst_ogg_mux_base_init,
221       NULL,
222       (GClassInitFunc) gst_ogg_mux_class_init,
223       NULL,
224       NULL,
225       sizeof (GstOggMux),
226       0,
227       (GInstanceInitFunc) gst_ogg_mux_init,
228     };
229
230     ogg_mux_type =
231         g_type_register_static (GST_TYPE_ELEMENT, "GstOggMux", &ogg_mux_info,
232         0);
233   }
234   return ogg_mux_type;
235 }
236
237 static void
238 gst_ogg_mux_base_init (gpointer g_class)
239 {
240   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
241
242   gst_element_class_add_pad_template (element_class,
243       gst_static_pad_template_get (&src_factory));
244   gst_element_class_add_pad_template (element_class,
245       gst_static_pad_template_get (&sink_factory));
246
247   gst_element_class_set_details (element_class, &gst_ogg_mux_details);
248 }
249
250 static void
251 gst_ogg_mux_class_init (GstOggMuxClass * klass)
252 {
253   GObjectClass *gobject_class;
254   GstElementClass *gstelement_class;
255
256   gobject_class = (GObjectClass *) klass;
257   gstelement_class = (GstElementClass *) klass;
258
259   parent_class = g_type_class_peek_parent (klass);
260
261   gobject_class->finalize = gst_ogg_mux_finalize;
262   gobject_class->get_property = gst_ogg_mux_get_property;
263   gobject_class->set_property = gst_ogg_mux_set_property;
264
265   gstelement_class->request_new_pad = gst_ogg_mux_request_new_pad;
266   gstelement_class->release_pad = gst_ogg_mux_release_pad;
267
268   g_object_class_install_property (gobject_class, ARG_MAX_DELAY,
269       g_param_spec_uint64 ("max-delay", "Max delay",
270           "Maximum delay in multiplexing streams", 0, G_MAXUINT64,
271           DEFAULT_MAX_DELAY, (GParamFlags) G_PARAM_READWRITE));
272   g_object_class_install_property (gobject_class, ARG_MAX_PAGE_DELAY,
273       g_param_spec_uint64 ("max-page-delay", "Max page delay",
274           "Maximum delay for sending out a page", 0, G_MAXUINT64,
275           DEFAULT_MAX_PAGE_DELAY, (GParamFlags) G_PARAM_READWRITE));
276
277   gstelement_class->change_state = gst_ogg_mux_change_state;
278
279 }
280
281 #if 0
282 static const GstEventMask *
283 gst_ogg_mux_get_sink_event_masks (GstPad * pad)
284 {
285   static const GstEventMask gst_ogg_mux_sink_event_masks[] = {
286     {GST_EVENT_EOS, 0},
287     {GST_EVENT_DISCONTINUOUS, 0},
288     {0,}
289   };
290
291   return gst_ogg_mux_sink_event_masks;
292 }
293 #endif
294
295 static void
296 gst_ogg_mux_clear (GstOggMux * ogg_mux)
297 {
298   ogg_mux->pulling = NULL;
299   ogg_mux->need_headers = TRUE;
300   ogg_mux->max_delay = DEFAULT_MAX_DELAY;
301   ogg_mux->max_page_delay = DEFAULT_MAX_PAGE_DELAY;
302   ogg_mux->delta_pad = NULL;
303   ogg_mux->offset = 0;
304   ogg_mux->next_ts = 0;
305   ogg_mux->last_ts = GST_CLOCK_TIME_NONE;
306 }
307
308 static void
309 gst_ogg_mux_init (GstOggMux * ogg_mux)
310 {
311   GstElementClass *klass = GST_ELEMENT_GET_CLASS (ogg_mux);
312
313   ogg_mux->srcpad =
314       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
315           "src"), "src");
316   gst_pad_set_event_function (ogg_mux->srcpad, gst_ogg_mux_handle_src_event);
317   gst_element_add_pad (GST_ELEMENT (ogg_mux), ogg_mux->srcpad);
318
319   GST_OBJECT_FLAG_SET (GST_ELEMENT (ogg_mux), GST_OGG_FLAG_BOS);
320
321   /* seed random number generator for creation of serial numbers */
322   srand (time (NULL));
323
324   ogg_mux->collect = gst_collect_pads_new ();
325   gst_collect_pads_set_function (ogg_mux->collect,
326       (GstCollectPadsFunction) GST_DEBUG_FUNCPTR (gst_ogg_mux_collected),
327       ogg_mux);
328
329   gst_ogg_mux_clear (ogg_mux);
330 }
331
332 static void
333 gst_ogg_mux_finalize (GObject * object)
334 {
335   GstOggMux *ogg_mux;
336
337   ogg_mux = GST_OGG_MUX (object);
338
339   if (ogg_mux->collect) {
340     gst_object_unref (ogg_mux->collect);
341     ogg_mux->collect = NULL;
342   }
343
344   G_OBJECT_CLASS (parent_class)->finalize (object);
345 }
346
347 static void
348 gst_ogg_mux_ogg_pad_destroy_notify (GstCollectData * data)
349 {
350   GstOggPad *oggpad = (GstOggPad *) data;
351   GstBuffer *buf;
352
353   ogg_stream_clear (&oggpad->stream);
354
355   if (oggpad->pagebuffers) {
356     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
357       gst_buffer_unref (buf);
358     }
359     g_queue_free (oggpad->pagebuffers);
360     oggpad->pagebuffers = NULL;
361   }
362 }
363
364 static GstPadLinkReturn
365 gst_ogg_mux_sinkconnect (GstPad * pad, GstPad * peer)
366 {
367   GstOggMux *ogg_mux;
368
369   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
370
371   GST_DEBUG_OBJECT (ogg_mux, "sinkconnect triggered on %s", GST_PAD_NAME (pad));
372
373   gst_object_unref (ogg_mux);
374
375   return GST_PAD_LINK_OK;
376 }
377
378 static GstPad *
379 gst_ogg_mux_request_new_pad (GstElement * element,
380     GstPadTemplate * templ, const gchar * req_name)
381 {
382   GstOggMux *ogg_mux;
383   GstPad *newpad;
384   GstElementClass *klass;
385
386   g_return_val_if_fail (templ != NULL, NULL);
387
388   if (templ->direction != GST_PAD_SINK)
389     goto wrong_direction;
390
391   g_return_val_if_fail (GST_IS_OGG_MUX (element), NULL);
392   ogg_mux = GST_OGG_MUX (element);
393
394   klass = GST_ELEMENT_GET_CLASS (element);
395
396   if (templ != gst_element_class_get_pad_template (klass, "sink_%d"))
397     goto wrong_template;
398
399   {
400     gint serial;
401     gchar *name;
402
403     if (req_name == NULL || strlen (req_name) < 6) {
404       /* no name given when requesting the pad, use random serial number */
405       serial = rand ();
406     } else {
407       /* parse serial number from requested padname */
408       serial = atoi (&req_name[5]);
409     }
410     /* create new pad with the name */
411     GST_DEBUG_OBJECT (ogg_mux, "Creating new pad for serial %d", serial);
412     name = g_strdup_printf ("sink_%d", serial);
413     newpad = gst_pad_new_from_template (templ, name);
414     g_free (name);
415
416     /* construct our own wrapper data structure for the pad to
417      * keep track of its status */
418     {
419       GstOggPad *oggpad;
420
421       oggpad = (GstOggPad *)
422           gst_collect_pads_add_pad_full (ogg_mux->collect, newpad,
423           sizeof (GstOggPad), gst_ogg_mux_ogg_pad_destroy_notify);
424       ogg_mux->active_pads++;
425
426       oggpad->serial = serial;
427       ogg_stream_init (&oggpad->stream, serial);
428       oggpad->packetno = 0;
429       oggpad->pageno = 0;
430       oggpad->eos = FALSE;
431       /* we assume there will be some control data first for this pad */
432       oggpad->state = GST_OGG_PAD_STATE_CONTROL;
433       oggpad->new_page = TRUE;
434       oggpad->first_delta = FALSE;
435       oggpad->prev_delta = FALSE;
436       oggpad->pagebuffers = g_queue_new ();
437     }
438   }
439
440   /* setup some pad functions */
441   gst_pad_set_link_function (newpad, gst_ogg_mux_sinkconnect);
442   /* dd the pad to the element */
443   gst_element_add_pad (element, newpad);
444
445   return newpad;
446
447   /* ERRORS */
448 wrong_direction:
449   {
450     g_warning ("ogg_mux: request pad that is not a SINK pad\n");
451     return NULL;
452   }
453 wrong_template:
454   {
455     g_warning ("ogg_mux: this is not our template!\n");
456     return NULL;
457   }
458 }
459
460 static void
461 gst_ogg_mux_release_pad (GstElement * element, GstPad * pad)
462 {
463   GstOggMux *ogg_mux;
464
465   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
466
467   gst_collect_pads_remove_pad (ogg_mux->collect, pad);
468   gst_element_remove_pad (element, pad);
469 }
470
471 /* handle events */
472 static gboolean
473 gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event)
474 {
475   GstEventType type;
476
477   type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
478
479   switch (type) {
480     case GST_EVENT_SEEK:
481       /* disable seeking for now */
482       return FALSE;
483     default:
484       break;
485   }
486
487   return gst_pad_event_default (pad, event);
488 }
489
490 static GstBuffer *
491 gst_ogg_mux_buffer_from_page (GstOggMux * mux, ogg_page * page, gboolean delta)
492 {
493   GstBuffer *buffer;
494
495   /* allocate space for header and body */
496   buffer = gst_buffer_new_and_alloc (page->header_len + page->body_len);
497   memcpy (GST_BUFFER_DATA (buffer), page->header, page->header_len);
498   memcpy (GST_BUFFER_DATA (buffer) + page->header_len,
499       page->body, page->body_len);
500
501   /* Here we set granulepos as our OFFSET_END to give easy direct access to
502    * this value later. Before we push it, we reset this to OFFSET + SIZE
503    * (see gst_ogg_mux_push_buffer). */
504   GST_BUFFER_OFFSET_END (buffer) = ogg_page_granulepos (page);
505   if (delta)
506     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
507
508   GST_LOG_OBJECT (mux, GST_GP_FORMAT
509       " created buffer %p from ogg page", ogg_page_granulepos (page), buffer);
510
511   return buffer;
512 }
513
514 static GstFlowReturn
515 gst_ogg_mux_push_buffer (GstOggMux * mux, GstBuffer * buffer)
516 {
517   GstCaps *caps;
518
519   /* fix up OFFSET and OFFSET_END again */
520   GST_BUFFER_OFFSET (buffer) = mux->offset;
521   mux->offset += GST_BUFFER_SIZE (buffer);
522   GST_BUFFER_OFFSET_END (buffer) = mux->offset;
523
524   /* Ensure we have monotonically increasing timestamps in the output. */
525   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
526     if (mux->last_ts != GST_CLOCK_TIME_NONE &&
527         GST_BUFFER_TIMESTAMP (buffer) < mux->last_ts)
528       GST_BUFFER_TIMESTAMP (buffer) = mux->last_ts;
529     else
530       mux->last_ts = GST_BUFFER_TIMESTAMP (buffer);
531   }
532
533   caps = gst_pad_get_negotiated_caps (mux->srcpad);
534   gst_buffer_set_caps (buffer, caps);
535   gst_caps_unref (caps);
536
537   return gst_pad_push (mux->srcpad, buffer);
538 }
539
540 /* if all queues have at least one page, dequeue the page with the lowest
541  * timestamp */
542 static gboolean
543 gst_ogg_mux_dequeue_page (GstOggMux * mux, GstFlowReturn * flowret)
544 {
545   GSList *walk;
546   GstOggPad *opad = NULL;       /* "oldest" pad */
547   GstClockTime oldest = GST_CLOCK_TIME_NONE;
548   GstBuffer *buf = NULL;
549   gboolean ret = FALSE;
550
551   *flowret = GST_FLOW_OK;
552
553   walk = mux->collect->data;
554   while (walk) {
555     GstOggPad *pad = (GstOggPad *) walk->data;
556
557     /* We need each queue to either be at EOS, or have one or more pages
558      * available with a set granulepos (i.e. not -1), otherwise we don't have
559      * enough data yet to determine which stream needs to go next for correct
560      * time ordering. */
561     if (pad->pagebuffers->length == 0) {
562       if (pad->eos) {
563         GST_LOG_OBJECT (pad->collect.pad,
564             "pad is EOS, skipping for dequeue decision");
565       } else {
566         GST_LOG_OBJECT (pad->collect.pad,
567             "no pages in this queue, can't dequeue");
568         return FALSE;
569       }
570     } else {
571       /* We then need to check for a non-negative granulepos */
572       int i;
573       gboolean valid = FALSE;
574
575       for (i = 0; i < pad->pagebuffers->length; i++) {
576         buf = g_queue_peek_nth (pad->pagebuffers, i);
577         /* Here we check the OFFSET_END, which is actually temporarily the
578          * granulepos value for this buffer */
579         if (GST_BUFFER_OFFSET_END (buf) != -1) {
580           valid = TRUE;
581           break;
582         }
583       }
584       if (!valid) {
585         GST_LOG_OBJECT (pad->collect.pad,
586             "No page timestamps in queue, can't dequeue");
587         return FALSE;
588       }
589     }
590
591     walk = g_slist_next (walk);
592   }
593
594   walk = mux->collect->data;
595   while (walk) {
596     GstOggPad *pad = (GstOggPad *) walk->data;
597
598     /* any page with a granulepos of -1 can be pushed immediately.
599      * TODO: it CAN be, but it seems silly to do so? */
600     buf = g_queue_peek_head (pad->pagebuffers);
601     while (buf && GST_BUFFER_OFFSET_END (buf) == -1) {
602       GST_LOG_OBJECT (pad->collect.pad, "[gp        -1] pushing page");
603       g_queue_pop_head (pad->pagebuffers);
604       *flowret = gst_ogg_mux_push_buffer (mux, buf);
605       buf = g_queue_peek_head (pad->pagebuffers);
606       ret = TRUE;
607     }
608
609     if (buf) {
610       /* if no oldest buffer yet, take this one */
611       if (oldest == GST_CLOCK_TIME_NONE) {
612         GST_LOG_OBJECT (mux, "no oldest yet, taking buffer %p from pad %"
613             GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
614             buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
615         oldest = GST_BUFFER_OFFSET (buf);
616         opad = pad;
617       } else {
618         /* if we have an oldest, compare with this one */
619         if (GST_BUFFER_OFFSET (buf) < oldest) {
620           GST_LOG_OBJECT (mux, "older buffer %p, taking from pad %"
621               GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
622               buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
623           oldest = GST_BUFFER_OFFSET (buf);
624           opad = pad;
625         }
626       }
627     }
628     walk = g_slist_next (walk);
629   }
630
631   if (oldest != GST_CLOCK_TIME_NONE) {
632     g_assert (opad);
633     buf = g_queue_pop_head (opad->pagebuffers);
634     GST_LOG_OBJECT (opad->collect.pad,
635         GST_GP_FORMAT " pushing oldest page buffer %p (granulepos time %"
636         GST_TIME_FORMAT ")", GST_BUFFER_OFFSET_END (buf), buf,
637         GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
638     *flowret = gst_ogg_mux_push_buffer (mux, buf);
639     ret = TRUE;
640   }
641
642   return ret;
643 }
644
645 /* put the given ogg page on a per-pad queue, timestamping it correctly.
646  * after that, dequeue and push as many pages as possible.
647  * Caller should make sure:
648  * pad->timestamp     was set with the timestamp of the first packet put
649  *                    on the page
650  * pad->timestamp_end was set with the timestamp + duration of the last packet
651  *                    put on the page
652  * pad->gp_time       was set with the time matching the gp of the last
653  *                    packet put on the page
654  *
655  * will also reset timestamp and timestamp_end, so caller func can restart
656  * counting.
657  */
658 static GstFlowReturn
659 gst_ogg_mux_pad_queue_page (GstOggMux * mux, GstOggPad * pad, ogg_page * page,
660     gboolean delta)
661 {
662   GstFlowReturn ret;
663   GstBuffer *buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
664
665   /* take the timestamp of the first packet on this page */
666   GST_BUFFER_TIMESTAMP (buffer) = pad->timestamp;
667   GST_BUFFER_DURATION (buffer) = pad->timestamp_end - pad->timestamp;
668   /* take the gp time of the last completed packet on this page */
669   GST_BUFFER_OFFSET (buffer) = pad->gp_time;
670
671   /* the next page will start where the current page's end time leaves off */
672   pad->timestamp = pad->timestamp_end;
673
674   g_queue_push_tail (pad->pagebuffers, buffer);
675   GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
676       " queued buffer page %p (gp time %"
677       GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT
678       "), %d page buffers queued", ogg_page_granulepos (page),
679       buffer, GST_TIME_ARGS (GST_BUFFER_OFFSET (buffer)),
680       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
681       g_queue_get_length (pad->pagebuffers));
682
683   while (gst_ogg_mux_dequeue_page (mux, &ret)) {
684     if (ret != GST_FLOW_OK)
685       break;
686   }
687
688   return ret;
689 }
690
691 /*
692  * Given two pads, compare the buffers queued on it.
693  * Returns:
694  *  0 if they have an equal priority
695  * -1 if the first is better
696  *  1 if the second is better
697  * Priority decided by: a) validity, b) older timestamp, c) smaller number
698  * of muxed pages
699  */
700 static gint
701 gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPad * first,
702     GstOggPad * second)
703 {
704   guint64 firsttime, secondtime;
705
706   /* if the first pad doesn't contain anything or is even NULL, return
707    * the second pad as best candidate and vice versa */
708   if (first == NULL || (first->buffer == NULL && first->next_buffer == NULL))
709     return 1;
710   if (second == NULL || (second->buffer == NULL && second->next_buffer == NULL))
711     return -1;
712
713   /* no timestamp on first buffer, it must go first */
714   if (first->buffer)
715     firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
716   else
717     firsttime = GST_BUFFER_TIMESTAMP (first->next_buffer);
718   if (firsttime == GST_CLOCK_TIME_NONE)
719     return -1;
720
721   /* no timestamp on second buffer, it must go first */
722   if (second->buffer)
723     secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
724   else
725     secondtime = GST_BUFFER_TIMESTAMP (second->next_buffer);
726   if (secondtime == GST_CLOCK_TIME_NONE)
727     return 1;
728
729   /* first buffer has higher timestamp, second one should go first */
730   if (secondtime < firsttime)
731     return 1;
732   /* second buffer has higher timestamp, first one should go first */
733   else if (secondtime > firsttime)
734     return -1;
735   else {
736     /* buffers with equal timestamps, prefer the pad that has the
737      * least number of pages muxed */
738     if (second->pageno < first->pageno)
739       return 1;
740     else if (second->pageno > first->pageno)
741       return -1;
742   }
743
744   /* same priority if all of the above failed */
745   return 0;
746 }
747
748 /* make sure at least one buffer is queued on all pads, two if possible
749  * 
750  * if pad->buffer == NULL, pad->next_buffer !=  NULL, then
751  *   we do not know if the buffer is the last or not
752  * if pad->buffer != NULL, pad->next_buffer != NULL, then
753  *   pad->buffer is not the last buffer for the pad
754  * if pad->buffer != NULL, pad->next_buffer == NULL, then
755  *   pad->buffer if the last buffer for the pad
756  * 
757  * returns a pointer to an oggpad that holds the best buffer, or
758  * NULL when no pad was usable. "best" means the buffer marked
759  * with the lowest timestamp. If best->buffer == NULL then nothing
760  * should be done until more data arrives */
761 static GstOggPad *
762 gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
763 {
764   GstOggPad *bestpad = NULL, *still_hungry = NULL;
765   GSList *walk;
766
767   /* try to make sure we have a buffer from each usable pad first */
768   walk = ogg_mux->collect->data;
769   while (walk) {
770     GstOggPad *pad;
771     GstCollectData *data;
772
773     data = (GstCollectData *) walk->data;
774     pad = (GstOggPad *) data;
775
776     walk = g_slist_next (walk);
777
778     GST_LOG_OBJECT (data->pad, "looking at pad for buffer");
779
780     /* try to get a new buffer for this pad if needed and possible */
781     if (pad->buffer == NULL) {
782       GstBuffer *buf;
783       gboolean incaps;
784
785       /* shift the buffer along if needed (it's okay if next_buffer is NULL) */
786       if (pad->buffer == NULL) {
787         GST_LOG_OBJECT (data->pad, "shifting buffer %" GST_PTR_FORMAT,
788             pad->next_buffer);
789         pad->buffer = pad->next_buffer;
790         pad->next_buffer = NULL;
791       }
792
793       buf = gst_collect_pads_pop (ogg_mux->collect, data);
794       GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf);
795
796       /* On EOS we get a NULL buffer */
797       if (buf != NULL) {
798         incaps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
799         /* if we need headers */
800         if (pad->state == GST_OGG_PAD_STATE_CONTROL) {
801           /* and we have one */
802           if (incaps) {
803             GST_DEBUG_OBJECT (ogg_mux,
804                 "got incaps buffer in control state, ignoring");
805             /* just ignore */
806             gst_buffer_unref (buf);
807             buf = NULL;
808           } else {
809             GST_DEBUG_OBJECT (ogg_mux,
810                 "got data buffer in control state, switching " "to data mode");
811             /* this is a data buffer so switch to data state */
812             pad->state = GST_OGG_PAD_STATE_DATA;
813           }
814         }
815       } else {
816         GST_DEBUG_OBJECT (data->pad, "EOS on pad");
817         if (!pad->eos) {
818           ogg_page page;
819           GstFlowReturn ret;
820
821           /* it's no longer active */
822           ogg_mux->active_pads--;
823
824           /* Just gone to EOS. Flush existing page(s) */
825           pad->eos = TRUE;
826
827           while (ogg_stream_flush (&pad->stream, &page)) {
828             /* Place page into the per-pad queue */
829             ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
830                 pad->first_delta);
831             /* increment the page number counter */
832             pad->pageno++;
833             /* mark other pages as delta */
834             pad->first_delta = TRUE;
835           }
836         }
837       }
838
839       pad->next_buffer = buf;
840     }
841
842     /* we should have a buffer now, see if it is the best pad to
843      * pull on */
844     if (pad->buffer || pad->next_buffer) {
845       if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) {
846         GST_LOG_OBJECT (data->pad,
847             "new best pad, with buffers %" GST_PTR_FORMAT
848             " and %" GST_PTR_FORMAT, pad->buffer, pad->next_buffer);
849
850         bestpad = pad;
851       }
852     } else if (!pad->eos) {
853       GST_LOG_OBJECT (data->pad, "hungry pad");
854       still_hungry = pad;
855     }
856   }
857
858   if (still_hungry)
859     /* drop back into collectpads... */
860     return still_hungry;
861   else
862     return bestpad;
863 }
864
865 static GList *
866 gst_ogg_mux_get_headers (GstOggPad * pad)
867 {
868   GList *res = NULL;
869   GstOggMux *ogg_mux;
870   GstStructure *structure;
871   GstCaps *caps;
872   GstPad *thepad;
873
874   thepad = pad->collect.pad;
875
876   ogg_mux = GST_OGG_MUX (GST_PAD_PARENT (thepad));
877
878   GST_LOG_OBJECT (thepad, "getting headers");
879
880   caps = gst_pad_get_negotiated_caps (thepad);
881   if (caps != NULL) {
882     const GValue *streamheader;
883
884     structure = gst_caps_get_structure (caps, 0);
885     if (strcmp (gst_structure_get_name (structure), "video/x-dirac") == 0) {
886       GstBuffer *buf = gst_buffer_new_and_alloc (16);
887       int fps_n = 12;
888       int fps_d = 1;
889
890       gst_structure_get_fraction (structure, "framerate", &fps_n, &fps_d);
891
892       memcpy (GST_BUFFER_DATA (buf), "KW-DIRAC", 8);
893       GST_WRITE_UINT32_BE (GST_BUFFER_DATA (buf) + 8, fps_n);
894       GST_WRITE_UINT32_BE (GST_BUFFER_DATA (buf) + 12, fps_d);
895
896       res = g_list_append (res, buf);
897
898       //res = g_list_append (res, gst_buffer_ref(pad->buffer));
899     } else {
900       streamheader = gst_structure_get_value (structure, "streamheader");
901       if (streamheader != NULL) {
902         GST_LOG_OBJECT (thepad, "got header");
903         if (G_VALUE_TYPE (streamheader) == GST_TYPE_ARRAY) {
904           GArray *bufarr = g_value_peek_pointer (streamheader);
905           gint i;
906
907           GST_LOG_OBJECT (thepad, "got fixed list");
908
909           for (i = 0; i < bufarr->len; i++) {
910             GValue *bufval = &g_array_index (bufarr, GValue, i);
911
912             GST_LOG_OBJECT (thepad, "item %d", i);
913             if (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER) {
914               GstBuffer *buf = g_value_peek_pointer (bufval);
915
916               GST_LOG_OBJECT (thepad, "adding item %d to header list", i);
917
918               gst_buffer_ref (buf);
919               res = g_list_append (res, buf);
920             }
921           }
922         } else {
923           GST_LOG_OBJECT (thepad, "streamheader is not fixed list");
924         }
925       } else {
926         GST_LOG_OBJECT (thepad, "caps don't have streamheader");
927       }
928     }
929     gst_caps_unref (caps);
930   } else {
931     GST_LOG_OBJECT (thepad, "got empty caps as negotiated format");
932   }
933   return res;
934 }
935
936 static GstCaps *
937 gst_ogg_mux_set_header_on_caps (GstCaps * caps, GList * buffers)
938 {
939   GstStructure *structure;
940   GValue array = { 0 };
941   GList *walk = buffers;
942
943   caps = gst_caps_make_writable (caps);
944
945   structure = gst_caps_get_structure (caps, 0);
946
947   /* put buffers in a fixed list */
948   g_value_init (&array, GST_TYPE_ARRAY);
949
950   while (walk) {
951     GstBuffer *buf = GST_BUFFER (walk->data);
952     GstBuffer *copy;
953     GValue value = { 0 };
954
955     walk = walk->next;
956
957     /* mark buffer */
958     GST_LOG ("Setting IN_CAPS on buffer of length %d", GST_BUFFER_SIZE (buf));
959     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
960
961     g_value_init (&value, GST_TYPE_BUFFER);
962     copy = gst_buffer_copy (buf);
963     gst_value_set_buffer (&value, copy);
964     gst_buffer_unref (copy);
965     gst_value_array_append_value (&array, &value);
966     g_value_unset (&value);
967   }
968   gst_structure_set_value (structure, "streamheader", &array);
969   g_value_unset (&array);
970
971   return caps;
972 }
973
974 /*
975  * For each pad we need to write out one (small) header in one
976  * page that allows decoders to identify the type of the stream.
977  * After that we need to write out all extra info for the decoders.
978  * In the case of a codec that also needs data as configuration, we can
979  * find that info in the streamcaps. 
980  * After writing the headers we must start a new page for the data.
981  */
982 static GstFlowReturn
983 gst_ogg_mux_send_headers (GstOggMux * mux)
984 {
985   GSList *walk;
986   GList *hbufs, *hwalk;
987   GstCaps *caps;
988   GstFlowReturn ret;
989
990   hbufs = NULL;
991   ret = GST_FLOW_OK;
992
993   GST_LOG_OBJECT (mux, "collecting headers");
994
995   walk = mux->collect->data;
996   while (walk) {
997     GstOggPad *pad;
998     GstPad *thepad;
999
1000     pad = (GstOggPad *) walk->data;
1001     thepad = pad->collect.pad;
1002
1003     walk = g_slist_next (walk);
1004
1005     GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad));
1006
1007     /* if the pad has no buffer, we don't care */
1008     if (pad->buffer == NULL && pad->next_buffer == NULL)
1009       continue;
1010
1011     /* now figure out the headers */
1012     pad->headers = gst_ogg_mux_get_headers (pad);
1013   }
1014
1015   GST_LOG_OBJECT (mux, "creating BOS pages");
1016   walk = mux->collect->data;
1017   while (walk) {
1018     GstOggPad *pad;
1019     GstBuffer *buf;
1020     ogg_packet packet;
1021     ogg_page page;
1022     GstPad *thepad;
1023     GstCaps *caps;
1024     GstStructure *structure;
1025     GstBuffer *hbuf;
1026
1027     pad = (GstOggPad *) walk->data;
1028     thepad = pad->collect.pad;
1029     caps = gst_pad_get_negotiated_caps (thepad);
1030     structure = gst_caps_get_structure (caps, 0);
1031
1032     walk = walk->next;
1033
1034     pad->packetno = 0;
1035
1036     GST_LOG_OBJECT (thepad, "looping over headers");
1037
1038     if (pad->headers) {
1039       buf = GST_BUFFER (pad->headers->data);
1040       pad->headers = g_list_remove (pad->headers, buf);
1041     } else if (pad->buffer) {
1042       buf = pad->buffer;
1043       gst_buffer_ref (buf);
1044     } else if (pad->next_buffer) {
1045       buf = pad->next_buffer;
1046       gst_buffer_ref (buf);
1047     } else {
1048       /* fixme -- should be caught in the previous list traversal. */
1049       GST_OBJECT_LOCK (pad);
1050       g_critical ("No headers or buffers on pad %s:%s",
1051           GST_DEBUG_PAD_NAME (pad));
1052       GST_OBJECT_UNLOCK (pad);
1053       continue;
1054     }
1055
1056     /* create a packet from the buffer */
1057     packet.packet = GST_BUFFER_DATA (buf);
1058     packet.bytes = GST_BUFFER_SIZE (buf);
1059     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1060     if (packet.granulepos == -1)
1061       packet.granulepos = 0;
1062     /* mark BOS and packet number */
1063     packet.b_o_s = (pad->packetno == 0);
1064     packet.packetno = pad->packetno++;
1065     /* mark EOS */
1066     packet.e_o_s = 0;
1067
1068     /* swap the packet in */
1069     ogg_stream_packetin (&pad->stream, &packet);
1070     gst_buffer_unref (buf);
1071
1072     GST_LOG_OBJECT (thepad, "flushing out BOS page");
1073     if (!ogg_stream_flush (&pad->stream, &page))
1074       g_critical ("Could not flush BOS page");
1075
1076     hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1077
1078     GST_LOG_OBJECT (mux, "swapped out page with mime type %s",
1079         gst_structure_get_name (structure));
1080
1081     /* quick hack: put theora pages at the front.
1082      * Ideally, we would have a settable enum for which Ogg
1083      * profile we work with, and order based on that */
1084     if (strcmp (gst_structure_get_name (structure), "video/x-theora") == 0) {
1085       GST_DEBUG_OBJECT (thepad, "putting Theora page at the front");
1086       hbufs = g_list_prepend (hbufs, hbuf);
1087     } else {
1088       hbufs = g_list_append (hbufs, hbuf);
1089     }
1090     gst_caps_unref (caps);
1091   }
1092
1093   GST_LOG_OBJECT (mux, "creating next headers");
1094   walk = mux->collect->data;
1095   while (walk) {
1096     GstOggPad *pad;
1097     GstPad *thepad;
1098
1099     pad = (GstOggPad *) walk->data;
1100     thepad = pad->collect.pad;
1101
1102     walk = walk->next;
1103
1104     GST_LOG_OBJECT (mux, "looping over headers for pad %s:%s",
1105         GST_DEBUG_PAD_NAME (thepad));
1106
1107     hwalk = pad->headers;
1108     while (hwalk) {
1109       GstBuffer *buf = GST_BUFFER (hwalk->data);
1110       ogg_packet packet;
1111       ogg_page page;
1112
1113       hwalk = hwalk->next;
1114
1115       /* create a packet from the buffer */
1116       packet.packet = GST_BUFFER_DATA (buf);
1117       packet.bytes = GST_BUFFER_SIZE (buf);
1118       packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1119       if (packet.granulepos == -1)
1120         packet.granulepos = 0;
1121       /* mark BOS and packet number */
1122       packet.b_o_s = (pad->packetno == 0);
1123       packet.packetno = pad->packetno++;
1124       /* mark EOS */
1125       packet.e_o_s = 0;
1126
1127       /* swap the packet in */
1128       ogg_stream_packetin (&pad->stream, &packet);
1129       gst_buffer_unref (buf);
1130
1131       /* if last header, flush page */
1132       if (hwalk == NULL) {
1133         GST_LOG_OBJECT (mux,
1134             "flushing page as packet %" G_GUINT64_FORMAT " is first or "
1135             "last packet", pad->packetno);
1136         while (ogg_stream_flush (&pad->stream, &page)) {
1137           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1138
1139           GST_LOG_OBJECT (mux, "swapped out page");
1140           hbufs = g_list_append (hbufs, hbuf);
1141         }
1142       } else {
1143         GST_LOG_OBJECT (mux, "try to swap out page");
1144         /* just try to swap out a page then */
1145         while (ogg_stream_pageout (&pad->stream, &page) > 0) {
1146           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1147
1148           GST_LOG_OBJECT (mux, "swapped out page");
1149           hbufs = g_list_append (hbufs, hbuf);
1150         }
1151       }
1152     }
1153     g_list_free (pad->headers);
1154     pad->headers = NULL;
1155   }
1156   /* hbufs holds all buffers for the headers now */
1157
1158   /* create caps with the buffers */
1159   caps = gst_pad_get_caps (mux->srcpad);
1160   if (caps) {
1161     caps = gst_ogg_mux_set_header_on_caps (caps, hbufs);
1162     gst_pad_set_caps (mux->srcpad, caps);
1163     gst_caps_unref (caps);
1164   }
1165   /* and send the buffers */
1166   hwalk = hbufs;
1167   while (hwalk) {
1168     GstBuffer *buf = GST_BUFFER (hwalk->data);
1169
1170     hwalk = hwalk->next;
1171
1172     if ((ret = gst_ogg_mux_push_buffer (mux, buf)) != GST_FLOW_OK)
1173       break;
1174   }
1175   g_list_free (hbufs);
1176
1177   return ret;
1178 }
1179
1180 /* this function is called to process data on the best pending pad.
1181  *
1182  * basic idea:
1183  *
1184  * 1) store the selected pad and keep on pulling until we fill a
1185  *    complete ogg page or the ogg page is filled above the max-delay
1186  *    threshold. This is needed because the ogg spec says that
1187  *    you should fill a complete page with data from the same logical
1188  *    stream. When the page is filled, go back to 1).
1189  * 2) before filling a page, read ahead one more buffer to see if this
1190  *    packet is the last of the stream. We need to do this because the ogg
1191  *    spec mandates that the last packet should have the EOS flag set before
1192  *    sending it to ogg. if pad->buffer is NULL we need to wait to find out
1193  *    whether there are any more buffers.
1194  * 3) pages get queued on a per-pad queue. Every time a page is queued, a
1195  *    dequeue is called, which will dequeue the oldest page on any pad, provided
1196  *    that ALL pads have at least one marked page in the queue (or remaining
1197  *    pads are at EOS)
1198  */
1199 static GstFlowReturn
1200 gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPad * best)
1201 {
1202   gboolean delta_unit;
1203   GstFlowReturn ret;
1204   gint64 granulepos = 0;
1205   GstClockTime timestamp, gp_time;
1206
1207   GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT
1208       ", currently pulling from %" GST_PTR_FORMAT, best->collect.pad,
1209       ogg_mux->pulling);
1210
1211   /* best->buffer is non-NULL, either the pad is EOS's or there is a next 
1212    * buffer */
1213   if (best->next_buffer == NULL && !best->eos) {
1214     GST_WARNING_OBJECT (ogg_mux, "no subsequent buffer and EOS not reached");
1215     return GST_FLOW_WRONG_STATE;
1216   }
1217
1218   /* if we were already pulling from one pad, but the new "best" buffer is
1219    * from another pad, we need to check if we have reason to flush a page
1220    * for the pad we were pulling from before */
1221   if (ogg_mux->pulling && best &&
1222       ogg_mux->pulling != best && ogg_mux->pulling->buffer) {
1223     GstOggPad *pad = ogg_mux->pulling;
1224
1225     GstClockTime last_ts = GST_BUFFER_END_TIME (pad->buffer);
1226
1227     /* if the next packet in the current page is going to make the page
1228      * too long, we need to flush */
1229     if (last_ts > ogg_mux->next_ts + ogg_mux->max_delay) {
1230       ogg_page page;
1231
1232       GST_LOG_OBJECT (pad->collect.pad,
1233           GST_GP_FORMAT " stored packet %" G_GINT64_FORMAT
1234           " will make page too long, flushing",
1235           GST_BUFFER_OFFSET_END (pad->buffer), pad->stream.packetno);
1236
1237       while (ogg_stream_flush (&pad->stream, &page)) {
1238         /* end time of this page is the timestamp of the next buffer */
1239         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1240         /* Place page into the per-pad queue */
1241         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1242             pad->first_delta);
1243         /* increment the page number counter */
1244         pad->pageno++;
1245         /* mark other pages as delta */
1246         pad->first_delta = TRUE;
1247       }
1248       pad->new_page = TRUE;
1249       ogg_mux->pulling = NULL;
1250     }
1251   }
1252
1253   /* if we don't know which pad to pull on, use the best one */
1254   if (ogg_mux->pulling == NULL) {
1255     ogg_mux->pulling = best;
1256     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from best pad");
1257
1258     /* remember timestamp and gp time of first buffer for this new pad */
1259     if (ogg_mux->pulling != NULL) {
1260       ogg_mux->next_ts = GST_BUFFER_TIMESTAMP (ogg_mux->pulling->buffer);
1261       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "updated times, next ts %"
1262           GST_TIME_FORMAT, GST_TIME_ARGS (ogg_mux->next_ts));
1263     } else {
1264       /* no pad to pull on, send EOS */
1265       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1266       return GST_FLOW_WRONG_STATE;
1267     }
1268   }
1269
1270   if (ogg_mux->need_headers) {
1271     ret = gst_ogg_mux_send_headers (ogg_mux);
1272     ogg_mux->need_headers = FALSE;
1273   }
1274
1275   /* we are pulling from a pad, continue to do so until a page
1276    * has been filled and queued */
1277   if (ogg_mux->pulling != NULL) {
1278     ogg_packet packet;
1279     ogg_page page;
1280     GstBuffer *buf, *tmpbuf;
1281     GstOggPad *pad = ogg_mux->pulling;
1282     gint64 duration;
1283     gboolean force_flush;
1284
1285     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from pad");
1286
1287     /* now see if we have a buffer */
1288     buf = pad->buffer;
1289     if (buf == NULL) {
1290       GST_DEBUG_OBJECT (ogg_mux, "pad was EOS");
1291       ogg_mux->pulling = NULL;
1292       return GST_FLOW_OK;
1293     }
1294
1295     delta_unit = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1296     duration = GST_BUFFER_DURATION (buf);
1297
1298     /* if the current "next timestamp" on the pad is unset, then this is the
1299      * first packet on the new page.  Update our pad's page timestamp */
1300     if (ogg_mux->pulling->timestamp == GST_CLOCK_TIME_NONE) {
1301       ogg_mux->pulling->timestamp = GST_BUFFER_TIMESTAMP (buf);
1302       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad,
1303           "updated pad timestamp to %" GST_TIME_FORMAT,
1304           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
1305     }
1306     /* create a packet from the buffer */
1307     packet.packet = GST_BUFFER_DATA (buf);
1308     packet.bytes = GST_BUFFER_SIZE (buf);
1309     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1310     if (packet.granulepos == -1)
1311       packet.granulepos = 0;
1312     /* mark BOS and packet number */
1313     packet.b_o_s = (pad->packetno == 0);
1314     packet.packetno = pad->packetno++;
1315     GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
1316         " packet %" G_GINT64_FORMAT " (%ld bytes) created from buffer",
1317         packet.granulepos, packet.packetno, packet.bytes);
1318
1319     packet.e_o_s = (pad->eos ? 1 : 0);
1320     tmpbuf = NULL;
1321
1322     /* we flush when we see a new keyframe */
1323     force_flush = (pad->prev_delta && !delta_unit);
1324     if (duration != -1) {
1325       pad->duration += duration;
1326       /* if page duration exceeds max, flush page */
1327       if (pad->duration > ogg_mux->max_page_delay) {
1328         force_flush = TRUE;
1329         pad->duration = 0;
1330       }
1331     }
1332
1333     if (GST_BUFFER_IS_DISCONT (buf)) {
1334       packet.packetno++;
1335       /* No public API for this; hack things in */
1336       pad->stream.pageno++;
1337       force_flush = TRUE;
1338     }
1339
1340     /* flush the currently built page if necessary */
1341     if (force_flush) {
1342       GST_LOG_OBJECT (pad->collect.pad,
1343           GST_GP_FORMAT " forced flush of page before this packet",
1344           GST_BUFFER_OFFSET_END (pad->buffer));
1345       while (ogg_stream_flush (&pad->stream, &page)) {
1346         /* end time of this page is the timestamp of the next buffer */
1347         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1348         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1349             pad->first_delta);
1350
1351         /* increment the page number counter */
1352         pad->pageno++;
1353         /* mark other pages as delta */
1354         pad->first_delta = TRUE;
1355       }
1356       pad->new_page = TRUE;
1357     }
1358
1359     /* if this is the first packet of a new page figure out the delta flag */
1360     if (pad->new_page) {
1361       if (delta_unit) {
1362         /* This page is a delta frame */
1363         if (ogg_mux->delta_pad == NULL) {
1364           /* we got a delta unit on this pad */
1365           ogg_mux->delta_pad = pad;
1366         }
1367         /* mark the page as delta */
1368         pad->first_delta = TRUE;
1369       } else {
1370         /* got a keyframe */
1371         if (ogg_mux->delta_pad == pad) {
1372           /* if we get it on the pad with deltaunits,
1373            * we mark the page as non delta */
1374           pad->first_delta = FALSE;
1375         } else if (ogg_mux->delta_pad != NULL) {
1376           /* if there are pads with delta frames, we
1377            * must mark this one as delta */
1378           pad->first_delta = TRUE;
1379         } else {
1380           pad->first_delta = FALSE;
1381         }
1382       }
1383       pad->new_page = FALSE;
1384     }
1385
1386     /* save key unit to track delta->key unit transitions */
1387     pad->prev_delta = delta_unit;
1388
1389     /* swap the packet in */
1390     if (packet.e_o_s == 1)
1391       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in EOS packet");
1392     if (packet.b_o_s == 1)
1393       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in BOS packet");
1394
1395     ogg_stream_packetin (&pad->stream, &packet);
1396
1397     gp_time = GST_BUFFER_OFFSET (pad->buffer);
1398     granulepos = GST_BUFFER_OFFSET_END (pad->buffer);
1399     timestamp = GST_BUFFER_TIMESTAMP (pad->buffer);
1400
1401     GST_LOG_OBJECT (pad->collect.pad,
1402         GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", gp time %"
1403         GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT " packetin'd",
1404         granulepos, packet.packetno, GST_TIME_ARGS (gp_time),
1405         GST_TIME_ARGS (timestamp));
1406     /* don't need the old buffer anymore */
1407     gst_buffer_unref (pad->buffer);
1408     /* store new readahead buffer */
1409     pad->buffer = tmpbuf;
1410
1411     /* let ogg write out the pages now. The packet we got could end
1412      * up in more than one page so we need to write them all */
1413     if (ogg_stream_pageout (&pad->stream, &page) > 0) {
1414       /* we have a new page, so we need to timestamp it correctly.
1415        * if this fresh packet ends on this page, then the page's granulepos
1416        * comes from that packet, and we should set this buffer's timestamp */
1417
1418       GST_LOG_OBJECT (pad->collect.pad,
1419           GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", time %"
1420           GST_TIME_FORMAT ") caused new page",
1421           granulepos, packet.packetno, GST_TIME_ARGS (timestamp));
1422       GST_LOG_OBJECT (pad->collect.pad,
1423           GST_GP_FORMAT " new page %ld", ogg_page_granulepos (&page),
1424           pad->stream.pageno);
1425
1426       if (ogg_page_granulepos (&page) == granulepos) {
1427         /* the packet we streamed in finishes on the current page,
1428          * because the page's granulepos is the granulepos of the last
1429          * packet completed on that page,
1430          * so update the timestamp that we will give to the page */
1431         GST_LOG_OBJECT (pad->collect.pad,
1432             GST_GP_FORMAT
1433             " packet finishes on current page, updating gp time to %"
1434             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (gp_time));
1435         pad->gp_time = gp_time;
1436       } else {
1437         GST_LOG_OBJECT (pad->collect.pad,
1438             GST_GP_FORMAT
1439             " packet spans beyond current page, keeping old gp time %"
1440             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (pad->gp_time));
1441       }
1442
1443       /* push the page */
1444       /* end time of this page is the timestamp of the next buffer */
1445       pad->timestamp_end = timestamp;
1446       ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
1447       pad->pageno++;
1448       /* mark next pages as delta */
1449       pad->first_delta = TRUE;
1450
1451       /* use an inner loop here to flush the remaining pages and
1452        * mark them as delta frames as well */
1453       while (ogg_stream_pageout (&pad->stream, &page) > 0) {
1454         if (ogg_page_granulepos (&page) == granulepos) {
1455           /* the page has taken up the new packet completely, which means
1456            * the packet ends the page and we can update the gp time
1457            * before pushing out */
1458           pad->gp_time = gp_time;
1459         }
1460
1461         /* we have a complete page now, we can push the page
1462          * and make sure to pull on a new pad the next time around */
1463         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1464             pad->first_delta);
1465         /* increment the page number counter */
1466         pad->pageno++;
1467       }
1468       /* need a new page as well */
1469       pad->new_page = TRUE;
1470       pad->duration = 0;
1471       /* we're done pulling on this pad, make sure to choose a new
1472        * pad for pulling in the next iteration */
1473       ogg_mux->pulling = NULL;
1474     }
1475
1476     /* Update the gp time, if necessary, since any future page will have at
1477      * least this gp time.
1478      */
1479     if (pad->gp_time < gp_time) {
1480       pad->gp_time = gp_time;
1481       GST_LOG_OBJECT (pad->collect.pad,
1482           "Updated running gp time of pad %" GST_PTR_FORMAT
1483           " to %" GST_TIME_FORMAT, pad->collect.pad, GST_TIME_ARGS (gp_time));
1484     }
1485   }
1486
1487   return GST_FLOW_OK;
1488 }
1489
1490 /** all_pads_eos:
1491  *
1492  * Checks if all pads are EOS'd by peeking.
1493  *
1494  * Returns TRUE if all pads are EOS.
1495  */
1496 static gboolean
1497 all_pads_eos (GstCollectPads * pads)
1498 {
1499   GSList *walk;
1500   gboolean alleos = TRUE;
1501
1502   walk = pads->data;
1503   while (walk) {
1504     GstBuffer *buf;
1505     GstCollectData *data = (GstCollectData *) walk->data;
1506
1507     buf = gst_collect_pads_peek (pads, data);
1508     if (buf) {
1509       alleos = FALSE;
1510       gst_buffer_unref (buf);
1511       goto beach;
1512     }
1513     walk = walk->next;
1514   }
1515 beach:
1516   return alleos;
1517 }
1518
1519 /* This function is called when there is data on all pads.
1520  * 
1521  * It finds a pad to pull on, this is done by looking at the buffers
1522  * to decide which one to use, and using the 'oldest' one first. It then calls
1523  * gst_ogg_mux_process_best_pad() to process as much data as possible.
1524  * 
1525  * If all the pads have received EOS, it flushes out all data by continually
1526  * getting the best pad and calling gst_ogg_mux_process_best_pad() until they
1527  * are all empty, and then sends EOS.
1528  */
1529 static GstFlowReturn
1530 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
1531 {
1532   GstOggPad *best;
1533   GstFlowReturn ret;
1534   gint activebefore;
1535
1536   GST_LOG_OBJECT (ogg_mux, "collected");
1537
1538   activebefore = ogg_mux->active_pads;
1539
1540   /* queue buffers on all pads; find a buffer with the lowest timestamp */
1541   best = gst_ogg_mux_queue_pads (ogg_mux);
1542   if (best && !best->buffer) {
1543     GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
1544     return GST_FLOW_OK;
1545   }
1546
1547   if (!best) {
1548     return GST_FLOW_WRONG_STATE;
1549   }
1550
1551   ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1552
1553   if (ogg_mux->active_pads < activebefore) {
1554     /* If the active pad count went down, this mean at least one pad has gone
1555      * EOS. Since CollectPads only calls _collected() once when all pads are
1556      * EOS, and our code doesn't _pop() from all pads we need to check that by
1557      * peeking on all pads, else we won't be called again and the muxing will
1558      * not terminate (push out EOS). */
1559
1560     /* if all the pads have been removed, flush all pending data */
1561     if ((ret == GST_FLOW_OK) && all_pads_eos (pads)) {
1562       GST_LOG_OBJECT (ogg_mux, "no pads remaining, flushing data");
1563
1564       do {
1565         best = gst_ogg_mux_queue_pads (ogg_mux);
1566         if (best)
1567           ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1568       } while ((ret == GST_FLOW_OK) && (best != NULL));
1569
1570       GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
1571       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1572     }
1573   }
1574
1575   return ret;
1576 }
1577
1578 static void
1579 gst_ogg_mux_get_property (GObject * object,
1580     guint prop_id, GValue * value, GParamSpec * pspec)
1581 {
1582   GstOggMux *ogg_mux;
1583
1584   ogg_mux = GST_OGG_MUX (object);
1585
1586   switch (prop_id) {
1587     case ARG_MAX_DELAY:
1588       g_value_set_uint64 (value, ogg_mux->max_delay);
1589       break;
1590     case ARG_MAX_PAGE_DELAY:
1591       g_value_set_uint64 (value, ogg_mux->max_page_delay);
1592       break;
1593     default:
1594       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1595       break;
1596   }
1597 }
1598
1599 static void
1600 gst_ogg_mux_set_property (GObject * object,
1601     guint prop_id, const GValue * value, GParamSpec * pspec)
1602 {
1603   GstOggMux *ogg_mux;
1604
1605   ogg_mux = GST_OGG_MUX (object);
1606
1607   switch (prop_id) {
1608     case ARG_MAX_DELAY:
1609       ogg_mux->max_delay = g_value_get_uint64 (value);
1610       break;
1611     case ARG_MAX_PAGE_DELAY:
1612       ogg_mux->max_page_delay = g_value_get_uint64 (value);
1613       break;
1614     default:
1615       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1616       break;
1617   }
1618 }
1619
1620 /* reset all variables in the ogg pads. */
1621 static void
1622 gst_ogg_mux_init_collectpads (GstCollectPads * collect)
1623 {
1624   GSList *walk;
1625
1626   walk = collect->data;
1627   while (walk) {
1628     GstOggPad *oggpad = (GstOggPad *) walk->data;
1629
1630     ogg_stream_init (&oggpad->stream, oggpad->serial);
1631     oggpad->packetno = 0;
1632     oggpad->pageno = 0;
1633     oggpad->eos = FALSE;
1634     /* we assume there will be some control data first for this pad */
1635     oggpad->state = GST_OGG_PAD_STATE_CONTROL;
1636     oggpad->new_page = TRUE;
1637     oggpad->first_delta = FALSE;
1638     oggpad->prev_delta = FALSE;
1639     oggpad->pagebuffers = g_queue_new ();
1640
1641     walk = g_slist_next (walk);
1642   }
1643 }
1644
1645 /* Clear all buffers from the collectpads object */
1646 static void
1647 gst_ogg_mux_clear_collectpads (GstCollectPads * collect)
1648 {
1649   GSList *walk;
1650
1651   for (walk = collect->data; walk; walk = g_slist_next (walk)) {
1652     GstOggPad *oggpad = (GstOggPad *) walk->data;
1653     GstBuffer *buf;
1654
1655     ogg_stream_clear (&oggpad->stream);
1656
1657     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
1658       gst_buffer_unref (buf);
1659     }
1660     g_queue_free (oggpad->pagebuffers);
1661     oggpad->pagebuffers = NULL;
1662   }
1663 }
1664
1665 static GstStateChangeReturn
1666 gst_ogg_mux_change_state (GstElement * element, GstStateChange transition)
1667 {
1668   GstOggMux *ogg_mux;
1669   GstStateChangeReturn ret;
1670
1671   ogg_mux = GST_OGG_MUX (element);
1672
1673   switch (transition) {
1674     case GST_STATE_CHANGE_NULL_TO_READY:
1675       break;
1676     case GST_STATE_CHANGE_READY_TO_PAUSED:
1677       gst_ogg_mux_clear (ogg_mux);
1678       gst_ogg_mux_init_collectpads (ogg_mux->collect);
1679       gst_collect_pads_start (ogg_mux->collect);
1680       break;
1681     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1682       break;
1683     case GST_STATE_CHANGE_PAUSED_TO_READY:
1684       gst_collect_pads_stop (ogg_mux->collect);
1685       break;
1686     default:
1687       break;
1688   }
1689
1690   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1691
1692   switch (transition) {
1693     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1694       break;
1695     case GST_STATE_CHANGE_PAUSED_TO_READY:
1696       gst_ogg_mux_clear_collectpads (ogg_mux->collect);
1697       break;
1698     case GST_STATE_CHANGE_READY_TO_NULL:
1699       break;
1700     default:
1701       break;
1702   }
1703
1704   return ret;
1705 }
1706
1707 gboolean
1708 gst_ogg_mux_plugin_init (GstPlugin * plugin)
1709 {
1710   GST_DEBUG_CATEGORY_INIT (gst_ogg_mux_debug, "oggmux", 0, "ogg muxer");
1711
1712   return gst_element_register (plugin, "oggmux", GST_RANK_NONE,
1713       GST_TYPE_OGG_MUX);
1714 }