Merge branch 'plugin-move-rtp-opus'
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpmux.c
1 /* RTP muxer element for GStreamer
2  *
3  * gstrtpmux.c:
4  *
5  * Copyright (C) <2007-2010> Nokia Corporation.
6  *   Contact: Zeeshan Ali <zeeshan.ali@nokia.com>
7  * Copyright (C) <2007-2010> Collabora Ltd
8  *   Contact: Olivier Crete <olivier.crete@collabora.co.uk>
9  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
10  *               2000,2005 Wim Taymans <wim@fluendo.com>
11  *
12  * This library is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU Library General Public
14  * License as published by the Free Software Foundation; either
15  * version 2 of the License, or (at your option) any later version.
16  *
17  * This library is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20  * Library General Public License for more details.
21  *
22  * You should have received a copy of the GNU Library General Public
23  * License along with this library; if not, write to the
24  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25  * Boston, MA 02110-1301, USA.
26  */
27
28 /**
29  * SECTION:element-rtpmux
30  * @see_also: rtpdtmfmux
31  *
32  * The rtp muxer takes multiple RTP streams having the same clock-rate and
33  * muxes into a single stream with a single SSRC.
34  *
35  * <refsect2>
36  * <title>Example pipelines</title>
37  * |[
38  * gst-launch-1.0 rtpmux name=mux ! udpsink host=127.0.0.1 port=8888        \
39  *              alsasrc ! alawenc ! rtppcmapay !                        \
40  *              application/x-rtp, payload=8, rate=8000 ! mux.sink_0    \
41  *              audiotestsrc is-live=1 !                                \
42  *              mulawenc ! rtppcmupay !                                 \
43  *              application/x-rtp, payload=0, rate=8000 ! mux.sink_1
44  * ]|
45  * In this example, an audio stream is captured from ALSA and another is
46  * generated, both are encoded into different payload types and muxed together
47  * so they can be sent on the same port.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <gst/gst.h>
56 #include <gst/rtp/gstrtpbuffer.h>
57 #include <string.h>
58
59 #include "gstrtpmux.h"
60
61 GST_DEBUG_CATEGORY_STATIC (gst_rtp_mux_debug);
62 #define GST_CAT_DEFAULT gst_rtp_mux_debug
63
64 enum
65 {
66   PROP_0,
67   PROP_TIMESTAMP_OFFSET,
68   PROP_SEQNUM_OFFSET,
69   PROP_SEQNUM,
70   PROP_SSRC
71 };
72
73 #define DEFAULT_TIMESTAMP_OFFSET -1
74 #define DEFAULT_SEQNUM_OFFSET    -1
75 #define DEFAULT_SSRC             -1
76
77 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
78     GST_PAD_SRC,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS ("application/x-rtp")
81     );
82
83 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%u",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS ("application/x-rtp")
87     );
88
89 static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
90     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
91 static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
92 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstObject * parent,
93     GstBuffer * buffer);
94 static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
95     GstBufferList * bufferlist);
96 static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux,
97     GstCaps * caps);
98 static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent,
99     GstEvent * event);
100 static gboolean gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent,
101     GstQuery * query);
102
103 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
104     element, GstStateChange transition);
105
106 static void gst_rtp_mux_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void gst_rtp_mux_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110 static void gst_rtp_mux_dispose (GObject * object);
111
112 static gboolean gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux,
113     GstEvent * event);
114
115 G_DEFINE_TYPE (GstRTPMux, gst_rtp_mux, GST_TYPE_ELEMENT);
116
117
118 static void
119 gst_rtp_mux_class_init (GstRTPMuxClass * klass)
120 {
121   GObjectClass *gobject_class;
122   GstElementClass *gstelement_class;
123
124   gobject_class = (GObjectClass *) klass;
125   gstelement_class = (GstElementClass *) klass;
126
127
128   gst_element_class_add_pad_template (gstelement_class,
129       gst_static_pad_template_get (&src_factory));
130   gst_element_class_add_pad_template (gstelement_class,
131       gst_static_pad_template_get (&sink_factory));
132
133   gst_element_class_set_static_metadata (gstelement_class, "RTP muxer",
134       "Codec/Muxer",
135       "multiplex N rtp streams into one", "Zeeshan Ali <first.last@nokia.com>");
136
137   gobject_class->get_property = gst_rtp_mux_get_property;
138   gobject_class->set_property = gst_rtp_mux_set_property;
139   gobject_class->dispose = gst_rtp_mux_dispose;
140
141   klass->src_event = gst_rtp_mux_src_event_real;
142
143   g_object_class_install_property (G_OBJECT_CLASS (klass),
144       PROP_TIMESTAMP_OFFSET, g_param_spec_int ("timestamp-offset",
145           "Timestamp Offset",
146           "Offset to add to all outgoing timestamps (-1 = random)", -1,
147           G_MAXINT, DEFAULT_TIMESTAMP_OFFSET,
148           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
149   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM_OFFSET,
150       g_param_spec_int ("seqnum-offset", "Sequence number Offset",
151           "Offset to add to all outgoing seqnum (-1 = random)", -1, G_MAXINT,
152           DEFAULT_SEQNUM_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
153   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM,
154       g_param_spec_uint ("seqnum", "Sequence number",
155           "The RTP sequence number of the last processed packet",
156           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
157   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SSRC,
158       g_param_spec_uint ("ssrc", "SSRC",
159           "The SSRC of the packets (default == random)",
160           0, G_MAXUINT, DEFAULT_SSRC,
161           GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
162           G_PARAM_STATIC_STRINGS));
163
164   gstelement_class->request_new_pad =
165       GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad);
166   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_mux_release_pad);
167   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_mux_change_state);
168 }
169
170 static void
171 gst_rtp_mux_dispose (GObject * object)
172 {
173   GstRTPMux *rtp_mux = GST_RTP_MUX (object);
174   GList *item;
175
176   g_clear_object (&rtp_mux->last_pad);
177
178 restart:
179   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
180     GstPad *pad = GST_PAD (item->data);
181     if (GST_PAD_IS_SINK (pad)) {
182       gst_element_release_request_pad (GST_ELEMENT (object), pad);
183       goto restart;
184     }
185   }
186
187   G_OBJECT_CLASS (gst_rtp_mux_parent_class)->dispose (object);
188 }
189
190 static gboolean
191 gst_rtp_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
192 {
193   GstRTPMux *rtp_mux = GST_RTP_MUX (parent);
194   GstRTPMuxClass *klass;
195   gboolean ret;
196
197   klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
198
199   ret = klass->src_event (rtp_mux, event);
200
201   return ret;
202 }
203
204 static gboolean
205 gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux, GstEvent * event)
206 {
207   switch (GST_EVENT_TYPE (event)) {
208     case GST_EVENT_CUSTOM_UPSTREAM:
209     {
210       const GstStructure *s = gst_event_get_structure (event);
211
212       if (gst_structure_has_name (s, "GstRTPCollision")) {
213         guint ssrc = 0;
214
215         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
216           ssrc = -1;
217
218         GST_DEBUG_OBJECT (rtp_mux, "collided ssrc: %" G_GUINT32_FORMAT, ssrc);
219
220         /* choose another ssrc for our stream */
221         GST_OBJECT_LOCK (rtp_mux);
222         if (ssrc == rtp_mux->current_ssrc) {
223           GstCaps *caps;
224           guint suggested_ssrc = 0;
225           guint32 new_ssrc;
226
227           if (gst_structure_get_uint (s, "suggested-ssrc", &suggested_ssrc))
228             rtp_mux->current_ssrc = suggested_ssrc;
229
230           while (ssrc == rtp_mux->current_ssrc)
231             rtp_mux->current_ssrc = g_random_int ();
232
233           new_ssrc = rtp_mux->current_ssrc;
234           GST_OBJECT_UNLOCK (rtp_mux);
235
236           caps = gst_pad_get_current_caps (rtp_mux->srcpad);
237           caps = gst_caps_make_writable (caps);
238           gst_caps_set_simple (caps, "ssrc", G_TYPE_UINT, new_ssrc, NULL);
239           gst_pad_set_caps (rtp_mux->srcpad, caps);
240           gst_caps_unref (caps);
241         } else {
242           GST_OBJECT_UNLOCK (rtp_mux);
243         }
244       }
245       break;
246     }
247     default:
248       break;
249   }
250
251
252   return gst_pad_event_default (rtp_mux->srcpad, GST_OBJECT (rtp_mux), event);
253 }
254
255 static void
256 gst_rtp_mux_init (GstRTPMux * rtp_mux)
257 {
258   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtp_mux);
259
260   rtp_mux->srcpad =
261       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
262           "src"), "src");
263   gst_pad_set_event_function (rtp_mux->srcpad,
264       GST_DEBUG_FUNCPTR (gst_rtp_mux_src_event));
265   gst_pad_use_fixed_caps (rtp_mux->srcpad);
266   gst_element_add_pad (GST_ELEMENT (rtp_mux), rtp_mux->srcpad);
267
268   rtp_mux->ssrc = DEFAULT_SSRC;
269   rtp_mux->current_ssrc = DEFAULT_SSRC;
270   rtp_mux->ssrc_random = TRUE;
271   rtp_mux->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
272   rtp_mux->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
273
274   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
275 }
276
277 static void
278 gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
279 {
280   GstRTPMuxPadPrivate *padpriv = g_slice_new0 (GstRTPMuxPadPrivate);
281
282   /* setup some pad functions */
283   gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain));
284   gst_pad_set_chain_list_function (sinkpad,
285       GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list));
286   gst_pad_set_event_function (sinkpad,
287       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
288   gst_pad_set_query_function (sinkpad,
289       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_query));
290
291
292   gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
293
294   gst_pad_set_element_private (sinkpad, padpriv);
295
296   gst_pad_set_active (sinkpad, TRUE);
297   gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad);
298 }
299
300 static GstPad *
301 gst_rtp_mux_request_new_pad (GstElement * element,
302     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
303 {
304   GstRTPMux *rtp_mux;
305   GstPad *newpad;
306
307   g_return_val_if_fail (templ != NULL, NULL);
308   g_return_val_if_fail (GST_IS_RTP_MUX (element), NULL);
309
310   rtp_mux = GST_RTP_MUX (element);
311
312   if (templ->direction != GST_PAD_SINK) {
313     GST_WARNING_OBJECT (rtp_mux, "request pad that is not a SINK pad");
314     return NULL;
315   }
316
317   newpad = gst_pad_new_from_template (templ, req_name);
318   if (newpad)
319     gst_rtp_mux_setup_sinkpad (rtp_mux, newpad);
320   else
321     GST_WARNING_OBJECT (rtp_mux, "failed to create request pad");
322
323   return newpad;
324 }
325
326 static void
327 gst_rtp_mux_release_pad (GstElement * element, GstPad * pad)
328 {
329   GstRTPMuxPadPrivate *padpriv;
330
331   GST_OBJECT_LOCK (element);
332   padpriv = gst_pad_get_element_private (pad);
333   gst_pad_set_element_private (pad, NULL);
334   GST_OBJECT_UNLOCK (element);
335
336   gst_element_remove_pad (element, pad);
337
338   if (padpriv) {
339     g_slice_free (GstRTPMuxPadPrivate, padpriv);
340   }
341 }
342
343 /* Put our own timestamp-offset on the buffer */
344 static void
345 gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux,
346     GstRTPMuxPadPrivate * padpriv, GstRTPBuffer * rtpbuffer)
347 {
348   guint32 ts;
349   guint32 sink_ts_base = 0;
350
351   if (padpriv && padpriv->have_timestamp_offset)
352     sink_ts_base = padpriv->timestamp_offset;
353
354   ts = gst_rtp_buffer_get_timestamp (rtpbuffer) - sink_ts_base +
355       rtp_mux->ts_base;
356   GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u",
357       gst_rtp_buffer_get_timestamp (rtpbuffer), ts);
358   gst_rtp_buffer_set_timestamp (rtpbuffer, ts);
359 }
360
361 static gboolean
362 process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv,
363     GstRTPBuffer * rtpbuffer)
364 {
365   GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
366
367   if (klass->accept_buffer_locked)
368     if (!klass->accept_buffer_locked (rtp_mux, padpriv, rtpbuffer))
369       return FALSE;
370
371   rtp_mux->seqnum++;
372   gst_rtp_buffer_set_seq (rtpbuffer, rtp_mux->seqnum);
373
374   gst_rtp_buffer_set_ssrc (rtpbuffer, rtp_mux->current_ssrc);
375   gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, rtpbuffer);
376   GST_LOG_OBJECT (rtp_mux,
377       "Pushing packet size %" G_GSIZE_FORMAT ", seq=%d, ts=%u",
378       rtpbuffer->map[0].size, rtp_mux->seqnum,
379       gst_rtp_buffer_get_timestamp (rtpbuffer));
380
381   if (padpriv) {
382     if (padpriv->segment.format == GST_FORMAT_TIME)
383       GST_BUFFER_PTS (rtpbuffer->buffer) =
384           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
385           GST_BUFFER_PTS (rtpbuffer->buffer));
386   }
387
388   return TRUE;
389 }
390
391 struct BufferListData
392 {
393   GstRTPMux *rtp_mux;
394   GstRTPMuxPadPrivate *padpriv;
395   gboolean drop;
396 };
397
398 static gboolean
399 process_list_item (GstBuffer ** buffer, guint idx, gpointer user_data)
400 {
401   struct BufferListData *bd = user_data;
402   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
403
404   *buffer = gst_buffer_make_writable (*buffer);
405
406   gst_rtp_buffer_map (*buffer, GST_MAP_READWRITE, &rtpbuffer);
407
408   bd->drop = !process_buffer_locked (bd->rtp_mux, bd->padpriv, &rtpbuffer);
409
410   gst_rtp_buffer_unmap (&rtpbuffer);
411
412   if (bd->drop)
413     return FALSE;
414
415   if (GST_BUFFER_DURATION_IS_VALID (*buffer) &&
416       GST_BUFFER_PTS_IS_VALID (*buffer))
417     bd->rtp_mux->last_stop = GST_BUFFER_PTS (*buffer) +
418         GST_BUFFER_DURATION (*buffer);
419   else
420     bd->rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
421
422   return TRUE;
423 }
424
425 static GstFlowReturn
426 gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
427     GstBufferList * bufferlist)
428 {
429   GstRTPMux *rtp_mux;
430   GstFlowReturn ret;
431   GstRTPMuxPadPrivate *padpriv;
432   struct BufferListData bd;
433
434   rtp_mux = GST_RTP_MUX (parent);
435
436   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
437     GstCaps *current_caps = gst_pad_get_current_caps (pad);
438
439     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
440       ret = GST_FLOW_NOT_NEGOTIATED;
441       gst_buffer_list_unref (bufferlist);
442       goto out;
443     }
444     gst_caps_unref (current_caps);
445   }
446
447   GST_OBJECT_LOCK (rtp_mux);
448
449   padpriv = gst_pad_get_element_private (pad);
450   if (!padpriv) {
451     GST_OBJECT_UNLOCK (rtp_mux);
452     ret = GST_FLOW_NOT_LINKED;
453     gst_buffer_list_unref (bufferlist);
454     goto out;
455   }
456
457   bd.rtp_mux = rtp_mux;
458   bd.padpriv = padpriv;
459   bd.drop = FALSE;
460
461   bufferlist = gst_buffer_list_make_writable (bufferlist);
462   gst_buffer_list_foreach (bufferlist, process_list_item, &bd);
463
464   GST_OBJECT_UNLOCK (rtp_mux);
465
466   if (bd.drop) {
467     gst_buffer_list_unref (bufferlist);
468     ret = GST_FLOW_OK;
469   } else {
470     ret = gst_pad_push_list (rtp_mux->srcpad, bufferlist);
471   }
472
473 out:
474
475   return ret;
476 }
477
478 static gboolean
479 resend_events (GstPad * pad, GstEvent ** event, gpointer user_data)
480 {
481   GstRTPMux *rtp_mux = user_data;
482
483   if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) {
484     GstCaps *caps;
485
486     gst_event_parse_caps (*event, &caps);
487     gst_rtp_mux_setcaps (pad, rtp_mux, caps);
488   } else {
489     gst_pad_push_event (rtp_mux->srcpad, gst_event_ref (*event));
490   }
491
492   return TRUE;
493 }
494
495 static GstFlowReturn
496 gst_rtp_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
497 {
498   GstRTPMux *rtp_mux;
499   GstFlowReturn ret;
500   GstRTPMuxPadPrivate *padpriv;
501   gboolean drop;
502   gboolean changed = FALSE;
503   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
504
505   rtp_mux = GST_RTP_MUX (parent);
506
507   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
508     GstCaps *current_caps = gst_pad_get_current_caps (pad);
509
510     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
511       ret = GST_FLOW_NOT_NEGOTIATED;
512       gst_buffer_unref (buffer);
513       goto out;
514     }
515     gst_caps_unref (current_caps);
516   }
517
518   GST_OBJECT_LOCK (rtp_mux);
519   padpriv = gst_pad_get_element_private (pad);
520
521   if (!padpriv) {
522     GST_OBJECT_UNLOCK (rtp_mux);
523     gst_buffer_unref (buffer);
524     return GST_FLOW_NOT_LINKED;
525   }
526
527   buffer = gst_buffer_make_writable (buffer);
528
529   if (!gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtpbuffer)) {
530     GST_OBJECT_UNLOCK (rtp_mux);
531     gst_buffer_unref (buffer);
532     GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
533     return GST_FLOW_ERROR;
534   }
535
536   drop = !process_buffer_locked (rtp_mux, padpriv, &rtpbuffer);
537
538   gst_rtp_buffer_unmap (&rtpbuffer);
539
540   if (!drop) {
541     if (pad != rtp_mux->last_pad) {
542       changed = TRUE;
543       g_clear_object (&rtp_mux->last_pad);
544       rtp_mux->last_pad = g_object_ref (pad);
545     }
546
547     if (GST_BUFFER_DURATION_IS_VALID (buffer) &&
548         GST_BUFFER_PTS_IS_VALID (buffer))
549       rtp_mux->last_stop = GST_BUFFER_PTS (buffer) +
550           GST_BUFFER_DURATION (buffer);
551     else
552       rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
553   }
554
555   GST_OBJECT_UNLOCK (rtp_mux);
556
557   if (changed)
558     gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
559
560   if (drop) {
561     gst_buffer_unref (buffer);
562     ret = GST_FLOW_OK;
563   } else {
564     ret = gst_pad_push (rtp_mux->srcpad, buffer);
565   }
566
567 out:
568   return ret;
569 }
570
571 static gboolean
572 gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux, GstCaps * caps)
573 {
574   GstStructure *structure;
575   gboolean ret = FALSE;
576   GstRTPMuxPadPrivate *padpriv;
577   GstCaps *peercaps;
578
579   if (!gst_caps_is_fixed (caps))
580     return FALSE;
581
582   peercaps = gst_pad_peer_query_caps (rtp_mux->srcpad, NULL);
583   if (peercaps) {
584     GstCaps *tcaps, *othercaps;;
585     tcaps = gst_pad_get_pad_template_caps (pad);
586     othercaps = gst_caps_intersect_full (peercaps, tcaps,
587         GST_CAPS_INTERSECT_FIRST);
588
589     if (gst_caps_get_size (othercaps) > 0) {
590       structure = gst_caps_get_structure (othercaps, 0);
591       GST_OBJECT_LOCK (rtp_mux);
592       if (gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
593         GST_DEBUG_OBJECT (pad, "Use downstream ssrc: %x",
594             rtp_mux->current_ssrc);
595         rtp_mux->have_ssrc = TRUE;
596       }
597       GST_OBJECT_UNLOCK (rtp_mux);
598     }
599
600     gst_caps_unref (othercaps);
601
602     gst_caps_unref (peercaps);
603     gst_caps_unref (tcaps);
604   }
605
606   structure = gst_caps_get_structure (caps, 0);
607
608   if (!structure)
609     return FALSE;
610
611   GST_OBJECT_LOCK (rtp_mux);
612   padpriv = gst_pad_get_element_private (pad);
613   if (padpriv &&
614       gst_structure_get_uint (structure, "timestamp-offset",
615           &padpriv->timestamp_offset)) {
616     padpriv->have_timestamp_offset = TRUE;
617   }
618
619   caps = gst_caps_copy (caps);
620
621   /* if we don't have a specified ssrc, first try to take one from the caps,
622      and if that fails, generate one */
623   if (!rtp_mux->have_ssrc) {
624     if (rtp_mux->ssrc_random) {
625       if (!gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc))
626         rtp_mux->current_ssrc = g_random_int ();
627       rtp_mux->have_ssrc = TRUE;
628     }
629   }
630
631   gst_caps_set_simple (caps,
632       "timestamp-offset", G_TYPE_UINT, rtp_mux->ts_base,
633       "seqnum-offset", G_TYPE_UINT, rtp_mux->seqnum_base,
634       "ssrc", G_TYPE_UINT, rtp_mux->current_ssrc, NULL);
635
636   GST_OBJECT_UNLOCK (rtp_mux);
637
638   if (rtp_mux->send_stream_start) {
639     gchar s_id[32];
640
641     /* stream-start (FIXME: create id based on input ids) */
642     g_snprintf (s_id, sizeof (s_id), "interleave-%08x", g_random_int ());
643     gst_pad_push_event (rtp_mux->srcpad, gst_event_new_stream_start (s_id));
644
645     rtp_mux->send_stream_start = FALSE;
646   }
647
648   GST_DEBUG_OBJECT (rtp_mux,
649       "setting caps %" GST_PTR_FORMAT " on src pad..", caps);
650   ret = gst_pad_set_caps (rtp_mux->srcpad, caps);
651
652
653   gst_caps_unref (caps);
654
655   return ret;
656 }
657
658 static void
659 clear_caps (GstCaps * caps, gboolean only_clock_rate)
660 {
661   gint i, j;
662
663   /* Lets only match on the clock-rate */
664   for (i = 0; i < gst_caps_get_size (caps); i++) {
665     GstStructure *s = gst_caps_get_structure (caps, i);
666
667     for (j = 0; j < gst_structure_n_fields (s); j++) {
668       const gchar *name = gst_structure_nth_field_name (s, j);
669
670       if (strcmp (name, "clock-rate") && (only_clock_rate ||
671               (strcmp (name, "ssrc")))) {
672         gst_structure_remove_field (s, name);
673         j--;
674       }
675     }
676   }
677 }
678
679 static gboolean
680 same_clock_rate_fold (const GValue * item, GValue * ret, gpointer user_data)
681 {
682   GstPad *mypad = user_data;
683   GstPad *pad = g_value_get_object (item);
684   GstCaps *peercaps;
685   GstCaps *accumcaps;
686
687   if (pad == mypad)
688     return TRUE;
689
690   accumcaps = g_value_get_boxed (ret);
691   peercaps = gst_pad_peer_query_caps (pad, accumcaps);
692   if (!peercaps) {
693     g_warning ("no peercaps");
694     return TRUE;
695   }
696   peercaps = gst_caps_make_writable (peercaps);
697   clear_caps (peercaps, TRUE);
698
699   g_value_take_boxed (ret, peercaps);
700
701   return !gst_caps_is_empty (peercaps);
702 }
703
704 static GstCaps *
705 gst_rtp_mux_getcaps (GstPad * pad, GstRTPMux * mux, GstCaps * filter)
706 {
707   GstCaps *caps = NULL;
708   GstIterator *iter = NULL;
709   GValue v = { 0 };
710   GstIteratorResult res;
711   GstCaps *peercaps;
712   GstCaps *othercaps;
713   GstCaps *tcaps;
714
715   peercaps = gst_pad_peer_query_caps (mux->srcpad, NULL);
716
717   if (peercaps) {
718     tcaps = gst_pad_get_pad_template_caps (pad);
719     othercaps = gst_caps_intersect_full (peercaps, tcaps,
720         GST_CAPS_INTERSECT_FIRST);
721     gst_caps_unref (peercaps);
722   } else {
723     tcaps = gst_pad_get_pad_template_caps (mux->srcpad);
724     if (filter)
725       othercaps = gst_caps_intersect_full (filter, tcaps,
726           GST_CAPS_INTERSECT_FIRST);
727     else
728       othercaps = gst_caps_copy (tcaps);
729   }
730   gst_caps_unref (tcaps);
731
732   GST_LOG_OBJECT (pad, "Intersected srcpad-peercaps and template caps: %"
733       GST_PTR_FORMAT, othercaps);
734
735   clear_caps (othercaps, TRUE);
736
737   g_value_init (&v, GST_TYPE_CAPS);
738
739   iter = gst_element_iterate_sink_pads (GST_ELEMENT (mux));
740   do {
741     gst_value_set_caps (&v, othercaps);
742     res = gst_iterator_fold (iter, same_clock_rate_fold, &v, pad);
743     gst_iterator_resync (iter);
744   } while (res == GST_ITERATOR_RESYNC);
745   gst_iterator_free (iter);
746
747   caps = gst_caps_intersect ((GstCaps *) gst_value_get_caps (&v), othercaps);
748
749   g_value_unset (&v);
750   gst_caps_unref (othercaps);
751
752   if (res == GST_ITERATOR_ERROR) {
753     gst_caps_unref (caps);
754     caps = gst_caps_new_empty ();
755   }
756
757
758   return caps;
759 }
760
761 static gboolean
762 gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
763 {
764   GstRTPMux *mux = GST_RTP_MUX (parent);
765   gboolean res = FALSE;
766
767   switch (GST_QUERY_TYPE (query)) {
768     case GST_QUERY_CAPS:
769     {
770       GstCaps *filter, *caps;
771
772       gst_query_parse_caps (query, &filter);
773       GST_LOG_OBJECT (pad, "Received caps-query with filter-caps: %"
774           GST_PTR_FORMAT, filter);
775       caps = gst_rtp_mux_getcaps (pad, mux, filter);
776       gst_query_set_caps_result (query, caps);
777       GST_LOG_OBJECT (mux, "Answering caps-query with caps: %"
778           GST_PTR_FORMAT, caps);
779       gst_caps_unref (caps);
780       res = TRUE;
781       break;
782     }
783     default:
784       res = gst_pad_query_default (pad, parent, query);
785       break;
786   }
787
788   return res;
789 }
790
791 static void
792 gst_rtp_mux_get_property (GObject * object,
793     guint prop_id, GValue * value, GParamSpec * pspec)
794 {
795   GstRTPMux *rtp_mux;
796
797   rtp_mux = GST_RTP_MUX (object);
798
799   GST_OBJECT_LOCK (rtp_mux);
800   switch (prop_id) {
801     case PROP_TIMESTAMP_OFFSET:
802       g_value_set_int (value, rtp_mux->ts_offset);
803       break;
804     case PROP_SEQNUM_OFFSET:
805       g_value_set_int (value, rtp_mux->seqnum_offset);
806       break;
807     case PROP_SEQNUM:
808       g_value_set_uint (value, rtp_mux->seqnum);
809       break;
810     case PROP_SSRC:
811       g_value_set_uint (value, rtp_mux->ssrc);
812       break;
813     default:
814       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
815       break;
816   }
817   GST_OBJECT_UNLOCK (rtp_mux);
818 }
819
820 static void
821 gst_rtp_mux_set_property (GObject * object,
822     guint prop_id, const GValue * value, GParamSpec * pspec)
823 {
824   GstRTPMux *rtp_mux;
825
826   rtp_mux = GST_RTP_MUX (object);
827
828   switch (prop_id) {
829     case PROP_TIMESTAMP_OFFSET:
830       rtp_mux->ts_offset = g_value_get_int (value);
831       break;
832     case PROP_SEQNUM_OFFSET:
833       rtp_mux->seqnum_offset = g_value_get_int (value);
834       break;
835     case PROP_SSRC:
836       GST_OBJECT_LOCK (rtp_mux);
837       rtp_mux->ssrc = g_value_get_uint (value);
838       rtp_mux->current_ssrc = rtp_mux->ssrc;
839       rtp_mux->have_ssrc = TRUE;
840       rtp_mux->ssrc_random = FALSE;
841       GST_OBJECT_UNLOCK (rtp_mux);
842       break;
843     default:
844       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
845       break;
846   }
847 }
848
849 static gboolean
850 gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
851 {
852   GstRTPMux *mux = GST_RTP_MUX (parent);
853   gboolean is_pad;
854   gboolean ret;
855
856   switch (GST_EVENT_TYPE (event)) {
857     case GST_EVENT_CAPS:
858     {
859       GstCaps *caps;
860
861       gst_event_parse_caps (event, &caps);
862       GST_LOG_OBJECT (pad, "Received caps-event with caps: %"
863           GST_PTR_FORMAT, caps);
864       ret = gst_rtp_mux_setcaps (pad, mux, caps);
865       gst_event_unref (event);
866       return ret;
867     }
868     case GST_EVENT_FLUSH_STOP:
869     {
870       GST_OBJECT_LOCK (mux);
871       mux->last_stop = GST_CLOCK_TIME_NONE;
872       GST_OBJECT_UNLOCK (mux);
873       break;
874     }
875     case GST_EVENT_SEGMENT:
876     {
877       GstRTPMuxPadPrivate *padpriv;
878
879       GST_OBJECT_LOCK (mux);
880       padpriv = gst_pad_get_element_private (pad);
881
882       if (padpriv) {
883         gst_event_copy_segment (event, &padpriv->segment);
884       }
885       GST_OBJECT_UNLOCK (mux);
886       break;
887     }
888     default:
889       break;
890   }
891
892   GST_OBJECT_LOCK (mux);
893   is_pad = (pad == mux->last_pad);
894   GST_OBJECT_UNLOCK (mux);
895
896   if (is_pad) {
897     return gst_pad_push_event (mux->srcpad, event);
898   } else {
899     gst_event_unref (event);
900     return TRUE;
901   }
902 }
903
904 static void
905 gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
906 {
907
908   GST_OBJECT_LOCK (rtp_mux);
909
910   g_clear_object (&rtp_mux->last_pad);
911   rtp_mux->send_stream_start = TRUE;
912
913   if (rtp_mux->seqnum_offset == -1)
914     rtp_mux->seqnum_base = g_random_int_range (0, G_MAXUINT16);
915   else
916     rtp_mux->seqnum_base = rtp_mux->seqnum_offset;
917   rtp_mux->seqnum = rtp_mux->seqnum_base;
918
919   if (rtp_mux->ts_offset == -1)
920     rtp_mux->ts_base = g_random_int ();
921   else
922     rtp_mux->ts_base = rtp_mux->ts_offset;
923
924   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
925
926   if (rtp_mux->ssrc_random) {
927     rtp_mux->have_ssrc = FALSE;
928   } else {
929     rtp_mux->current_ssrc = rtp_mux->ssrc;
930     rtp_mux->have_ssrc = TRUE;
931   }
932
933   GST_DEBUG_OBJECT (rtp_mux, "set timestamp-offset to %u", rtp_mux->ts_base);
934
935   GST_OBJECT_UNLOCK (rtp_mux);
936 }
937
938 static GstStateChangeReturn
939 gst_rtp_mux_change_state (GstElement * element, GstStateChange transition)
940 {
941   GstRTPMux *rtp_mux;
942   GstStateChangeReturn ret;
943
944   rtp_mux = GST_RTP_MUX (element);
945
946   switch (transition) {
947     case GST_STATE_CHANGE_READY_TO_PAUSED:
948       gst_rtp_mux_ready_to_paused (rtp_mux);
949       break;
950     default:
951       break;
952   }
953
954   ret = GST_ELEMENT_CLASS (gst_rtp_mux_parent_class)->change_state (element,
955       transition);
956
957   switch (transition) {
958     case GST_STATE_CHANGE_PAUSED_TO_READY:
959       g_clear_object (&rtp_mux->last_pad);
960       break;
961     default:
962       break;
963   }
964
965   return ret;
966 }
967
968 gboolean
969 gst_rtp_mux_plugin_init (GstPlugin * plugin)
970 {
971   GST_DEBUG_CATEGORY_INIT (gst_rtp_mux_debug, "rtpmux", 0, "rtp muxer");
972
973   return gst_element_register (plugin, "rtpmux", GST_RANK_NONE,
974       GST_TYPE_RTP_MUX);
975 }