rtpmux: Implement stream locking, needed for DTMF
[platform/upstream/gstreamer.git] / gst / rtpmanager / gstrtpmux.c
1 /* RTP muxer element for GStreamer
2  *
3  * gstrtpmux.c:
4  *
5  * Copyright (C) <2007> Nokia Corporation.
6  *   Contact: Zeeshan Ali <zeeshan.ali@nokia.com>
7  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
8  *               2000,2005 Wim Taymans <wim@fluendo.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25
26 /**
27  * SECTION:element-rtpmux
28  * @short_description: Muxer that takes one or several RTP streams
29  * and muxes them to a single rtp stream.
30  *
31  * <refsect2>
32  * </refsect2>
33  */
34
35 #ifdef HAVE_CONFIG_H
36 #include "config.h"
37 #endif
38
39 #include <gst/gst.h>
40 #include <gst/rtp/gstrtpbuffer.h>
41
42 #include <string.h>
43
44 GST_DEBUG_CATEGORY_STATIC (gst_rtp_mux_debug);
45 #define GST_CAT_DEFAULT gst_rtp_mux_debug
46
47 #define GST_TYPE_RTP_MUX (gst_rtp_mux_get_type())
48 #define GST_RTP_MUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTP_MUX, GstRTPMux))
49 #define GST_RTP_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTP_MUX, GstRTPMux))
50 #define GST_IS_RTP_MUX(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_MUX))
51 #define GST_IS_RTP_MUX_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_MUX))
52
53 typedef struct _GstRTPMux GstRTPMux;
54 typedef struct _GstRTPMuxClass GstRTPMuxClass;
55
56 /**
57  * GstRTPMux:
58  *
59  * The opaque #GstRTPMux structure.
60  */
61 struct _GstRTPMux
62 {
63   GstElement element;
64
65   /* pad */
66   GstPad *srcpad;
67
68   /* sinkpads */
69   gint numpads;
70   GstPad *special_pad;
71
72   guint16  seqnum_base;
73   gint16   seqnum_offset;
74   guint16  seqnum;
75 };
76
77 struct _GstRTPMuxClass
78 {
79   GstElementClass parent_class;
80 };
81
82 /* elementfactory information */
83 static const GstElementDetails gst_rtp_mux_details =
84 GST_ELEMENT_DETAILS ("RTP muxer",
85     "Codec/Muxer",
86     "mux rtp streams",
87     "Zeeshan Ali <first.last@nokia.com>");
88
89 enum
90 {
91   ARG_0,
92   PROP_SEQNUM_OFFSET,
93   PROP_SEQNUM
94   /* FILL ME */
95 };
96
97 #define DEFAULT_SEQNUM_OFFSET           -1
98
99 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
100     GST_PAD_SRC,
101     GST_PAD_ALWAYS,
102     GST_STATIC_CAPS ("application/x-rtp")
103     );
104
105 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%d",
106     GST_PAD_SINK,
107     GST_PAD_REQUEST,
108     GST_STATIC_CAPS ("application/x-rtp")
109     );
110
111 static void gst_rtp_mux_base_init (gpointer g_class);
112 static void gst_rtp_mux_class_init (GstRTPMuxClass * klass);
113 static void gst_rtp_mux_init (GstRTPMux * rtp_mux);
114
115 static void gst_rtp_mux_finalize (GObject * object);
116
117 static gboolean gst_rtp_mux_handle_sink_event (GstPad * pad,
118     GstEvent * event);
119 static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
120     GstPadTemplate * templ, const gchar * name);
121 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad,
122     GstBuffer * buffer);
123 static gboolean gst_rtp_mux_setcaps (GstPad *pad, GstCaps *caps);
124
125 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
126     element, GstStateChange transition);
127
128 static void gst_rtp_mux_set_property (GObject * object, guint prop_id,
129     const GValue * value, GParamSpec * pspec);
130 static void gst_rtp_mux_get_property (GObject * object, guint prop_id,
131     GValue * value, GParamSpec * pspec);
132
133 static GstElementClass *parent_class = NULL;
134
135 GType
136 gst_rtp_mux_get_type (void)
137 {
138   static GType rtp_mux_type = 0;
139
140   if (!rtp_mux_type) {
141     static const GTypeInfo rtp_mux_info = {
142       sizeof (GstRTPMuxClass),
143       gst_rtp_mux_base_init,
144       NULL,
145       (GClassInitFunc) gst_rtp_mux_class_init,
146       NULL,
147       NULL,
148       sizeof (GstRTPMux),
149       0,
150       (GInstanceInitFunc) gst_rtp_mux_init,
151     };
152
153     rtp_mux_type =
154         g_type_register_static (GST_TYPE_ELEMENT, "GstRTPMux",
155         &rtp_mux_info, 0);
156   }
157   return rtp_mux_type;
158 }
159
160 static void
161 gst_rtp_mux_base_init (gpointer g_class)
162 {
163   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
164
165   gst_element_class_add_pad_template (element_class,
166       gst_static_pad_template_get (&src_factory));
167   gst_element_class_add_pad_template (element_class,
168       gst_static_pad_template_get (&sink_factory));
169
170   gst_element_class_set_details (element_class, &gst_rtp_mux_details);
171 }
172
173 static void
174 gst_rtp_mux_class_init (GstRTPMuxClass * klass)
175 {
176   GObjectClass *gobject_class;
177   GstElementClass *gstelement_class;
178
179   gobject_class = (GObjectClass *) klass;
180   gstelement_class = (GstElementClass *) klass;
181
182   parent_class = g_type_class_peek_parent (klass);
183
184   gobject_class->finalize = gst_rtp_mux_finalize;
185   gobject_class->get_property = gst_rtp_mux_get_property;
186   gobject_class->set_property = gst_rtp_mux_set_property;
187
188   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM_OFFSET,
189       g_param_spec_int ("seqnum-offset", "Sequence number Offset",
190           "Offset to add to all outgoing seqnum (-1 = random)", -1, G_MAXINT,
191           DEFAULT_SEQNUM_OFFSET, G_PARAM_READWRITE));
192   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM,
193       g_param_spec_uint ("seqnum", "Sequence number",
194           "The RTP sequence number of the last processed packet",
195           0, G_MAXUINT, 0, G_PARAM_READABLE));
196
197   gstelement_class->request_new_pad = gst_rtp_mux_request_new_pad;
198   gstelement_class->change_state = gst_rtp_mux_change_state;
199 }
200
201 static void
202 gst_rtp_mux_init (GstRTPMux * rtp_mux)
203 {
204   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtp_mux);
205
206   rtp_mux->srcpad =
207       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
208           "src"), "src");
209   gst_element_add_pad (GST_ELEMENT (rtp_mux), rtp_mux->srcpad);
210
211   rtp_mux->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
212 }
213
214 static void
215 gst_rtp_mux_finalize (GObject * object)
216 {
217   GstRTPMux *rtp_mux;
218
219   rtp_mux = GST_RTP_MUX (object);
220
221   G_OBJECT_CLASS (parent_class)->finalize (object);
222 }
223
224 static GstPad *
225 gst_rtp_mux_request_new_pad (GstElement * element,
226     GstPadTemplate * templ, const gchar * req_name)
227 {
228   GstRTPMux *rtp_mux;
229   GstPad *newpad;
230   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
231
232   g_return_val_if_fail (templ != NULL, NULL);
233
234   rtp_mux = GST_RTP_MUX (element);
235
236   if (templ->direction != GST_PAD_SINK) {
237     GST_WARNING_OBJECT (rtp_mux, "request pad that is not a SINK pad");
238     return NULL;
239   }
240
241   g_return_val_if_fail (GST_IS_RTP_MUX (element), NULL);
242
243   if (templ == gst_element_class_get_pad_template (klass, "sink_%d")) {
244     gchar *name;
245
246     /* create new pad with the name */
247     name = g_strdup_printf ("sink_%02d", rtp_mux->numpads);
248     newpad = gst_pad_new_from_template (templ, name);
249     g_free (name);
250
251     rtp_mux->numpads++;
252   } else {
253     GST_WARNING_OBJECT (rtp_mux, "this is not our template!\n");
254     return NULL;
255   }
256
257   /* setup some pad functions */
258   gst_pad_set_chain_function (newpad, gst_rtp_mux_chain);
259   gst_pad_set_setcaps_function (newpad, gst_rtp_mux_setcaps);
260   gst_pad_set_event_function (newpad, gst_rtp_mux_handle_sink_event);
261
262   /* dd the pad to the element */
263   gst_element_add_pad (element, newpad);
264
265   return newpad;
266 }
267
268 static GstFlowReturn
269 gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer)
270 {
271   GstRTPMux *rtp_mux;
272
273   rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
274
275   rtp_mux->seqnum++;
276   GST_LOG_OBJECT (rtp_mux, "setting RTP seqnum %d", rtp_mux->seqnum);
277   gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum);
278
279   gst_object_unref (rtp_mux);
280
281   GST_DEBUG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u",
282       GST_BUFFER_SIZE (buffer), rtp_mux->seqnum - 1);
283
284   return gst_pad_push (rtp_mux->srcpad, buffer);
285 }
286
287 static gboolean
288 gst_rtp_mux_setcaps (GstPad *pad, GstCaps *caps)
289 {
290   /*GstRTPMux *rtp_mux;
291   GstCaps *old_caps;
292   GstCaps *new_caps;
293   gint i;
294   gboolean ret;
295
296   rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
297
298   new_caps = gst_caps_copy (caps);
299
300   / * We want our own seq base on the caps * /
301   for (i=0; i< gst_caps_get_size (new_caps); i++) {
302      GstStructure *structure = gst_caps_get_structure (new_caps, i);
303      gst_structure_set (structure,
304              "seqnum-base", G_TYPE_UINT, rtp_mux->seqnum_base, NULL);
305   }
306
307   old_caps = GST_PAD_CAPS (rtp_mux->srcpad);
308   if (old_caps != NULL) {
309     new_caps = gst_caps_union (old_caps, new_caps);
310   }
311
312   GST_DEBUG_OBJECT (rtp_mux,
313           "seting caps %" GST_PTR_FORMAT " on src pad..", caps);
314   ret = gst_pad_set_caps (rtp_mux->srcpad, new_caps);
315   gst_caps_unref (new_caps);
316
317   return ret;*/
318
319   return TRUE;
320 }
321
322 static void
323 gst_rtp_mux_set_sinkpads_blocked (GstRTPMux *rtp_mux, gboolean blocked, GstPad *exception)
324 {
325   GstIterator *iter;
326   GstPad *pad;
327
328   GST_DEBUG_OBJECT (rtp_mux, "blocking all sink pads except: %s",
329           GST_ELEMENT_NAME (exception));
330
331   iter = gst_element_iterate_sink_pads (GST_ELEMENT (rtp_mux));
332   while (gst_iterator_next (iter, (gpointer) &pad) == GST_ITERATOR_OK) {
333     if (pad != exception) {
334       GstPad *peer;
335
336       peer = gst_pad_get_peer (pad);
337
338       GST_DEBUG_OBJECT (rtp_mux, "blocking pad %s..", GST_ELEMENT_NAME (pad));
339       gst_pad_set_blocked (peer, blocked);
340       GST_DEBUG_OBJECT (rtp_mux, "pad %s blocked", GST_ELEMENT_NAME (pad));
341
342       gst_object_unref (GST_OBJECT (peer));
343     }
344     gst_object_unref (GST_OBJECT (pad));
345   }
346 }
347
348 static gboolean
349 gst_rtp_mux_handle_sink_event (GstPad * pad, GstEvent * event)
350 {
351   GstRTPMux *rtp_mux;
352   GstEventType type;
353
354   rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad));
355
356   type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN;
357
358   switch (type) {
359     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
360     {
361       const GstStructure *structure;
362
363       structure = gst_event_get_structure (event);
364       /* FIXME: is this event generic enough to be given a generic name? */
365       if (structure && gst_structure_has_name (structure, "stream-lock")) {
366         gboolean lock;
367
368         if (!gst_structure_get_boolean (structure, "lock", &lock))
369           break;
370
371         if (lock) {
372           if (rtp_mux->special_pad != NULL) {
373               GST_WARNING_OBJECT (rtp_mux,
374                       "Stream lock already acquired by pad %s",
375                       GST_ELEMENT_NAME (rtp_mux->special_pad));
376               break;
377           }
378
379           rtp_mux->special_pad = gst_object_ref (pad);
380         }
381
382         else {
383           if (rtp_mux->special_pad == NULL) {
384               GST_WARNING_OBJECT (rtp_mux,
385                       "Stream lock not acquired, can't release it");
386               break;
387           }
388
389           else if (pad != rtp_mux->special_pad) {
390               GST_WARNING_OBJECT (rtp_mux,
391                       "pad %s attempted to release Stream lock"
392                       " which was acquired by pad %s", GST_ELEMENT_NAME (pad),
393                       GST_ELEMENT_NAME (rtp_mux->special_pad));
394               break;
395           }
396
397           gst_object_unref (rtp_mux->special_pad);
398           rtp_mux->special_pad = NULL;
399         }
400
401         gst_rtp_mux_set_sinkpads_blocked (rtp_mux, lock, pad);
402       }
403       break;
404     }
405     default:
406       break;
407   }
408
409   gst_object_unref (rtp_mux);
410
411   return gst_pad_event_default (pad, event);
412 }
413
414 static void
415 gst_rtp_mux_get_property (GObject * object,
416     guint prop_id, GValue * value, GParamSpec * pspec)
417 {
418   GstRTPMux *rtp_mux;
419
420   rtp_mux = GST_RTP_MUX (object);
421
422   switch (prop_id) {
423     case PROP_SEQNUM_OFFSET:
424       g_value_set_int (value, rtp_mux->seqnum_offset);
425       break;
426     case PROP_SEQNUM:
427       g_value_set_uint (value, rtp_mux->seqnum);
428       break;
429     default:
430       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
431       break;
432   }
433 }
434
435 static void
436 gst_rtp_mux_set_property (GObject * object,
437     guint prop_id, const GValue * value, GParamSpec * pspec)
438 {
439   GstRTPMux *rtp_mux;
440
441   rtp_mux = GST_RTP_MUX (object);
442
443   switch (prop_id) {
444     case PROP_SEQNUM_OFFSET:
445       rtp_mux->seqnum_offset = g_value_get_int (value);
446       break;
447     default:
448       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
449       break;
450   }
451 }
452
453 static GstStateChangeReturn
454 gst_rtp_mux_change_state (GstElement * element, GstStateChange transition)
455 {
456   GstRTPMux *rtp_mux;
457   GstStateChangeReturn ret;
458
459   rtp_mux = GST_RTP_MUX (element);
460
461   switch (transition) {
462     case GST_STATE_CHANGE_NULL_TO_READY:
463       break;
464     case GST_STATE_CHANGE_READY_TO_PAUSED:
465       if (rtp_mux->seqnum_offset == -1)
466           rtp_mux->seqnum_base = g_random_int_range (0, G_MAXUINT16);
467       else
468           rtp_mux->seqnum_base = rtp_mux->seqnum_offset;
469       rtp_mux->seqnum = rtp_mux->seqnum_base;
470       break;
471     case GST_STATE_CHANGE_PAUSED_TO_READY:
472       break;
473     default:
474       break;
475   }
476
477   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
478   if (ret == GST_STATE_CHANGE_FAILURE)
479     return ret;
480
481   switch (transition) {
482     default:
483       break;
484   }
485
486   return ret;
487 }
488
489 gboolean
490 gst_rtp_mux_plugin_init (GstPlugin * plugin)
491 {
492   GST_DEBUG_CATEGORY_INIT (gst_rtp_mux_debug, "rtpmux", 0,
493       "rtp muxer");
494
495   return gst_element_register (plugin, "rtpmux", GST_RANK_NONE,
496       GST_TYPE_RTP_MUX);
497 }
498
499 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
500     GST_VERSION_MINOR,
501     "rtpmux",
502     "RTP muxer",
503     gst_rtp_mux_plugin_init,
504     VERSION,
505     "LGPL",
506     "Farsight",
507     "http://farsight.sf.net")