c941915c9771c612d72f3f04a2a3928c5f9e5076
[platform/upstream/gstreamer.git] / ext / ogg / gstoggmux.c
1 /* OGG muxer plugin for GStreamer
2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
3  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24
25 #include <gst/gst.h>
26 #include <gst/base/gstcollectpads.h>
27
28 #include <ogg/ogg.h>
29 /* memcpy - if someone knows a way to get rid of it, please speak up
30  * note: the ogg docs even say you need this... */
31 #include <string.h>
32 #include <time.h>
33 #include <stdlib.h>             /* rand, srand, atoi */
34
35 GST_DEBUG_CATEGORY_STATIC (gst_ogg_mux_debug);
36 #define GST_CAT_DEFAULT gst_ogg_mux_debug
37
38 #define GST_TYPE_OGG_MUX (gst_ogg_mux_get_type())
39 #define GST_OGG_MUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_OGG_MUX, GstOggMux))
40 #define GST_OGG_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_OGG_MUX, GstOggMux))
41 #define GST_IS_OGG_MUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_OGG_MUX))
42 #define GST_IS_OGG_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_OGG_MUX))
43
44 /* This isn't generally what you'd want with an end-time macro, because
45    technically the end time of a buffer with invalid duration is invalid. But
46    for sorting ogg pages this is what we want. */
47 #define GST_BUFFER_END_TIME(buf) \
48     (GST_BUFFER_DURATION_IS_VALID (buf) \
49     ? GST_BUFFER_TIMESTAMP (buf) + GST_BUFFER_DURATION (buf) \
50     : GST_BUFFER_TIMESTAMP (buf))
51
52 #define GST_GP_FORMAT "[gp %8" G_GINT64_FORMAT "]"
53
54 typedef struct _GstOggMux GstOggMux;
55 typedef struct _GstOggMuxClass GstOggMuxClass;
56
57 typedef enum
58 {
59   GST_OGG_PAD_STATE_CONTROL = 0,
60   GST_OGG_PAD_STATE_DATA = 1
61 }
62 GstOggPadState;
63
64 /* all information needed for one ogg stream */
65 typedef struct
66 {
67   GstCollectData collect;       /* we extend the CollectData */
68
69   /* These two buffers make a very simple queue - they enter as 'next_buffer'
70    * and (usually) leave as 'buffer', except at EOS, when buffer will be NULL */
71   GstBuffer *buffer;            /* the first waiting buffer for the pad */
72   GstBuffer *next_buffer;       /* the second waiting buffer for the pad */
73
74   gint serial;
75   ogg_stream_state stream;
76   gint64 packetno;              /* number of next packet */
77   gint64 pageno;                /* number of next page */
78   guint64 duration;             /* duration of current page */
79   gboolean eos;
80   gint64 offset;
81   GstClockTime timestamp;       /* timestamp of the first packet on the next
82                                  * page to be dequeued */
83   GstClockTime timestamp_end;   /* end timestamp of last complete packet on
84                                    the next page to be dequeued */
85   GstClockTime gp_time;         /* time corresponding to the gp value of the
86                                    last complete packet on the next page to be
87                                    dequeued */
88
89   GstOggPadState state;         /* state of the pad */
90
91   GList *headers;
92
93   GQueue *pagebuffers;          /* List of pages in buffers ready for pushing */
94
95   gboolean new_page;            /* starting a new page */
96   gboolean first_delta;         /* was the first packet in the page a delta */
97   gboolean prev_delta;          /* was the previous buffer a delta frame */
98 }
99 GstOggPad;
100
101 struct _GstOggMux
102 {
103   GstElement element;
104
105   /* source pad */
106   GstPad *srcpad;
107
108   /* sinkpads */
109   GstCollectPads *collect;
110
111   /* number of pads which have not received EOS */
112   gint active_pads;
113
114   /* the pad we are currently using to fill a page */
115   GstOggPad *pulling;
116
117   /* next timestamp for the page */
118   GstClockTime next_ts;
119
120   /* Last timestamp actually output on src pad */
121   GstClockTime last_ts;
122
123   /* offset in stream */
124   guint64 offset;
125
126   /* need_headers */
127   gboolean need_headers;
128
129   guint64 max_delay;
130   guint64 max_page_delay;
131
132   GstOggPad *delta_pad;         /* when a delta frame is detected on a stream, we mark
133                                    pages as delta frames up to the page that has the
134                                    keyframe */
135
136 };
137
138 typedef enum
139 {
140   GST_OGG_FLAG_BOS = GST_ELEMENT_FLAG_LAST,
141   GST_OGG_FLAG_EOS
142 }
143 GstOggFlag;
144
145 struct _GstOggMuxClass
146 {
147   GstElementClass parent_class;
148 };
149
150 /* elementfactory information */
151 static const GstElementDetails gst_ogg_mux_details =
152 GST_ELEMENT_DETAILS ("Ogg muxer",
153     "Codec/Muxer",
154     "mux ogg streams (info about ogg: http://xiph.org)",
155     "Wim Taymans <wim@fluendo.com>");
156
157 /* OggMux signals and args */
158 enum
159 {
160   /* FILL ME */
161   LAST_SIGNAL
162 };
163
164 /* set to 0.5 seconds by default */
165 #define DEFAULT_MAX_DELAY       G_GINT64_CONSTANT(500000000)
166 #define DEFAULT_MAX_PAGE_DELAY  G_GINT64_CONSTANT(500000000)
167 enum
168 {
169   ARG_0,
170   ARG_MAX_DELAY,
171   ARG_MAX_PAGE_DELAY,
172 };
173
174 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
175     GST_PAD_SRC,
176     GST_PAD_ALWAYS,
177     GST_STATIC_CAPS ("application/ogg")
178     );
179
180 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%d",
181     GST_PAD_SINK,
182     GST_PAD_REQUEST,
183     GST_STATIC_CAPS ("video/x-theora; "
184         "audio/x-vorbis; audio/x-flac; audio/x-speex; "
185         "application/x-ogm-video; application/x-ogm-audio; video/x-dirac; "
186         "video/x-smoke; text/x-cmml, encoded = (boolean) TRUE")
187     );
188
189 static void gst_ogg_mux_base_init (gpointer g_class);
190 static void gst_ogg_mux_class_init (GstOggMuxClass * klass);
191 static void gst_ogg_mux_init (GstOggMux * ogg_mux);
192 static void gst_ogg_mux_finalize (GObject * object);
193
194 static GstFlowReturn
195 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux);
196 static gboolean gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event);
197 static GstPad *gst_ogg_mux_request_new_pad (GstElement * element,
198     GstPadTemplate * templ, const gchar * name);
199 static void gst_ogg_mux_release_pad (GstElement * element, GstPad * pad);
200
201 static void gst_ogg_mux_set_property (GObject * object,
202     guint prop_id, const GValue * value, GParamSpec * pspec);
203 static void gst_ogg_mux_get_property (GObject * object,
204     guint prop_id, GValue * value, GParamSpec * pspec);
205 static GstStateChangeReturn gst_ogg_mux_change_state (GstElement * element,
206     GstStateChange transition);
207
208 static GstElementClass *parent_class = NULL;
209
210 /*static guint gst_ogg_mux_signals[LAST_SIGNAL] = { 0 }; */
211
212 GType
213 gst_ogg_mux_get_type (void)
214 {
215   static GType ogg_mux_type = 0;
216
217   if (G_UNLIKELY (ogg_mux_type == 0)) {
218     static const GTypeInfo ogg_mux_info = {
219       sizeof (GstOggMuxClass),
220       gst_ogg_mux_base_init,
221       NULL,
222       (GClassInitFunc) gst_ogg_mux_class_init,
223       NULL,
224       NULL,
225       sizeof (GstOggMux),
226       0,
227       (GInstanceInitFunc) gst_ogg_mux_init,
228     };
229
230     ogg_mux_type =
231         g_type_register_static (GST_TYPE_ELEMENT, "GstOggMux", &ogg_mux_info,
232         0);
233   }
234   return ogg_mux_type;
235 }
236
237 static void
238 gst_ogg_mux_base_init (gpointer g_class)
239 {
240   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
241
242   gst_element_class_add_pad_template (element_class,
243       gst_static_pad_template_get (&src_factory));
244   gst_element_class_add_pad_template (element_class,
245       gst_static_pad_template_get (&sink_factory));
246
247   gst_element_class_set_details (element_class, &gst_ogg_mux_details);
248 }
249
250 static void
251 gst_ogg_mux_class_init (GstOggMuxClass * klass)
252 {
253   GObjectClass *gobject_class;
254   GstElementClass *gstelement_class;
255
256   gobject_class = (GObjectClass *) klass;
257   gstelement_class = (GstElementClass *) klass;
258
259   parent_class = g_type_class_peek_parent (klass);
260
261   gobject_class->finalize = gst_ogg_mux_finalize;
262   gobject_class->get_property = gst_ogg_mux_get_property;
263   gobject_class->set_property = gst_ogg_mux_set_property;
264
265   gstelement_class->request_new_pad = gst_ogg_mux_request_new_pad;
266   gstelement_class->release_pad = gst_ogg_mux_release_pad;
267
268   g_object_class_install_property (gobject_class, ARG_MAX_DELAY,
269       g_param_spec_uint64 ("max-delay", "Max delay",
270           "Maximum delay in multiplexing streams", 0, G_MAXUINT64,
271           DEFAULT_MAX_DELAY, (GParamFlags) G_PARAM_READWRITE));
272   g_object_class_install_property (gobject_class, ARG_MAX_PAGE_DELAY,
273       g_param_spec_uint64 ("max-page-delay", "Max page delay",
274           "Maximum delay for sending out a page", 0, G_MAXUINT64,
275           DEFAULT_MAX_PAGE_DELAY, (GParamFlags) G_PARAM_READWRITE));
276
277   gstelement_class->change_state = gst_ogg_mux_change_state;
278
279 }
280
281 #if 0
282 static const GstEventMask *
283 gst_ogg_mux_get_sink_event_masks (GstPad * pad)
284 {
285   static const GstEventMask gst_ogg_mux_sink_event_masks[] = {
286     {GST_EVENT_EOS, 0},
287     {GST_EVENT_DISCONTINUOUS, 0},
288     {0,}
289   };
290
291   return gst_ogg_mux_sink_event_masks;
292 }
293 #endif
294
295 static void
296 gst_ogg_mux_clear (GstOggMux * ogg_mux)
297 {
298   ogg_mux->pulling = NULL;
299   ogg_mux->need_headers = TRUE;
300   ogg_mux->max_delay = DEFAULT_MAX_DELAY;
301   ogg_mux->max_page_delay = DEFAULT_MAX_PAGE_DELAY;
302   ogg_mux->delta_pad = NULL;
303   ogg_mux->offset = 0;
304   ogg_mux->next_ts = 0;
305   ogg_mux->last_ts = GST_CLOCK_TIME_NONE;
306 }
307
308 static void
309 gst_ogg_mux_init (GstOggMux * ogg_mux)
310 {
311   GstElementClass *klass = GST_ELEMENT_GET_CLASS (ogg_mux);
312
313   ogg_mux->srcpad =
314       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
315           "src"), "src");
316   gst_pad_set_event_function (ogg_mux->srcpad, gst_ogg_mux_handle_src_event);
317   gst_element_add_pad (GST_ELEMENT (ogg_mux), ogg_mux->srcpad);
318
319   GST_OBJECT_FLAG_SET (GST_ELEMENT (ogg_mux), GST_OGG_FLAG_BOS);
320
321   /* seed random number generator for creation of serial numbers */
322   srand (time (NULL));
323
324   ogg_mux->collect = gst_collect_pads_new ();
325   gst_collect_pads_set_function (ogg_mux->collect,
326       (GstCollectPadsFunction) GST_DEBUG_FUNCPTR (gst_ogg_mux_collected),
327       ogg_mux);
328
329   gst_ogg_mux_clear (ogg_mux);
330 }
331
332 static void
333 gst_ogg_mux_finalize (GObject * object)
334 {
335   GstOggMux *ogg_mux;
336
337   ogg_mux = GST_OGG_MUX (object);
338
339   if (ogg_mux->collect) {
340     gst_object_unref (ogg_mux->collect);
341     ogg_mux->collect = NULL;
342   }
343
344   G_OBJECT_CLASS (parent_class)->finalize (object);
345 }
346
347 static void
348 gst_ogg_mux_ogg_pad_destroy_notify (GstCollectData * data)
349 {
350   GstOggPad *oggpad = (GstOggPad *) data;
351   GstBuffer *buf;
352
353   ogg_stream_clear (&oggpad->stream);
354
355   if (oggpad->pagebuffers) {
356     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
357       gst_buffer_unref (buf);
358     }
359     g_queue_free (oggpad->pagebuffers);
360     oggpad->pagebuffers = NULL;
361   }
362 }
363
364 static GstPadLinkReturn
365 gst_ogg_mux_sinkconnect (GstPad * pad, GstPad * peer)
366 {
367   GstOggMux *ogg_mux;
368
369   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
370
371   GST_DEBUG_OBJECT (ogg_mux, "sinkconnect triggered on %s", GST_PAD_NAME (pad));
372
373   gst_object_unref (ogg_mux);
374
375   return GST_PAD_LINK_OK;
376 }
377
378 static GstPad *
379 gst_ogg_mux_request_new_pad (GstElement * element,
380     GstPadTemplate * templ, const gchar * req_name)
381 {
382   GstOggMux *ogg_mux;
383   GstPad *newpad;
384   GstElementClass *klass;
385
386   g_return_val_if_fail (templ != NULL, NULL);
387
388   if (templ->direction != GST_PAD_SINK)
389     goto wrong_direction;
390
391   g_return_val_if_fail (GST_IS_OGG_MUX (element), NULL);
392   ogg_mux = GST_OGG_MUX (element);
393
394   klass = GST_ELEMENT_GET_CLASS (element);
395
396   if (templ != gst_element_class_get_pad_template (klass, "sink_%d"))
397     goto wrong_template;
398
399   {
400     gint serial;
401     gchar *name;
402
403     if (req_name == NULL || strlen (req_name) < 6) {
404       /* no name given when requesting the pad, use random serial number */
405       serial = rand ();
406     } else {
407       /* parse serial number from requested padname */
408       serial = atoi (&req_name[5]);
409     }
410     /* create new pad with the name */
411     name = g_strdup_printf ("sink_%d", serial);
412     newpad = gst_pad_new_from_template (templ, name);
413     g_free (name);
414
415     /* construct our own wrapper data structure for the pad to
416      * keep track of its status */
417     {
418       GstOggPad *oggpad;
419
420       oggpad = (GstOggPad *)
421           gst_collect_pads_add_pad_full (ogg_mux->collect, newpad,
422           sizeof (GstOggPad), gst_ogg_mux_ogg_pad_destroy_notify);
423       ogg_mux->active_pads++;
424
425       oggpad->serial = serial;
426       ogg_stream_init (&oggpad->stream, serial);
427       oggpad->packetno = 0;
428       oggpad->pageno = 0;
429       oggpad->eos = FALSE;
430       /* we assume there will be some control data first for this pad */
431       oggpad->state = GST_OGG_PAD_STATE_CONTROL;
432       oggpad->new_page = TRUE;
433       oggpad->first_delta = FALSE;
434       oggpad->prev_delta = FALSE;
435       oggpad->pagebuffers = g_queue_new ();
436     }
437   }
438
439   /* setup some pad functions */
440   gst_pad_set_link_function (newpad, gst_ogg_mux_sinkconnect);
441   /* dd the pad to the element */
442   gst_element_add_pad (element, newpad);
443
444   return newpad;
445
446   /* ERRORS */
447 wrong_direction:
448   {
449     g_warning ("ogg_mux: request pad that is not a SINK pad\n");
450     return NULL;
451   }
452 wrong_template:
453   {
454     g_warning ("ogg_mux: this is not our template!\n");
455     return NULL;
456   }
457 }
458
459 static void
460 gst_ogg_mux_release_pad (GstElement * element, GstPad * pad)
461 {
462   GstOggMux *ogg_mux;
463
464   ogg_mux = GST_OGG_MUX (gst_pad_get_parent (pad));
465
466   gst_collect_pads_remove_pad (ogg_mux->collect, pad);
467   gst_element_remove_pad (element, pad);
468 }
469
470 /* handle events */
471 static gboolean
472 gst_ogg_mux_handle_src_event (GstPad * pad, GstEvent * event)
473 {
474   GstEventType type;
475
476   type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
477
478   switch (type) {
479     case GST_EVENT_SEEK:
480       /* disable seeking for now */
481       return FALSE;
482     default:
483       break;
484   }
485
486   return gst_pad_event_default (pad, event);
487 }
488
489 static GstBuffer *
490 gst_ogg_mux_buffer_from_page (GstOggMux * mux, ogg_page * page, gboolean delta)
491 {
492   GstBuffer *buffer;
493
494   /* allocate space for header and body */
495   buffer = gst_buffer_new_and_alloc (page->header_len + page->body_len);
496   memcpy (GST_BUFFER_DATA (buffer), page->header, page->header_len);
497   memcpy (GST_BUFFER_DATA (buffer) + page->header_len,
498       page->body, page->body_len);
499
500   /* Here we set granulepos as our OFFSET_END to give easy direct access to
501    * this value later. Before we push it, we reset this to OFFSET + SIZE
502    * (see gst_ogg_mux_push_buffer). */
503   GST_BUFFER_OFFSET_END (buffer) = ogg_page_granulepos (page);
504   if (delta)
505     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
506
507   GST_LOG_OBJECT (mux, GST_GP_FORMAT
508       " created buffer %p from ogg page", ogg_page_granulepos (page), buffer);
509
510   return buffer;
511 }
512
513 static GstFlowReturn
514 gst_ogg_mux_push_buffer (GstOggMux * mux, GstBuffer * buffer)
515 {
516   GstCaps *caps;
517
518   /* fix up OFFSET and OFFSET_END again */
519   GST_BUFFER_OFFSET (buffer) = mux->offset;
520   mux->offset += GST_BUFFER_SIZE (buffer);
521   GST_BUFFER_OFFSET_END (buffer) = mux->offset;
522
523   /* Ensure we have monotonically increasing timestamps in the output. */
524   if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
525     if (mux->last_ts != GST_CLOCK_TIME_NONE &&
526         GST_BUFFER_TIMESTAMP (buffer) < mux->last_ts)
527       GST_BUFFER_TIMESTAMP (buffer) = mux->last_ts;
528     else
529       mux->last_ts = GST_BUFFER_TIMESTAMP (buffer);
530   }
531
532   caps = gst_pad_get_negotiated_caps (mux->srcpad);
533   gst_buffer_set_caps (buffer, caps);
534   gst_caps_unref (caps);
535
536   return gst_pad_push (mux->srcpad, buffer);
537 }
538
539 /* if all queues have at least one page, dequeue the page with the lowest
540  * timestamp */
541 static gboolean
542 gst_ogg_mux_dequeue_page (GstOggMux * mux, GstFlowReturn * flowret)
543 {
544   GSList *walk;
545   GstOggPad *opad = NULL;       /* "oldest" pad */
546   GstClockTime oldest = GST_CLOCK_TIME_NONE;
547   GstBuffer *buf = NULL;
548   gboolean ret = FALSE;
549
550   *flowret = GST_FLOW_OK;
551
552   walk = mux->collect->data;
553   while (walk) {
554     GstOggPad *pad = (GstOggPad *) walk->data;
555
556     /* We need each queue to either be at EOS, or have one or more pages
557      * available with a set granulepos (i.e. not -1), otherwise we don't have
558      * enough data yet to determine which stream needs to go next for correct
559      * time ordering. */
560     if (pad->pagebuffers->length == 0) {
561       if (pad->eos) {
562         GST_LOG_OBJECT (pad->collect.pad,
563             "pad is EOS, skipping for dequeue decision");
564       } else {
565         GST_LOG_OBJECT (pad->collect.pad,
566             "no pages in this queue, can't dequeue");
567         return FALSE;
568       }
569     } else {
570       /* We then need to check for a non-negative granulepos */
571       int i;
572       gboolean valid = FALSE;
573
574       for (i = 0; i < pad->pagebuffers->length; i++) {
575         buf = g_queue_peek_nth (pad->pagebuffers, i);
576         /* Here we check the OFFSET_END, which is actually temporarily the
577          * granulepos value for this buffer */
578         if (GST_BUFFER_OFFSET_END (buf) != -1) {
579           valid = TRUE;
580           break;
581         }
582       }
583       if (!valid) {
584         GST_LOG_OBJECT (pad->collect.pad,
585             "No page timestamps in queue, can't dequeue");
586         return FALSE;
587       }
588     }
589
590     walk = g_slist_next (walk);
591   }
592
593   walk = mux->collect->data;
594   while (walk) {
595     GstOggPad *pad = (GstOggPad *) walk->data;
596
597     /* any page with a granulepos of -1 can be pushed immediately.
598      * TODO: it CAN be, but it seems silly to do so? */
599     buf = g_queue_peek_head (pad->pagebuffers);
600     while (buf && GST_BUFFER_OFFSET_END (buf) == -1) {
601       GST_LOG_OBJECT (pad->collect.pad, "[gp        -1] pushing page");
602       g_queue_pop_head (pad->pagebuffers);
603       *flowret = gst_ogg_mux_push_buffer (mux, buf);
604       buf = g_queue_peek_head (pad->pagebuffers);
605       ret = TRUE;
606     }
607
608     if (buf) {
609       /* if no oldest buffer yet, take this one */
610       if (oldest == GST_CLOCK_TIME_NONE) {
611         GST_LOG_OBJECT (mux, "no oldest yet, taking buffer %p from pad %"
612             GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
613             buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
614         oldest = GST_BUFFER_OFFSET (buf);
615         opad = pad;
616       } else {
617         /* if we have an oldest, compare with this one */
618         if (GST_BUFFER_OFFSET (buf) < oldest) {
619           GST_LOG_OBJECT (mux, "older buffer %p, taking from pad %"
620               GST_PTR_FORMAT " with gp time %" GST_TIME_FORMAT,
621               buf, pad->collect.pad, GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
622           oldest = GST_BUFFER_OFFSET (buf);
623           opad = pad;
624         }
625       }
626     }
627     walk = g_slist_next (walk);
628   }
629
630   if (oldest != GST_CLOCK_TIME_NONE) {
631     g_assert (opad);
632     buf = g_queue_pop_head (opad->pagebuffers);
633     GST_LOG_OBJECT (opad->collect.pad,
634         GST_GP_FORMAT " pushing oldest page buffer %p (granulepos time %"
635         GST_TIME_FORMAT ")", GST_BUFFER_OFFSET_END (buf), buf,
636         GST_TIME_ARGS (GST_BUFFER_OFFSET (buf)));
637     *flowret = gst_ogg_mux_push_buffer (mux, buf);
638     ret = TRUE;
639   }
640
641   return ret;
642 }
643
644 /* put the given ogg page on a per-pad queue, timestamping it correctly.
645  * after that, dequeue and push as many pages as possible.
646  * Caller should make sure:
647  * pad->timestamp     was set with the timestamp of the first packet put
648  *                    on the page
649  * pad->timestamp_end was set with the timestamp + duration of the last packet
650  *                    put on the page
651  * pad->gp_time       was set with the time matching the gp of the last
652  *                    packet put on the page
653  *
654  * will also reset timestamp and timestamp_end, so caller func can restart
655  * counting.
656  */
657 static GstFlowReturn
658 gst_ogg_mux_pad_queue_page (GstOggMux * mux, GstOggPad * pad, ogg_page * page,
659     gboolean delta)
660 {
661   GstFlowReturn ret;
662   GstBuffer *buffer = gst_ogg_mux_buffer_from_page (mux, page, delta);
663
664   /* take the timestamp of the first packet on this page */
665   GST_BUFFER_TIMESTAMP (buffer) = pad->timestamp;
666   GST_BUFFER_DURATION (buffer) = pad->timestamp_end - pad->timestamp;
667   /* take the gp time of the last completed packet on this page */
668   GST_BUFFER_OFFSET (buffer) = pad->gp_time;
669
670   /* the next page will start where the current page's end time leaves off */
671   pad->timestamp = pad->timestamp_end;
672
673   g_queue_push_tail (pad->pagebuffers, buffer);
674   GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
675       " queued buffer page %p (gp time %"
676       GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT
677       "), %d page buffers queued", ogg_page_granulepos (page),
678       buffer, GST_TIME_ARGS (GST_BUFFER_OFFSET (buffer)),
679       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
680       g_queue_get_length (pad->pagebuffers));
681
682   while (gst_ogg_mux_dequeue_page (mux, &ret)) {
683     if (ret != GST_FLOW_OK)
684       break;
685   }
686
687   return ret;
688 }
689
690 /*
691  * Given two pads, compare the buffers queued on it.
692  * Returns:
693  *  0 if they have an equal priority
694  * -1 if the first is better
695  *  1 if the second is better
696  * Priority decided by: a) validity, b) older timestamp, c) smaller number
697  * of muxed pages
698  */
699 static gint
700 gst_ogg_mux_compare_pads (GstOggMux * ogg_mux, GstOggPad * first,
701     GstOggPad * second)
702 {
703   guint64 firsttime, secondtime;
704
705   /* if the first pad doesn't contain anything or is even NULL, return
706    * the second pad as best candidate and vice versa */
707   if (first == NULL || (first->buffer == NULL && first->next_buffer == NULL))
708     return 1;
709   if (second == NULL || (second->buffer == NULL && second->next_buffer == NULL))
710     return -1;
711
712   /* no timestamp on first buffer, it must go first */
713   if (first->buffer)
714     firsttime = GST_BUFFER_TIMESTAMP (first->buffer);
715   else
716     firsttime = GST_BUFFER_TIMESTAMP (first->next_buffer);
717   if (firsttime == GST_CLOCK_TIME_NONE)
718     return -1;
719
720   /* no timestamp on second buffer, it must go first */
721   if (second->buffer)
722     secondtime = GST_BUFFER_TIMESTAMP (second->buffer);
723   else
724     secondtime = GST_BUFFER_TIMESTAMP (second->next_buffer);
725   if (secondtime == GST_CLOCK_TIME_NONE)
726     return 1;
727
728   /* first buffer has higher timestamp, second one should go first */
729   if (secondtime < firsttime)
730     return 1;
731   /* second buffer has higher timestamp, first one should go first */
732   else if (secondtime > firsttime)
733     return -1;
734   else {
735     /* buffers with equal timestamps, prefer the pad that has the
736      * least number of pages muxed */
737     if (second->pageno < first->pageno)
738       return 1;
739     else if (second->pageno > first->pageno)
740       return -1;
741   }
742
743   /* same priority if all of the above failed */
744   return 0;
745 }
746
747 /* make sure at least one buffer is queued on all pads, two if possible
748  * 
749  * if pad->buffer == NULL, pad->next_buffer !=  NULL, then
750  *   we do not know if the buffer is the last or not
751  * if pad->buffer != NULL, pad->next_buffer != NULL, then
752  *   pad->buffer is not the last buffer for the pad
753  * if pad->buffer != NULL, pad->next_buffer == NULL, then
754  *   pad->buffer if the last buffer for the pad
755  * 
756  * returns a pointer to an oggpad that holds the best buffer, or
757  * NULL when no pad was usable. "best" means the buffer marked
758  * with the lowest timestamp. If best->buffer == NULL then nothing
759  * should be done until more data arrives */
760 static GstOggPad *
761 gst_ogg_mux_queue_pads (GstOggMux * ogg_mux)
762 {
763   GstOggPad *bestpad = NULL, *still_hungry = NULL;
764   GSList *walk;
765
766   /* try to make sure we have a buffer from each usable pad first */
767   walk = ogg_mux->collect->data;
768   while (walk) {
769     GstOggPad *pad;
770     GstCollectData *data;
771
772     data = (GstCollectData *) walk->data;
773     pad = (GstOggPad *) data;
774
775     walk = g_slist_next (walk);
776
777     GST_LOG_OBJECT (data->pad, "looking at pad for buffer");
778
779     /* try to get a new buffer for this pad if needed and possible */
780     if (pad->buffer == NULL) {
781       GstBuffer *buf;
782       gboolean incaps;
783
784       /* shift the buffer along if needed (it's okay if next_buffer is NULL) */
785       if (pad->buffer == NULL) {
786         GST_LOG_OBJECT (data->pad, "shifting buffer %" GST_PTR_FORMAT,
787             pad->next_buffer);
788         pad->buffer = pad->next_buffer;
789         pad->next_buffer = NULL;
790       }
791
792       buf = gst_collect_pads_pop (ogg_mux->collect, data);
793       GST_LOG_OBJECT (data->pad, "popped buffer %" GST_PTR_FORMAT, buf);
794
795       /* On EOS we get a NULL buffer */
796       if (buf != NULL) {
797         incaps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
798         /* if we need headers */
799         if (pad->state == GST_OGG_PAD_STATE_CONTROL) {
800           /* and we have one */
801           if (incaps) {
802             GST_DEBUG_OBJECT (ogg_mux,
803                 "got incaps buffer in control state, ignoring");
804             /* just ignore */
805             gst_buffer_unref (buf);
806             buf = NULL;
807           } else {
808             GST_DEBUG_OBJECT (ogg_mux,
809                 "got data buffer in control state, switching " "to data mode");
810             /* this is a data buffer so switch to data state */
811             pad->state = GST_OGG_PAD_STATE_DATA;
812           }
813         }
814       } else {
815         GST_DEBUG_OBJECT (data->pad, "EOS on pad");
816         if (!pad->eos) {
817           ogg_page page;
818           GstFlowReturn ret;
819
820           /* it's no longer active */
821           ogg_mux->active_pads--;
822
823           /* Just gone to EOS. Flush existing page(s) */
824           pad->eos = TRUE;
825
826           while (ogg_stream_flush (&pad->stream, &page)) {
827             /* Place page into the per-pad queue */
828             ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
829                 pad->first_delta);
830             /* increment the page number counter */
831             pad->pageno++;
832             /* mark other pages as delta */
833             pad->first_delta = TRUE;
834           }
835         }
836       }
837
838       pad->next_buffer = buf;
839     }
840
841     /* we should have a buffer now, see if it is the best pad to
842      * pull on */
843     if (pad->buffer || pad->next_buffer) {
844       if (gst_ogg_mux_compare_pads (ogg_mux, bestpad, pad) > 0) {
845         GST_LOG_OBJECT (data->pad,
846             "new best pad, with buffers %" GST_PTR_FORMAT
847             " and %" GST_PTR_FORMAT, pad->buffer, pad->next_buffer);
848
849         bestpad = pad;
850       }
851     } else if (!pad->eos) {
852       GST_LOG_OBJECT (data->pad, "hungry pad");
853       still_hungry = pad;
854     }
855   }
856
857   if (still_hungry)
858     /* drop back into collectpads... */
859     return still_hungry;
860   else
861     return bestpad;
862 }
863
864 static GList *
865 gst_ogg_mux_get_headers (GstOggPad * pad)
866 {
867   GList *res = NULL;
868   GstOggMux *ogg_mux;
869   GstStructure *structure;
870   GstCaps *caps;
871   GstPad *thepad;
872
873   thepad = pad->collect.pad;
874
875   ogg_mux = GST_OGG_MUX (GST_PAD_PARENT (thepad));
876
877   GST_LOG_OBJECT (thepad, "getting headers");
878
879   caps = gst_pad_get_negotiated_caps (thepad);
880   if (caps != NULL) {
881     const GValue *streamheader;
882
883     structure = gst_caps_get_structure (caps, 0);
884     if (strcmp (gst_structure_get_name (structure), "video/x-dirac") == 0) {
885       GstBuffer *buf = gst_buffer_new_and_alloc (16);
886       int fps_n = 12;
887       int fps_d = 1;
888
889       gst_structure_get_fraction (structure, "framerate", &fps_n, &fps_d);
890
891       memcpy (GST_BUFFER_DATA (buf), "KW-DIRAC", 8);
892       GST_WRITE_UINT32_BE (GST_BUFFER_DATA (buf) + 8, fps_n);
893       GST_WRITE_UINT32_BE (GST_BUFFER_DATA (buf) + 12, fps_d);
894
895       res = g_list_append (res, buf);
896
897       //res = g_list_append (res, gst_buffer_ref(pad->buffer));
898     } else {
899       streamheader = gst_structure_get_value (structure, "streamheader");
900       if (streamheader != NULL) {
901         GST_LOG_OBJECT (thepad, "got header");
902         if (G_VALUE_TYPE (streamheader) == GST_TYPE_ARRAY) {
903           GArray *bufarr = g_value_peek_pointer (streamheader);
904           gint i;
905
906           GST_LOG_OBJECT (thepad, "got fixed list");
907
908           for (i = 0; i < bufarr->len; i++) {
909             GValue *bufval = &g_array_index (bufarr, GValue, i);
910
911             GST_LOG_OBJECT (thepad, "item %d", i);
912             if (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER) {
913               GstBuffer *buf = g_value_peek_pointer (bufval);
914
915               GST_LOG_OBJECT (thepad, "adding item %d to header list", i);
916
917               gst_buffer_ref (buf);
918               res = g_list_append (res, buf);
919             }
920           }
921         } else {
922           GST_LOG_OBJECT (thepad, "streamheader is not fixed list");
923         }
924       } else {
925         GST_LOG_OBJECT (thepad, "caps don't have streamheader");
926       }
927     }
928     gst_caps_unref (caps);
929   } else {
930     GST_LOG_OBJECT (thepad, "got empty caps as negotiated format");
931   }
932   return res;
933 }
934
935 static GstCaps *
936 gst_ogg_mux_set_header_on_caps (GstCaps * caps, GList * buffers)
937 {
938   GstStructure *structure;
939   GValue array = { 0 };
940   GList *walk = buffers;
941
942   caps = gst_caps_make_writable (caps);
943
944   structure = gst_caps_get_structure (caps, 0);
945
946   /* put buffers in a fixed list */
947   g_value_init (&array, GST_TYPE_ARRAY);
948
949   while (walk) {
950     GstBuffer *buf = GST_BUFFER (walk->data);
951     GstBuffer *copy;
952     GValue value = { 0 };
953
954     walk = walk->next;
955
956     /* mark buffer */
957     GST_LOG ("Setting IN_CAPS on buffer of length %d", GST_BUFFER_SIZE (buf));
958     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_IN_CAPS);
959
960     g_value_init (&value, GST_TYPE_BUFFER);
961     copy = gst_buffer_copy (buf);
962     gst_value_set_buffer (&value, copy);
963     gst_buffer_unref (copy);
964     gst_value_array_append_value (&array, &value);
965     g_value_unset (&value);
966   }
967   gst_structure_set_value (structure, "streamheader", &array);
968   g_value_unset (&array);
969
970   return caps;
971 }
972
973 /*
974  * For each pad we need to write out one (small) header in one
975  * page that allows decoders to identify the type of the stream.
976  * After that we need to write out all extra info for the decoders.
977  * In the case of a codec that also needs data as configuration, we can
978  * find that info in the streamcaps. 
979  * After writing the headers we must start a new page for the data.
980  */
981 static GstFlowReturn
982 gst_ogg_mux_send_headers (GstOggMux * mux)
983 {
984   GSList *walk;
985   GList *hbufs, *hwalk;
986   GstCaps *caps;
987   GstFlowReturn ret;
988
989   hbufs = NULL;
990   ret = GST_FLOW_OK;
991
992   GST_LOG_OBJECT (mux, "collecting headers");
993
994   walk = mux->collect->data;
995   while (walk) {
996     GstOggPad *pad;
997     GstPad *thepad;
998
999     pad = (GstOggPad *) walk->data;
1000     thepad = pad->collect.pad;
1001
1002     walk = g_slist_next (walk);
1003
1004     GST_LOG_OBJECT (mux, "looking at pad %s:%s", GST_DEBUG_PAD_NAME (thepad));
1005
1006     /* if the pad has no buffer, we don't care */
1007     if (pad->buffer == NULL && pad->next_buffer == NULL)
1008       continue;
1009
1010     /* now figure out the headers */
1011     pad->headers = gst_ogg_mux_get_headers (pad);
1012   }
1013
1014   GST_LOG_OBJECT (mux, "creating BOS pages");
1015   walk = mux->collect->data;
1016   while (walk) {
1017     GstOggPad *pad;
1018     GstBuffer *buf;
1019     ogg_packet packet;
1020     ogg_page page;
1021     GstPad *thepad;
1022     GstCaps *caps;
1023     GstStructure *structure;
1024     GstBuffer *hbuf;
1025
1026     pad = (GstOggPad *) walk->data;
1027     thepad = pad->collect.pad;
1028     caps = gst_pad_get_negotiated_caps (thepad);
1029     structure = gst_caps_get_structure (caps, 0);
1030
1031     walk = walk->next;
1032
1033     pad->packetno = 0;
1034
1035     GST_LOG_OBJECT (thepad, "looping over headers");
1036
1037     if (pad->headers) {
1038       buf = GST_BUFFER (pad->headers->data);
1039       pad->headers = g_list_remove (pad->headers, buf);
1040     } else if (pad->buffer) {
1041       buf = pad->buffer;
1042       gst_buffer_ref (buf);
1043     } else if (pad->next_buffer) {
1044       buf = pad->next_buffer;
1045       gst_buffer_ref (buf);
1046     } else {
1047       /* fixme -- should be caught in the previous list traversal. */
1048       GST_OBJECT_LOCK (pad);
1049       g_critical ("No headers or buffers on pad %s:%s",
1050           GST_DEBUG_PAD_NAME (pad));
1051       GST_OBJECT_UNLOCK (pad);
1052       continue;
1053     }
1054
1055     /* create a packet from the buffer */
1056     packet.packet = GST_BUFFER_DATA (buf);
1057     packet.bytes = GST_BUFFER_SIZE (buf);
1058     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1059     if (packet.granulepos == -1)
1060       packet.granulepos = 0;
1061     /* mark BOS and packet number */
1062     packet.b_o_s = (pad->packetno == 0);
1063     packet.packetno = pad->packetno++;
1064     /* mark EOS */
1065     packet.e_o_s = 0;
1066
1067     /* swap the packet in */
1068     ogg_stream_packetin (&pad->stream, &packet);
1069     gst_buffer_unref (buf);
1070
1071     GST_LOG_OBJECT (thepad, "flushing out BOS page");
1072     if (!ogg_stream_flush (&pad->stream, &page))
1073       g_critical ("Could not flush BOS page");
1074
1075     hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1076
1077     GST_LOG_OBJECT (mux, "swapped out page with mime type %s",
1078         gst_structure_get_name (structure));
1079
1080     /* quick hack: put theora pages at the front.
1081      * Ideally, we would have a settable enum for which Ogg
1082      * profile we work with, and order based on that */
1083     if (strcmp (gst_structure_get_name (structure), "video/x-theora") == 0) {
1084       GST_DEBUG_OBJECT (thepad, "putting Theora page at the front");
1085       hbufs = g_list_prepend (hbufs, hbuf);
1086     } else {
1087       hbufs = g_list_append (hbufs, hbuf);
1088     }
1089     gst_caps_unref (caps);
1090   }
1091
1092   GST_LOG_OBJECT (mux, "creating next headers");
1093   walk = mux->collect->data;
1094   while (walk) {
1095     GstOggPad *pad;
1096     GstPad *thepad;
1097
1098     pad = (GstOggPad *) walk->data;
1099     thepad = pad->collect.pad;
1100
1101     walk = walk->next;
1102
1103     GST_LOG_OBJECT (mux, "looping over headers for pad %s:%s",
1104         GST_DEBUG_PAD_NAME (thepad));
1105
1106     hwalk = pad->headers;
1107     while (hwalk) {
1108       GstBuffer *buf = GST_BUFFER (hwalk->data);
1109       ogg_packet packet;
1110       ogg_page page;
1111
1112       hwalk = hwalk->next;
1113
1114       /* create a packet from the buffer */
1115       packet.packet = GST_BUFFER_DATA (buf);
1116       packet.bytes = GST_BUFFER_SIZE (buf);
1117       packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1118       if (packet.granulepos == -1)
1119         packet.granulepos = 0;
1120       /* mark BOS and packet number */
1121       packet.b_o_s = (pad->packetno == 0);
1122       packet.packetno = pad->packetno++;
1123       /* mark EOS */
1124       packet.e_o_s = 0;
1125
1126       /* swap the packet in */
1127       ogg_stream_packetin (&pad->stream, &packet);
1128       gst_buffer_unref (buf);
1129
1130       /* if last header, flush page */
1131       if (hwalk == NULL) {
1132         GST_LOG_OBJECT (mux,
1133             "flushing page as packet %" G_GUINT64_FORMAT " is first or "
1134             "last packet", pad->packetno);
1135         while (ogg_stream_flush (&pad->stream, &page)) {
1136           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1137
1138           GST_LOG_OBJECT (mux, "swapped out page");
1139           hbufs = g_list_append (hbufs, hbuf);
1140         }
1141       } else {
1142         GST_LOG_OBJECT (mux, "try to swap out page");
1143         /* just try to swap out a page then */
1144         while (ogg_stream_pageout (&pad->stream, &page) > 0) {
1145           GstBuffer *hbuf = gst_ogg_mux_buffer_from_page (mux, &page, FALSE);
1146
1147           GST_LOG_OBJECT (mux, "swapped out page");
1148           hbufs = g_list_append (hbufs, hbuf);
1149         }
1150       }
1151     }
1152     g_list_free (pad->headers);
1153     pad->headers = NULL;
1154   }
1155   /* hbufs holds all buffers for the headers now */
1156
1157   /* create caps with the buffers */
1158   caps = gst_pad_get_caps (mux->srcpad);
1159   if (caps) {
1160     caps = gst_ogg_mux_set_header_on_caps (caps, hbufs);
1161     gst_pad_set_caps (mux->srcpad, caps);
1162     gst_caps_unref (caps);
1163   }
1164   /* and send the buffers */
1165   hwalk = hbufs;
1166   while (hwalk) {
1167     GstBuffer *buf = GST_BUFFER (hwalk->data);
1168
1169     hwalk = hwalk->next;
1170
1171     if ((ret = gst_ogg_mux_push_buffer (mux, buf)) != GST_FLOW_OK)
1172       break;
1173   }
1174   g_list_free (hbufs);
1175
1176   return ret;
1177 }
1178
1179 /* this function is called to process data on the best pending pad.
1180  *
1181  * basic idea:
1182  *
1183  * 1) store the selected pad and keep on pulling until we fill a
1184  *    complete ogg page or the ogg page is filled above the max-delay
1185  *    threshold. This is needed because the ogg spec says that
1186  *    you should fill a complete page with data from the same logical
1187  *    stream. When the page is filled, go back to 1).
1188  * 2) before filling a page, read ahead one more buffer to see if this
1189  *    packet is the last of the stream. We need to do this because the ogg
1190  *    spec mandates that the last packet should have the EOS flag set before
1191  *    sending it to ogg. if pad->buffer is NULL we need to wait to find out
1192  *    whether there are any more buffers.
1193  * 3) pages get queued on a per-pad queue. Every time a page is queued, a
1194  *    dequeue is called, which will dequeue the oldest page on any pad, provided
1195  *    that ALL pads have at least one marked page in the queue (or remaining
1196  *    pads are at EOS)
1197  */
1198 static GstFlowReturn
1199 gst_ogg_mux_process_best_pad (GstOggMux * ogg_mux, GstOggPad * best)
1200 {
1201   gboolean delta_unit;
1202   GstFlowReturn ret;
1203   gint64 granulepos = 0;
1204   GstClockTime timestamp, gp_time;
1205
1206   GST_LOG_OBJECT (ogg_mux, "best pad %" GST_PTR_FORMAT
1207       ", currently pulling from %" GST_PTR_FORMAT, best->collect.pad,
1208       ogg_mux->pulling);
1209
1210   /* best->buffer is non-NULL, either the pad is EOS's or there is a next 
1211    * buffer */
1212   if (best->next_buffer == NULL && !best->eos) {
1213     GST_WARNING_OBJECT (ogg_mux, "no subsequent buffer and EOS not reached");
1214     return GST_FLOW_WRONG_STATE;
1215   }
1216
1217   /* if we were already pulling from one pad, but the new "best" buffer is
1218    * from another pad, we need to check if we have reason to flush a page
1219    * for the pad we were pulling from before */
1220   if (ogg_mux->pulling && best &&
1221       ogg_mux->pulling != best && ogg_mux->pulling->buffer) {
1222     GstOggPad *pad = ogg_mux->pulling;
1223
1224     GstClockTime last_ts = GST_BUFFER_END_TIME (pad->buffer);
1225
1226     /* if the next packet in the current page is going to make the page
1227      * too long, we need to flush */
1228     if (last_ts > ogg_mux->next_ts + ogg_mux->max_delay) {
1229       ogg_page page;
1230
1231       GST_LOG_OBJECT (pad->collect.pad,
1232           GST_GP_FORMAT " stored packet %" G_GINT64_FORMAT
1233           " will make page too long, flushing",
1234           GST_BUFFER_OFFSET_END (pad->buffer), pad->stream.packetno);
1235
1236       while (ogg_stream_flush (&pad->stream, &page)) {
1237         /* end time of this page is the timestamp of the next buffer */
1238         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1239         /* Place page into the per-pad queue */
1240         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1241             pad->first_delta);
1242         /* increment the page number counter */
1243         pad->pageno++;
1244         /* mark other pages as delta */
1245         pad->first_delta = TRUE;
1246       }
1247       pad->new_page = TRUE;
1248       ogg_mux->pulling = NULL;
1249     }
1250   }
1251
1252   /* if we don't know which pad to pull on, use the best one */
1253   if (ogg_mux->pulling == NULL) {
1254     ogg_mux->pulling = best;
1255     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from best pad");
1256
1257     /* remember timestamp and gp time of first buffer for this new pad */
1258     if (ogg_mux->pulling != NULL) {
1259       ogg_mux->next_ts = GST_BUFFER_TIMESTAMP (ogg_mux->pulling->buffer);
1260       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "updated times, next ts %"
1261           GST_TIME_FORMAT, GST_TIME_ARGS (ogg_mux->next_ts));
1262     } else {
1263       /* no pad to pull on, send EOS */
1264       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1265       return GST_FLOW_WRONG_STATE;
1266     }
1267   }
1268
1269   if (ogg_mux->need_headers) {
1270     ret = gst_ogg_mux_send_headers (ogg_mux);
1271     ogg_mux->need_headers = FALSE;
1272   }
1273
1274   /* we are pulling from a pad, continue to do so until a page
1275    * has been filled and queued */
1276   if (ogg_mux->pulling != NULL) {
1277     ogg_packet packet;
1278     ogg_page page;
1279     GstBuffer *buf, *tmpbuf;
1280     GstOggPad *pad = ogg_mux->pulling;
1281     gint64 duration;
1282     gboolean force_flush;
1283
1284     GST_LOG_OBJECT (ogg_mux->pulling->collect.pad, "pulling from pad");
1285
1286     /* now see if we have a buffer */
1287     buf = pad->buffer;
1288     if (buf == NULL) {
1289       GST_DEBUG_OBJECT (ogg_mux, "pad was EOS");
1290       ogg_mux->pulling = NULL;
1291       return GST_FLOW_OK;
1292     }
1293
1294     delta_unit = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT);
1295     duration = GST_BUFFER_DURATION (buf);
1296
1297     /* if the current "next timestamp" on the pad is unset, then this is the
1298      * first packet on the new page.  Update our pad's page timestamp */
1299     if (ogg_mux->pulling->timestamp == GST_CLOCK_TIME_NONE) {
1300       ogg_mux->pulling->timestamp = GST_BUFFER_TIMESTAMP (buf);
1301       GST_LOG_OBJECT (ogg_mux->pulling->collect.pad,
1302           "updated pad timestamp to %" GST_TIME_FORMAT,
1303           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
1304     }
1305     /* create a packet from the buffer */
1306     packet.packet = GST_BUFFER_DATA (buf);
1307     packet.bytes = GST_BUFFER_SIZE (buf);
1308     packet.granulepos = GST_BUFFER_OFFSET_END (buf);
1309     if (packet.granulepos == -1)
1310       packet.granulepos = 0;
1311     /* mark BOS and packet number */
1312     packet.b_o_s = (pad->packetno == 0);
1313     packet.packetno = pad->packetno++;
1314     GST_LOG_OBJECT (pad->collect.pad, GST_GP_FORMAT
1315         " packet %" G_GINT64_FORMAT " (%ld bytes) created from buffer",
1316         packet.granulepos, packet.packetno, packet.bytes);
1317
1318     packet.e_o_s = (pad->eos ? 1 : 0);
1319     tmpbuf = NULL;
1320
1321     /* we flush when we see a new keyframe */
1322     force_flush = (pad->prev_delta && !delta_unit);
1323     if (duration != -1) {
1324       pad->duration += duration;
1325       /* if page duration exceeds max, flush page */
1326       if (pad->duration > ogg_mux->max_page_delay) {
1327         force_flush = TRUE;
1328         pad->duration = 0;
1329       }
1330     }
1331
1332     if (GST_BUFFER_IS_DISCONT (buf)) {
1333       packet.packetno++;
1334       /* No public API for this; hack things in */
1335       pad->stream.pageno++;
1336       force_flush = TRUE;
1337     }
1338
1339     /* flush the currently built page if necessary */
1340     if (force_flush) {
1341       GST_LOG_OBJECT (pad->collect.pad,
1342           GST_GP_FORMAT " forced flush of page before this packet",
1343           GST_BUFFER_OFFSET_END (pad->buffer));
1344       while (ogg_stream_flush (&pad->stream, &page)) {
1345         /* end time of this page is the timestamp of the next buffer */
1346         ogg_mux->pulling->timestamp_end = GST_BUFFER_TIMESTAMP (pad->buffer);
1347         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1348             pad->first_delta);
1349
1350         /* increment the page number counter */
1351         pad->pageno++;
1352         /* mark other pages as delta */
1353         pad->first_delta = TRUE;
1354       }
1355       pad->new_page = TRUE;
1356     }
1357
1358     /* if this is the first packet of a new page figure out the delta flag */
1359     if (pad->new_page) {
1360       if (delta_unit) {
1361         /* This page is a delta frame */
1362         if (ogg_mux->delta_pad == NULL) {
1363           /* we got a delta unit on this pad */
1364           ogg_mux->delta_pad = pad;
1365         }
1366         /* mark the page as delta */
1367         pad->first_delta = TRUE;
1368       } else {
1369         /* got a keyframe */
1370         if (ogg_mux->delta_pad == pad) {
1371           /* if we get it on the pad with deltaunits,
1372            * we mark the page as non delta */
1373           pad->first_delta = FALSE;
1374         } else if (ogg_mux->delta_pad != NULL) {
1375           /* if there are pads with delta frames, we
1376            * must mark this one as delta */
1377           pad->first_delta = TRUE;
1378         } else {
1379           pad->first_delta = FALSE;
1380         }
1381       }
1382       pad->new_page = FALSE;
1383     }
1384
1385     /* save key unit to track delta->key unit transitions */
1386     pad->prev_delta = delta_unit;
1387
1388     /* swap the packet in */
1389     if (packet.e_o_s == 1)
1390       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in EOS packet");
1391     if (packet.b_o_s == 1)
1392       GST_DEBUG_OBJECT (pad->collect.pad, "swapping in BOS packet");
1393
1394     ogg_stream_packetin (&pad->stream, &packet);
1395
1396     gp_time = GST_BUFFER_OFFSET (pad->buffer);
1397     granulepos = GST_BUFFER_OFFSET_END (pad->buffer);
1398     timestamp = GST_BUFFER_TIMESTAMP (pad->buffer);
1399
1400     GST_LOG_OBJECT (pad->collect.pad,
1401         GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", gp time %"
1402         GST_TIME_FORMAT ", timestamp %" GST_TIME_FORMAT " packetin'd",
1403         granulepos, packet.packetno, GST_TIME_ARGS (gp_time),
1404         GST_TIME_ARGS (timestamp));
1405     /* don't need the old buffer anymore */
1406     gst_buffer_unref (pad->buffer);
1407     /* store new readahead buffer */
1408     pad->buffer = tmpbuf;
1409
1410     /* let ogg write out the pages now. The packet we got could end
1411      * up in more than one page so we need to write them all */
1412     if (ogg_stream_pageout (&pad->stream, &page) > 0) {
1413       /* we have a new page, so we need to timestamp it correctly.
1414        * if this fresh packet ends on this page, then the page's granulepos
1415        * comes from that packet, and we should set this buffer's timestamp */
1416
1417       GST_LOG_OBJECT (pad->collect.pad,
1418           GST_GP_FORMAT " packet %" G_GINT64_FORMAT ", time %"
1419           GST_TIME_FORMAT ") caused new page",
1420           granulepos, packet.packetno, GST_TIME_ARGS (timestamp));
1421       GST_LOG_OBJECT (pad->collect.pad,
1422           GST_GP_FORMAT " new page %ld", ogg_page_granulepos (&page),
1423           pad->stream.pageno);
1424
1425       if (ogg_page_granulepos (&page) == granulepos) {
1426         /* the packet we streamed in finishes on the current page,
1427          * because the page's granulepos is the granulepos of the last
1428          * packet completed on that page,
1429          * so update the timestamp that we will give to the page */
1430         GST_LOG_OBJECT (pad->collect.pad,
1431             GST_GP_FORMAT
1432             " packet finishes on current page, updating gp time to %"
1433             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (gp_time));
1434         pad->gp_time = gp_time;
1435       } else {
1436         GST_LOG_OBJECT (pad->collect.pad,
1437             GST_GP_FORMAT
1438             " packet spans beyond current page, keeping old gp time %"
1439             GST_TIME_FORMAT, granulepos, GST_TIME_ARGS (pad->gp_time));
1440       }
1441
1442       /* push the page */
1443       /* end time of this page is the timestamp of the next buffer */
1444       pad->timestamp_end = timestamp;
1445       ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page, pad->first_delta);
1446       pad->pageno++;
1447       /* mark next pages as delta */
1448       pad->first_delta = TRUE;
1449
1450       /* use an inner loop here to flush the remaining pages and
1451        * mark them as delta frames as well */
1452       while (ogg_stream_pageout (&pad->stream, &page) > 0) {
1453         if (ogg_page_granulepos (&page) == granulepos) {
1454           /* the page has taken up the new packet completely, which means
1455            * the packet ends the page and we can update the gp time
1456            * before pushing out */
1457           pad->gp_time = gp_time;
1458         }
1459
1460         /* we have a complete page now, we can push the page
1461          * and make sure to pull on a new pad the next time around */
1462         ret = gst_ogg_mux_pad_queue_page (ogg_mux, pad, &page,
1463             pad->first_delta);
1464         /* increment the page number counter */
1465         pad->pageno++;
1466       }
1467       /* need a new page as well */
1468       pad->new_page = TRUE;
1469       pad->duration = 0;
1470       /* we're done pulling on this pad, make sure to choose a new
1471        * pad for pulling in the next iteration */
1472       ogg_mux->pulling = NULL;
1473     }
1474
1475     /* Update the gp time, if necessary, since any future page will have at
1476      * least this gp time.
1477      */
1478     if (pad->gp_time < gp_time) {
1479       pad->gp_time = gp_time;
1480       GST_LOG_OBJECT (pad->collect.pad,
1481           "Updated running gp time of pad %" GST_PTR_FORMAT
1482           " to %" GST_TIME_FORMAT, pad->collect.pad, GST_TIME_ARGS (gp_time));
1483     }
1484   }
1485
1486   return GST_FLOW_OK;
1487 }
1488
1489 /** all_pads_eos:
1490  *
1491  * Checks if all pads are EOS'd by peeking.
1492  *
1493  * Returns TRUE if all pads are EOS.
1494  */
1495 static gboolean
1496 all_pads_eos (GstCollectPads * pads)
1497 {
1498   GSList *walk;
1499   gboolean alleos = TRUE;
1500
1501   walk = pads->data;
1502   while (walk) {
1503     GstBuffer *buf;
1504     GstCollectData *data = (GstCollectData *) walk->data;
1505
1506     buf = gst_collect_pads_peek (pads, data);
1507     if (buf) {
1508       alleos = FALSE;
1509       gst_buffer_unref (buf);
1510       goto beach;
1511     }
1512     walk = walk->next;
1513   }
1514 beach:
1515   return alleos;
1516 }
1517
1518 /* This function is called when there is data on all pads.
1519  * 
1520  * It finds a pad to pull on, this is done by looking at the buffers
1521  * to decide which one to use, and using the 'oldest' one first. It then calls
1522  * gst_ogg_mux_process_best_pad() to process as much data as possible.
1523  * 
1524  * If all the pads have received EOS, it flushes out all data by continually
1525  * getting the best pad and calling gst_ogg_mux_process_best_pad() until they
1526  * are all empty, and then sends EOS.
1527  */
1528 static GstFlowReturn
1529 gst_ogg_mux_collected (GstCollectPads * pads, GstOggMux * ogg_mux)
1530 {
1531   GstOggPad *best;
1532   GstFlowReturn ret;
1533   gint activebefore;
1534
1535   GST_LOG_OBJECT (ogg_mux, "collected");
1536
1537   activebefore = ogg_mux->active_pads;
1538
1539   /* queue buffers on all pads; find a buffer with the lowest timestamp */
1540   best = gst_ogg_mux_queue_pads (ogg_mux);
1541   if (best && !best->buffer) {
1542     GST_DEBUG_OBJECT (ogg_mux, "No buffer available on best pad");
1543     return GST_FLOW_OK;
1544   }
1545
1546   if (!best) {
1547     return GST_FLOW_WRONG_STATE;
1548   }
1549
1550   ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1551
1552   if (ogg_mux->active_pads < activebefore) {
1553     /* If the active pad count went down, this mean at least one pad has gone
1554      * EOS. Since CollectPads only calls _collected() once when all pads are
1555      * EOS, and our code doesn't _pop() from all pads we need to check that by
1556      * peeking on all pads, else we won't be called again and the muxing will
1557      * not terminate (push out EOS). */
1558
1559     /* if all the pads have been removed, flush all pending data */
1560     if ((ret == GST_FLOW_OK) && all_pads_eos (pads)) {
1561       GST_LOG_OBJECT (ogg_mux, "no pads remaining, flushing data");
1562
1563       do {
1564         best = gst_ogg_mux_queue_pads (ogg_mux);
1565         if (best)
1566           ret = gst_ogg_mux_process_best_pad (ogg_mux, best);
1567       } while ((ret == GST_FLOW_OK) && (best != NULL));
1568
1569       GST_DEBUG_OBJECT (ogg_mux, "Pushing EOS");
1570       gst_pad_push_event (ogg_mux->srcpad, gst_event_new_eos ());
1571     }
1572   }
1573
1574   return ret;
1575 }
1576
1577 static void
1578 gst_ogg_mux_get_property (GObject * object,
1579     guint prop_id, GValue * value, GParamSpec * pspec)
1580 {
1581   GstOggMux *ogg_mux;
1582
1583   ogg_mux = GST_OGG_MUX (object);
1584
1585   switch (prop_id) {
1586     case ARG_MAX_DELAY:
1587       g_value_set_uint64 (value, ogg_mux->max_delay);
1588       break;
1589     case ARG_MAX_PAGE_DELAY:
1590       g_value_set_uint64 (value, ogg_mux->max_page_delay);
1591       break;
1592     default:
1593       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1594       break;
1595   }
1596 }
1597
1598 static void
1599 gst_ogg_mux_set_property (GObject * object,
1600     guint prop_id, const GValue * value, GParamSpec * pspec)
1601 {
1602   GstOggMux *ogg_mux;
1603
1604   ogg_mux = GST_OGG_MUX (object);
1605
1606   switch (prop_id) {
1607     case ARG_MAX_DELAY:
1608       ogg_mux->max_delay = g_value_get_uint64 (value);
1609       break;
1610     case ARG_MAX_PAGE_DELAY:
1611       ogg_mux->max_page_delay = g_value_get_uint64 (value);
1612       break;
1613     default:
1614       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1615       break;
1616   }
1617 }
1618
1619 /* reset all variables in the ogg pads. */
1620 static void
1621 gst_ogg_mux_init_collectpads (GstCollectPads * collect)
1622 {
1623   GSList *walk;
1624
1625   walk = collect->data;
1626   while (walk) {
1627     GstOggPad *oggpad = (GstOggPad *) walk->data;
1628
1629     ogg_stream_init (&oggpad->stream, oggpad->serial);
1630     oggpad->packetno = 0;
1631     oggpad->pageno = 0;
1632     oggpad->eos = FALSE;
1633     /* we assume there will be some control data first for this pad */
1634     oggpad->state = GST_OGG_PAD_STATE_CONTROL;
1635     oggpad->new_page = TRUE;
1636     oggpad->first_delta = FALSE;
1637     oggpad->prev_delta = FALSE;
1638     oggpad->pagebuffers = g_queue_new ();
1639
1640     walk = g_slist_next (walk);
1641   }
1642 }
1643
1644 /* Clear all buffers from the collectpads object */
1645 static void
1646 gst_ogg_mux_clear_collectpads (GstCollectPads * collect)
1647 {
1648   GSList *walk;
1649
1650   for (walk = collect->data; walk; walk = g_slist_next (walk)) {
1651     GstOggPad *oggpad = (GstOggPad *) walk->data;
1652     GstBuffer *buf;
1653
1654     ogg_stream_clear (&oggpad->stream);
1655
1656     while ((buf = g_queue_pop_head (oggpad->pagebuffers)) != NULL) {
1657       gst_buffer_unref (buf);
1658     }
1659     g_queue_free (oggpad->pagebuffers);
1660     oggpad->pagebuffers = NULL;
1661   }
1662 }
1663
1664 static GstStateChangeReturn
1665 gst_ogg_mux_change_state (GstElement * element, GstStateChange transition)
1666 {
1667   GstOggMux *ogg_mux;
1668   GstStateChangeReturn ret;
1669
1670   ogg_mux = GST_OGG_MUX (element);
1671
1672   switch (transition) {
1673     case GST_STATE_CHANGE_NULL_TO_READY:
1674       break;
1675     case GST_STATE_CHANGE_READY_TO_PAUSED:
1676       gst_ogg_mux_clear (ogg_mux);
1677       gst_ogg_mux_init_collectpads (ogg_mux->collect);
1678       gst_collect_pads_start (ogg_mux->collect);
1679       break;
1680     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1681       break;
1682     case GST_STATE_CHANGE_PAUSED_TO_READY:
1683       gst_collect_pads_stop (ogg_mux->collect);
1684       break;
1685     default:
1686       break;
1687   }
1688
1689   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1690
1691   switch (transition) {
1692     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1693       break;
1694     case GST_STATE_CHANGE_PAUSED_TO_READY:
1695       gst_ogg_mux_clear_collectpads (ogg_mux->collect);
1696       break;
1697     case GST_STATE_CHANGE_READY_TO_NULL:
1698       break;
1699     default:
1700       break;
1701   }
1702
1703   return ret;
1704 }
1705
1706 gboolean
1707 gst_ogg_mux_plugin_init (GstPlugin * plugin)
1708 {
1709   GST_DEBUG_CATEGORY_INIT (gst_ogg_mux_debug, "oggmux", 0, "ogg muxer");
1710
1711   return gst_element_register (plugin, "oggmux", GST_RANK_NONE,
1712       GST_TYPE_OGG_MUX);
1713 }