gstrtpmux: allow the ssrc-property to decide ssrc on outgoing buffers
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpmux.c
1 /* RTP muxer element for GStreamer
2  *
3  * gstrtpmux.c:
4  *
5  * Copyright (C) <2007-2010> Nokia Corporation.
6  *   Contact: Zeeshan Ali <zeeshan.ali@nokia.com>
7  * Copyright (C) <2007-2010> Collabora Ltd
8  *   Contact: Olivier Crete <olivier.crete@collabora.co.uk>
9  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
10  *               2000,2005 Wim Taymans <wim@fluendo.com>
11  *
12  * This library is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU Library General Public
14  * License as published by the Free Software Foundation; either
15  * version 2 of the License, or (at your option) any later version.
16  *
17  * This library is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20  * Library General Public License for more details.
21  *
22  * You should have received a copy of the GNU Library General Public
23  * License along with this library; if not, write to the
24  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25  * Boston, MA 02110-1301, USA.
26  */
27
28 /**
29  * SECTION:element-rtpmux
30  * @see_also: rtpdtmfmux
31  *
32  * The rtp muxer takes multiple RTP streams having the same clock-rate and
33  * muxes into a single stream with a single SSRC.
34  *
35  * <refsect2>
36  * <title>Example pipelines</title>
37  * |[
38  * gst-launch-1.0 rtpmux name=mux ! udpsink host=127.0.0.1 port=8888        \
39  *              alsasrc ! alawenc ! rtppcmapay !                        \
40  *              application/x-rtp, payload=8, rate=8000 ! mux.sink_0    \
41  *              audiotestsrc is-live=1 !                                \
42  *              mulawenc ! rtppcmupay !                                 \
43  *              application/x-rtp, payload=0, rate=8000 ! mux.sink_1
44  * ]|
45  * In this example, an audio stream is captured from ALSA and another is
46  * generated, both are encoded into different payload types and muxed together
47  * so they can be sent on the same port.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <gst/gst.h>
56 #include <gst/rtp/gstrtpbuffer.h>
57 #include <string.h>
58
59 #include "gstrtpmux.h"
60
61 GST_DEBUG_CATEGORY_STATIC (gst_rtp_mux_debug);
62 #define GST_CAT_DEFAULT gst_rtp_mux_debug
63
64 enum
65 {
66   PROP_0,
67   PROP_TIMESTAMP_OFFSET,
68   PROP_SEQNUM_OFFSET,
69   PROP_SEQNUM,
70   PROP_SSRC
71 };
72
73 #define DEFAULT_TIMESTAMP_OFFSET -1
74 #define DEFAULT_SEQNUM_OFFSET    -1
75 #define DEFAULT_SSRC             -1
76
77 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
78     GST_PAD_SRC,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS ("application/x-rtp")
81     );
82
83 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%u",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS ("application/x-rtp")
87     );
88
89 static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
90     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
91 static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
92 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstObject * parent,
93     GstBuffer * buffer);
94 static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
95     GstBufferList * bufferlist);
96 static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux,
97     GstCaps * caps);
98 static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent,
99     GstEvent * event);
100 static gboolean gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent,
101     GstQuery * query);
102
103 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
104     element, GstStateChange transition);
105
106 static void gst_rtp_mux_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void gst_rtp_mux_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110 static void gst_rtp_mux_dispose (GObject * object);
111
112 static gboolean gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux,
113     GstEvent * event);
114
115 G_DEFINE_TYPE (GstRTPMux, gst_rtp_mux, GST_TYPE_ELEMENT);
116
117
118 static void
119 gst_rtp_mux_class_init (GstRTPMuxClass * klass)
120 {
121   GObjectClass *gobject_class;
122   GstElementClass *gstelement_class;
123
124   gobject_class = (GObjectClass *) klass;
125   gstelement_class = (GstElementClass *) klass;
126
127
128   gst_element_class_add_pad_template (gstelement_class,
129       gst_static_pad_template_get (&src_factory));
130   gst_element_class_add_pad_template (gstelement_class,
131       gst_static_pad_template_get (&sink_factory));
132
133   gst_element_class_set_static_metadata (gstelement_class, "RTP muxer",
134       "Codec/Muxer",
135       "multiplex N rtp streams into one", "Zeeshan Ali <first.last@nokia.com>");
136
137   gobject_class->get_property = gst_rtp_mux_get_property;
138   gobject_class->set_property = gst_rtp_mux_set_property;
139   gobject_class->dispose = gst_rtp_mux_dispose;
140
141   klass->src_event = gst_rtp_mux_src_event_real;
142
143   g_object_class_install_property (G_OBJECT_CLASS (klass),
144       PROP_TIMESTAMP_OFFSET, g_param_spec_int ("timestamp-offset",
145           "Timestamp Offset",
146           "Offset to add to all outgoing timestamps (-1 = random)", -1,
147           G_MAXINT, DEFAULT_TIMESTAMP_OFFSET,
148           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
149   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM_OFFSET,
150       g_param_spec_int ("seqnum-offset", "Sequence number Offset",
151           "Offset to add to all outgoing seqnum (-1 = random)", -1, G_MAXINT,
152           DEFAULT_SEQNUM_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
153   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM,
154       g_param_spec_uint ("seqnum", "Sequence number",
155           "The RTP sequence number of the last processed packet",
156           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
157   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SSRC,
158       g_param_spec_uint ("ssrc", "SSRC",
159           "The SSRC of the packets (-1 == random)",
160           0, G_MAXUINT, DEFAULT_SSRC,
161           GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
162           G_PARAM_STATIC_STRINGS));
163
164   gstelement_class->request_new_pad =
165       GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad);
166   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_mux_release_pad);
167   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_mux_change_state);
168 }
169
170 static void
171 gst_rtp_mux_dispose (GObject * object)
172 {
173   GstRTPMux *rtp_mux = GST_RTP_MUX (object);
174   GList *item;
175
176   g_clear_object (&rtp_mux->last_pad);
177
178 restart:
179   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
180     GstPad *pad = GST_PAD (item->data);
181     if (GST_PAD_IS_SINK (pad)) {
182       gst_element_release_request_pad (GST_ELEMENT (object), pad);
183       goto restart;
184     }
185   }
186
187   G_OBJECT_CLASS (gst_rtp_mux_parent_class)->dispose (object);
188 }
189
190 static gboolean
191 gst_rtp_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
192 {
193   GstRTPMux *rtp_mux = GST_RTP_MUX (parent);
194   GstRTPMuxClass *klass;
195   gboolean ret;
196
197   klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
198
199   ret = klass->src_event (rtp_mux, event);
200
201   return ret;
202 }
203
204 static gboolean
205 gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux, GstEvent * event)
206 {
207   GstIterator *iter;
208   gboolean result = FALSE;
209   gboolean done = FALSE;
210
211   switch (GST_EVENT_TYPE (event)) {
212     case GST_EVENT_CUSTOM_UPSTREAM:
213     {
214       const GstStructure *s = gst_event_get_structure (event);
215
216       if (gst_structure_has_name (s, "GstRTPCollision")) {
217         guint ssrc = 0;
218
219         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
220           ssrc = -1;
221
222         GST_DEBUG_OBJECT (rtp_mux, "collided ssrc: %" G_GUINT32_FORMAT, ssrc);
223
224         /* choose another ssrc for our stream */
225         GST_OBJECT_LOCK (rtp_mux);
226         if (ssrc == rtp_mux->current_ssrc) {
227           GstCaps *caps;
228           guint suggested_ssrc = 0;
229           guint32 new_ssrc;
230
231           if (gst_structure_get_uint (s, "suggested-ssrc", &suggested_ssrc))
232             rtp_mux->current_ssrc = suggested_ssrc;
233
234           while (ssrc == rtp_mux->current_ssrc)
235             rtp_mux->current_ssrc = g_random_int ();
236
237           new_ssrc = rtp_mux->current_ssrc;
238           GST_OBJECT_UNLOCK (rtp_mux);
239
240           caps = gst_pad_get_current_caps (rtp_mux->srcpad);
241           caps = gst_caps_make_writable (caps);
242           gst_caps_set_simple (caps, "ssrc", G_TYPE_UINT, new_ssrc, NULL);
243           gst_pad_set_caps (rtp_mux->srcpad, caps);
244           gst_caps_unref (caps);
245         } else {
246           GST_OBJECT_UNLOCK (rtp_mux);
247         }
248       }
249       break;
250     }
251     default:
252       break;
253   }
254
255
256   iter = gst_element_iterate_sink_pads (GST_ELEMENT (rtp_mux));
257
258   while (!done) {
259     GValue item = { 0, };
260
261     switch (gst_iterator_next (iter, &item)) {
262       case GST_ITERATOR_OK:
263         gst_event_ref (event);
264         result |= gst_pad_push_event (g_value_get_object (&item), event);
265         g_value_reset (&item);
266         break;
267       case GST_ITERATOR_RESYNC:
268         gst_iterator_resync (iter);
269         result = FALSE;
270         break;
271       case GST_ITERATOR_ERROR:
272         GST_WARNING_OBJECT (rtp_mux, "Error iterating sinkpads");
273       case GST_ITERATOR_DONE:
274         done = TRUE;
275         break;
276     }
277   }
278   gst_iterator_free (iter);
279   gst_event_unref (event);
280
281   return result;
282 }
283
284 static void
285 gst_rtp_mux_init (GstRTPMux * rtp_mux)
286 {
287   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtp_mux);
288
289   rtp_mux->srcpad =
290       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
291           "src"), "src");
292   gst_pad_set_event_function (rtp_mux->srcpad,
293       GST_DEBUG_FUNCPTR (gst_rtp_mux_src_event));
294   gst_pad_use_fixed_caps (rtp_mux->srcpad);
295   gst_element_add_pad (GST_ELEMENT (rtp_mux), rtp_mux->srcpad);
296
297   rtp_mux->ssrc = DEFAULT_SSRC;
298   rtp_mux->current_ssrc = DEFAULT_SSRC;
299   rtp_mux->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
300   rtp_mux->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
301
302   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
303 }
304
305 static void
306 gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
307 {
308   GstRTPMuxPadPrivate *padpriv = g_slice_new0 (GstRTPMuxPadPrivate);
309
310   /* setup some pad functions */
311   gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain));
312   gst_pad_set_chain_list_function (sinkpad,
313       GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list));
314   gst_pad_set_event_function (sinkpad,
315       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
316   gst_pad_set_query_function (sinkpad,
317       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_query));
318
319
320   gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
321
322   gst_pad_set_element_private (sinkpad, padpriv);
323
324   gst_pad_set_active (sinkpad, TRUE);
325   gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad);
326 }
327
328 static GstPad *
329 gst_rtp_mux_request_new_pad (GstElement * element,
330     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
331 {
332   GstRTPMux *rtp_mux;
333   GstPad *newpad;
334
335   g_return_val_if_fail (templ != NULL, NULL);
336   g_return_val_if_fail (GST_IS_RTP_MUX (element), NULL);
337
338   rtp_mux = GST_RTP_MUX (element);
339
340   if (templ->direction != GST_PAD_SINK) {
341     GST_WARNING_OBJECT (rtp_mux, "request pad that is not a SINK pad");
342     return NULL;
343   }
344
345   newpad = gst_pad_new_from_template (templ, req_name);
346   if (newpad)
347     gst_rtp_mux_setup_sinkpad (rtp_mux, newpad);
348   else
349     GST_WARNING_OBJECT (rtp_mux, "failed to create request pad");
350
351   return newpad;
352 }
353
354 static void
355 gst_rtp_mux_release_pad (GstElement * element, GstPad * pad)
356 {
357   GstRTPMuxPadPrivate *padpriv;
358
359   GST_OBJECT_LOCK (element);
360   padpriv = gst_pad_get_element_private (pad);
361   gst_pad_set_element_private (pad, NULL);
362   GST_OBJECT_UNLOCK (element);
363
364   gst_element_remove_pad (element, pad);
365
366   if (padpriv) {
367     g_slice_free (GstRTPMuxPadPrivate, padpriv);
368   }
369 }
370
371 /* Put our own timestamp-offset on the buffer */
372 static void
373 gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux,
374     GstRTPMuxPadPrivate * padpriv, GstRTPBuffer * rtpbuffer)
375 {
376   guint32 ts;
377   guint32 sink_ts_base = 0;
378
379   if (padpriv && padpriv->have_timestamp_offset)
380     sink_ts_base = padpriv->timestamp_offset;
381
382   ts = gst_rtp_buffer_get_timestamp (rtpbuffer) - sink_ts_base +
383       rtp_mux->ts_base;
384   GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u",
385       gst_rtp_buffer_get_timestamp (rtpbuffer), ts);
386   gst_rtp_buffer_set_timestamp (rtpbuffer, ts);
387 }
388
389 static gboolean
390 process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv,
391     GstRTPBuffer * rtpbuffer)
392 {
393   GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
394
395   if (klass->accept_buffer_locked)
396     if (!klass->accept_buffer_locked (rtp_mux, padpriv, rtpbuffer))
397       return FALSE;
398
399   rtp_mux->seqnum++;
400   gst_rtp_buffer_set_seq (rtpbuffer, rtp_mux->seqnum);
401
402   gst_rtp_buffer_set_ssrc (rtpbuffer, rtp_mux->current_ssrc);
403   gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, rtpbuffer);
404   GST_LOG_OBJECT (rtp_mux,
405       "Pushing packet size %" G_GSIZE_FORMAT ", seq=%d, ts=%u",
406       rtpbuffer->map[0].size, rtp_mux->seqnum,
407       gst_rtp_buffer_get_timestamp (rtpbuffer));
408
409   if (padpriv) {
410     if (padpriv->segment.format == GST_FORMAT_TIME)
411       GST_BUFFER_PTS (rtpbuffer->buffer) =
412           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
413           GST_BUFFER_PTS (rtpbuffer->buffer));
414   }
415
416   return TRUE;
417 }
418
419 struct BufferListData
420 {
421   GstRTPMux *rtp_mux;
422   GstRTPMuxPadPrivate *padpriv;
423   gboolean drop;
424 };
425
426 static gboolean
427 process_list_item (GstBuffer ** buffer, guint idx, gpointer user_data)
428 {
429   struct BufferListData *bd = user_data;
430   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
431
432   *buffer = gst_buffer_make_writable (*buffer);
433
434   gst_rtp_buffer_map (*buffer, GST_MAP_READWRITE, &rtpbuffer);
435
436   bd->drop = !process_buffer_locked (bd->rtp_mux, bd->padpriv, &rtpbuffer);
437
438   gst_rtp_buffer_unmap (&rtpbuffer);
439
440   if (bd->drop)
441     return FALSE;
442
443   if (GST_BUFFER_DURATION_IS_VALID (*buffer) &&
444       GST_BUFFER_PTS_IS_VALID (*buffer))
445     bd->rtp_mux->last_stop = GST_BUFFER_PTS (*buffer) +
446         GST_BUFFER_DURATION (*buffer);
447   else
448     bd->rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
449
450   return TRUE;
451 }
452
453 static GstFlowReturn
454 gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
455     GstBufferList * bufferlist)
456 {
457   GstRTPMux *rtp_mux;
458   GstFlowReturn ret;
459   GstRTPMuxPadPrivate *padpriv;
460   struct BufferListData bd;
461
462   rtp_mux = GST_RTP_MUX (parent);
463
464   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
465     GstCaps *current_caps = gst_pad_get_current_caps (pad);
466
467     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
468       ret = GST_FLOW_NOT_NEGOTIATED;
469       gst_buffer_list_unref (bufferlist);
470       goto out;
471     }
472     gst_caps_unref (current_caps);
473   }
474
475   GST_OBJECT_LOCK (rtp_mux);
476
477   padpriv = gst_pad_get_element_private (pad);
478   if (!padpriv) {
479     GST_OBJECT_UNLOCK (rtp_mux);
480     ret = GST_FLOW_NOT_LINKED;
481     gst_buffer_list_unref (bufferlist);
482     goto out;
483   }
484
485   bd.rtp_mux = rtp_mux;
486   bd.padpriv = padpriv;
487   bd.drop = FALSE;
488
489   bufferlist = gst_buffer_list_make_writable (bufferlist);
490   gst_buffer_list_foreach (bufferlist, process_list_item, &bd);
491
492   GST_OBJECT_UNLOCK (rtp_mux);
493
494   if (bd.drop) {
495     gst_buffer_list_unref (bufferlist);
496     ret = GST_FLOW_OK;
497   } else {
498     ret = gst_pad_push_list (rtp_mux->srcpad, bufferlist);
499   }
500
501 out:
502
503   return ret;
504 }
505
506 static gboolean
507 resend_events (GstPad * pad, GstEvent ** event, gpointer user_data)
508 {
509   GstRTPMux *rtp_mux = user_data;
510
511   if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) {
512     GstCaps *caps;
513
514     gst_event_parse_caps (*event, &caps);
515     gst_rtp_mux_setcaps (pad, rtp_mux, caps);
516   } else {
517     gst_pad_push_event (rtp_mux->srcpad, gst_event_ref (*event));
518   }
519
520   return TRUE;
521 }
522
523 static GstFlowReturn
524 gst_rtp_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
525 {
526   GstRTPMux *rtp_mux;
527   GstFlowReturn ret;
528   GstRTPMuxPadPrivate *padpriv;
529   gboolean drop;
530   gboolean changed = FALSE;
531   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
532
533   rtp_mux = GST_RTP_MUX (parent);
534
535   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
536     GstCaps *current_caps = gst_pad_get_current_caps (pad);
537
538     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
539       ret = GST_FLOW_NOT_NEGOTIATED;
540       gst_buffer_unref (buffer);
541       goto out;
542     }
543     gst_caps_unref (current_caps);
544   }
545
546   GST_OBJECT_LOCK (rtp_mux);
547   padpriv = gst_pad_get_element_private (pad);
548
549   if (!padpriv) {
550     GST_OBJECT_UNLOCK (rtp_mux);
551     gst_buffer_unref (buffer);
552     return GST_FLOW_NOT_LINKED;
553   }
554
555   buffer = gst_buffer_make_writable (buffer);
556
557   if (!gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtpbuffer)) {
558     GST_OBJECT_UNLOCK (rtp_mux);
559     gst_buffer_unref (buffer);
560     GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
561     return GST_FLOW_ERROR;
562   }
563
564   drop = !process_buffer_locked (rtp_mux, padpriv, &rtpbuffer);
565
566   gst_rtp_buffer_unmap (&rtpbuffer);
567
568   if (!drop) {
569     if (pad != rtp_mux->last_pad) {
570       changed = TRUE;
571       g_clear_object (&rtp_mux->last_pad);
572       rtp_mux->last_pad = g_object_ref (pad);
573     }
574
575     if (GST_BUFFER_DURATION_IS_VALID (buffer) &&
576         GST_BUFFER_PTS_IS_VALID (buffer))
577       rtp_mux->last_stop = GST_BUFFER_PTS (buffer) +
578           GST_BUFFER_DURATION (buffer);
579     else
580       rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
581   }
582
583   GST_OBJECT_UNLOCK (rtp_mux);
584
585   if (changed)
586     gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
587
588   if (drop) {
589     gst_buffer_unref (buffer);
590     ret = GST_FLOW_OK;
591   } else {
592     ret = gst_pad_push (rtp_mux->srcpad, buffer);
593   }
594
595 out:
596   return ret;
597 }
598
599 static gboolean
600 gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux, GstCaps * caps)
601 {
602   GstStructure *structure;
603   gboolean ret = FALSE;
604   GstRTPMuxPadPrivate *padpriv;
605   GstCaps *peercaps;
606
607   if (!gst_caps_is_fixed (caps))
608     return FALSE;
609
610   peercaps = gst_pad_peer_query_caps (rtp_mux->srcpad, NULL);
611   if (peercaps) {
612     GstCaps *tcaps, *othercaps;;
613     tcaps = gst_pad_get_pad_template_caps (pad);
614     othercaps = gst_caps_intersect_full (peercaps, tcaps,
615         GST_CAPS_INTERSECT_FIRST);
616
617     if (gst_caps_get_size (othercaps) > 0) {
618       structure = gst_caps_get_structure (othercaps, 0);
619       GST_OBJECT_LOCK (rtp_mux);
620       if (gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
621         GST_DEBUG_OBJECT (pad, "Use downstream ssrc: %x",
622             rtp_mux->current_ssrc);
623         rtp_mux->have_ssrc = TRUE;
624       }
625       GST_OBJECT_UNLOCK (rtp_mux);
626     }
627
628     gst_caps_unref (othercaps);
629
630     gst_caps_unref (peercaps);
631     gst_caps_unref (tcaps);
632   }
633
634   structure = gst_caps_get_structure (caps, 0);
635
636   if (!structure)
637     return FALSE;
638
639   GST_OBJECT_LOCK (rtp_mux);
640   padpriv = gst_pad_get_element_private (pad);
641   if (padpriv &&
642       gst_structure_get_uint (structure, "timestamp-offset",
643           &padpriv->timestamp_offset)) {
644     padpriv->have_timestamp_offset = TRUE;
645   }
646
647   caps = gst_caps_copy (caps);
648
649   /* if we don't have a specified ssrc, first try to take one from the caps,
650      and if that fails, generate one */
651   if (!rtp_mux->have_ssrc) {
652     if (rtp_mux->ssrc == DEFAULT_SSRC) {
653       if (!gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc))
654         rtp_mux->current_ssrc = g_random_int ();
655       rtp_mux->have_ssrc = TRUE;
656     }
657   }
658
659   gst_caps_set_simple (caps,
660       "timestamp-offset", G_TYPE_UINT, rtp_mux->ts_base,
661       "seqnum-offset", G_TYPE_UINT, rtp_mux->seqnum_base,
662       "ssrc", G_TYPE_UINT, rtp_mux->current_ssrc, NULL);
663
664   GST_OBJECT_UNLOCK (rtp_mux);
665
666   if (rtp_mux->send_stream_start) {
667     gchar s_id[32];
668
669     /* stream-start (FIXME: create id based on input ids) */
670     g_snprintf (s_id, sizeof (s_id), "interleave-%08x", g_random_int ());
671     gst_pad_push_event (rtp_mux->srcpad, gst_event_new_stream_start (s_id));
672
673     rtp_mux->send_stream_start = FALSE;
674   }
675
676   GST_DEBUG_OBJECT (rtp_mux,
677       "setting caps %" GST_PTR_FORMAT " on src pad..", caps);
678   ret = gst_pad_set_caps (rtp_mux->srcpad, caps);
679
680
681   gst_caps_unref (caps);
682
683   return ret;
684 }
685
686 static void
687 clear_caps (GstCaps * caps, gboolean only_clock_rate)
688 {
689   gint i, j;
690
691   /* Lets only match on the clock-rate */
692   for (i = 0; i < gst_caps_get_size (caps); i++) {
693     GstStructure *s = gst_caps_get_structure (caps, i);
694
695     for (j = 0; j < gst_structure_n_fields (s); j++) {
696       const gchar *name = gst_structure_nth_field_name (s, j);
697
698       if (strcmp (name, "clock-rate") && (only_clock_rate ||
699               (strcmp (name, "ssrc")))) {
700         gst_structure_remove_field (s, name);
701         j--;
702       }
703     }
704   }
705 }
706
707 static gboolean
708 same_clock_rate_fold (const GValue * item, GValue * ret, gpointer user_data)
709 {
710   GstPad *mypad = user_data;
711   GstPad *pad = g_value_get_object (item);
712   GstCaps *peercaps;
713   GstCaps *accumcaps;
714
715   if (pad == mypad)
716     return TRUE;
717
718   accumcaps = g_value_get_boxed (ret);
719   peercaps = gst_pad_peer_query_caps (pad, accumcaps);
720   if (!peercaps) {
721     g_warning ("no peercaps");
722     return TRUE;
723   }
724   peercaps = gst_caps_make_writable (peercaps);
725   clear_caps (peercaps, TRUE);
726
727   g_value_take_boxed (ret, peercaps);
728
729   return !gst_caps_is_empty (peercaps);
730 }
731
732 static GstCaps *
733 gst_rtp_mux_getcaps (GstPad * pad, GstRTPMux * mux, GstCaps * filter)
734 {
735   GstCaps *caps = NULL;
736   GstIterator *iter = NULL;
737   GValue v = { 0 };
738   GstIteratorResult res;
739   GstCaps *peercaps;
740   GstCaps *othercaps;
741   GstCaps *tcaps;
742
743   peercaps = gst_pad_peer_query_caps (mux->srcpad, NULL);
744
745   if (peercaps) {
746     tcaps = gst_pad_get_pad_template_caps (pad);
747     othercaps = gst_caps_intersect_full (peercaps, tcaps,
748         GST_CAPS_INTERSECT_FIRST);
749     gst_caps_unref (peercaps);
750   } else {
751     tcaps = gst_pad_get_pad_template_caps (mux->srcpad);
752     if (filter)
753       othercaps = gst_caps_intersect_full (filter, tcaps,
754           GST_CAPS_INTERSECT_FIRST);
755     else
756       othercaps = gst_caps_copy (tcaps);
757   }
758   gst_caps_unref (tcaps);
759
760   GST_LOG_OBJECT (pad, "Intersected srcpad-peercaps and template caps: %"
761       GST_PTR_FORMAT, othercaps);
762
763   clear_caps (othercaps, TRUE);
764
765   g_value_init (&v, GST_TYPE_CAPS);
766
767   iter = gst_element_iterate_sink_pads (GST_ELEMENT (mux));
768   do {
769     gst_value_set_caps (&v, othercaps);
770     res = gst_iterator_fold (iter, same_clock_rate_fold, &v, pad);
771     gst_iterator_resync (iter);
772   } while (res == GST_ITERATOR_RESYNC);
773   gst_iterator_free (iter);
774
775   caps = gst_caps_intersect ((GstCaps *) gst_value_get_caps (&v), othercaps);
776
777   g_value_unset (&v);
778   gst_caps_unref (othercaps);
779
780   if (res == GST_ITERATOR_ERROR) {
781     gst_caps_unref (caps);
782     caps = gst_caps_new_empty ();
783   }
784
785
786   return caps;
787 }
788
789 static gboolean
790 gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
791 {
792   GstRTPMux *mux = GST_RTP_MUX (parent);
793   gboolean res = FALSE;
794
795   switch (GST_QUERY_TYPE (query)) {
796     case GST_QUERY_CAPS:
797     {
798       GstCaps *filter, *caps;
799
800       gst_query_parse_caps (query, &filter);
801       GST_LOG_OBJECT (pad, "Received caps-query with filter-caps: %"
802           GST_PTR_FORMAT, filter);
803       caps = gst_rtp_mux_getcaps (pad, mux, filter);
804       gst_query_set_caps_result (query, caps);
805       GST_LOG_OBJECT (mux, "Answering caps-query with caps: %"
806           GST_PTR_FORMAT, caps);
807       gst_caps_unref (caps);
808       res = TRUE;
809       break;
810     }
811     default:
812       res = gst_pad_query_default (pad, parent, query);
813       break;
814   }
815
816   return res;
817 }
818
819 static void
820 gst_rtp_mux_get_property (GObject * object,
821     guint prop_id, GValue * value, GParamSpec * pspec)
822 {
823   GstRTPMux *rtp_mux;
824
825   rtp_mux = GST_RTP_MUX (object);
826
827   GST_OBJECT_LOCK (rtp_mux);
828   switch (prop_id) {
829     case PROP_TIMESTAMP_OFFSET:
830       g_value_set_int (value, rtp_mux->ts_offset);
831       break;
832     case PROP_SEQNUM_OFFSET:
833       g_value_set_int (value, rtp_mux->seqnum_offset);
834       break;
835     case PROP_SEQNUM:
836       g_value_set_uint (value, rtp_mux->seqnum);
837       break;
838     case PROP_SSRC:
839       g_value_set_uint (value, rtp_mux->ssrc);
840       break;
841     default:
842       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
843       break;
844   }
845   GST_OBJECT_UNLOCK (rtp_mux);
846 }
847
848 static void
849 gst_rtp_mux_set_property (GObject * object,
850     guint prop_id, const GValue * value, GParamSpec * pspec)
851 {
852   GstRTPMux *rtp_mux;
853
854   rtp_mux = GST_RTP_MUX (object);
855
856   switch (prop_id) {
857     case PROP_TIMESTAMP_OFFSET:
858       rtp_mux->ts_offset = g_value_get_int (value);
859       break;
860     case PROP_SEQNUM_OFFSET:
861       rtp_mux->seqnum_offset = g_value_get_int (value);
862       break;
863     case PROP_SSRC:
864       GST_OBJECT_LOCK (rtp_mux);
865       rtp_mux->ssrc = g_value_get_uint (value);
866       rtp_mux->current_ssrc = rtp_mux->ssrc;
867       rtp_mux->have_ssrc = TRUE;
868       GST_OBJECT_UNLOCK (rtp_mux);
869       break;
870     default:
871       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
872       break;
873   }
874 }
875
876 static gboolean
877 gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
878 {
879   GstRTPMux *mux = GST_RTP_MUX (parent);
880   gboolean is_pad;
881   gboolean ret;
882
883   switch (GST_EVENT_TYPE (event)) {
884     case GST_EVENT_CAPS:
885     {
886       GstCaps *caps;
887
888       gst_event_parse_caps (event, &caps);
889       GST_LOG_OBJECT (pad, "Received caps-event with caps: %"
890           GST_PTR_FORMAT, caps);
891       ret = gst_rtp_mux_setcaps (pad, mux, caps);
892       gst_event_unref (event);
893       return ret;
894     }
895     case GST_EVENT_FLUSH_STOP:
896     {
897       GST_OBJECT_LOCK (mux);
898       mux->last_stop = GST_CLOCK_TIME_NONE;
899       GST_OBJECT_UNLOCK (mux);
900       break;
901     }
902     case GST_EVENT_SEGMENT:
903     {
904       GstRTPMuxPadPrivate *padpriv;
905
906       GST_OBJECT_LOCK (mux);
907       padpriv = gst_pad_get_element_private (pad);
908
909       if (padpriv) {
910         gst_event_copy_segment (event, &padpriv->segment);
911       }
912       GST_OBJECT_UNLOCK (mux);
913       break;
914     }
915     default:
916       break;
917   }
918
919   GST_OBJECT_LOCK (mux);
920   is_pad = (pad == mux->last_pad);
921   GST_OBJECT_UNLOCK (mux);
922
923   if (is_pad) {
924     return gst_pad_push_event (mux->srcpad, event);
925   } else {
926     gst_event_unref (event);
927     return TRUE;
928   }
929 }
930
931 static void
932 gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
933 {
934
935   GST_OBJECT_LOCK (rtp_mux);
936
937   g_clear_object (&rtp_mux->last_pad);
938   rtp_mux->send_stream_start = TRUE;
939
940   if (rtp_mux->seqnum_offset == -1)
941     rtp_mux->seqnum_base = g_random_int_range (0, G_MAXUINT16);
942   else
943     rtp_mux->seqnum_base = rtp_mux->seqnum_offset;
944   rtp_mux->seqnum = rtp_mux->seqnum_base;
945
946   if (rtp_mux->ts_offset == -1)
947     rtp_mux->ts_base = g_random_int ();
948   else
949     rtp_mux->ts_base = rtp_mux->ts_offset;
950
951   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
952
953   if (rtp_mux->ssrc == DEFAULT_SSRC) {
954     rtp_mux->have_ssrc = FALSE;
955   } else {
956     rtp_mux->current_ssrc = rtp_mux->ssrc;
957     rtp_mux->have_ssrc = TRUE;
958   }
959
960   GST_DEBUG_OBJECT (rtp_mux, "set timestamp-offset to %u", rtp_mux->ts_base);
961
962   GST_OBJECT_UNLOCK (rtp_mux);
963 }
964
965 static GstStateChangeReturn
966 gst_rtp_mux_change_state (GstElement * element, GstStateChange transition)
967 {
968   GstRTPMux *rtp_mux;
969   GstStateChangeReturn ret;
970
971   rtp_mux = GST_RTP_MUX (element);
972
973   switch (transition) {
974     case GST_STATE_CHANGE_READY_TO_PAUSED:
975       gst_rtp_mux_ready_to_paused (rtp_mux);
976       break;
977     default:
978       break;
979   }
980
981   ret = GST_ELEMENT_CLASS (gst_rtp_mux_parent_class)->change_state (element,
982       transition);
983
984   switch (transition) {
985     case GST_STATE_CHANGE_PAUSED_TO_READY:
986       g_clear_object (&rtp_mux->last_pad);
987       break;
988     default:
989       break;
990   }
991
992   return ret;
993 }
994
995 gboolean
996 gst_rtp_mux_plugin_init (GstPlugin * plugin)
997 {
998   GST_DEBUG_CATEGORY_INIT (gst_rtp_mux_debug, "rtpmux", 0, "rtp muxer");
999
1000   return gst_element_register (plugin, "rtpmux", GST_RANK_NONE,
1001       GST_TYPE_RTP_MUX);
1002 }