aggregator: Add a timeout parameter to ::aggregate()
[platform/upstream/gstreamer.git] / libs / gst / base / gstaggregator.c
1 /* GStreamer
2  * Copyright (C) 2014 Mathieu Duponchelle <mathieu.duponchelle@opencreed.com>
3  * Copyright (C) 2014 Thibault Saunier <tsaunier@gnome.org>
4  *
5  * gstaggregator.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 /**
23  * SECTION: gstaggregator
24  * @short_description: manages a set of pads with the purpose of
25  * aggregating their buffers.
26  * @see_also: gstcollectpads for historical reasons.
27  *
28  * Manages a set of pads with the purpose of aggregating their buffers.
29  * Control is given to the subclass when all pads have data.
30  * <itemizedlist>
31  *  <listitem><para>
32  *    Base class for mixers and muxers. Implementers should at least implement
33  *    the aggregate () vmethod.
34  *  </para></listitem>
35  *  <listitem><para>
36  *    When data is queued on all pads, tha aggregate vmethod is called.
37  *  </para></listitem>
38  *  <listitem><para>
39  *    One can peek at the data on any given GstAggregatorPad with the
40  *    gst_aggregator_pad_get_buffer () method, and take ownership of it
41  *    with the gst_aggregator_pad_steal_buffer () method. When a buffer
42  *    has been taken with steal_buffer (), a new buffer can be queued
43  *    on that pad.
44  *  </para></listitem>
45  *  <listitem><para>
46  *    If the subclass wishes to push a buffer downstream in its aggregate
47  *    implementation, it should do so through the
48  *    gst_aggregator_finish_buffer () method. This method will take care
49  *    of sending and ordering mandatory events such as stream start, caps
50  *    and segment.
51  *  </para></listitem>
52  *  <listitem><para>
53  *    Same goes for EOS events, which should not be pushed directly by the
54  *    subclass, it should instead return GST_FLOW_EOS in its aggregate
55  *    implementation.
56  *  </para></listitem>
57  * </itemizedlist>
58  */
59
60 #ifdef HAVE_CONFIG_H
61 #  include "config.h"
62 #endif
63
64 #include <string.h>             /* strlen */
65
66 #include "gstaggregator.h"
67
68
69 /*  Might become API */
70 static void gst_aggregator_merge_tags (GstAggregator * aggregator,
71     const GstTagList * tags, GstTagMergeMode mode);
72 static void gst_aggregator_set_latency_property (GstAggregator * agg,
73     gint64 latency);
74 static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
75
76
77 GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
78 #define GST_CAT_DEFAULT aggregator_debug
79
80 /* GstAggregatorPad definitions */
81 #define PAD_LOCK_EVENT(pad)   G_STMT_START {                            \
82   GST_LOG_OBJECT (pad, "Taking EVENT lock from thread %p",              \
83         g_thread_self());                                               \
84   g_mutex_lock(&pad->priv->event_lock);                                 \
85   GST_LOG_OBJECT (pad, "Took EVENT lock from thread %p",              \
86         g_thread_self());                                               \
87   } G_STMT_END
88
89 #define PAD_UNLOCK_EVENT(pad)  G_STMT_START {                           \
90   GST_LOG_OBJECT (pad, "Releasing EVENT lock from thread %p",          \
91         g_thread_self());                                               \
92   g_mutex_unlock(&pad->priv->event_lock);                               \
93   GST_LOG_OBJECT (pad, "Release EVENT lock from thread %p",          \
94         g_thread_self());                                               \
95   } G_STMT_END
96
97
98 #define PAD_WAIT_EVENT(pad)   G_STMT_START {                            \
99   GST_LOG_OBJECT (pad, "Waiting for EVENT on thread %p",               \
100         g_thread_self());                                               \
101   g_cond_wait(&(((GstAggregatorPad* )pad)->priv->event_cond),       \
102       &(pad->priv->event_lock));                                        \
103   GST_LOG_OBJECT (pad, "DONE Waiting for EVENT on thread %p",               \
104         g_thread_self());                                               \
105   } G_STMT_END
106
107 #define PAD_BROADCAST_EVENT(pad) {                                          \
108   GST_LOG_OBJECT (pad, "Signaling EVENT from thread %p",               \
109         g_thread_self());                                                   \
110   g_cond_broadcast(&(((GstAggregatorPad* )pad)->priv->event_cond)); \
111   }
112
113 #define GST_AGGREGATOR_SETCAPS_LOCK(self)   G_STMT_START {        \
114   GST_LOG_OBJECT (self, "Taking SETCAPS lock from thread %p",   \
115         g_thread_self());                                         \
116   g_mutex_lock(&self->priv->setcaps_lock);                         \
117   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",     \
118         g_thread_self());                                         \
119   } G_STMT_END
120
121 #define GST_AGGREGATOR_SETCAPS_UNLOCK(self)   G_STMT_START {        \
122   GST_LOG_OBJECT (self, "Releasing SETCAPS lock from thread %p",  \
123         g_thread_self());                                           \
124   g_mutex_unlock(&self->priv->setcaps_lock);                         \
125   GST_LOG_OBJECT (self, "Took SETCAPS lock from thread %p",       \
126         g_thread_self());                                           \
127   } G_STMT_END
128
129 #define PAD_STREAM_LOCK(pad)   G_STMT_START {                            \
130   GST_LOG_OBJECT (pad, "Taking lock from thread %p",              \
131         g_thread_self());                                               \
132   g_mutex_lock(&pad->priv->stream_lock);                                 \
133   GST_LOG_OBJECT (pad, "Took lock from thread %p",              \
134         g_thread_self());                                               \
135   } G_STMT_END
136
137 #define PAD_STREAM_UNLOCK(pad)  G_STMT_START {                           \
138   GST_LOG_OBJECT (pad, "Releasing lock from thread %p",          \
139         g_thread_self());                                               \
140   g_mutex_unlock(&pad->priv->stream_lock);                               \
141   GST_LOG_OBJECT (pad, "Release lock from thread %p",          \
142         g_thread_self());                                               \
143   } G_STMT_END
144
145 #define SRC_STREAM_LOCK(self)   G_STMT_START {                             \
146   GST_LOG_OBJECT (self, "Taking src STREAM lock from thread %p",           \
147         g_thread_self());                                                  \
148   g_mutex_lock(&self->priv->src_lock);                                     \
149   GST_LOG_OBJECT (self, "Took src STREAM lock from thread %p",             \
150         g_thread_self());                                                  \
151   } G_STMT_END
152
153 #define SRC_STREAM_UNLOCK(self)  G_STMT_START {                            \
154   GST_LOG_OBJECT (self, "Releasing src STREAM lock from thread %p",        \
155         g_thread_self());                                                  \
156   g_mutex_unlock(&self->priv->src_lock);                                   \
157   GST_LOG_OBJECT (self, "Release src STREAM lock from thread %p",          \
158         g_thread_self());                                                  \
159   } G_STMT_END
160
161 #define SRC_STREAM_WAIT(self) G_STMT_START {                               \
162   GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p",             \
163         g_thread_self());                                                  \
164   g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock));           \
165   GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p",        \
166         g_thread_self());                                                  \
167   } G_STMT_END
168
169 #define SRC_STREAM_BROADCAST(self) G_STMT_START {                          \
170   GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p",             \
171         g_thread_self());                                                  \
172   g_cond_broadcast(&(self->priv->src_cond));                               \
173   } G_STMT_END
174
175 #define KICK_SRC_THREAD(self) G_STMT_START {                               \
176     SRC_STREAM_LOCK (self);                                                \
177     GST_LOG_OBJECT (self, "kicking src STREAM from thread %p",             \
178           g_thread_self ());                                               \
179     if (self->priv->aggregate_id)                                          \
180       gst_clock_id_unschedule (self->priv->aggregate_id);                  \
181     self->priv->n_kicks++;                                                 \
182     SRC_STREAM_BROADCAST (self);                                           \
183     SRC_STREAM_UNLOCK (self);                                              \
184   } G_STMT_END
185
186 struct _GstAggregatorPadPrivate
187 {
188   gboolean pending_flush_start;
189   gboolean pending_flush_stop;
190   gboolean pending_eos;
191   gboolean flushing;
192
193   GMutex event_lock;
194   GCond event_cond;
195
196   GMutex stream_lock;
197 };
198
199 static gboolean
200 _aggpad_flush (GstAggregatorPad * aggpad, GstAggregator * agg)
201 {
202   GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
203
204   aggpad->eos = FALSE;
205   aggpad->priv->flushing = FALSE;
206
207   if (klass->flush)
208     return klass->flush (aggpad, agg);
209
210   return TRUE;
211 }
212
213 /*************************************
214  * GstAggregator implementation  *
215  *************************************/
216 static GstElementClass *aggregator_parent_class = NULL;
217
218 struct _GstAggregatorPrivate
219 {
220   gint padcount;
221
222   /* Our state is >= PAUSED */
223   gboolean running;
224
225
226   gint seqnum;
227   gboolean send_stream_start;
228   gboolean send_segment;
229   gboolean flush_seeking;
230   gboolean pending_flush_start;
231   gboolean send_eos;
232   GstFlowReturn flow_return;
233
234   GstCaps *srccaps;
235
236   GstTagList *tags;
237   gboolean tags_changed;
238
239   /* Lock to prevent two src setcaps from happening at the same time  */
240   GMutex setcaps_lock;
241
242   gboolean latency_live;
243   GstClockTime latency_min;
244   GstClockTime latency_max;
245
246   /* aggregate */
247   GstClockID aggregate_id;
248   gint n_kicks;
249   GMutex src_lock;
250   GCond src_cond;
251 };
252
253 typedef struct
254 {
255   GstEvent *event;
256   gboolean result;
257   gboolean flush;
258
259   gboolean one_actually_seeked;
260 } EventData;
261
262 #define DEFAULT_LATENCY        -1
263
264 enum
265 {
266   PROP_0,
267   PROP_LATENCY,
268   PROP_LAST
269 };
270
271 /**
272  * gst_aggregator_iterate_sinkpads:
273  * @self: The #GstAggregator
274  * @func: The function to call.
275  * @user_data: The data to pass to @func.
276  *
277  * Iterate the sinkpads of aggregator to call a function on them.
278  *
279  * This method guarantees that @func will be called only once for each
280  * sink pad.
281  */
282 gboolean
283 gst_aggregator_iterate_sinkpads (GstAggregator * self,
284     GstAggregatorPadForeachFunc func, gpointer user_data)
285 {
286   gboolean result = FALSE;
287   GstIterator *iter;
288   gboolean done = FALSE;
289   GValue item = { 0, };
290   GList *seen_pads = NULL;
291
292   iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
293
294   if (!iter)
295     goto no_iter;
296
297   while (!done) {
298     switch (gst_iterator_next (iter, &item)) {
299       case GST_ITERATOR_OK:
300       {
301         GstPad *pad;
302
303         pad = g_value_get_object (&item);
304
305         /* if already pushed, skip. FIXME, find something faster to tag pads */
306         if (pad == NULL || g_list_find (seen_pads, pad)) {
307           g_value_reset (&item);
308           break;
309         }
310
311         GST_LOG_OBJECT (self, "calling function on pad %s:%s",
312             GST_DEBUG_PAD_NAME (pad));
313         result = func (self, pad, user_data);
314
315         done = !result;
316
317         seen_pads = g_list_prepend (seen_pads, pad);
318
319         g_value_reset (&item);
320         break;
321       }
322       case GST_ITERATOR_RESYNC:
323         gst_iterator_resync (iter);
324         break;
325       case GST_ITERATOR_ERROR:
326         GST_ERROR_OBJECT (self,
327             "Could not iterate over internally linked pads");
328         done = TRUE;
329         break;
330       case GST_ITERATOR_DONE:
331         done = TRUE;
332         break;
333     }
334   }
335   g_value_unset (&item);
336   gst_iterator_free (iter);
337
338   if (seen_pads == NULL) {
339     GST_DEBUG_OBJECT (self, "No pad seen");
340     return FALSE;
341   }
342
343   g_list_free (seen_pads);
344
345 no_iter:
346   return result;
347 }
348
349 static inline gboolean
350 _check_all_pads_with_data_or_eos (GstAggregator * self,
351     GstAggregatorPad * aggpad, gpointer user_data)
352 {
353   if (aggpad->buffer || aggpad->eos) {
354     return TRUE;
355   }
356
357   GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
358
359   return FALSE;
360 }
361
362 static void
363 _reset_flow_values (GstAggregator * self)
364 {
365   self->priv->flow_return = GST_FLOW_FLUSHING;
366   self->priv->send_stream_start = TRUE;
367   self->priv->send_segment = TRUE;
368   gst_segment_init (&self->segment, GST_FORMAT_TIME);
369 }
370
371 static inline void
372 _push_mandatory_events (GstAggregator * self)
373 {
374   GstAggregatorPrivate *priv = self->priv;
375
376   if (g_atomic_int_get (&self->priv->send_stream_start)) {
377     gchar s_id[32];
378
379     GST_INFO_OBJECT (self, "pushing stream start");
380     /* stream-start (FIXME: create id based on input ids) */
381     g_snprintf (s_id, sizeof (s_id), "agg-%08x", g_random_int ());
382     if (!gst_pad_push_event (self->srcpad, gst_event_new_stream_start (s_id))) {
383       GST_WARNING_OBJECT (self->srcpad, "Sending stream start event failed");
384     }
385     g_atomic_int_set (&self->priv->send_stream_start, FALSE);
386   }
387
388   if (self->priv->srccaps) {
389
390     GST_INFO_OBJECT (self, "pushing caps: %" GST_PTR_FORMAT,
391         self->priv->srccaps);
392     if (!gst_pad_push_event (self->srcpad,
393             gst_event_new_caps (self->priv->srccaps))) {
394       GST_WARNING_OBJECT (self->srcpad, "Sending caps event failed");
395     }
396     gst_caps_unref (self->priv->srccaps);
397     self->priv->srccaps = NULL;
398   }
399
400   if (g_atomic_int_get (&self->priv->send_segment)) {
401     if (!g_atomic_int_get (&self->priv->flush_seeking)) {
402       GstEvent *segev = gst_event_new_segment (&self->segment);
403
404       if (!self->priv->seqnum)
405         self->priv->seqnum = gst_event_get_seqnum (segev);
406       else
407         gst_event_set_seqnum (segev, self->priv->seqnum);
408
409       GST_DEBUG_OBJECT (self, "pushing segment %" GST_PTR_FORMAT, segev);
410       gst_pad_push_event (self->srcpad, segev);
411       g_atomic_int_set (&self->priv->send_segment, FALSE);
412     }
413   }
414
415   if (priv->tags && priv->tags_changed) {
416     gst_pad_push_event (self->srcpad,
417         gst_event_new_tag (gst_tag_list_ref (priv->tags)));
418     priv->tags_changed = FALSE;
419   }
420 }
421
422 /**
423  * gst_aggregator_set_src_caps:
424  * @self: The #GstAggregator
425  * @caps: The #GstCaps to set on the src pad.
426  *
427  * Sets the caps to be used on the src pad.
428  */
429 void
430 gst_aggregator_set_src_caps (GstAggregator * self, GstCaps * caps)
431 {
432   GST_AGGREGATOR_SETCAPS_LOCK (self);
433   gst_caps_replace (&self->priv->srccaps, caps);
434   _push_mandatory_events (self);
435   GST_AGGREGATOR_SETCAPS_UNLOCK (self);
436 }
437
438 /**
439  * gst_aggregator_finish_buffer:
440  * @self: The #GstAggregator
441  * @buffer: the #GstBuffer to push.
442  *
443  * This method will take care of sending mandatory events before pushing
444  * the provided buffer.
445  */
446 GstFlowReturn
447 gst_aggregator_finish_buffer (GstAggregator * self, GstBuffer * buffer)
448 {
449   _push_mandatory_events (self);
450
451   if (!g_atomic_int_get (&self->priv->flush_seeking) &&
452       gst_pad_is_active (self->srcpad)) {
453     GST_TRACE_OBJECT (self, "pushing buffer %" GST_PTR_FORMAT, buffer);
454     return gst_pad_push (self->srcpad, buffer);
455   } else {
456     GST_INFO_OBJECT (self, "Not pushing (active: %i, flushing: %i)",
457         g_atomic_int_get (&self->priv->flush_seeking),
458         gst_pad_is_active (self->srcpad));
459     gst_buffer_unref (buffer);
460     return GST_FLOW_OK;
461   }
462 }
463
464 static void
465 _push_eos (GstAggregator * self)
466 {
467   GstEvent *event;
468   _push_mandatory_events (self);
469
470   self->priv->send_eos = FALSE;
471   event = gst_event_new_eos ();
472   gst_event_set_seqnum (event, self->priv->seqnum);
473   gst_pad_push_event (self->srcpad, event);
474 }
475
476 static GstClockTime
477 gst_aggregator_get_next_time (GstAggregator * self)
478 {
479   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
480
481   if (klass->get_next_time)
482     return klass->get_next_time (self);
483
484   return GST_CLOCK_TIME_NONE;
485 }
486
487 /* called with the src STREAM lock */
488 static gboolean
489 _wait_and_check (GstAggregator * self, gboolean * timeout)
490 {
491   GstClockTime latency_max, latency_min;
492   GstClockTime start;
493   gboolean live;
494
495   *timeout = FALSE;
496
497   gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
498
499   if (gst_aggregator_iterate_sinkpads (self,
500           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
501           NULL)) {
502     GST_DEBUG_OBJECT (self, "all pads have data");
503     return TRUE;
504   }
505
506   SRC_STREAM_LOCK (self);
507   start = gst_aggregator_get_next_time (self);
508
509   if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
510       || !GST_CLOCK_TIME_IS_VALID (start)) {
511     while (self->priv->n_kicks <= 0)
512       SRC_STREAM_WAIT (self);
513     self->priv->n_kicks--;
514   } else {
515     GstClockTime base_time, time;
516     GstClock *clock;
517     GstClockReturn status;
518
519     GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
520         GST_TIME_ARGS (start));
521
522     GST_OBJECT_LOCK (self);
523     base_time = GST_ELEMENT_CAST (self)->base_time;
524     clock = GST_ELEMENT_CLOCK (self);
525     if (clock)
526       gst_object_ref (clock);
527     GST_OBJECT_UNLOCK (self);
528
529     time = base_time + start;
530
531     if (GST_CLOCK_TIME_IS_VALID (latency_max)) {
532       time += latency_max;
533     } else if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
534       time += latency_min;
535     } else {
536       time += self->latency;
537     }
538
539     GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
540         GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
541         " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
542         " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
543         GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
544         GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
545         GST_TIME_ARGS (latency_min),
546         GST_TIME_ARGS (gst_clock_get_time (clock)));
547
548     self->priv->aggregate_id = gst_clock_new_single_shot_id (clock, time);
549     gst_object_unref (clock);
550     SRC_STREAM_UNLOCK (self);
551
552     status = gst_clock_id_wait (self->priv->aggregate_id, NULL);
553
554     SRC_STREAM_LOCK (self);
555     if (self->priv->aggregate_id) {
556       gst_clock_id_unref (self->priv->aggregate_id);
557       self->priv->aggregate_id = NULL;
558     }
559     self->priv->n_kicks--;
560
561     GST_DEBUG_OBJECT (self, "clock returned %d", status);
562
563     /* we timed out */
564     if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
565       SRC_STREAM_UNLOCK (self);
566       *timeout = TRUE;
567       return TRUE;
568     }
569   }
570   SRC_STREAM_UNLOCK (self);
571
572   return gst_aggregator_iterate_sinkpads (self,
573       (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
574 }
575
576 static void
577 aggregate_func (GstAggregator * self)
578 {
579   GstAggregatorPrivate *priv = self->priv;
580   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
581   gboolean timeout = FALSE;
582
583   if (self->priv->running == FALSE) {
584     GST_DEBUG_OBJECT (self, "Not running anymore");
585     return;
586   }
587
588   GST_LOG_OBJECT (self, "Checking aggregate");
589   while (priv->send_eos && priv->running) {
590     if (!_wait_and_check (self, &timeout))
591       continue;
592
593     GST_TRACE_OBJECT (self, "Actually aggregating!");
594
595     priv->flow_return = klass->aggregate (self, timeout);
596
597     if (priv->flow_return == GST_FLOW_EOS) {
598       _push_eos (self);
599     }
600
601     if (priv->flow_return == GST_FLOW_FLUSHING &&
602         g_atomic_int_get (&priv->flush_seeking))
603       priv->flow_return = GST_FLOW_OK;
604
605     GST_LOG_OBJECT (self, "flow return is %s",
606         gst_flow_get_name (priv->flow_return));
607
608     if (priv->flow_return != GST_FLOW_OK)
609       break;
610   }
611 }
612
613 static gboolean
614 _start (GstAggregator * self)
615 {
616   self->priv->running = TRUE;
617   self->priv->send_stream_start = TRUE;
618   self->priv->send_segment = TRUE;
619   self->priv->send_eos = TRUE;
620   self->priv->srccaps = NULL;
621   self->priv->flow_return = GST_FLOW_OK;
622
623   return TRUE;
624 }
625
626 static gboolean
627 _check_pending_flush_stop (GstAggregatorPad * pad)
628 {
629   return (!pad->priv->pending_flush_stop && !pad->priv->pending_flush_start);
630 }
631
632 static gboolean
633 _stop_srcpad_task (GstAggregator * self, GstEvent * flush_start)
634 {
635   gboolean res = TRUE;
636
637   GST_INFO_OBJECT (self, "%s srcpad task",
638       flush_start ? "Pausing" : "Stopping");
639
640   self->priv->running = FALSE;
641   KICK_SRC_THREAD (self);
642
643   if (flush_start) {
644     res = gst_pad_push_event (self->srcpad, flush_start);
645   }
646
647   gst_pad_stop_task (self->srcpad);
648   KICK_SRC_THREAD (self);
649
650   return res;
651 }
652
653 static void
654 _start_srcpad_task (GstAggregator * self)
655 {
656   GST_INFO_OBJECT (self, "Starting srcpad task");
657
658   self->priv->running = TRUE;
659   self->priv->n_kicks = 0;
660   gst_pad_start_task (GST_PAD (self->srcpad),
661       (GstTaskFunction) aggregate_func, self, NULL);
662 }
663
664 static GstFlowReturn
665 _flush (GstAggregator * self)
666 {
667   GstFlowReturn ret = GST_FLOW_OK;
668   GstAggregatorPrivate *priv = self->priv;
669   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
670
671   GST_DEBUG_OBJECT (self, "Flushing everything");
672   g_atomic_int_set (&priv->send_segment, TRUE);
673   g_atomic_int_set (&priv->flush_seeking, FALSE);
674   g_atomic_int_set (&priv->tags_changed, FALSE);
675   if (klass->flush)
676     ret = klass->flush (self);
677
678   return ret;
679 }
680
681 static gboolean
682 _all_flush_stop_received (GstAggregator * self)
683 {
684   GList *tmp;
685   GstAggregatorPad *tmppad;
686
687   GST_OBJECT_LOCK (self);
688   for (tmp = GST_ELEMENT (self)->sinkpads; tmp; tmp = tmp->next) {
689     tmppad = (GstAggregatorPad *) tmp->data;
690
691     if (_check_pending_flush_stop (tmppad) == FALSE) {
692       GST_DEBUG_OBJECT (tmppad, "Is not last %i -- %i",
693           tmppad->priv->pending_flush_start, tmppad->priv->pending_flush_stop);
694       GST_OBJECT_UNLOCK (self);
695       return FALSE;
696     }
697   }
698   GST_OBJECT_UNLOCK (self);
699
700   return TRUE;
701 }
702
703 static void
704 _flush_start (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
705 {
706   GstBuffer *tmpbuf;
707   GstAggregatorPrivate *priv = self->priv;
708   GstAggregatorPadPrivate *padpriv = aggpad->priv;
709
710   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
711   /*  Remove pad buffer and wake up the streaming thread */
712   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
713   gst_buffer_replace (&tmpbuf, NULL);
714   PAD_STREAM_LOCK (aggpad);
715   if (g_atomic_int_compare_and_exchange (&padpriv->pending_flush_start,
716           TRUE, FALSE) == TRUE) {
717     GST_DEBUG_OBJECT (aggpad, "Expecting FLUSH_STOP now");
718     g_atomic_int_set (&padpriv->pending_flush_stop, TRUE);
719   }
720
721   if (g_atomic_int_get (&priv->flush_seeking)) {
722     /* If flush_seeking we forward the first FLUSH_START */
723     if (g_atomic_int_compare_and_exchange (&priv->pending_flush_start,
724             TRUE, FALSE) == TRUE) {
725
726       GST_INFO_OBJECT (self, "Flushing, pausing srcpad task");
727       _stop_srcpad_task (self, event);
728       priv->flow_return = GST_FLOW_OK;
729
730       GST_INFO_OBJECT (self, "Getting STREAM_LOCK while seeking");
731       GST_PAD_STREAM_LOCK (self->srcpad);
732       GST_LOG_OBJECT (self, "GOT STREAM_LOCK");
733       event = NULL;
734     }
735   } else {
736     gst_event_unref (event);
737   }
738   PAD_STREAM_UNLOCK (aggpad);
739
740   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
741   gst_buffer_replace (&tmpbuf, NULL);
742 }
743
744 /* GstAggregator vmethods default implementations */
745 static gboolean
746 _sink_event (GstAggregator * self, GstAggregatorPad * aggpad, GstEvent * event)
747 {
748   gboolean res = TRUE;
749   GstPad *pad = GST_PAD (aggpad);
750   GstAggregatorPrivate *priv = self->priv;
751
752   switch (GST_EVENT_TYPE (event)) {
753     case GST_EVENT_FLUSH_START:
754     {
755       _flush_start (self, aggpad, event);
756       /* We forward only in one case: right after flush_seeking */
757       event = NULL;
758       goto eat;
759     }
760     case GST_EVENT_FLUSH_STOP:
761     {
762       GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP");
763
764       _aggpad_flush (aggpad, self);
765       if (g_atomic_int_get (&priv->flush_seeking)) {
766         g_atomic_int_set (&aggpad->priv->pending_flush_stop, FALSE);
767
768         if (g_atomic_int_get (&priv->flush_seeking)) {
769           if (_all_flush_stop_received (self)) {
770             /* That means we received FLUSH_STOP/FLUSH_STOP on
771              * all sinkpads -- Seeking is Done... sending FLUSH_STOP */
772             _flush (self);
773             gst_pad_push_event (self->srcpad, event);
774             priv->send_eos = TRUE;
775             event = NULL;
776             KICK_SRC_THREAD (self);
777
778             GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
779             GST_PAD_STREAM_UNLOCK (self->srcpad);
780             _start_srcpad_task (self);
781           }
782         }
783       }
784
785       /* We never forward the event */
786       goto eat;
787     }
788     case GST_EVENT_EOS:
789     {
790       GST_DEBUG_OBJECT (aggpad, "EOS");
791
792       /* We still have a buffer, and we don't want the subclass to have to
793        * check for it. Mark pending_eos, eos will be set when steal_buffer is
794        * called
795        */
796       PAD_LOCK_EVENT (aggpad);
797       if (!aggpad->buffer) {
798         aggpad->eos = TRUE;
799       } else {
800         aggpad->priv->pending_eos = TRUE;
801       }
802       PAD_UNLOCK_EVENT (aggpad);
803
804       KICK_SRC_THREAD (self);
805       goto eat;
806     }
807     case GST_EVENT_SEGMENT:
808     {
809       PAD_LOCK_EVENT (aggpad);
810       gst_event_copy_segment (event, &aggpad->segment);
811       self->priv->seqnum = gst_event_get_seqnum (event);
812       PAD_UNLOCK_EVENT (aggpad);
813       goto eat;
814     }
815     case GST_EVENT_STREAM_START:
816     {
817       goto eat;
818     }
819     case GST_EVENT_TAG:
820     {
821       GstTagList *tags;
822
823       gst_event_parse_tag (event, &tags);
824
825       if (gst_tag_list_get_scope (tags) == GST_TAG_SCOPE_STREAM) {
826         gst_aggregator_merge_tags (self, tags, GST_TAG_MERGE_REPLACE);
827         gst_event_unref (event);
828         event = NULL;
829         goto eat;
830       }
831       break;
832     }
833     default:
834     {
835       break;
836     }
837   }
838
839   GST_DEBUG_OBJECT (pad, "Forwarding event: %" GST_PTR_FORMAT, event);
840   return gst_pad_event_default (pad, GST_OBJECT (self), event);
841
842 eat:
843   GST_DEBUG_OBJECT (pad, "Eating event: %" GST_PTR_FORMAT, event);
844   if (event)
845     gst_event_unref (event);
846
847   return res;
848 }
849
850 static gboolean
851 _stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
852 {
853   _aggpad_flush (pad, self);
854
855   return TRUE;
856 }
857
858 static gboolean
859 _stop (GstAggregator * agg)
860 {
861   _reset_flow_values (agg);
862
863   gst_aggregator_iterate_sinkpads (agg,
864       (GstAggregatorPadForeachFunc) _stop_pad, NULL);
865
866   if (agg->priv->tags)
867     gst_tag_list_unref (agg->priv->tags);
868   agg->priv->tags = NULL;
869
870   return TRUE;
871 }
872
873 /* GstElement vmethods implementations */
874 static GstStateChangeReturn
875 _change_state (GstElement * element, GstStateChange transition)
876 {
877   GstStateChangeReturn ret;
878   GstAggregator *self = GST_AGGREGATOR (element);
879   GstAggregatorClass *agg_class = GST_AGGREGATOR_GET_CLASS (self);
880
881
882   switch (transition) {
883     case GST_STATE_CHANGE_READY_TO_PAUSED:
884       agg_class->start (self);
885       break;
886     default:
887       break;
888   }
889
890   if ((ret =
891           GST_ELEMENT_CLASS (aggregator_parent_class)->change_state (element,
892               transition)) == GST_STATE_CHANGE_FAILURE)
893     goto failure;
894
895
896   switch (transition) {
897     case GST_STATE_CHANGE_PAUSED_TO_READY:
898       agg_class->stop (self);
899       break;
900     default:
901       break;
902   }
903
904   return ret;
905
906 failure:
907   {
908     GST_ERROR_OBJECT (element, "parent failed state change");
909     return ret;
910   }
911 }
912
913 static void
914 _release_pad (GstElement * element, GstPad * pad)
915 {
916   GstAggregator *self = GST_AGGREGATOR (element);
917   GstBuffer *tmpbuf;
918
919   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
920
921   GST_INFO_OBJECT (pad, "Removing pad");
922
923   g_atomic_int_set (&aggpad->priv->flushing, TRUE);
924   tmpbuf = gst_aggregator_pad_steal_buffer (aggpad);
925   gst_buffer_replace (&tmpbuf, NULL);
926   gst_element_remove_pad (element, pad);
927
928   KICK_SRC_THREAD (self);
929 }
930
931 static GstPad *
932 _request_new_pad (GstElement * element,
933     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
934 {
935   GstAggregator *self;
936   GstAggregatorPad *agg_pad;
937
938   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
939   GstAggregatorPrivate *priv = GST_AGGREGATOR (element)->priv;
940
941   self = GST_AGGREGATOR (element);
942
943   if (templ == gst_element_class_get_pad_template (klass, "sink_%u")) {
944     gint serial = 0;
945     gchar *name = NULL;
946
947     GST_OBJECT_LOCK (element);
948     if (req_name == NULL || strlen (req_name) < 6
949         || !g_str_has_prefix (req_name, "sink_")) {
950       /* no name given when requesting the pad, use next available int */
951       priv->padcount++;
952     } else {
953       /* parse serial number from requested padname */
954       serial = g_ascii_strtoull (&req_name[5], NULL, 10);
955       if (serial >= priv->padcount)
956         priv->padcount = serial;
957     }
958
959     name = g_strdup_printf ("sink_%u", priv->padcount);
960     agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
961         "name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
962     g_free (name);
963
964     GST_OBJECT_UNLOCK (element);
965
966   } else {
967     return NULL;
968   }
969
970   GST_DEBUG_OBJECT (element, "Adding pad %s", GST_PAD_NAME (agg_pad));
971
972   if (priv->running)
973     gst_pad_set_active (GST_PAD (agg_pad), TRUE);
974
975   /* add the pad to the element */
976   gst_element_add_pad (element, GST_PAD (agg_pad));
977
978   return GST_PAD (agg_pad);
979 }
980
981 typedef struct
982 {
983   GstClockTime min, max;
984   gboolean live;
985 } LatencyData;
986
987 static gboolean
988 _latency_query (GstAggregator * self, GstPad * pad, gpointer user_data)
989 {
990   LatencyData *data = user_data;
991   GstClockTime min, max;
992   GstQuery *query;
993   gboolean live, res;
994
995   query = gst_query_new_latency ();
996   res = gst_pad_peer_query (pad, query);
997
998   if (res) {
999     gst_query_parse_latency (query, &live, &min, &max);
1000
1001     GST_LOG_OBJECT (self, "got latency live:%s min:%" G_GINT64_FORMAT
1002         " max:%" G_GINT64_FORMAT, live ? "true" : "false", min, max);
1003
1004     if (min > data->min)
1005       data->min = min;
1006
1007     if (max != GST_CLOCK_TIME_NONE &&
1008         ((data->max != GST_CLOCK_TIME_NONE && max < data->max) ||
1009             (data->max == GST_CLOCK_TIME_NONE)))
1010       data->max = max;
1011
1012     data->live |= live;
1013   }
1014
1015   gst_query_unref (query);
1016
1017   return TRUE;
1018 }
1019
1020 /**
1021  * gst_aggregator_get_latency:
1022  * @self: a #GstAggregator
1023  * @live: (out) (allow-none): whether @self is live
1024  * @min_latency: (out) (allow-none): the configured minimum latency of @self
1025  * @max_latency: (out) (allow-none): the configured maximum latency of @self
1026  *
1027  * Retreives the latency values reported by @self in response to the latency
1028  * query.
1029  *
1030  * Typically only called by subclasses.
1031  */
1032 void
1033 gst_aggregator_get_latency (GstAggregator * self, gboolean * live,
1034     GstClockTime * min_latency, GstClockTime * max_latency)
1035 {
1036   GstClockTime min, max;
1037
1038   g_return_if_fail (GST_IS_AGGREGATOR (self));
1039
1040   min = self->priv->latency_min;
1041   max = self->priv->latency_max;
1042
1043   if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
1044     min += self->latency;
1045     if (GST_CLOCK_TIME_IS_VALID (max))
1046       max += self->latency;
1047   }
1048
1049   if (live)
1050     *live = self->priv->latency_live;
1051   if (min_latency)
1052     *min_latency = min;
1053   if (max_latency)
1054     *max_latency = max;
1055 }
1056
1057 static gboolean
1058 gst_aggregator_query_latency (GstAggregator * self, GstQuery * query)
1059 {
1060   LatencyData data;
1061
1062   data.min = 0;
1063   data.max = GST_CLOCK_TIME_NONE;
1064   data.live = FALSE;
1065
1066   /* query upstream's latency */
1067   gst_aggregator_iterate_sinkpads (self,
1068       (GstAggregatorPadForeachFunc) _latency_query, &data);
1069
1070   if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
1071       self->latency > data.max) {
1072     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1073         ("%s", "Latency too big"),
1074         ("The requested latency value is too big for the current pipeline.  "
1075             "Limiting to %" G_GINT64_FORMAT, data.max));
1076     self->latency = data.max;
1077   }
1078
1079   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (data.min))) {
1080     GST_WARNING_OBJECT (self, "Invalid minimum latency, using 0");
1081     data.min = 0;
1082   }
1083
1084   self->priv->latency_live = data.live;
1085   self->priv->latency_min = data.min;
1086   self->priv->latency_max = data.max;
1087
1088   /* add our own */
1089   if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
1090     if (GST_CLOCK_TIME_IS_VALID (data.min))
1091       data.min += self->latency;
1092     if (GST_CLOCK_TIME_IS_VALID (data.max))
1093       data.max += self->latency;
1094   }
1095
1096   GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
1097       " max:%" G_GINT64_FORMAT, data.live ? "true" : "false", data.min,
1098       data.max);
1099
1100   gst_query_set_latency (query, data.live, data.min, data.max);
1101
1102   return TRUE;
1103 }
1104
1105 static gboolean
1106 _send_event (GstElement * element, GstEvent * event)
1107 {
1108   GstAggregator *self = GST_AGGREGATOR (element);
1109
1110   GST_STATE_LOCK (element);
1111   if (GST_EVENT_TYPE (event) == GST_EVENT_SEEK &&
1112       GST_STATE (element) < GST_STATE_PAUSED) {
1113     gdouble rate;
1114     GstFormat fmt;
1115     GstSeekFlags flags;
1116     GstSeekType start_type, stop_type;
1117     gint64 start, stop;
1118
1119     gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1120         &start, &stop_type, &stop);
1121     gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1122         stop_type, stop, NULL);
1123
1124     self->priv->seqnum = gst_event_get_seqnum (event);
1125     GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
1126   }
1127   GST_STATE_UNLOCK (element);
1128
1129
1130   return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
1131       event);
1132 }
1133
1134 static gboolean
1135 _src_query (GstAggregator * self, GstQuery * query)
1136 {
1137   gboolean res = TRUE;
1138
1139   switch (GST_QUERY_TYPE (query)) {
1140     case GST_QUERY_SEEKING:
1141     {
1142       GstFormat format;
1143
1144       /* don't pass it along as some (file)sink might claim it does
1145        * whereas with a collectpads in between that will not likely work */
1146       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1147       gst_query_set_seeking (query, format, FALSE, 0, -1);
1148       res = TRUE;
1149
1150       goto discard;
1151     }
1152     case GST_QUERY_LATENCY:
1153     {
1154       return gst_aggregator_query_latency (self, query);
1155     }
1156     default:
1157       break;
1158   }
1159
1160   return gst_pad_query_default (self->srcpad, GST_OBJECT (self), query);
1161
1162 discard:
1163   return res;
1164 }
1165
1166 static gboolean
1167 event_forward_func (GstPad * pad, EventData * evdata)
1168 {
1169   gboolean ret = TRUE;
1170   GstPad *peer = gst_pad_get_peer (pad);
1171   GstAggregatorPadPrivate *padpriv = GST_AGGREGATOR_PAD (pad)->priv;
1172
1173   if (peer) {
1174     ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
1175     GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
1176     gst_object_unref (peer);
1177   }
1178
1179   if (ret == FALSE) {
1180     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK)
1181       GST_ERROR_OBJECT (pad, "Event %" GST_PTR_FORMAT " failed", evdata->event);
1182
1183     if (GST_EVENT_TYPE (evdata->event) == GST_EVENT_SEEK) {
1184       GstQuery *seeking = gst_query_new_seeking (GST_FORMAT_TIME);
1185
1186       if (gst_pad_query (peer, seeking)) {
1187         gboolean seekable;
1188
1189         gst_query_parse_seeking (seeking, NULL, &seekable, NULL, NULL);
1190
1191         if (seekable == FALSE) {
1192           GST_INFO_OBJECT (pad,
1193               "Source not seekable, We failed but it does not matter!");
1194
1195           ret = TRUE;
1196         }
1197       } else {
1198         GST_ERROR_OBJECT (pad, "Query seeking FAILED");
1199       }
1200     }
1201
1202     if (evdata->flush) {
1203       padpriv->pending_flush_start = FALSE;
1204       padpriv->pending_flush_stop = FALSE;
1205     }
1206   } else {
1207     evdata->one_actually_seeked = TRUE;
1208   }
1209
1210   evdata->result &= ret;
1211
1212   /* Always send to all pads */
1213   return FALSE;
1214 }
1215
1216 static gboolean
1217 _set_flush_pending (GstAggregator * self, GstAggregatorPad * pad,
1218     gpointer udata)
1219 {
1220   pad->priv->pending_flush_start = TRUE;
1221   pad->priv->pending_flush_stop = FALSE;
1222
1223   return TRUE;
1224 }
1225
1226 static EventData
1227 _forward_event_to_all_sinkpads (GstAggregator * self, GstEvent * event,
1228     gboolean flush)
1229 {
1230   EventData evdata;
1231
1232   evdata.event = event;
1233   evdata.result = TRUE;
1234   evdata.flush = flush;
1235   evdata.one_actually_seeked = FALSE;
1236
1237   /* We first need to set all pads as flushing in a first pass
1238    * as flush_start flush_stop is sometimes sent synchronously
1239    * while we send the seek event */
1240   if (flush)
1241     gst_aggregator_iterate_sinkpads (self,
1242         (GstAggregatorPadForeachFunc) _set_flush_pending, NULL);
1243   gst_pad_forward (self->srcpad, (GstPadForwardFunction) event_forward_func,
1244       &evdata);
1245
1246   gst_event_unref (event);
1247
1248   return evdata;
1249 }
1250
1251 static gboolean
1252 _do_seek (GstAggregator * self, GstEvent * event)
1253 {
1254   gdouble rate;
1255   GstFormat fmt;
1256   GstSeekFlags flags;
1257   GstSeekType start_type, stop_type;
1258   gint64 start, stop;
1259   gboolean flush;
1260   EventData evdata;
1261   GstAggregatorPrivate *priv = self->priv;
1262
1263   gst_event_parse_seek (event, &rate, &fmt, &flags, &start_type,
1264       &start, &stop_type, &stop);
1265
1266   GST_INFO_OBJECT (self, "starting SEEK");
1267
1268   flush = flags & GST_SEEK_FLAG_FLUSH;
1269
1270   if (flush) {
1271     g_atomic_int_set (&priv->pending_flush_start, TRUE);
1272     g_atomic_int_set (&priv->flush_seeking, TRUE);
1273   }
1274
1275   gst_segment_do_seek (&self->segment, rate, fmt, flags, start_type, start,
1276       stop_type, stop, NULL);
1277
1278   /* forward the seek upstream */
1279   evdata = _forward_event_to_all_sinkpads (self, event, flush);
1280   event = NULL;
1281
1282   if (!evdata.result || !evdata.one_actually_seeked) {
1283     g_atomic_int_set (&priv->flush_seeking, FALSE);
1284     g_atomic_int_set (&priv->pending_flush_start, FALSE);
1285   }
1286
1287   GST_INFO_OBJECT (self, "seek done, result: %d", evdata.result);
1288
1289   return evdata.result;
1290 }
1291
1292 static gboolean
1293 _src_event (GstAggregator * self, GstEvent * event)
1294 {
1295   EventData evdata;
1296   gboolean res = TRUE;
1297
1298   switch (GST_EVENT_TYPE (event)) {
1299     case GST_EVENT_SEEK:
1300     {
1301       gst_event_ref (event);
1302       res = _do_seek (self, event);
1303       gst_event_unref (event);
1304       event = NULL;
1305       goto done;
1306     }
1307     case GST_EVENT_NAVIGATION:
1308     {
1309       /* navigation is rather pointless. */
1310       res = FALSE;
1311       gst_event_unref (event);
1312       goto done;
1313     }
1314     default:
1315     {
1316       break;
1317     }
1318   }
1319
1320   evdata = _forward_event_to_all_sinkpads (self, event, FALSE);
1321   res = evdata.result;
1322
1323 done:
1324   return res;
1325 }
1326
1327 static gboolean
1328 src_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1329 {
1330   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1331
1332   return klass->src_event (GST_AGGREGATOR (parent), event);
1333 }
1334
1335 static gboolean
1336 src_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
1337 {
1338   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1339
1340   return klass->src_query (GST_AGGREGATOR (parent), query);
1341 }
1342
1343 static gboolean
1344 src_activate_mode (GstPad * pad,
1345     GstObject * parent, GstPadMode mode, gboolean active)
1346 {
1347   GstAggregator *self = GST_AGGREGATOR (parent);
1348   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1349
1350   if (klass->src_activate) {
1351     if (klass->src_activate (self, mode, active) == FALSE) {
1352       return FALSE;
1353     }
1354   }
1355
1356   if (active == TRUE) {
1357     switch (mode) {
1358       case GST_PAD_MODE_PUSH:
1359       {
1360         GST_INFO_OBJECT (pad, "Activating pad!");
1361         _start_srcpad_task (self);
1362         return TRUE;
1363       }
1364       default:
1365       {
1366         GST_ERROR_OBJECT (pad, "Only supported mode is PUSH");
1367         return FALSE;
1368       }
1369     }
1370   }
1371
1372   /* deactivating */
1373   GST_INFO_OBJECT (self, "Deactivating srcpad");
1374   _stop_srcpad_task (self, FALSE);
1375
1376   return TRUE;
1377 }
1378
1379 static gboolean
1380 _sink_query (GstAggregator * self, GstAggregatorPad * aggpad, GstQuery * query)
1381 {
1382   GstPad *pad = GST_PAD (aggpad);
1383
1384   return gst_pad_query_default (pad, GST_OBJECT (self), query);
1385 }
1386
1387 static void
1388 gst_aggregator_finalize (GObject * object)
1389 {
1390   GstAggregator *self = (GstAggregator *) object;
1391
1392   gst_object_unref (self->clock);
1393   g_mutex_clear (&self->priv->setcaps_lock);
1394   g_mutex_clear (&self->priv->src_lock);
1395   g_cond_clear (&self->priv->src_cond);
1396
1397   G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
1398 }
1399
1400 static void
1401 gst_aggregator_dispose (GObject * object)
1402 {
1403   G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
1404 }
1405
1406 /*
1407  * gst_aggregator_set_latency_property:
1408  * @agg: a #GstAggregator
1409  * @latency: the new latency value.
1410  *
1411  * Sets the new latency value to @latency. This value is used to limit the
1412  * amount of time a pad waits for data to appear before considering the pad
1413  * as unresponsive.
1414  */
1415 static void
1416 gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
1417 {
1418   g_return_if_fail (GST_IS_AGGREGATOR (self));
1419
1420   GST_OBJECT_LOCK (self);
1421
1422   if (self->priv->latency_live && self->priv->latency_max != 0 &&
1423       GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
1424     GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
1425         ("%s", "Latency too big"),
1426         ("The requested latency value is too big for the latency in the "
1427             "current pipeline.  Limiting to %" G_GINT64_FORMAT,
1428             self->priv->latency_max));
1429     latency = self->priv->latency_max;
1430   }
1431
1432   self->latency = latency;
1433   GST_OBJECT_UNLOCK (self);
1434 }
1435
1436 /*
1437  * gst_aggregator_get_latency_property:
1438  * @agg: a #GstAggregator
1439  *
1440  * Gets the latency value. See gst_aggregator_set_latency for
1441  * more details.
1442  *
1443  * Returns: The time in nanoseconds to wait for data to arrive on a sink pad 
1444  * before a pad is deemed unresponsive. A value of -1 means an
1445  * unlimited time.
1446  */
1447 static gint64
1448 gst_aggregator_get_latency_property (GstAggregator * agg)
1449 {
1450   gint64 res;
1451
1452   g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
1453
1454   GST_OBJECT_LOCK (agg);
1455   res = agg->latency;
1456   GST_OBJECT_UNLOCK (agg);
1457
1458   return res;
1459 }
1460
1461 static void
1462 gst_aggregator_set_property (GObject * object, guint prop_id,
1463     const GValue * value, GParamSpec * pspec)
1464 {
1465   GstAggregator *agg = GST_AGGREGATOR (object);
1466
1467   switch (prop_id) {
1468     case PROP_LATENCY:
1469       gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
1470       break;
1471     default:
1472       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1473       break;
1474   }
1475 }
1476
1477 static void
1478 gst_aggregator_get_property (GObject * object, guint prop_id,
1479     GValue * value, GParamSpec * pspec)
1480 {
1481   GstAggregator *agg = GST_AGGREGATOR (object);
1482
1483   switch (prop_id) {
1484     case PROP_LATENCY:
1485       g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
1486       break;
1487     default:
1488       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1489       break;
1490   }
1491 }
1492
1493 /* GObject vmethods implementations */
1494 static void
1495 gst_aggregator_class_init (GstAggregatorClass * klass)
1496 {
1497   GObjectClass *gobject_class = (GObjectClass *) klass;
1498   GstElementClass *gstelement_class = (GstElementClass *) klass;
1499
1500   aggregator_parent_class = g_type_class_peek_parent (klass);
1501   g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
1502
1503   GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
1504       GST_DEBUG_FG_MAGENTA, "GstAggregator");
1505
1506   klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
1507   klass->start = _start;
1508   klass->stop = _stop;
1509
1510   klass->sink_event = _sink_event;
1511   klass->sink_query = _sink_query;
1512
1513   klass->src_event = _src_event;
1514   klass->src_query = _src_query;
1515
1516   gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (_request_new_pad);
1517   gstelement_class->send_event = GST_DEBUG_FUNCPTR (_send_event);
1518   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
1519   gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
1520
1521   gobject_class->set_property = gst_aggregator_set_property;
1522   gobject_class->get_property = gst_aggregator_get_property;
1523   gobject_class->finalize = gst_aggregator_finalize;
1524   gobject_class->dispose = gst_aggregator_dispose;
1525
1526   g_object_class_install_property (gobject_class, PROP_LATENCY,
1527       g_param_spec_int64 ("latency", "Buffer latency",
1528           "Number of nanoseconds to wait for a buffer to arrive on a sink pad"
1529           "before the pad is deemed unresponsive (-1 unlimited)", -1,
1530           (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
1531           DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1532 }
1533
1534 static void
1535 gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
1536 {
1537   GstPadTemplate *pad_template;
1538   GstAggregatorPrivate *priv;
1539
1540   g_return_if_fail (klass->aggregate != NULL);
1541
1542   self->priv =
1543       G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
1544       GstAggregatorPrivate);
1545
1546   priv = self->priv;
1547
1548   pad_template =
1549       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "src");
1550   g_return_if_fail (pad_template != NULL);
1551
1552   priv->padcount = -1;
1553   priv->tags_changed = FALSE;
1554
1555   self->priv->latency_live = FALSE;
1556   self->priv->latency_min = 0;
1557   self->priv->latency_max = GST_CLOCK_TIME_NONE;
1558   _reset_flow_values (self);
1559
1560   self->srcpad = gst_pad_new_from_template (pad_template, "src");
1561
1562   gst_pad_set_event_function (self->srcpad,
1563       GST_DEBUG_FUNCPTR ((GstPadEventFunction) src_event_func));
1564   gst_pad_set_query_function (self->srcpad,
1565       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) src_query_func));
1566   gst_pad_set_activatemode_function (self->srcpad,
1567       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) src_activate_mode));
1568
1569   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
1570
1571   self->clock = gst_system_clock_obtain ();
1572   self->latency = -1;
1573
1574   g_mutex_init (&self->priv->setcaps_lock);
1575   g_mutex_init (&self->priv->src_lock);
1576   g_cond_init (&self->priv->src_cond);
1577 }
1578
1579 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
1580  * method to get to the padtemplates */
1581 GType
1582 gst_aggregator_get_type (void)
1583 {
1584   static volatile gsize type = 0;
1585
1586   if (g_once_init_enter (&type)) {
1587     GType _type;
1588     static const GTypeInfo info = {
1589       sizeof (GstAggregatorClass),
1590       NULL,
1591       NULL,
1592       (GClassInitFunc) gst_aggregator_class_init,
1593       NULL,
1594       NULL,
1595       sizeof (GstAggregator),
1596       0,
1597       (GInstanceInitFunc) gst_aggregator_init,
1598     };
1599
1600     _type = g_type_register_static (GST_TYPE_ELEMENT,
1601         "GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
1602     g_once_init_leave (&type, _type);
1603   }
1604   return type;
1605 }
1606
1607 static GstFlowReturn
1608 _chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
1609 {
1610   GstBuffer *actual_buf = buffer;
1611   GstAggregator *self = GST_AGGREGATOR (object);
1612   GstAggregatorPrivate *priv = self->priv;
1613   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1614   GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
1615
1616   GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
1617
1618   PAD_STREAM_LOCK (aggpad);
1619
1620   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1621     goto flushing;
1622
1623   if (g_atomic_int_get (&aggpad->priv->pending_eos) == TRUE)
1624     goto eos;
1625
1626   PAD_LOCK_EVENT (aggpad);
1627
1628   while (aggpad->buffer && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1629     GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1630     PAD_WAIT_EVENT (aggpad);
1631   }
1632   PAD_UNLOCK_EVENT (aggpad);
1633
1634   if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1635     goto flushing;
1636
1637   if (aggclass->clip) {
1638     aggclass->clip (self, aggpad, buffer, &actual_buf);
1639   }
1640
1641   PAD_LOCK_EVENT (aggpad);
1642   if (aggpad->buffer)
1643     gst_buffer_unref (aggpad->buffer);
1644   aggpad->buffer = actual_buf;
1645   PAD_UNLOCK_EVENT (aggpad);
1646   PAD_STREAM_UNLOCK (aggpad);
1647
1648   if (gst_aggregator_iterate_sinkpads (self,
1649           (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
1650     KICK_SRC_THREAD (self);
1651
1652   GST_DEBUG_OBJECT (aggpad, "Done chaining");
1653
1654   return priv->flow_return;
1655
1656 flushing:
1657   PAD_STREAM_UNLOCK (aggpad);
1658
1659   gst_buffer_unref (buffer);
1660   GST_DEBUG_OBJECT (aggpad, "We are flushing");
1661
1662   return GST_FLOW_FLUSHING;
1663
1664 eos:
1665   PAD_STREAM_UNLOCK (aggpad);
1666
1667   gst_buffer_unref (buffer);
1668   GST_DEBUG_OBJECT (pad, "We are EOS already...");
1669
1670   return GST_FLOW_EOS;
1671 }
1672
1673 static gboolean
1674 pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
1675 {
1676   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1677   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1678
1679   if (GST_QUERY_IS_SERIALIZED (query)) {
1680     PAD_LOCK_EVENT (aggpad);
1681
1682     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE) {
1683       PAD_UNLOCK_EVENT (aggpad);
1684       goto flushing;
1685     }
1686
1687     while (aggpad->buffer
1688         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1689       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1690       PAD_WAIT_EVENT (aggpad);
1691     }
1692     PAD_UNLOCK_EVENT (aggpad);
1693
1694     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
1695       goto flushing;
1696   }
1697
1698   return klass->sink_query (GST_AGGREGATOR (parent),
1699       GST_AGGREGATOR_PAD (pad), query);
1700
1701 flushing:
1702   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping query");
1703   return FALSE;
1704 }
1705
1706 static gboolean
1707 pad_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
1708 {
1709   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1710   GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent);
1711
1712   if (GST_EVENT_IS_SERIALIZED (event) && GST_EVENT_TYPE (event) != GST_EVENT_EOS
1713       && GST_EVENT_TYPE (event) != GST_EVENT_SEGMENT_DONE) {
1714     PAD_LOCK_EVENT (aggpad);
1715
1716     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1717         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) {
1718       PAD_UNLOCK_EVENT (aggpad);
1719       goto flushing;
1720     }
1721
1722     while (aggpad->buffer
1723         && g_atomic_int_get (&aggpad->priv->flushing) == FALSE) {
1724       GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
1725       PAD_WAIT_EVENT (aggpad);
1726     }
1727     PAD_UNLOCK_EVENT (aggpad);
1728
1729     if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE
1730         && GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP)
1731       goto flushing;
1732   }
1733
1734   return klass->sink_event (GST_AGGREGATOR (parent),
1735       GST_AGGREGATOR_PAD (pad), event);
1736
1737 flushing:
1738   GST_DEBUG_OBJECT (aggpad, "Pad is flushing, dropping event");
1739   if (GST_EVENT_IS_STICKY (event))
1740     gst_pad_store_sticky_event (pad, event);
1741   gst_event_unref (event);
1742   return FALSE;
1743 }
1744
1745 static gboolean
1746 pad_activate_mode_func (GstPad * pad,
1747     GstObject * parent, GstPadMode mode, gboolean active)
1748 {
1749   GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
1750
1751   if (active == FALSE) {
1752     PAD_LOCK_EVENT (aggpad);
1753     g_atomic_int_set (&aggpad->priv->flushing, TRUE);
1754     gst_buffer_replace (&aggpad->buffer, NULL);
1755     PAD_BROADCAST_EVENT (aggpad);
1756     PAD_UNLOCK_EVENT (aggpad);
1757   } else {
1758     g_atomic_int_set (&aggpad->priv->flushing, FALSE);
1759     PAD_LOCK_EVENT (aggpad);
1760     PAD_BROADCAST_EVENT (aggpad);
1761     PAD_UNLOCK_EVENT (aggpad);
1762   }
1763
1764   return TRUE;
1765 }
1766
1767 /***********************************
1768  * GstAggregatorPad implementation  *
1769  ************************************/
1770 static GstPadClass *aggregator_pad_parent_class = NULL;
1771 G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
1772
1773 static void
1774 _pad_constructed (GObject * object)
1775 {
1776   GstPad *pad = GST_PAD (object);
1777
1778   gst_pad_set_chain_function (pad,
1779       GST_DEBUG_FUNCPTR ((GstPadChainFunction) _chain));
1780   gst_pad_set_event_function (pad,
1781       GST_DEBUG_FUNCPTR ((GstPadEventFunction) pad_event_func));
1782   gst_pad_set_query_function (pad,
1783       GST_DEBUG_FUNCPTR ((GstPadQueryFunction) pad_query_func));
1784   gst_pad_set_activatemode_function (pad,
1785       GST_DEBUG_FUNCPTR ((GstPadActivateModeFunction) pad_activate_mode_func));
1786 }
1787
1788 static void
1789 gst_aggregator_pad_finalize (GObject * object)
1790 {
1791   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1792
1793   g_mutex_clear (&pad->priv->event_lock);
1794   g_cond_clear (&pad->priv->event_cond);
1795   g_mutex_clear (&pad->priv->stream_lock);
1796
1797   G_OBJECT_CLASS (aggregator_pad_parent_class)->finalize (object);
1798 }
1799
1800 static void
1801 gst_aggregator_pad_dispose (GObject * object)
1802 {
1803   GstAggregatorPad *pad = (GstAggregatorPad *) object;
1804   GstBuffer *buf;
1805
1806   buf = gst_aggregator_pad_steal_buffer (pad);
1807   if (buf)
1808     gst_buffer_unref (buf);
1809
1810   G_OBJECT_CLASS (aggregator_pad_parent_class)->dispose (object);
1811 }
1812
1813 static void
1814 gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
1815 {
1816   GObjectClass *gobject_class = (GObjectClass *) klass;
1817
1818   aggregator_pad_parent_class = g_type_class_peek_parent (klass);
1819   g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
1820
1821   gobject_class->constructed = GST_DEBUG_FUNCPTR (_pad_constructed);
1822   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_aggregator_pad_finalize);
1823   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_aggregator_pad_dispose);
1824 }
1825
1826 static void
1827 gst_aggregator_pad_init (GstAggregatorPad * pad)
1828 {
1829   pad->priv =
1830       G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
1831       GstAggregatorPadPrivate);
1832
1833   pad->buffer = NULL;
1834   g_mutex_init (&pad->priv->event_lock);
1835   g_cond_init (&pad->priv->event_cond);
1836
1837   g_mutex_init (&pad->priv->stream_lock);
1838 }
1839
1840 /**
1841  * gst_aggregator_pad_steal_buffer:
1842  * @pad: the pad to get buffer from
1843  *
1844  * Steal the ref to the buffer currently queued in @pad.
1845  *
1846  * Returns: (transfer full): The buffer in @pad or NULL if no buffer was
1847  *   queued. You should unref the buffer after usage.
1848  */
1849 GstBuffer *
1850 gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
1851 {
1852   GstBuffer *buffer = NULL;
1853
1854   PAD_LOCK_EVENT (pad);
1855   if (pad->buffer) {
1856     GST_TRACE_OBJECT (pad, "Consuming buffer");
1857     buffer = pad->buffer;
1858     pad->buffer = NULL;
1859     if (pad->priv->pending_eos) {
1860       pad->priv->pending_eos = FALSE;
1861       pad->eos = TRUE;
1862     }
1863     PAD_BROADCAST_EVENT (pad);
1864     GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
1865   }
1866   PAD_UNLOCK_EVENT (pad);
1867
1868   return buffer;
1869 }
1870
1871 /**
1872  * gst_aggregator_pad_get_buffer:
1873  * @pad: the pad to get buffer from
1874  *
1875  * Returns: (transfer full): A reference to the buffer in @pad or
1876  * NULL if no buffer was queued. You should unref the buffer after
1877  * usage.
1878  */
1879 GstBuffer *
1880 gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
1881 {
1882   GstBuffer *buffer = NULL;
1883
1884   PAD_LOCK_EVENT (pad);
1885   if (pad->buffer)
1886     buffer = gst_buffer_ref (pad->buffer);
1887   PAD_UNLOCK_EVENT (pad);
1888
1889   return buffer;
1890 }
1891
1892 /**
1893  * gst_aggregator_merge_tags:
1894  * @self: a #GstAggregator
1895  * @tags: a #GstTagList to merge
1896  * @mode: the #GstTagMergeMode to use
1897  *
1898  * Adds tags to so-called pending tags, which will be processed
1899  * before pushing out data downstream.
1900  *
1901  * Note that this is provided for convenience, and the subclass is
1902  * not required to use this and can still do tag handling on its own.
1903  *
1904  * MT safe.
1905  */
1906 void
1907 gst_aggregator_merge_tags (GstAggregator * self,
1908     const GstTagList * tags, GstTagMergeMode mode)
1909 {
1910   GstTagList *otags;
1911
1912   g_return_if_fail (GST_IS_AGGREGATOR (self));
1913   g_return_if_fail (tags == NULL || GST_IS_TAG_LIST (tags));
1914
1915   /* FIXME Check if we can use OBJECT lock here! */
1916   GST_OBJECT_LOCK (self);
1917   if (tags)
1918     GST_DEBUG_OBJECT (self, "merging tags %" GST_PTR_FORMAT, tags);
1919   otags = self->priv->tags;
1920   self->priv->tags = gst_tag_list_merge (self->priv->tags, tags, mode);
1921   if (otags)
1922     gst_tag_list_unref (otags);
1923   self->priv->tags_changed = TRUE;
1924   GST_OBJECT_UNLOCK (self);
1925 }