13afdc6572ba0412d03680569585527c99cc0cb2
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: manages a set of pads with the purpose of
26  * aggregating their buffers.
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
45  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
46  *    has been taken with steal_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * If the subclass wishes to push a buffer downstream in its aggregate
50  *    implementation, it should do so through the
51  *    gst_aggregator_finish_buffer () method. This method will take care
52  *    of sending and ordering mandatory events such as stream start, caps
53  *    and segment.
54  *
55  *  * Same goes for EOS events, which should not be pushed directly by the
56  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
57  *    implementation.
58  *
59  *  * Note that the aggregator logic regarding gap event handling is to turn
60  *    these into gap buffers with matching PTS and duration. It will also
61  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62  *    to ease their identification and subsequent processing.
63  *
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #  include "config.h"
68 #endif
69
70 #include <string.h>             /* strlen */
71
72 #include "gstaggregator.h"
73
74 typedef enum
75 {
76   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
77   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
78   GST_AGGREGATOR_START_TIME_SELECTION_SET
79 } GstAggregatorStartTimeSelection;
80
81 static GType
82 gst_aggregator_start_time_selection_get_type (void)
83 {
84   static GType gtype = 0;
85
86   if (gtype == 0) {
87     static const GEnumValue values[] = {
88       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
89           "Start at 0 running time (default)", "zero"},
90       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
91           "Start at first observed input running time", "first"},
92       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
93           "Set start time with start-time property", "set"},
94       {0, NULL, NULL}
95     };
96
97     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
98   }
99   return gtype;
100 }
101
102 /*  Might become API */
103 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
104     const GstTagList * tags, GstTagMergeMode mode);
105 static void gst_aggregator_set_latency_property (GstAggregator * agg,
106     gint64 latency);
107 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
108
109 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
110
111 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
112 #define GST_CAT_DEFAULT aggregator_debug
113
114 /* Locking order, locks in this element must always be taken in this order
115  *
116  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
117  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
118  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
119  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
120  * standard element object lock -> GST_OBJECT_LOCK(agg)
121  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
122  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
123  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
124  */
125
126 /* GstAggregatorPad definitions */
127 #define PAD_LOCK(pad)   G_STMT_START {                                  \
128   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
129         g_thread_self());                                               \
130   g_mutex_lock(&pad->priv->lock);                                       \
131   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
132         g_thread_self());                                               \
133   } G_STMT_END
134
135 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
136   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
137       g_thread_self());                                                 \
138   g_mutex_unlock(&pad->priv->lock);                                     \
139   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
140         g_thread_self());                                               \
141   } G_STMT_END
142
143
144 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
145   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
146         g_thread_self());                                               \
147   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
148       (&((GstAggregatorPad*)pad)->priv->lock));                         \
149   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
150         g_thread_self());                                               \
151   } G_STMT_END
152
153 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
154   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
155         g_thread_self());                                              \
156   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
157   } G_STMT_END
158
159
160 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
161   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
162         g_thread_self());                                               \
163   g_mutex_lock(&pad->priv->flush_lock);                                 \
164   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
165         g_thread_self());                                               \
166   } G_STMT_END
167
168 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
169   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
170         g_thread_self());                                               \
171   g_mutex_unlock(&pad->priv->flush_lock);                               \
172   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
173         g_thread_self());                                               \
174   } G_STMT_END
175
176 #define SRC_LOCK(self)   G_STMT_START {                             \
177   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
178       g_thread_self());                                             \
179   g_mutex_lock(&self->priv->src_lock);                              \
180   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
181         g_thread_self());                                           \
182   } G_STMT_END
183
184 #define SRC_UNLOCK(self)  G_STMT_START {                            \
185   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
186         g_thread_self());                                           \
187   g_mutex_unlock(&self->priv->src_lock);                            \
188   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
189         g_thread_self());                                           \
190   } G_STMT_END
191
192 #define SRC_WAIT(self) G_STMT_START {                               \
193   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
194         g_thread_self());                                           \
195   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
196   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
197         g_thread_self());                                           \
198   } G_STMT_END
199
200 #define SRC_BROADCAST(self) G_STMT_START {                          \
201     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
202         g_thread_self());                                           \
203     if (self->priv->aggregate_id)                                   \
204       gst_clock_id_unschedule (self->priv->aggregate_id);           \
205     g_cond_broadcast(&(self->priv->src_cond));                      \
206   } G_STMT_END
207
208 struct _GstAggregatorPadPrivate
209 {
210   /* Following fields are protected by the PAD_LOCK */
211   GstFlowReturn flow_return;
212   gboolean pending_flush_start;
213   gboolean pending_flush_stop;
214   gboolean pending_eos;
215
216   gboolean first_buffer;
217
218   GQueue data;                  /* buffers, events and queries */
219   GstBuffer *clipped_buffer;
220   guint num_buffers;
221   GstClockTime head_position;
222   GstClockTime tail_position;
223   GstClockTime head_time;       /* running time */
224   GstClockTime tail_time;
225   GstClockTime time_level;      /* how much head is ahead of tail */
226   GstSegment head_segment;      /* segment before the queue */
227
228   gboolean negotiated;
229
230   gboolean eos;
231
232   GMutex lock;
233   GCond event_cond;
234   /* This lock prevents a flush start processing happening while
235    * the chain function is also happening.
236    */
237   GMutex flush_lock;
238 };
239
240 /* Must be called with PAD_LOCK held */
241 static void
242 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
243 {
244   aggpad->priv->pending_eos = FALSE;
245   aggpad->priv->eos = FALSE;
246   aggpad->priv->flow_return = GST_FLOW_OK;
247   GST_OBJECT_LOCK (aggpad);
248   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
249   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
250   GST_OBJECT_UNLOCK (aggpad);
251   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
252   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
253   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
254   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
255   aggpad->priv->time_level = 0;
256   aggpad->priv->first_buffer = TRUE;
257 }
258
259 static gboolean
260 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
261 {
262   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
263
264   PAD_LOCK (aggpad);
265   gst_aggregator_pad_reset_unlocked (aggpad);
266   PAD_UNLOCK (aggpad);
267
268   if (klass->flush)
269     return klass->flush (aggpad, agg);
270
271   return TRUE;
272 }
273
274 /*************************************
275  * GstAggregator implementation  *
276  *************************************/
277 static GstElementClass *aggregator_parent_class = NULL;
278
279 /* All members are protected by the object lock unless otherwise noted */
280
281 struct _GstAggregatorPrivate
282 {
283   gint max_padserial;
284
285   /* Our state is >= PAUSED */
286   gboolean running;             /* protected by src_lock */
287
288   gint seqnum;
289   gboolean send_stream_start;   /* protected by srcpad stream lock */
290   gboolean send_segment;
291   gboolean flush_seeking;
292   gboolean pending_flush_start;
293   gboolean send_eos;            /* protected by srcpad stream lock */
294
295   GstCaps *srccaps;             /* protected by the srcpad stream lock */
296
297   GstTagList *tags;
298   gboolean tags_changed;
299
300   gboolean peer_latency_live;   /* protected by src_lock */
301   GstClockTime peer_latency_min;        /* protected by src_lock */
302   GstClockTime peer_latency_max;        /* protected by src_lock */
303   gboolean has_peer_latency;    /* protected by src_lock */
304
305   GstClockTime sub_latency_min; /* protected by src_lock */
306   GstClockTime sub_latency_max; /* protected by src_lock */
307
308   /* aggregate */
309   GstClockID aggregate_id;      /* protected by src_lock */
310   GMutex src_lock;
311   GCond src_cond;
312
313   gboolean first_buffer;        /* protected by object lock */
314   GstAggregatorStartTimeSelection start_time_selection;
315   GstClockTime start_time;
316
317   /* protected by the object lock */
318   GstQuery *allocation_query;
319   GstAllocator *allocator;
320   GstBufferPool *pool;
321   GstAllocationParams allocation_params;
322
323   /* properties */
324   gint64 latency;               /* protected by both src_lock and all pad locks */
325 };
326
327 /* Seek event forwarding helper */
328 typedef struct
329 {
330   /* parameters */
331   GstEvent *event;
332   gboolean flush;
333   gboolean only_to_active_pads;
334
335   /* results */
336   gboolean result;
337   gboolean one_actually_seeked;
338 } EventData;
339
340 #define DEFAULT_LATENCY              0
341 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
342 #define DEFAULT_START_TIME           (-1)
343
344 enum
345 {
346   PROP_0,
347   PROP_LATENCY,
348   PROP_START_TIME_SELECTION,
349   PROP_START_TIME,
350   PROP_LAST
351 };
352
353 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
354     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
355
356 /**
357  * gst_aggregator_iterate_sinkpads:
358  * @self: The #GstAggregator
359  * @func: (scope call): The function to call.
360  * @user_data: (closure): The data to pass to @func.
361  *
362  * Iterate the sinkpads of aggregator to call a function on them.
363  *
364  * This method guarantees that @func will be called only once for each
365  * sink pad.
366  *
367  * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
368  */
369 gboolean
370 gst_aggregator_iterate_sinkpads (GstAggregator * self,
371     GstAggregatorPadForeachFunc func, gpointer user_data)
372 {
373   gboolean result = FALSE;
374   GstIterator *iter;
375   gboolean done = FALSE;
376   GValue item = { 0, };
377   GList *seen_pads = NULL;
378
379   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
380
381   if (!iter)
382     goto no_iter;
383
384   while (!done) {
385     switch (gst_iterator_next (iter, &item)) {
386       case GST_ITERATOR_OK:
387       {
388         GstAggregatorPad *pad;
389
390         pad = g_value_get_object (&item);
391
392         /* if already pushed, skip. FIXME, find something faster to tag pads */
393         if (pad == NULL || g_list_find (seen_pads, pad)) {
394           g_value_reset (&item);
395           break;
396         }
397
398         GST_LOG_OBJECT (pad, "calling function %s on pad",
399             GST_DEBUG_FUNCPTR_NAME (func));
400
401         result = func (self, pad, user_data);
402
403         done = !result;
404
405         seen_pads = g_list_prepend (seen_pads, pad);
406
407         g_value_reset (&item);
408         break;
409       }
410       case GST_ITERATOR_RESYNC:
411         gst_iterator_resync (iter);
412         break;
413       case GST_ITERATOR_ERROR:
414         GST_ERROR_OBJECT (self,
415             "Could not iterate over internally linked pads");
416         done = TRUE;
417         break;
418       case GST_ITERATOR_DONE:
419         done = TRUE;
420         break;
421     }
422   }
423   g_value_unset (&item);
424   gst_iterator_free (iter);
425
426   if (seen_pads == NULL) {
427     GST_DEBUG_OBJECT (self, "No pad seen");
428     return FALSE;
429   }
430
431   g_list_free (seen_pads);
432
433 no_iter:
434   return result;
435 }
436
437 static gboolean
438 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
439 {
440   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
441       pad->priv->clipped_buffer == NULL);
442 }
443
444 static gboolean
445 gst_aggregator_check_pads_ready (GstAggregator * self)
446 {
447   GstAggregatorPad *pad;
448   GList *l, *sinkpads;
449   gboolean have_buffer = TRUE;
450   gboolean have_event_or_query = FALSE;
451
452   GST_LOG_OBJECT (self, "checking pads");
453
454   GST_OBJECT_LOCK (self);
455
456   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
457   if (sinkpads == NULL)
458     goto no_sinkpads;
459
460   for (l = sinkpads; l != NULL; l = l->next) {
461     pad = l->data;
462
463     PAD_LOCK (pad);
464
465     if (pad->priv->num_buffers == 0) {
466       if (!gst_aggregator_pad_queue_is_empty (pad))
467         have_event_or_query = TRUE;
468       if (!pad->priv->eos) {
469         have_buffer = FALSE;
470
471         /* If not live we need data on all pads, so leave the loop */
472         if (!self->priv->peer_latency_live) {
473           PAD_UNLOCK (pad);
474           goto pad_not_ready;
475         }
476       }
477     } else if (self->priv->peer_latency_live) {
478       /* In live mode, having a single pad with buffers is enough to
479        * generate a start time from it. In non-live mode all pads need
480        * to have a buffer
481        */
482       self->priv->first_buffer = FALSE;
483     }
484
485     PAD_UNLOCK (pad);
486   }
487
488   if (!have_buffer && !have_event_or_query)
489     goto pad_not_ready;
490
491   if (have_buffer)
492     self->priv->first_buffer = FALSE;
493
494   GST_OBJECT_UNLOCK (self);
495   GST_LOG_OBJECT (self, "pads are ready");
496   return TRUE;
497
498 no_sinkpads:
499   {
500     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
501     GST_OBJECT_UNLOCK (self);
502     return FALSE;
503   }
504 pad_not_ready:
505   {
506     if (have_event_or_query)
507       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
508           " but waking up for serialized event");
509     else
510       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
511     GST_OBJECT_UNLOCK (self);
512     return have_event_or_query;
513   }
514 }
515
516 static void
517 gst_aggregator_reset_flow_values (GstAggregator * self)
518 {
519   GST_OBJECT_LOCK (self);
520   self->priv->send_stream_start = TRUE;
521   self->priv->send_segment = TRUE;
522   gst_segment_init (&self->segment, GST_FORMAT_TIME);
523   self->priv->first_buffer = TRUE;
524   GST_OBJECT_UNLOCK (self);
525 }
526
527 static inline void
528 gst_aggregator_push_mandatory_events (GstAggregator * self)
529 {
530   GstAggregatorPrivate *priv = self->priv;
531   GstEvent *segment = NULL;
532   GstEvent *tags = NULL;
533
534   if (self->priv->send_stream_start) {
535     gchar s_id[32];
536
537     GST_INFO_OBJECT (self, "pushing stream start");
538     /* stream-start (FIXME: create id based on input ids) */
539     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
540     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
541       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
542     }
543     self->priv->send_stream_start = FALSE;
544   }
545
546   if (self->priv->srccaps) {
547
548     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
549         self->priv->srccaps);
550     if (!gst_pad_push_event (self->srcpad,
551             gst_event_new_caps (self->priv->srccaps))) {
552       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
553     }
554     gst_caps_unref (self->priv->srccaps);
555     self->priv->srccaps = NULL;
556   }
557
558   GST_OBJECT_LOCK (self);
559   if (self->priv->send_segment && !self->priv->flush_seeking) {
560     segment = gst_event_new_segment (&self->segment);
561
562     if (!self->priv->seqnum)
563       self->priv->seqnum = gst_event_get_seqnum (segment);
564     else
565       gst_event_set_seqnum (segment, self->priv->seqnum);
566     self->priv->send_segment = FALSE;
567
568     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
569   }
570
571   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
572     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
573     priv->tags_changed = FALSE;
574   }
575   GST_OBJECT_UNLOCK (self);
576
577   if (segment)
578     gst_pad_push_event (self->srcpad, segment);
579   if (tags)
580     gst_pad_push_event (self->srcpad, tags);
581
582 }
583
584 /**
585  * gst_aggregator_set_src_caps:
586  * @self: The #GstAggregator
587  * @caps: The #GstCaps to set on the src pad.
588  *
589  * Sets the caps to be used on the src pad.
590  */
591 void
592 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
593 {
594   GST_PAD_STREAM_LOCK (self->srcpad);
595   gst_caps_replace (&self->priv->srccaps, caps);
596   gst_aggregator_push_mandatory_events (self);
597   GST_PAD_STREAM_UNLOCK (self->srcpad);
598 }
599
600 /**
601  * gst_aggregator_finish_buffer:
602  * @self: The #GstAggregator
603  * @buffer: (transfer full): the #GstBuffer to push.
604  *
605  * This method will push the provided output buffer downstream. If needed,
606  * mandatory events such as stream-start, caps, and segment events will be
607  * sent before pushing the buffer.
608  */
609 GstFlowReturn
610 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
611 {
612   gst_aggregator_push_mandatory_events (self);
613
614   GST_OBJECT_LOCK (self);
615   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
616     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
617     GST_OBJECT_UNLOCK (self);
618     return gst_pad_push (self->srcpad, buffer);
619   } else {
620     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
621         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
622     GST_OBJECT_UNLOCK (self);
623     gst_buffer_unref (buffer);
624     return GST_FLOW_OK;
625   }
626 }
627
628 static void
629 gst_aggregator_push_eos (GstAggregator * self)
630 {
631   GstEvent *event;
632   gst_aggregator_push_mandatory_events (self);
633
634   event = gst_event_new_eos ();
635
636   GST_OBJECT_LOCK (self);
637   self->priv->send_eos = FALSE;
638   gst_event_set_seqnum (event, self->priv->seqnum);
639   GST_OBJECT_UNLOCK (self);
640
641   gst_pad_push_event (self->srcpad, event);
642 }
643
644 static GstClockTime
645 gst_aggregator_get_next_time (GstAggregator * self)
646 {
647   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
648
649   if (klass->get_next_time)
650     return klass->get_next_time (self);
651
652   return GST_CLOCK_TIME_NONE;
653 }
654
655 static gboolean
656 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
657 {
658   GstClockTime latency;
659   GstClockTime start;
660   gboolean res;
661
662   *timeout = FALSE;
663
664   SRC_LOCK (self);
665
666   latency = gst_aggregator_get_latency_unlocked (self);
667
668   if (gst_aggregator_check_pads_ready (self)) {
669     GST_DEBUG_OBJECT (self, "all pads have data");
670     SRC_UNLOCK (self);
671
672     return TRUE;
673   }
674
675   /* Before waiting, check if we're actually still running */
676   if (!self->priv->running || !self->priv->send_eos) {
677     SRC_UNLOCK (self);
678
679     return FALSE;
680   }
681
682   start = gst_aggregator_get_next_time (self);
683
684   /* If we're not live, or if we use the running time
685    * of the first buffer as start time, we wait until
686    * all pads have buffers.
687    * Otherwise (i.e. if we are live!), we wait on the clock
688    * and if a pad does not have a buffer in time we ignore
689    * that pad.
690    */
691   GST_OBJECT_LOCK (self);
692   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
693       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
694       !GST_CLOCK_TIME_IS_VALID (start) ||
695       (self->priv->first_buffer
696           && self->priv->start_time_selection ==
697           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
698     /* We wake up here when something happened, and below
699      * then check if we're ready now. If we return FALSE,
700      * we will be directly called again.
701      */
702     GST_OBJECT_UNLOCK (self);
703     SRC_WAIT (self);
704   } else {
705     GstClockTime base_time, time;
706     GstClock *clock;
707     GstClockReturn status;
708     GstClockTimeDiff jitter;
709
710     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
711         GST_TIME_ARGS (start));
712
713     base_time = GST_ELEMENT_CAST (self)->base_time;
714     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
715     GST_OBJECT_UNLOCK (self);
716
717     time = base_time + start;
718     time += latency;
719
720     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
721         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
722         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
723         GST_TIME_ARGS (time),
724         GST_TIME_ARGS (base_time),
725         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
726         GST_TIME_ARGS (gst_clock_get_time (clock)));
727
728     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
729     gst_object_unref (clock);
730     SRC_UNLOCK (self);
731
732     jitter = 0;
733     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
734
735     SRC_LOCK (self);
736     if (self->priv->aggregate_id) {
737       gst_clock_id_unref (self->priv->aggregate_id);
738       self->priv->aggregate_id = NULL;
739     }
740
741     GST_DEBUG_OBJECT (self,
742         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
743         status, GST_STIME_ARGS (jitter));
744
745     /* we timed out */
746     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
747       SRC_UNLOCK (self);
748       *timeout = TRUE;
749       return TRUE;
750     }
751   }
752
753   res = gst_aggregator_check_pads_ready (self);
754   SRC_UNLOCK (self);
755
756   return res;
757 }
758
759 static gboolean
760 gst_aggregator_do_events_and_queries (GstAggregator * self,
761     GstAggregatorPad * pad, gpointer user_data)
762 {
763   GstEvent *event = NULL;
764   GstQuery *query = NULL;
765   GstAggregatorClass *klass = NULL;
766   gboolean *processed_event = user_data;
767
768   do {
769     event = NULL;
770     query = NULL;
771
772     PAD_LOCK (pad);
773     if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) {
774       pad->priv->pending_eos = FALSE;
775       pad->priv->eos = TRUE;
776     }
777     if (pad->priv->clipped_buffer == NULL &&
778         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
779       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
780         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
781       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
782         query = g_queue_peek_tail (&pad->priv->data);
783     }
784     PAD_UNLOCK (pad);
785     if (event || query) {
786       gboolean ret;
787
788       if (processed_event)
789         *processed_event = TRUE;
790       if (klass == NULL)
791         klass = GST_AGGREGATOR_GET_CLASS (self);
792
793       if (event) {
794         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
795         gst_event_ref (event);
796         ret = klass->sink_event (self, pad, event);
797
798         PAD_LOCK (pad);
799         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
800           pad->priv->negotiated = ret;
801         if (g_queue_peek_tail (&pad->priv->data) == event)
802           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
803         gst_event_unref (event);
804       } else if (query) {
805         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
806         ret = klass->sink_query (self, pad, query);
807
808         PAD_LOCK (pad);
809         if (g_queue_peek_tail (&pad->priv->data) == query) {
810           GstStructure *s;
811
812           s = gst_query_writable_structure (query);
813           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
814               NULL);
815           g_queue_pop_tail (&pad->priv->data);
816         }
817       }
818
819       PAD_BROADCAST_EVENT (pad);
820       PAD_UNLOCK (pad);
821     }
822   } while (event || query);
823
824   return TRUE;
825 }
826
827 static void
828 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
829     GstFlowReturn flow_return, gboolean full)
830 {
831   GList *item;
832
833   PAD_LOCK (aggpad);
834   if (flow_return == GST_FLOW_NOT_LINKED)
835     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
836   else
837     aggpad->priv->flow_return = flow_return;
838
839   item = g_queue_peek_head_link (&aggpad->priv->data);
840   while (item) {
841     GList *next = item->next;
842
843     /* In partial flush, we do like the pad, we get rid of non-sticky events
844      * and EOS/SEGMENT.
845      */
846     if (full || GST_IS_BUFFER (item->data) ||
847         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
848         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
849         !GST_EVENT_IS_STICKY (item->data)) {
850       if (!GST_IS_QUERY (item->data))
851         gst_mini_object_unref (item->data);
852       g_queue_delete_link (&aggpad->priv->data, item);
853     }
854     item = next;
855   }
856   aggpad->priv->num_buffers = 0;
857   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
858
859   PAD_BROADCAST_EVENT (aggpad);
860   PAD_UNLOCK (aggpad);
861 }
862
863 static GstFlowReturn
864 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
865     GstCaps ** ret)
866 {
867   *ret = gst_caps_ref (caps);
868
869   return GST_FLOW_OK;
870 }
871
872 static GstCaps *
873 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
874 {
875   caps = gst_caps_fixate (caps);
876
877   return caps;
878 }
879
880 static gboolean
881 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
882 {
883   return TRUE;
884 }
885
886
887 /* takes ownership of the pool, allocator and query */
888 static gboolean
889 gst_aggregator_set_allocation (GstAggregator * self,
890     GstBufferPool * pool, GstAllocator * allocator,
891     GstAllocationParams * params, GstQuery * query)
892 {
893   GstAllocator *oldalloc;
894   GstBufferPool *oldpool;
895   GstQuery *oldquery;
896
897   GST_DEBUG ("storing allocation query");
898
899   GST_OBJECT_LOCK (self);
900   oldpool = self->priv->pool;
901   self->priv->pool = pool;
902
903   oldalloc = self->priv->allocator;
904   self->priv->allocator = allocator;
905
906   oldquery = self->priv->allocation_query;
907   self->priv->allocation_query = query;
908
909   if (params)
910     self->priv->allocation_params = *params;
911   else
912     gst_allocation_params_init (&self->priv->allocation_params);
913   GST_OBJECT_UNLOCK (self);
914
915   if (oldpool) {
916     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
917     gst_buffer_pool_set_active (oldpool, FALSE);
918     gst_object_unref (oldpool);
919   }
920   if (oldalloc) {
921     gst_object_unref (oldalloc);
922   }
923   if (oldquery) {
924     gst_query_unref (oldquery);
925   }
926   return TRUE;
927 }
928
929
930 static gboolean
931 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
932 {
933   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
934
935   if (aggclass->decide_allocation)
936     if (!aggclass->decide_allocation (self, query))
937       return FALSE;
938
939   return TRUE;
940 }
941
942 static gboolean
943 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
944 {
945   GstQuery *query;
946   gboolean result = TRUE;
947   GstBufferPool *pool = NULL;
948   GstAllocator *allocator;
949   GstAllocationParams params;
950
951   /* find a pool for the negotiated caps now */
952   GST_DEBUG_OBJECT (self, "doing allocation query");
953   query = gst_query_new_allocation (caps, TRUE);
954   if (!gst_pad_peer_query (self->srcpad, query)) {
955     /* not a problem, just debug a little */
956     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
957   }
958
959   GST_DEBUG_OBJECT (self, "calling decide_allocation");
960   result = gst_aggregator_decide_allocation (self, query);
961
962   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
963       query);
964
965   if (!result)
966     goto no_decide_allocation;
967
968   /* we got configuration from our peer or the decide_allocation method,
969    * parse them */
970   if (gst_query_get_n_allocation_params (query) > 0) {
971     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
972   } else {
973     allocator = NULL;
974     gst_allocation_params_init (&params);
975   }
976
977   if (gst_query_get_n_allocation_pools (query) > 0)
978     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
979
980   /* now store */
981   result =
982       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
983
984   return result;
985
986   /* Errors */
987 no_decide_allocation:
988   {
989     GST_WARNING_OBJECT (self, "Failed to decide allocation");
990     gst_query_unref (query);
991
992     return result;
993   }
994
995 }
996
997 /* WITH SRC_LOCK held */
998 static GstFlowReturn
999 gst_aggregator_update_src_caps (GstAggregator * self)
1000 {
1001   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
1002   GstCaps *downstream_caps, *template_caps, *caps = NULL;
1003   GstFlowReturn ret = GST_FLOW_OK;
1004
1005   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
1006   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
1007
1008   if (gst_caps_is_empty (downstream_caps)) {
1009     GST_INFO_OBJECT (self, "Downstream caps (%"
1010         GST_PTR_FORMAT ") not compatible with pad template caps (%"
1011         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1012     ret = GST_FLOW_NOT_NEGOTIATED;
1013     goto done;
1014   }
1015
1016   g_assert (agg_klass->update_src_caps);
1017   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1018       downstream_caps);
1019   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1020   if (ret < GST_FLOW_OK) {
1021     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1022     goto done;
1023   }
1024   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1025     ret = GST_FLOW_NOT_NEGOTIATED;
1026     goto done;
1027   }
1028   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1029
1030 #ifdef GST_ENABLE_EXTRA_CHECKS
1031   if (!gst_caps_is_subset (caps, template_caps)) {
1032     GstCaps *intersection;
1033
1034     GST_ERROR_OBJECT (self,
1035         "update_src_caps returned caps %" GST_PTR_FORMAT
1036         " which are not a real subset of the template caps %"
1037         GST_PTR_FORMAT, caps, template_caps);
1038     g_warning ("%s: update_src_caps returned caps which are not a real "
1039         "subset of the filter caps", GST_ELEMENT_NAME (self));
1040
1041     intersection =
1042         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1043     gst_caps_unref (caps);
1044     caps = intersection;
1045   }
1046 #endif
1047
1048   if (gst_caps_is_any (caps)) {
1049     goto done;
1050   }
1051
1052   if (!gst_caps_is_fixed (caps)) {
1053     g_assert (agg_klass->fixate_src_caps);
1054
1055     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1056     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1057       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1058       ret = GST_FLOW_NOT_NEGOTIATED;
1059       goto done;
1060     }
1061     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1062   }
1063
1064   if (agg_klass->negotiated_src_caps) {
1065     if (!agg_klass->negotiated_src_caps (self, caps)) {
1066       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1067       ret = GST_FLOW_NOT_NEGOTIATED;
1068       goto done;
1069     }
1070   }
1071
1072   gst_aggregator_set_src_caps (self, caps);
1073
1074   if (!gst_aggregator_do_allocation (self, caps)) {
1075     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1076     ret = GST_FLOW_NOT_NEGOTIATED;
1077   }
1078
1079 done:
1080   gst_caps_unref (downstream_caps);
1081   gst_caps_unref (template_caps);
1082
1083   if (caps)
1084     gst_caps_unref (caps);
1085
1086   return ret;
1087 }
1088
1089 static void
1090 gst_aggregator_aggregate_func (GstAggregator * self)
1091 {
1092   GstAggregatorPrivate *priv = self->priv;
1093   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1094   gboolean timeout = FALSE;
1095
1096   if (self->priv->running == FALSE) {
1097     GST_DEBUG_OBJECT (self, "Not running anymore");
1098     return;
1099   }
1100
1101   GST_LOG_OBJECT (self, "Checking aggregate");
1102   while (priv->send_eos && priv->running) {
1103     GstFlowReturn flow_return = GST_FLOW_OK;
1104     gboolean processed_event = FALSE;
1105
1106     gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
1107         NULL);
1108
1109     if (!gst_aggregator_wait_and_check (self, &timeout))
1110       continue;
1111
1112     gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
1113         &processed_event);
1114     if (processed_event)
1115       continue;
1116
1117     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1118       flow_return = gst_aggregator_update_src_caps (self);
1119       if (flow_return != GST_FLOW_OK)
1120         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1121     }
1122
1123     if (timeout || flow_return >= GST_FLOW_OK) {
1124       GST_TRACE_OBJECT (self, "Actually aggregating!");
1125       flow_return = klass->aggregate (self, timeout);
1126     }
1127
1128     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1129       continue;
1130
1131     GST_OBJECT_LOCK (self);
1132     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1133       /* We don't want to set the pads to flushing, but we want to
1134        * stop the thread, so just break here */
1135       GST_OBJECT_UNLOCK (self);
1136       break;
1137     }
1138     GST_OBJECT_UNLOCK (self);
1139
1140     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1141       gst_aggregator_push_eos (self);
1142     }
1143
1144     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1145
1146     if (flow_return != GST_FLOW_OK) {
1147       GList *item;
1148
1149       GST_OBJECT_LOCK (self);
1150       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1151         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1152
1153         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1154       }
1155       GST_OBJECT_UNLOCK (self);
1156       break;
1157     }
1158   }
1159
1160   /* Pause the task here, the only ways to get here are:
1161    * 1) We're stopping, in which case the task is stopped anyway
1162    * 2) We got a flow error above, in which case it might take
1163    *    some time to forward the flow return upstream and we
1164    *    would otherwise call the task function over and over
1165    *    again without doing anything
1166    */
1167   gst_pad_pause_task (self->srcpad);
1168 }
1169
1170 static gboolean
1171 gst_aggregator_start (GstAggregator * self)
1172 {
1173   GstAggregatorClass *klass;
1174   gboolean result;
1175
1176   self->priv->send_stream_start = TRUE;
1177   self->priv->send_segment = TRUE;
1178   self->priv->send_eos = TRUE;
1179   self->priv->srccaps = NULL;
1180
1181   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1182
1183   klass = GST_AGGREGATOR_GET_CLASS (self);
1184
1185   if (klass->start)
1186     result = klass->start (self);
1187   else
1188     result = TRUE;
1189
1190   return result;
1191 }
1192
1193 static gboolean
1194 _check_pending_flush_stop (GstAggregatorPad * pad)
1195 {
1196   gboolean res;
1197
1198   PAD_LOCK (pad);
1199   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1200   PAD_UNLOCK (pad);
1201
1202   return res;
1203 }
1204
1205 static gboolean
1206 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1207 {
1208   gboolean res = TRUE;
1209
1210   GST_INFO_OBJECT (self, "%s srcpad task",
1211       flush_start ? "Pausing" : "Stopping");
1212
1213   SRC_LOCK (self);
1214   self->priv->running = FALSE;
1215   SRC_BROADCAST (self);
1216   SRC_UNLOCK (self);
1217
1218   if (flush_start) {
1219     res = gst_pad_push_event (self->srcpad, flush_start);
1220   }
1221
1222   gst_pad_stop_task (self->srcpad);
1223
1224   return res;
1225 }
1226
1227 static void
1228 gst_aggregator_start_srcpad_task (GstAggregator * self)
1229 {
1230   GST_INFO_OBJECT (self, "Starting srcpad task");
1231
1232   self->priv->running = TRUE;
1233   gst_pad_start_task (GST_PAD (self->srcpad),
1234       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1235 }
1236
1237 static GstFlowReturn
1238 gst_aggregator_flush (GstAggregator * self)
1239 {
1240   GstFlowReturn ret = GST_FLOW_OK;
1241   GstAggregatorPrivate *priv = self->priv;
1242   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1243
1244   GST_DEBUG_OBJECT (self, "Flushing everything");
1245   GST_OBJECT_LOCK (self);
1246   priv->send_segment = TRUE;
1247   priv->flush_seeking = FALSE;
1248   priv->tags_changed = FALSE;
1249   GST_OBJECT_UNLOCK (self);
1250   if (klass->flush)
1251     ret = klass->flush (self);
1252
1253   return ret;
1254 }
1255
1256
1257 /* Called with GstAggregator's object lock held */
1258
1259 static gboolean
1260 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1261 {
1262   GList *tmp;
1263   GstAggregatorPad *tmppad;
1264
1265   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1266     tmppad = (GstAggregatorPad *) tmp->data;
1267
1268     if (_check_pending_flush_stop (tmppad) == FALSE) {
1269       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1270           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1271       return FALSE;
1272     }
1273   }
1274
1275   return TRUE;
1276 }
1277
1278 static void
1279 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1280     GstEvent * event)
1281 {
1282   GstAggregatorPrivate *priv = self->priv;
1283   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1284
1285   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1286
1287   PAD_FLUSH_LOCK (aggpad);
1288   PAD_LOCK (aggpad);
1289   if (padpriv->pending_flush_start) {
1290     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1291
1292     padpriv->pending_flush_start = FALSE;
1293     padpriv->pending_flush_stop = TRUE;
1294   }
1295   PAD_UNLOCK (aggpad);
1296
1297   GST_OBJECT_LOCK (self);
1298   if (priv->flush_seeking) {
1299     /* If flush_seeking we forward the first FLUSH_START */
1300     if (priv->pending_flush_start) {
1301       priv->pending_flush_start = FALSE;
1302       GST_OBJECT_UNLOCK (self);
1303
1304       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1305       gst_aggregator_stop_srcpad_task (self, event);
1306
1307       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1308       GST_PAD_STREAM_LOCK (self->srcpad);
1309       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1310       event = NULL;
1311     } else {
1312       GST_OBJECT_UNLOCK (self);
1313       gst_event_unref (event);
1314     }
1315   } else {
1316     GST_OBJECT_UNLOCK (self);
1317     gst_event_unref (event);
1318   }
1319   PAD_FLUSH_UNLOCK (aggpad);
1320 }
1321
1322 /* Must be called with the the PAD_LOCK held */
1323 static void
1324 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1325 {
1326   if (head) {
1327     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->head_position) &&
1328         aggpad->priv->head_segment.format == GST_FORMAT_TIME)
1329       aggpad->priv->head_time =
1330           gst_segment_to_running_time (&aggpad->priv->head_segment,
1331           GST_FORMAT_TIME, aggpad->priv->head_position);
1332     else
1333       aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
1334
1335     if (!GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_time))
1336       aggpad->priv->tail_time = aggpad->priv->head_time;
1337   } else {
1338     if (GST_CLOCK_TIME_IS_VALID (aggpad->priv->tail_position) &&
1339         aggpad->segment.format == GST_FORMAT_TIME)
1340       aggpad->priv->tail_time =
1341           gst_segment_to_running_time (&aggpad->segment,
1342           GST_FORMAT_TIME, aggpad->priv->tail_position);
1343     else
1344       aggpad->priv->tail_time = aggpad->priv->head_time;
1345   }
1346
1347   if (aggpad->priv->head_time == GST_CLOCK_TIME_NONE ||
1348       aggpad->priv->tail_time == GST_CLOCK_TIME_NONE) {
1349     aggpad->priv->time_level = 0;
1350     return;
1351   }
1352
1353   if (aggpad->priv->tail_time > aggpad->priv->head_time)
1354     aggpad->priv->time_level = 0;
1355   else
1356     aggpad->priv->time_level = aggpad->priv->head_time -
1357         aggpad->priv->tail_time;
1358 }
1359
1360
1361 /* GstAggregator vmethods default implementations */
1362 static gboolean
1363 gst_aggregator_default_sink_event (GstAggregator * self,
1364     GstAggregatorPad * aggpad, GstEvent * event)
1365 {
1366   gboolean res = TRUE;
1367   GstPad *pad = GST_PAD (aggpad);
1368   GstAggregatorPrivate *priv = self->priv;
1369
1370   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1371
1372   switch (GST_EVENT_TYPE (event)) {
1373     case GST_EVENT_FLUSH_START:
1374     {
1375       gst_aggregator_flush_start (self, aggpad, event);
1376       /* We forward only in one case: right after flush_seeking */
1377       event = NULL;
1378       goto eat;
1379     }
1380     case GST_EVENT_FLUSH_STOP:
1381     {
1382       gst_aggregator_pad_flush (aggpad, self);
1383       GST_OBJECT_LOCK (self);
1384       if (priv->flush_seeking) {
1385         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1386         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1387           GST_OBJECT_UNLOCK (self);
1388           /* That means we received FLUSH_STOP/FLUSH_STOP on
1389            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1390           gst_aggregator_flush (self);
1391           gst_pad_push_event (self->srcpad, event);
1392           event = NULL;
1393           SRC_LOCK (self);
1394           priv->send_eos = TRUE;
1395           SRC_BROADCAST (self);
1396           SRC_UNLOCK (self);
1397
1398           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1399           GST_PAD_STREAM_UNLOCK (self->srcpad);
1400           gst_aggregator_start_srcpad_task (self);
1401         } else {
1402           GST_OBJECT_UNLOCK (self);
1403         }
1404       } else {
1405         GST_OBJECT_UNLOCK (self);
1406       }
1407
1408       /* We never forward the event */
1409       goto eat;
1410     }
1411     case GST_EVENT_EOS:
1412     {
1413       /* We still have a buffer, and we don't want the subclass to have to
1414        * check for it. Mark pending_eos, eos will be set when steal_buffer is
1415        * called
1416        */
1417       SRC_LOCK (self);
1418       PAD_LOCK (aggpad);
1419       if (aggpad->priv->num_buffers == 0) {
1420         aggpad->priv->eos = TRUE;
1421       } else {
1422         aggpad->priv->pending_eos = TRUE;
1423       }
1424       PAD_UNLOCK (aggpad);
1425
1426       SRC_BROADCAST (self);
1427       SRC_UNLOCK (self);
1428       goto eat;
1429     }
1430     case GST_EVENT_SEGMENT:
1431     {
1432       PAD_LOCK (aggpad);
1433       GST_OBJECT_LOCK (aggpad);
1434       gst_event_copy_segment (event, &aggpad->segment);
1435       /* We've got a new segment, tail_position is now meaningless
1436        * and may interfere with the time_level calculation
1437        */
1438       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1439       update_time_level (aggpad, FALSE);
1440       GST_OBJECT_UNLOCK (aggpad);
1441       PAD_UNLOCK (aggpad);
1442
1443       GST_OBJECT_LOCK (self);
1444       self->priv->seqnum = gst_event_get_seqnum (event);
1445       GST_OBJECT_UNLOCK (self);
1446       goto eat;
1447     }
1448     case GST_EVENT_STREAM_START:
1449     {
1450       goto eat;
1451     }
1452     case GST_EVENT_GAP:
1453     {
1454       GstClockTime pts, endpts;
1455       GstClockTime duration;
1456       GstBuffer *gapbuf;
1457
1458       gst_event_parse_gap (event, &pts, &duration);
1459       gapbuf = gst_buffer_new ();
1460
1461       if (GST_CLOCK_TIME_IS_VALID (duration))
1462         endpts = pts + duration;
1463       else
1464         endpts = GST_CLOCK_TIME_NONE;
1465
1466       GST_OBJECT_LOCK (aggpad);
1467       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1468           &pts, &endpts);
1469       GST_OBJECT_UNLOCK (aggpad);
1470
1471       if (!res) {
1472         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1473         goto eat;
1474       }
1475
1476       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1477         duration = endpts - pts;
1478       else
1479         duration = GST_CLOCK_TIME_NONE;
1480
1481       GST_BUFFER_PTS (gapbuf) = pts;
1482       GST_BUFFER_DURATION (gapbuf) = duration;
1483       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1484       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1485
1486       /* Remove GAP event so we can replace it with the buffer */
1487       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1488         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1489
1490       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1491           GST_FLOW_OK) {
1492         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1493         res = FALSE;
1494       }
1495
1496       goto eat;
1497     }
1498     case GST_EVENT_TAG:
1499     {
1500       GstTagList *tags;
1501
1502       gst_event_parse_tag (event, &tags);
1503
1504       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
1505         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
1506         gst_event_unref (event);
1507         event = NULL;
1508         goto eat;
1509       }
1510       break;
1511     }
1512     default:
1513     {
1514       break;
1515     }
1516   }
1517
1518   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1519   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1520
1521 eat:
1522   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1523   if (event)
1524     gst_event_unref (event);
1525
1526   return res;
1527 }
1528
1529 static inline gboolean
1530 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
1531     gpointer unused_udata)
1532 {
1533   gst_aggregator_pad_flush (pad, self);
1534
1535   PAD_LOCK (pad);
1536   pad->priv->flow_return = GST_FLOW_FLUSHING;
1537   pad->priv->negotiated = FALSE;
1538   PAD_BROADCAST_EVENT (pad);
1539   PAD_UNLOCK (pad);
1540
1541   return TRUE;
1542 }
1543
1544 static gboolean
1545 gst_aggregator_stop (GstAggregator * agg)
1546 {
1547   GstAggregatorClass *klass;
1548   gboolean result;
1549
1550   gst_aggregator_reset_flow_values (agg);
1551
1552   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
1553
1554   klass = GST_AGGREGATOR_GET_CLASS (agg);
1555
1556   if (klass->stop)
1557     result = klass->stop (agg);
1558   else
1559     result = TRUE;
1560
1561   agg->priv->has_peer_latency = FALSE;
1562   agg->priv->peer_latency_live = FALSE;
1563   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1564
1565   if (agg->priv->tags)
1566     gst_tag_list_unref (agg->priv->tags);
1567   agg->priv->tags = NULL;
1568
1569   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1570
1571   return result;
1572 }
1573
1574 /* GstElement vmethods implementations */
1575 static GstStateChangeReturn
1576 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1577 {
1578   GstStateChangeReturn ret;
1579   GstAggregator *self = GST_AGGREGATOR (element);
1580
1581   switch (transition) {
1582     case GST_STATE_CHANGE_READY_TO_PAUSED:
1583       if (!gst_aggregator_start (self))
1584         goto error_start;
1585       break;
1586     default:
1587       break;
1588   }
1589
1590   if ((ret =
1591           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1592               transition)) == GST_STATE_CHANGE_FAILURE)
1593     goto failure;
1594
1595
1596   switch (transition) {
1597     case GST_STATE_CHANGE_PAUSED_TO_READY:
1598       if (!gst_aggregator_stop (self)) {
1599         /* What to do in this case? Error out? */
1600         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1601       }
1602       break;
1603     default:
1604       break;
1605   }
1606
1607   return ret;
1608
1609 /* ERRORS */
1610 failure:
1611   {
1612     GST_ERROR_OBJECT (element, "parent failed state change");
1613     return ret;
1614   }
1615 error_start:
1616   {
1617     GST_ERROR_OBJECT (element, "Subclass failed to start");
1618     return GST_STATE_CHANGE_FAILURE;
1619   }
1620 }
1621
1622 static void
1623 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1624 {
1625   GstAggregator *self = GST_AGGREGATOR (element);
1626   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1627
1628   GST_INFO_OBJECT (pad, "Removing pad");
1629
1630   SRC_LOCK (self);
1631   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1632   gst_element_remove_pad (element, pad);
1633
1634   self->priv->has_peer_latency = FALSE;
1635   SRC_BROADCAST (self);
1636   SRC_UNLOCK (self);
1637 }
1638
1639 static GstAggregatorPad *
1640 gst_aggregator_default_create_new_pad (GstAggregator * self,
1641     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1642 {
1643   GstAggregatorPad *agg_pad;
1644   GstAggregatorPrivate *priv = self->priv;
1645   gint serial = 0;
1646   gchar *name = NULL;
1647
1648   if (templ->direction != GST_PAD_SINK)
1649     goto not_sink;
1650
1651   if (templ->presence != GST_PAD_REQUEST)
1652     goto not_request;
1653
1654   GST_OBJECT_LOCK (self);
1655   if (req_name == NULL || strlen (req_name) < 6
1656       || !g_str_has_prefix (req_name, "sink_")) {
1657     /* no name given when requesting the pad, use next available int */
1658     serial = ++priv->max_padserial;
1659   } else {
1660     /* parse serial number from requested padname */
1661     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1662     if (serial > priv->max_padserial)
1663       priv->max_padserial = serial;
1664   }
1665
1666   name = g_strdup_printf ("sink_%u", serial);
1667   agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1668       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1669   g_free (name);
1670
1671   GST_OBJECT_UNLOCK (self);
1672
1673   return agg_pad;
1674
1675   /* errors */
1676 not_sink:
1677   {
1678     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1679     return NULL;
1680   }
1681 not_request:
1682   {
1683     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1684     return NULL;
1685   }
1686 }
1687
1688 static GstPad *
1689 gst_aggregator_request_new_pad (GstElement * element,
1690     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1691 {
1692   GstAggregator *self;
1693   GstAggregatorPad *agg_pad;
1694   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1695   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1696
1697   self = GST_AGGREGATOR (element);
1698
1699   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1700   if (!agg_pad) {
1701     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1702     return NULL;
1703   }
1704
1705   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1706
1707   if (priv->running)
1708     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1709
1710   /* add the pad to the element */
1711   gst_element_add_pad (element, GST_PAD (agg_pad));
1712
1713   return GST_PAD (agg_pad);
1714 }
1715
1716 /* Must be called with SRC_LOCK held */
1717
1718 static gboolean
1719 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1720 {
1721   gboolean query_ret, live;
1722   GstClockTime our_latency, min, max;
1723
1724   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1725
1726   if (!query_ret) {
1727     GST_WARNING_OBJECT (self, "Latency query failed");
1728     return FALSE;
1729   }
1730
1731   gst_query_parse_latency (query, &live, &min, &max);
1732
1733   our_latency = self->priv->latency;
1734
1735   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1736     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1737         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1738     return FALSE;
1739   }
1740
1741   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1742     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1743         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1744             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1745             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1746     return FALSE;
1747   }
1748
1749   self->priv->peer_latency_live = live;
1750   self->priv->peer_latency_min = min;
1751   self->priv->peer_latency_max = max;
1752   self->priv->has_peer_latency = TRUE;
1753
1754   /* add our own */
1755   min += our_latency;
1756   min += self->priv->sub_latency_min;
1757   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1758       && GST_CLOCK_TIME_IS_VALID (max))
1759     max += self->priv->sub_latency_max + our_latency;
1760   else
1761     max = GST_CLOCK_TIME_NONE;
1762
1763   SRC_BROADCAST (self);
1764
1765   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1766       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1767
1768   gst_query_set_latency (query, live, min, max);
1769
1770   return query_ret;
1771 }
1772
1773 /*
1774  * MUST be called with the src_lock held.
1775  *
1776  * See  gst_aggregator_get_latency() for doc
1777  */
1778 static GstClockTime
1779 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1780 {
1781   GstClockTime latency;
1782
1783   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1784
1785   if (!self->priv->has_peer_latency) {
1786     GstQuery *query = gst_query_new_latency ();
1787     gboolean ret;
1788
1789     ret = gst_aggregator_query_latency_unlocked (self, query);
1790     gst_query_unref (query);
1791     if (!ret)
1792       return GST_CLOCK_TIME_NONE;
1793   }
1794
1795   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1796     return GST_CLOCK_TIME_NONE;
1797
1798   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1799   latency = self->priv->peer_latency_min;
1800
1801   /* add our own */
1802   latency += self->priv->latency;
1803   latency += self->priv->sub_latency_min;
1804
1805   return latency;
1806 }
1807
1808 /**
1809  * gst_aggregator_get_latency:
1810  * @self: a #GstAggregator
1811  *
1812  * Retrieves the latency values reported by @self in response to the latency
1813  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1814  * will not wait for the clock.
1815  *
1816  * Typically only called by subclasses.
1817  *
1818  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1819  */
1820 GstClockTime
1821 gst_aggregator_get_latency (GstAggregator * self)
1822 {
1823   GstClockTime ret;
1824
1825   SRC_LOCK (self);
1826   ret = gst_aggregator_get_latency_unlocked (self);
1827   SRC_UNLOCK (self);
1828
1829   return ret;
1830 }
1831
1832 static gboolean
1833 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1834 {
1835   GstAggregator *self = GST_AGGREGATOR (element);
1836
1837   GST_STATE_LOCK (element);
1838   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1839       GST_STATE (element) < GST_STATE_PAUSED) {
1840     gdouble rate;
1841     GstFormat fmt;
1842     GstSeekFlags flags;
1843     GstSeekType start_type, stop_type;
1844     gint64 start, stop;
1845
1846     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1847         &start, &stop_type, &stop);
1848
1849     GST_OBJECT_LOCK (self);
1850     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1851         stop_type, stop, NULL);
1852     self->priv->seqnum = gst_event_get_seqnum (event);
1853     self->priv->first_buffer = FALSE;
1854     GST_OBJECT_UNLOCK (self);
1855
1856     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1857   }
1858   GST_STATE_UNLOCK (element);
1859
1860
1861   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1862       event);
1863 }
1864
1865 static gboolean
1866 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1867 {
1868   gboolean res = TRUE;
1869
1870   switch (GST_QUERY_TYPE (query)) {
1871     case GST_QUERY_SEEKING:
1872     {
1873       GstFormat format;
1874
1875       /* don't pass it along as some (file)sink might claim it does
1876        * whereas with a collectpads in between that will not likely work */
1877       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1878       gst_query_set_seeking (query, format, FALSE, 0, -1);
1879       res = TRUE;
1880
1881       break;
1882     }
1883     case GST_QUERY_LATENCY:
1884       SRC_LOCK (self);
1885       res = gst_aggregator_query_latency_unlocked (self, query);
1886       SRC_UNLOCK (self);
1887       break;
1888     default:
1889       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1890   }
1891
1892   return res;
1893 }
1894
1895 static gboolean
1896 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1897 {
1898   EventData *evdata = user_data;
1899   gboolean ret = TRUE;
1900   GstPad *peer = gst_pad_get_peer (pad);
1901   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1902
1903   if (peer) {
1904     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1905       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1906       ret = TRUE;
1907     } else {
1908       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1909       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1910       gst_object_unref (peer);
1911     }
1912   }
1913
1914   if (ret == FALSE) {
1915     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1916       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1917
1918       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1919
1920       if (gst_pad_query (peer, seeking)) {
1921         gboolean seekable;
1922
1923         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1924
1925         if (seekable == FALSE) {
1926           GST_INFO_OBJECT (pad,
1927               "Source not seekable, We failed but it does not matter!");
1928
1929           ret = TRUE;
1930         }
1931       } else {
1932         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1933       }
1934
1935       gst_query_unref (seeking);
1936     }
1937
1938     if (evdata->flush) {
1939       PAD_LOCK (aggpad);
1940       aggpad->priv->pending_flush_start = FALSE;
1941       aggpad->priv->pending_flush_stop = FALSE;
1942       PAD_UNLOCK (aggpad);
1943     }
1944   } else {
1945     evdata->one_actually_seeked = TRUE;
1946   }
1947
1948   evdata->result &= ret;
1949
1950   /* Always send to all pads */
1951   return FALSE;
1952 }
1953
1954 static void
1955 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1956     EventData * evdata)
1957 {
1958   evdata->result = TRUE;
1959   evdata->one_actually_seeked = FALSE;
1960
1961   /* We first need to set all pads as flushing in a first pass
1962    * as flush_start flush_stop is sometimes sent synchronously
1963    * while we send the seek event */
1964   if (evdata->flush) {
1965     GList *l;
1966
1967     GST_OBJECT_LOCK (self);
1968     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1969       GstAggregatorPad *pad = l->data;
1970
1971       PAD_LOCK (pad);
1972       pad->priv->pending_flush_start = TRUE;
1973       pad->priv->pending_flush_stop = FALSE;
1974       PAD_UNLOCK (pad);
1975     }
1976     GST_OBJECT_UNLOCK (self);
1977   }
1978
1979   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
1980
1981   gst_event_unref (evdata->event);
1982 }
1983
1984 static gboolean
1985 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1986 {
1987   gdouble rate;
1988   GstFormat fmt;
1989   GstSeekFlags flags;
1990   GstSeekType start_type, stop_type;
1991   gint64 start, stop;
1992   gboolean flush;
1993   EventData evdata = { 0, };
1994   GstAggregatorPrivate *priv = self->priv;
1995
1996   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1997       &start, &stop_type, &stop);
1998
1999   GST_INFO_OBJECT (self, "starting SEEK");
2000
2001   flush = flags & GST_SEEK_FLAG_FLUSH;
2002
2003   GST_OBJECT_LOCK (self);
2004   if (flush) {
2005     priv->pending_flush_start = TRUE;
2006     priv->flush_seeking = TRUE;
2007   }
2008
2009   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
2010       stop_type, stop, NULL);
2011
2012   /* Seeking sets a position */
2013   self->priv->first_buffer = FALSE;
2014   GST_OBJECT_UNLOCK (self);
2015
2016   /* forward the seek upstream */
2017   evdata.event = event;
2018   evdata.flush = flush;
2019   evdata.only_to_active_pads = FALSE;
2020   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2021   event = NULL;
2022
2023   if (!evdata.result || !evdata.one_actually_seeked) {
2024     GST_OBJECT_LOCK (self);
2025     priv->flush_seeking = FALSE;
2026     priv->pending_flush_start = FALSE;
2027     GST_OBJECT_UNLOCK (self);
2028   }
2029
2030   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2031
2032   return evdata.result;
2033 }
2034
2035 static gboolean
2036 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2037 {
2038   EventData evdata = { 0, };
2039
2040   switch (GST_EVENT_TYPE (event)) {
2041     case GST_EVENT_SEEK:
2042       /* _do_seek() unrefs the event. */
2043       return gst_aggregator_do_seek (self, event);
2044     case GST_EVENT_NAVIGATION:
2045       /* navigation is rather pointless. */
2046       gst_event_unref (event);
2047       return FALSE;
2048     default:
2049       break;
2050   }
2051
2052   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2053    * they will receive a QOS event that has earliest_time=0 (because we can't
2054    * have negative timestamps), and consider their buffer as too late */
2055   evdata.event = event;
2056   evdata.flush = FALSE;
2057   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2058   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2059   return evdata.result;
2060 }
2061
2062 static gboolean
2063 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2064     GstEvent * event)
2065 {
2066   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2067
2068   return klass->src_event (GST_AGGREGATOR (parent), event);
2069 }
2070
2071 static gboolean
2072 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2073     GstQuery * query)
2074 {
2075   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2076
2077   return klass->src_query (GST_AGGREGATOR (parent), query);
2078 }
2079
2080 static gboolean
2081 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2082     GstObject * parent, GstPadMode mode, gboolean active)
2083 {
2084   GstAggregator *self = GST_AGGREGATOR (parent);
2085   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2086
2087   if (klass->src_activate) {
2088     if (klass->src_activate (self, mode, active) == FALSE) {
2089       return FALSE;
2090     }
2091   }
2092
2093   if (active == TRUE) {
2094     switch (mode) {
2095       case GST_PAD_MODE_PUSH:
2096       {
2097         GST_INFO_OBJECT (pad, "Activating pad!");
2098         gst_aggregator_start_srcpad_task (self);
2099         return TRUE;
2100       }
2101       default:
2102       {
2103         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2104         return FALSE;
2105       }
2106     }
2107   }
2108
2109   /* deactivating */
2110   GST_INFO_OBJECT (self, "Deactivating srcpad");
2111   gst_aggregator_stop_srcpad_task (self, FALSE);
2112
2113   return TRUE;
2114 }
2115
2116 static gboolean
2117 gst_aggregator_default_sink_query (GstAggregator * self,
2118     GstAggregatorPad * aggpad, GstQuery * query)
2119 {
2120   GstPad *pad = GST_PAD (aggpad);
2121
2122   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2123     GstQuery *decide_query = NULL;
2124     GstAggregatorClass *agg_class;
2125     gboolean ret;
2126
2127     GST_OBJECT_LOCK (self);
2128     PAD_LOCK (aggpad);
2129     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2130       GST_DEBUG_OBJECT (self,
2131           "not negotiated yet, can't answer ALLOCATION query");
2132       PAD_UNLOCK (aggpad);
2133       GST_OBJECT_UNLOCK (self);
2134
2135       return FALSE;
2136     }
2137
2138     if ((decide_query = self->priv->allocation_query))
2139       gst_query_ref (decide_query);
2140     PAD_UNLOCK (aggpad);
2141     GST_OBJECT_UNLOCK (self);
2142
2143     GST_DEBUG_OBJECT (self,
2144         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2145
2146     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2147
2148     /* pass the query to the propose_allocation vmethod if any */
2149     if (agg_class->propose_allocation)
2150       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2151     else
2152       ret = FALSE;
2153
2154     if (decide_query)
2155       gst_query_unref (decide_query);
2156
2157     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2158     return ret;
2159   }
2160
2161   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2162 }
2163
2164 static void
2165 gst_aggregator_finalize (GObject * object)
2166 {
2167   GstAggregator *self = (GstAggregator *) object;
2168
2169   g_mutex_clear (&self->priv->src_lock);
2170   g_cond_clear (&self->priv->src_cond);
2171
2172   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2173 }
2174
2175 /*
2176  * gst_aggregator_set_latency_property:
2177  * @agg: a #GstAggregator
2178  * @latency: the new latency value (in nanoseconds).
2179  *
2180  * Sets the new latency value to @latency. This value is used to limit the
2181  * amount of time a pad waits for data to appear before considering the pad
2182  * as unresponsive.
2183  */
2184 static void
2185 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
2186 {
2187   gboolean changed;
2188
2189   g_return_if_fail (GST_IS_AGGREGATOR (self));
2190   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2191
2192   SRC_LOCK (self);
2193   changed = (self->priv->latency != latency);
2194
2195   if (changed) {
2196     GList *item;
2197
2198     GST_OBJECT_LOCK (self);
2199     /* First lock all the pads */
2200     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2201       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2202       PAD_LOCK (aggpad);
2203     }
2204
2205     self->priv->latency = latency;
2206
2207     SRC_BROADCAST (self);
2208
2209     /* Now wake up the pads */
2210     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2211       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2212       PAD_BROADCAST_EVENT (aggpad);
2213       PAD_UNLOCK (aggpad);
2214     }
2215     GST_OBJECT_UNLOCK (self);
2216   }
2217
2218   SRC_UNLOCK (self);
2219
2220   if (changed)
2221     gst_element_post_message (GST_ELEMENT_CAST (self),
2222         gst_message_new_latency (GST_OBJECT_CAST (self)));
2223 }
2224
2225 /*
2226  * gst_aggregator_get_latency_property:
2227  * @agg: a #GstAggregator
2228  *
2229  * Gets the latency value. See gst_aggregator_set_latency for
2230  * more details.
2231  *
2232  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2233  * before a pad is deemed unresponsive. A value of -1 means an
2234  * unlimited time.
2235  */
2236 static gint64
2237 gst_aggregator_get_latency_property (GstAggregator * agg)
2238 {
2239   gint64 res;
2240
2241   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
2242
2243   GST_OBJECT_LOCK (agg);
2244   res = agg->priv->latency;
2245   GST_OBJECT_UNLOCK (agg);
2246
2247   return res;
2248 }
2249
2250 static void
2251 gst_aggregator_set_property (GObject * object, guint prop_id,
2252     const GValue * value, GParamSpec * pspec)
2253 {
2254   GstAggregator *agg = GST_AGGREGATOR (object);
2255
2256   switch (prop_id) {
2257     case PROP_LATENCY:
2258       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
2259       break;
2260     case PROP_START_TIME_SELECTION:
2261       agg->priv->start_time_selection = g_value_get_enum (value);
2262       break;
2263     case PROP_START_TIME:
2264       agg->priv->start_time = g_value_get_uint64 (value);
2265       break;
2266     default:
2267       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2268       break;
2269   }
2270 }
2271
2272 static void
2273 gst_aggregator_get_property (GObject * object, guint prop_id,
2274     GValue * value, GParamSpec * pspec)
2275 {
2276   GstAggregator *agg = GST_AGGREGATOR (object);
2277
2278   switch (prop_id) {
2279     case PROP_LATENCY:
2280       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
2281       break;
2282     case PROP_START_TIME_SELECTION:
2283       g_value_set_enum (value, agg->priv->start_time_selection);
2284       break;
2285     case PROP_START_TIME:
2286       g_value_set_uint64 (value, agg->priv->start_time);
2287       break;
2288     default:
2289       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2290       break;
2291   }
2292 }
2293
2294 /* GObject vmethods implementations */
2295 static void
2296 gst_aggregator_class_init (GstAggregatorClass * klass)
2297 {
2298   GObjectClass *gobject_class = (GObjectClass *) klass;
2299   GstElementClass *gstelement_class = (GstElementClass *) klass;
2300
2301   aggregator_parent_class = g_type_class_peek_parent (klass);
2302   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
2303
2304   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2305       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2306
2307   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
2308
2309   klass->sink_event = gst_aggregator_default_sink_event;
2310   klass->sink_query = gst_aggregator_default_sink_query;
2311
2312   klass->src_event = gst_aggregator_default_src_event;
2313   klass->src_query = gst_aggregator_default_src_query;
2314
2315   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2316   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2317   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2318   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2319
2320   gstelement_class->request_new_pad =
2321       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2322   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2323   gstelement_class->release_pad =
2324       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2325   gstelement_class->change_state =
2326       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2327
2328   gobject_class->set_property = gst_aggregator_set_property;
2329   gobject_class->get_property = gst_aggregator_get_property;
2330   gobject_class->finalize = gst_aggregator_finalize;
2331
2332   g_object_class_install_property (gobject_class, PROP_LATENCY,
2333       g_param_spec_int64 ("latency", "Buffer latency",
2334           "Additional latency in live mode to allow upstream "
2335           "to take longer to produce buffers for the current "
2336           "position (in nanoseconds)", 0,
2337           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
2338           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2339
2340   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2341       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2342           "Decides which start time is output",
2343           gst_aggregator_start_time_selection_get_type (),
2344           DEFAULT_START_TIME_SELECTION,
2345           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2346
2347   g_object_class_install_property (gobject_class, PROP_START_TIME,
2348       g_param_spec_uint64 ("start-time", "Start Time",
2349           "Start time to use if start-time-selection=set", 0,
2350           G_MAXUINT64,
2351           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2352
2353   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
2354   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
2355 }
2356
2357 static void
2358 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2359 {
2360   GstPadTemplate *pad_template;
2361   GstAggregatorPrivate *priv;
2362
2363   g_return_if_fail (klass->aggregate != NULL);
2364
2365   self->priv =
2366       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
2367       GstAggregatorPrivate);
2368
2369   priv = self->priv;
2370
2371   pad_template =
2372       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2373   g_return_if_fail (pad_template != NULL);
2374
2375   priv->max_padserial = -1;
2376   priv->tags_changed = FALSE;
2377
2378   self->priv->peer_latency_live = FALSE;
2379   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2380   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2381   self->priv->has_peer_latency = FALSE;
2382   gst_aggregator_reset_flow_values (self);
2383
2384   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2385
2386   gst_pad_set_event_function (self->srcpad,
2387       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2388   gst_pad_set_query_function (self->srcpad,
2389       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2390   gst_pad_set_activatemode_function (self->srcpad,
2391       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2392
2393   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2394
2395   self->priv->latency = DEFAULT_LATENCY;
2396   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2397   self->priv->start_time = DEFAULT_START_TIME;
2398
2399   g_mutex_init (&self->priv->src_lock);
2400   g_cond_init (&self->priv->src_cond);
2401 }
2402
2403 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2404  * method to get to the padtemplates */
2405 GType
2406 gst_aggregator_get_type (void)
2407 {
2408   static volatile gsize type = 0;
2409
2410   if (g_once_init_enter (&type)) {
2411     GType _type;
2412     static const GTypeInfo info = {
2413       sizeof (GstAggregatorClass),
2414       NULL,
2415       NULL,
2416       (GClassInitFunc) gst_aggregator_class_init,
2417       NULL,
2418       NULL,
2419       sizeof (GstAggregator),
2420       0,
2421       (GInstanceInitFunc) gst_aggregator_init,
2422     };
2423
2424     _type = g_type_register_static (GST_TYPE_ELEMENT,
2425         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2426     g_once_init_leave (&type, _type);
2427   }
2428   return type;
2429 }
2430
2431 /* Must be called with SRC lock and PAD lock held */
2432 static gboolean
2433 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2434 {
2435   /* Empty queue always has space */
2436   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2437     return TRUE;
2438
2439   /* We also want at least two buffers, one is being processed and one is ready
2440    * for the next iteration when we operate in live mode. */
2441   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2442     return TRUE;
2443
2444   /* zero latency, if there is a buffer, it's full */
2445   if (self->priv->latency == 0)
2446     return FALSE;
2447
2448   /* Allow no more buffers than the latency */
2449   return (aggpad->priv->time_level <= self->priv->latency);
2450 }
2451
2452 /* Must be called with the PAD_LOCK held */
2453 static void
2454 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2455 {
2456   GstClockTime timestamp;
2457
2458   if (GST_BUFFER_DTS_IS_VALID (buffer))
2459     timestamp = GST_BUFFER_DTS (buffer);
2460   else
2461     timestamp = GST_BUFFER_PTS (buffer);
2462
2463   if (timestamp == GST_CLOCK_TIME_NONE) {
2464     if (head)
2465       timestamp = aggpad->priv->head_position;
2466     else
2467       timestamp = aggpad->priv->tail_position;
2468   }
2469
2470   /* add duration */
2471   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2472     timestamp += GST_BUFFER_DURATION (buffer);
2473
2474   if (head)
2475     aggpad->priv->head_position = timestamp;
2476   else
2477     aggpad->priv->tail_position = timestamp;
2478
2479   update_time_level (aggpad, head);
2480 }
2481
2482 static GstFlowReturn
2483 gst_aggregator_pad_chain_internal (GstAggregator * self,
2484     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2485 {
2486   GstFlowReturn flow_return;
2487   GstClockTime buf_pts;
2488
2489   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
2490
2491   PAD_FLUSH_LOCK (aggpad);
2492
2493   PAD_LOCK (aggpad);
2494   flow_return = aggpad->priv->flow_return;
2495   if (flow_return != GST_FLOW_OK)
2496     goto flushing;
2497
2498   if (aggpad->priv->pending_eos == TRUE)
2499     goto eos;
2500
2501   PAD_UNLOCK (aggpad);
2502
2503   buf_pts = GST_BUFFER_PTS (buffer);
2504
2505   for (;;) {
2506     SRC_LOCK (self);
2507     GST_OBJECT_LOCK (self);
2508     PAD_LOCK (aggpad);
2509
2510     if (aggpad->priv->first_buffer) {
2511       self->priv->has_peer_latency = FALSE;
2512       aggpad->priv->first_buffer = FALSE;
2513     }
2514
2515     if (gst_aggregator_pad_has_space (self, aggpad)
2516         && aggpad->priv->flow_return == GST_FLOW_OK) {
2517       if (head)
2518         g_queue_push_head (&aggpad->priv->data, buffer);
2519       else
2520         g_queue_push_tail (&aggpad->priv->data, buffer);
2521       apply_buffer (aggpad, buffer, head);
2522       aggpad->priv->num_buffers++;
2523       buffer = NULL;
2524       SRC_BROADCAST (self);
2525       break;
2526     }
2527
2528     flow_return = aggpad->priv->flow_return;
2529     if (flow_return != GST_FLOW_OK) {
2530       GST_OBJECT_UNLOCK (self);
2531       SRC_UNLOCK (self);
2532       goto flushing;
2533     }
2534     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2535     GST_OBJECT_UNLOCK (self);
2536     SRC_UNLOCK (self);
2537     PAD_WAIT_EVENT (aggpad);
2538
2539     PAD_UNLOCK (aggpad);
2540   }
2541
2542   if (self->priv->first_buffer) {
2543     GstClockTime start_time;
2544
2545     switch (self->priv->start_time_selection) {
2546       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2547       default:
2548         start_time = 0;
2549         break;
2550       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2551         GST_OBJECT_LOCK (aggpad);
2552         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2553           start_time = buf_pts;
2554           if (start_time != -1) {
2555             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2556             start_time =
2557                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2558                 GST_FORMAT_TIME, start_time);
2559           }
2560         } else {
2561           start_time = 0;
2562           GST_WARNING_OBJECT (aggpad,
2563               "Ignoring request of selecting the first start time "
2564               "as the segment is a %s segment instead of a time segment",
2565               gst_format_get_name (aggpad->segment.format));
2566         }
2567         GST_OBJECT_UNLOCK (aggpad);
2568         break;
2569       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2570         start_time = self->priv->start_time;
2571         if (start_time == -1)
2572           start_time = 0;
2573         break;
2574     }
2575
2576     if (start_time != -1) {
2577       if (self->segment.position == -1)
2578         self->segment.position = start_time;
2579       else
2580         self->segment.position = MIN (start_time, self->segment.position);
2581
2582       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2583           GST_TIME_ARGS (start_time));
2584     }
2585   }
2586
2587   PAD_UNLOCK (aggpad);
2588   GST_OBJECT_UNLOCK (self);
2589   SRC_UNLOCK (self);
2590
2591   PAD_FLUSH_UNLOCK (aggpad);
2592
2593   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2594
2595   return flow_return;
2596
2597 flushing:
2598   PAD_UNLOCK (aggpad);
2599   PAD_FLUSH_UNLOCK (aggpad);
2600
2601   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2602       gst_flow_get_name (flow_return));
2603   if (buffer)
2604     gst_buffer_unref (buffer);
2605
2606   return flow_return;
2607
2608 eos:
2609   PAD_UNLOCK (aggpad);
2610   PAD_FLUSH_UNLOCK (aggpad);
2611
2612   gst_buffer_unref (buffer);
2613   GST_DEBUG_OBJECT (aggpad, "We are EOS already...");
2614
2615   return GST_FLOW_EOS;
2616 }
2617
2618 static GstFlowReturn
2619 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2620 {
2621   return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2622       GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
2623 }
2624
2625 static gboolean
2626 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2627     GstQuery * query)
2628 {
2629   GstAggregator *self = GST_AGGREGATOR (parent);
2630   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2631   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2632
2633   if (GST_QUERY_IS_SERIALIZED (query)) {
2634     GstStructure *s;
2635     gboolean ret = FALSE;
2636
2637     SRC_LOCK (self);
2638     PAD_LOCK (aggpad);
2639
2640     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2641       SRC_UNLOCK (self);
2642       goto flushing;
2643     }
2644
2645     g_queue_push_head (&aggpad->priv->data, query);
2646     SRC_BROADCAST (self);
2647     SRC_UNLOCK (self);
2648
2649     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2650         && aggpad->priv->flow_return == GST_FLOW_OK) {
2651       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2652       PAD_WAIT_EVENT (aggpad);
2653     }
2654
2655     s = gst_query_writable_structure (query);
2656     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2657       gst_structure_remove_field (s, "gst-aggregator-retval");
2658     else
2659       g_queue_remove (&aggpad->priv->data, query);
2660
2661     if (aggpad->priv->flow_return != GST_FLOW_OK)
2662       goto flushing;
2663
2664     PAD_UNLOCK (aggpad);
2665
2666     return ret;
2667   }
2668
2669   return klass->sink_query (self, aggpad, query);
2670
2671 flushing:
2672   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2673       gst_flow_get_name (aggpad->priv->flow_return));
2674   PAD_UNLOCK (aggpad);
2675
2676   return FALSE;
2677 }
2678
2679 /* Queue serialized events and let the others go though directly.
2680  * The queued events with be handled from the src-pad task in
2681  * gst_aggregator_do_events_and_queries().
2682  */
2683 static GstFlowReturn
2684 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2685     GstEvent * event)
2686 {
2687   GstFlowReturn ret = GST_FLOW_OK;
2688   GstAggregator *self = GST_AGGREGATOR (parent);
2689   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2690   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2691
2692   if (GST_EVENT_IS_SERIALIZED (event)
2693       && GST_EVENT_TYPE (event) != GST_EVENT_EOS) {
2694     SRC_LOCK (self);
2695     PAD_LOCK (aggpad);
2696
2697     if (aggpad->priv->flow_return != GST_FLOW_OK
2698         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2699       ret = aggpad->priv->flow_return;
2700       goto flushing;
2701     }
2702
2703     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2704       GST_OBJECT_LOCK (aggpad);
2705       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2706       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2707       update_time_level (aggpad, TRUE);
2708       GST_OBJECT_UNLOCK (aggpad);
2709     }
2710
2711     if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2712       GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT,
2713           event);
2714       g_queue_push_head (&aggpad->priv->data, event);
2715       event = NULL;
2716       SRC_BROADCAST (self);
2717     }
2718     PAD_UNLOCK (aggpad);
2719     SRC_UNLOCK (self);
2720   }
2721
2722   if (event) {
2723     if (!klass->sink_event (self, aggpad, event)) {
2724       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2725        * the event handling func */
2726       ret = GST_FLOW_ERROR;
2727     }
2728   }
2729
2730   return ret;
2731
2732 flushing:
2733   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2734       gst_flow_get_name (aggpad->priv->flow_return));
2735   PAD_UNLOCK (aggpad);
2736   SRC_UNLOCK (self);
2737   if (GST_EVENT_IS_STICKY (event))
2738     gst_pad_store_sticky_event (pad, event);
2739   gst_event_unref (event);
2740
2741   return ret;
2742 }
2743
2744 static gboolean
2745 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2746     GstObject * parent, GstPadMode mode, gboolean active)
2747 {
2748   GstAggregator *self = GST_AGGREGATOR (parent);
2749   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2750
2751   if (active == FALSE) {
2752     SRC_LOCK (self);
2753     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2754     SRC_BROADCAST (self);
2755     SRC_UNLOCK (self);
2756   } else {
2757     PAD_LOCK (aggpad);
2758     aggpad->priv->flow_return = GST_FLOW_OK;
2759     PAD_BROADCAST_EVENT (aggpad);
2760     PAD_UNLOCK (aggpad);
2761   }
2762
2763   return TRUE;
2764 }
2765
2766 /***********************************
2767  * GstAggregatorPad implementation  *
2768  ************************************/
2769 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2770
2771 static void
2772 gst_aggregator_pad_constructed (GObject * object)
2773 {
2774   GstPad *pad = GST_PAD (object);
2775
2776   gst_pad_set_chain_function (pad,
2777       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2778   gst_pad_set_event_full_function_full (pad,
2779       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2780   gst_pad_set_query_function (pad,
2781       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2782   gst_pad_set_activatemode_function (pad,
2783       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2784 }
2785
2786 static void
2787 gst_aggregator_pad_finalize (GObject * object)
2788 {
2789   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2790
2791   g_cond_clear (&pad->priv->event_cond);
2792   g_mutex_clear (&pad->priv->flush_lock);
2793   g_mutex_clear (&pad->priv->lock);
2794
2795   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2796 }
2797
2798 static void
2799 gst_aggregator_pad_dispose (GObject * object)
2800 {
2801   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2802
2803   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2804
2805   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2806 }
2807
2808 static void
2809 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2810 {
2811   GObjectClass *gobject_class = (GObjectClass *) klass;
2812
2813   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2814
2815   gobject_class->constructed = gst_aggregator_pad_constructed;
2816   gobject_class->finalize = gst_aggregator_pad_finalize;
2817   gobject_class->dispose = gst_aggregator_pad_dispose;
2818 }
2819
2820 static void
2821 gst_aggregator_pad_init (GstAggregatorPad * pad)
2822 {
2823   pad->priv =
2824       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2825       GstAggregatorPadPrivate);
2826
2827   g_queue_init (&pad->priv->data);
2828   g_cond_init (&pad->priv->event_cond);
2829
2830   g_mutex_init (&pad->priv->flush_lock);
2831   g_mutex_init (&pad->priv->lock);
2832
2833   gst_aggregator_pad_reset_unlocked (pad);
2834   pad->priv->negotiated = FALSE;
2835 }
2836
2837 /* Must be called with the PAD_LOCK held */
2838 static void
2839 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2840 {
2841   pad->priv->num_buffers--;
2842   GST_TRACE_OBJECT (pad, "Consuming buffer");
2843   if (gst_aggregator_pad_queue_is_empty (pad) && pad->priv->pending_eos) {
2844     pad->priv->pending_eos = FALSE;
2845     pad->priv->eos = TRUE;
2846   }
2847   PAD_BROADCAST_EVENT (pad);
2848 }
2849
2850 /* Must be called with the PAD_LOCK held */
2851 static void
2852 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2853 {
2854   GstAggregator *self = NULL;
2855   GstAggregatorClass *aggclass = NULL;
2856   GstBuffer *buffer = NULL;
2857
2858   while (pad->priv->clipped_buffer == NULL &&
2859       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2860     buffer = g_queue_pop_tail (&pad->priv->data);
2861
2862     apply_buffer (pad, buffer, FALSE);
2863
2864     /* We only take the parent here so that it's not taken if the buffer is
2865      * already clipped or if the queue is empty.
2866      */
2867     if (self == NULL) {
2868       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2869       if (self == NULL) {
2870         gst_buffer_unref (buffer);
2871         return;
2872       }
2873
2874       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2875     }
2876
2877     if (aggclass->clip) {
2878       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2879
2880       buffer = aggclass->clip (self, pad, buffer);
2881
2882       if (buffer == NULL) {
2883         gst_aggregator_pad_buffer_consumed (pad);
2884         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2885       }
2886     }
2887
2888     pad->priv->clipped_buffer = buffer;
2889   }
2890
2891   if (self)
2892     gst_object_unref (self);
2893 }
2894
2895 /**
2896  * gst_aggregator_pad_steal_buffer:
2897  * @pad: the pad to get buffer from
2898  *
2899  * Steal the ref to the buffer currently queued in @pad.
2900  *
2901  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2902  *   queued. You should unref the buffer after usage.
2903  */
2904 GstBuffer *
2905 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2906 {
2907   GstBuffer *buffer;
2908
2909   PAD_LOCK (pad);
2910
2911   gst_aggregator_pad_clip_buffer_unlocked (pad);
2912
2913   buffer = pad->priv->clipped_buffer;
2914
2915   if (buffer) {
2916     pad->priv->clipped_buffer = NULL;
2917     gst_aggregator_pad_buffer_consumed (pad);
2918     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2919   }
2920
2921   PAD_UNLOCK (pad);
2922
2923   return buffer;
2924 }
2925
2926 /**
2927  * gst_aggregator_pad_drop_buffer:
2928  * @pad: the pad where to drop any pending buffer
2929  *
2930  * Drop the buffer currently queued in @pad.
2931  *
2932  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2933  */
2934 gboolean
2935 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2936 {
2937   GstBuffer *buf;
2938
2939   buf = gst_aggregator_pad_steal_buffer (pad);
2940
2941   if (buf == NULL)
2942     return FALSE;
2943
2944   gst_buffer_unref (buf);
2945   return TRUE;
2946 }
2947
2948 /**
2949  * gst_aggregator_pad_get_buffer:
2950  * @pad: the pad to get buffer from
2951  *
2952  * Returns: (transfer full): A reference to the buffer in @pad or
2953  * NULL if no buffer was queued. You should unref the buffer after
2954  * usage.
2955  */
2956 GstBuffer *
2957 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2958 {
2959   GstBuffer *buffer;
2960
2961   PAD_LOCK (pad);
2962
2963   gst_aggregator_pad_clip_buffer_unlocked (pad);
2964
2965   if (pad->priv->clipped_buffer) {
2966     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
2967   } else {
2968     buffer = NULL;
2969   }
2970   PAD_UNLOCK (pad);
2971
2972   return buffer;
2973 }
2974
2975 gboolean
2976 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2977 {
2978   gboolean is_eos;
2979
2980   PAD_LOCK (pad);
2981   is_eos = pad->priv->eos;
2982   PAD_UNLOCK (pad);
2983
2984   return is_eos;
2985 }
2986
2987 /**
2988  * gst_aggregator_merge_tags:
2989  * @self: a #GstAggregator
2990  * @tags: a #GstTagList to merge
2991  * @mode: the #GstTagMergeMode to use
2992  *
2993  * Adds tags to so-called pending tags, which will be processed
2994  * before pushing out data downstream.
2995  *
2996  * Note that this is provided for convenience, and the subclass is
2997  * not required to use this and can still do tag handling on its own.
2998  *
2999  * MT safe.
3000  */
3001 void
3002 gst_aggregator_merge_tags (GstAggregator * self,
3003     const GstTagList * tags, GstTagMergeMode mode)
3004 {
3005   GstTagList *otags;
3006
3007   g_return_if_fail (GST_IS_AGGREGATOR (self));
3008   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
3009
3010   /* FIXME Check if we can use OBJECT lock here! */
3011   GST_OBJECT_LOCK (self);
3012   if (tags)
3013     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
3014   otags = self->priv->tags;
3015   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
3016   if (otags)
3017     gst_tag_list_unref (otags);
3018   self->priv->tags_changed = TRUE;
3019   GST_OBJECT_UNLOCK (self);
3020 }
3021
3022 /**
3023  * gst_aggregator_set_latency:
3024  * @self: a #GstAggregator
3025  * @min_latency: minimum latency
3026  * @max_latency: maximum latency
3027  *
3028  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3029  * latency is. Will also post a LATENCY message on the bus so the pipeline
3030  * can reconfigure its global latency.
3031  */
3032 void
3033 gst_aggregator_set_latency (GstAggregator * self,
3034     GstClockTime min_latency, GstClockTime max_latency)
3035 {
3036   gboolean changed = FALSE;
3037
3038   g_return_if_fail (GST_IS_AGGREGATOR (self));
3039   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3040   g_return_if_fail (max_latency >= min_latency);
3041
3042   SRC_LOCK (self);
3043   if (self->priv->sub_latency_min != min_latency) {
3044     self->priv->sub_latency_min = min_latency;
3045     changed = TRUE;
3046   }
3047   if (self->priv->sub_latency_max != max_latency) {
3048     self->priv->sub_latency_max = max_latency;
3049     changed = TRUE;
3050   }
3051
3052   if (changed)
3053     SRC_BROADCAST (self);
3054   SRC_UNLOCK (self);
3055
3056   if (changed) {
3057     gst_element_post_message (GST_ELEMENT_CAST (self),
3058         gst_message_new_latency (GST_OBJECT_CAST (self)));
3059   }
3060 }
3061
3062 /**
3063  * gst_aggregator_get_buffer_pool:
3064  * @self: a #GstAggregator
3065  *
3066  * Returns: (transfer full): the instance of the #GstBufferPool used
3067  * by @trans; free it after use it
3068  */
3069 GstBufferPool *
3070 gst_aggregator_get_buffer_pool (GstAggregator * self)
3071 {
3072   GstBufferPool *pool;
3073
3074   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3075
3076   GST_OBJECT_LOCK (self);
3077   pool = self->priv->pool;
3078   if (pool)
3079     gst_object_ref (pool);
3080   GST_OBJECT_UNLOCK (self);
3081
3082   return pool;
3083 }
3084
3085 /**
3086  * gst_aggregator_get_allocator:
3087  * @self: a #GstAggregator
3088  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3089  * used
3090  * @params: (out) (allow-none) (transfer full): the
3091  * #GstAllocationParams of @allocator
3092  *
3093  * Lets #GstAggregator sub-classes get the memory @allocator
3094  * acquired by the base class and its @params.
3095  *
3096  * Unref the @allocator after use it.
3097  */
3098 void
3099 gst_aggregator_get_allocator (GstAggregator * self,
3100     GstAllocator ** allocator, GstAllocationParams * params)
3101 {
3102   g_return_if_fail (GST_IS_AGGREGATOR (self));
3103
3104   if (allocator)
3105     *allocator = self->priv->allocator ?
3106         gst_object_ref (self->priv->allocator) : NULL;
3107
3108   if (params)
3109     *params = self->priv->allocation_params;
3110 }