rtpmux: As 0xFFFFFFFF is a valid ssrc, check if it has been set
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpmux.c
1 /* RTP muxer element for GStreamer
2  *
3  * gstrtpmux.c:
4  *
5  * Copyright (C) <2007-2010> Nokia Corporation.
6  *   Contact: Zeeshan Ali <zeeshan.ali@nokia.com>
7  * Copyright (C) <2007-2010> Collabora Ltd
8  *   Contact: Olivier Crete <olivier.crete@collabora.co.uk>
9  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
10  *               2000,2005 Wim Taymans <wim@fluendo.com>
11  *
12  * This library is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU Library General Public
14  * License as published by the Free Software Foundation; either
15  * version 2 of the License, or (at your option) any later version.
16  *
17  * This library is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20  * Library General Public License for more details.
21  *
22  * You should have received a copy of the GNU Library General Public
23  * License along with this library; if not, write to the
24  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25  * Boston, MA 02110-1301, USA.
26  */
27
28 /**
29  * SECTION:element-rtpmux
30  * @see_also: rtpdtmfmux
31  *
32  * The rtp muxer takes multiple RTP streams having the same clock-rate and
33  * muxes into a single stream with a single SSRC.
34  *
35  * <refsect2>
36  * <title>Example pipelines</title>
37  * |[
38  * gst-launch-1.0 rtpmux name=mux ! udpsink host=127.0.0.1 port=8888        \
39  *              alsasrc ! alawenc ! rtppcmapay !                        \
40  *              application/x-rtp, payload=8, rate=8000 ! mux.sink_0    \
41  *              audiotestsrc is-live=1 !                                \
42  *              mulawenc ! rtppcmupay !                                 \
43  *              application/x-rtp, payload=0, rate=8000 ! mux.sink_1
44  * ]|
45  * In this example, an audio stream is captured from ALSA and another is
46  * generated, both are encoded into different payload types and muxed together
47  * so they can be sent on the same port.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <gst/gst.h>
56 #include <gst/rtp/gstrtpbuffer.h>
57 #include <string.h>
58
59 #include "gstrtpmux.h"
60
61 GST_DEBUG_CATEGORY_STATIC (gst_rtp_mux_debug);
62 #define GST_CAT_DEFAULT gst_rtp_mux_debug
63
64 enum
65 {
66   PROP_0,
67   PROP_TIMESTAMP_OFFSET,
68   PROP_SEQNUM_OFFSET,
69   PROP_SEQNUM,
70   PROP_SSRC
71 };
72
73 #define DEFAULT_TIMESTAMP_OFFSET -1
74 #define DEFAULT_SEQNUM_OFFSET    -1
75 #define DEFAULT_SSRC             -1
76
77 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
78     GST_PAD_SRC,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS ("application/x-rtp")
81     );
82
83 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%u",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS ("application/x-rtp")
87     );
88
89 static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
90     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
91 static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
92 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstObject * parent,
93     GstBuffer * buffer);
94 static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
95     GstBufferList * bufferlist);
96 static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux,
97     GstCaps * caps);
98 static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent,
99     GstEvent * event);
100 static gboolean gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent,
101     GstQuery * query);
102
103 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
104     element, GstStateChange transition);
105
106 static void gst_rtp_mux_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void gst_rtp_mux_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110 static void gst_rtp_mux_dispose (GObject * object);
111
112 static gboolean gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux,
113     GstEvent * event);
114
115 G_DEFINE_TYPE (GstRTPMux, gst_rtp_mux, GST_TYPE_ELEMENT);
116
117
118 static void
119 gst_rtp_mux_class_init (GstRTPMuxClass * klass)
120 {
121   GObjectClass *gobject_class;
122   GstElementClass *gstelement_class;
123
124   gobject_class = (GObjectClass *) klass;
125   gstelement_class = (GstElementClass *) klass;
126
127
128   gst_element_class_add_pad_template (gstelement_class,
129       gst_static_pad_template_get (&src_factory));
130   gst_element_class_add_pad_template (gstelement_class,
131       gst_static_pad_template_get (&sink_factory));
132
133   gst_element_class_set_static_metadata (gstelement_class, "RTP muxer",
134       "Codec/Muxer",
135       "multiplex N rtp streams into one", "Zeeshan Ali <first.last@nokia.com>");
136
137   gobject_class->get_property = gst_rtp_mux_get_property;
138   gobject_class->set_property = gst_rtp_mux_set_property;
139   gobject_class->dispose = gst_rtp_mux_dispose;
140
141   klass->src_event = gst_rtp_mux_src_event_real;
142
143   g_object_class_install_property (G_OBJECT_CLASS (klass),
144       PROP_TIMESTAMP_OFFSET, g_param_spec_int ("timestamp-offset",
145           "Timestamp Offset",
146           "Offset to add to all outgoing timestamps (-1 = random)", -1,
147           G_MAXINT, DEFAULT_TIMESTAMP_OFFSET,
148           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
149   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM_OFFSET,
150       g_param_spec_int ("seqnum-offset", "Sequence number Offset",
151           "Offset to add to all outgoing seqnum (-1 = random)", -1, G_MAXINT,
152           DEFAULT_SEQNUM_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
153   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM,
154       g_param_spec_uint ("seqnum", "Sequence number",
155           "The RTP sequence number of the last processed packet",
156           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
157   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SSRC,
158       g_param_spec_uint ("ssrc", "SSRC",
159           "The SSRC of the packets (default == random)",
160           0, G_MAXUINT, DEFAULT_SSRC,
161           GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
162           G_PARAM_STATIC_STRINGS));
163
164   gstelement_class->request_new_pad =
165       GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad);
166   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_mux_release_pad);
167   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_mux_change_state);
168 }
169
170 static void
171 gst_rtp_mux_dispose (GObject * object)
172 {
173   GstRTPMux *rtp_mux = GST_RTP_MUX (object);
174   GList *item;
175
176   g_clear_object (&rtp_mux->last_pad);
177
178 restart:
179   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
180     GstPad *pad = GST_PAD (item->data);
181     if (GST_PAD_IS_SINK (pad)) {
182       gst_element_release_request_pad (GST_ELEMENT (object), pad);
183       goto restart;
184     }
185   }
186
187   G_OBJECT_CLASS (gst_rtp_mux_parent_class)->dispose (object);
188 }
189
190 static gboolean
191 gst_rtp_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
192 {
193   GstRTPMux *rtp_mux = GST_RTP_MUX (parent);
194   GstRTPMuxClass *klass;
195   gboolean ret;
196
197   klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
198
199   ret = klass->src_event (rtp_mux, event);
200
201   return ret;
202 }
203
204 static gboolean
205 gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux, GstEvent * event)
206 {
207   GstIterator *iter;
208   gboolean result = FALSE;
209   gboolean done = FALSE;
210
211   switch (GST_EVENT_TYPE (event)) {
212     case GST_EVENT_CUSTOM_UPSTREAM:
213     {
214       const GstStructure *s = gst_event_get_structure (event);
215
216       if (gst_structure_has_name (s, "GstRTPCollision")) {
217         guint ssrc = 0;
218
219         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
220           ssrc = -1;
221
222         GST_DEBUG_OBJECT (rtp_mux, "collided ssrc: %" G_GUINT32_FORMAT, ssrc);
223
224         /* choose another ssrc for our stream */
225         GST_OBJECT_LOCK (rtp_mux);
226         if (ssrc == rtp_mux->current_ssrc) {
227           GstCaps *caps;
228           guint suggested_ssrc = 0;
229           guint32 new_ssrc;
230
231           if (gst_structure_get_uint (s, "suggested-ssrc", &suggested_ssrc))
232             rtp_mux->current_ssrc = suggested_ssrc;
233
234           while (ssrc == rtp_mux->current_ssrc)
235             rtp_mux->current_ssrc = g_random_int ();
236
237           new_ssrc = rtp_mux->current_ssrc;
238           GST_OBJECT_UNLOCK (rtp_mux);
239
240           caps = gst_pad_get_current_caps (rtp_mux->srcpad);
241           caps = gst_caps_make_writable (caps);
242           gst_caps_set_simple (caps, "ssrc", G_TYPE_UINT, new_ssrc, NULL);
243           gst_pad_set_caps (rtp_mux->srcpad, caps);
244           gst_caps_unref (caps);
245         } else {
246           GST_OBJECT_UNLOCK (rtp_mux);
247         }
248       }
249       break;
250     }
251     default:
252       break;
253   }
254
255
256   iter = gst_element_iterate_sink_pads (GST_ELEMENT (rtp_mux));
257
258   while (!done) {
259     GValue item = { 0, };
260
261     switch (gst_iterator_next (iter, &item)) {
262       case GST_ITERATOR_OK:
263         gst_event_ref (event);
264         result |= gst_pad_push_event (g_value_get_object (&item), event);
265         g_value_reset (&item);
266         break;
267       case GST_ITERATOR_RESYNC:
268         gst_iterator_resync (iter);
269         result = FALSE;
270         break;
271       case GST_ITERATOR_ERROR:
272         GST_WARNING_OBJECT (rtp_mux, "Error iterating sinkpads");
273       case GST_ITERATOR_DONE:
274         done = TRUE;
275         break;
276     }
277   }
278   gst_iterator_free (iter);
279   gst_event_unref (event);
280
281   return result;
282 }
283
284 static void
285 gst_rtp_mux_init (GstRTPMux * rtp_mux)
286 {
287   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtp_mux);
288
289   rtp_mux->srcpad =
290       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
291           "src"), "src");
292   gst_pad_set_event_function (rtp_mux->srcpad,
293       GST_DEBUG_FUNCPTR (gst_rtp_mux_src_event));
294   gst_pad_use_fixed_caps (rtp_mux->srcpad);
295   gst_element_add_pad (GST_ELEMENT (rtp_mux), rtp_mux->srcpad);
296
297   rtp_mux->ssrc = DEFAULT_SSRC;
298   rtp_mux->current_ssrc = DEFAULT_SSRC;
299   rtp_mux->ssrc_random = TRUE;
300   rtp_mux->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
301   rtp_mux->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
302
303   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
304 }
305
306 static void
307 gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
308 {
309   GstRTPMuxPadPrivate *padpriv = g_slice_new0 (GstRTPMuxPadPrivate);
310
311   /* setup some pad functions */
312   gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain));
313   gst_pad_set_chain_list_function (sinkpad,
314       GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list));
315   gst_pad_set_event_function (sinkpad,
316       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
317   gst_pad_set_query_function (sinkpad,
318       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_query));
319
320
321   gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
322
323   gst_pad_set_element_private (sinkpad, padpriv);
324
325   gst_pad_set_active (sinkpad, TRUE);
326   gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad);
327 }
328
329 static GstPad *
330 gst_rtp_mux_request_new_pad (GstElement * element,
331     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
332 {
333   GstRTPMux *rtp_mux;
334   GstPad *newpad;
335
336   g_return_val_if_fail (templ != NULL, NULL);
337   g_return_val_if_fail (GST_IS_RTP_MUX (element), NULL);
338
339   rtp_mux = GST_RTP_MUX (element);
340
341   if (templ->direction != GST_PAD_SINK) {
342     GST_WARNING_OBJECT (rtp_mux, "request pad that is not a SINK pad");
343     return NULL;
344   }
345
346   newpad = gst_pad_new_from_template (templ, req_name);
347   if (newpad)
348     gst_rtp_mux_setup_sinkpad (rtp_mux, newpad);
349   else
350     GST_WARNING_OBJECT (rtp_mux, "failed to create request pad");
351
352   return newpad;
353 }
354
355 static void
356 gst_rtp_mux_release_pad (GstElement * element, GstPad * pad)
357 {
358   GstRTPMuxPadPrivate *padpriv;
359
360   GST_OBJECT_LOCK (element);
361   padpriv = gst_pad_get_element_private (pad);
362   gst_pad_set_element_private (pad, NULL);
363   GST_OBJECT_UNLOCK (element);
364
365   gst_element_remove_pad (element, pad);
366
367   if (padpriv) {
368     g_slice_free (GstRTPMuxPadPrivate, padpriv);
369   }
370 }
371
372 /* Put our own timestamp-offset on the buffer */
373 static void
374 gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux,
375     GstRTPMuxPadPrivate * padpriv, GstRTPBuffer * rtpbuffer)
376 {
377   guint32 ts;
378   guint32 sink_ts_base = 0;
379
380   if (padpriv && padpriv->have_timestamp_offset)
381     sink_ts_base = padpriv->timestamp_offset;
382
383   ts = gst_rtp_buffer_get_timestamp (rtpbuffer) - sink_ts_base +
384       rtp_mux->ts_base;
385   GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u",
386       gst_rtp_buffer_get_timestamp (rtpbuffer), ts);
387   gst_rtp_buffer_set_timestamp (rtpbuffer, ts);
388 }
389
390 static gboolean
391 process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv,
392     GstRTPBuffer * rtpbuffer)
393 {
394   GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
395
396   if (klass->accept_buffer_locked)
397     if (!klass->accept_buffer_locked (rtp_mux, padpriv, rtpbuffer))
398       return FALSE;
399
400   rtp_mux->seqnum++;
401   gst_rtp_buffer_set_seq (rtpbuffer, rtp_mux->seqnum);
402
403   gst_rtp_buffer_set_ssrc (rtpbuffer, rtp_mux->current_ssrc);
404   gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, rtpbuffer);
405   GST_LOG_OBJECT (rtp_mux,
406       "Pushing packet size %" G_GSIZE_FORMAT ", seq=%d, ts=%u",
407       rtpbuffer->map[0].size, rtp_mux->seqnum,
408       gst_rtp_buffer_get_timestamp (rtpbuffer));
409
410   if (padpriv) {
411     if (padpriv->segment.format == GST_FORMAT_TIME)
412       GST_BUFFER_PTS (rtpbuffer->buffer) =
413           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
414           GST_BUFFER_PTS (rtpbuffer->buffer));
415   }
416
417   return TRUE;
418 }
419
420 struct BufferListData
421 {
422   GstRTPMux *rtp_mux;
423   GstRTPMuxPadPrivate *padpriv;
424   gboolean drop;
425 };
426
427 static gboolean
428 process_list_item (GstBuffer ** buffer, guint idx, gpointer user_data)
429 {
430   struct BufferListData *bd = user_data;
431   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
432
433   *buffer = gst_buffer_make_writable (*buffer);
434
435   gst_rtp_buffer_map (*buffer, GST_MAP_READWRITE, &rtpbuffer);
436
437   bd->drop = !process_buffer_locked (bd->rtp_mux, bd->padpriv, &rtpbuffer);
438
439   gst_rtp_buffer_unmap (&rtpbuffer);
440
441   if (bd->drop)
442     return FALSE;
443
444   if (GST_BUFFER_DURATION_IS_VALID (*buffer) &&
445       GST_BUFFER_PTS_IS_VALID (*buffer))
446     bd->rtp_mux->last_stop = GST_BUFFER_PTS (*buffer) +
447         GST_BUFFER_DURATION (*buffer);
448   else
449     bd->rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
450
451   return TRUE;
452 }
453
454 static GstFlowReturn
455 gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
456     GstBufferList * bufferlist)
457 {
458   GstRTPMux *rtp_mux;
459   GstFlowReturn ret;
460   GstRTPMuxPadPrivate *padpriv;
461   struct BufferListData bd;
462
463   rtp_mux = GST_RTP_MUX (parent);
464
465   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
466     GstCaps *current_caps = gst_pad_get_current_caps (pad);
467
468     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
469       ret = GST_FLOW_NOT_NEGOTIATED;
470       gst_buffer_list_unref (bufferlist);
471       goto out;
472     }
473     gst_caps_unref (current_caps);
474   }
475
476   GST_OBJECT_LOCK (rtp_mux);
477
478   padpriv = gst_pad_get_element_private (pad);
479   if (!padpriv) {
480     GST_OBJECT_UNLOCK (rtp_mux);
481     ret = GST_FLOW_NOT_LINKED;
482     gst_buffer_list_unref (bufferlist);
483     goto out;
484   }
485
486   bd.rtp_mux = rtp_mux;
487   bd.padpriv = padpriv;
488   bd.drop = FALSE;
489
490   bufferlist = gst_buffer_list_make_writable (bufferlist);
491   gst_buffer_list_foreach (bufferlist, process_list_item, &bd);
492
493   GST_OBJECT_UNLOCK (rtp_mux);
494
495   if (bd.drop) {
496     gst_buffer_list_unref (bufferlist);
497     ret = GST_FLOW_OK;
498   } else {
499     ret = gst_pad_push_list (rtp_mux->srcpad, bufferlist);
500   }
501
502 out:
503
504   return ret;
505 }
506
507 static gboolean
508 resend_events (GstPad * pad, GstEvent ** event, gpointer user_data)
509 {
510   GstRTPMux *rtp_mux = user_data;
511
512   if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) {
513     GstCaps *caps;
514
515     gst_event_parse_caps (*event, &caps);
516     gst_rtp_mux_setcaps (pad, rtp_mux, caps);
517   } else {
518     gst_pad_push_event (rtp_mux->srcpad, gst_event_ref (*event));
519   }
520
521   return TRUE;
522 }
523
524 static GstFlowReturn
525 gst_rtp_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
526 {
527   GstRTPMux *rtp_mux;
528   GstFlowReturn ret;
529   GstRTPMuxPadPrivate *padpriv;
530   gboolean drop;
531   gboolean changed = FALSE;
532   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
533
534   rtp_mux = GST_RTP_MUX (parent);
535
536   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
537     GstCaps *current_caps = gst_pad_get_current_caps (pad);
538
539     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
540       ret = GST_FLOW_NOT_NEGOTIATED;
541       gst_buffer_unref (buffer);
542       goto out;
543     }
544     gst_caps_unref (current_caps);
545   }
546
547   GST_OBJECT_LOCK (rtp_mux);
548   padpriv = gst_pad_get_element_private (pad);
549
550   if (!padpriv) {
551     GST_OBJECT_UNLOCK (rtp_mux);
552     gst_buffer_unref (buffer);
553     return GST_FLOW_NOT_LINKED;
554   }
555
556   buffer = gst_buffer_make_writable (buffer);
557
558   if (!gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtpbuffer)) {
559     GST_OBJECT_UNLOCK (rtp_mux);
560     gst_buffer_unref (buffer);
561     GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
562     return GST_FLOW_ERROR;
563   }
564
565   drop = !process_buffer_locked (rtp_mux, padpriv, &rtpbuffer);
566
567   gst_rtp_buffer_unmap (&rtpbuffer);
568
569   if (!drop) {
570     if (pad != rtp_mux->last_pad) {
571       changed = TRUE;
572       g_clear_object (&rtp_mux->last_pad);
573       rtp_mux->last_pad = g_object_ref (pad);
574     }
575
576     if (GST_BUFFER_DURATION_IS_VALID (buffer) &&
577         GST_BUFFER_PTS_IS_VALID (buffer))
578       rtp_mux->last_stop = GST_BUFFER_PTS (buffer) +
579           GST_BUFFER_DURATION (buffer);
580     else
581       rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
582   }
583
584   GST_OBJECT_UNLOCK (rtp_mux);
585
586   if (changed)
587     gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
588
589   if (drop) {
590     gst_buffer_unref (buffer);
591     ret = GST_FLOW_OK;
592   } else {
593     ret = gst_pad_push (rtp_mux->srcpad, buffer);
594   }
595
596 out:
597   return ret;
598 }
599
600 static gboolean
601 gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux, GstCaps * caps)
602 {
603   GstStructure *structure;
604   gboolean ret = FALSE;
605   GstRTPMuxPadPrivate *padpriv;
606   GstCaps *peercaps;
607
608   if (!gst_caps_is_fixed (caps))
609     return FALSE;
610
611   peercaps = gst_pad_peer_query_caps (rtp_mux->srcpad, NULL);
612   if (peercaps) {
613     GstCaps *tcaps, *othercaps;;
614     tcaps = gst_pad_get_pad_template_caps (pad);
615     othercaps = gst_caps_intersect_full (peercaps, tcaps,
616         GST_CAPS_INTERSECT_FIRST);
617
618     if (gst_caps_get_size (othercaps) > 0) {
619       structure = gst_caps_get_structure (othercaps, 0);
620       GST_OBJECT_LOCK (rtp_mux);
621       if (gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
622         GST_DEBUG_OBJECT (pad, "Use downstream ssrc: %x",
623             rtp_mux->current_ssrc);
624         rtp_mux->have_ssrc = TRUE;
625       }
626       GST_OBJECT_UNLOCK (rtp_mux);
627     }
628
629     gst_caps_unref (othercaps);
630
631     gst_caps_unref (peercaps);
632     gst_caps_unref (tcaps);
633   }
634
635   structure = gst_caps_get_structure (caps, 0);
636
637   if (!structure)
638     return FALSE;
639
640   GST_OBJECT_LOCK (rtp_mux);
641   padpriv = gst_pad_get_element_private (pad);
642   if (padpriv &&
643       gst_structure_get_uint (structure, "timestamp-offset",
644           &padpriv->timestamp_offset)) {
645     padpriv->have_timestamp_offset = TRUE;
646   }
647
648   caps = gst_caps_copy (caps);
649
650   /* if we don't have a specified ssrc, first try to take one from the caps,
651      and if that fails, generate one */
652   if (!rtp_mux->have_ssrc) {
653     if (rtp_mux->ssrc_random) {
654       if (!gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc))
655         rtp_mux->current_ssrc = g_random_int ();
656       rtp_mux->have_ssrc = TRUE;
657     }
658   }
659
660   gst_caps_set_simple (caps,
661       "timestamp-offset", G_TYPE_UINT, rtp_mux->ts_base,
662       "seqnum-offset", G_TYPE_UINT, rtp_mux->seqnum_base,
663       "ssrc", G_TYPE_UINT, rtp_mux->current_ssrc, NULL);
664
665   GST_OBJECT_UNLOCK (rtp_mux);
666
667   if (rtp_mux->send_stream_start) {
668     gchar s_id[32];
669
670     /* stream-start (FIXME: create id based on input ids) */
671     g_snprintf (s_id, sizeof (s_id), "interleave-%08x", g_random_int ());
672     gst_pad_push_event (rtp_mux->srcpad, gst_event_new_stream_start (s_id));
673
674     rtp_mux->send_stream_start = FALSE;
675   }
676
677   GST_DEBUG_OBJECT (rtp_mux,
678       "setting caps %" GST_PTR_FORMAT " on src pad..", caps);
679   ret = gst_pad_set_caps (rtp_mux->srcpad, caps);
680
681
682   gst_caps_unref (caps);
683
684   return ret;
685 }
686
687 static void
688 clear_caps (GstCaps * caps, gboolean only_clock_rate)
689 {
690   gint i, j;
691
692   /* Lets only match on the clock-rate */
693   for (i = 0; i < gst_caps_get_size (caps); i++) {
694     GstStructure *s = gst_caps_get_structure (caps, i);
695
696     for (j = 0; j < gst_structure_n_fields (s); j++) {
697       const gchar *name = gst_structure_nth_field_name (s, j);
698
699       if (strcmp (name, "clock-rate") && (only_clock_rate ||
700               (strcmp (name, "ssrc")))) {
701         gst_structure_remove_field (s, name);
702         j--;
703       }
704     }
705   }
706 }
707
708 static gboolean
709 same_clock_rate_fold (const GValue * item, GValue * ret, gpointer user_data)
710 {
711   GstPad *mypad = user_data;
712   GstPad *pad = g_value_get_object (item);
713   GstCaps *peercaps;
714   GstCaps *accumcaps;
715
716   if (pad == mypad)
717     return TRUE;
718
719   accumcaps = g_value_get_boxed (ret);
720   peercaps = gst_pad_peer_query_caps (pad, accumcaps);
721   if (!peercaps) {
722     g_warning ("no peercaps");
723     return TRUE;
724   }
725   peercaps = gst_caps_make_writable (peercaps);
726   clear_caps (peercaps, TRUE);
727
728   g_value_take_boxed (ret, peercaps);
729
730   return !gst_caps_is_empty (peercaps);
731 }
732
733 static GstCaps *
734 gst_rtp_mux_getcaps (GstPad * pad, GstRTPMux * mux, GstCaps * filter)
735 {
736   GstCaps *caps = NULL;
737   GstIterator *iter = NULL;
738   GValue v = { 0 };
739   GstIteratorResult res;
740   GstCaps *peercaps;
741   GstCaps *othercaps;
742   GstCaps *tcaps;
743
744   peercaps = gst_pad_peer_query_caps (mux->srcpad, NULL);
745
746   if (peercaps) {
747     tcaps = gst_pad_get_pad_template_caps (pad);
748     othercaps = gst_caps_intersect_full (peercaps, tcaps,
749         GST_CAPS_INTERSECT_FIRST);
750     gst_caps_unref (peercaps);
751   } else {
752     tcaps = gst_pad_get_pad_template_caps (mux->srcpad);
753     if (filter)
754       othercaps = gst_caps_intersect_full (filter, tcaps,
755           GST_CAPS_INTERSECT_FIRST);
756     else
757       othercaps = gst_caps_copy (tcaps);
758   }
759   gst_caps_unref (tcaps);
760
761   GST_LOG_OBJECT (pad, "Intersected srcpad-peercaps and template caps: %"
762       GST_PTR_FORMAT, othercaps);
763
764   clear_caps (othercaps, TRUE);
765
766   g_value_init (&v, GST_TYPE_CAPS);
767
768   iter = gst_element_iterate_sink_pads (GST_ELEMENT (mux));
769   do {
770     gst_value_set_caps (&v, othercaps);
771     res = gst_iterator_fold (iter, same_clock_rate_fold, &v, pad);
772     gst_iterator_resync (iter);
773   } while (res == GST_ITERATOR_RESYNC);
774   gst_iterator_free (iter);
775
776   caps = gst_caps_intersect ((GstCaps *) gst_value_get_caps (&v), othercaps);
777
778   g_value_unset (&v);
779   gst_caps_unref (othercaps);
780
781   if (res == GST_ITERATOR_ERROR) {
782     gst_caps_unref (caps);
783     caps = gst_caps_new_empty ();
784   }
785
786
787   return caps;
788 }
789
790 static gboolean
791 gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
792 {
793   GstRTPMux *mux = GST_RTP_MUX (parent);
794   gboolean res = FALSE;
795
796   switch (GST_QUERY_TYPE (query)) {
797     case GST_QUERY_CAPS:
798     {
799       GstCaps *filter, *caps;
800
801       gst_query_parse_caps (query, &filter);
802       GST_LOG_OBJECT (pad, "Received caps-query with filter-caps: %"
803           GST_PTR_FORMAT, filter);
804       caps = gst_rtp_mux_getcaps (pad, mux, filter);
805       gst_query_set_caps_result (query, caps);
806       GST_LOG_OBJECT (mux, "Answering caps-query with caps: %"
807           GST_PTR_FORMAT, caps);
808       gst_caps_unref (caps);
809       res = TRUE;
810       break;
811     }
812     default:
813       res = gst_pad_query_default (pad, parent, query);
814       break;
815   }
816
817   return res;
818 }
819
820 static void
821 gst_rtp_mux_get_property (GObject * object,
822     guint prop_id, GValue * value, GParamSpec * pspec)
823 {
824   GstRTPMux *rtp_mux;
825
826   rtp_mux = GST_RTP_MUX (object);
827
828   GST_OBJECT_LOCK (rtp_mux);
829   switch (prop_id) {
830     case PROP_TIMESTAMP_OFFSET:
831       g_value_set_int (value, rtp_mux->ts_offset);
832       break;
833     case PROP_SEQNUM_OFFSET:
834       g_value_set_int (value, rtp_mux->seqnum_offset);
835       break;
836     case PROP_SEQNUM:
837       g_value_set_uint (value, rtp_mux->seqnum);
838       break;
839     case PROP_SSRC:
840       g_value_set_uint (value, rtp_mux->ssrc);
841       break;
842     default:
843       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
844       break;
845   }
846   GST_OBJECT_UNLOCK (rtp_mux);
847 }
848
849 static void
850 gst_rtp_mux_set_property (GObject * object,
851     guint prop_id, const GValue * value, GParamSpec * pspec)
852 {
853   GstRTPMux *rtp_mux;
854
855   rtp_mux = GST_RTP_MUX (object);
856
857   switch (prop_id) {
858     case PROP_TIMESTAMP_OFFSET:
859       rtp_mux->ts_offset = g_value_get_int (value);
860       break;
861     case PROP_SEQNUM_OFFSET:
862       rtp_mux->seqnum_offset = g_value_get_int (value);
863       break;
864     case PROP_SSRC:
865       GST_OBJECT_LOCK (rtp_mux);
866       rtp_mux->ssrc = g_value_get_uint (value);
867       rtp_mux->current_ssrc = rtp_mux->ssrc;
868       rtp_mux->have_ssrc = TRUE;
869       rtp_mux->ssrc_random = FALSE;
870       GST_OBJECT_UNLOCK (rtp_mux);
871       break;
872     default:
873       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
874       break;
875   }
876 }
877
878 static gboolean
879 gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
880 {
881   GstRTPMux *mux = GST_RTP_MUX (parent);
882   gboolean is_pad;
883   gboolean ret;
884
885   switch (GST_EVENT_TYPE (event)) {
886     case GST_EVENT_CAPS:
887     {
888       GstCaps *caps;
889
890       gst_event_parse_caps (event, &caps);
891       GST_LOG_OBJECT (pad, "Received caps-event with caps: %"
892           GST_PTR_FORMAT, caps);
893       ret = gst_rtp_mux_setcaps (pad, mux, caps);
894       gst_event_unref (event);
895       return ret;
896     }
897     case GST_EVENT_FLUSH_STOP:
898     {
899       GST_OBJECT_LOCK (mux);
900       mux->last_stop = GST_CLOCK_TIME_NONE;
901       GST_OBJECT_UNLOCK (mux);
902       break;
903     }
904     case GST_EVENT_SEGMENT:
905     {
906       GstRTPMuxPadPrivate *padpriv;
907
908       GST_OBJECT_LOCK (mux);
909       padpriv = gst_pad_get_element_private (pad);
910
911       if (padpriv) {
912         gst_event_copy_segment (event, &padpriv->segment);
913       }
914       GST_OBJECT_UNLOCK (mux);
915       break;
916     }
917     default:
918       break;
919   }
920
921   GST_OBJECT_LOCK (mux);
922   is_pad = (pad == mux->last_pad);
923   GST_OBJECT_UNLOCK (mux);
924
925   if (is_pad) {
926     return gst_pad_push_event (mux->srcpad, event);
927   } else {
928     gst_event_unref (event);
929     return TRUE;
930   }
931 }
932
933 static void
934 gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
935 {
936
937   GST_OBJECT_LOCK (rtp_mux);
938
939   g_clear_object (&rtp_mux->last_pad);
940   rtp_mux->send_stream_start = TRUE;
941
942   if (rtp_mux->seqnum_offset == -1)
943     rtp_mux->seqnum_base = g_random_int_range (0, G_MAXUINT16);
944   else
945     rtp_mux->seqnum_base = rtp_mux->seqnum_offset;
946   rtp_mux->seqnum = rtp_mux->seqnum_base;
947
948   if (rtp_mux->ts_offset == -1)
949     rtp_mux->ts_base = g_random_int ();
950   else
951     rtp_mux->ts_base = rtp_mux->ts_offset;
952
953   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
954
955   if (rtp_mux->ssrc_random) {
956     rtp_mux->have_ssrc = FALSE;
957   } else {
958     rtp_mux->current_ssrc = rtp_mux->ssrc;
959     rtp_mux->have_ssrc = TRUE;
960   }
961
962   GST_DEBUG_OBJECT (rtp_mux, "set timestamp-offset to %u", rtp_mux->ts_base);
963
964   GST_OBJECT_UNLOCK (rtp_mux);
965 }
966
967 static GstStateChangeReturn
968 gst_rtp_mux_change_state (GstElement * element, GstStateChange transition)
969 {
970   GstRTPMux *rtp_mux;
971   GstStateChangeReturn ret;
972
973   rtp_mux = GST_RTP_MUX (element);
974
975   switch (transition) {
976     case GST_STATE_CHANGE_READY_TO_PAUSED:
977       gst_rtp_mux_ready_to_paused (rtp_mux);
978       break;
979     default:
980       break;
981   }
982
983   ret = GST_ELEMENT_CLASS (gst_rtp_mux_parent_class)->change_state (element,
984       transition);
985
986   switch (transition) {
987     case GST_STATE_CHANGE_PAUSED_TO_READY:
988       g_clear_object (&rtp_mux->last_pad);
989       break;
990     default:
991       break;
992   }
993
994   return ret;
995 }
996
997 gboolean
998 gst_rtp_mux_plugin_init (GstPlugin * plugin)
999 {
1000   GST_DEBUG_CATEGORY_INIT (gst_rtp_mux_debug, "rtpmux", 0, "rtp muxer");
1001
1002   return gst_element_register (plugin, "rtpmux", GST_RANK_NONE,
1003       GST_TYPE_RTP_MUX);
1004 }