funnel: Only emit EOS event if all sinkpads have received one
[platform/upstream/gstreamer.git] / plugins / elements / gstfunnel.c
1 /*
2  * GStreamer Funnel element
3  *
4  * Copyright 2007 Collabora Ltd.
5  *  @author: Olivier Crete <olivier.crete@collabora.co.uk>
6  * Copyright 2007 Nokia Corp.
7  *
8  * gstfunnel.c: Simple Funnel element
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License as published by the Free Software Foundation; either
13  * version 2.1 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Lesser General Public License for more details.
19  *
20  * You should have received a copy of the GNU Lesser General Public
21  * License along with this library; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
23  */
24
25 /**
26  * SECTION:element-funnel
27  *
28  * Takes packets from various input sinks into one output source.
29  *
30  * funnel always outputs a single, open ended segment from
31  * 0 with in %GST_FORMAT_TIME and outputs the buffers of the
32  * different sinkpads with timestamps that are set to the
33  * running time for that stream. funnel does not synchronize
34  * the different input streams but simply forwards all buffers
35  * immediately when they arrive.
36  *
37  */
38
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42
43 #include "gstfunnel.h"
44
45 GST_DEBUG_CATEGORY_STATIC (gst_funnel_debug);
46 #define GST_CAT_DEFAULT gst_funnel_debug
47
48 GType gst_funnel_pad_get_type (void);
49 #define GST_TYPE_FUNNEL_PAD \
50   (gst_funnel_pad_get_type())
51 #define GST_FUNNEL_PAD(obj) \
52   (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_FUNNEL_PAD, GstFunnelPad))
53 #define GST_FUNNEL_PAD_CLASS(klass) \
54   (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_FUNNEL_PAD, GstFunnelPadClass))
55 #define GST_IS_FUNNEL_PAD(obj) \
56   (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_FUNNEL_PAD))
57 #define GST_IS_FUNNEL_PAD_CLASS(klass) \
58   (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_FUNNEL_PAD))
59 #define GST_FUNNEL_PAD_CAST(obj) \
60   ((GstFunnelPad *)(obj))
61
62 typedef struct _GstFunnelPad GstFunnelPad;
63 typedef struct _GstFunnelPadClass GstFunnelPadClass;
64
65 struct _GstFunnelPad
66 {
67   GstPad parent;
68
69   GstSegment segment;
70   gboolean got_eos;
71 };
72
73 struct _GstFunnelPadClass
74 {
75   GstPadClass parent;
76 };
77
78 G_DEFINE_TYPE (GstFunnelPad, gst_funnel_pad, GST_TYPE_PAD);
79
80 static void
81 gst_funnel_pad_class_init (GstFunnelPadClass * klass)
82 {
83 }
84
85 static void
86 gst_funnel_pad_reset (GstFunnelPad * pad)
87 {
88   gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
89   pad->got_eos = FALSE;
90 }
91
92 static void
93 gst_funnel_pad_init (GstFunnelPad * pad)
94 {
95   gst_funnel_pad_reset (pad);
96 }
97
98 static GstStaticPadTemplate funnel_sink_template =
99 GST_STATIC_PAD_TEMPLATE ("sink_%u",
100     GST_PAD_SINK,
101     GST_PAD_REQUEST,
102     GST_STATIC_CAPS_ANY);
103
104 static GstStaticPadTemplate funnel_src_template =
105 GST_STATIC_PAD_TEMPLATE ("src",
106     GST_PAD_SRC,
107     GST_PAD_ALWAYS,
108     GST_STATIC_CAPS_ANY);
109
110 #define _do_init \
111   GST_DEBUG_CATEGORY_INIT (gst_funnel_debug, "funnel", 0, "funnel element");
112 #define gst_funnel_parent_class parent_class
113 G_DEFINE_TYPE_WITH_CODE (GstFunnel, gst_funnel, GST_TYPE_ELEMENT, _do_init);
114
115 static GstStateChangeReturn gst_funnel_change_state (GstElement * element,
116     GstStateChange transition);
117 static GstPad *gst_funnel_request_new_pad (GstElement * element,
118     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
119 static void gst_funnel_release_pad (GstElement * element, GstPad * pad);
120
121 static GstFlowReturn gst_funnel_sink_chain (GstPad * pad, GstObject * parent,
122     GstBuffer * buffer);
123 static gboolean gst_funnel_sink_event (GstPad * pad, GstObject * parent,
124     GstEvent * event);
125 static gboolean gst_funnel_sink_query (GstPad * pad, GstObject * parent,
126     GstQuery * query);
127
128 static gboolean gst_funnel_src_event (GstPad * pad, GstObject * parent,
129     GstEvent * event);
130
131 static void
132 gst_funnel_dispose (GObject * object)
133 {
134   GList *item;
135
136 restart:
137   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
138     GstPad *pad = GST_PAD (item->data);
139
140     if (GST_PAD_IS_SINK (pad)) {
141       gst_element_release_request_pad (GST_ELEMENT (object), pad);
142       goto restart;
143     }
144   }
145
146   G_OBJECT_CLASS (parent_class)->dispose (object);
147 }
148
149 static void
150 gst_funnel_class_init (GstFunnelClass * klass)
151 {
152   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
153   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
154
155   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_funnel_dispose);
156
157   gst_element_class_set_static_metadata (gstelement_class,
158       "Funnel pipe fitting", "Generic", "N-to-1 pipe fitting",
159       "Olivier Crete <olivier.crete@collabora.co.uk>");
160
161   gst_element_class_add_pad_template (gstelement_class,
162       gst_static_pad_template_get (&funnel_sink_template));
163   gst_element_class_add_pad_template (gstelement_class,
164       gst_static_pad_template_get (&funnel_src_template));
165
166   gstelement_class->request_new_pad =
167       GST_DEBUG_FUNCPTR (gst_funnel_request_new_pad);
168   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_funnel_release_pad);
169   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_funnel_change_state);
170 }
171
172 static void
173 gst_funnel_init (GstFunnel * funnel)
174 {
175   funnel->srcpad = gst_pad_new_from_static_template (&funnel_src_template,
176       "src");
177   gst_pad_set_event_function (funnel->srcpad, gst_funnel_src_event);
178   gst_pad_use_fixed_caps (funnel->srcpad);
179   gst_element_add_pad (GST_ELEMENT (funnel), funnel->srcpad);
180 }
181
182 static GstPad *
183 gst_funnel_request_new_pad (GstElement * element, GstPadTemplate * templ,
184     const gchar * name, const GstCaps * caps)
185 {
186   GstPad *sinkpad;
187
188   GST_DEBUG_OBJECT (element, "requesting pad");
189
190   sinkpad = GST_PAD_CAST (g_object_new (GST_TYPE_FUNNEL_PAD,
191           "name", name, "direction", templ->direction, "template", templ,
192           NULL));
193
194   gst_pad_set_chain_function (sinkpad,
195       GST_DEBUG_FUNCPTR (gst_funnel_sink_chain));
196   gst_pad_set_event_function (sinkpad,
197       GST_DEBUG_FUNCPTR (gst_funnel_sink_event));
198   gst_pad_set_query_function (sinkpad,
199       GST_DEBUG_FUNCPTR (gst_funnel_sink_query));
200
201   gst_pad_set_active (sinkpad, TRUE);
202
203   gst_element_add_pad (element, sinkpad);
204
205   return sinkpad;
206 }
207
208 static gboolean
209 gst_funnel_all_sinkpads_eos_unlocked (GstFunnel * funnel)
210 {
211   GstElement *element = GST_ELEMENT_CAST (funnel);
212   GList *item;
213
214   if (element->numsinkpads == 0)
215     return FALSE;
216
217   for (item = element->sinkpads; item != NULL; item = g_list_next (item)) {
218     GstFunnelPad *sinkpad = item->data;
219
220     if (!sinkpad->got_eos)
221       return FALSE;
222   }
223
224   return TRUE;
225 }
226
227 static void
228 gst_funnel_release_pad (GstElement * element, GstPad * pad)
229 {
230   GstFunnel *funnel = GST_FUNNEL (element);
231   GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
232   gboolean send_eos = FALSE;
233
234   GST_DEBUG_OBJECT (funnel, "releasing pad");
235
236   gst_pad_set_active (pad, FALSE);
237
238   gst_element_remove_pad (GST_ELEMENT_CAST (funnel), pad);
239
240   GST_OBJECT_LOCK (funnel);
241   if (!fpad->got_eos && gst_funnel_all_sinkpads_eos_unlocked (funnel)) {
242     GST_DEBUG_OBJECT (funnel, "Pad removed. All others are EOS. Sending EOS");
243     send_eos = TRUE;
244   }
245   GST_OBJECT_UNLOCK (funnel);
246
247   if (send_eos)
248     if (!gst_pad_push_event (funnel->srcpad, gst_event_new_eos ()))
249       GST_WARNING_OBJECT (funnel, "Failure pushing EOS");
250 }
251
252 static GstFlowReturn
253 gst_funnel_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
254 {
255   GstFlowReturn res;
256   GstFunnel *funnel = GST_FUNNEL (parent);
257   GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
258   GstEvent *event = NULL;
259   GstClockTime newts;
260 #if 0
261   GstCaps *padcaps;
262 #endif
263
264   GST_DEBUG_OBJECT (funnel, "received buffer %p", buffer);
265
266   GST_OBJECT_LOCK (funnel);
267
268   if (fpad->got_eos) {
269     GST_OBJECT_UNLOCK (funnel);
270     GST_WARNING_OBJECT (funnel, "Got buffer on pad that received EOS");
271     res = GST_FLOW_EOS;
272     gst_buffer_unref (buffer);
273     goto out;
274   }
275
276   if (fpad->segment.format == GST_FORMAT_UNDEFINED) {
277     GST_WARNING_OBJECT (funnel, "Got buffer without segment,"
278         " setting segment [0,inf[");
279     gst_segment_init (&fpad->segment, GST_FORMAT_TIME);
280   }
281
282   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_TIMESTAMP (buffer)))
283     fpad->segment.position = GST_BUFFER_TIMESTAMP (buffer);
284
285   newts = gst_segment_to_running_time (&fpad->segment,
286       fpad->segment.format, GST_BUFFER_TIMESTAMP (buffer));
287   if (newts != GST_BUFFER_TIMESTAMP (buffer)) {
288     buffer = gst_buffer_make_writable (buffer);
289     GST_BUFFER_TIMESTAMP (buffer) = newts;
290   }
291
292   if (!funnel->has_segment) {
293     GstSegment segment;
294
295     gst_segment_init (&segment, GST_FORMAT_TIME);
296     event = gst_event_new_segment (&segment);
297     funnel->has_segment = TRUE;
298   }
299   GST_OBJECT_UNLOCK (funnel);
300
301   if (event) {
302     if (!gst_pad_push_event (funnel->srcpad, event))
303       GST_WARNING_OBJECT (funnel, "Could not push out newsegment event");
304   }
305 #if 0
306   GST_OBJECT_LOCK (pad);
307   padcaps = GST_PAD_CAPS (funnel->srcpad);
308   GST_OBJECT_UNLOCK (pad);
309
310   if (GST_BUFFER_CAPS (buffer) && GST_BUFFER_CAPS (buffer) != padcaps) {
311     if (!gst_pad_set_caps (funnel->srcpad, GST_BUFFER_CAPS (buffer))) {
312       res = GST_FLOW_NOT_NEGOTIATED;
313       gst_buffer_unref (buffer);
314       goto out;
315     }
316   }
317 #endif
318
319   res = gst_pad_push (funnel->srcpad, buffer);
320
321   GST_LOG_OBJECT (funnel, "handled buffer %s", gst_flow_get_name (res));
322
323 out:
324
325   return res;
326 }
327
328 static gboolean
329 gst_funnel_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
330 {
331   GstFunnel *funnel = GST_FUNNEL (parent);
332   GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
333   gboolean forward = TRUE;
334   gboolean res = TRUE;
335
336   switch (GST_EVENT_TYPE (event)) {
337     case GST_EVENT_SEGMENT:
338     {
339       GST_OBJECT_LOCK (funnel);
340       gst_event_copy_segment (event, &fpad->segment);
341       GST_OBJECT_UNLOCK (funnel);
342
343       forward = FALSE;
344       break;
345     }
346     case GST_EVENT_FLUSH_STOP:
347     {
348       GST_OBJECT_LOCK (funnel);
349       gst_segment_init (&fpad->segment, GST_FORMAT_UNDEFINED);
350       funnel->has_segment = FALSE;
351       fpad->got_eos = FALSE;
352       GST_OBJECT_UNLOCK (funnel);
353     }
354       break;
355     case GST_EVENT_EOS:
356     {
357       GST_OBJECT_LOCK (funnel);
358       fpad->got_eos = TRUE;
359
360       if (!gst_funnel_all_sinkpads_eos_unlocked (funnel)) {
361         GST_DEBUG_OBJECT (funnel,
362             "Got EOS, but not from all sinkpads. Skipping");
363         forward = FALSE;
364       } else {
365         GST_DEBUG_OBJECT (funnel, "Got EOS from all sinkpads. Forwarding");
366       }
367       GST_OBJECT_UNLOCK (funnel);
368       break;
369     }
370     default:
371       break;
372   }
373
374   if (forward)
375     res = gst_pad_push_event (funnel->srcpad, event);
376   else
377     gst_event_unref (event);
378
379   return res;
380 }
381
382 static gboolean
383 gst_funnel_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
384 {
385   GstFunnel *funnel = GST_FUNNEL (parent);
386   gboolean forward = TRUE;
387   gboolean res = TRUE;
388
389   if (forward)
390     res = gst_pad_peer_query (funnel->srcpad, query);
391
392   return res;
393 }
394
395 static gboolean
396 gst_funnel_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
397 {
398   GstElement *funnel;
399   GstIterator *iter;
400   GstPad *sinkpad;
401   gboolean result = FALSE;
402   gboolean done = FALSE;
403   GValue value = { 0, };
404
405   funnel = GST_ELEMENT_CAST (parent);
406
407   iter = gst_element_iterate_sink_pads (funnel);
408
409   while (!done) {
410     switch (gst_iterator_next (iter, &value)) {
411       case GST_ITERATOR_OK:
412         sinkpad = g_value_get_object (&value);
413         gst_event_ref (event);
414         result |= gst_pad_push_event (sinkpad, event);
415         g_value_reset (&value);
416         break;
417       case GST_ITERATOR_RESYNC:
418         gst_iterator_resync (iter);
419         result = FALSE;
420         break;
421       case GST_ITERATOR_ERROR:
422         GST_WARNING_OBJECT (funnel, "Error iterating sinkpads");
423       case GST_ITERATOR_DONE:
424         done = TRUE;
425         break;
426     }
427   }
428   g_value_unset (&value);
429   gst_iterator_free (iter);
430   gst_event_unref (event);
431
432   return result;
433 }
434
435 static void
436 reset_pad (const GValue * data, gpointer user_data)
437 {
438   GstPad *pad = g_value_get_object (data);
439   GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
440
441   GST_OBJECT_LOCK (pad);
442   gst_funnel_pad_reset (fpad);
443   GST_OBJECT_UNLOCK (pad);
444 }
445
446 static GstStateChangeReturn
447 gst_funnel_change_state (GstElement * element, GstStateChange transition)
448 {
449   GstFunnel *funnel = GST_FUNNEL (element);
450   GstStateChangeReturn ret;
451
452   switch (transition) {
453     case GST_STATE_CHANGE_READY_TO_PAUSED:
454     {
455       GstIterator *iter = gst_element_iterate_sink_pads (element);
456       GstIteratorResult res;
457
458       do {
459         res = gst_iterator_foreach (iter, reset_pad, NULL);
460       } while (res == GST_ITERATOR_RESYNC);
461
462       gst_iterator_free (iter);
463
464       if (res == GST_ITERATOR_ERROR)
465         return GST_STATE_CHANGE_FAILURE;
466
467       GST_OBJECT_LOCK (funnel);
468       funnel->has_segment = FALSE;
469       GST_OBJECT_UNLOCK (funnel);
470     }
471       break;
472     default:
473       break;
474   }
475
476   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
477
478   return ret;
479 }