rtpmux: protect against NULL caps
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpmux.c
1 /* RTP muxer element for GStreamer
2  *
3  * gstrtpmux.c:
4  *
5  * Copyright (C) <2007-2010> Nokia Corporation.
6  *   Contact: Zeeshan Ali <zeeshan.ali@nokia.com>
7  * Copyright (C) <2007-2010> Collabora Ltd
8  *   Contact: Olivier Crete <olivier.crete@collabora.co.uk>
9  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
10  *               2000,2005 Wim Taymans <wim@fluendo.com>
11  *
12  * This library is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU Library General Public
14  * License as published by the Free Software Foundation; either
15  * version 2 of the License, or (at your option) any later version.
16  *
17  * This library is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20  * Library General Public License for more details.
21  *
22  * You should have received a copy of the GNU Library General Public
23  * License along with this library; if not, write to the
24  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25  * Boston, MA 02110-1301, USA.
26  */
27
28 /**
29  * SECTION:element-rtpmux
30  * @see_also: rtpdtmfmux
31  *
32  * The rtp muxer takes multiple RTP streams having the same clock-rate and
33  * muxes into a single stream with a single SSRC.
34  *
35  * <refsect2>
36  * <title>Example pipelines</title>
37  * |[
38  * gst-launch-1.0 rtpmux name=mux ! udpsink host=127.0.0.1 port=8888        \
39  *              alsasrc ! alawenc ! rtppcmapay !                        \
40  *              application/x-rtp, payload=8, rate=8000 ! mux.sink_0    \
41  *              audiotestsrc is-live=1 !                                \
42  *              mulawenc ! rtppcmupay !                                 \
43  *              application/x-rtp, payload=0, rate=8000 ! mux.sink_1
44  * ]|
45  * In this example, an audio stream is captured from ALSA and another is
46  * generated, both are encoded into different payload types and muxed together
47  * so they can be sent on the same port.
48  * </refsect2>
49  */
50
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54
55 #include <gst/gst.h>
56 #include <gst/rtp/gstrtpbuffer.h>
57 #include <string.h>
58
59 #include "gstrtpmux.h"
60
61 GST_DEBUG_CATEGORY_STATIC (gst_rtp_mux_debug);
62 #define GST_CAT_DEFAULT gst_rtp_mux_debug
63
64 enum
65 {
66   PROP_0,
67   PROP_TIMESTAMP_OFFSET,
68   PROP_SEQNUM_OFFSET,
69   PROP_SEQNUM,
70   PROP_SSRC
71 };
72
73 #define DEFAULT_TIMESTAMP_OFFSET -1
74 #define DEFAULT_SEQNUM_OFFSET    -1
75 #define DEFAULT_SSRC             -1
76
77 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
78     GST_PAD_SRC,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS ("application/x-rtp")
81     );
82
83 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%u",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS ("application/x-rtp")
87     );
88
89 static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
90     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
91 static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
92 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstObject * parent,
93     GstBuffer * buffer);
94 static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
95     GstBufferList * bufferlist);
96 static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux,
97     GstCaps * caps);
98 static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent,
99     GstEvent * event);
100 static gboolean gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent,
101     GstQuery * query);
102
103 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
104     element, GstStateChange transition);
105
106 static void gst_rtp_mux_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void gst_rtp_mux_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110 static void gst_rtp_mux_dispose (GObject * object);
111
112 static gboolean gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux,
113     GstEvent * event);
114
115 G_DEFINE_TYPE (GstRTPMux, gst_rtp_mux, GST_TYPE_ELEMENT);
116
117
118 static void
119 gst_rtp_mux_class_init (GstRTPMuxClass * klass)
120 {
121   GObjectClass *gobject_class;
122   GstElementClass *gstelement_class;
123
124   gobject_class = (GObjectClass *) klass;
125   gstelement_class = (GstElementClass *) klass;
126
127
128   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
129   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
130
131   gst_element_class_set_static_metadata (gstelement_class, "RTP muxer",
132       "Codec/Muxer",
133       "multiplex N rtp streams into one", "Zeeshan Ali <first.last@nokia.com>");
134
135   gobject_class->get_property = gst_rtp_mux_get_property;
136   gobject_class->set_property = gst_rtp_mux_set_property;
137   gobject_class->dispose = gst_rtp_mux_dispose;
138
139   klass->src_event = gst_rtp_mux_src_event_real;
140
141   g_object_class_install_property (G_OBJECT_CLASS (klass),
142       PROP_TIMESTAMP_OFFSET, g_param_spec_int ("timestamp-offset",
143           "Timestamp Offset",
144           "Offset to add to all outgoing timestamps (-1 = random)", -1,
145           G_MAXINT, DEFAULT_TIMESTAMP_OFFSET,
146           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
147   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM_OFFSET,
148       g_param_spec_int ("seqnum-offset", "Sequence number Offset",
149           "Offset to add to all outgoing seqnum (-1 = random)", -1, G_MAXINT,
150           DEFAULT_SEQNUM_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM,
152       g_param_spec_uint ("seqnum", "Sequence number",
153           "The RTP sequence number of the last processed packet",
154           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
155   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SSRC,
156       g_param_spec_uint ("ssrc", "SSRC",
157           "The SSRC of the packets (default == random)",
158           0, G_MAXUINT, DEFAULT_SSRC,
159           GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
160           G_PARAM_STATIC_STRINGS));
161
162   gstelement_class->request_new_pad =
163       GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad);
164   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_mux_release_pad);
165   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_mux_change_state);
166 }
167
168 static void
169 gst_rtp_mux_dispose (GObject * object)
170 {
171   GstRTPMux *rtp_mux = GST_RTP_MUX (object);
172   GList *item;
173
174   g_clear_object (&rtp_mux->last_pad);
175
176 restart:
177   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
178     GstPad *pad = GST_PAD (item->data);
179     if (GST_PAD_IS_SINK (pad)) {
180       gst_element_release_request_pad (GST_ELEMENT (object), pad);
181       goto restart;
182     }
183   }
184
185   G_OBJECT_CLASS (gst_rtp_mux_parent_class)->dispose (object);
186 }
187
188 static gboolean
189 gst_rtp_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
190 {
191   GstRTPMux *rtp_mux = GST_RTP_MUX (parent);
192   GstRTPMuxClass *klass;
193   gboolean ret;
194
195   klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
196
197   ret = klass->src_event (rtp_mux, event);
198
199   return ret;
200 }
201
202 static gboolean
203 gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux, GstEvent * event)
204 {
205   switch (GST_EVENT_TYPE (event)) {
206     case GST_EVENT_CUSTOM_UPSTREAM:
207     {
208       const GstStructure *s = gst_event_get_structure (event);
209
210       if (gst_structure_has_name (s, "GstRTPCollision")) {
211         guint ssrc = 0;
212
213         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
214           ssrc = -1;
215
216         GST_DEBUG_OBJECT (rtp_mux, "collided ssrc: %" G_GUINT32_FORMAT, ssrc);
217
218         /* choose another ssrc for our stream */
219         GST_OBJECT_LOCK (rtp_mux);
220         if (ssrc == rtp_mux->current_ssrc) {
221           GstCaps *caps;
222           guint suggested_ssrc = 0;
223           guint32 new_ssrc;
224
225           if (gst_structure_get_uint (s, "suggested-ssrc", &suggested_ssrc))
226             rtp_mux->current_ssrc = suggested_ssrc;
227
228           while (ssrc == rtp_mux->current_ssrc)
229             rtp_mux->current_ssrc = g_random_int ();
230
231           new_ssrc = rtp_mux->current_ssrc;
232           GST_OBJECT_UNLOCK (rtp_mux);
233
234           caps = gst_pad_get_current_caps (rtp_mux->srcpad);
235           caps = gst_caps_make_writable (caps);
236           gst_caps_set_simple (caps, "ssrc", G_TYPE_UINT, new_ssrc, NULL);
237           gst_pad_set_caps (rtp_mux->srcpad, caps);
238           gst_caps_unref (caps);
239         } else {
240           GST_OBJECT_UNLOCK (rtp_mux);
241         }
242       }
243       break;
244     }
245     default:
246       break;
247   }
248
249
250   return gst_pad_event_default (rtp_mux->srcpad, GST_OBJECT (rtp_mux), event);
251 }
252
253 static void
254 gst_rtp_mux_init (GstRTPMux * rtp_mux)
255 {
256   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtp_mux);
257
258   rtp_mux->srcpad =
259       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
260           "src"), "src");
261   gst_pad_set_event_function (rtp_mux->srcpad,
262       GST_DEBUG_FUNCPTR (gst_rtp_mux_src_event));
263   gst_pad_use_fixed_caps (rtp_mux->srcpad);
264   gst_element_add_pad (GST_ELEMENT (rtp_mux), rtp_mux->srcpad);
265
266   rtp_mux->ssrc = DEFAULT_SSRC;
267   rtp_mux->current_ssrc = DEFAULT_SSRC;
268   rtp_mux->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
269   rtp_mux->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
270
271   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
272 }
273
274 static void
275 gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
276 {
277   GstRTPMuxPadPrivate *padpriv = g_slice_new0 (GstRTPMuxPadPrivate);
278
279   /* setup some pad functions */
280   gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain));
281   gst_pad_set_chain_list_function (sinkpad,
282       GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list));
283   gst_pad_set_event_function (sinkpad,
284       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
285   gst_pad_set_query_function (sinkpad,
286       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_query));
287
288
289   gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
290
291   gst_pad_set_element_private (sinkpad, padpriv);
292
293   gst_pad_set_active (sinkpad, TRUE);
294   gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad);
295 }
296
297 static GstPad *
298 gst_rtp_mux_request_new_pad (GstElement * element,
299     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
300 {
301   GstRTPMux *rtp_mux;
302   GstPad *newpad;
303
304   g_return_val_if_fail (templ != NULL, NULL);
305   g_return_val_if_fail (GST_IS_RTP_MUX (element), NULL);
306
307   rtp_mux = GST_RTP_MUX (element);
308
309   if (templ->direction != GST_PAD_SINK) {
310     GST_WARNING_OBJECT (rtp_mux, "request pad that is not a SINK pad");
311     return NULL;
312   }
313
314   newpad = gst_pad_new_from_template (templ, req_name);
315   if (newpad)
316     gst_rtp_mux_setup_sinkpad (rtp_mux, newpad);
317   else
318     GST_WARNING_OBJECT (rtp_mux, "failed to create request pad");
319
320   return newpad;
321 }
322
323 static void
324 gst_rtp_mux_release_pad (GstElement * element, GstPad * pad)
325 {
326   GstRTPMuxPadPrivate *padpriv;
327
328   GST_OBJECT_LOCK (element);
329   padpriv = gst_pad_get_element_private (pad);
330   gst_pad_set_element_private (pad, NULL);
331   GST_OBJECT_UNLOCK (element);
332
333   gst_element_remove_pad (element, pad);
334
335   if (padpriv) {
336     g_slice_free (GstRTPMuxPadPrivate, padpriv);
337   }
338 }
339
340 /* Put our own timestamp-offset on the buffer */
341 static void
342 gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux,
343     GstRTPMuxPadPrivate * padpriv, GstRTPBuffer * rtpbuffer)
344 {
345   guint32 ts;
346   guint32 sink_ts_base = 0;
347
348   if (padpriv && padpriv->have_timestamp_offset)
349     sink_ts_base = padpriv->timestamp_offset;
350
351   ts = gst_rtp_buffer_get_timestamp (rtpbuffer) - sink_ts_base +
352       rtp_mux->ts_base;
353   GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u",
354       gst_rtp_buffer_get_timestamp (rtpbuffer), ts);
355   gst_rtp_buffer_set_timestamp (rtpbuffer, ts);
356 }
357
358 static gboolean
359 process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv,
360     GstRTPBuffer * rtpbuffer)
361 {
362   GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
363
364   if (klass->accept_buffer_locked)
365     if (!klass->accept_buffer_locked (rtp_mux, padpriv, rtpbuffer))
366       return FALSE;
367
368   rtp_mux->seqnum++;
369   gst_rtp_buffer_set_seq (rtpbuffer, rtp_mux->seqnum);
370
371   gst_rtp_buffer_set_ssrc (rtpbuffer, rtp_mux->current_ssrc);
372   gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, rtpbuffer);
373   GST_LOG_OBJECT (rtp_mux,
374       "Pushing packet size %" G_GSIZE_FORMAT ", seq=%d, ts=%u",
375       rtpbuffer->map[0].size, rtp_mux->seqnum,
376       gst_rtp_buffer_get_timestamp (rtpbuffer));
377
378   if (padpriv) {
379     if (padpriv->segment.format == GST_FORMAT_TIME) {
380       GST_BUFFER_PTS (rtpbuffer->buffer) =
381           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
382           GST_BUFFER_PTS (rtpbuffer->buffer));
383       GST_BUFFER_DTS (rtpbuffer->buffer) =
384           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
385           GST_BUFFER_DTS (rtpbuffer->buffer));
386     }
387   }
388
389   return TRUE;
390 }
391
392 struct BufferListData
393 {
394   GstRTPMux *rtp_mux;
395   GstRTPMuxPadPrivate *padpriv;
396   gboolean drop;
397 };
398
399 static gboolean
400 process_list_item (GstBuffer ** buffer, guint idx, gpointer user_data)
401 {
402   struct BufferListData *bd = user_data;
403   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
404
405   *buffer = gst_buffer_make_writable (*buffer);
406
407   gst_rtp_buffer_map (*buffer, GST_MAP_READWRITE, &rtpbuffer);
408
409   bd->drop = !process_buffer_locked (bd->rtp_mux, bd->padpriv, &rtpbuffer);
410
411   gst_rtp_buffer_unmap (&rtpbuffer);
412
413   if (bd->drop)
414     return FALSE;
415
416   if (GST_BUFFER_DURATION_IS_VALID (*buffer) &&
417       GST_BUFFER_PTS_IS_VALID (*buffer))
418     bd->rtp_mux->last_stop = GST_BUFFER_PTS (*buffer) +
419         GST_BUFFER_DURATION (*buffer);
420   else
421     bd->rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
422
423   return TRUE;
424 }
425
426 static gboolean resend_events (GstPad * pad, GstEvent ** event,
427     gpointer user_data);
428
429 static GstFlowReturn
430 gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
431     GstBufferList * bufferlist)
432 {
433   GstRTPMux *rtp_mux;
434   GstFlowReturn ret;
435   GstRTPMuxPadPrivate *padpriv;
436   gboolean changed = FALSE;
437   struct BufferListData bd;
438
439   rtp_mux = GST_RTP_MUX (parent);
440
441   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
442     GstCaps *current_caps = gst_pad_get_current_caps (pad);
443
444     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
445       gst_pad_mark_reconfigure (rtp_mux->srcpad);
446       if (GST_PAD_IS_FLUSHING (rtp_mux->srcpad))
447         ret = GST_FLOW_FLUSHING;
448       else
449         ret = GST_FLOW_NOT_NEGOTIATED;
450       gst_buffer_list_unref (bufferlist);
451       goto out;
452     }
453     gst_caps_unref (current_caps);
454   }
455
456   GST_OBJECT_LOCK (rtp_mux);
457
458   padpriv = gst_pad_get_element_private (pad);
459   if (!padpriv) {
460     GST_OBJECT_UNLOCK (rtp_mux);
461     ret = GST_FLOW_NOT_LINKED;
462     gst_buffer_list_unref (bufferlist);
463     goto out;
464   }
465
466   bd.rtp_mux = rtp_mux;
467   bd.padpriv = padpriv;
468   bd.drop = FALSE;
469
470   bufferlist = gst_buffer_list_make_writable (bufferlist);
471   gst_buffer_list_foreach (bufferlist, process_list_item, &bd);
472
473   if (!bd.drop && pad != rtp_mux->last_pad) {
474     changed = TRUE;
475     g_clear_object (&rtp_mux->last_pad);
476     rtp_mux->last_pad = g_object_ref (pad);
477   }
478
479   GST_OBJECT_UNLOCK (rtp_mux);
480
481   if (changed)
482     gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
483
484   if (bd.drop) {
485     gst_buffer_list_unref (bufferlist);
486     ret = GST_FLOW_OK;
487   } else {
488     ret = gst_pad_push_list (rtp_mux->srcpad, bufferlist);
489   }
490
491 out:
492
493   return ret;
494 }
495
496 static gboolean
497 resend_events (GstPad * pad, GstEvent ** event, gpointer user_data)
498 {
499   GstRTPMux *rtp_mux = user_data;
500
501   if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) {
502     GstCaps *caps;
503
504     gst_event_parse_caps (*event, &caps);
505     gst_rtp_mux_setcaps (pad, rtp_mux, caps);
506   } else if (GST_EVENT_TYPE (*event) == GST_EVENT_SEGMENT) {
507     GstSegment new_segment;
508     gst_segment_init (&new_segment, GST_FORMAT_TIME);
509     gst_pad_push_event (rtp_mux->srcpad, gst_event_new_segment (&new_segment));
510   } else {
511     gst_pad_push_event (rtp_mux->srcpad, gst_event_ref (*event));
512   }
513
514   return TRUE;
515 }
516
517 static GstFlowReturn
518 gst_rtp_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
519 {
520   GstRTPMux *rtp_mux;
521   GstFlowReturn ret;
522   GstRTPMuxPadPrivate *padpriv;
523   gboolean drop;
524   gboolean changed = FALSE;
525   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
526
527   rtp_mux = GST_RTP_MUX (parent);
528
529   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
530     GstCaps *current_caps = gst_pad_get_current_caps (pad);
531
532     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
533       gst_pad_mark_reconfigure (rtp_mux->srcpad);
534       if (GST_PAD_IS_FLUSHING (rtp_mux->srcpad))
535         ret = GST_FLOW_FLUSHING;
536       else
537         ret = GST_FLOW_NOT_NEGOTIATED;
538       gst_buffer_unref (buffer);
539       goto out;
540     }
541     gst_caps_unref (current_caps);
542   }
543
544   GST_OBJECT_LOCK (rtp_mux);
545   padpriv = gst_pad_get_element_private (pad);
546
547   if (!padpriv) {
548     GST_OBJECT_UNLOCK (rtp_mux);
549     gst_buffer_unref (buffer);
550     return GST_FLOW_NOT_LINKED;
551   }
552
553   buffer = gst_buffer_make_writable (buffer);
554
555   if (!gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtpbuffer)) {
556     GST_OBJECT_UNLOCK (rtp_mux);
557     gst_buffer_unref (buffer);
558     GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
559     return GST_FLOW_ERROR;
560   }
561
562   drop = !process_buffer_locked (rtp_mux, padpriv, &rtpbuffer);
563
564   gst_rtp_buffer_unmap (&rtpbuffer);
565
566   if (!drop) {
567     if (pad != rtp_mux->last_pad) {
568       changed = TRUE;
569       g_clear_object (&rtp_mux->last_pad);
570       rtp_mux->last_pad = g_object_ref (pad);
571     }
572
573     if (GST_BUFFER_DURATION_IS_VALID (buffer) &&
574         GST_BUFFER_PTS_IS_VALID (buffer))
575       rtp_mux->last_stop = GST_BUFFER_PTS (buffer) +
576           GST_BUFFER_DURATION (buffer);
577     else
578       rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
579   }
580
581   GST_OBJECT_UNLOCK (rtp_mux);
582
583   if (changed)
584     gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
585
586   if (drop) {
587     gst_buffer_unref (buffer);
588     ret = GST_FLOW_OK;
589   } else {
590     ret = gst_pad_push (rtp_mux->srcpad, buffer);
591   }
592
593 out:
594   return ret;
595 }
596
597 static gboolean
598 gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux, GstCaps * caps)
599 {
600   GstStructure *structure;
601   gboolean ret = FALSE;
602   GstRTPMuxPadPrivate *padpriv;
603   GstCaps *peercaps;
604
605   if (caps == NULL)
606     return FALSE;
607
608   if (!gst_caps_is_fixed (caps))
609     return FALSE;
610
611   peercaps = gst_pad_peer_query_caps (rtp_mux->srcpad, NULL);
612   if (peercaps) {
613     GstCaps *tcaps, *othercaps;;
614     tcaps = gst_pad_get_pad_template_caps (pad);
615     othercaps = gst_caps_intersect_full (peercaps, tcaps,
616         GST_CAPS_INTERSECT_FIRST);
617
618     if (gst_caps_get_size (othercaps) > 0) {
619       structure = gst_caps_get_structure (othercaps, 0);
620       GST_OBJECT_LOCK (rtp_mux);
621       if (gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
622         GST_DEBUG_OBJECT (pad, "Use downstream ssrc: %x",
623             rtp_mux->current_ssrc);
624         rtp_mux->have_ssrc = TRUE;
625       }
626       GST_OBJECT_UNLOCK (rtp_mux);
627     }
628
629     gst_caps_unref (othercaps);
630
631     gst_caps_unref (peercaps);
632     gst_caps_unref (tcaps);
633   }
634
635   structure = gst_caps_get_structure (caps, 0);
636
637   if (!structure)
638     return FALSE;
639
640   GST_OBJECT_LOCK (rtp_mux);
641   padpriv = gst_pad_get_element_private (pad);
642   if (padpriv &&
643       gst_structure_get_uint (structure, "timestamp-offset",
644           &padpriv->timestamp_offset)) {
645     padpriv->have_timestamp_offset = TRUE;
646   }
647
648   caps = gst_caps_copy (caps);
649
650   /* if we don't have a specified ssrc, first try to take one from the caps,
651      and if that fails, generate one */
652   if (rtp_mux->ssrc == DEFAULT_SSRC) {
653     if (rtp_mux->current_ssrc == DEFAULT_SSRC) {
654       if (!gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc))
655         rtp_mux->current_ssrc = g_random_int ();
656     }
657   } else {
658     rtp_mux->current_ssrc = rtp_mux->ssrc;
659   }
660
661   gst_caps_set_simple (caps,
662       "timestamp-offset", G_TYPE_UINT, rtp_mux->ts_base,
663       "seqnum-offset", G_TYPE_UINT, rtp_mux->seqnum_base,
664       "ssrc", G_TYPE_UINT, rtp_mux->current_ssrc, NULL);
665
666   GST_OBJECT_UNLOCK (rtp_mux);
667
668   if (rtp_mux->send_stream_start) {
669     gchar s_id[32];
670
671     /* stream-start (FIXME: create id based on input ids) */
672     g_snprintf (s_id, sizeof (s_id), "interleave-%08x", g_random_int ());
673     gst_pad_push_event (rtp_mux->srcpad, gst_event_new_stream_start (s_id));
674
675     rtp_mux->send_stream_start = FALSE;
676   }
677
678   GST_DEBUG_OBJECT (rtp_mux,
679       "setting caps %" GST_PTR_FORMAT " on src pad..", caps);
680   ret = gst_pad_set_caps (rtp_mux->srcpad, caps);
681
682
683   gst_caps_unref (caps);
684
685   return ret;
686 }
687
688 static void
689 clear_caps (GstCaps * caps, gboolean only_clock_rate)
690 {
691   gint i, j;
692
693   /* Lets only match on the clock-rate */
694   for (i = 0; i < gst_caps_get_size (caps); i++) {
695     GstStructure *s = gst_caps_get_structure (caps, i);
696
697     for (j = 0; j < gst_structure_n_fields (s); j++) {
698       const gchar *name = gst_structure_nth_field_name (s, j);
699
700       if (strcmp (name, "clock-rate") && (only_clock_rate ||
701               (strcmp (name, "ssrc")))) {
702         gst_structure_remove_field (s, name);
703         j--;
704       }
705     }
706   }
707 }
708
709 static gboolean
710 same_clock_rate_fold (const GValue * item, GValue * ret, gpointer user_data)
711 {
712   GstPad *mypad = user_data;
713   GstPad *pad = g_value_get_object (item);
714   GstCaps *peercaps;
715   GstCaps *accumcaps;
716
717   if (pad == mypad)
718     return TRUE;
719
720   accumcaps = g_value_get_boxed (ret);
721   peercaps = gst_pad_peer_query_caps (pad, accumcaps);
722   if (!peercaps) {
723     g_warning ("no peercaps");
724     return TRUE;
725   }
726   peercaps = gst_caps_make_writable (peercaps);
727   clear_caps (peercaps, TRUE);
728
729   g_value_take_boxed (ret, peercaps);
730
731   return !gst_caps_is_empty (peercaps);
732 }
733
734 static GstCaps *
735 gst_rtp_mux_getcaps (GstPad * pad, GstRTPMux * mux, GstCaps * filter)
736 {
737   GstCaps *caps = NULL;
738   GstIterator *iter = NULL;
739   GValue v = { 0 };
740   GstIteratorResult res;
741   GstCaps *peercaps;
742   GstCaps *othercaps;
743   GstCaps *tcaps;
744   const GstStructure *structure;
745
746   peercaps = gst_pad_peer_query_caps (mux->srcpad, NULL);
747
748   if (peercaps) {
749     tcaps = gst_pad_get_pad_template_caps (pad);
750     othercaps = gst_caps_intersect_full (peercaps, tcaps,
751         GST_CAPS_INTERSECT_FIRST);
752     gst_caps_unref (peercaps);
753   } else {
754     tcaps = gst_pad_get_pad_template_caps (mux->srcpad);
755     if (filter)
756       othercaps = gst_caps_intersect_full (filter, tcaps,
757           GST_CAPS_INTERSECT_FIRST);
758     else
759       othercaps = gst_caps_copy (tcaps);
760   }
761   gst_caps_unref (tcaps);
762
763   GST_LOG_OBJECT (pad, "Intersected srcpad-peercaps and template caps: %"
764       GST_PTR_FORMAT, othercaps);
765
766   structure = gst_caps_get_structure (othercaps, 0);
767   if (mux->ssrc == DEFAULT_SSRC) {
768     if (gst_structure_get_uint (structure, "ssrc", &mux->current_ssrc))
769       GST_DEBUG_OBJECT (pad, "Use downstream ssrc: %u", mux->current_ssrc);
770   }
771
772   clear_caps (othercaps, TRUE);
773
774   g_value_init (&v, GST_TYPE_CAPS);
775
776   iter = gst_element_iterate_sink_pads (GST_ELEMENT (mux));
777   do {
778     gst_value_set_caps (&v, othercaps);
779     res = gst_iterator_fold (iter, same_clock_rate_fold, &v, pad);
780     gst_iterator_resync (iter);
781   } while (res == GST_ITERATOR_RESYNC);
782   gst_iterator_free (iter);
783
784   caps = gst_caps_intersect ((GstCaps *) gst_value_get_caps (&v), othercaps);
785
786   g_value_unset (&v);
787   gst_caps_unref (othercaps);
788
789   if (res == GST_ITERATOR_ERROR) {
790     gst_caps_unref (caps);
791     caps = gst_caps_new_empty ();
792   }
793
794
795   return caps;
796 }
797
798 static gboolean
799 gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
800 {
801   GstRTPMux *mux = GST_RTP_MUX (parent);
802   gboolean res = FALSE;
803
804   switch (GST_QUERY_TYPE (query)) {
805     case GST_QUERY_CAPS:
806     {
807       GstCaps *filter, *caps;
808
809       gst_query_parse_caps (query, &filter);
810       GST_LOG_OBJECT (pad, "Received caps-query with filter-caps: %"
811           GST_PTR_FORMAT, filter);
812       caps = gst_rtp_mux_getcaps (pad, mux, filter);
813       gst_query_set_caps_result (query, caps);
814       GST_LOG_OBJECT (mux, "Answering caps-query with caps: %"
815           GST_PTR_FORMAT, caps);
816       gst_caps_unref (caps);
817       res = TRUE;
818       break;
819     }
820     default:
821       res = gst_pad_query_default (pad, parent, query);
822       break;
823   }
824
825   return res;
826 }
827
828 static void
829 gst_rtp_mux_get_property (GObject * object,
830     guint prop_id, GValue * value, GParamSpec * pspec)
831 {
832   GstRTPMux *rtp_mux;
833
834   rtp_mux = GST_RTP_MUX (object);
835
836   GST_OBJECT_LOCK (rtp_mux);
837   switch (prop_id) {
838     case PROP_TIMESTAMP_OFFSET:
839       g_value_set_int (value, rtp_mux->ts_offset);
840       break;
841     case PROP_SEQNUM_OFFSET:
842       g_value_set_int (value, rtp_mux->seqnum_offset);
843       break;
844     case PROP_SEQNUM:
845       g_value_set_uint (value, rtp_mux->seqnum);
846       break;
847     case PROP_SSRC:
848       g_value_set_uint (value, rtp_mux->ssrc);
849       break;
850     default:
851       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
852       break;
853   }
854   GST_OBJECT_UNLOCK (rtp_mux);
855 }
856
857 static void
858 gst_rtp_mux_set_property (GObject * object,
859     guint prop_id, const GValue * value, GParamSpec * pspec)
860 {
861   GstRTPMux *rtp_mux;
862
863   rtp_mux = GST_RTP_MUX (object);
864
865   switch (prop_id) {
866     case PROP_TIMESTAMP_OFFSET:
867       rtp_mux->ts_offset = g_value_get_int (value);
868       break;
869     case PROP_SEQNUM_OFFSET:
870       rtp_mux->seqnum_offset = g_value_get_int (value);
871       break;
872     case PROP_SSRC:
873       GST_OBJECT_LOCK (rtp_mux);
874       rtp_mux->ssrc = g_value_get_uint (value);
875       rtp_mux->current_ssrc = rtp_mux->ssrc;
876       rtp_mux->have_ssrc = TRUE;
877       rtp_mux->ssrc_random = FALSE;
878       GST_OBJECT_UNLOCK (rtp_mux);
879       break;
880     default:
881       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
882       break;
883   }
884 }
885
886 static gboolean
887 gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
888 {
889   GstRTPMux *mux = GST_RTP_MUX (parent);
890   gboolean is_pad;
891   gboolean ret;
892
893   GST_OBJECT_LOCK (mux);
894   is_pad = (pad == mux->last_pad);
895   GST_OBJECT_UNLOCK (mux);
896
897   switch (GST_EVENT_TYPE (event)) {
898     case GST_EVENT_CAPS:
899     {
900       GstCaps *caps;
901
902       gst_event_parse_caps (event, &caps);
903       GST_LOG_OBJECT (pad, "Received caps-event with caps: %"
904           GST_PTR_FORMAT, caps);
905       ret = gst_rtp_mux_setcaps (pad, mux, caps);
906       gst_event_unref (event);
907       return ret;
908     }
909     case GST_EVENT_FLUSH_STOP:
910     {
911       GST_OBJECT_LOCK (mux);
912       mux->last_stop = GST_CLOCK_TIME_NONE;
913       GST_OBJECT_UNLOCK (mux);
914       break;
915     }
916     case GST_EVENT_SEGMENT:
917     {
918       GstRTPMuxPadPrivate *padpriv;
919
920       GST_OBJECT_LOCK (mux);
921       padpriv = gst_pad_get_element_private (pad);
922
923       if (padpriv) {
924         gst_event_copy_segment (event, &padpriv->segment);
925       }
926       GST_OBJECT_UNLOCK (mux);
927
928       if (is_pad) {
929         GstSegment new_segment;
930         gst_segment_init (&new_segment, GST_FORMAT_TIME);
931         gst_event_unref (event);
932         event = gst_event_new_segment (&new_segment);
933       }
934       break;
935     }
936     default:
937       break;
938   }
939
940   if (is_pad) {
941     return gst_pad_push_event (mux->srcpad, event);
942   } else {
943     gst_event_unref (event);
944     return TRUE;
945   }
946 }
947
948 static void
949 gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
950 {
951
952   GST_OBJECT_LOCK (rtp_mux);
953
954   g_clear_object (&rtp_mux->last_pad);
955   rtp_mux->send_stream_start = TRUE;
956
957   if (rtp_mux->seqnum_offset == -1)
958     rtp_mux->seqnum_base = g_random_int_range (0, G_MAXUINT16);
959   else
960     rtp_mux->seqnum_base = rtp_mux->seqnum_offset;
961   rtp_mux->seqnum = rtp_mux->seqnum_base;
962
963   if (rtp_mux->ts_offset == -1)
964     rtp_mux->ts_base = g_random_int ();
965   else
966     rtp_mux->ts_base = rtp_mux->ts_offset;
967
968   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
969
970   if (rtp_mux->ssrc_random) {
971     rtp_mux->have_ssrc = FALSE;
972   } else {
973     rtp_mux->current_ssrc = rtp_mux->ssrc;
974     rtp_mux->have_ssrc = TRUE;
975   }
976
977   GST_DEBUG_OBJECT (rtp_mux, "set timestamp-offset to %u", rtp_mux->ts_base);
978
979   GST_OBJECT_UNLOCK (rtp_mux);
980 }
981
982 static GstStateChangeReturn
983 gst_rtp_mux_change_state (GstElement * element, GstStateChange transition)
984 {
985   GstRTPMux *rtp_mux;
986   GstStateChangeReturn ret;
987
988   rtp_mux = GST_RTP_MUX (element);
989
990   switch (transition) {
991     case GST_STATE_CHANGE_READY_TO_PAUSED:
992       gst_rtp_mux_ready_to_paused (rtp_mux);
993       break;
994     default:
995       break;
996   }
997
998   ret = GST_ELEMENT_CLASS (gst_rtp_mux_parent_class)->change_state (element,
999       transition);
1000
1001   switch (transition) {
1002     case GST_STATE_CHANGE_PAUSED_TO_READY:
1003       g_clear_object (&rtp_mux->last_pad);
1004       break;
1005     default:
1006       break;
1007   }
1008
1009   return ret;
1010 }
1011
1012 gboolean
1013 gst_rtp_mux_plugin_init (GstPlugin * plugin)
1014 {
1015   GST_DEBUG_CATEGORY_INIT (gst_rtp_mux_debug, "rtpmux", 0, "rtp muxer");
1016
1017   return gst_element_register (plugin, "rtpmux", GST_RANK_NONE,
1018       GST_TYPE_RTP_MUX);
1019 }