880aa6d9365b8c804ad105fb2fc52c47873ab605
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 /* Locking order, locks in this element must always be taken in this order
78  *
79  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
80  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
81  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
82  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
83  * standard element object lock -> GST_OBJECT_LOCK(agg)
84  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
85  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
86  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
87  */
88
89
90 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
91
92 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
93 #define GST_CAT_DEFAULT aggregator_debug
94
95 /* GstAggregatorPad definitions */
96 #define PAD_LOCK(pad)   G_STMT_START {                                  \
97   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
98         g_thread_self());                                               \
99   g_mutex_lock(&pad->priv->lock);                                       \
100   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
101         g_thread_self());                                               \
102   } G_STMT_END
103
104 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
105   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
106       g_thread_self());                                                 \
107   g_mutex_unlock(&pad->priv->lock);                                     \
108   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
109         g_thread_self());                                               \
110   } G_STMT_END
111
112
113 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
114   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
115         g_thread_self());                                               \
116   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
117       (&((GstAggregatorPad*)pad)->priv->lock));                         \
118   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
119         g_thread_self());                                               \
120   } G_STMT_END
121
122 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
123   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
124         g_thread_self());                                              \
125   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
126   } G_STMT_END
127
128
129 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
130   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
131         g_thread_self());                                               \
132   g_mutex_lock(&pad->priv->flush_lock);                                 \
133   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
134         g_thread_self());                                               \
135   } G_STMT_END
136
137 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
138   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
139         g_thread_self());                                               \
140   g_mutex_unlock(&pad->priv->flush_lock);                               \
141   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
142         g_thread_self());                                               \
143   } G_STMT_END
144
145 #define SRC_LOCK(self)   G_STMT_START {                             \
146   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
147       g_thread_self());                                             \
148   g_mutex_lock(&self->priv->src_lock);                              \
149   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
150         g_thread_self());                                           \
151   } G_STMT_END
152
153 #define SRC_UNLOCK(self)  G_STMT_START {                            \
154   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
155         g_thread_self());                                           \
156   g_mutex_unlock(&self->priv->src_lock);                            \
157   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
158         g_thread_self());                                           \
159   } G_STMT_END
160
161 #define SRC_WAIT(self) G_STMT_START {                               \
162   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
163         g_thread_self());                                           \
164   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
165   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
166         g_thread_self());                                           \
167   } G_STMT_END
168
169 #define SRC_BROADCAST(self) G_STMT_START {                          \
170     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
171         g_thread_self());                                           \
172     if (self->priv->aggregate_id)                                   \
173       gst_clock_id_unschedule (self->priv->aggregate_id);           \
174     g_cond_broadcast(&(self->priv->src_cond));                      \
175   } G_STMT_END
176
177 struct _GstAggregatorPadPrivate
178 {
179   /* Following fields are protected by the PAD_LOCK */
180   GstFlowReturn flow_return;
181   gboolean pending_flush_start;
182   gboolean pending_flush_stop;
183   gboolean pending_eos;
184
185   GstBuffer *buffer;
186   gboolean eos;
187
188   GMutex lock;
189   GCond event_cond;
190   /* This lock prevents a flush start processing happening while
191    * the chain function is also happening.
192    */
193   GMutex flush_lock;
194 };
195
196 static gboolean
197 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
198 {
199   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
200
201   PAD_LOCK (aggpad);
202   aggpad->priv->pending_eos = FALSE;
203   aggpad->priv->eos = FALSE;
204   aggpad->priv->flow_return = GST_FLOW_OK;
205   PAD_UNLOCK (aggpad);
206
207   if (klass->flush)
208     return klass->flush (aggpad, agg);
209
210   return TRUE;
211 }
212
213 /*************************************
214  * GstAggregator implementation  *
215  *************************************/
216 static GstElementClass *aggregator_parent_class = NULL;
217
218 /* All members are protected by the object lock unless otherwise noted */
219
220 struct _GstAggregatorPrivate
221 {
222   gint padcount;
223
224   /* Our state is >= PAUSED */
225   gboolean running;             /* protected by src_lock */
226
227   gint seqnum;
228   gboolean send_stream_start;   /* protected by srcpad stream lock */
229   gboolean send_segment;
230   gboolean flush_seeking;
231   gboolean pending_flush_start;
232   gboolean send_eos;            /* protected by srcpad stream lock */
233
234   GstCaps *srccaps;             /* protected by the srcpad stream lock */
235
236   GstTagList *tags;
237   gboolean tags_changed;
238
239   gboolean peer_latency_live;   /* protected by src_lock */
240   GstClockTime peer_latency_min;        /* protected by src_lock */
241   GstClockTime peer_latency_max;        /* protected by src_lock */
242   gboolean has_peer_latency;
243
244   GstClockTime sub_latency_min; /* protected by src_lock */
245   GstClockTime sub_latency_max; /* protected by src_lock */
246
247   /* aggregate */
248   GstClockID aggregate_id;      /* protected by src_lock */
249   GMutex src_lock;
250   GCond src_cond;
251
252   /* properties */
253   gint64 latency;
254 };
255
256 typedef struct
257 {
258   GstEvent *event;
259   gboolean result;
260   gboolean flush;
261
262   gboolean one_actually_seeked;
263 } EventData;
264
265 #define DEFAULT_LATENCY        0
266
267 enum
268 {
269   PROP_0,
270   PROP_LATENCY,
271   PROP_LAST
272 };
273
274 /**
275  * gst_aggregator_iterate_sinkpads:
276  * @self: The #GstAggregator
277  * @func: (scope call): The function to call.
278  * @user_data: (closure): The data to pass to @func.
279  *
280  * Iterate the sinkpads of aggregator to call a function on them.
281  *
282  * This method guarantees that @func will be called only once for each
283  * sink pad.
284  */
285 gboolean
286 gst_aggregator_iterate_sinkpads (GstAggregator * self,
287     GstAggregatorPadForeachFunc func, gpointer user_data)
288 {
289   gboolean result = FALSE;
290   GstIterator *iter;
291   gboolean done = FALSE;
292   GValue item = { 0, };
293   GList *seen_pads = NULL;
294
295   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
296
297   if (!iter)
298     goto no_iter;
299
300   while (!done) {
301     switch (gst_iterator_next (iter, &item)) {
302       case GST_ITERATOR_OK:
303       {
304         GstAggregatorPad *pad;
305
306         pad = g_value_get_object (&item);
307
308         /* if already pushed, skip. FIXME, find something faster to tag pads */
309         if (pad == NULL || g_list_find (seen_pads, pad)) {
310           g_value_reset (&item);
311           break;
312         }
313
314         GST_LOG_OBJECT (pad, "calling function %s on pad",
315             GST_DEBUG_FUNCPTR_NAME (func));
316
317         result = func (self, pad, user_data);
318
319         done = !result;
320
321         seen_pads = g_list_prepend (seen_pads, pad);
322
323         g_value_reset (&item);
324         break;
325       }
326       case GST_ITERATOR_RESYNC:
327         gst_iterator_resync (iter);
328         break;
329       case GST_ITERATOR_ERROR:
330         GST_ERROR_OBJECT (self,
331             "Could not iterate over internally linked pads");
332         done = TRUE;
333         break;
334       case GST_ITERATOR_DONE:
335         done = TRUE;
336         break;
337     }
338   }
339   g_value_unset (&item);
340   gst_iterator_free (iter);
341
342   if (seen_pads == NULL) {
343     GST_DEBUG_OBJECT (self, "No pad seen");
344     return FALSE;
345   }
346
347   g_list_free (seen_pads);
348
349 no_iter:
350   return result;
351 }
352
353 static gboolean
354 gst_aggregator_check_pads_ready (GstAggregator * self)
355 {
356   GstAggregatorPad *pad;
357   GList *l, *sinkpads;
358
359   GST_LOG_OBJECT (self, "checking pads");
360
361   GST_OBJECT_LOCK (self);
362
363   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
364   if (sinkpads == NULL)
365     goto no_sinkpads;
366
367   for (l = sinkpads; l != NULL; l = l->next) {
368     pad = l->data;
369
370     PAD_LOCK (pad);
371     if (pad->priv->buffer == NULL && !pad->priv->eos) {
372       PAD_UNLOCK (pad);
373       goto pad_not_ready;
374     }
375     PAD_UNLOCK (pad);
376
377   }
378
379   GST_OBJECT_UNLOCK (self);
380   GST_LOG_OBJECT (self, "pads are ready");
381   return TRUE;
382
383 no_sinkpads:
384   {
385     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
386     GST_OBJECT_UNLOCK (self);
387     return FALSE;
388   }
389 pad_not_ready:
390   {
391     GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
392     GST_OBJECT_UNLOCK (self);
393     return FALSE;
394   }
395 }
396
397 static void
398 gst_aggregator_reset_flow_values (GstAggregator * self)
399 {
400   GST_OBJECT_LOCK (self);
401   self->priv->send_stream_start = TRUE;
402   self->priv->send_segment = TRUE;
403   gst_segment_init (&self->segment, GST_FORMAT_TIME);
404   GST_OBJECT_UNLOCK (self);
405 }
406
407 static inline void
408 gst_aggregator_push_mandatory_events (GstAggregator * self)
409 {
410   GstAggregatorPrivate *priv = self->priv;
411   GstEvent *segment = NULL;
412   GstEvent *tags = NULL;
413
414   if (self->priv->send_stream_start) {
415     gchar s_id[32];
416
417     GST_INFO_OBJECT (self, "pushing stream start");
418     /* stream-start (FIXME: create id based on input ids) */
419     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
420     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
421       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
422     }
423     self->priv->send_stream_start = FALSE;
424   }
425
426   if (self->priv->srccaps) {
427
428     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
429         self->priv->srccaps);
430     if (!gst_pad_push_event (self->srcpad,
431             gst_event_new_caps (self->priv->srccaps))) {
432       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
433     }
434     gst_caps_unref (self->priv->srccaps);
435     self->priv->srccaps = NULL;
436   }
437
438   GST_OBJECT_LOCK (self);
439   if (self->priv->send_segment && !self->priv->flush_seeking) {
440     segment = gst_event_new_segment (&self->segment);
441
442     if (!self->priv->seqnum)
443       self->priv->seqnum = gst_event_get_seqnum (segment);
444     else
445       gst_event_set_seqnum (segment, self->priv->seqnum);
446     self->priv->send_segment = FALSE;
447
448     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
449   }
450
451   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
452     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
453     priv->tags_changed = FALSE;
454   }
455   GST_OBJECT_UNLOCK (self);
456
457   if (segment)
458     gst_pad_push_event (self->srcpad, segment);
459   if (tags)
460     gst_pad_push_event (self->srcpad, tags);
461
462 }
463
464 /**
465  * gst_aggregator_set_src_caps:
466  * @self: The #GstAggregator
467  * @caps: The #GstCaps to set on the src pad.
468  *
469  * Sets the caps to be used on the src pad.
470  */
471 void
472 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
473 {
474   GST_PAD_STREAM_LOCK (self->srcpad);
475   gst_caps_replace (&self->priv->srccaps, caps);
476   gst_aggregator_push_mandatory_events (self);
477   GST_PAD_STREAM_UNLOCK (self->srcpad);
478 }
479
480 /**
481  * gst_aggregator_finish_buffer:
482  * @self: The #GstAggregator
483  * @buffer: (transfer full): the #GstBuffer to push.
484  *
485  * This method will push the provided output buffer downstream. If needed,
486  * mandatory events such as stream-start, caps, and segment events will be
487  * sent before pushing the buffer.
488  */
489 GstFlowReturn
490 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
491 {
492   gst_aggregator_push_mandatory_events (self);
493
494   GST_OBJECT_LOCK (self);
495   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
496     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
497     GST_OBJECT_UNLOCK (self);
498     return gst_pad_push (self->srcpad, buffer);
499   } else {
500     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
501         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
502     GST_OBJECT_UNLOCK (self);
503     gst_buffer_unref (buffer);
504     return GST_FLOW_OK;
505   }
506 }
507
508 static void
509 gst_aggregator_push_eos (GstAggregator * self)
510 {
511   GstEvent *event;
512   gst_aggregator_push_mandatory_events (self);
513
514   event = gst_event_new_eos ();
515
516   GST_OBJECT_LOCK (self);
517   self->priv->send_eos = FALSE;
518   gst_event_set_seqnum (event, self->priv->seqnum);
519   GST_OBJECT_UNLOCK (self);
520
521   gst_pad_push_event (self->srcpad, event);
522 }
523
524 static GstClockTime
525 gst_aggregator_get_next_time (GstAggregator * self)
526 {
527   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
528
529   if (klass->get_next_time)
530     return klass->get_next_time (self);
531
532   return GST_CLOCK_TIME_NONE;
533 }
534
535 static gboolean
536 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
537 {
538   GstClockTime latency;
539   GstClockTime start;
540   gboolean res;
541
542   *timeout = FALSE;
543
544   SRC_LOCK (self);
545
546   latency = gst_aggregator_get_latency_unlocked (self);
547
548   if (gst_aggregator_check_pads_ready (self)) {
549     GST_DEBUG_OBJECT (self, "all pads have data");
550     SRC_UNLOCK (self);
551
552     return TRUE;
553   }
554
555   /* Before waiting, check if we're actually still running */
556   if (!self->priv->running || !self->priv->send_eos) {
557     SRC_UNLOCK (self);
558
559     return FALSE;
560   }
561
562   start = gst_aggregator_get_next_time (self);
563
564   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
565       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
566       !GST_CLOCK_TIME_IS_VALID (start)) {
567     /* We wake up here when something happened, and below
568      * then check if we're ready now. If we return FALSE,
569      * we will be directly called again.
570      */
571     SRC_WAIT (self);
572   } else {
573     GstClockTime base_time, time;
574     GstClock *clock;
575     GstClockReturn status;
576     GstClockTimeDiff jitter;
577
578     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
579         GST_TIME_ARGS (start));
580
581     GST_OBJECT_LOCK (self);
582     base_time = GST_ELEMENT_CAST (self)->base_time;
583     clock = GST_ELEMENT_CLOCK (self);
584     if (clock)
585       gst_object_ref (clock);
586     GST_OBJECT_UNLOCK (self);
587
588     time = base_time + start;
589     time += latency;
590
591     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
592         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
593         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
594         GST_TIME_ARGS (time),
595         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
596         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
597         GST_TIME_ARGS (gst_clock_get_time (clock)));
598
599
600     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
601     gst_object_unref (clock);
602     SRC_UNLOCK (self);
603
604     jitter = 0;
605     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
606
607     SRC_LOCK (self);
608     if (self->priv->aggregate_id) {
609       gst_clock_id_unref (self->priv->aggregate_id);
610       self->priv->aggregate_id = NULL;
611     }
612
613     GST_DEBUG_OBJECT (self,
614         "clock returned %d (jitter: %s%" GST_TIME_FORMAT ")",
615         status, (jitter < 0 ? "-" : " "),
616         GST_TIME_ARGS ((jitter < 0 ? -jitter : jitter)));
617
618     /* we timed out */
619     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
620       SRC_UNLOCK (self);
621       *timeout = TRUE;
622       return TRUE;
623     }
624   }
625
626   res = gst_aggregator_check_pads_ready (self);
627   SRC_UNLOCK (self);
628
629   return res;
630 }
631
632 static void
633 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
634     GstFlowReturn flow_return)
635 {
636   PAD_LOCK (aggpad);
637   if (flow_return == GST_FLOW_NOT_LINKED)
638     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
639   else
640     aggpad->priv->flow_return = flow_return;
641   gst_buffer_replace (&aggpad->priv->buffer, NULL);
642   PAD_BROADCAST_EVENT (aggpad);
643   PAD_UNLOCK (aggpad);
644 }
645
646 static void
647 gst_aggregator_aggregate_func (GstAggregator * self)
648 {
649   GstAggregatorPrivate *priv = self->priv;
650   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
651   gboolean timeout = FALSE;
652
653   if (self->priv->running == FALSE) {
654     GST_DEBUG_OBJECT (self, "Not running anymore");
655     return;
656   }
657
658   GST_LOG_OBJECT (self, "Checking aggregate");
659   while (priv->send_eos && priv->running) {
660     GstFlowReturn flow_return;
661
662     if (!gst_aggregator_wait_and_check (self, &timeout))
663       continue;
664
665     GST_TRACE_OBJECT (self, "Actually aggregating!");
666
667     flow_return = klass->aggregate (self, timeout);
668
669     GST_OBJECT_LOCK (self);
670     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
671       /* We don't want to set the pads to flushing, but we want to
672        * stop the thread, so just break here */
673       GST_OBJECT_UNLOCK (self);
674       break;
675     }
676     GST_OBJECT_UNLOCK (self);
677
678     if (flow_return == GST_FLOW_EOS) {
679       gst_aggregator_push_eos (self);
680     }
681
682     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
683
684     if (flow_return != GST_FLOW_OK) {
685       GList *item;
686
687       GST_OBJECT_LOCK (self);
688       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
689         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
690
691         gst_aggregator_pad_set_flushing (aggpad, flow_return);
692       }
693       GST_OBJECT_UNLOCK (self);
694       break;
695     }
696   }
697
698   /* Pause the task here, the only ways to get here are:
699    * 1) We're stopping, in which case the task is stopped anyway
700    * 2) We got a flow error above, in which case it might take
701    *    some time to forward the flow return upstream and we
702    *    would otherwise call the task function over and over
703    *    again without doing anything
704    */
705   gst_pad_pause_task (self->srcpad);
706 }
707
708 static gboolean
709 gst_aggregator_start (GstAggregator * self)
710 {
711   GstAggregatorClass *klass;
712   gboolean result;
713
714   self->priv->running = TRUE;
715   self->priv->send_stream_start = TRUE;
716   self->priv->send_segment = TRUE;
717   self->priv->send_eos = TRUE;
718   self->priv->srccaps = NULL;
719
720   klass = GST_AGGREGATOR_GET_CLASS (self);
721
722   if (klass->start)
723     result = klass->start (self);
724   else
725     result = TRUE;
726
727   return result;
728 }
729
730 static gboolean
731 _check_pending_flush_stop (GstAggregatorPad * pad)
732 {
733   gboolean res;
734
735   PAD_LOCK (pad);
736   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
737   PAD_UNLOCK (pad);
738
739   return res;
740 }
741
742 static gboolean
743 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
744 {
745   gboolean res = TRUE;
746
747   GST_INFO_OBJECT (self, "%s srcpad task",
748       flush_start ? "Pausing" : "Stopping");
749
750   SRC_LOCK (self);
751   self->priv->running = FALSE;
752   SRC_BROADCAST (self);
753   SRC_UNLOCK (self);
754
755   if (flush_start) {
756     res = gst_pad_push_event (self->srcpad, flush_start);
757   }
758
759   gst_pad_stop_task (self->srcpad);
760
761   return res;
762 }
763
764 static void
765 gst_aggregator_start_srcpad_task (GstAggregator * self)
766 {
767   GST_INFO_OBJECT (self, "Starting srcpad task");
768
769   self->priv->running = TRUE;
770   gst_pad_start_task (GST_PAD (self->srcpad),
771       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
772 }
773
774 static GstFlowReturn
775 gst_aggregator_flush (GstAggregator * self)
776 {
777   GstFlowReturn ret = GST_FLOW_OK;
778   GstAggregatorPrivate *priv = self->priv;
779   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
780
781   GST_DEBUG_OBJECT (self, "Flushing everything");
782   GST_OBJECT_LOCK (self);
783   priv->send_segment = TRUE;
784   priv->flush_seeking = FALSE;
785   priv->tags_changed = FALSE;
786   GST_OBJECT_UNLOCK (self);
787   if (klass->flush)
788     ret = klass->flush (self);
789
790   return ret;
791 }
792
793
794 /* Called with GstAggregator's object lock held */
795
796 static gboolean
797 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
798 {
799   GList *tmp;
800   GstAggregatorPad *tmppad;
801
802   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
803     tmppad = (GstAggregatorPad *) tmp->data;
804
805     if (_check_pending_flush_stop (tmppad) == FALSE) {
806       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
807           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
808       return FALSE;
809     }
810   }
811
812   return TRUE;
813 }
814
815 static void
816 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
817     GstEvent * event)
818 {
819   GstAggregatorPrivate *priv = self->priv;
820   GstAggregatorPadPrivate *padpriv = aggpad->priv;
821
822   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
823
824   PAD_FLUSH_LOCK (aggpad);
825   PAD_LOCK (aggpad);
826   if (padpriv->pending_flush_start) {
827     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
828
829     padpriv->pending_flush_start = FALSE;
830     padpriv->pending_flush_stop = TRUE;
831   }
832   PAD_UNLOCK (aggpad);
833
834   GST_OBJECT_LOCK (self);
835   if (priv->flush_seeking) {
836     /* If flush_seeking we forward the first FLUSH_START */
837     if (priv->pending_flush_start) {
838       priv->pending_flush_start = FALSE;
839       GST_OBJECT_UNLOCK (self);
840
841       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
842       gst_aggregator_stop_srcpad_task (self, event);
843
844       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
845       GST_PAD_STREAM_LOCK (self->srcpad);
846       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
847       event = NULL;
848     } else {
849       GST_OBJECT_UNLOCK (self);
850       gst_event_unref (event);
851     }
852   } else {
853     GST_OBJECT_UNLOCK (self);
854     gst_event_unref (event);
855   }
856   PAD_FLUSH_UNLOCK (aggpad);
857
858   gst_aggregator_pad_drop_buffer (aggpad);
859 }
860
861 /* GstAggregator vmethods default implementations */
862 static gboolean
863 gst_aggregator_default_sink_event (GstAggregator * self,
864     GstAggregatorPad * aggpad, GstEvent * event)
865 {
866   gboolean res = TRUE;
867   GstPad *pad = GST_PAD (aggpad);
868   GstAggregatorPrivate *priv = self->priv;
869
870   switch (GST_EVENT_TYPE (event)) {
871     case GST_EVENT_FLUSH_START:
872     {
873       gst_aggregator_flush_start (self, aggpad, event);
874       /* We forward only in one case: right after flush_seeking */
875       event = NULL;
876       goto eat;
877     }
878     case GST_EVENT_FLUSH_STOP:
879     {
880       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
881
882       gst_aggregator_pad_flush (aggpad, self);
883       GST_OBJECT_LOCK (self);
884       if (priv->flush_seeking) {
885         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
886         if (gst_aggregator_all_flush_stop_received_locked (self)) {
887           GST_OBJECT_UNLOCK (self);
888           /* That means we received FLUSH_STOP/FLUSH_STOP on
889            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
890           gst_aggregator_flush (self);
891           gst_pad_push_event (self->srcpad, event);
892           event = NULL;
893           SRC_LOCK (self);
894           priv->send_eos = TRUE;
895           SRC_BROADCAST (self);
896           SRC_UNLOCK (self);
897
898           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
899           GST_PAD_STREAM_UNLOCK (self->srcpad);
900           gst_aggregator_start_srcpad_task (self);
901         } else {
902           GST_OBJECT_UNLOCK (self);
903         }
904       } else {
905         GST_OBJECT_UNLOCK (self);
906       }
907
908       /* We never forward the event */
909       goto eat;
910     }
911     case GST_EVENT_EOS:
912     {
913       GST_DEBUG_OBJECT (aggpad, "EOS");
914
915       /* We still have a buffer, and we don't want the subclass to have to
916        * check for it. Mark pending_eos, eos will be set when steal_buffer is
917        * called
918        */
919       SRC_LOCK (self);
920       PAD_LOCK (aggpad);
921       if (!aggpad->priv->buffer) {
922         aggpad->priv->eos = TRUE;
923       } else {
924         aggpad->priv->pending_eos = TRUE;
925       }
926       PAD_UNLOCK (aggpad);
927
928       SRC_BROADCAST (self);
929       SRC_UNLOCK (self);
930       goto eat;
931     }
932     case GST_EVENT_SEGMENT:
933     {
934       GST_OBJECT_LOCK (aggpad);
935       gst_event_copy_segment (event, &aggpad->segment);
936       GST_OBJECT_UNLOCK (aggpad);
937
938       GST_OBJECT_LOCK (self);
939       self->priv->seqnum = gst_event_get_seqnum (event);
940       GST_OBJECT_UNLOCK (self);
941       goto eat;
942     }
943     case GST_EVENT_STREAM_START:
944     {
945       goto eat;
946     }
947     case GST_EVENT_GAP:
948     {
949       GstClockTime pts;
950       GstClockTime duration;
951       GstBuffer *gapbuf;
952
953       gst_event_parse_gap (event, &pts, &duration);
954       gapbuf = gst_buffer_new ();
955
956       GST_BUFFER_PTS (gapbuf) = pts;
957       GST_BUFFER_DURATION (gapbuf) = duration;
958       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
959       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
960
961       if (gst_pad_chain (pad, gapbuf) != GST_FLOW_OK) {
962         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
963         res = FALSE;
964       }
965
966       goto eat;
967     }
968     case GST_EVENT_TAG:
969     {
970       GstTagList *tags;
971
972       gst_event_parse_tag (event, &tags);
973
974       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
975         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
976         gst_event_unref (event);
977         event = NULL;
978         goto eat;
979       }
980       break;
981     }
982     default:
983     {
984       break;
985     }
986   }
987
988   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
989   return gst_pad_event_default (pad, GST_OBJECT (self), event);
990
991 eat:
992   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
993   if (event)
994     gst_event_unref (event);
995
996   return res;
997 }
998
999 static inline gboolean
1000 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
1001     gpointer unused_udata)
1002 {
1003   gst_aggregator_pad_flush (pad, self);
1004
1005   return TRUE;
1006 }
1007
1008 static gboolean
1009 gst_aggregator_stop (GstAggregator * agg)
1010 {
1011   GstAggregatorClass *klass;
1012   gboolean result;
1013
1014   gst_aggregator_reset_flow_values (agg);
1015
1016   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
1017
1018   klass = GST_AGGREGATOR_GET_CLASS (agg);
1019
1020   if (klass->stop)
1021     result = klass->stop (agg);
1022   else
1023     result = TRUE;
1024
1025   agg->priv->has_peer_latency = FALSE;
1026   agg->priv->peer_latency_live = FALSE;
1027   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1028
1029   if (agg->priv->tags)
1030     gst_tag_list_unref (agg->priv->tags);
1031   agg->priv->tags = NULL;
1032
1033   return result;
1034 }
1035
1036 /* GstElement vmethods implementations */
1037 static GstStateChangeReturn
1038 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1039 {
1040   GstStateChangeReturn ret;
1041   GstAggregator *self = GST_AGGREGATOR (element);
1042
1043   switch (transition) {
1044     case GST_STATE_CHANGE_READY_TO_PAUSED:
1045       if (!gst_aggregator_start (self))
1046         goto error_start;
1047       break;
1048     default:
1049       break;
1050   }
1051
1052   if ((ret =
1053           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1054               transition)) == GST_STATE_CHANGE_FAILURE)
1055     goto failure;
1056
1057
1058   switch (transition) {
1059     case GST_STATE_CHANGE_PAUSED_TO_READY:
1060       if (!gst_aggregator_stop (self)) {
1061         /* What to do in this case? Error out? */
1062         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1063       }
1064       break;
1065     default:
1066       break;
1067   }
1068
1069   return ret;
1070
1071 /* ERRORS */
1072 failure:
1073   {
1074     GST_ERROR_OBJECT (element, "parent failed state change");
1075     return ret;
1076   }
1077 error_start:
1078   {
1079     GST_ERROR_OBJECT (element, "Subclass failed to start");
1080     return GST_STATE_CHANGE_FAILURE;
1081   }
1082 }
1083
1084 static void
1085 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1086 {
1087   GstAggregator *self = GST_AGGREGATOR (element);
1088   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1089
1090   GST_INFO_OBJECT (pad, "Removing pad");
1091
1092   SRC_LOCK (self);
1093   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
1094   gst_element_remove_pad (element, pad);
1095
1096   SRC_BROADCAST (self);
1097   SRC_UNLOCK (self);
1098 }
1099
1100 static GstPad *
1101 gst_aggregator_request_new_pad (GstElement * element,
1102     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1103 {
1104   GstAggregator *self;
1105   GstAggregatorPad *agg_pad;
1106
1107   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
1108   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1109
1110   self = GST_AGGREGATOR (element);
1111
1112   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
1113     gint serial = 0;
1114     gchar *name = NULL;
1115
1116     GST_OBJECT_LOCK (element);
1117     if (req_name == NULL || strlen (req_name) < 6
1118         || !g_str_has_prefix (req_name, "sink_")) {
1119       /* no name given when requesting the pad, use next available int */
1120       priv->padcount++;
1121     } else {
1122       /* parse serial number from requested padname */
1123       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1124       if (serial >= priv->padcount)
1125         priv->padcount = serial;
1126     }
1127
1128     name = g_strdup_printf ("sink_%u", priv->padcount);
1129     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1130         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1131     g_free (name);
1132
1133     GST_OBJECT_UNLOCK (element);
1134
1135   } else {
1136     return NULL;
1137   }
1138
1139   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1140
1141   if (priv->running)
1142     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1143
1144   /* add the pad to the element */
1145   gst_element_add_pad (element, GST_PAD (agg_pad));
1146
1147   return GST_PAD (agg_pad);
1148 }
1149
1150 /* Must be called with SRC_LOCK held */
1151
1152 static gboolean
1153 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1154 {
1155   gboolean query_ret, live;
1156   GstClockTime our_latency, min, max;
1157
1158   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1159
1160   if (!query_ret) {
1161     GST_WARNING_OBJECT (self, "Latency query failed");
1162     return FALSE;
1163   }
1164
1165   gst_query_parse_latency (query, &live, &min, &max);
1166
1167   our_latency = self->priv->latency;
1168
1169   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1170     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1171         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1172     return FALSE;
1173   }
1174
1175   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1176     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1177         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1178             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1179             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1180     return FALSE;
1181   }
1182
1183   self->priv->peer_latency_live = live;
1184   self->priv->peer_latency_min = min;
1185   self->priv->peer_latency_max = max;
1186   self->priv->has_peer_latency = TRUE;
1187
1188   /* add our own */
1189   min += our_latency;
1190   min += self->priv->sub_latency_min;
1191   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1192       && GST_CLOCK_TIME_IS_VALID (max))
1193     max += self->priv->sub_latency_max;
1194   else
1195     max = GST_CLOCK_TIME_NONE;
1196
1197   if (live && min > max) {
1198     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1199         ("%s", "Latency too big"),
1200         ("The requested latency value is too big for the current pipeline.  "
1201             "Limiting to %" G_GINT64_FORMAT, max));
1202     min = max;
1203     /* FIXME: This could in theory become negative, but in
1204      * that case all is lost anyway */
1205     self->priv->latency -= min - max;
1206     /* FIXME: shouldn't we g_object_notify() the change here? */
1207   }
1208
1209   SRC_BROADCAST (self);
1210
1211   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1212       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1213
1214   gst_query_set_latency (query, live, min, max);
1215
1216   return query_ret;
1217 }
1218
1219 /*
1220  * MUST be called with the src_lock held.
1221  *
1222  * See  gst_aggregator_get_latency() for doc
1223  */
1224 static GstClockTime
1225 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1226 {
1227   GstClockTime latency;
1228
1229   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1230
1231   if (!self->priv->has_peer_latency) {
1232     GstQuery *query = gst_query_new_latency ();
1233     gboolean ret;
1234
1235     ret = gst_aggregator_query_latency_unlocked (self, query);
1236     gst_query_unref (query);
1237     if (!ret)
1238       return GST_CLOCK_TIME_NONE;
1239   }
1240
1241   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1242     return GST_CLOCK_TIME_NONE;
1243
1244   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1245   latency = self->priv->peer_latency_min;
1246
1247   /* add our own */
1248   latency += self->priv->latency;
1249   latency += self->priv->sub_latency_min;
1250
1251   return latency;
1252 }
1253
1254 /**
1255  * gst_aggregator_get_latency:
1256  * @self: a #GstAggregator
1257  *
1258  * Retrieves the latency values reported by @self in response to the latency
1259  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1260  * will not wait for the clock.
1261  *
1262  * Typically only called by subclasses.
1263  *
1264  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1265  */
1266 GstClockTime
1267 gst_aggregator_get_latency (GstAggregator * self)
1268 {
1269   GstClockTime ret;
1270
1271   SRC_LOCK (self);
1272   ret = gst_aggregator_get_latency_unlocked (self);
1273   SRC_UNLOCK (self);
1274
1275   return ret;
1276 }
1277
1278 static gboolean
1279 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1280 {
1281   GstAggregator *self = GST_AGGREGATOR (element);
1282
1283   GST_STATE_LOCK (element);
1284   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1285       GST_STATE (element) < GST_STATE_PAUSED) {
1286     gdouble rate;
1287     GstFormat fmt;
1288     GstSeekFlags flags;
1289     GstSeekType start_type, stop_type;
1290     gint64 start, stop;
1291
1292     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1293         &start, &stop_type, &stop);
1294
1295     GST_OBJECT_LOCK (self);
1296     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1297         stop_type, stop, NULL);
1298     self->priv->seqnum = gst_event_get_seqnum (event);
1299     GST_OBJECT_UNLOCK (self);
1300
1301     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1302   }
1303   GST_STATE_UNLOCK (element);
1304
1305
1306   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1307       event);
1308 }
1309
1310 static gboolean
1311 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1312 {
1313   gboolean res = TRUE;
1314
1315   switch (GST_QUERY_TYPE (query)) {
1316     case GST_QUERY_SEEKING:
1317     {
1318       GstFormat format;
1319
1320       /* don't pass it along as some (file)sink might claim it does
1321        * whereas with a collectpads in between that will not likely work */
1322       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1323       gst_query_set_seeking (query, format, FALSE, 0, -1);
1324       res = TRUE;
1325
1326       break;
1327     }
1328     case GST_QUERY_LATENCY:
1329       SRC_LOCK (self);
1330       res = gst_aggregator_query_latency_unlocked (self, query);
1331       SRC_UNLOCK (self);
1332       break;
1333     default:
1334       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1335   }
1336
1337   return res;
1338 }
1339
1340 static gboolean
1341 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1342 {
1343   EventData *evdata = user_data;
1344   gboolean ret = TRUE;
1345   GstPad *peer = gst_pad_get_peer (pad);
1346   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1347
1348   if (peer) {
1349     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1350     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1351     gst_object_unref (peer);
1352   }
1353
1354   if (ret == FALSE) {
1355     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1356       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1357
1358     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1359       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1360
1361       if (gst_pad_query (peer, seeking)) {
1362         gboolean seekable;
1363
1364         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1365
1366         if (seekable == FALSE) {
1367           GST_INFO_OBJECT (pad,
1368               "Source not seekable, We failed but it does not matter!");
1369
1370           ret = TRUE;
1371         }
1372       } else {
1373         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1374       }
1375
1376       gst_query_unref (seeking);
1377     }
1378
1379     if (evdata->flush) {
1380       PAD_LOCK (aggpad);
1381       aggpad->priv->pending_flush_start = FALSE;
1382       aggpad->priv->pending_flush_stop = FALSE;
1383       PAD_UNLOCK (aggpad);
1384     }
1385   } else {
1386     evdata->one_actually_seeked = TRUE;
1387   }
1388
1389   evdata->result &= ret;
1390
1391   /* Always send to all pads */
1392   return FALSE;
1393 }
1394
1395 static EventData
1396 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1397     GstEvent * event, gboolean flush)
1398 {
1399   EventData evdata;
1400
1401   evdata.event = event;
1402   evdata.result = TRUE;
1403   evdata.flush = flush;
1404   evdata.one_actually_seeked = FALSE;
1405
1406   /* We first need to set all pads as flushing in a first pass
1407    * as flush_start flush_stop is sometimes sent synchronously
1408    * while we send the seek event */
1409   if (flush) {
1410     GList *l;
1411
1412     GST_OBJECT_LOCK (self);
1413     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1414       GstAggregatorPad *pad = l->data;
1415
1416       PAD_LOCK (pad);
1417       pad->priv->pending_flush_start = TRUE;
1418       pad->priv->pending_flush_stop = FALSE;
1419       PAD_UNLOCK (pad);
1420     }
1421     GST_OBJECT_UNLOCK (self);
1422   }
1423
1424   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1425
1426   gst_event_unref (event);
1427
1428   return evdata;
1429 }
1430
1431 static gboolean
1432 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1433 {
1434   gdouble rate;
1435   GstFormat fmt;
1436   GstSeekFlags flags;
1437   GstSeekType start_type, stop_type;
1438   gint64 start, stop;
1439   gboolean flush;
1440   EventData evdata;
1441   GstAggregatorPrivate *priv = self->priv;
1442
1443   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1444       &start, &stop_type, &stop);
1445
1446   GST_INFO_OBJECT (self, "starting SEEK");
1447
1448   flush = flags & GST_SEEK_FLAG_FLUSH;
1449
1450   GST_OBJECT_LOCK (self);
1451   if (flush) {
1452     priv->pending_flush_start = TRUE;
1453     priv->flush_seeking = TRUE;
1454   }
1455
1456   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1457       stop_type, stop, NULL);
1458   GST_OBJECT_UNLOCK (self);
1459
1460   /* forward the seek upstream */
1461   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
1462   event = NULL;
1463
1464   if (!evdata.result || !evdata.one_actually_seeked) {
1465     GST_OBJECT_LOCK (self);
1466     priv->flush_seeking = FALSE;
1467     priv->pending_flush_start = FALSE;
1468     GST_OBJECT_UNLOCK (self);
1469   }
1470
1471   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1472
1473   return evdata.result;
1474 }
1475
1476 static gboolean
1477 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1478 {
1479   EventData evdata;
1480   gboolean res = TRUE;
1481
1482   switch (GST_EVENT_TYPE (event)) {
1483     case GST_EVENT_SEEK:
1484     {
1485       gst_event_ref (event);
1486       res = gst_aggregator_do_seek (self, event);
1487       gst_event_unref (event);
1488       event = NULL;
1489       goto done;
1490     }
1491     case GST_EVENT_NAVIGATION:
1492     {
1493       /* navigation is rather pointless. */
1494       res = FALSE;
1495       gst_event_unref (event);
1496       goto done;
1497     }
1498     default:
1499     {
1500       break;
1501     }
1502   }
1503
1504   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
1505   res = evdata.result;
1506
1507 done:
1508   return res;
1509 }
1510
1511 static gboolean
1512 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1513     GstEvent * event)
1514 {
1515   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1516
1517   return klass->src_event (GST_AGGREGATOR (parent), event);
1518 }
1519
1520 static gboolean
1521 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1522     GstQuery * query)
1523 {
1524   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1525
1526   return klass->src_query (GST_AGGREGATOR (parent), query);
1527 }
1528
1529 static gboolean
1530 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1531     GstObject * parent, GstPadMode mode, gboolean active)
1532 {
1533   GstAggregator *self = GST_AGGREGATOR (parent);
1534   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1535
1536   if (klass->src_activate) {
1537     if (klass->src_activate (self, mode, active) == FALSE) {
1538       return FALSE;
1539     }
1540   }
1541
1542   if (active == TRUE) {
1543     switch (mode) {
1544       case GST_PAD_MODE_PUSH:
1545       {
1546         GST_INFO_OBJECT (pad, "Activating pad!");
1547         gst_aggregator_start_srcpad_task (self);
1548         return TRUE;
1549       }
1550       default:
1551       {
1552         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1553         return FALSE;
1554       }
1555     }
1556   }
1557
1558   /* deactivating */
1559   GST_INFO_OBJECT (self, "Deactivating srcpad");
1560   gst_aggregator_stop_srcpad_task (self, FALSE);
1561
1562   return TRUE;
1563 }
1564
1565 static gboolean
1566 gst_aggregator_default_sink_query (GstAggregator * self,
1567     GstAggregatorPad * aggpad, GstQuery * query)
1568 {
1569   GstPad *pad = GST_PAD (aggpad);
1570
1571   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1572 }
1573
1574 static void
1575 gst_aggregator_finalize (GObject * object)
1576 {
1577   GstAggregator *self = (GstAggregator *) object;
1578
1579   g_mutex_clear (&self->priv->src_lock);
1580   g_cond_clear (&self->priv->src_cond);
1581
1582   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1583 }
1584
1585 /*
1586  * gst_aggregator_set_latency_property:
1587  * @agg: a #GstAggregator
1588  * @latency: the new latency value.
1589  *
1590  * Sets the new latency value to @latency. This value is used to limit the
1591  * amount of time a pad waits for data to appear before considering the pad
1592  * as unresponsive.
1593  */
1594 static void
1595 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1596 {
1597   gboolean changed;
1598   GstClockTime min, max;
1599
1600   g_return_if_fail (GST_IS_AGGREGATOR (self));
1601   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
1602
1603   SRC_LOCK (self);
1604   if (self->priv->peer_latency_live) {
1605     min = self->priv->peer_latency_min;
1606     max = self->priv->peer_latency_max;
1607     /* add our own */
1608     min += latency;
1609     min += self->priv->sub_latency_min;
1610     if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1611         && GST_CLOCK_TIME_IS_VALID (max))
1612       max += self->priv->sub_latency_max;
1613     else
1614       max = GST_CLOCK_TIME_NONE;
1615
1616     if (GST_CLOCK_TIME_IS_VALID (max) && min > max) {
1617       GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1618           ("%s", "Latency too big"),
1619           ("The requested latency value is too big for the latency in the "
1620               "current pipeline.  Limiting to %" G_GINT64_FORMAT, max));
1621       /* FIXME: This could in theory become negative, but in
1622        * that case all is lost anyway */
1623       latency -= min - max;
1624       /* FIXME: shouldn't we g_object_notify() the change here? */
1625     }
1626   }
1627
1628   changed = (self->priv->latency != latency);
1629   self->priv->latency = latency;
1630
1631   if (changed)
1632     SRC_BROADCAST (self);
1633   SRC_UNLOCK (self);
1634
1635   if (changed)
1636     gst_element_post_message (GST_ELEMENT_CAST (self),
1637         gst_message_new_latency (GST_OBJECT_CAST (self)));
1638 }
1639
1640 /*
1641  * gst_aggregator_get_latency_property:
1642  * @agg: a #GstAggregator
1643  *
1644  * Gets the latency value. See gst_aggregator_set_latency for
1645  * more details.
1646  *
1647  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1648  * before a pad is deemed unresponsive. A value of -1 means an
1649  * unlimited time.
1650  */
1651 static gint64
1652 gst_aggregator_get_latency_property (GstAggregator * agg)
1653 {
1654   gint64 res;
1655
1656   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1657
1658   GST_OBJECT_LOCK (agg);
1659   res = agg->priv->latency;
1660   GST_OBJECT_UNLOCK (agg);
1661
1662   return res;
1663 }
1664
1665 static void
1666 gst_aggregator_set_property (GObject * object, guint prop_id,
1667     const GValue * value, GParamSpec * pspec)
1668 {
1669   GstAggregator *agg = GST_AGGREGATOR (object);
1670
1671   switch (prop_id) {
1672     case PROP_LATENCY:
1673       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1674       break;
1675     default:
1676       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1677       break;
1678   }
1679 }
1680
1681 static void
1682 gst_aggregator_get_property (GObject * object, guint prop_id,
1683     GValue * value, GParamSpec * pspec)
1684 {
1685   GstAggregator *agg = GST_AGGREGATOR (object);
1686
1687   switch (prop_id) {
1688     case PROP_LATENCY:
1689       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1690       break;
1691     default:
1692       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1693       break;
1694   }
1695 }
1696
1697 /* GObject vmethods implementations */
1698 static void
1699 gst_aggregator_class_init (GstAggregatorClass * klass)
1700 {
1701   GObjectClass *gobject_class = (GObjectClass *) klass;
1702   GstElementClass *gstelement_class = (GstElementClass *) klass;
1703
1704   aggregator_parent_class = g_type_class_peek_parent (klass);
1705   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1706
1707   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1708       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1709
1710   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1711
1712   klass->sink_event = gst_aggregator_default_sink_event;
1713   klass->sink_query = gst_aggregator_default_sink_query;
1714
1715   klass->src_event = gst_aggregator_default_src_event;
1716   klass->src_query = gst_aggregator_default_src_query;
1717
1718   gstelement_class->request_new_pad =
1719       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
1720   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
1721   gstelement_class->release_pad =
1722       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
1723   gstelement_class->change_state =
1724       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
1725
1726   gobject_class->set_property = gst_aggregator_set_property;
1727   gobject_class->get_property = gst_aggregator_get_property;
1728   gobject_class->finalize = gst_aggregator_finalize;
1729
1730   g_object_class_install_property (gobject_class, PROP_LATENCY,
1731       g_param_spec_int64 ("latency", "Buffer latency",
1732           "Additional latency in live mode to allow upstream "
1733           "to take longer to produce buffers for the current "
1734           "position", 0,
1735           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1736           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1737
1738   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
1739 }
1740
1741 static void
1742 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1743 {
1744   GstPadTemplate *pad_template;
1745   GstAggregatorPrivate *priv;
1746
1747   g_return_if_fail (klass->aggregate != NULL);
1748
1749   self->priv =
1750       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1751       GstAggregatorPrivate);
1752
1753   priv = self->priv;
1754
1755   pad_template =
1756       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1757   g_return_if_fail (pad_template != NULL);
1758
1759   priv->padcount = -1;
1760   priv->tags_changed = FALSE;
1761
1762   self->priv->peer_latency_live = FALSE;
1763   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
1764   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
1765   self->priv->has_peer_latency = FALSE;
1766   gst_aggregator_reset_flow_values (self);
1767
1768   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1769
1770   gst_pad_set_event_function (self->srcpad,
1771       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
1772   gst_pad_set_query_function (self->srcpad,
1773       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
1774   gst_pad_set_activatemode_function (self->srcpad,
1775       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
1776
1777   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1778
1779   self->priv->latency = DEFAULT_LATENCY;
1780
1781   g_mutex_init (&self->priv->src_lock);
1782   g_cond_init (&self->priv->src_cond);
1783 }
1784
1785 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1786  * method to get to the padtemplates */
1787 GType
1788 gst_aggregator_get_type (void)
1789 {
1790   static volatile gsize type = 0;
1791
1792   if (g_once_init_enter (&type)) {
1793     GType _type;
1794     static const GTypeInfo info = {
1795       sizeof (GstAggregatorClass),
1796       NULL,
1797       NULL,
1798       (GClassInitFunc) gst_aggregator_class_init,
1799       NULL,
1800       NULL,
1801       sizeof (GstAggregator),
1802       0,
1803       (GInstanceInitFunc) gst_aggregator_init,
1804     };
1805
1806     _type = g_type_register_static (GST_TYPE_ELEMENT,
1807         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1808     g_once_init_leave (&type, _type);
1809   }
1810   return type;
1811 }
1812
1813 static GstFlowReturn
1814 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1815 {
1816   GstBuffer *actual_buf = buffer;
1817   GstAggregator *self = GST_AGGREGATOR (object);
1818   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1819   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1820   GstFlowReturn flow_return;
1821
1822   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1823
1824   PAD_FLUSH_LOCK (aggpad);
1825
1826   PAD_LOCK (aggpad);
1827   flow_return = aggpad->priv->flow_return;
1828   if (flow_return != GST_FLOW_OK)
1829     goto flushing;
1830
1831   if (aggpad->priv->pending_eos == TRUE)
1832     goto eos;
1833
1834   while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1835     PAD_WAIT_EVENT (aggpad);
1836
1837   flow_return = aggpad->priv->flow_return;
1838   if (flow_return != GST_FLOW_OK)
1839     goto flushing;
1840
1841   PAD_UNLOCK (aggpad);
1842
1843   if (aggclass->clip) {
1844     aggclass->clip (self, aggpad, buffer, &actual_buf);
1845   }
1846
1847   SRC_LOCK (self);
1848   PAD_LOCK (aggpad);
1849   if (aggpad->priv->buffer)
1850     gst_buffer_unref (aggpad->priv->buffer);
1851   aggpad->priv->buffer = actual_buf;
1852
1853   flow_return = aggpad->priv->flow_return;
1854
1855   PAD_UNLOCK (aggpad);
1856   PAD_FLUSH_UNLOCK (aggpad);
1857
1858   SRC_BROADCAST (self);
1859   SRC_UNLOCK (self);
1860
1861   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1862
1863   return flow_return;
1864
1865 flushing:
1866   PAD_UNLOCK (aggpad);
1867   PAD_FLUSH_UNLOCK (aggpad);
1868
1869   gst_buffer_unref (buffer);
1870   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
1871       gst_flow_get_name (flow_return));
1872
1873   return flow_return;
1874
1875 eos:
1876   PAD_UNLOCK (aggpad);
1877   PAD_FLUSH_UNLOCK (aggpad);
1878
1879   gst_buffer_unref (buffer);
1880   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1881
1882   return GST_FLOW_EOS;
1883 }
1884
1885 static gboolean
1886 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
1887     GstQuery * query)
1888 {
1889   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1890   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1891
1892   if (GST_QUERY_IS_SERIALIZED (query)) {
1893     PAD_LOCK (aggpad);
1894
1895     while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1896       PAD_WAIT_EVENT (aggpad);
1897
1898     if (aggpad->priv->flow_return != GST_FLOW_OK)
1899       goto flushing;
1900
1901     PAD_UNLOCK (aggpad);
1902   }
1903
1904   return klass->sink_query (GST_AGGREGATOR (parent),
1905       GST_AGGREGATOR_PAD (pad), query);
1906
1907 flushing:
1908   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
1909       gst_flow_get_name (aggpad->priv->flow_return));
1910   PAD_UNLOCK (aggpad);
1911   return FALSE;
1912 }
1913
1914 static gboolean
1915 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
1916     GstEvent * event)
1917 {
1918   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1919   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1920
1921   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1922       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1923     PAD_LOCK (aggpad);
1924
1925
1926     while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1927       PAD_WAIT_EVENT (aggpad);
1928
1929     if (aggpad->priv->flow_return != GST_FLOW_OK
1930         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1931       goto flushing;
1932
1933     PAD_UNLOCK (aggpad);
1934   }
1935
1936   return klass->sink_event (GST_AGGREGATOR (parent),
1937       GST_AGGREGATOR_PAD (pad), event);
1938
1939 flushing:
1940   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
1941       gst_flow_get_name (aggpad->priv->flow_return));
1942   PAD_UNLOCK (aggpad);
1943   if (GST_EVENT_IS_STICKY (event))
1944     gst_pad_store_sticky_event (pad, event);
1945   gst_event_unref (event);
1946   return FALSE;
1947 }
1948
1949 static gboolean
1950 gst_aggregator_pad_activate_mode_func (GstPad * pad,
1951     GstObject * parent, GstPadMode mode, gboolean active)
1952 {
1953   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1954
1955   if (active == FALSE) {
1956     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
1957   } else {
1958     PAD_LOCK (aggpad);
1959     aggpad->priv->flow_return = GST_FLOW_OK;
1960     PAD_BROADCAST_EVENT (aggpad);
1961     PAD_UNLOCK (aggpad);
1962   }
1963
1964   return TRUE;
1965 }
1966
1967 /***********************************
1968  * GstAggregatorPad implementation  *
1969  ************************************/
1970 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1971
1972 static void
1973 gst_aggregator_pad_constructed (GObject * object)
1974 {
1975   GstPad *pad = GST_PAD (object);
1976
1977   gst_pad_set_chain_function (pad,
1978       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
1979   gst_pad_set_event_function (pad,
1980       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func));
1981   gst_pad_set_query_function (pad,
1982       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
1983   gst_pad_set_activatemode_function (pad,
1984       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
1985 }
1986
1987 static void
1988 gst_aggregator_pad_finalize (GObject * object)
1989 {
1990   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1991
1992   g_cond_clear (&pad->priv->event_cond);
1993   g_mutex_clear (&pad->priv->flush_lock);
1994   g_mutex_clear (&pad->priv->lock);
1995
1996   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
1997 }
1998
1999 static void
2000 gst_aggregator_pad_dispose (GObject * object)
2001 {
2002   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2003
2004   gst_aggregator_pad_drop_buffer (pad);
2005
2006   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2007 }
2008
2009 static void
2010 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2011 {
2012   GObjectClass *gobject_class = (GObjectClass *) klass;
2013
2014   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2015
2016   gobject_class->constructed = gst_aggregator_pad_constructed;
2017   gobject_class->finalize = gst_aggregator_pad_finalize;
2018   gobject_class->dispose = gst_aggregator_pad_dispose;
2019 }
2020
2021 static void
2022 gst_aggregator_pad_init (GstAggregatorPad * pad)
2023 {
2024   pad->priv =
2025       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2026       GstAggregatorPadPrivate);
2027
2028   pad->priv->buffer = NULL;
2029   g_cond_init (&pad->priv->event_cond);
2030
2031   g_mutex_init (&pad->priv->flush_lock);
2032   g_mutex_init (&pad->priv->lock);
2033 }
2034
2035 /**
2036  * gst_aggregator_pad_steal_buffer:
2037  * @pad: the pad to get buffer from
2038  *
2039  * Steal the ref to the buffer currently queued in @pad.
2040  *
2041  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2042  *   queued. You should unref the buffer after usage.
2043  */
2044 GstBuffer *
2045 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2046 {
2047   GstBuffer *buffer = NULL;
2048
2049   PAD_LOCK (pad);
2050   if (pad->priv->buffer) {
2051     GST_TRACE_OBJECT (pad, "Consuming buffer");
2052     buffer = pad->priv->buffer;
2053     pad->priv->buffer = NULL;
2054     if (pad->priv->pending_eos) {
2055       pad->priv->pending_eos = FALSE;
2056       pad->priv->eos = TRUE;
2057     }
2058     PAD_BROADCAST_EVENT (pad);
2059     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2060   }
2061   PAD_UNLOCK (pad);
2062
2063   return buffer;
2064 }
2065
2066 /**
2067  * gst_aggregator_pad_drop_buffer:
2068  * @pad: the pad where to drop any pending buffer
2069  *
2070  * Drop the buffer currently queued in @pad.
2071  *
2072  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2073  */
2074 gboolean
2075 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2076 {
2077   GstBuffer *buf;
2078
2079   buf = gst_aggregator_pad_steal_buffer (pad);
2080
2081   if (buf == NULL)
2082     return FALSE;
2083
2084   gst_buffer_unref (buf);
2085   return TRUE;
2086 }
2087
2088 /**
2089  * gst_aggregator_pad_get_buffer:
2090  * @pad: the pad to get buffer from
2091  *
2092  * Returns: (transfer full): A reference to the buffer in @pad or
2093  * NULL if no buffer was queued. You should unref the buffer after
2094  * usage.
2095  */
2096 GstBuffer *
2097 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2098 {
2099   GstBuffer *buffer = NULL;
2100
2101   PAD_LOCK (pad);
2102   if (pad->priv->buffer)
2103     buffer = gst_buffer_ref (pad->priv->buffer);
2104   PAD_UNLOCK (pad);
2105
2106   return buffer;
2107 }
2108
2109 gboolean
2110 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2111 {
2112   gboolean is_eos;
2113
2114   PAD_LOCK (pad);
2115   is_eos = pad->priv->eos;
2116   PAD_UNLOCK (pad);
2117
2118   return is_eos;
2119 }
2120
2121 /**
2122  * gst_aggregator_merge_tags:
2123  * @self: a #GstAggregator
2124  * @tags: a #GstTagList to merge
2125  * @mode: the #GstTagMergeMode to use
2126  *
2127  * Adds tags to so-called pending tags, which will be processed
2128  * before pushing out data downstream.
2129  *
2130  * Note that this is provided for convenience, and the subclass is
2131  * not required to use this and can still do tag handling on its own.
2132  *
2133  * MT safe.
2134  */
2135 void
2136 gst_aggregator_merge_tags (GstAggregator * self,
2137     const GstTagList * tags, GstTagMergeMode mode)
2138 {
2139   GstTagList *otags;
2140
2141   g_return_if_fail (GST_IS_AGGREGATOR (self));
2142   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2143
2144   /* FIXME Check if we can use OBJECT lock here! */
2145   GST_OBJECT_LOCK (self);
2146   if (tags)
2147     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2148   otags = self->priv->tags;
2149   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2150   if (otags)
2151     gst_tag_list_unref (otags);
2152   self->priv->tags_changed = TRUE;
2153   GST_OBJECT_UNLOCK (self);
2154 }
2155
2156 /**
2157  * gst_aggregator_set_latency:
2158  * @self: a #GstAggregator
2159  * @min_latency: minimum latency
2160  * @max_latency: maximum latency
2161  *
2162  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2163  * latency is. Will also post a LATENCY message on the bus so the pipeline
2164  * can reconfigure its global latency.
2165  */
2166 void
2167 gst_aggregator_set_latency (GstAggregator * self,
2168     GstClockTime min_latency, GstClockTime max_latency)
2169 {
2170   gboolean changed = FALSE;
2171
2172   g_return_if_fail (GST_IS_AGGREGATOR (self));
2173   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2174   g_return_if_fail (max_latency >= min_latency);
2175
2176   SRC_LOCK (self);
2177   if (self->priv->sub_latency_min != min_latency) {
2178     self->priv->sub_latency_min = min_latency;
2179     changed = TRUE;
2180   }
2181   if (self->priv->sub_latency_max != max_latency) {
2182     self->priv->sub_latency_max = max_latency;
2183     changed = TRUE;
2184   }
2185
2186   if (changed)
2187     SRC_BROADCAST (self);
2188   SRC_UNLOCK (self);
2189
2190   if (changed) {
2191     gst_element_post_message (GST_ELEMENT_CAST (self),
2192         gst_message_new_latency (GST_OBJECT_CAST (self)));
2193   }
2194 }