aggregator: allow src GstAggregatorPads
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @title: GstAggregator
25  * @short_description: Base class for mixers and muxers, manages a set of input
26  *     pads and aggregates their streams
27  * @see_also: gstcollectpads for historical reasons.
28  *
29  * Manages a set of pads with the purpose of aggregating their buffers.
30  * Control is given to the subclass when all pads have data.
31  *
32  *  * Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *
35  *  * Installs a #GstPadChainFunction, a #GstPadEventFullFunction and a
36  *    #GstPadQueryFunction to queue all serialized data packets per sink pad.
37  *    Subclasses should not overwrite those, but instead implement
38  *    #GstAggregatorClass.sink_event() and #GstAggregatorClass.sink_query() as
39  *    needed.
40  *
41  *  * When data is queued on all pads, the aggregate vmethod is called.
42  *
43  *  * One can peek at the data on any given GstAggregatorPad with the
44  *    gst_aggregator_pad_peek_buffer () method, and remove it from the pad
45  *    with the gst_aggregator_pad_pop_buffer () method. When a buffer
46  *    has been taken with pop_buffer (), a new buffer can be queued
47  *    on that pad.
48  *
49  *  * If the subclass wishes to push a buffer downstream in its aggregate
50  *    implementation, it should do so through the
51  *    gst_aggregator_finish_buffer () method. This method will take care
52  *    of sending and ordering mandatory events such as stream start, caps
53  *    and segment.
54  *
55  *  * Same goes for EOS events, which should not be pushed directly by the
56  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
57  *    implementation.
58  *
59  *  * Note that the aggregator logic regarding gap event handling is to turn
60  *    these into gap buffers with matching PTS and duration. It will also
61  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
62  *    to ease their identification and subsequent processing.
63  *
64  * This class used to live in gst-plugins-bad and was moved to core.
65  *
66  * Since: 1.14
67  */
68
69 /**
70  * SECTION: gstaggregatorpad
71  * @title: GstAggregatorPad
72  * @short_description: #GstPad subclass for pads managed by #GstAggregator
73  * @see_also: gstcollectpads for historical reasons.
74  *
75  * Pads managed by a #GstAggregor subclass.
76  *
77  * This class used to live in gst-plugins-bad and was moved to core.
78  *
79  * Since: 1.14
80  */
81
82 #ifdef HAVE_CONFIG_H
83 #  include "config.h"
84 #endif
85
86 #include <string.h>             /* strlen */
87
88 #include "gstaggregator.h"
89
90 typedef enum
91 {
92   GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
93   GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
94   GST_AGGREGATOR_START_TIME_SELECTION_SET
95 } GstAggregatorStartTimeSelection;
96
97 static GType
98 gst_aggregator_start_time_selection_get_type (void)
99 {
100   static GType gtype = 0;
101
102   if (gtype == 0) {
103     static const GEnumValue values[] = {
104       {GST_AGGREGATOR_START_TIME_SELECTION_ZERO,
105           "Start at 0 running time (default)", "zero"},
106       {GST_AGGREGATOR_START_TIME_SELECTION_FIRST,
107           "Start at first observed input running time", "first"},
108       {GST_AGGREGATOR_START_TIME_SELECTION_SET,
109           "Set start time with start-time property", "set"},
110       {0, NULL, NULL}
111     };
112
113     gtype = g_enum_register_static ("GstAggregatorStartTimeSelection", values);
114   }
115   return gtype;
116 }
117
118 /*  Might become API */
119 #if 0
120 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
121     const GstTagList * tags, GstTagMergeMode mode);
122 #endif
123 static void gst_aggregator_set_latency_property (GstAggregator * agg,
124     GstClockTime latency);
125 static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
126
127 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
128
129 static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
130
131 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
132 #define GST_CAT_DEFAULT aggregator_debug
133
134 /* Locking order, locks in this element must always be taken in this order
135  *
136  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
137  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
138  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
139  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
140  * standard element object lock -> GST_OBJECT_LOCK(agg)
141  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
142  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
143  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
144  */
145
146 /* GstAggregatorPad definitions */
147 #define PAD_LOCK(pad)   G_STMT_START {                                  \
148   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
149         g_thread_self());                                               \
150   g_mutex_lock(&pad->priv->lock);                                       \
151   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
152         g_thread_self());                                               \
153   } G_STMT_END
154
155 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
156   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
157       g_thread_self());                                                 \
158   g_mutex_unlock(&pad->priv->lock);                                     \
159   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
160         g_thread_self());                                               \
161   } G_STMT_END
162
163
164 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
165   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
166         g_thread_self());                                               \
167   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
168       (&((GstAggregatorPad*)pad)->priv->lock));                         \
169   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
170         g_thread_self());                                               \
171   } G_STMT_END
172
173 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
174   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
175         g_thread_self());                                              \
176   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
177   } G_STMT_END
178
179
180 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
181   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
182         g_thread_self());                                               \
183   g_mutex_lock(&pad->priv->flush_lock);                                 \
184   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
185         g_thread_self());                                               \
186   } G_STMT_END
187
188 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
189   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
190         g_thread_self());                                               \
191   g_mutex_unlock(&pad->priv->flush_lock);                               \
192   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
193         g_thread_self());                                               \
194   } G_STMT_END
195
196 #define SRC_LOCK(self)   G_STMT_START {                             \
197   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
198       g_thread_self());                                             \
199   g_mutex_lock(&self->priv->src_lock);                              \
200   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
201         g_thread_self());                                           \
202   } G_STMT_END
203
204 #define SRC_UNLOCK(self)  G_STMT_START {                            \
205   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
206         g_thread_self());                                           \
207   g_mutex_unlock(&self->priv->src_lock);                            \
208   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
209         g_thread_self());                                           \
210   } G_STMT_END
211
212 #define SRC_WAIT(self) G_STMT_START {                               \
213   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
214         g_thread_self());                                           \
215   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
216   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
217         g_thread_self());                                           \
218   } G_STMT_END
219
220 #define SRC_BROADCAST(self) G_STMT_START {                          \
221     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
222         g_thread_self());                                           \
223     if (self->priv->aggregate_id)                                   \
224       gst_clock_id_unschedule (self->priv->aggregate_id);           \
225     g_cond_broadcast(&(self->priv->src_cond));                      \
226   } G_STMT_END
227
228 struct _GstAggregatorPadPrivate
229 {
230   /* Following fields are protected by the PAD_LOCK */
231   GstFlowReturn flow_return;
232   gboolean pending_flush_start;
233   gboolean pending_flush_stop;
234
235   gboolean first_buffer;
236
237   GQueue data;                  /* buffers, events and queries */
238   GstBuffer *clipped_buffer;
239   guint num_buffers;
240
241   /* used to track fill state of queues, only used with live-src and when
242    * latency property is set to > 0 */
243   GstClockTime head_position;
244   GstClockTime tail_position;
245   GstClockTime head_time;       /* running time */
246   GstClockTime tail_time;
247   GstClockTime time_level;      /* how much head is ahead of tail */
248   GstSegment head_segment;      /* segment before the queue */
249
250   gboolean negotiated;
251
252   gboolean eos;
253
254   GMutex lock;
255   GCond event_cond;
256   /* This lock prevents a flush start processing happening while
257    * the chain function is also happening.
258    */
259   GMutex flush_lock;
260 };
261
262 /* Must be called with PAD_LOCK held */
263 static void
264 gst_aggregator_pad_reset_unlocked (GstAggregatorPad * aggpad)
265 {
266   aggpad->priv->eos = FALSE;
267   aggpad->priv->flow_return = GST_FLOW_OK;
268   GST_OBJECT_LOCK (aggpad);
269   gst_segment_init (&aggpad->segment, GST_FORMAT_UNDEFINED);
270   gst_segment_init (&aggpad->priv->head_segment, GST_FORMAT_UNDEFINED);
271   GST_OBJECT_UNLOCK (aggpad);
272   aggpad->priv->head_position = GST_CLOCK_TIME_NONE;
273   aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
274   aggpad->priv->head_time = GST_CLOCK_TIME_NONE;
275   aggpad->priv->tail_time = GST_CLOCK_TIME_NONE;
276   aggpad->priv->time_level = 0;
277   aggpad->priv->first_buffer = TRUE;
278 }
279
280 static gboolean
281 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
282 {
283   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
284
285   PAD_LOCK (aggpad);
286   gst_aggregator_pad_reset_unlocked (aggpad);
287   PAD_UNLOCK (aggpad);
288
289   if (klass->flush)
290     return klass->flush (aggpad, agg);
291
292   return TRUE;
293 }
294
295 /*************************************
296  * GstAggregator implementation  *
297  *************************************/
298 static GstElementClass *aggregator_parent_class = NULL;
299
300 /* All members are protected by the object lock unless otherwise noted */
301
302 struct _GstAggregatorPrivate
303 {
304   gint max_padserial;
305
306   /* Our state is >= PAUSED */
307   gboolean running;             /* protected by src_lock */
308
309   /* seqnum from seek or segment,
310    * to be applied to synthetic segment/eos events */
311   gint seqnum;
312   gboolean send_stream_start;   /* protected by srcpad stream lock */
313   gboolean send_segment;
314   gboolean flush_seeking;
315   gboolean pending_flush_start;
316   gboolean send_eos;            /* protected by srcpad stream lock */
317
318   GstCaps *srccaps;             /* protected by the srcpad stream lock */
319
320   GstTagList *tags;
321   gboolean tags_changed;
322
323   gboolean peer_latency_live;   /* protected by src_lock */
324   GstClockTime peer_latency_min;        /* protected by src_lock */
325   GstClockTime peer_latency_max;        /* protected by src_lock */
326   gboolean has_peer_latency;    /* protected by src_lock */
327
328   GstClockTime sub_latency_min; /* protected by src_lock */
329   GstClockTime sub_latency_max; /* protected by src_lock */
330
331   /* aggregate */
332   GstClockID aggregate_id;      /* protected by src_lock */
333   GMutex src_lock;
334   GCond src_cond;
335
336   gboolean first_buffer;        /* protected by object lock */
337   GstAggregatorStartTimeSelection start_time_selection;
338   GstClockTime start_time;
339
340   /* protected by the object lock */
341   GstQuery *allocation_query;
342   GstAllocator *allocator;
343   GstBufferPool *pool;
344   GstAllocationParams allocation_params;
345
346   /* properties */
347   gint64 latency;               /* protected by both src_lock and all pad locks */
348 };
349
350 /* Seek event forwarding helper */
351 typedef struct
352 {
353   /* parameters */
354   GstEvent *event;
355   gboolean flush;
356   gboolean only_to_active_pads;
357
358   /* results */
359   gboolean result;
360   gboolean one_actually_seeked;
361 } EventData;
362
363 #define DEFAULT_LATENCY              0
364 #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
365 #define DEFAULT_START_TIME           (-1)
366
367 enum
368 {
369   PROP_0,
370   PROP_LATENCY,
371   PROP_START_TIME_SELECTION,
372   PROP_START_TIME,
373   PROP_LAST
374 };
375
376 static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
377     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
378
379 static gboolean
380 gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
381 {
382   return (g_queue_peek_tail (&pad->priv->data) == NULL &&
383       pad->priv->clipped_buffer == NULL);
384 }
385
386 static gboolean
387 gst_aggregator_check_pads_ready (GstAggregator * self)
388 {
389   GstAggregatorPad *pad = NULL;
390   GList *l, *sinkpads;
391   gboolean have_buffer = TRUE;
392   gboolean have_event_or_query = FALSE;
393
394   GST_LOG_OBJECT (self, "checking pads");
395
396   GST_OBJECT_LOCK (self);
397
398   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
399   if (sinkpads == NULL)
400     goto no_sinkpads;
401
402   for (l = sinkpads; l != NULL; l = l->next) {
403     pad = l->data;
404
405     PAD_LOCK (pad);
406
407     if (pad->priv->num_buffers == 0) {
408       if (!gst_aggregator_pad_queue_is_empty (pad))
409         have_event_or_query = TRUE;
410       if (!pad->priv->eos) {
411         have_buffer = FALSE;
412
413         /* If not live we need data on all pads, so leave the loop */
414         if (!self->priv->peer_latency_live) {
415           PAD_UNLOCK (pad);
416           goto pad_not_ready;
417         }
418       }
419     } else if (self->priv->peer_latency_live) {
420       /* In live mode, having a single pad with buffers is enough to
421        * generate a start time from it. In non-live mode all pads need
422        * to have a buffer
423        */
424       self->priv->first_buffer = FALSE;
425     }
426
427     PAD_UNLOCK (pad);
428   }
429
430   if (!have_buffer && !have_event_or_query)
431     goto pad_not_ready;
432
433   if (have_buffer)
434     self->priv->first_buffer = FALSE;
435
436   GST_OBJECT_UNLOCK (self);
437   GST_LOG_OBJECT (self, "pads are ready");
438   return TRUE;
439
440 no_sinkpads:
441   {
442     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
443     GST_OBJECT_UNLOCK (self);
444     return FALSE;
445   }
446 pad_not_ready:
447   {
448     if (have_event_or_query)
449       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet,"
450           " but waking up for serialized event");
451     else
452       GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
453     GST_OBJECT_UNLOCK (self);
454     return have_event_or_query;
455   }
456 }
457
458 static void
459 gst_aggregator_reset_flow_values (GstAggregator * self)
460 {
461   GST_OBJECT_LOCK (self);
462   self->priv->send_stream_start = TRUE;
463   self->priv->send_segment = TRUE;
464   gst_segment_init (&self->segment, GST_FORMAT_TIME);
465   self->priv->first_buffer = TRUE;
466   GST_OBJECT_UNLOCK (self);
467 }
468
469 static inline void
470 gst_aggregator_push_mandatory_events (GstAggregator * self)
471 {
472   GstAggregatorPrivate *priv = self->priv;
473   GstEvent *segment = NULL;
474   GstEvent *tags = NULL;
475
476   if (self->priv->send_stream_start) {
477     gchar s_id[32];
478
479     GST_INFO_OBJECT (self, "pushing stream start");
480     /* stream-start (FIXME: create id based on input ids) */
481     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
482     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
483       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
484     }
485     self->priv->send_stream_start = FALSE;
486   }
487
488   if (self->priv->srccaps) {
489
490     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
491         self->priv->srccaps);
492     if (!gst_pad_push_event (self->srcpad,
493             gst_event_new_caps (self->priv->srccaps))) {
494       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
495     }
496     gst_caps_unref (self->priv->srccaps);
497     self->priv->srccaps = NULL;
498   }
499
500   GST_OBJECT_LOCK (self);
501   if (self->priv->send_segment && !self->priv->flush_seeking) {
502     segment = gst_event_new_segment (&self->segment);
503
504     if (!self->priv->seqnum)
505       /* This code-path is in preparation to be able to run without a source
506        * connected. Then we won't have a seq-num from a segment event. */
507       self->priv->seqnum = gst_event_get_seqnum (segment);
508     else
509       gst_event_set_seqnum (segment, self->priv->seqnum);
510     self->priv->send_segment = FALSE;
511
512     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
513   }
514
515   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
516     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
517     priv->tags_changed = FALSE;
518   }
519   GST_OBJECT_UNLOCK (self);
520
521   if (segment)
522     gst_pad_push_event (self->srcpad, segment);
523   if (tags)
524     gst_pad_push_event (self->srcpad, tags);
525
526 }
527
528 /**
529  * gst_aggregator_set_src_caps:
530  * @self: The #GstAggregator
531  * @caps: The #GstCaps to set on the src pad.
532  *
533  * Sets the caps to be used on the src pad.
534  */
535 void
536 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
537 {
538   GST_PAD_STREAM_LOCK (self->srcpad);
539   gst_caps_replace (&self->priv->srccaps, caps);
540   gst_aggregator_push_mandatory_events (self);
541   GST_PAD_STREAM_UNLOCK (self->srcpad);
542 }
543
544 static GstFlowReturn
545 gst_aggregator_default_finish_buffer (GstAggregator * self, GstBuffer * buffer)
546 {
547   gst_aggregator_push_mandatory_events (self);
548
549   GST_OBJECT_LOCK (self);
550   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
551     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
552     GST_OBJECT_UNLOCK (self);
553     return gst_pad_push (self->srcpad, buffer);
554   } else {
555     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
556         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
557     GST_OBJECT_UNLOCK (self);
558     gst_buffer_unref (buffer);
559     return GST_FLOW_OK;
560   }
561 }
562
563 /**
564  * gst_aggregator_finish_buffer:
565  * @aggregator: The #GstAggregator
566  * @buffer: (transfer full): the #GstBuffer to push.
567  *
568  * This method will push the provided output buffer downstream. If needed,
569  * mandatory events such as stream-start, caps, and segment events will be
570  * sent before pushing the buffer.
571  */
572 GstFlowReturn
573 gst_aggregator_finish_buffer (GstAggregator * aggregator, GstBuffer * buffer)
574 {
575   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (aggregator);
576
577   g_assert (klass->finish_buffer != NULL);
578
579   return klass->finish_buffer (aggregator, buffer);
580 }
581
582 static void
583 gst_aggregator_push_eos (GstAggregator * self)
584 {
585   GstEvent *event;
586   gst_aggregator_push_mandatory_events (self);
587
588   event = gst_event_new_eos ();
589
590   GST_OBJECT_LOCK (self);
591   self->priv->send_eos = FALSE;
592   gst_event_set_seqnum (event, self->priv->seqnum);
593   GST_OBJECT_UNLOCK (self);
594
595   gst_pad_push_event (self->srcpad, event);
596 }
597
598 static GstClockTime
599 gst_aggregator_get_next_time (GstAggregator * self)
600 {
601   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
602
603   if (klass->get_next_time)
604     return klass->get_next_time (self);
605
606   return GST_CLOCK_TIME_NONE;
607 }
608
609 static gboolean
610 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
611 {
612   GstClockTime latency;
613   GstClockTime start;
614   gboolean res;
615
616   *timeout = FALSE;
617
618   SRC_LOCK (self);
619
620   latency = gst_aggregator_get_latency_unlocked (self);
621
622   if (gst_aggregator_check_pads_ready (self)) {
623     GST_DEBUG_OBJECT (self, "all pads have data");
624     SRC_UNLOCK (self);
625
626     return TRUE;
627   }
628
629   /* Before waiting, check if we're actually still running */
630   if (!self->priv->running || !self->priv->send_eos) {
631     SRC_UNLOCK (self);
632
633     return FALSE;
634   }
635
636   start = gst_aggregator_get_next_time (self);
637
638   /* If we're not live, or if we use the running time
639    * of the first buffer as start time, we wait until
640    * all pads have buffers.
641    * Otherwise (i.e. if we are live!), we wait on the clock
642    * and if a pad does not have a buffer in time we ignore
643    * that pad.
644    */
645   GST_OBJECT_LOCK (self);
646   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
647       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
648       !GST_CLOCK_TIME_IS_VALID (start) ||
649       (self->priv->first_buffer
650           && self->priv->start_time_selection ==
651           GST_AGGREGATOR_START_TIME_SELECTION_FIRST)) {
652     /* We wake up here when something happened, and below
653      * then check if we're ready now. If we return FALSE,
654      * we will be directly called again.
655      */
656     GST_OBJECT_UNLOCK (self);
657     SRC_WAIT (self);
658   } else {
659     GstClockTime base_time, time;
660     GstClock *clock;
661     GstClockReturn status;
662     GstClockTimeDiff jitter;
663
664     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
665         GST_TIME_ARGS (start));
666
667     base_time = GST_ELEMENT_CAST (self)->base_time;
668     clock = gst_object_ref (GST_ELEMENT_CLOCK (self));
669     GST_OBJECT_UNLOCK (self);
670
671     time = base_time + start;
672     time += latency;
673
674     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
675         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
676         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
677         GST_TIME_ARGS (time),
678         GST_TIME_ARGS (base_time),
679         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
680         GST_TIME_ARGS (gst_clock_get_time (clock)));
681
682     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
683     gst_object_unref (clock);
684     SRC_UNLOCK (self);
685
686     jitter = 0;
687     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
688
689     SRC_LOCK (self);
690     if (self->priv->aggregate_id) {
691       gst_clock_id_unref (self->priv->aggregate_id);
692       self->priv->aggregate_id = NULL;
693     }
694
695     GST_DEBUG_OBJECT (self,
696         "clock returned %d (jitter: %" GST_STIME_FORMAT ")",
697         status, GST_STIME_ARGS (jitter));
698
699     /* we timed out */
700     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
701       SRC_UNLOCK (self);
702       *timeout = TRUE;
703       return TRUE;
704     }
705   }
706
707   res = gst_aggregator_check_pads_ready (self);
708   SRC_UNLOCK (self);
709
710   return res;
711 }
712
713 static gboolean
714 gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
715     gpointer user_data)
716 {
717   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
718   GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
719   GstEvent *event = NULL;
720   GstQuery *query = NULL;
721   GstAggregatorClass *klass = NULL;
722   gboolean *processed_event = user_data;
723
724   do {
725     event = NULL;
726     query = NULL;
727
728     PAD_LOCK (pad);
729     if (pad->priv->clipped_buffer == NULL &&
730         !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
731       if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data)))
732         event = gst_event_ref (g_queue_peek_tail (&pad->priv->data));
733       if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data)))
734         query = g_queue_peek_tail (&pad->priv->data);
735     }
736     PAD_UNLOCK (pad);
737     if (event || query) {
738       gboolean ret;
739
740       if (processed_event)
741         *processed_event = TRUE;
742       if (klass == NULL)
743         klass = GST_AGGREGATOR_GET_CLASS (self);
744
745       if (event) {
746         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
747         gst_event_ref (event);
748         ret = klass->sink_event (aggregator, pad, event);
749
750         PAD_LOCK (pad);
751         if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
752           pad->priv->negotiated = ret;
753         if (g_queue_peek_tail (&pad->priv->data) == event)
754           gst_event_unref (g_queue_pop_tail (&pad->priv->data));
755         gst_event_unref (event);
756       } else if (query) {
757         GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
758         ret = klass->sink_query (aggregator, pad, query);
759
760         PAD_LOCK (pad);
761         if (g_queue_peek_tail (&pad->priv->data) == query) {
762           GstStructure *s;
763
764           s = gst_query_writable_structure (query);
765           gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret,
766               NULL);
767           g_queue_pop_tail (&pad->priv->data);
768         }
769       }
770
771       PAD_BROADCAST_EVENT (pad);
772       PAD_UNLOCK (pad);
773     }
774   } while (event || query);
775
776   return TRUE;
777 }
778
779 static gboolean
780 gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
781     gpointer user_data)
782 {
783   GList *item;
784   GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
785   GstAggregator *agg = (GstAggregator *) self;
786   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
787
788   if (!klass->skip_buffer)
789     return FALSE;
790
791   PAD_LOCK (aggpad);
792
793   item = g_queue_peek_head_link (&aggpad->priv->data);
794   while (item) {
795     GList *next = item->next;
796
797     if (GST_IS_BUFFER (item->data)
798         && klass->skip_buffer (aggpad, agg, item->data)) {
799       GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
800       gst_aggregator_pad_buffer_consumed (aggpad);
801       gst_buffer_unref (item->data);
802       g_queue_delete_link (&aggpad->priv->data, item);
803     } else {
804       break;
805     }
806
807     item = next;
808   }
809
810   PAD_UNLOCK (aggpad);
811
812   return TRUE;
813 }
814
815 static void
816 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
817     GstFlowReturn flow_return, gboolean full)
818 {
819   GList *item;
820
821   PAD_LOCK (aggpad);
822   if (flow_return == GST_FLOW_NOT_LINKED)
823     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
824   else
825     aggpad->priv->flow_return = flow_return;
826
827   item = g_queue_peek_head_link (&aggpad->priv->data);
828   while (item) {
829     GList *next = item->next;
830
831     /* In partial flush, we do like the pad, we get rid of non-sticky events
832      * and EOS/SEGMENT.
833      */
834     if (full || GST_IS_BUFFER (item->data) ||
835         GST_EVENT_TYPE (item->data) == GST_EVENT_EOS ||
836         GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT ||
837         !GST_EVENT_IS_STICKY (item->data)) {
838       if (!GST_IS_QUERY (item->data))
839         gst_mini_object_unref (item->data);
840       g_queue_delete_link (&aggpad->priv->data, item);
841     }
842     item = next;
843   }
844   aggpad->priv->num_buffers = 0;
845   gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
846
847   PAD_BROADCAST_EVENT (aggpad);
848   PAD_UNLOCK (aggpad);
849 }
850
851 static GstFlowReturn
852 gst_aggregator_default_update_src_caps (GstAggregator * agg, GstCaps * caps,
853     GstCaps ** ret)
854 {
855   *ret = gst_caps_ref (caps);
856
857   return GST_FLOW_OK;
858 }
859
860 static GstCaps *
861 gst_aggregator_default_fixate_src_caps (GstAggregator * agg, GstCaps * caps)
862 {
863   caps = gst_caps_fixate (caps);
864
865   return caps;
866 }
867
868 static gboolean
869 gst_aggregator_default_negotiated_src_caps (GstAggregator * agg, GstCaps * caps)
870 {
871   return TRUE;
872 }
873
874
875 /* takes ownership of the pool, allocator and query */
876 static gboolean
877 gst_aggregator_set_allocation (GstAggregator * self,
878     GstBufferPool * pool, GstAllocator * allocator,
879     GstAllocationParams * params, GstQuery * query)
880 {
881   GstAllocator *oldalloc;
882   GstBufferPool *oldpool;
883   GstQuery *oldquery;
884
885   GST_DEBUG ("storing allocation query");
886
887   GST_OBJECT_LOCK (self);
888   oldpool = self->priv->pool;
889   self->priv->pool = pool;
890
891   oldalloc = self->priv->allocator;
892   self->priv->allocator = allocator;
893
894   oldquery = self->priv->allocation_query;
895   self->priv->allocation_query = query;
896
897   if (params)
898     self->priv->allocation_params = *params;
899   else
900     gst_allocation_params_init (&self->priv->allocation_params);
901   GST_OBJECT_UNLOCK (self);
902
903   if (oldpool) {
904     GST_DEBUG_OBJECT (self, "deactivating old pool %p", oldpool);
905     gst_buffer_pool_set_active (oldpool, FALSE);
906     gst_object_unref (oldpool);
907   }
908   if (oldalloc) {
909     gst_object_unref (oldalloc);
910   }
911   if (oldquery) {
912     gst_query_unref (oldquery);
913   }
914   return TRUE;
915 }
916
917
918 static gboolean
919 gst_aggregator_decide_allocation (GstAggregator * self, GstQuery * query)
920 {
921   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (self);
922
923   if (aggclass->decide_allocation)
924     if (!aggclass->decide_allocation (self, query))
925       return FALSE;
926
927   return TRUE;
928 }
929
930 static gboolean
931 gst_aggregator_do_allocation (GstAggregator * self, GstCaps * caps)
932 {
933   GstQuery *query;
934   gboolean result = TRUE;
935   GstBufferPool *pool = NULL;
936   GstAllocator *allocator;
937   GstAllocationParams params;
938
939   /* find a pool for the negotiated caps now */
940   GST_DEBUG_OBJECT (self, "doing allocation query");
941   query = gst_query_new_allocation (caps, TRUE);
942   if (!gst_pad_peer_query (self->srcpad, query)) {
943     /* not a problem, just debug a little */
944     GST_DEBUG_OBJECT (self, "peer ALLOCATION query failed");
945   }
946
947   GST_DEBUG_OBJECT (self, "calling decide_allocation");
948   result = gst_aggregator_decide_allocation (self, query);
949
950   GST_DEBUG_OBJECT (self, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
951       query);
952
953   if (!result)
954     goto no_decide_allocation;
955
956   /* we got configuration from our peer or the decide_allocation method,
957    * parse them */
958   if (gst_query_get_n_allocation_params (query) > 0) {
959     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
960   } else {
961     allocator = NULL;
962     gst_allocation_params_init (&params);
963   }
964
965   if (gst_query_get_n_allocation_pools (query) > 0)
966     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
967
968   /* now store */
969   result =
970       gst_aggregator_set_allocation (self, pool, allocator, &params, query);
971
972   return result;
973
974   /* Errors */
975 no_decide_allocation:
976   {
977     GST_WARNING_OBJECT (self, "Failed to decide allocation");
978     gst_query_unref (query);
979
980     return result;
981   }
982
983 }
984
985 /* WITH SRC_LOCK held */
986 static GstFlowReturn
987 gst_aggregator_update_src_caps (GstAggregator * self)
988 {
989   GstAggregatorClass *agg_klass = GST_AGGREGATOR_GET_CLASS (self);
990   GstCaps *downstream_caps, *template_caps, *caps = NULL;
991   GstFlowReturn ret = GST_FLOW_OK;
992
993   template_caps = gst_pad_get_pad_template_caps (self->srcpad);
994   downstream_caps = gst_pad_peer_query_caps (self->srcpad, template_caps);
995
996   if (gst_caps_is_empty (downstream_caps)) {
997     GST_INFO_OBJECT (self, "Downstream caps (%"
998         GST_PTR_FORMAT ") not compatible with pad template caps (%"
999         GST_PTR_FORMAT ")", downstream_caps, template_caps);
1000     ret = GST_FLOW_NOT_NEGOTIATED;
1001     goto done;
1002   }
1003
1004   g_assert (agg_klass->update_src_caps);
1005   GST_DEBUG_OBJECT (self, "updating caps from %" GST_PTR_FORMAT,
1006       downstream_caps);
1007   ret = agg_klass->update_src_caps (self, downstream_caps, &caps);
1008   if (ret < GST_FLOW_OK) {
1009     GST_WARNING_OBJECT (self, "Subclass failed to update provided caps");
1010     goto done;
1011   }
1012   if ((caps == NULL || gst_caps_is_empty (caps)) && ret >= GST_FLOW_OK) {
1013     ret = GST_FLOW_NOT_NEGOTIATED;
1014     goto done;
1015   }
1016   GST_DEBUG_OBJECT (self, "               to %" GST_PTR_FORMAT, caps);
1017
1018 #ifdef GST_ENABLE_EXTRA_CHECKS
1019   if (!gst_caps_is_subset (caps, template_caps)) {
1020     GstCaps *intersection;
1021
1022     GST_ERROR_OBJECT (self,
1023         "update_src_caps returned caps %" GST_PTR_FORMAT
1024         " which are not a real subset of the template caps %"
1025         GST_PTR_FORMAT, caps, template_caps);
1026     g_warning ("%s: update_src_caps returned caps which are not a real "
1027         "subset of the filter caps", GST_ELEMENT_NAME (self));
1028
1029     intersection =
1030         gst_caps_intersect_full (template_caps, caps, GST_CAPS_INTERSECT_FIRST);
1031     gst_caps_unref (caps);
1032     caps = intersection;
1033   }
1034 #endif
1035
1036   if (gst_caps_is_any (caps)) {
1037     goto done;
1038   }
1039
1040   if (!gst_caps_is_fixed (caps)) {
1041     g_assert (agg_klass->fixate_src_caps);
1042
1043     GST_DEBUG_OBJECT (self, "fixate caps from %" GST_PTR_FORMAT, caps);
1044     if (!(caps = agg_klass->fixate_src_caps (self, caps))) {
1045       GST_WARNING_OBJECT (self, "Subclass failed to fixate provided caps");
1046       ret = GST_FLOW_NOT_NEGOTIATED;
1047       goto done;
1048     }
1049     GST_DEBUG_OBJECT (self, "             to %" GST_PTR_FORMAT, caps);
1050   }
1051
1052   if (agg_klass->negotiated_src_caps) {
1053     if (!agg_klass->negotiated_src_caps (self, caps)) {
1054       GST_WARNING_OBJECT (self, "Subclass failed to accept negotiated caps");
1055       ret = GST_FLOW_NOT_NEGOTIATED;
1056       goto done;
1057     }
1058   }
1059
1060   gst_aggregator_set_src_caps (self, caps);
1061
1062   if (!gst_aggregator_do_allocation (self, caps)) {
1063     GST_WARNING_OBJECT (self, "Allocation negotiation failed");
1064     ret = GST_FLOW_NOT_NEGOTIATED;
1065   }
1066
1067 done:
1068   gst_caps_unref (downstream_caps);
1069   gst_caps_unref (template_caps);
1070
1071   if (caps)
1072     gst_caps_unref (caps);
1073
1074   return ret;
1075 }
1076
1077 static void
1078 gst_aggregator_aggregate_func (GstAggregator * self)
1079 {
1080   GstAggregatorPrivate *priv = self->priv;
1081   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1082   gboolean timeout = FALSE;
1083
1084   if (self->priv->running == FALSE) {
1085     GST_DEBUG_OBJECT (self, "Not running anymore");
1086     return;
1087   }
1088
1089   GST_LOG_OBJECT (self, "Checking aggregate");
1090   while (priv->send_eos && priv->running) {
1091     GstFlowReturn flow_return = GST_FLOW_OK;
1092     gboolean processed_event = FALSE;
1093
1094     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1095         gst_aggregator_do_events_and_queries, NULL);
1096
1097     if (self->priv->peer_latency_live)
1098       gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1099           gst_aggregator_pad_skip_buffers, NULL);
1100
1101     /* Ensure we have buffers ready (either in clipped_buffer or at the head of
1102      * the queue */
1103     if (!gst_aggregator_wait_and_check (self, &timeout))
1104       continue;
1105
1106     gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
1107         gst_aggregator_do_events_and_queries, &processed_event);
1108
1109     if (processed_event)
1110       continue;
1111
1112     if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
1113       flow_return = gst_aggregator_update_src_caps (self);
1114       if (flow_return != GST_FLOW_OK)
1115         gst_pad_mark_reconfigure (GST_AGGREGATOR_SRC_PAD (self));
1116     }
1117
1118     if (timeout || flow_return >= GST_FLOW_OK) {
1119       GST_TRACE_OBJECT (self, "Actually aggregating!");
1120       flow_return = klass->aggregate (self, timeout);
1121     }
1122
1123     if (flow_return == GST_AGGREGATOR_FLOW_NEED_DATA)
1124       continue;
1125
1126     GST_OBJECT_LOCK (self);
1127     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
1128       /* We don't want to set the pads to flushing, but we want to
1129        * stop the thread, so just break here */
1130       GST_OBJECT_UNLOCK (self);
1131       break;
1132     }
1133     GST_OBJECT_UNLOCK (self);
1134
1135     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
1136       gst_aggregator_push_eos (self);
1137     }
1138
1139     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
1140
1141     if (flow_return != GST_FLOW_OK) {
1142       GList *item;
1143
1144       GST_OBJECT_LOCK (self);
1145       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
1146         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
1147
1148         gst_aggregator_pad_set_flushing (aggpad, flow_return, TRUE);
1149       }
1150       GST_OBJECT_UNLOCK (self);
1151       break;
1152     }
1153   }
1154
1155   /* Pause the task here, the only ways to get here are:
1156    * 1) We're stopping, in which case the task is stopped anyway
1157    * 2) We got a flow error above, in which case it might take
1158    *    some time to forward the flow return upstream and we
1159    *    would otherwise call the task function over and over
1160    *    again without doing anything
1161    */
1162   gst_pad_pause_task (self->srcpad);
1163 }
1164
1165 static gboolean
1166 gst_aggregator_start (GstAggregator * self)
1167 {
1168   GstAggregatorClass *klass;
1169   gboolean result;
1170
1171   self->priv->send_stream_start = TRUE;
1172   self->priv->send_segment = TRUE;
1173   self->priv->send_eos = TRUE;
1174   self->priv->srccaps = NULL;
1175
1176   gst_aggregator_set_allocation (self, NULL, NULL, NULL, NULL);
1177
1178   klass = GST_AGGREGATOR_GET_CLASS (self);
1179
1180   if (klass->start)
1181     result = klass->start (self);
1182   else
1183     result = TRUE;
1184
1185   return result;
1186 }
1187
1188 static gboolean
1189 _check_pending_flush_stop (GstAggregatorPad * pad)
1190 {
1191   gboolean res;
1192
1193   PAD_LOCK (pad);
1194   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
1195   PAD_UNLOCK (pad);
1196
1197   return res;
1198 }
1199
1200 static gboolean
1201 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
1202 {
1203   gboolean res = TRUE;
1204
1205   GST_INFO_OBJECT (self, "%s srcpad task",
1206       flush_start ? "Pausing" : "Stopping");
1207
1208   SRC_LOCK (self);
1209   self->priv->running = FALSE;
1210   SRC_BROADCAST (self);
1211   SRC_UNLOCK (self);
1212
1213   if (flush_start) {
1214     res = gst_pad_push_event (self->srcpad, flush_start);
1215   }
1216
1217   gst_pad_stop_task (self->srcpad);
1218
1219   return res;
1220 }
1221
1222 static void
1223 gst_aggregator_start_srcpad_task (GstAggregator * self)
1224 {
1225   GST_INFO_OBJECT (self, "Starting srcpad task");
1226
1227   self->priv->running = TRUE;
1228   gst_pad_start_task (GST_PAD (self->srcpad),
1229       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
1230 }
1231
1232 static GstFlowReturn
1233 gst_aggregator_flush (GstAggregator * self)
1234 {
1235   GstFlowReturn ret = GST_FLOW_OK;
1236   GstAggregatorPrivate *priv = self->priv;
1237   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
1238
1239   GST_DEBUG_OBJECT (self, "Flushing everything");
1240   GST_OBJECT_LOCK (self);
1241   priv->send_segment = TRUE;
1242   priv->flush_seeking = FALSE;
1243   priv->tags_changed = FALSE;
1244   GST_OBJECT_UNLOCK (self);
1245   if (klass->flush)
1246     ret = klass->flush (self);
1247
1248   return ret;
1249 }
1250
1251
1252 /* Called with GstAggregator's object lock held */
1253
1254 static gboolean
1255 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
1256 {
1257   GList *tmp;
1258   GstAggregatorPad *tmppad;
1259
1260   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
1261     tmppad = (GstAggregatorPad *) tmp->data;
1262
1263     if (_check_pending_flush_stop (tmppad) == FALSE) {
1264       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
1265           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
1266       return FALSE;
1267     }
1268   }
1269
1270   return TRUE;
1271 }
1272
1273 static void
1274 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
1275     GstEvent * event)
1276 {
1277   GstAggregatorPrivate *priv = self->priv;
1278   GstAggregatorPadPrivate *padpriv = aggpad->priv;
1279
1280   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, FALSE);
1281
1282   PAD_FLUSH_LOCK (aggpad);
1283   PAD_LOCK (aggpad);
1284   if (padpriv->pending_flush_start) {
1285     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
1286
1287     padpriv->pending_flush_start = FALSE;
1288     padpriv->pending_flush_stop = TRUE;
1289   }
1290   PAD_UNLOCK (aggpad);
1291
1292   GST_OBJECT_LOCK (self);
1293   if (priv->flush_seeking) {
1294     /* If flush_seeking we forward the first FLUSH_START */
1295     if (priv->pending_flush_start) {
1296       priv->pending_flush_start = FALSE;
1297       GST_OBJECT_UNLOCK (self);
1298
1299       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
1300       gst_aggregator_stop_srcpad_task (self, event);
1301
1302       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
1303       GST_PAD_STREAM_LOCK (self->srcpad);
1304       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
1305       event = NULL;
1306     } else {
1307       GST_OBJECT_UNLOCK (self);
1308       gst_event_unref (event);
1309     }
1310   } else {
1311     GST_OBJECT_UNLOCK (self);
1312     gst_event_unref (event);
1313   }
1314   PAD_FLUSH_UNLOCK (aggpad);
1315 }
1316
1317 /* Must be called with the the PAD_LOCK held */
1318 static void
1319 update_time_level (GstAggregatorPad * aggpad, gboolean head)
1320 {
1321   GstAggregatorPadPrivate *priv = aggpad->priv;
1322
1323   if (head) {
1324     if (GST_CLOCK_TIME_IS_VALID (priv->head_position) &&
1325         priv->head_segment.format == GST_FORMAT_TIME)
1326       priv->head_time = gst_segment_to_running_time (&priv->head_segment,
1327           GST_FORMAT_TIME, priv->head_position);
1328     else
1329       priv->head_time = GST_CLOCK_TIME_NONE;
1330
1331     if (!GST_CLOCK_TIME_IS_VALID (priv->tail_time))
1332       priv->tail_time = priv->head_time;
1333   } else {
1334     if (GST_CLOCK_TIME_IS_VALID (priv->tail_position) &&
1335         aggpad->segment.format == GST_FORMAT_TIME)
1336       priv->tail_time = gst_segment_to_running_time (&aggpad->segment,
1337           GST_FORMAT_TIME, priv->tail_position);
1338     else
1339       priv->tail_time = priv->head_time;
1340   }
1341
1342   if (priv->head_time == GST_CLOCK_TIME_NONE ||
1343       priv->tail_time == GST_CLOCK_TIME_NONE) {
1344     priv->time_level = 0;
1345     return;
1346   }
1347
1348   if (priv->tail_time > priv->head_time)
1349     priv->time_level = 0;
1350   else
1351     priv->time_level = priv->head_time - priv->tail_time;
1352 }
1353
1354
1355 /* GstAggregator vmethods default implementations */
1356 static gboolean
1357 gst_aggregator_default_sink_event (GstAggregator * self,
1358     GstAggregatorPad * aggpad, GstEvent * event)
1359 {
1360   gboolean res = TRUE;
1361   GstPad *pad = GST_PAD (aggpad);
1362   GstAggregatorPrivate *priv = self->priv;
1363
1364   GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event);
1365
1366   switch (GST_EVENT_TYPE (event)) {
1367     case GST_EVENT_FLUSH_START:
1368     {
1369       gst_aggregator_flush_start (self, aggpad, event);
1370       /* We forward only in one case: right after flush_seeking */
1371       event = NULL;
1372       goto eat;
1373     }
1374     case GST_EVENT_FLUSH_STOP:
1375     {
1376       gst_aggregator_pad_flush (aggpad, self);
1377       GST_OBJECT_LOCK (self);
1378       if (priv->flush_seeking) {
1379         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
1380         if (gst_aggregator_all_flush_stop_received_locked (self)) {
1381           GST_OBJECT_UNLOCK (self);
1382           /* That means we received FLUSH_STOP/FLUSH_STOP on
1383            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
1384           gst_aggregator_flush (self);
1385           gst_pad_push_event (self->srcpad, event);
1386           event = NULL;
1387           SRC_LOCK (self);
1388           priv->send_eos = TRUE;
1389           SRC_BROADCAST (self);
1390           SRC_UNLOCK (self);
1391
1392           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
1393           GST_PAD_STREAM_UNLOCK (self->srcpad);
1394           gst_aggregator_start_srcpad_task (self);
1395         } else {
1396           GST_OBJECT_UNLOCK (self);
1397         }
1398       } else {
1399         GST_OBJECT_UNLOCK (self);
1400       }
1401
1402       /* We never forward the event */
1403       goto eat;
1404     }
1405     case GST_EVENT_EOS:
1406     {
1407       SRC_LOCK (self);
1408       PAD_LOCK (aggpad);
1409       g_assert (aggpad->priv->num_buffers == 0);
1410       aggpad->priv->eos = TRUE;
1411       PAD_UNLOCK (aggpad);
1412       SRC_BROADCAST (self);
1413       SRC_UNLOCK (self);
1414       goto eat;
1415     }
1416     case GST_EVENT_SEGMENT:
1417     {
1418       PAD_LOCK (aggpad);
1419       GST_OBJECT_LOCK (aggpad);
1420       gst_event_copy_segment (event, &aggpad->segment);
1421       /* We've got a new segment, tail_position is now meaningless
1422        * and may interfere with the time_level calculation
1423        */
1424       aggpad->priv->tail_position = GST_CLOCK_TIME_NONE;
1425       update_time_level (aggpad, FALSE);
1426       GST_OBJECT_UNLOCK (aggpad);
1427       PAD_UNLOCK (aggpad);
1428
1429       GST_OBJECT_LOCK (self);
1430       self->priv->seqnum = gst_event_get_seqnum (event);
1431       GST_OBJECT_UNLOCK (self);
1432       goto eat;
1433     }
1434     case GST_EVENT_STREAM_START:
1435     {
1436       goto eat;
1437     }
1438     case GST_EVENT_GAP:
1439     {
1440       GstClockTime pts, endpts;
1441       GstClockTime duration;
1442       GstBuffer *gapbuf;
1443
1444       gst_event_parse_gap (event, &pts, &duration);
1445       gapbuf = gst_buffer_new ();
1446
1447       if (GST_CLOCK_TIME_IS_VALID (duration))
1448         endpts = pts + duration;
1449       else
1450         endpts = GST_CLOCK_TIME_NONE;
1451
1452       GST_OBJECT_LOCK (aggpad);
1453       res = gst_segment_clip (&aggpad->segment, GST_FORMAT_TIME, pts, endpts,
1454           &pts, &endpts);
1455       GST_OBJECT_UNLOCK (aggpad);
1456
1457       if (!res) {
1458         GST_WARNING_OBJECT (self, "GAP event outside segment, dropping");
1459         goto eat;
1460       }
1461
1462       if (GST_CLOCK_TIME_IS_VALID (endpts) && GST_CLOCK_TIME_IS_VALID (pts))
1463         duration = endpts - pts;
1464       else
1465         duration = GST_CLOCK_TIME_NONE;
1466
1467       GST_BUFFER_PTS (gapbuf) = pts;
1468       GST_BUFFER_DURATION (gapbuf) = duration;
1469       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
1470       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
1471
1472       /* Remove GAP event so we can replace it with the buffer */
1473       if (g_queue_peek_tail (&aggpad->priv->data) == event)
1474         gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
1475
1476       if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
1477           GST_FLOW_OK) {
1478         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
1479         res = FALSE;
1480       }
1481
1482       goto eat;
1483     }
1484     case GST_EVENT_TAG:
1485       goto eat;
1486     default:
1487     {
1488       break;
1489     }
1490   }
1491
1492   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
1493   return gst_pad_event_default (pad, GST_OBJECT (self), event);
1494
1495 eat:
1496   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
1497   if (event)
1498     gst_event_unref (event);
1499
1500   return res;
1501 }
1502
1503 static gboolean
1504 gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
1505 {
1506   GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
1507   GstAggregator *agg = GST_AGGREGATOR_CAST (self);
1508
1509   gst_aggregator_pad_flush (pad, agg);
1510
1511   PAD_LOCK (pad);
1512   pad->priv->flow_return = GST_FLOW_FLUSHING;
1513   pad->priv->negotiated = FALSE;
1514   PAD_BROADCAST_EVENT (pad);
1515   PAD_UNLOCK (pad);
1516
1517   return TRUE;
1518 }
1519
1520 static gboolean
1521 gst_aggregator_stop (GstAggregator * agg)
1522 {
1523   GstAggregatorClass *klass;
1524   gboolean result;
1525
1526   gst_aggregator_reset_flow_values (agg);
1527
1528   /* Application needs to make sure no pads are added while it shuts us down */
1529   gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
1530       gst_aggregator_stop_pad, NULL);
1531
1532   klass = GST_AGGREGATOR_GET_CLASS (agg);
1533
1534   if (klass->stop)
1535     result = klass->stop (agg);
1536   else
1537     result = TRUE;
1538
1539   agg->priv->has_peer_latency = FALSE;
1540   agg->priv->peer_latency_live = FALSE;
1541   agg->priv->peer_latency_min = agg->priv->peer_latency_max = 0;
1542
1543   if (agg->priv->tags)
1544     gst_tag_list_unref (agg->priv->tags);
1545   agg->priv->tags = NULL;
1546
1547   gst_aggregator_set_allocation (agg, NULL, NULL, NULL, NULL);
1548
1549   return result;
1550 }
1551
1552 /* GstElement vmethods implementations */
1553 static GstStateChangeReturn
1554 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1555 {
1556   GstStateChangeReturn ret;
1557   GstAggregator *self = GST_AGGREGATOR (element);
1558
1559   switch (transition) {
1560     case GST_STATE_CHANGE_READY_TO_PAUSED:
1561       if (!gst_aggregator_start (self))
1562         goto error_start;
1563       break;
1564     default:
1565       break;
1566   }
1567
1568   if ((ret =
1569           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1570               transition)) == GST_STATE_CHANGE_FAILURE)
1571     goto failure;
1572
1573
1574   switch (transition) {
1575     case GST_STATE_CHANGE_PAUSED_TO_READY:
1576       if (!gst_aggregator_stop (self)) {
1577         /* What to do in this case? Error out? */
1578         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1579       }
1580       break;
1581     default:
1582       break;
1583   }
1584
1585   return ret;
1586
1587 /* ERRORS */
1588 failure:
1589   {
1590     GST_ERROR_OBJECT (element, "parent failed state change");
1591     return ret;
1592   }
1593 error_start:
1594   {
1595     GST_ERROR_OBJECT (element, "Subclass failed to start");
1596     return GST_STATE_CHANGE_FAILURE;
1597   }
1598 }
1599
1600 static void
1601 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1602 {
1603   GstAggregator *self = GST_AGGREGATOR (element);
1604   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1605
1606   GST_INFO_OBJECT (pad, "Removing pad");
1607
1608   SRC_LOCK (self);
1609   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
1610   gst_element_remove_pad (element, pad);
1611
1612   self->priv->has_peer_latency = FALSE;
1613   SRC_BROADCAST (self);
1614   SRC_UNLOCK (self);
1615 }
1616
1617 static GstAggregatorPad *
1618 gst_aggregator_default_create_new_pad (GstAggregator * self,
1619     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1620 {
1621   GstAggregatorPad *agg_pad;
1622   GstAggregatorPrivate *priv = self->priv;
1623   gint serial = 0;
1624   gchar *name = NULL;
1625   GType pad_type =
1626       GST_PAD_TEMPLATE_GTYPE (templ) ==
1627       G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
1628
1629   if (templ->direction != GST_PAD_SINK)
1630     goto not_sink;
1631
1632   if (templ->presence != GST_PAD_REQUEST)
1633     goto not_request;
1634
1635   GST_OBJECT_LOCK (self);
1636   if (req_name == NULL || strlen (req_name) < 6
1637       || !g_str_has_prefix (req_name, "sink_")) {
1638     /* no name given when requesting the pad, use next available int */
1639     serial = ++priv->max_padserial;
1640   } else {
1641     /* parse serial number from requested padname */
1642     serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1643     if (serial > priv->max_padserial)
1644       priv->max_padserial = serial;
1645   }
1646
1647   name = g_strdup_printf ("sink_%u", serial);
1648   agg_pad = g_object_new (pad_type,
1649       "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1650   g_free (name);
1651
1652   GST_OBJECT_UNLOCK (self);
1653
1654   return agg_pad;
1655
1656   /* errors */
1657 not_sink:
1658   {
1659     GST_WARNING_OBJECT (self, "request new pad that is not a SINK pad");
1660     return NULL;
1661   }
1662 not_request:
1663   {
1664     GST_WARNING_OBJECT (self, "request new pad that is not a REQUEST pad");
1665     return NULL;
1666   }
1667 }
1668
1669 static GstPad *
1670 gst_aggregator_request_new_pad (GstElement * element,
1671     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1672 {
1673   GstAggregator *self;
1674   GstAggregatorPad *agg_pad;
1675   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (element);
1676   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1677
1678   self = GST_AGGREGATOR (element);
1679
1680   agg_pad = klass->create_new_pad (self, templ, req_name, caps);
1681   if (!agg_pad) {
1682     GST_ERROR_OBJECT (element, "Couldn't create new pad");
1683     return NULL;
1684   }
1685
1686   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1687
1688   if (priv->running)
1689     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1690
1691   /* add the pad to the element */
1692   gst_element_add_pad (element, GST_PAD (agg_pad));
1693
1694   return GST_PAD (agg_pad);
1695 }
1696
1697 /* Must be called with SRC_LOCK held */
1698
1699 static gboolean
1700 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1701 {
1702   gboolean query_ret, live;
1703   GstClockTime our_latency, min, max;
1704
1705   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1706
1707   if (!query_ret) {
1708     GST_WARNING_OBJECT (self, "Latency query failed");
1709     return FALSE;
1710   }
1711
1712   gst_query_parse_latency (query, &live, &min, &max);
1713
1714   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1715     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1716         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1717     return FALSE;
1718   }
1719
1720   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1721     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1722         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1723             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1724             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1725     return FALSE;
1726   }
1727
1728   our_latency = self->priv->latency;
1729
1730   self->priv->peer_latency_live = live;
1731   self->priv->peer_latency_min = min;
1732   self->priv->peer_latency_max = max;
1733   self->priv->has_peer_latency = TRUE;
1734
1735   /* add our own */
1736   min += our_latency;
1737   min += self->priv->sub_latency_min;
1738   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1739       && GST_CLOCK_TIME_IS_VALID (max))
1740     max += self->priv->sub_latency_max + our_latency;
1741   else
1742     max = GST_CLOCK_TIME_NONE;
1743
1744   SRC_BROADCAST (self);
1745
1746   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1747       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1748
1749   gst_query_set_latency (query, live, min, max);
1750
1751   return query_ret;
1752 }
1753
1754 /*
1755  * MUST be called with the src_lock held.
1756  *
1757  * See  gst_aggregator_get_latency() for doc
1758  */
1759 static GstClockTime
1760 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1761 {
1762   GstClockTime latency;
1763
1764   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1765
1766   if (!self->priv->has_peer_latency) {
1767     GstQuery *query = gst_query_new_latency ();
1768     gboolean ret;
1769
1770     ret = gst_aggregator_query_latency_unlocked (self, query);
1771     gst_query_unref (query);
1772     if (!ret)
1773       return GST_CLOCK_TIME_NONE;
1774   }
1775
1776   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1777     return GST_CLOCK_TIME_NONE;
1778
1779   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1780   latency = self->priv->peer_latency_min;
1781
1782   /* add our own */
1783   latency += self->priv->latency;
1784   latency += self->priv->sub_latency_min;
1785
1786   return latency;
1787 }
1788
1789 /**
1790  * gst_aggregator_get_latency:
1791  * @self: a #GstAggregator
1792  *
1793  * Retrieves the latency values reported by @self in response to the latency
1794  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1795  * will not wait for the clock.
1796  *
1797  * Typically only called by subclasses.
1798  *
1799  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1800  */
1801 GstClockTime
1802 gst_aggregator_get_latency (GstAggregator * self)
1803 {
1804   GstClockTime ret;
1805
1806   SRC_LOCK (self);
1807   ret = gst_aggregator_get_latency_unlocked (self);
1808   SRC_UNLOCK (self);
1809
1810   return ret;
1811 }
1812
1813 static gboolean
1814 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1815 {
1816   GstAggregator *self = GST_AGGREGATOR (element);
1817
1818   GST_STATE_LOCK (element);
1819   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1820       GST_STATE (element) < GST_STATE_PAUSED) {
1821     gdouble rate;
1822     GstFormat fmt;
1823     GstSeekFlags flags;
1824     GstSeekType start_type, stop_type;
1825     gint64 start, stop;
1826
1827     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1828         &start, &stop_type, &stop);
1829
1830     GST_OBJECT_LOCK (self);
1831     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1832         stop_type, stop, NULL);
1833     self->priv->seqnum = gst_event_get_seqnum (event);
1834     self->priv->first_buffer = FALSE;
1835     GST_OBJECT_UNLOCK (self);
1836
1837     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1838   }
1839   GST_STATE_UNLOCK (element);
1840
1841
1842   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1843       event);
1844 }
1845
1846 static gboolean
1847 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1848 {
1849   gboolean res = TRUE;
1850
1851   switch (GST_QUERY_TYPE (query)) {
1852     case GST_QUERY_SEEKING:
1853     {
1854       GstFormat format;
1855
1856       /* don't pass it along as some (file)sink might claim it does
1857        * whereas with a collectpads in between that will not likely work */
1858       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1859       gst_query_set_seeking (query, format, FALSE, 0, -1);
1860       res = TRUE;
1861
1862       break;
1863     }
1864     case GST_QUERY_LATENCY:
1865       SRC_LOCK (self);
1866       res = gst_aggregator_query_latency_unlocked (self, query);
1867       SRC_UNLOCK (self);
1868       break;
1869     default:
1870       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1871   }
1872
1873   return res;
1874 }
1875
1876 static gboolean
1877 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1878 {
1879   EventData *evdata = user_data;
1880   gboolean ret = TRUE;
1881   GstPad *peer = gst_pad_get_peer (pad);
1882   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1883
1884   if (peer) {
1885     if (evdata->only_to_active_pads && aggpad->priv->first_buffer) {
1886       GST_DEBUG_OBJECT (pad, "not sending event to inactive pad");
1887       ret = TRUE;
1888     } else {
1889       ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1890       GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1891       gst_object_unref (peer);
1892     }
1893   }
1894
1895   if (ret == FALSE) {
1896     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1897       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1898
1899       GST_DEBUG_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1900
1901       if (gst_pad_query (peer, seeking)) {
1902         gboolean seekable;
1903
1904         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1905
1906         if (seekable == FALSE) {
1907           GST_INFO_OBJECT (pad,
1908               "Source not seekable, We failed but it does not matter!");
1909
1910           ret = TRUE;
1911         }
1912       } else {
1913         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1914       }
1915
1916       gst_query_unref (seeking);
1917     }
1918
1919     if (evdata->flush) {
1920       PAD_LOCK (aggpad);
1921       aggpad->priv->pending_flush_start = FALSE;
1922       aggpad->priv->pending_flush_stop = FALSE;
1923       PAD_UNLOCK (aggpad);
1924     }
1925   } else {
1926     evdata->one_actually_seeked = TRUE;
1927   }
1928
1929   evdata->result &= ret;
1930
1931   /* Always send to all pads */
1932   return FALSE;
1933 }
1934
1935 static void
1936 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1937     EventData * evdata)
1938 {
1939   evdata->result = TRUE;
1940   evdata->one_actually_seeked = FALSE;
1941
1942   /* We first need to set all pads as flushing in a first pass
1943    * as flush_start flush_stop is sometimes sent synchronously
1944    * while we send the seek event */
1945   if (evdata->flush) {
1946     GList *l;
1947
1948     GST_OBJECT_LOCK (self);
1949     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1950       GstAggregatorPad *pad = l->data;
1951
1952       PAD_LOCK (pad);
1953       pad->priv->pending_flush_start = TRUE;
1954       pad->priv->pending_flush_stop = FALSE;
1955       PAD_UNLOCK (pad);
1956     }
1957     GST_OBJECT_UNLOCK (self);
1958   }
1959
1960   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, evdata);
1961
1962   gst_event_unref (evdata->event);
1963 }
1964
1965 static gboolean
1966 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1967 {
1968   gdouble rate;
1969   GstFormat fmt;
1970   GstSeekFlags flags;
1971   GstSeekType start_type, stop_type;
1972   gint64 start, stop;
1973   gboolean flush;
1974   EventData evdata = { 0, };
1975   GstAggregatorPrivate *priv = self->priv;
1976
1977   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1978       &start, &stop_type, &stop);
1979
1980   GST_INFO_OBJECT (self, "starting SEEK");
1981
1982   flush = flags & GST_SEEK_FLAG_FLUSH;
1983
1984   GST_OBJECT_LOCK (self);
1985   if (flush) {
1986     priv->pending_flush_start = TRUE;
1987     priv->flush_seeking = TRUE;
1988   }
1989
1990   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1991       stop_type, stop, NULL);
1992
1993   /* Seeking sets a position */
1994   self->priv->first_buffer = FALSE;
1995   GST_OBJECT_UNLOCK (self);
1996
1997   /* forward the seek upstream */
1998   evdata.event = event;
1999   evdata.flush = flush;
2000   evdata.only_to_active_pads = FALSE;
2001   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2002   event = NULL;
2003
2004   if (!evdata.result || !evdata.one_actually_seeked) {
2005     GST_OBJECT_LOCK (self);
2006     priv->flush_seeking = FALSE;
2007     priv->pending_flush_start = FALSE;
2008     GST_OBJECT_UNLOCK (self);
2009   }
2010
2011   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
2012
2013   return evdata.result;
2014 }
2015
2016 static gboolean
2017 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
2018 {
2019   EventData evdata = { 0, };
2020
2021   switch (GST_EVENT_TYPE (event)) {
2022     case GST_EVENT_SEEK:
2023       /* _do_seek() unrefs the event. */
2024       return gst_aggregator_do_seek (self, event);
2025     case GST_EVENT_NAVIGATION:
2026       /* navigation is rather pointless. */
2027       gst_event_unref (event);
2028       return FALSE;
2029     default:
2030       break;
2031   }
2032
2033   /* Don't forward QOS events to pads that had no active buffer yet. Otherwise
2034    * they will receive a QOS event that has earliest_time=0 (because we can't
2035    * have negative timestamps), and consider their buffer as too late */
2036   evdata.event = event;
2037   evdata.flush = FALSE;
2038   evdata.only_to_active_pads = GST_EVENT_TYPE (event) == GST_EVENT_QOS;
2039   gst_aggregator_forward_event_to_all_sinkpads (self, &evdata);
2040   return evdata.result;
2041 }
2042
2043 static gboolean
2044 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
2045     GstEvent * event)
2046 {
2047   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2048
2049   return klass->src_event (GST_AGGREGATOR (parent), event);
2050 }
2051
2052 static gboolean
2053 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
2054     GstQuery * query)
2055 {
2056   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2057
2058   return klass->src_query (GST_AGGREGATOR (parent), query);
2059 }
2060
2061 static gboolean
2062 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
2063     GstObject * parent, GstPadMode mode, gboolean active)
2064 {
2065   GstAggregator *self = GST_AGGREGATOR (parent);
2066   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2067
2068   if (klass->src_activate) {
2069     if (klass->src_activate (self, mode, active) == FALSE) {
2070       return FALSE;
2071     }
2072   }
2073
2074   if (active == TRUE) {
2075     switch (mode) {
2076       case GST_PAD_MODE_PUSH:
2077       {
2078         GST_INFO_OBJECT (pad, "Activating pad!");
2079         gst_aggregator_start_srcpad_task (self);
2080         return TRUE;
2081       }
2082       default:
2083       {
2084         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
2085         return FALSE;
2086       }
2087     }
2088   }
2089
2090   /* deactivating */
2091   GST_INFO_OBJECT (self, "Deactivating srcpad");
2092   gst_aggregator_stop_srcpad_task (self, FALSE);
2093
2094   return TRUE;
2095 }
2096
2097 static gboolean
2098 gst_aggregator_default_sink_query (GstAggregator * self,
2099     GstAggregatorPad * aggpad, GstQuery * query)
2100 {
2101   GstPad *pad = GST_PAD (aggpad);
2102
2103   if (GST_QUERY_TYPE (query) == GST_QUERY_ALLOCATION) {
2104     GstQuery *decide_query = NULL;
2105     GstAggregatorClass *agg_class;
2106     gboolean ret;
2107
2108     GST_OBJECT_LOCK (self);
2109     PAD_LOCK (aggpad);
2110     if (G_UNLIKELY (!aggpad->priv->negotiated)) {
2111       GST_DEBUG_OBJECT (self,
2112           "not negotiated yet, can't answer ALLOCATION query");
2113       PAD_UNLOCK (aggpad);
2114       GST_OBJECT_UNLOCK (self);
2115
2116       return FALSE;
2117     }
2118
2119     if ((decide_query = self->priv->allocation_query))
2120       gst_query_ref (decide_query);
2121     PAD_UNLOCK (aggpad);
2122     GST_OBJECT_UNLOCK (self);
2123
2124     GST_DEBUG_OBJECT (self,
2125         "calling propose allocation with query %" GST_PTR_FORMAT, decide_query);
2126
2127     agg_class = GST_AGGREGATOR_GET_CLASS (self);
2128
2129     /* pass the query to the propose_allocation vmethod if any */
2130     if (agg_class->propose_allocation)
2131       ret = agg_class->propose_allocation (self, aggpad, decide_query, query);
2132     else
2133       ret = FALSE;
2134
2135     if (decide_query)
2136       gst_query_unref (decide_query);
2137
2138     GST_DEBUG_OBJECT (self, "ALLOCATION ret %d, %" GST_PTR_FORMAT, ret, query);
2139     return ret;
2140   }
2141
2142   return gst_pad_query_default (pad, GST_OBJECT (self), query);
2143 }
2144
2145 static void
2146 gst_aggregator_finalize (GObject * object)
2147 {
2148   GstAggregator *self = (GstAggregator *) object;
2149
2150   g_mutex_clear (&self->priv->src_lock);
2151   g_cond_clear (&self->priv->src_cond);
2152
2153   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
2154 }
2155
2156 /*
2157  * gst_aggregator_set_latency_property:
2158  * @agg: a #GstAggregator
2159  * @latency: the new latency value (in nanoseconds).
2160  *
2161  * Sets the new latency value to @latency. This value is used to limit the
2162  * amount of time a pad waits for data to appear before considering the pad
2163  * as unresponsive.
2164  */
2165 static void
2166 gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
2167 {
2168   gboolean changed;
2169
2170   g_return_if_fail (GST_IS_AGGREGATOR (self));
2171   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
2172
2173   SRC_LOCK (self);
2174   changed = (self->priv->latency != latency);
2175
2176   if (changed) {
2177     GList *item;
2178
2179     GST_OBJECT_LOCK (self);
2180     /* First lock all the pads */
2181     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2182       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2183       PAD_LOCK (aggpad);
2184     }
2185
2186     self->priv->latency = latency;
2187
2188     SRC_BROADCAST (self);
2189
2190     /* Now wake up the pads */
2191     for (item = GST_ELEMENT_CAST (self)->sinkpads; item; item = item->next) {
2192       GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
2193       PAD_BROADCAST_EVENT (aggpad);
2194       PAD_UNLOCK (aggpad);
2195     }
2196     GST_OBJECT_UNLOCK (self);
2197   }
2198
2199   SRC_UNLOCK (self);
2200
2201   if (changed)
2202     gst_element_post_message (GST_ELEMENT_CAST (self),
2203         gst_message_new_latency (GST_OBJECT_CAST (self)));
2204 }
2205
2206 /*
2207  * gst_aggregator_get_latency_property:
2208  * @agg: a #GstAggregator
2209  *
2210  * Gets the latency value. See gst_aggregator_set_latency for
2211  * more details.
2212  *
2213  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
2214  * before a pad is deemed unresponsive. A value of -1 means an
2215  * unlimited time.
2216  */
2217 static GstClockTime
2218 gst_aggregator_get_latency_property (GstAggregator * agg)
2219 {
2220   GstClockTime res;
2221
2222   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
2223
2224   GST_OBJECT_LOCK (agg);
2225   res = agg->priv->latency;
2226   GST_OBJECT_UNLOCK (agg);
2227
2228   return res;
2229 }
2230
2231 static void
2232 gst_aggregator_set_property (GObject * object, guint prop_id,
2233     const GValue * value, GParamSpec * pspec)
2234 {
2235   GstAggregator *agg = GST_AGGREGATOR (object);
2236
2237   switch (prop_id) {
2238     case PROP_LATENCY:
2239       gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
2240       break;
2241     case PROP_START_TIME_SELECTION:
2242       agg->priv->start_time_selection = g_value_get_enum (value);
2243       break;
2244     case PROP_START_TIME:
2245       agg->priv->start_time = g_value_get_uint64 (value);
2246       break;
2247     default:
2248       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2249       break;
2250   }
2251 }
2252
2253 static void
2254 gst_aggregator_get_property (GObject * object, guint prop_id,
2255     GValue * value, GParamSpec * pspec)
2256 {
2257   GstAggregator *agg = GST_AGGREGATOR (object);
2258
2259   switch (prop_id) {
2260     case PROP_LATENCY:
2261       g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
2262       break;
2263     case PROP_START_TIME_SELECTION:
2264       g_value_set_enum (value, agg->priv->start_time_selection);
2265       break;
2266     case PROP_START_TIME:
2267       g_value_set_uint64 (value, agg->priv->start_time);
2268       break;
2269     default:
2270       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2271       break;
2272   }
2273 }
2274
2275 /* GObject vmethods implementations */
2276 static void
2277 gst_aggregator_class_init (GstAggregatorClass * klass)
2278 {
2279   GObjectClass *gobject_class = (GObjectClass *) klass;
2280   GstElementClass *gstelement_class = (GstElementClass *) klass;
2281
2282   aggregator_parent_class = g_type_class_peek_parent (klass);
2283   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
2284
2285   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
2286       GST_DEBUG_FG_MAGENTA, "GstAggregator");
2287
2288   klass->finish_buffer = gst_aggregator_default_finish_buffer;
2289
2290   klass->sink_event = gst_aggregator_default_sink_event;
2291   klass->sink_query = gst_aggregator_default_sink_query;
2292
2293   klass->src_event = gst_aggregator_default_src_event;
2294   klass->src_query = gst_aggregator_default_src_query;
2295
2296   klass->create_new_pad = gst_aggregator_default_create_new_pad;
2297   klass->update_src_caps = gst_aggregator_default_update_src_caps;
2298   klass->fixate_src_caps = gst_aggregator_default_fixate_src_caps;
2299   klass->negotiated_src_caps = gst_aggregator_default_negotiated_src_caps;
2300
2301   gstelement_class->request_new_pad =
2302       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
2303   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
2304   gstelement_class->release_pad =
2305       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
2306   gstelement_class->change_state =
2307       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
2308
2309   gobject_class->set_property = gst_aggregator_set_property;
2310   gobject_class->get_property = gst_aggregator_get_property;
2311   gobject_class->finalize = gst_aggregator_finalize;
2312
2313   g_object_class_install_property (gobject_class, PROP_LATENCY,
2314       g_param_spec_uint64 ("latency", "Buffer latency",
2315           "Additional latency in live mode to allow upstream "
2316           "to take longer to produce buffers for the current "
2317           "position (in nanoseconds)", 0, G_MAXUINT64,
2318           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2319
2320   g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
2321       g_param_spec_enum ("start-time-selection", "Start Time Selection",
2322           "Decides which start time is output",
2323           gst_aggregator_start_time_selection_get_type (),
2324           DEFAULT_START_TIME_SELECTION,
2325           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2326
2327   g_object_class_install_property (gobject_class, PROP_START_TIME,
2328       g_param_spec_uint64 ("start-time", "Start Time",
2329           "Start time to use if start-time-selection=set", 0,
2330           G_MAXUINT64,
2331           DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
2332 }
2333
2334 static void
2335 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
2336 {
2337   GstPadTemplate *pad_template;
2338   GstAggregatorPrivate *priv;
2339
2340   g_return_if_fail (klass->aggregate != NULL);
2341
2342   self->priv =
2343       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
2344       GstAggregatorPrivate);
2345
2346   priv = self->priv;
2347
2348   pad_template =
2349       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
2350   g_return_if_fail (pad_template != NULL);
2351
2352   priv->max_padserial = -1;
2353   priv->tags_changed = FALSE;
2354
2355   self->priv->peer_latency_live = FALSE;
2356   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
2357   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
2358   self->priv->has_peer_latency = FALSE;
2359   gst_aggregator_reset_flow_values (self);
2360
2361   self->srcpad = gst_pad_new_from_template (pad_template, "src");
2362
2363   gst_pad_set_event_function (self->srcpad,
2364       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
2365   gst_pad_set_query_function (self->srcpad,
2366       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
2367   gst_pad_set_activatemode_function (self->srcpad,
2368       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
2369
2370   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
2371
2372   self->priv->latency = DEFAULT_LATENCY;
2373   self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
2374   self->priv->start_time = DEFAULT_START_TIME;
2375
2376   g_mutex_init (&self->priv->src_lock);
2377   g_cond_init (&self->priv->src_cond);
2378 }
2379
2380 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
2381  * method to get to the padtemplates */
2382 GType
2383 gst_aggregator_get_type (void)
2384 {
2385   static volatile gsize type = 0;
2386
2387   if (g_once_init_enter (&type)) {
2388     GType _type;
2389     static const GTypeInfo info = {
2390       sizeof (GstAggregatorClass),
2391       NULL,
2392       NULL,
2393       (GClassInitFunc) gst_aggregator_class_init,
2394       NULL,
2395       NULL,
2396       sizeof (GstAggregator),
2397       0,
2398       (GInstanceInitFunc) gst_aggregator_init,
2399     };
2400
2401     _type = g_type_register_static (GST_TYPE_ELEMENT,
2402         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
2403     g_once_init_leave (&type, _type);
2404   }
2405   return type;
2406 }
2407
2408 /* Must be called with SRC lock and PAD lock held */
2409 static gboolean
2410 gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
2411 {
2412   /* Empty queue always has space */
2413   if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
2414     return TRUE;
2415
2416   /* We also want at least two buffers, one is being processed and one is ready
2417    * for the next iteration when we operate in live mode. */
2418   if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
2419     return TRUE;
2420
2421   /* zero latency, if there is a buffer, it's full */
2422   if (self->priv->latency == 0)
2423     return FALSE;
2424
2425   /* Allow no more buffers than the latency */
2426   return (aggpad->priv->time_level <= self->priv->latency);
2427 }
2428
2429 /* Must be called with the PAD_LOCK held */
2430 static void
2431 apply_buffer (GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2432 {
2433   GstClockTime timestamp;
2434
2435   if (GST_BUFFER_DTS_IS_VALID (buffer))
2436     timestamp = GST_BUFFER_DTS (buffer);
2437   else
2438     timestamp = GST_BUFFER_PTS (buffer);
2439
2440   if (timestamp == GST_CLOCK_TIME_NONE) {
2441     if (head)
2442       timestamp = aggpad->priv->head_position;
2443     else
2444       timestamp = aggpad->priv->tail_position;
2445   }
2446
2447   /* add duration */
2448   if (GST_BUFFER_DURATION_IS_VALID (buffer))
2449     timestamp += GST_BUFFER_DURATION (buffer);
2450
2451   if (head)
2452     aggpad->priv->head_position = timestamp;
2453   else
2454     aggpad->priv->tail_position = timestamp;
2455
2456   update_time_level (aggpad, head);
2457 }
2458
2459 /*
2460  * Can be called either from the sinkpad's chain function or from the srcpad's
2461  * thread in the case of a buffer synthetized from a GAP event.
2462  * Because of this second case, FLUSH_LOCK can't be used here.
2463  */
2464
2465 static GstFlowReturn
2466 gst_aggregator_pad_chain_internal (GstAggregator * self,
2467     GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
2468 {
2469   GstFlowReturn flow_return;
2470   GstClockTime buf_pts;
2471
2472   PAD_LOCK (aggpad);
2473   flow_return = aggpad->priv->flow_return;
2474   if (flow_return != GST_FLOW_OK)
2475     goto flushing;
2476
2477   PAD_UNLOCK (aggpad);
2478
2479   buf_pts = GST_BUFFER_PTS (buffer);
2480
2481   for (;;) {
2482     SRC_LOCK (self);
2483     GST_OBJECT_LOCK (self);
2484     PAD_LOCK (aggpad);
2485
2486     if (aggpad->priv->first_buffer) {
2487       self->priv->has_peer_latency = FALSE;
2488       aggpad->priv->first_buffer = FALSE;
2489     }
2490
2491     if ((gst_aggregator_pad_has_space (self, aggpad) || !head)
2492         && aggpad->priv->flow_return == GST_FLOW_OK) {
2493       if (head)
2494         g_queue_push_head (&aggpad->priv->data, buffer);
2495       else
2496         g_queue_push_tail (&aggpad->priv->data, buffer);
2497       apply_buffer (aggpad, buffer, head);
2498       aggpad->priv->num_buffers++;
2499       buffer = NULL;
2500       SRC_BROADCAST (self);
2501       break;
2502     }
2503
2504     flow_return = aggpad->priv->flow_return;
2505     if (flow_return != GST_FLOW_OK) {
2506       GST_OBJECT_UNLOCK (self);
2507       SRC_UNLOCK (self);
2508       goto flushing;
2509     }
2510     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2511     GST_OBJECT_UNLOCK (self);
2512     SRC_UNLOCK (self);
2513     PAD_WAIT_EVENT (aggpad);
2514
2515     PAD_UNLOCK (aggpad);
2516   }
2517
2518   if (self->priv->first_buffer) {
2519     GstClockTime start_time;
2520
2521     switch (self->priv->start_time_selection) {
2522       case GST_AGGREGATOR_START_TIME_SELECTION_ZERO:
2523       default:
2524         start_time = 0;
2525         break;
2526       case GST_AGGREGATOR_START_TIME_SELECTION_FIRST:
2527         GST_OBJECT_LOCK (aggpad);
2528         if (aggpad->priv->head_segment.format == GST_FORMAT_TIME) {
2529           start_time = buf_pts;
2530           if (start_time != -1) {
2531             start_time = MAX (start_time, aggpad->priv->head_segment.start);
2532             start_time =
2533                 gst_segment_to_running_time (&aggpad->priv->head_segment,
2534                 GST_FORMAT_TIME, start_time);
2535           }
2536         } else {
2537           start_time = 0;
2538           GST_WARNING_OBJECT (aggpad,
2539               "Ignoring request of selecting the first start time "
2540               "as the segment is a %s segment instead of a time segment",
2541               gst_format_get_name (aggpad->segment.format));
2542         }
2543         GST_OBJECT_UNLOCK (aggpad);
2544         break;
2545       case GST_AGGREGATOR_START_TIME_SELECTION_SET:
2546         start_time = self->priv->start_time;
2547         if (start_time == -1)
2548           start_time = 0;
2549         break;
2550     }
2551
2552     if (start_time != -1) {
2553       if (self->segment.position == -1)
2554         self->segment.position = start_time;
2555       else
2556         self->segment.position = MIN (start_time, self->segment.position);
2557
2558       GST_DEBUG_OBJECT (self, "Selecting start time %" GST_TIME_FORMAT,
2559           GST_TIME_ARGS (start_time));
2560     }
2561   }
2562
2563   PAD_UNLOCK (aggpad);
2564   GST_OBJECT_UNLOCK (self);
2565   SRC_UNLOCK (self);
2566
2567   GST_DEBUG_OBJECT (aggpad, "Done chaining");
2568
2569   return flow_return;
2570
2571 flushing:
2572   PAD_UNLOCK (aggpad);
2573
2574   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
2575       gst_flow_get_name (flow_return));
2576   if (buffer)
2577     gst_buffer_unref (buffer);
2578
2579   return flow_return;
2580 }
2581
2582 static GstFlowReturn
2583 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
2584 {
2585   GstFlowReturn ret;
2586   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2587
2588   PAD_FLUSH_LOCK (aggpad);
2589
2590   ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
2591       aggpad, buffer, TRUE);
2592
2593   PAD_FLUSH_UNLOCK (aggpad);
2594
2595   return ret;
2596 }
2597
2598 static gboolean
2599 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
2600     GstQuery * query)
2601 {
2602   GstAggregator *self = GST_AGGREGATOR (parent);
2603   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2604
2605   if (GST_QUERY_IS_SERIALIZED (query)) {
2606     GstStructure *s;
2607     gboolean ret = FALSE;
2608
2609     SRC_LOCK (self);
2610     PAD_LOCK (aggpad);
2611
2612     if (aggpad->priv->flow_return != GST_FLOW_OK) {
2613       SRC_UNLOCK (self);
2614       goto flushing;
2615     }
2616
2617     g_queue_push_head (&aggpad->priv->data, query);
2618     SRC_BROADCAST (self);
2619     SRC_UNLOCK (self);
2620
2621     while (!gst_aggregator_pad_queue_is_empty (aggpad)
2622         && aggpad->priv->flow_return == GST_FLOW_OK) {
2623       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
2624       PAD_WAIT_EVENT (aggpad);
2625     }
2626
2627     s = gst_query_writable_structure (query);
2628     if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret))
2629       gst_structure_remove_field (s, "gst-aggregator-retval");
2630     else
2631       g_queue_remove (&aggpad->priv->data, query);
2632
2633     if (aggpad->priv->flow_return != GST_FLOW_OK)
2634       goto flushing;
2635
2636     PAD_UNLOCK (aggpad);
2637
2638     return ret;
2639   } else {
2640     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2641
2642     return klass->sink_query (self, aggpad, query);
2643   }
2644
2645 flushing:
2646   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
2647       gst_flow_get_name (aggpad->priv->flow_return));
2648   PAD_UNLOCK (aggpad);
2649
2650   return FALSE;
2651 }
2652
2653 /* Queue serialized events and let the others go through directly.
2654  * The queued events with be handled from the src-pad task in
2655  * gst_aggregator_do_events_and_queries().
2656  */
2657 static GstFlowReturn
2658 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
2659     GstEvent * event)
2660 {
2661   GstFlowReturn ret = GST_FLOW_OK;
2662   GstAggregator *self = GST_AGGREGATOR (parent);
2663   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2664
2665   if (GST_EVENT_IS_SERIALIZED (event)
2666       && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
2667     SRC_LOCK (self);
2668     PAD_LOCK (aggpad);
2669
2670     if (aggpad->priv->flow_return != GST_FLOW_OK)
2671       goto flushing;
2672
2673     if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
2674       GST_OBJECT_LOCK (aggpad);
2675       gst_event_copy_segment (event, &aggpad->priv->head_segment);
2676       aggpad->priv->head_position = aggpad->priv->head_segment.position;
2677       update_time_level (aggpad, TRUE);
2678       GST_OBJECT_UNLOCK (aggpad);
2679     }
2680
2681     GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event);
2682     g_queue_push_head (&aggpad->priv->data, event);
2683     SRC_BROADCAST (self);
2684     PAD_UNLOCK (aggpad);
2685     SRC_UNLOCK (self);
2686   } else {
2687     GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
2688
2689     if (!klass->sink_event (self, aggpad, event)) {
2690       /* Copied from GstPad to convert boolean to a GstFlowReturn in
2691        * the event handling func */
2692       ret = GST_FLOW_ERROR;
2693     }
2694   }
2695
2696   return ret;
2697
2698 flushing:
2699   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
2700       gst_flow_get_name (aggpad->priv->flow_return));
2701   PAD_UNLOCK (aggpad);
2702   SRC_UNLOCK (self);
2703   if (GST_EVENT_IS_STICKY (event))
2704     gst_pad_store_sticky_event (pad, event);
2705   gst_event_unref (event);
2706
2707   return aggpad->priv->flow_return;
2708 }
2709
2710 static gboolean
2711 gst_aggregator_pad_activate_mode_func (GstPad * pad,
2712     GstObject * parent, GstPadMode mode, gboolean active)
2713 {
2714   GstAggregator *self = GST_AGGREGATOR (parent);
2715   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
2716
2717   if (active == FALSE) {
2718     SRC_LOCK (self);
2719     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING, TRUE);
2720     SRC_BROADCAST (self);
2721     SRC_UNLOCK (self);
2722   } else {
2723     PAD_LOCK (aggpad);
2724     aggpad->priv->flow_return = GST_FLOW_OK;
2725     PAD_BROADCAST_EVENT (aggpad);
2726     PAD_UNLOCK (aggpad);
2727   }
2728
2729   return TRUE;
2730 }
2731
2732 /***********************************
2733  * GstAggregatorPad implementation  *
2734  ************************************/
2735 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
2736
2737 static void
2738 gst_aggregator_pad_constructed (GObject * object)
2739 {
2740   GstPad *pad = GST_PAD (object);
2741
2742   if (GST_PAD_IS_SINK (pad)) {
2743     gst_pad_set_chain_function (pad,
2744         GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
2745     gst_pad_set_event_full_function_full (pad,
2746         GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
2747     gst_pad_set_query_function (pad,
2748         GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
2749     gst_pad_set_activatemode_function (pad,
2750         GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
2751   }
2752 }
2753
2754 static void
2755 gst_aggregator_pad_finalize (GObject * object)
2756 {
2757   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2758
2759   g_cond_clear (&pad->priv->event_cond);
2760   g_mutex_clear (&pad->priv->flush_lock);
2761   g_mutex_clear (&pad->priv->lock);
2762
2763   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2764 }
2765
2766 static void
2767 gst_aggregator_pad_dispose (GObject * object)
2768 {
2769   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2770
2771   gst_aggregator_pad_set_flushing (pad, GST_FLOW_FLUSHING, TRUE);
2772
2773   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2774 }
2775
2776 static void
2777 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2778 {
2779   GObjectClass *gobject_class = (GObjectClass *) klass;
2780
2781   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2782
2783   gobject_class->constructed = gst_aggregator_pad_constructed;
2784   gobject_class->finalize = gst_aggregator_pad_finalize;
2785   gobject_class->dispose = gst_aggregator_pad_dispose;
2786 }
2787
2788 static void
2789 gst_aggregator_pad_init (GstAggregatorPad * pad)
2790 {
2791   pad->priv =
2792       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2793       GstAggregatorPadPrivate);
2794
2795   g_queue_init (&pad->priv->data);
2796   g_cond_init (&pad->priv->event_cond);
2797
2798   g_mutex_init (&pad->priv->flush_lock);
2799   g_mutex_init (&pad->priv->lock);
2800
2801   gst_aggregator_pad_reset_unlocked (pad);
2802   pad->priv->negotiated = FALSE;
2803 }
2804
2805 /* Must be called with the PAD_LOCK held */
2806 static void
2807 gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
2808 {
2809   pad->priv->num_buffers--;
2810   GST_TRACE_OBJECT (pad, "Consuming buffer");
2811   PAD_BROADCAST_EVENT (pad);
2812 }
2813
2814 /* Must be called with the PAD_LOCK held */
2815 static void
2816 gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
2817 {
2818   GstAggregator *self = NULL;
2819   GstAggregatorClass *aggclass = NULL;
2820   GstBuffer *buffer = NULL;
2821
2822   while (pad->priv->clipped_buffer == NULL &&
2823       GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) {
2824     buffer = g_queue_pop_tail (&pad->priv->data);
2825
2826     apply_buffer (pad, buffer, FALSE);
2827
2828     /* We only take the parent here so that it's not taken if the buffer is
2829      * already clipped or if the queue is empty.
2830      */
2831     if (self == NULL) {
2832       self = GST_AGGREGATOR (gst_pad_get_parent_element (GST_PAD (pad)));
2833       if (self == NULL) {
2834         gst_buffer_unref (buffer);
2835         return;
2836       }
2837
2838       aggclass = GST_AGGREGATOR_GET_CLASS (self);
2839     }
2840
2841     if (aggclass->clip) {
2842       GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
2843
2844       buffer = aggclass->clip (self, pad, buffer);
2845
2846       if (buffer == NULL) {
2847         gst_aggregator_pad_buffer_consumed (pad);
2848         GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
2849       }
2850     }
2851
2852     pad->priv->clipped_buffer = buffer;
2853   }
2854
2855   if (self)
2856     gst_object_unref (self);
2857 }
2858
2859 /**
2860  * gst_aggregator_pad_pop_buffer:
2861  * @pad: the pad to get buffer from
2862  *
2863  * Steal the ref to the buffer currently queued in @pad.
2864  *
2865  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2866  *   queued. You should unref the buffer after usage.
2867  */
2868 GstBuffer *
2869 gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
2870 {
2871   GstBuffer *buffer;
2872
2873   PAD_LOCK (pad);
2874
2875   gst_aggregator_pad_clip_buffer_unlocked (pad);
2876
2877   buffer = pad->priv->clipped_buffer;
2878
2879   if (buffer) {
2880     pad->priv->clipped_buffer = NULL;
2881     gst_aggregator_pad_buffer_consumed (pad);
2882     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2883   }
2884
2885   PAD_UNLOCK (pad);
2886
2887   return buffer;
2888 }
2889
2890 /**
2891  * gst_aggregator_pad_drop_buffer:
2892  * @pad: the pad where to drop any pending buffer
2893  *
2894  * Drop the buffer currently queued in @pad.
2895  *
2896  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2897  */
2898 gboolean
2899 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2900 {
2901   GstBuffer *buf;
2902
2903   buf = gst_aggregator_pad_pop_buffer (pad);
2904
2905   if (buf == NULL)
2906     return FALSE;
2907
2908   gst_buffer_unref (buf);
2909   return TRUE;
2910 }
2911
2912 /**
2913  * gst_aggregator_pad_peek_buffer:
2914  * @pad: the pad to get buffer from
2915  *
2916  * Returns: (transfer full): A reference to the buffer in @pad or
2917  * NULL if no buffer was queued. You should unref the buffer after
2918  * usage.
2919  */
2920 GstBuffer *
2921 gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
2922 {
2923   GstBuffer *buffer;
2924
2925   PAD_LOCK (pad);
2926
2927   gst_aggregator_pad_clip_buffer_unlocked (pad);
2928
2929   if (pad->priv->clipped_buffer) {
2930     buffer = gst_buffer_ref (pad->priv->clipped_buffer);
2931   } else {
2932     buffer = NULL;
2933   }
2934   PAD_UNLOCK (pad);
2935
2936   return buffer;
2937 }
2938
2939 /**
2940  * gst_aggregator_pad_is_eos:
2941  * @pad: an aggregator pad
2942  *
2943  * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
2944  */
2945 gboolean
2946 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2947 {
2948   gboolean is_eos;
2949
2950   PAD_LOCK (pad);
2951   is_eos = pad->priv->eos;
2952   PAD_UNLOCK (pad);
2953
2954   return is_eos;
2955 }
2956
2957 #if 0
2958 /*
2959  * gst_aggregator_merge_tags:
2960  * @self: a #GstAggregator
2961  * @tags: a #GstTagList to merge
2962  * @mode: the #GstTagMergeMode to use
2963  *
2964  * Adds tags to so-called pending tags, which will be processed
2965  * before pushing out data downstream.
2966  *
2967  * Note that this is provided for convenience, and the subclass is
2968  * not required to use this and can still do tag handling on its own.
2969  *
2970  * MT safe.
2971  */
2972 void
2973 gst_aggregator_merge_tags (GstAggregator * self,
2974     const GstTagList * tags, GstTagMergeMode mode)
2975 {
2976   GstTagList *otags;
2977
2978   g_return_if_fail (GST_IS_AGGREGATOR (self));
2979   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2980
2981   /* FIXME Check if we can use OBJECT lock here! */
2982   GST_OBJECT_LOCK (self);
2983   if (tags)
2984     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2985   otags = self->priv->tags;
2986   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2987   if (otags)
2988     gst_tag_list_unref (otags);
2989   self->priv->tags_changed = TRUE;
2990   GST_OBJECT_UNLOCK (self);
2991 }
2992 #endif
2993
2994 /**
2995  * gst_aggregator_set_latency:
2996  * @self: a #GstAggregator
2997  * @min_latency: minimum latency
2998  * @max_latency: maximum latency
2999  *
3000  * Lets #GstAggregator sub-classes tell the baseclass what their internal
3001  * latency is. Will also post a LATENCY message on the bus so the pipeline
3002  * can reconfigure its global latency.
3003  */
3004 void
3005 gst_aggregator_set_latency (GstAggregator * self,
3006     GstClockTime min_latency, GstClockTime max_latency)
3007 {
3008   gboolean changed = FALSE;
3009
3010   g_return_if_fail (GST_IS_AGGREGATOR (self));
3011   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
3012   g_return_if_fail (max_latency >= min_latency);
3013
3014   SRC_LOCK (self);
3015   if (self->priv->sub_latency_min != min_latency) {
3016     self->priv->sub_latency_min = min_latency;
3017     changed = TRUE;
3018   }
3019   if (self->priv->sub_latency_max != max_latency) {
3020     self->priv->sub_latency_max = max_latency;
3021     changed = TRUE;
3022   }
3023
3024   if (changed)
3025     SRC_BROADCAST (self);
3026   SRC_UNLOCK (self);
3027
3028   if (changed) {
3029     gst_element_post_message (GST_ELEMENT_CAST (self),
3030         gst_message_new_latency (GST_OBJECT_CAST (self)));
3031   }
3032 }
3033
3034 /**
3035  * gst_aggregator_get_buffer_pool:
3036  * @self: a #GstAggregator
3037  *
3038  * Returns: (transfer full): the instance of the #GstBufferPool used
3039  * by @trans; free it after use it
3040  */
3041 GstBufferPool *
3042 gst_aggregator_get_buffer_pool (GstAggregator * self)
3043 {
3044   GstBufferPool *pool;
3045
3046   g_return_val_if_fail (GST_IS_AGGREGATOR (self), NULL);
3047
3048   GST_OBJECT_LOCK (self);
3049   pool = self->priv->pool;
3050   if (pool)
3051     gst_object_ref (pool);
3052   GST_OBJECT_UNLOCK (self);
3053
3054   return pool;
3055 }
3056
3057 /**
3058  * gst_aggregator_get_allocator:
3059  * @self: a #GstAggregator
3060  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3061  * used
3062  * @params: (out) (allow-none) (transfer full): the
3063  * #GstAllocationParams of @allocator
3064  *
3065  * Lets #GstAggregator sub-classes get the memory @allocator
3066  * acquired by the base class and its @params.
3067  *
3068  * Unref the @allocator after use it.
3069  */
3070 void
3071 gst_aggregator_get_allocator (GstAggregator * self,
3072     GstAllocator ** allocator, GstAllocationParams * params)
3073 {
3074   g_return_if_fail (GST_IS_AGGREGATOR (self));
3075
3076   if (allocator)
3077     *allocator = self->priv->allocator ?
3078         gst_object_ref (self->priv->allocator) : NULL;
3079
3080   if (params)
3081     *params = self->priv->allocation_params;
3082 }