- make aggregator actually work
[platform/upstream/gstreamer.git] / gst / elements / gstaggregator.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wim.taymans@chello.be>
4  *
5  * gstaggregator.c: Aggregator element, N in 1 out
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #include "gstaggregator.h"
24
25
26 GstElementDetails gst_aggregator_details = {
27   "Aggregator pipe fitting",
28   "Generic",
29   "LGPL",
30   "N-to-1 pipe fitting",
31   VERSION,
32   "Wim Taymans <wim.taymans@chello.be>",
33   "(C) 2001",
34 };
35
36 /* Aggregator signals and args */
37 enum {
38   /* FILL ME */
39   LAST_SIGNAL
40 };
41
42 enum {
43   ARG_0,
44   ARG_NUM_PADS,
45   ARG_SILENT,
46   ARG_SCHED,
47   ARG_LAST_MESSAGE,
48   /* FILL ME */
49 };
50
51 GST_PAD_TEMPLATE_FACTORY (aggregator_src_factory,
52   "sink%d",
53   GST_PAD_SINK,
54   GST_PAD_REQUEST,
55   NULL                  /* no caps */
56 );
57
58 #define GST_TYPE_AGGREGATOR_SCHED (gst_aggregator_sched_get_type())
59 static GType
60 gst_aggregator_sched_get_type (void)
61 {
62   static GType aggregator_sched_type = 0;
63   static GEnumValue aggregator_sched[] = {
64     { AGGREGATOR_LOOP,          "1", "Loop Based"},
65     { AGGREGATOR_LOOP_SELECT,   "3", "Loop Based Select"},
66     { AGGREGATOR_CHAIN,         "4", "Chain Based"},
67     {0, NULL, NULL},
68   };
69   if (!aggregator_sched_type) {
70     aggregator_sched_type = g_enum_register_static ("GstAggregatorSched", aggregator_sched);
71   }
72   return aggregator_sched_type;
73 }
74
75 #define AGGREGATOR_IS_LOOP_BASED(ag)    ((ag)->sched != AGGREGATOR_CHAIN)
76
77 static void     gst_aggregator_class_init       (GstAggregatorClass *klass);
78 static void     gst_aggregator_init             (GstAggregator *aggregator);
79
80 static GstPad*  gst_aggregator_request_new_pad  (GstElement *element, GstPadTemplate *temp, const
81                                                  gchar *unused);
82 static void     gst_aggregator_update_functions (GstAggregator *aggregator);
83
84 static void     gst_aggregator_set_property     (GObject *object, guint prop_id, 
85                                                  const GValue *value, GParamSpec *pspec);
86 static void     gst_aggregator_get_property     (GObject *object, guint prop_id, 
87                                                  GValue *value, GParamSpec *pspec);
88
89 static void     gst_aggregator_chain            (GstPad *pad, GstBuffer *buf);
90 static void     gst_aggregator_loop             (GstElement *element);
91
92 static GstElementClass *parent_class = NULL;
93 /*static guint gst_aggregator_signals[LAST_SIGNAL] = { 0 };*/
94
95 GType
96 gst_aggregator_get_type (void) 
97 {
98   static GType aggregator_type = 0;
99
100   if (!aggregator_type) {
101     static const GTypeInfo aggregator_info = {
102       sizeof(GstAggregatorClass),      
103       NULL,
104       NULL,
105       (GClassInitFunc)gst_aggregator_class_init,
106       NULL,
107       NULL,
108       sizeof(GstAggregator),
109       0,
110       (GInstanceInitFunc)gst_aggregator_init,
111     };
112     aggregator_type = g_type_register_static (GST_TYPE_ELEMENT, "GstAggregator", &aggregator_info, 0);
113   }
114   return aggregator_type;
115 }
116
117 static void
118 gst_aggregator_class_init (GstAggregatorClass *klass) 
119 {
120   GObjectClass *gobject_class;
121   GstElementClass *gstelement_class;
122
123   gobject_class = (GObjectClass*) klass;
124   gstelement_class = (GstElementClass*) klass;
125
126   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
127
128   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_NUM_PADS,
129     g_param_spec_int ("num_pads", "Num pads", "The number of source pads",
130                       0, G_MAXINT, 0, G_PARAM_READABLE)); 
131   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SILENT,
132     g_param_spec_boolean ("silent", "Silent", "Don't produce messages",
133                       FALSE, G_PARAM_READWRITE)); 
134   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SCHED,
135     g_param_spec_enum ("sched", "Scheduling", "The type of scheduling this element should use",
136                       GST_TYPE_AGGREGATOR_SCHED, AGGREGATOR_CHAIN, G_PARAM_READWRITE)); 
137   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LAST_MESSAGE,
138     g_param_spec_string ("last_message", "Last message", "The current state of the element",
139                          NULL, G_PARAM_READABLE));
140
141   gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_aggregator_set_property);
142   gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_aggregator_get_property);
143
144   gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR(gst_aggregator_request_new_pad);
145 }
146
147 static void 
148 gst_aggregator_init (GstAggregator *aggregator) 
149 {
150   aggregator->srcpad = gst_pad_new ("src", GST_PAD_SRC);
151   gst_element_add_pad (GST_ELEMENT (aggregator), aggregator->srcpad);
152
153   aggregator->numsinkpads = 0;
154   aggregator->sinkpads = NULL;
155   aggregator->silent = FALSE;
156   aggregator->sched = AGGREGATOR_LOOP;
157
158   gst_aggregator_update_functions (aggregator);
159 }
160
161 static GstPad*
162 gst_aggregator_request_new_pad (GstElement *element, GstPadTemplate *templ, const gchar *unused) 
163 {
164   gchar *name;
165   GstPad *sinkpad;
166   GstAggregator *aggregator;
167
168   g_return_val_if_fail (GST_IS_AGGREGATOR (element), NULL);
169
170   if (templ->direction != GST_PAD_SINK) {
171     g_warning ("gstaggregator: request new pad that is not a SRC pad\n");
172     return NULL;
173   }
174
175   aggregator = GST_AGGREGATOR (element);
176
177   name = g_strdup_printf ("sink%d",aggregator->numsinkpads);
178   
179   sinkpad = gst_pad_new_from_template (templ, name);
180   g_free (name);
181   
182   if (!AGGREGATOR_IS_LOOP_BASED (aggregator)) {
183     gst_pad_set_chain_function (sinkpad, gst_aggregator_chain);
184   }
185   gst_element_add_pad (GST_ELEMENT (aggregator), sinkpad);
186   
187   aggregator->sinkpads = g_list_prepend (aggregator->sinkpads, sinkpad);
188   aggregator->numsinkpads++;
189   
190   return sinkpad;
191 }
192
193 static void
194 gst_aggregator_update_functions (GstAggregator *aggregator)
195 {
196   GList *pads;
197
198   if (AGGREGATOR_IS_LOOP_BASED (aggregator)) {
199     gst_element_set_loop_function (GST_ELEMENT (aggregator), GST_DEBUG_FUNCPTR (gst_aggregator_loop));
200   }
201   else {
202     gst_element_set_loop_function (GST_ELEMENT (aggregator), NULL);
203   }
204
205   pads = aggregator->sinkpads;
206   while (pads) {
207     GstPad *pad = GST_PAD (pads->data);
208                           
209     if (AGGREGATOR_IS_LOOP_BASED (aggregator)) {
210       gst_pad_set_get_function (pad, NULL);
211     }
212     else {
213       gst_element_set_loop_function (GST_ELEMENT (aggregator), NULL);
214     }
215     pads = g_list_next (pads);
216   }
217 }
218
219 static void
220 gst_aggregator_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
221 {
222   GstAggregator *aggregator;
223
224   /* it's not null if we got it, but it might not be ours */
225   g_return_if_fail (GST_IS_AGGREGATOR (object));
226
227   aggregator = GST_AGGREGATOR (object);
228
229   switch (prop_id) {
230     case ARG_SILENT:
231       aggregator->silent = g_value_get_boolean (value);
232       break;
233     case ARG_SCHED:
234       aggregator->sched = g_value_get_enum (value);
235       gst_aggregator_update_functions (aggregator);
236       break;
237     default:
238       break;
239   }
240 }
241
242 static void
243 gst_aggregator_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
244 {
245   GstAggregator *aggregator;
246
247   /* it's not null if we got it, but it might not be ours */
248   g_return_if_fail (GST_IS_AGGREGATOR (object));
249
250   aggregator = GST_AGGREGATOR (object);
251
252   switch (prop_id) {
253     case ARG_NUM_PADS:
254       g_value_set_int (value, aggregator->numsinkpads);
255       break;
256     case ARG_SILENT:
257       g_value_set_boolean (value, aggregator->silent);
258       break;
259     case ARG_SCHED:
260       g_value_set_enum (value, aggregator->sched);
261       break;
262     case ARG_LAST_MESSAGE:
263       g_value_set_string (value, aggregator->last_message);
264       break;
265     default:
266       break;
267   }
268 }
269
270 static void 
271 gst_aggregator_push (GstAggregator *aggregator, GstPad *pad, GstBuffer *buf, guchar *debug) 
272 {
273   if (!aggregator->silent) {
274     g_free (aggregator->last_message);
275
276     aggregator->last_message = g_strdup_printf ("%10.10s ******* (%s:%s)a (%d bytes, %"
277                                                 G_GUINT64_FORMAT ")",
278             debug, GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
279
280     g_object_notify (G_OBJECT (aggregator), "last_message");
281   }
282
283   gst_pad_push (aggregator->srcpad, buf);
284 }
285
286 static void 
287 gst_aggregator_loop (GstElement *element) 
288 {
289   GstAggregator *aggregator;
290   GstBuffer *buf;
291   guchar *debug;
292
293   aggregator = GST_AGGREGATOR (element);
294
295   if (aggregator->sched == AGGREGATOR_LOOP) {
296     GList *pads = aggregator->sinkpads;
297
298     while (pads) {
299       GstPad *pad = GST_PAD (pads->data);
300       pads = g_list_next (pads);
301
302       g_print ("inspecting pad %s:%s\n", GST_DEBUG_PAD_NAME (pad));
303       if (GST_PAD_IS_ACTIVE (pad)) {
304         buf = gst_pad_pull (pad);
305         debug = "loop";
306
307         gst_aggregator_push (aggregator, pad, buf, debug);
308       }
309     }
310   }
311   else {
312     if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
313       GstPad *pad;
314
315       debug = "loop_select";
316
317       pad = gst_pad_select (aggregator->sinkpads);
318       buf = gst_pad_pull (pad);
319
320       gst_aggregator_push (aggregator, pad, buf, debug);
321     }
322     else {
323       g_assert_not_reached ();
324     }
325   }
326 }
327
328 /**
329  * gst_aggregator_chain:
330  * @pad: the pad to follow
331  * @buf: the buffer to pass
332  *
333  * Chain a buffer on a pad.
334  */
335 static void 
336 gst_aggregator_chain (GstPad *pad, GstBuffer *buf) 
337 {
338   GstAggregator *aggregator;
339
340   g_return_if_fail (pad != NULL);
341   g_return_if_fail (GST_IS_PAD (pad));
342   g_return_if_fail (buf != NULL);
343
344   aggregator = GST_AGGREGATOR (gst_pad_get_parent (pad));
345 /*  gst_trace_add_entry (NULL, 0, buf, "aggregator buffer");*/
346
347   gst_aggregator_push (aggregator, pad, buf, "chain");
348 }
349
350 gboolean
351 gst_aggregator_factory_init (GstElementFactory *factory)
352 {
353   gst_element_factory_add_pad_template (factory, GST_PAD_TEMPLATE_GET (aggregator_src_factory));
354
355   return TRUE;
356 }
357