aggregator: Push EOS on error return.
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer aggregator base class
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Subclasses should at least implement
33  *    the #GstAggregatorClass.aggregate() virtual method.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  *  <listitem><para>
58  *    Note that the aggregator logic regarding gap event handling is to turn
59  *    these into gap buffers with matching PTS and duration. It will also
60  *    flag these buffers with GST_BUFFER_FLAG_GAP and GST_BUFFER_FLAG_DROPPABLE
61  *    to ease their identification and subsequent processing.
62  *  </para></listitem>
63  * </itemizedlist>
64  */
65
66 #ifdef HAVE_CONFIG_H
67 #  include "config.h"
68 #endif
69
70 #include <string.h>             /* strlen */
71
72 #include "gstaggregator.h"
73
74
75 /*  Might become API */
76 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
77     const GstTagList * tags, GstTagMergeMode mode);
78 static void gst_aggregator_set_latency_property (GstAggregator * agg,
79     gint64 latency);
80 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
81
82
83 /* Locking order, locks in this element must always be taken in this order
84  *
85  * standard sink pad stream lock -> GST_PAD_STREAM_LOCK (aggpad)
86  * Aggregator pad flush lock -> PAD_FLUSH_LOCK(aggpad)
87  * standard src pad stream lock -> GST_PAD_STREAM_LOCK (srcpad)
88  * Aggregator src lock -> SRC_LOCK(agg) w/ SRC_WAIT/BROADCAST
89  * standard element object lock -> GST_OBJECT_LOCK(agg)
90  * Aggregator pad lock -> PAD_LOCK (aggpad) w/ PAD_WAIT/BROADCAST_EVENT(aggpad)
91  * standard src pad object lock -> GST_OBJECT_LOCK(srcpad)
92  * standard sink pad object lock -> GST_OBJECT_LOCK(aggpad)
93  */
94
95
96 static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
97
98 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
99 #define GST_CAT_DEFAULT aggregator_debug
100
101 /* GstAggregatorPad definitions */
102 #define PAD_LOCK(pad)   G_STMT_START {                                  \
103   GST_TRACE_OBJECT (pad, "Taking PAD lock from thread %p",              \
104         g_thread_self());                                               \
105   g_mutex_lock(&pad->priv->lock);                                       \
106   GST_TRACE_OBJECT (pad, "Took PAD lock from thread %p",                \
107         g_thread_self());                                               \
108   } G_STMT_END
109
110 #define PAD_UNLOCK(pad)  G_STMT_START {                                 \
111   GST_TRACE_OBJECT (pad, "Releasing PAD lock from thread %p",           \
112       g_thread_self());                                                 \
113   g_mutex_unlock(&pad->priv->lock);                                     \
114   GST_TRACE_OBJECT (pad, "Release PAD lock from thread %p",             \
115         g_thread_self());                                               \
116   } G_STMT_END
117
118
119 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
120   GST_LOG_OBJECT (pad, "Waiting for buffer to be consumed thread %p",   \
121         g_thread_self());                                               \
122   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),           \
123       (&((GstAggregatorPad*)pad)->priv->lock));                         \
124   GST_LOG_OBJECT (pad, "DONE Waiting for buffer to be consumed on thread %p", \
125         g_thread_self());                                               \
126   } G_STMT_END
127
128 #define PAD_BROADCAST_EVENT(pad) G_STMT_START {                        \
129   GST_LOG_OBJECT (pad, "Signaling buffer consumed from thread %p",     \
130         g_thread_self());                                              \
131   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond));    \
132   } G_STMT_END
133
134
135 #define PAD_FLUSH_LOCK(pad)     G_STMT_START {                          \
136   GST_TRACE_OBJECT (pad, "Taking lock from thread %p",                  \
137         g_thread_self());                                               \
138   g_mutex_lock(&pad->priv->flush_lock);                                 \
139   GST_TRACE_OBJECT (pad, "Took lock from thread %p",                    \
140         g_thread_self());                                               \
141   } G_STMT_END
142
143 #define PAD_FLUSH_UNLOCK(pad)   G_STMT_START {                          \
144   GST_TRACE_OBJECT (pad, "Releasing lock from thread %p",               \
145         g_thread_self());                                               \
146   g_mutex_unlock(&pad->priv->flush_lock);                               \
147   GST_TRACE_OBJECT (pad, "Release lock from thread %p",                 \
148         g_thread_self());                                               \
149   } G_STMT_END
150
151 #define SRC_LOCK(self)   G_STMT_START {                             \
152   GST_TRACE_OBJECT (self, "Taking src lock from thread %p",         \
153       g_thread_self());                                             \
154   g_mutex_lock(&self->priv->src_lock);                              \
155   GST_TRACE_OBJECT (self, "Took src lock from thread %p",           \
156         g_thread_self());                                           \
157   } G_STMT_END
158
159 #define SRC_UNLOCK(self)  G_STMT_START {                            \
160   GST_TRACE_OBJECT (self, "Releasing src lock from thread %p",      \
161         g_thread_self());                                           \
162   g_mutex_unlock(&self->priv->src_lock);                            \
163   GST_TRACE_OBJECT (self, "Released src lock from thread %p",       \
164         g_thread_self());                                           \
165   } G_STMT_END
166
167 #define SRC_WAIT(self) G_STMT_START {                               \
168   GST_LOG_OBJECT (self, "Waiting for src on thread %p",             \
169         g_thread_self());                                           \
170   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));    \
171   GST_LOG_OBJECT (self, "DONE Waiting for src on thread %p",        \
172         g_thread_self());                                           \
173   } G_STMT_END
174
175 #define SRC_BROADCAST(self) G_STMT_START {                          \
176     GST_LOG_OBJECT (self, "Signaling src from thread %p",           \
177         g_thread_self());                                           \
178     if (self->priv->aggregate_id)                                   \
179       gst_clock_id_unschedule (self->priv->aggregate_id);           \
180     g_cond_broadcast(&(self->priv->src_cond));                      \
181   } G_STMT_END
182
183 struct _GstAggregatorPadPrivate
184 {
185   /* Following fields are protected by the PAD_LOCK */
186   GstFlowReturn flow_return;
187   gboolean pending_flush_start;
188   gboolean pending_flush_stop;
189   gboolean pending_eos;
190
191   GstBuffer *buffer;
192   gboolean eos;
193
194   GMutex lock;
195   GCond event_cond;
196   /* This lock prevents a flush start processing happening while
197    * the chain function is also happening.
198    */
199   GMutex flush_lock;
200 };
201
202 static gboolean
203 gst_aggregator_pad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
204 {
205   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
206
207   PAD_LOCK (aggpad);
208   aggpad->priv->pending_eos = FALSE;
209   aggpad->priv->eos = FALSE;
210   aggpad->priv->flow_return = GST_FLOW_OK;
211   PAD_UNLOCK (aggpad);
212
213   if (klass->flush)
214     return klass->flush (aggpad, agg);
215
216   return TRUE;
217 }
218
219 /*************************************
220  * GstAggregator implementation  *
221  *************************************/
222 static GstElementClass *aggregator_parent_class = NULL;
223
224 /* All members are protected by the object lock unless otherwise noted */
225
226 struct _GstAggregatorPrivate
227 {
228   gint padcount;
229
230   /* Our state is >= PAUSED */
231   gboolean running;             /* protected by src_lock */
232
233   gint seqnum;
234   gboolean send_stream_start;   /* protected by srcpad stream lock */
235   gboolean send_segment;
236   gboolean flush_seeking;
237   gboolean pending_flush_start;
238   gboolean send_eos;            /* protected by srcpad stream lock */
239
240   GstCaps *srccaps;             /* protected by the srcpad stream lock */
241
242   GstTagList *tags;
243   gboolean tags_changed;
244
245   gboolean peer_latency_live;   /* protected by src_lock */
246   GstClockTime peer_latency_min;        /* protected by src_lock */
247   GstClockTime peer_latency_max;        /* protected by src_lock */
248   gboolean has_peer_latency;
249
250   GstClockTime sub_latency_min; /* protected by src_lock */
251   GstClockTime sub_latency_max; /* protected by src_lock */
252
253   /* aggregate */
254   GstClockID aggregate_id;      /* protected by src_lock */
255   GMutex src_lock;
256   GCond src_cond;
257
258   /* properties */
259   gint64 latency;
260 };
261
262 typedef struct
263 {
264   GstEvent *event;
265   gboolean result;
266   gboolean flush;
267
268   gboolean one_actually_seeked;
269 } EventData;
270
271 #define DEFAULT_LATENCY        0
272
273 enum
274 {
275   PROP_0,
276   PROP_LATENCY,
277   PROP_LAST
278 };
279
280 /**
281  * gst_aggregator_iterate_sinkpads:
282  * @self: The #GstAggregator
283  * @func: (scope call): The function to call.
284  * @user_data: (closure): The data to pass to @func.
285  *
286  * Iterate the sinkpads of aggregator to call a function on them.
287  *
288  * This method guarantees that @func will be called only once for each
289  * sink pad.
290  */
291 gboolean
292 gst_aggregator_iterate_sinkpads (GstAggregator * self,
293     GstAggregatorPadForeachFunc func, gpointer user_data)
294 {
295   gboolean result = FALSE;
296   GstIterator *iter;
297   gboolean done = FALSE;
298   GValue item = { 0, };
299   GList *seen_pads = NULL;
300
301   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
302
303   if (!iter)
304     goto no_iter;
305
306   while (!done) {
307     switch (gst_iterator_next (iter, &item)) {
308       case GST_ITERATOR_OK:
309       {
310         GstAggregatorPad *pad;
311
312         pad = g_value_get_object (&item);
313
314         /* if already pushed, skip. FIXME, find something faster to tag pads */
315         if (pad == NULL || g_list_find (seen_pads, pad)) {
316           g_value_reset (&item);
317           break;
318         }
319
320         GST_LOG_OBJECT (pad, "calling function %s on pad",
321             GST_DEBUG_FUNCPTR_NAME (func));
322
323         result = func (self, pad, user_data);
324
325         done = !result;
326
327         seen_pads = g_list_prepend (seen_pads, pad);
328
329         g_value_reset (&item);
330         break;
331       }
332       case GST_ITERATOR_RESYNC:
333         gst_iterator_resync (iter);
334         break;
335       case GST_ITERATOR_ERROR:
336         GST_ERROR_OBJECT (self,
337             "Could not iterate over internally linked pads");
338         done = TRUE;
339         break;
340       case GST_ITERATOR_DONE:
341         done = TRUE;
342         break;
343     }
344   }
345   g_value_unset (&item);
346   gst_iterator_free (iter);
347
348   if (seen_pads == NULL) {
349     GST_DEBUG_OBJECT (self, "No pad seen");
350     return FALSE;
351   }
352
353   g_list_free (seen_pads);
354
355 no_iter:
356   return result;
357 }
358
359 static gboolean
360 gst_aggregator_check_pads_ready (GstAggregator * self)
361 {
362   GstAggregatorPad *pad;
363   GList *l, *sinkpads;
364
365   GST_LOG_OBJECT (self, "checking pads");
366
367   GST_OBJECT_LOCK (self);
368
369   sinkpads = GST_ELEMENT_CAST (self)->sinkpads;
370   if (sinkpads == NULL)
371     goto no_sinkpads;
372
373   for (l = sinkpads; l != NULL; l = l->next) {
374     pad = l->data;
375
376     PAD_LOCK (pad);
377     if (pad->priv->buffer == NULL && !pad->priv->eos) {
378       PAD_UNLOCK (pad);
379       goto pad_not_ready;
380     }
381     PAD_UNLOCK (pad);
382
383   }
384
385   GST_OBJECT_UNLOCK (self);
386   GST_LOG_OBJECT (self, "pads are ready");
387   return TRUE;
388
389 no_sinkpads:
390   {
391     GST_LOG_OBJECT (self, "pads not ready: no sink pads");
392     GST_OBJECT_UNLOCK (self);
393     return FALSE;
394   }
395 pad_not_ready:
396   {
397     GST_LOG_OBJECT (pad, "pad not ready to be aggregated yet");
398     GST_OBJECT_UNLOCK (self);
399     return FALSE;
400   }
401 }
402
403 static void
404 gst_aggregator_reset_flow_values (GstAggregator * self)
405 {
406   GST_OBJECT_LOCK (self);
407   self->priv->send_stream_start = TRUE;
408   self->priv->send_segment = TRUE;
409   gst_segment_init (&self->segment, GST_FORMAT_TIME);
410   GST_OBJECT_UNLOCK (self);
411 }
412
413 static inline void
414 gst_aggregator_push_mandatory_events (GstAggregator * self)
415 {
416   GstAggregatorPrivate *priv = self->priv;
417   GstEvent *segment = NULL;
418   GstEvent *tags = NULL;
419
420   if (self->priv->send_stream_start) {
421     gchar s_id[32];
422
423     GST_INFO_OBJECT (self, "pushing stream start");
424     /* stream-start (FIXME: create id based on input ids) */
425     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
426     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
427       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
428     }
429     self->priv->send_stream_start = FALSE;
430   }
431
432   if (self->priv->srccaps) {
433
434     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
435         self->priv->srccaps);
436     if (!gst_pad_push_event (self->srcpad,
437             gst_event_new_caps (self->priv->srccaps))) {
438       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
439     }
440     gst_caps_unref (self->priv->srccaps);
441     self->priv->srccaps = NULL;
442   }
443
444   GST_OBJECT_LOCK (self);
445   if (self->priv->send_segment && !self->priv->flush_seeking) {
446     segment = gst_event_new_segment (&self->segment);
447
448     if (!self->priv->seqnum)
449       self->priv->seqnum = gst_event_get_seqnum (segment);
450     else
451       gst_event_set_seqnum (segment, self->priv->seqnum);
452     self->priv->send_segment = FALSE;
453
454     GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segment);
455   }
456
457   if (priv->tags && priv->tags_changed && !self->priv->flush_seeking) {
458     tags = gst_event_new_tag (gst_tag_list_ref (priv->tags));
459     priv->tags_changed = FALSE;
460   }
461   GST_OBJECT_UNLOCK (self);
462
463   if (segment)
464     gst_pad_push_event (self->srcpad, segment);
465   if (tags)
466     gst_pad_push_event (self->srcpad, tags);
467
468 }
469
470 /**
471  * gst_aggregator_set_src_caps:
472  * @self: The #GstAggregator
473  * @caps: The #GstCaps to set on the src pad.
474  *
475  * Sets the caps to be used on the src pad.
476  */
477 void
478 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
479 {
480   GST_PAD_STREAM_LOCK (self->srcpad);
481   gst_caps_replace (&self->priv->srccaps, caps);
482   gst_aggregator_push_mandatory_events (self);
483   GST_PAD_STREAM_UNLOCK (self->srcpad);
484 }
485
486 /**
487  * gst_aggregator_finish_buffer:
488  * @self: The #GstAggregator
489  * @buffer: (transfer full): the #GstBuffer to push.
490  *
491  * This method will push the provided output buffer downstream. If needed,
492  * mandatory events such as stream-start, caps, and segment events will be
493  * sent before pushing the buffer.
494  */
495 GstFlowReturn
496 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
497 {
498   gst_aggregator_push_mandatory_events (self);
499
500   GST_OBJECT_LOCK (self);
501   if (!self->priv->flush_seeking && gst_pad_is_active (self->srcpad)) {
502     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
503     GST_OBJECT_UNLOCK (self);
504     return gst_pad_push (self->srcpad, buffer);
505   } else {
506     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
507         self->priv->flush_seeking, gst_pad_is_active (self->srcpad));
508     GST_OBJECT_UNLOCK (self);
509     gst_buffer_unref (buffer);
510     return GST_FLOW_OK;
511   }
512 }
513
514 static void
515 gst_aggregator_push_eos (GstAggregator * self)
516 {
517   GstEvent *event;
518   gst_aggregator_push_mandatory_events (self);
519
520   event = gst_event_new_eos ();
521
522   GST_OBJECT_LOCK (self);
523   self->priv->send_eos = FALSE;
524   gst_event_set_seqnum (event, self->priv->seqnum);
525   GST_OBJECT_UNLOCK (self);
526
527   gst_pad_push_event (self->srcpad, event);
528 }
529
530 static GstClockTime
531 gst_aggregator_get_next_time (GstAggregator * self)
532 {
533   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
534
535   if (klass->get_next_time)
536     return klass->get_next_time (self);
537
538   return GST_CLOCK_TIME_NONE;
539 }
540
541 static gboolean
542 gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout)
543 {
544   GstClockTime latency;
545   GstClockTime start;
546   gboolean res;
547
548   *timeout = FALSE;
549
550   SRC_LOCK (self);
551
552   latency = gst_aggregator_get_latency_unlocked (self);
553
554   if (gst_aggregator_check_pads_ready (self)) {
555     GST_DEBUG_OBJECT (self, "all pads have data");
556     SRC_UNLOCK (self);
557
558     return TRUE;
559   }
560
561   /* Before waiting, check if we're actually still running */
562   if (!self->priv->running || !self->priv->send_eos) {
563     SRC_UNLOCK (self);
564
565     return FALSE;
566   }
567
568   start = gst_aggregator_get_next_time (self);
569
570   if (!GST_CLOCK_TIME_IS_VALID (latency) ||
571       !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self)) ||
572       !GST_CLOCK_TIME_IS_VALID (start)) {
573     /* We wake up here when something happened, and below
574      * then check if we're ready now. If we return FALSE,
575      * we will be directly called again.
576      */
577     SRC_WAIT (self);
578   } else {
579     GstClockTime base_time, time;
580     GstClock *clock;
581     GstClockReturn status;
582     GstClockTimeDiff jitter;
583
584     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
585         GST_TIME_ARGS (start));
586
587     GST_OBJECT_LOCK (self);
588     base_time = GST_ELEMENT_CAST (self)->base_time;
589     clock = GST_ELEMENT_CLOCK (self);
590     if (clock)
591       gst_object_ref (clock);
592     GST_OBJECT_UNLOCK (self);
593
594     time = base_time + start;
595     time += latency;
596
597     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
598         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
599         " latency %" GST_TIME_FORMAT " current %" GST_TIME_FORMAT ")",
600         GST_TIME_ARGS (time),
601         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
602         GST_TIME_ARGS (start), GST_TIME_ARGS (latency),
603         GST_TIME_ARGS (gst_clock_get_time (clock)));
604
605
606     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
607     gst_object_unref (clock);
608     SRC_UNLOCK (self);
609
610     jitter = 0;
611     status = gst_clock_id_wait (self->priv->aggregate_id, &jitter);
612
613     SRC_LOCK (self);
614     if (self->priv->aggregate_id) {
615       gst_clock_id_unref (self->priv->aggregate_id);
616       self->priv->aggregate_id = NULL;
617     }
618
619     GST_DEBUG_OBJECT (self,
620         "clock returned %d (jitter: %s%" GST_TIME_FORMAT ")",
621         status, (jitter < 0 ? "-" : " "),
622         GST_TIME_ARGS ((jitter < 0 ? -jitter : jitter)));
623
624     /* we timed out */
625     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
626       SRC_UNLOCK (self);
627       *timeout = TRUE;
628       return TRUE;
629     }
630   }
631
632   res = gst_aggregator_check_pads_ready (self);
633   SRC_UNLOCK (self);
634
635   return res;
636 }
637
638 static void
639 gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
640     GstFlowReturn flow_return)
641 {
642   PAD_LOCK (aggpad);
643   if (flow_return == GST_FLOW_NOT_LINKED)
644     aggpad->priv->flow_return = MIN (flow_return, aggpad->priv->flow_return);
645   else
646     aggpad->priv->flow_return = flow_return;
647   gst_buffer_replace (&aggpad->priv->buffer, NULL);
648   PAD_BROADCAST_EVENT (aggpad);
649   PAD_UNLOCK (aggpad);
650 }
651
652 static void
653 gst_aggregator_aggregate_func (GstAggregator * self)
654 {
655   GstAggregatorPrivate *priv = self->priv;
656   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
657   gboolean timeout = FALSE;
658
659   if (self->priv->running == FALSE) {
660     GST_DEBUG_OBJECT (self, "Not running anymore");
661     return;
662   }
663
664   GST_LOG_OBJECT (self, "Checking aggregate");
665   while (priv->send_eos && priv->running) {
666     GstFlowReturn flow_return;
667
668     if (!gst_aggregator_wait_and_check (self, &timeout))
669       continue;
670
671     GST_TRACE_OBJECT (self, "Actually aggregating!");
672
673     flow_return = klass->aggregate (self, timeout);
674
675     GST_OBJECT_LOCK (self);
676     if (flow_return == GST_FLOW_FLUSHING && priv->flush_seeking) {
677       /* We don't want to set the pads to flushing, but we want to
678        * stop the thread, so just break here */
679       GST_OBJECT_UNLOCK (self);
680       break;
681     }
682     GST_OBJECT_UNLOCK (self);
683
684     if (flow_return == GST_FLOW_EOS || flow_return == GST_FLOW_ERROR) {
685       gst_aggregator_push_eos (self);
686     }
687
688     GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
689
690     if (flow_return != GST_FLOW_OK) {
691       GList *item;
692
693       GST_OBJECT_LOCK (self);
694       for (item = GST_ELEMENT (self)->sinkpads; item; item = item->next) {
695         GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (item->data);
696
697         gst_aggregator_pad_set_flushing (aggpad, flow_return);
698       }
699       GST_OBJECT_UNLOCK (self);
700       break;
701     }
702   }
703
704   /* Pause the task here, the only ways to get here are:
705    * 1) We're stopping, in which case the task is stopped anyway
706    * 2) We got a flow error above, in which case it might take
707    *    some time to forward the flow return upstream and we
708    *    would otherwise call the task function over and over
709    *    again without doing anything
710    */
711   gst_pad_pause_task (self->srcpad);
712 }
713
714 static gboolean
715 gst_aggregator_start (GstAggregator * self)
716 {
717   GstAggregatorClass *klass;
718   gboolean result;
719
720   self->priv->running = TRUE;
721   self->priv->send_stream_start = TRUE;
722   self->priv->send_segment = TRUE;
723   self->priv->send_eos = TRUE;
724   self->priv->srccaps = NULL;
725
726   klass = GST_AGGREGATOR_GET_CLASS (self);
727
728   if (klass->start)
729     result = klass->start (self);
730   else
731     result = TRUE;
732
733   return result;
734 }
735
736 static gboolean
737 _check_pending_flush_stop (GstAggregatorPad * pad)
738 {
739   gboolean res;
740
741   PAD_LOCK (pad);
742   res = (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
743   PAD_UNLOCK (pad);
744
745   return res;
746 }
747
748 static gboolean
749 gst_aggregator_stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
750 {
751   gboolean res = TRUE;
752
753   GST_INFO_OBJECT (self, "%s srcpad task",
754       flush_start ? "Pausing" : "Stopping");
755
756   SRC_LOCK (self);
757   self->priv->running = FALSE;
758   SRC_BROADCAST (self);
759   SRC_UNLOCK (self);
760
761   if (flush_start) {
762     res = gst_pad_push_event (self->srcpad, flush_start);
763   }
764
765   gst_pad_stop_task (self->srcpad);
766
767   return res;
768 }
769
770 static void
771 gst_aggregator_start_srcpad_task (GstAggregator * self)
772 {
773   GST_INFO_OBJECT (self, "Starting srcpad task");
774
775   self->priv->running = TRUE;
776   gst_pad_start_task (GST_PAD (self->srcpad),
777       (GstTaskFunction) gst_aggregator_aggregate_func, self, NULL);
778 }
779
780 static GstFlowReturn
781 gst_aggregator_flush (GstAggregator * self)
782 {
783   GstFlowReturn ret = GST_FLOW_OK;
784   GstAggregatorPrivate *priv = self->priv;
785   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
786
787   GST_DEBUG_OBJECT (self, "Flushing everything");
788   GST_OBJECT_LOCK (self);
789   priv->send_segment = TRUE;
790   priv->flush_seeking = FALSE;
791   priv->tags_changed = FALSE;
792   GST_OBJECT_UNLOCK (self);
793   if (klass->flush)
794     ret = klass->flush (self);
795
796   return ret;
797 }
798
799
800 /* Called with GstAggregator's object lock held */
801
802 static gboolean
803 gst_aggregator_all_flush_stop_received_locked (GstAggregator * self)
804 {
805   GList *tmp;
806   GstAggregatorPad *tmppad;
807
808   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
809     tmppad = (GstAggregatorPad *) tmp->data;
810
811     if (_check_pending_flush_stop (tmppad) == FALSE) {
812       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
813           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
814       return FALSE;
815     }
816   }
817
818   return TRUE;
819 }
820
821 static void
822 gst_aggregator_flush_start (GstAggregator * self, GstAggregatorPad * aggpad,
823     GstEvent * event)
824 {
825   GstAggregatorPrivate *priv = self->priv;
826   GstAggregatorPadPrivate *padpriv = aggpad->priv;
827
828   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
829
830   PAD_FLUSH_LOCK (aggpad);
831   PAD_LOCK (aggpad);
832   if (padpriv->pending_flush_start) {
833     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
834
835     padpriv->pending_flush_start = FALSE;
836     padpriv->pending_flush_stop = TRUE;
837   }
838   PAD_UNLOCK (aggpad);
839
840   GST_OBJECT_LOCK (self);
841   if (priv->flush_seeking) {
842     /* If flush_seeking we forward the first FLUSH_START */
843     if (priv->pending_flush_start) {
844       priv->pending_flush_start = FALSE;
845       GST_OBJECT_UNLOCK (self);
846
847       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
848       gst_aggregator_stop_srcpad_task (self, event);
849
850       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
851       GST_PAD_STREAM_LOCK (self->srcpad);
852       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
853       event = NULL;
854     } else {
855       GST_OBJECT_UNLOCK (self);
856       gst_event_unref (event);
857     }
858   } else {
859     GST_OBJECT_UNLOCK (self);
860     gst_event_unref (event);
861   }
862   PAD_FLUSH_UNLOCK (aggpad);
863
864   gst_aggregator_pad_drop_buffer (aggpad);
865 }
866
867 /* GstAggregator vmethods default implementations */
868 static gboolean
869 gst_aggregator_default_sink_event (GstAggregator * self,
870     GstAggregatorPad * aggpad, GstEvent * event)
871 {
872   gboolean res = TRUE;
873   GstPad *pad = GST_PAD (aggpad);
874   GstAggregatorPrivate *priv = self->priv;
875
876   switch (GST_EVENT_TYPE (event)) {
877     case GST_EVENT_FLUSH_START:
878     {
879       gst_aggregator_flush_start (self, aggpad, event);
880       /* We forward only in one case: right after flush_seeking */
881       event = NULL;
882       goto eat;
883     }
884     case GST_EVENT_FLUSH_STOP:
885     {
886       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
887
888       gst_aggregator_pad_flush (aggpad, self);
889       GST_OBJECT_LOCK (self);
890       if (priv->flush_seeking) {
891         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
892         if (gst_aggregator_all_flush_stop_received_locked (self)) {
893           GST_OBJECT_UNLOCK (self);
894           /* That means we received FLUSH_STOP/FLUSH_STOP on
895            * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
896           gst_aggregator_flush (self);
897           gst_pad_push_event (self->srcpad, event);
898           event = NULL;
899           SRC_LOCK (self);
900           priv->send_eos = TRUE;
901           SRC_BROADCAST (self);
902           SRC_UNLOCK (self);
903
904           GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
905           GST_PAD_STREAM_UNLOCK (self->srcpad);
906           gst_aggregator_start_srcpad_task (self);
907         } else {
908           GST_OBJECT_UNLOCK (self);
909         }
910       } else {
911         GST_OBJECT_UNLOCK (self);
912       }
913
914       /* We never forward the event */
915       goto eat;
916     }
917     case GST_EVENT_EOS:
918     {
919       GST_DEBUG_OBJECT (aggpad, "EOS");
920
921       /* We still have a buffer, and we don't want the subclass to have to
922        * check for it. Mark pending_eos, eos will be set when steal_buffer is
923        * called
924        */
925       SRC_LOCK (self);
926       PAD_LOCK (aggpad);
927       if (!aggpad->priv->buffer) {
928         aggpad->priv->eos = TRUE;
929       } else {
930         aggpad->priv->pending_eos = TRUE;
931       }
932       PAD_UNLOCK (aggpad);
933
934       SRC_BROADCAST (self);
935       SRC_UNLOCK (self);
936       goto eat;
937     }
938     case GST_EVENT_SEGMENT:
939     {
940       GST_OBJECT_LOCK (aggpad);
941       gst_event_copy_segment (event, &aggpad->segment);
942       GST_OBJECT_UNLOCK (aggpad);
943
944       GST_OBJECT_LOCK (self);
945       self->priv->seqnum = gst_event_get_seqnum (event);
946       GST_OBJECT_UNLOCK (self);
947       goto eat;
948     }
949     case GST_EVENT_STREAM_START:
950     {
951       goto eat;
952     }
953     case GST_EVENT_GAP:
954     {
955       GstClockTime pts;
956       GstClockTime duration;
957       GstBuffer *gapbuf;
958
959       gst_event_parse_gap (event, &pts, &duration);
960       gapbuf = gst_buffer_new ();
961
962       GST_BUFFER_PTS (gapbuf) = pts;
963       GST_BUFFER_DURATION (gapbuf) = duration;
964       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
965       GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
966
967       if (gst_pad_chain (pad, gapbuf) != GST_FLOW_OK) {
968         GST_WARNING_OBJECT (self, "Failed to chain gap buffer");
969         res = FALSE;
970       }
971
972       goto eat;
973     }
974     case GST_EVENT_TAG:
975     {
976       GstTagList *tags;
977
978       gst_event_parse_tag (event, &tags);
979
980       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
981         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
982         gst_event_unref (event);
983         event = NULL;
984         goto eat;
985       }
986       break;
987     }
988     default:
989     {
990       break;
991     }
992   }
993
994   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
995   return gst_pad_event_default (pad, GST_OBJECT (self), event);
996
997 eat:
998   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
999   if (event)
1000     gst_event_unref (event);
1001
1002   return res;
1003 }
1004
1005 static inline gboolean
1006 gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
1007     gpointer unused_udata)
1008 {
1009   gst_aggregator_pad_flush (pad, self);
1010
1011   return TRUE;
1012 }
1013
1014 static gboolean
1015 gst_aggregator_stop (GstAggregator * agg)
1016 {
1017   GstAggregatorClass *klass;
1018   gboolean result;
1019
1020   gst_aggregator_reset_flow_values (agg);
1021
1022   gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
1023
1024   klass = GST_AGGREGATOR_GET_CLASS (agg);
1025
1026   if (klass->stop)
1027     result = klass->stop (agg);
1028   else
1029     result = TRUE;
1030
1031   agg->priv->has_peer_latency = FALSE;
1032   agg->priv->peer_latency_live = FALSE;
1033   agg->priv->peer_latency_min = agg->priv->peer_latency_max = FALSE;
1034
1035   if (agg->priv->tags)
1036     gst_tag_list_unref (agg->priv->tags);
1037   agg->priv->tags = NULL;
1038
1039   return result;
1040 }
1041
1042 /* GstElement vmethods implementations */
1043 static GstStateChangeReturn
1044 gst_aggregator_change_state (GstElement * element, GstStateChange transition)
1045 {
1046   GstStateChangeReturn ret;
1047   GstAggregator *self = GST_AGGREGATOR (element);
1048
1049   switch (transition) {
1050     case GST_STATE_CHANGE_READY_TO_PAUSED:
1051       if (!gst_aggregator_start (self))
1052         goto error_start;
1053       break;
1054     default:
1055       break;
1056   }
1057
1058   if ((ret =
1059           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
1060               transition)) == GST_STATE_CHANGE_FAILURE)
1061     goto failure;
1062
1063
1064   switch (transition) {
1065     case GST_STATE_CHANGE_PAUSED_TO_READY:
1066       if (!gst_aggregator_stop (self)) {
1067         /* What to do in this case? Error out? */
1068         GST_ERROR_OBJECT (self, "Subclass failed to stop.");
1069       }
1070       break;
1071     default:
1072       break;
1073   }
1074
1075   return ret;
1076
1077 /* ERRORS */
1078 failure:
1079   {
1080     GST_ERROR_OBJECT (element, "parent failed state change");
1081     return ret;
1082   }
1083 error_start:
1084   {
1085     GST_ERROR_OBJECT (element, "Subclass failed to start");
1086     return GST_STATE_CHANGE_FAILURE;
1087   }
1088 }
1089
1090 static void
1091 gst_aggregator_release_pad (GstElement * element, GstPad * pad)
1092 {
1093   GstAggregator *self = GST_AGGREGATOR (element);
1094   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1095
1096   GST_INFO_OBJECT (pad, "Removing pad");
1097
1098   SRC_LOCK (self);
1099   gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
1100   gst_element_remove_pad (element, pad);
1101
1102   SRC_BROADCAST (self);
1103   SRC_UNLOCK (self);
1104 }
1105
1106 static GstPad *
1107 gst_aggregator_request_new_pad (GstElement * element,
1108     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
1109 {
1110   GstAggregator *self;
1111   GstAggregatorPad *agg_pad;
1112
1113   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
1114   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
1115
1116   self = GST_AGGREGATOR (element);
1117
1118   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
1119     gint serial = 0;
1120     gchar *name = NULL;
1121
1122     GST_OBJECT_LOCK (element);
1123     if (req_name == NULL || strlen (req_name) < 6
1124         || !g_str_has_prefix (req_name, "sink_")) {
1125       /* no name given when requesting the pad, use next available int */
1126       priv->padcount++;
1127     } else {
1128       /* parse serial number from requested padname */
1129       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
1130       if (serial >= priv->padcount)
1131         priv->padcount = serial;
1132     }
1133
1134     name = g_strdup_printf ("sink_%u", priv->padcount);
1135     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
1136         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
1137     g_free (name);
1138
1139     GST_OBJECT_UNLOCK (element);
1140
1141   } else {
1142     return NULL;
1143   }
1144
1145   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
1146
1147   if (priv->running)
1148     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
1149
1150   /* add the pad to the element */
1151   gst_element_add_pad (element, GST_PAD (agg_pad));
1152
1153   return GST_PAD (agg_pad);
1154 }
1155
1156 /* Must be called with SRC_LOCK held */
1157
1158 static gboolean
1159 gst_aggregator_query_latency_unlocked (GstAggregator * self, GstQuery * query)
1160 {
1161   gboolean query_ret, live;
1162   GstClockTime our_latency, min, max;
1163
1164   query_ret = gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1165
1166   if (!query_ret) {
1167     GST_WARNING_OBJECT (self, "Latency query failed");
1168     return FALSE;
1169   }
1170
1171   gst_query_parse_latency (query, &live, &min, &max);
1172
1173   our_latency = self->priv->latency;
1174
1175   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (min))) {
1176     GST_ERROR_OBJECT (self, "Invalid minimum latency %" GST_TIME_FORMAT
1177         ". Please file a bug at " PACKAGE_BUGREPORT ".", GST_TIME_ARGS (min));
1178     return FALSE;
1179   }
1180
1181   if (min > max && GST_CLOCK_TIME_IS_VALID (max)) {
1182     GST_ELEMENT_WARNING (self, CORE, CLOCK, (NULL),
1183         ("Impossible to configure latency: max %" GST_TIME_FORMAT " < min %"
1184             GST_TIME_FORMAT ". Add queues or other buffering elements.",
1185             GST_TIME_ARGS (max), GST_TIME_ARGS (min)));
1186     return FALSE;
1187   }
1188
1189   self->priv->peer_latency_live = live;
1190   self->priv->peer_latency_min = min;
1191   self->priv->peer_latency_max = max;
1192   self->priv->has_peer_latency = TRUE;
1193
1194   /* add our own */
1195   min += our_latency;
1196   min += self->priv->sub_latency_min;
1197   if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1198       && GST_CLOCK_TIME_IS_VALID (max))
1199     max += self->priv->sub_latency_max;
1200   else
1201     max = GST_CLOCK_TIME_NONE;
1202
1203   if (live && min > max) {
1204     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1205         ("%s", "Latency too big"),
1206         ("The requested latency value is too big for the current pipeline. "
1207             "Limiting to %" G_GINT64_FORMAT, max));
1208     min = max;
1209     /* FIXME: This could in theory become negative, but in
1210      * that case all is lost anyway */
1211     self->priv->latency -= min - max;
1212     /* FIXME: shouldn't we g_object_notify() the change here? */
1213   }
1214
1215   SRC_BROADCAST (self);
1216
1217   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1218       " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1219
1220   gst_query_set_latency (query, live, min, max);
1221
1222   return query_ret;
1223 }
1224
1225 /*
1226  * MUST be called with the src_lock held.
1227  *
1228  * See  gst_aggregator_get_latency() for doc
1229  */
1230 static GstClockTime
1231 gst_aggregator_get_latency_unlocked (GstAggregator * self)
1232 {
1233   GstClockTime latency;
1234
1235   g_return_val_if_fail (GST_IS_AGGREGATOR (self), 0);
1236
1237   if (!self->priv->has_peer_latency) {
1238     GstQuery *query = gst_query_new_latency ();
1239     gboolean ret;
1240
1241     ret = gst_aggregator_query_latency_unlocked (self, query);
1242     gst_query_unref (query);
1243     if (!ret)
1244       return GST_CLOCK_TIME_NONE;
1245   }
1246
1247   if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
1248     return GST_CLOCK_TIME_NONE;
1249
1250   /* latency_min is never GST_CLOCK_TIME_NONE by construction */
1251   latency = self->priv->peer_latency_min;
1252
1253   /* add our own */
1254   latency += self->priv->latency;
1255   latency += self->priv->sub_latency_min;
1256
1257   return latency;
1258 }
1259
1260 /**
1261  * gst_aggregator_get_latency:
1262  * @self: a #GstAggregator
1263  *
1264  * Retrieves the latency values reported by @self in response to the latency
1265  * query, or %GST_CLOCK_TIME_NONE if there is not live source connected and the element
1266  * will not wait for the clock.
1267  *
1268  * Typically only called by subclasses.
1269  *
1270  * Returns: The latency or %GST_CLOCK_TIME_NONE if the element does not sync
1271  */
1272 GstClockTime
1273 gst_aggregator_get_latency (GstAggregator * self)
1274 {
1275   GstClockTime ret;
1276
1277   SRC_LOCK (self);
1278   ret = gst_aggregator_get_latency_unlocked (self);
1279   SRC_UNLOCK (self);
1280
1281   return ret;
1282 }
1283
1284 static gboolean
1285 gst_aggregator_send_event (GstElement * element, GstEvent * event)
1286 {
1287   GstAggregator *self = GST_AGGREGATOR (element);
1288
1289   GST_STATE_LOCK (element);
1290   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1291       GST_STATE (element) < GST_STATE_PAUSED) {
1292     gdouble rate;
1293     GstFormat fmt;
1294     GstSeekFlags flags;
1295     GstSeekType start_type, stop_type;
1296     gint64 start, stop;
1297
1298     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1299         &start, &stop_type, &stop);
1300
1301     GST_OBJECT_LOCK (self);
1302     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1303         stop_type, stop, NULL);
1304     self->priv->seqnum = gst_event_get_seqnum (event);
1305     GST_OBJECT_UNLOCK (self);
1306
1307     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1308   }
1309   GST_STATE_UNLOCK (element);
1310
1311
1312   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1313       event);
1314 }
1315
1316 static gboolean
1317 gst_aggregator_default_src_query (GstAggregator * self, GstQuery * query)
1318 {
1319   gboolean res = TRUE;
1320
1321   switch (GST_QUERY_TYPE (query)) {
1322     case GST_QUERY_SEEKING:
1323     {
1324       GstFormat format;
1325
1326       /* don't pass it along as some (file)sink might claim it does
1327        * whereas with a collectpads in between that will not likely work */
1328       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1329       gst_query_set_seeking (query, format, FALSE, 0, -1);
1330       res = TRUE;
1331
1332       break;
1333     }
1334     case GST_QUERY_LATENCY:
1335       SRC_LOCK (self);
1336       res = gst_aggregator_query_latency_unlocked (self, query);
1337       SRC_UNLOCK (self);
1338       break;
1339     default:
1340       return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1341   }
1342
1343   return res;
1344 }
1345
1346 static gboolean
1347 gst_aggregator_event_forward_func (GstPad * pad, gpointer user_data)
1348 {
1349   EventData *evdata = user_data;
1350   gboolean ret = TRUE;
1351   GstPad *peer = gst_pad_get_peer (pad);
1352   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1353
1354   if (peer) {
1355     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1356     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1357     gst_object_unref (peer);
1358   }
1359
1360   if (ret == FALSE) {
1361     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1362       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1363
1364     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1365       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1366
1367       if (gst_pad_query (peer, seeking)) {
1368         gboolean seekable;
1369
1370         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1371
1372         if (seekable == FALSE) {
1373           GST_INFO_OBJECT (pad,
1374               "Source not seekable, We failed but it does not matter!");
1375
1376           ret = TRUE;
1377         }
1378       } else {
1379         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1380       }
1381
1382       gst_query_unref (seeking);
1383     }
1384
1385     if (evdata->flush) {
1386       PAD_LOCK (aggpad);
1387       aggpad->priv->pending_flush_start = FALSE;
1388       aggpad->priv->pending_flush_stop = FALSE;
1389       PAD_UNLOCK (aggpad);
1390     }
1391   } else {
1392     evdata->one_actually_seeked = TRUE;
1393   }
1394
1395   evdata->result &= ret;
1396
1397   /* Always send to all pads */
1398   return FALSE;
1399 }
1400
1401 static EventData
1402 gst_aggregator_forward_event_to_all_sinkpads (GstAggregator * self,
1403     GstEvent * event, gboolean flush)
1404 {
1405   EventData evdata;
1406
1407   evdata.event = event;
1408   evdata.result = TRUE;
1409   evdata.flush = flush;
1410   evdata.one_actually_seeked = FALSE;
1411
1412   /* We first need to set all pads as flushing in a first pass
1413    * as flush_start flush_stop is sometimes sent synchronously
1414    * while we send the seek event */
1415   if (flush) {
1416     GList *l;
1417
1418     GST_OBJECT_LOCK (self);
1419     for (l = GST_ELEMENT_CAST (self)->sinkpads; l != NULL; l = l->next) {
1420       GstAggregatorPad *pad = l->data;
1421
1422       PAD_LOCK (pad);
1423       pad->priv->pending_flush_start = TRUE;
1424       pad->priv->pending_flush_stop = FALSE;
1425       PAD_UNLOCK (pad);
1426     }
1427     GST_OBJECT_UNLOCK (self);
1428   }
1429
1430   gst_pad_forward (self->srcpad, gst_aggregator_event_forward_func, &evdata);
1431
1432   gst_event_unref (event);
1433
1434   return evdata;
1435 }
1436
1437 static gboolean
1438 gst_aggregator_do_seek (GstAggregator * self, GstEvent * event)
1439 {
1440   gdouble rate;
1441   GstFormat fmt;
1442   GstSeekFlags flags;
1443   GstSeekType start_type, stop_type;
1444   gint64 start, stop;
1445   gboolean flush;
1446   EventData evdata;
1447   GstAggregatorPrivate *priv = self->priv;
1448
1449   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1450       &start, &stop_type, &stop);
1451
1452   GST_INFO_OBJECT (self, "starting SEEK");
1453
1454   flush = flags & GST_SEEK_FLAG_FLUSH;
1455
1456   GST_OBJECT_LOCK (self);
1457   if (flush) {
1458     priv->pending_flush_start = TRUE;
1459     priv->flush_seeking = TRUE;
1460   }
1461
1462   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1463       stop_type, stop, NULL);
1464   GST_OBJECT_UNLOCK (self);
1465
1466   /* forward the seek upstream */
1467   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, flush);
1468   event = NULL;
1469
1470   if (!evdata.result || !evdata.one_actually_seeked) {
1471     GST_OBJECT_LOCK (self);
1472     priv->flush_seeking = FALSE;
1473     priv->pending_flush_start = FALSE;
1474     GST_OBJECT_UNLOCK (self);
1475   }
1476
1477   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1478
1479   return evdata.result;
1480 }
1481
1482 static gboolean
1483 gst_aggregator_default_src_event (GstAggregator * self, GstEvent * event)
1484 {
1485   EventData evdata;
1486   gboolean res = TRUE;
1487
1488   switch (GST_EVENT_TYPE (event)) {
1489     case GST_EVENT_SEEK:
1490     {
1491       gst_event_ref (event);
1492       res = gst_aggregator_do_seek (self, event);
1493       gst_event_unref (event);
1494       event = NULL;
1495       goto done;
1496     }
1497     case GST_EVENT_NAVIGATION:
1498     {
1499       /* navigation is rather pointless. */
1500       res = FALSE;
1501       gst_event_unref (event);
1502       goto done;
1503     }
1504     default:
1505     {
1506       break;
1507     }
1508   }
1509
1510   evdata = gst_aggregator_forward_event_to_all_sinkpads (self, event, FALSE);
1511   res = evdata.result;
1512
1513 done:
1514   return res;
1515 }
1516
1517 static gboolean
1518 gst_aggregator_src_pad_event_func (GstPad * pad, GstObject * parent,
1519     GstEvent * event)
1520 {
1521   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1522
1523   return klass->src_event (GST_AGGREGATOR (parent), event);
1524 }
1525
1526 static gboolean
1527 gst_aggregator_src_pad_query_func (GstPad * pad, GstObject * parent,
1528     GstQuery * query)
1529 {
1530   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1531
1532   return klass->src_query (GST_AGGREGATOR (parent), query);
1533 }
1534
1535 static gboolean
1536 gst_aggregator_src_pad_activate_mode_func (GstPad * pad,
1537     GstObject * parent, GstPadMode mode, gboolean active)
1538 {
1539   GstAggregator *self = GST_AGGREGATOR (parent);
1540   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1541
1542   if (klass->src_activate) {
1543     if (klass->src_activate (self, mode, active) == FALSE) {
1544       return FALSE;
1545     }
1546   }
1547
1548   if (active == TRUE) {
1549     switch (mode) {
1550       case GST_PAD_MODE_PUSH:
1551       {
1552         GST_INFO_OBJECT (pad, "Activating pad!");
1553         gst_aggregator_start_srcpad_task (self);
1554         return TRUE;
1555       }
1556       default:
1557       {
1558         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1559         return FALSE;
1560       }
1561     }
1562   }
1563
1564   /* deactivating */
1565   GST_INFO_OBJECT (self, "Deactivating srcpad");
1566   gst_aggregator_stop_srcpad_task (self, FALSE);
1567
1568   return TRUE;
1569 }
1570
1571 static gboolean
1572 gst_aggregator_default_sink_query (GstAggregator * self,
1573     GstAggregatorPad * aggpad, GstQuery * query)
1574 {
1575   GstPad *pad = GST_PAD (aggpad);
1576
1577   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1578 }
1579
1580 static void
1581 gst_aggregator_finalize (GObject * object)
1582 {
1583   GstAggregator *self = (GstAggregator *) object;
1584
1585   g_mutex_clear (&self->priv->src_lock);
1586   g_cond_clear (&self->priv->src_cond);
1587
1588   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1589 }
1590
1591 /*
1592  * gst_aggregator_set_latency_property:
1593  * @agg: a #GstAggregator
1594  * @latency: the new latency value.
1595  *
1596  * Sets the new latency value to @latency. This value is used to limit the
1597  * amount of time a pad waits for data to appear before considering the pad
1598  * as unresponsive.
1599  */
1600 static void
1601 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1602 {
1603   gboolean changed;
1604   GstClockTime min, max;
1605
1606   g_return_if_fail (GST_IS_AGGREGATOR (self));
1607   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (latency));
1608
1609   SRC_LOCK (self);
1610   if (self->priv->peer_latency_live) {
1611     min = self->priv->peer_latency_min;
1612     max = self->priv->peer_latency_max;
1613     /* add our own */
1614     min += latency;
1615     min += self->priv->sub_latency_min;
1616     if (GST_CLOCK_TIME_IS_VALID (self->priv->sub_latency_max)
1617         && GST_CLOCK_TIME_IS_VALID (max))
1618       max += self->priv->sub_latency_max;
1619     else
1620       max = GST_CLOCK_TIME_NONE;
1621
1622     if (GST_CLOCK_TIME_IS_VALID (max) && min > max) {
1623       GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1624           ("%s", "Latency too big"),
1625           ("The requested latency value is too big for the latency in the "
1626               "current pipeline.  Limiting to %" G_GINT64_FORMAT, max));
1627       /* FIXME: This could in theory become negative, but in
1628        * that case all is lost anyway */
1629       latency -= min - max;
1630       /* FIXME: shouldn't we g_object_notify() the change here? */
1631     }
1632   }
1633
1634   changed = (self->priv->latency != latency);
1635   self->priv->latency = latency;
1636
1637   if (changed)
1638     SRC_BROADCAST (self);
1639   SRC_UNLOCK (self);
1640
1641   if (changed)
1642     gst_element_post_message (GST_ELEMENT_CAST (self),
1643         gst_message_new_latency (GST_OBJECT_CAST (self)));
1644 }
1645
1646 /*
1647  * gst_aggregator_get_latency_property:
1648  * @agg: a #GstAggregator
1649  *
1650  * Gets the latency value. See gst_aggregator_set_latency for
1651  * more details.
1652  *
1653  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1654  * before a pad is deemed unresponsive. A value of -1 means an
1655  * unlimited time.
1656  */
1657 static gint64
1658 gst_aggregator_get_latency_property (GstAggregator * agg)
1659 {
1660   gint64 res;
1661
1662   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1663
1664   GST_OBJECT_LOCK (agg);
1665   res = agg->priv->latency;
1666   GST_OBJECT_UNLOCK (agg);
1667
1668   return res;
1669 }
1670
1671 static void
1672 gst_aggregator_set_property (GObject * object, guint prop_id,
1673     const GValue * value, GParamSpec * pspec)
1674 {
1675   GstAggregator *agg = GST_AGGREGATOR (object);
1676
1677   switch (prop_id) {
1678     case PROP_LATENCY:
1679       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1680       break;
1681     default:
1682       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1683       break;
1684   }
1685 }
1686
1687 static void
1688 gst_aggregator_get_property (GObject * object, guint prop_id,
1689     GValue * value, GParamSpec * pspec)
1690 {
1691   GstAggregator *agg = GST_AGGREGATOR (object);
1692
1693   switch (prop_id) {
1694     case PROP_LATENCY:
1695       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1696       break;
1697     default:
1698       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1699       break;
1700   }
1701 }
1702
1703 /* GObject vmethods implementations */
1704 static void
1705 gst_aggregator_class_init (GstAggregatorClass * klass)
1706 {
1707   GObjectClass *gobject_class = (GObjectClass *) klass;
1708   GstElementClass *gstelement_class = (GstElementClass *) klass;
1709
1710   aggregator_parent_class = g_type_class_peek_parent (klass);
1711   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1712
1713   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1714       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1715
1716   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1717
1718   klass->sink_event = gst_aggregator_default_sink_event;
1719   klass->sink_query = gst_aggregator_default_sink_query;
1720
1721   klass->src_event = gst_aggregator_default_src_event;
1722   klass->src_query = gst_aggregator_default_src_query;
1723
1724   gstelement_class->request_new_pad =
1725       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
1726   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_aggregator_send_event);
1727   gstelement_class->release_pad =
1728       GST_DEBUG_FUNCPTR (gst_aggregator_release_pad);
1729   gstelement_class->change_state =
1730       GST_DEBUG_FUNCPTR (gst_aggregator_change_state);
1731
1732   gobject_class->set_property = gst_aggregator_set_property;
1733   gobject_class->get_property = gst_aggregator_get_property;
1734   gobject_class->finalize = gst_aggregator_finalize;
1735
1736   g_object_class_install_property (gobject_class, PROP_LATENCY,
1737       g_param_spec_int64 ("latency", "Buffer latency",
1738           "Additional latency in live mode to allow upstream "
1739           "to take longer to produce buffers for the current "
1740           "position", 0,
1741           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1742           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1743
1744   GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
1745 }
1746
1747 static void
1748 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1749 {
1750   GstPadTemplate *pad_template;
1751   GstAggregatorPrivate *priv;
1752
1753   g_return_if_fail (klass->aggregate != NULL);
1754
1755   self->priv =
1756       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1757       GstAggregatorPrivate);
1758
1759   priv = self->priv;
1760
1761   pad_template =
1762       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1763   g_return_if_fail (pad_template != NULL);
1764
1765   priv->padcount = -1;
1766   priv->tags_changed = FALSE;
1767
1768   self->priv->peer_latency_live = FALSE;
1769   self->priv->peer_latency_min = self->priv->sub_latency_min = 0;
1770   self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
1771   self->priv->has_peer_latency = FALSE;
1772   gst_aggregator_reset_flow_values (self);
1773
1774   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1775
1776   gst_pad_set_event_function (self->srcpad,
1777       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_event_func));
1778   gst_pad_set_query_function (self->srcpad,
1779       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_query_func));
1780   gst_pad_set_activatemode_function (self->srcpad,
1781       GST_DEBUG_FUNCPTR (gst_aggregator_src_pad_activate_mode_func));
1782
1783   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1784
1785   self->priv->latency = DEFAULT_LATENCY;
1786
1787   g_mutex_init (&self->priv->src_lock);
1788   g_cond_init (&self->priv->src_cond);
1789 }
1790
1791 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1792  * method to get to the padtemplates */
1793 GType
1794 gst_aggregator_get_type (void)
1795 {
1796   static volatile gsize type = 0;
1797
1798   if (g_once_init_enter (&type)) {
1799     GType _type;
1800     static const GTypeInfo info = {
1801       sizeof (GstAggregatorClass),
1802       NULL,
1803       NULL,
1804       (GClassInitFunc) gst_aggregator_class_init,
1805       NULL,
1806       NULL,
1807       sizeof (GstAggregator),
1808       0,
1809       (GInstanceInitFunc) gst_aggregator_init,
1810     };
1811
1812     _type = g_type_register_static (GST_TYPE_ELEMENT,
1813         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1814     g_once_init_leave (&type, _type);
1815   }
1816   return type;
1817 }
1818
1819 static GstFlowReturn
1820 gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1821 {
1822   GstBuffer *actual_buf = buffer;
1823   GstAggregator *self = GST_AGGREGATOR (object);
1824   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1825   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1826   GstFlowReturn flow_return;
1827
1828   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1829
1830   PAD_FLUSH_LOCK (aggpad);
1831
1832   PAD_LOCK (aggpad);
1833   flow_return = aggpad->priv->flow_return;
1834   if (flow_return != GST_FLOW_OK)
1835     goto flushing;
1836
1837   if (aggpad->priv->pending_eos == TRUE)
1838     goto eos;
1839
1840   while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1841     PAD_WAIT_EVENT (aggpad);
1842
1843   flow_return = aggpad->priv->flow_return;
1844   if (flow_return != GST_FLOW_OK)
1845     goto flushing;
1846
1847   PAD_UNLOCK (aggpad);
1848
1849   if (aggclass->clip) {
1850     aggclass->clip (self, aggpad, buffer, &actual_buf);
1851   }
1852
1853   SRC_LOCK (self);
1854   PAD_LOCK (aggpad);
1855   if (aggpad->priv->buffer)
1856     gst_buffer_unref (aggpad->priv->buffer);
1857   aggpad->priv->buffer = actual_buf;
1858
1859   flow_return = aggpad->priv->flow_return;
1860
1861   PAD_UNLOCK (aggpad);
1862   PAD_FLUSH_UNLOCK (aggpad);
1863
1864   SRC_BROADCAST (self);
1865   SRC_UNLOCK (self);
1866
1867   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1868
1869   return flow_return;
1870
1871 flushing:
1872   PAD_UNLOCK (aggpad);
1873   PAD_FLUSH_UNLOCK (aggpad);
1874
1875   gst_buffer_unref (buffer);
1876   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
1877       gst_flow_get_name (flow_return));
1878
1879   return flow_return;
1880
1881 eos:
1882   PAD_UNLOCK (aggpad);
1883   PAD_FLUSH_UNLOCK (aggpad);
1884
1885   gst_buffer_unref (buffer);
1886   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1887
1888   return GST_FLOW_EOS;
1889 }
1890
1891 static gboolean
1892 gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent,
1893     GstQuery * query)
1894 {
1895   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1896   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1897
1898   if (GST_QUERY_IS_SERIALIZED (query)) {
1899     PAD_LOCK (aggpad);
1900
1901     while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1902       PAD_WAIT_EVENT (aggpad);
1903
1904     if (aggpad->priv->flow_return != GST_FLOW_OK)
1905       goto flushing;
1906
1907     PAD_UNLOCK (aggpad);
1908   }
1909
1910   return klass->sink_query (GST_AGGREGATOR (parent),
1911       GST_AGGREGATOR_PAD (pad), query);
1912
1913 flushing:
1914   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query",
1915       gst_flow_get_name (aggpad->priv->flow_return));
1916   PAD_UNLOCK (aggpad);
1917   return FALSE;
1918 }
1919
1920 static gboolean
1921 gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent,
1922     GstEvent * event)
1923 {
1924   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1925   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1926
1927   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1928       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1929     PAD_LOCK (aggpad);
1930
1931
1932     while (aggpad->priv->buffer && aggpad->priv->flow_return == GST_FLOW_OK)
1933       PAD_WAIT_EVENT (aggpad);
1934
1935     if (aggpad->priv->flow_return != GST_FLOW_OK
1936         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1937       goto flushing;
1938
1939     PAD_UNLOCK (aggpad);
1940   }
1941
1942   return klass->sink_event (GST_AGGREGATOR (parent),
1943       GST_AGGREGATOR_PAD (pad), event);
1944
1945 flushing:
1946   GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping event",
1947       gst_flow_get_name (aggpad->priv->flow_return));
1948   PAD_UNLOCK (aggpad);
1949   if (GST_EVENT_IS_STICKY (event))
1950     gst_pad_store_sticky_event (pad, event);
1951   gst_event_unref (event);
1952   return FALSE;
1953 }
1954
1955 static gboolean
1956 gst_aggregator_pad_activate_mode_func (GstPad * pad,
1957     GstObject * parent, GstPadMode mode, gboolean active)
1958 {
1959   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1960
1961   if (active == FALSE) {
1962     gst_aggregator_pad_set_flushing (aggpad, GST_FLOW_FLUSHING);
1963   } else {
1964     PAD_LOCK (aggpad);
1965     aggpad->priv->flow_return = GST_FLOW_OK;
1966     PAD_BROADCAST_EVENT (aggpad);
1967     PAD_UNLOCK (aggpad);
1968   }
1969
1970   return TRUE;
1971 }
1972
1973 /***********************************
1974  * GstAggregatorPad implementation  *
1975  ************************************/
1976 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1977
1978 static void
1979 gst_aggregator_pad_constructed (GObject * object)
1980 {
1981   GstPad *pad = GST_PAD (object);
1982
1983   gst_pad_set_chain_function (pad,
1984       GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
1985   gst_pad_set_event_function (pad,
1986       GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func));
1987   gst_pad_set_query_function (pad,
1988       GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
1989   gst_pad_set_activatemode_function (pad,
1990       GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
1991 }
1992
1993 static void
1994 gst_aggregator_pad_finalize (GObject * object)
1995 {
1996   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1997
1998   g_cond_clear (&pad->priv->event_cond);
1999   g_mutex_clear (&pad->priv->flush_lock);
2000   g_mutex_clear (&pad->priv->lock);
2001
2002   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->finalize (object);
2003 }
2004
2005 static void
2006 gst_aggregator_pad_dispose (GObject * object)
2007 {
2008   GstAggregatorPad *pad = (GstAggregatorPad *) object;
2009
2010   gst_aggregator_pad_drop_buffer (pad);
2011
2012   G_OBJECT_CLASS (gst_aggregator_pad_parent_class)->dispose (object);
2013 }
2014
2015 static void
2016 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
2017 {
2018   GObjectClass *gobject_class = (GObjectClass *) klass;
2019
2020   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
2021
2022   gobject_class->constructed = gst_aggregator_pad_constructed;
2023   gobject_class->finalize = gst_aggregator_pad_finalize;
2024   gobject_class->dispose = gst_aggregator_pad_dispose;
2025 }
2026
2027 static void
2028 gst_aggregator_pad_init (GstAggregatorPad * pad)
2029 {
2030   pad->priv =
2031       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
2032       GstAggregatorPadPrivate);
2033
2034   pad->priv->buffer = NULL;
2035   g_cond_init (&pad->priv->event_cond);
2036
2037   g_mutex_init (&pad->priv->flush_lock);
2038   g_mutex_init (&pad->priv->lock);
2039 }
2040
2041 /**
2042  * gst_aggregator_pad_steal_buffer:
2043  * @pad: the pad to get buffer from
2044  *
2045  * Steal the ref to the buffer currently queued in @pad.
2046  *
2047  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
2048  *   queued. You should unref the buffer after usage.
2049  */
2050 GstBuffer *
2051 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
2052 {
2053   GstBuffer *buffer = NULL;
2054
2055   PAD_LOCK (pad);
2056   if (pad->priv->buffer) {
2057     GST_TRACE_OBJECT (pad, "Consuming buffer");
2058     buffer = pad->priv->buffer;
2059     pad->priv->buffer = NULL;
2060     if (pad->priv->pending_eos) {
2061       pad->priv->pending_eos = FALSE;
2062       pad->priv->eos = TRUE;
2063     }
2064     PAD_BROADCAST_EVENT (pad);
2065     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
2066   }
2067   PAD_UNLOCK (pad);
2068
2069   return buffer;
2070 }
2071
2072 /**
2073  * gst_aggregator_pad_drop_buffer:
2074  * @pad: the pad where to drop any pending buffer
2075  *
2076  * Drop the buffer currently queued in @pad.
2077  *
2078  * Returns: TRUE if there was a buffer queued in @pad, or FALSE if not.
2079  */
2080 gboolean
2081 gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
2082 {
2083   GstBuffer *buf;
2084
2085   buf = gst_aggregator_pad_steal_buffer (pad);
2086
2087   if (buf == NULL)
2088     return FALSE;
2089
2090   gst_buffer_unref (buf);
2091   return TRUE;
2092 }
2093
2094 /**
2095  * gst_aggregator_pad_get_buffer:
2096  * @pad: the pad to get buffer from
2097  *
2098  * Returns: (transfer full): A reference to the buffer in @pad or
2099  * NULL if no buffer was queued. You should unref the buffer after
2100  * usage.
2101  */
2102 GstBuffer *
2103 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
2104 {
2105   GstBuffer *buffer = NULL;
2106
2107   PAD_LOCK (pad);
2108   if (pad->priv->buffer)
2109     buffer = gst_buffer_ref (pad->priv->buffer);
2110   PAD_UNLOCK (pad);
2111
2112   return buffer;
2113 }
2114
2115 gboolean
2116 gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
2117 {
2118   gboolean is_eos;
2119
2120   PAD_LOCK (pad);
2121   is_eos = pad->priv->eos;
2122   PAD_UNLOCK (pad);
2123
2124   return is_eos;
2125 }
2126
2127 /**
2128  * gst_aggregator_merge_tags:
2129  * @self: a #GstAggregator
2130  * @tags: a #GstTagList to merge
2131  * @mode: the #GstTagMergeMode to use
2132  *
2133  * Adds tags to so-called pending tags, which will be processed
2134  * before pushing out data downstream.
2135  *
2136  * Note that this is provided for convenience, and the subclass is
2137  * not required to use this and can still do tag handling on its own.
2138  *
2139  * MT safe.
2140  */
2141 void
2142 gst_aggregator_merge_tags (GstAggregator * self,
2143     const GstTagList * tags, GstTagMergeMode mode)
2144 {
2145   GstTagList *otags;
2146
2147   g_return_if_fail (GST_IS_AGGREGATOR (self));
2148   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
2149
2150   /* FIXME Check if we can use OBJECT lock here! */
2151   GST_OBJECT_LOCK (self);
2152   if (tags)
2153     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
2154   otags = self->priv->tags;
2155   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
2156   if (otags)
2157     gst_tag_list_unref (otags);
2158   self->priv->tags_changed = TRUE;
2159   GST_OBJECT_UNLOCK (self);
2160 }
2161
2162 /**
2163  * gst_aggregator_set_latency:
2164  * @self: a #GstAggregator
2165  * @min_latency: minimum latency
2166  * @max_latency: maximum latency
2167  *
2168  * Lets #GstAggregator sub-classes tell the baseclass what their internal
2169  * latency is. Will also post a LATENCY message on the bus so the pipeline
2170  * can reconfigure its global latency.
2171  */
2172 void
2173 gst_aggregator_set_latency (GstAggregator * self,
2174     GstClockTime min_latency, GstClockTime max_latency)
2175 {
2176   gboolean changed = FALSE;
2177
2178   g_return_if_fail (GST_IS_AGGREGATOR (self));
2179   g_return_if_fail (GST_CLOCK_TIME_IS_VALID (min_latency));
2180   g_return_if_fail (max_latency >= min_latency);
2181
2182   SRC_LOCK (self);
2183   if (self->priv->sub_latency_min != min_latency) {
2184     self->priv->sub_latency_min = min_latency;
2185     changed = TRUE;
2186   }
2187   if (self->priv->sub_latency_max != max_latency) {
2188     self->priv->sub_latency_max = max_latency;
2189     changed = TRUE;
2190   }
2191
2192   if (changed)
2193     SRC_BROADCAST (self);
2194   SRC_UNLOCK (self);
2195
2196   if (changed) {
2197     gst_element_post_message (GST_ELEMENT_CAST (self),
2198         gst_message_new_latency (GST_OBJECT_CAST (self)));
2199   }
2200 }