d8dad7e2c6166bb39cbe93651c31d4136356d35e
[platform/upstream/gst-plugins-good.git] / gst / oldcore / gstaggregator.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wim.taymans@chello.be>
4  *
5  * gstaggregator.c: Aggregator element, N in 1 out
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #include "gstaggregator.h"
28
29 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
30     GST_PAD_SRC,
31     GST_PAD_ALWAYS,
32     GST_STATIC_CAPS_ANY);
33
34 GST_DEBUG_CATEGORY_STATIC (gst_aggregator_debug);
35 #define GST_CAT_DEFAULT gst_aggregator_debug
36
37 GstElementDetails gst_aggregator_details =
38 GST_ELEMENT_DETAILS ("Aggregator pipe fitting",
39     "Generic",
40     "N-to-1 pipe fitting",
41     "Wim Taymans <wim.taymans@chello.be>");
42
43 /* Aggregator signals and args */
44 enum
45 {
46   /* FILL ME */
47   LAST_SIGNAL
48 };
49
50 enum
51 {
52   ARG_0,
53   ARG_NUM_PADS,
54   ARG_SILENT,
55   ARG_SCHED,
56   ARG_LAST_MESSAGE
57       /* FILL ME */
58 };
59
60 GstStaticPadTemplate aggregator_src_template =
61 GST_STATIC_PAD_TEMPLATE ("sink%d",
62     GST_PAD_SINK,
63     GST_PAD_REQUEST,
64     GST_STATIC_CAPS_ANY);
65
66 #define GST_TYPE_AGGREGATOR_SCHED (gst_aggregator_sched_get_type())
67 static GType
68 gst_aggregator_sched_get_type (void)
69 {
70   static GType aggregator_sched_type = 0;
71   static GEnumValue aggregator_sched[] = {
72     {AGGREGATOR_LOOP, "1", "Loop Based"},
73     {AGGREGATOR_LOOP_SELECT, "3", "Loop Based Select"},
74     {AGGREGATOR_CHAIN, "4", "Chain Based"},
75     {0, NULL, NULL},
76   };
77
78   if (!aggregator_sched_type) {
79     aggregator_sched_type =
80         g_enum_register_static ("GstAggregatorSched", aggregator_sched);
81   }
82   return aggregator_sched_type;
83 }
84
85 #define AGGREGATOR_IS_LOOP_BASED(ag)    ((ag)->sched != AGGREGATOR_CHAIN)
86
87 static GstPad *gst_aggregator_request_new_pad (GstElement * element,
88     GstPadTemplate * temp, const gchar * unused);
89 static void gst_aggregator_update_functions (GstAggregator * aggregator);
90
91 static void gst_aggregator_finalize (GObject * object);
92 static void gst_aggregator_set_property (GObject * object, guint prop_id,
93     const GValue * value, GParamSpec * pspec);
94 static void gst_aggregator_get_property (GObject * object, guint prop_id,
95     GValue * value, GParamSpec * pspec);
96
97 static void gst_aggregator_chain (GstPad * pad, GstData * _data);
98 static void gst_aggregator_loop (GstElement * element);
99
100 #define _do_init(bla) \
101   GST_DEBUG_CATEGORY_INIT (gst_aggregator_debug, "aggregator", 0, "aggregator element");
102
103 GST_BOILERPLATE_FULL (GstAggregator, gst_aggregator, GstElement,
104     GST_TYPE_ELEMENT, _do_init);
105
106 static void
107 gst_aggregator_base_init (gpointer g_class)
108 {
109   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
110
111   gst_element_class_add_pad_template (gstelement_class,
112       gst_static_pad_template_get (&aggregator_src_template));
113   gst_element_class_add_pad_template (gstelement_class,
114       gst_static_pad_template_get (&srctemplate));
115   gst_element_class_set_details (gstelement_class, &gst_aggregator_details);
116 }
117
118 static void
119 gst_aggregator_finalize (GObject * object)
120 {
121   GstAggregator *aggregator;
122
123   aggregator = GST_AGGREGATOR (object);
124
125   g_list_free (aggregator->sinkpads);
126   g_free (aggregator->last_message);
127
128   G_OBJECT_CLASS (parent_class)->finalize (object);
129 }
130
131 static void
132 gst_aggregator_class_init (GstAggregatorClass * klass)
133 {
134   GObjectClass *gobject_class;
135   GstElementClass *gstelement_class;
136
137   gobject_class = (GObjectClass *) klass;
138   gstelement_class = (GstElementClass *) klass;
139
140   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_aggregator_set_property);
141   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_aggregator_get_property);
142
143   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_NUM_PADS,
144       g_param_spec_int ("num_pads", "Num pads", "The number of source pads",
145           0, G_MAXINT, 0, G_PARAM_READABLE));
146   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SILENT,
147       g_param_spec_boolean ("silent", "Silent", "Don't produce messages",
148           FALSE, G_PARAM_READWRITE));
149   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SCHED,
150       g_param_spec_enum ("sched", "Scheduling",
151           "The type of scheduling this element should use",
152           GST_TYPE_AGGREGATOR_SCHED, AGGREGATOR_CHAIN, G_PARAM_READWRITE));
153   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LAST_MESSAGE,
154       g_param_spec_string ("last_message", "Last message",
155           "The current state of the element", NULL, G_PARAM_READABLE));
156
157   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_aggregator_finalize);
158
159   gstelement_class->request_new_pad =
160       GST_DEBUG_FUNCPTR (gst_aggregator_request_new_pad);
161 }
162
163 static void
164 gst_aggregator_init (GstAggregator * aggregator, GstAggregatorClass * g_class)
165 {
166   aggregator->srcpad =
167       gst_pad_new_from_template (gst_static_pad_template_get (&srctemplate),
168       "src");
169   gst_pad_set_getcaps_function (aggregator->srcpad, gst_pad_proxy_getcaps);
170   gst_element_add_pad (GST_ELEMENT (aggregator), aggregator->srcpad);
171
172   aggregator->numsinkpads = 0;
173   aggregator->sinkpads = NULL;
174   aggregator->silent = FALSE;
175   aggregator->sched = AGGREGATOR_LOOP;
176   aggregator->last_message = NULL;
177
178   gst_aggregator_update_functions (aggregator);
179 }
180
181 static GstPad *
182 gst_aggregator_request_new_pad (GstElement * element, GstPadTemplate * templ,
183     const gchar * unused)
184 {
185   gchar *name;
186   GstPad *sinkpad;
187   GstAggregator *aggregator;
188
189   g_return_val_if_fail (GST_IS_AGGREGATOR (element), NULL);
190
191   if (templ->direction != GST_PAD_SINK) {
192     g_warning ("gstaggregator: request new pad that is not a sink pad\n");
193     return NULL;
194   }
195
196   aggregator = GST_AGGREGATOR (element);
197
198   name = g_strdup_printf ("sink%d", aggregator->numsinkpads);
199
200   sinkpad = gst_pad_new_from_template (templ, name);
201   g_free (name);
202
203   if (!AGGREGATOR_IS_LOOP_BASED (aggregator)) {
204     gst_pad_set_chain_function (sinkpad, gst_aggregator_chain);
205   }
206   gst_pad_set_getcaps_function (sinkpad, gst_pad_proxy_getcaps);
207   gst_element_add_pad (GST_ELEMENT (aggregator), sinkpad);
208
209   aggregator->sinkpads = g_list_prepend (aggregator->sinkpads, sinkpad);
210   aggregator->numsinkpads++;
211
212   return sinkpad;
213 }
214
215 static void
216 gst_aggregator_update_functions (GstAggregator * aggregator)
217 {
218   GList *pads;
219
220   if (AGGREGATOR_IS_LOOP_BASED (aggregator)) {
221     gst_element_set_loop_function (GST_ELEMENT (aggregator),
222         GST_DEBUG_FUNCPTR (gst_aggregator_loop));
223   } else {
224     gst_element_set_loop_function (GST_ELEMENT (aggregator), NULL);
225   }
226
227   pads = aggregator->sinkpads;
228   while (pads) {
229     GstPad *pad = GST_PAD (pads->data);
230
231     if (AGGREGATOR_IS_LOOP_BASED (aggregator)) {
232       gst_pad_set_get_function (pad, NULL);
233     } else {
234       gst_element_set_loop_function (GST_ELEMENT (aggregator), NULL);
235     }
236     pads = g_list_next (pads);
237   }
238 }
239
240 static void
241 gst_aggregator_set_property (GObject * object, guint prop_id,
242     const GValue * value, GParamSpec * pspec)
243 {
244   GstAggregator *aggregator;
245
246   g_return_if_fail (GST_IS_AGGREGATOR (object));
247
248   aggregator = GST_AGGREGATOR (object);
249
250   switch (prop_id) {
251     case ARG_SILENT:
252       aggregator->silent = g_value_get_boolean (value);
253       break;
254     case ARG_SCHED:
255       aggregator->sched = g_value_get_enum (value);
256       gst_aggregator_update_functions (aggregator);
257       break;
258     default:
259       break;
260   }
261 }
262
263 static void
264 gst_aggregator_get_property (GObject * object, guint prop_id, GValue * value,
265     GParamSpec * pspec)
266 {
267   GstAggregator *aggregator;
268
269   g_return_if_fail (GST_IS_AGGREGATOR (object));
270
271   aggregator = GST_AGGREGATOR (object);
272
273   switch (prop_id) {
274     case ARG_NUM_PADS:
275       g_value_set_int (value, aggregator->numsinkpads);
276       break;
277     case ARG_SILENT:
278       g_value_set_boolean (value, aggregator->silent);
279       break;
280     case ARG_SCHED:
281       g_value_set_enum (value, aggregator->sched);
282       break;
283     case ARG_LAST_MESSAGE:
284       g_value_set_string (value, aggregator->last_message);
285       break;
286     default:
287       break;
288   }
289 }
290
291 static void
292 gst_aggregator_push (GstAggregator * aggregator, GstPad * pad, GstBuffer * buf,
293     guchar * debug)
294 {
295   if (!aggregator->silent) {
296     g_free (aggregator->last_message);
297
298     aggregator->last_message =
299         g_strdup_printf ("%10.10s ******* (%s:%s)a (%d bytes, %"
300         G_GUINT64_FORMAT ")", debug, GST_DEBUG_PAD_NAME (pad),
301         GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
302
303     g_object_notify (G_OBJECT (aggregator), "last_message");
304   }
305
306   gst_pad_push (aggregator->srcpad, GST_DATA (buf));
307 }
308
309 static void
310 gst_aggregator_loop (GstElement * element)
311 {
312   GstAggregator *aggregator;
313   GstBuffer *buf;
314   guchar *debug;
315
316   aggregator = GST_AGGREGATOR (element);
317
318   if (aggregator->sched == AGGREGATOR_LOOP) {
319     GList *pads = aggregator->sinkpads;
320
321     /* we'll loop over all pads and try to pull from all
322      * active ones */
323     while (pads) {
324       GstPad *pad = GST_PAD (pads->data);
325
326       pads = g_list_next (pads);
327
328       /* we need to check is the pad is usable. IS_USABLE will check
329        * if the pad is linked, if it is enabled (the element is
330        * playing and the app didn't gst_pad_set_enabled (pad, FALSE))
331        * and that the peer pad is also enabled.
332        */
333       if (GST_PAD_IS_USABLE (pad)) {
334         buf = GST_BUFFER (gst_pad_pull (pad));
335         debug = "loop";
336
337         /* then push it forward */
338         gst_aggregator_push (aggregator, pad, buf, debug);
339       }
340     }
341   } else {
342     if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
343       GstPad *pad;
344
345       debug = "loop_select";
346
347       buf = GST_BUFFER (gst_pad_collectv (&pad, aggregator->sinkpads));
348
349       gst_aggregator_push (aggregator, pad, buf, debug);
350     } else {
351       g_assert_not_reached ();
352     }
353   }
354 }
355
356 /**
357  * gst_aggregator_chain:
358  * @pad: the pad to follow
359  * @buf: the buffer to pass
360  *
361  * Chain a buffer on a pad.
362  */
363 static void
364 gst_aggregator_chain (GstPad * pad, GstData * _data)
365 {
366   GstBuffer *buf = GST_BUFFER (_data);
367   GstAggregator *aggregator;
368
369   g_return_if_fail (pad != NULL);
370   g_return_if_fail (GST_IS_PAD (pad));
371   g_return_if_fail (buf != NULL);
372
373   aggregator = GST_AGGREGATOR (gst_pad_get_parent (pad));
374 /*  gst_trace_add_entry (NULL, 0, buf, "aggregator buffer");*/
375
376   gst_aggregator_push (aggregator, pad, buf, "chain");
377 }