rtsp-stream: plug socket leak
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 enum
31 {
32   PROP_0,
33   PROP_LAST
34 };
35
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
38
39 static GQuark ssrc_stream_map_key;
40
41 static void gst_rtsp_stream_finalize (GObject * obj);
42
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
44
45 static void
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
47 {
48   GObjectClass *gobject_class;
49
50   gobject_class = G_OBJECT_CLASS (klass);
51
52   gobject_class->finalize = gst_rtsp_stream_finalize;
53
54   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
55
56   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
57 }
58
59 static void
60 gst_rtsp_stream_init (GstRTSPStream * stream)
61 {
62   g_mutex_init (&stream->lock);
63 }
64
65 static void
66 gst_rtsp_stream_finalize (GObject * obj)
67 {
68   GstRTSPStream *stream;
69
70   stream = GST_RTSP_STREAM (obj);
71
72   /* we really need to be unjoined now */
73   g_return_if_fail (!stream->is_joined);
74
75   if (stream->addr)
76     gst_rtsp_address_free (stream->addr);
77   if (stream->pool)
78     g_object_unref (stream->pool);
79   gst_object_unref (stream->payloader);
80   gst_object_unref (stream->srcpad);
81   g_mutex_clear (&stream->lock);
82
83   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
84 }
85
86 /**
87  * gst_rtsp_stream_new:
88  * @idx: an index
89  * @srcpad: a #GstPad
90  * @payloader: a #GstElement
91  *
92  * Create a new media stream with index @idx that handles RTP data on
93  * @srcpad and has a payloader element @payloader.
94  *
95  * Returns: a new #GstRTSPStream
96  */
97 GstRTSPStream *
98 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
99 {
100   GstRTSPStream *stream;
101
102   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
103   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
104   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
105
106   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
107   stream->idx = idx;
108   stream->payloader = gst_object_ref (payloader);
109   stream->srcpad = gst_object_ref (srcpad);
110
111   return stream;
112 }
113
114 /**
115  * gst_rtsp_stream_set_mtu:
116  * @stream: a #GstRTSPStream
117  * @mtu: a new MTU
118  *
119  * Configure the mtu in the payloader of @stream to @mtu.
120  */
121 void
122 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
123 {
124   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
125
126   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
127
128   g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
129 }
130
131 /**
132  * gst_rtsp_stream_get_mtu:
133  * @stream: a #GstRTSPStream
134  *
135  * Get the configured MTU in the payloader of @stream.
136  *
137  * Returns: the MTU of the payloader.
138  */
139 guint
140 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
141 {
142   guint mtu;
143
144   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
145
146   g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
147
148   return mtu;
149 }
150
151 /**
152  * gst_rtsp_stream_set_address_pool:
153  * @stream: a #GstRTSPStream
154  * @pool: a #GstRTSPAddressPool
155  *
156  * configure @pool to be used as the address pool of @stream.
157  */
158 void
159 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
160     GstRTSPAddressPool * pool)
161 {
162   GstRTSPAddressPool *old;
163
164   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
165
166   GST_LOG_OBJECT (stream, "set address pool %p", pool);
167
168   g_mutex_lock (&stream->lock);
169   if ((old = stream->pool) != pool)
170     stream->pool = pool ? g_object_ref (pool) : NULL;
171   else
172     old = NULL;
173   g_mutex_unlock (&stream->lock);
174
175   if (old)
176     g_object_unref (old);
177 }
178
179 /**
180  * gst_rtsp_stream_get_address_pool:
181  * @stream: a #GstRTSPStream
182  *
183  * Get the #GstRTSPAddressPool used as the address pool of @stream.
184  *
185  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
186  * usage.
187  */
188 GstRTSPAddressPool *
189 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
190 {
191   GstRTSPAddressPool *result;
192
193   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
194
195   g_mutex_lock (&stream->lock);
196   if ((result = stream->pool))
197     g_object_ref (result);
198   g_mutex_unlock (&stream->lock);
199
200   return result;
201 }
202
203 /**
204  * gst_rtsp_stream_get_address:
205  * @stream: a #GstRTSPStream
206  *
207  * Get the multicast address of @stream.
208  *
209  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
210  * allocated. gst_rtsp_address_free() after usage.
211  */
212 GstRTSPAddress *
213 gst_rtsp_stream_get_address (GstRTSPStream * stream)
214 {
215   GstRTSPAddress *result;
216
217   g_mutex_lock (&stream->lock);
218   if (stream->addr == NULL) {
219     if (stream->pool == NULL)
220       goto no_pool;
221
222     stream->addr = gst_rtsp_address_pool_acquire_address (stream->pool,
223         GST_RTSP_ADDRESS_FLAG_EVEN_PORT, 2);
224     if (stream->addr == NULL)
225       goto no_address;
226   }
227   result = gst_rtsp_address_copy (stream->addr);
228   g_mutex_unlock (&stream->lock);
229
230   return result;
231
232   /* ERRORS */
233 no_pool:
234   {
235     GST_ERROR_OBJECT (stream, "no address pool specified");
236     g_mutex_unlock (&stream->lock);
237     return NULL;
238   }
239 no_address:
240   {
241     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
242     g_mutex_unlock (&stream->lock);
243     return NULL;
244   }
245 }
246
247 /* must be called with lock */
248 static gboolean
249 alloc_ports (GstRTSPStream * stream)
250 {
251   GstStateChangeReturn ret;
252   GstElement *udpsrc0, *udpsrc1;
253   GstElement *udpsink0, *udpsink1;
254   gint tmp_rtp, tmp_rtcp;
255   guint count;
256   gint rtpport, rtcpport;
257   GSocket *socket;
258   const gchar *host;
259
260   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
261
262   udpsrc0 = NULL;
263   udpsrc1 = NULL;
264   udpsink0 = NULL;
265   udpsink1 = NULL;
266   count = 0;
267
268   /* Start with random port */
269   tmp_rtp = 0;
270
271   if (stream->is_ipv6)
272     host = "udp://[::0]";
273   else
274     host = "udp://0.0.0.0";
275
276   /* try to allocate 2 UDP ports, the RTP port should be an even
277    * number and the RTCP port should be the next (uneven) port */
278 again:
279   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
280   if (udpsrc0 == NULL)
281     goto no_udp_protocol;
282   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
283
284   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
285   if (ret == GST_STATE_CHANGE_FAILURE) {
286     if (tmp_rtp != 0) {
287       tmp_rtp += 2;
288       if (++count > 20)
289         goto no_ports;
290
291       gst_element_set_state (udpsrc0, GST_STATE_NULL);
292       gst_object_unref (udpsrc0);
293
294       goto again;
295     }
296     goto no_udp_protocol;
297   }
298
299   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
300
301   /* check if port is even */
302   if ((tmp_rtp & 1) != 0) {
303     /* port not even, close and allocate another */
304     if (++count > 20)
305       goto no_ports;
306
307     gst_element_set_state (udpsrc0, GST_STATE_NULL);
308     gst_object_unref (udpsrc0);
309
310     tmp_rtp++;
311     goto again;
312   }
313
314   /* allocate port+1 for RTCP now */
315   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
316   if (udpsrc1 == NULL)
317     goto no_udp_rtcp_protocol;
318
319   /* set port */
320   tmp_rtcp = tmp_rtp + 1;
321   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
322
323   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
324   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
325   if (ret == GST_STATE_CHANGE_FAILURE) {
326
327     if (++count > 20)
328       goto no_ports;
329
330     gst_element_set_state (udpsrc0, GST_STATE_NULL);
331     gst_object_unref (udpsrc0);
332
333     gst_element_set_state (udpsrc1, GST_STATE_NULL);
334     gst_object_unref (udpsrc1);
335
336     tmp_rtp += 2;
337     goto again;
338   }
339   /* all fine, do port check */
340   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
341   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
342
343   /* this should not happen... */
344   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
345     goto port_error;
346
347   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
348   if (!udpsink0)
349     goto no_udp_protocol;
350
351   g_object_get (G_OBJECT (udpsrc0), "used-socket", &socket, NULL);
352   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
353   g_object_unref (socket);
354   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
355
356   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
357   if (!udpsink1)
358     goto no_udp_protocol;
359
360   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
361           "send-duplicates")) {
362     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
363     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
364   } else {
365     g_warning
366         ("old multiudpsink version found without send-duplicates property");
367   }
368
369   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
370           "buffer-size")) {
371     g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
372         NULL);
373   } else {
374     GST_WARNING ("multiudpsink version found without buffer-size property");
375   }
376
377   g_object_get (G_OBJECT (udpsrc1), "used-socket", &socket, NULL);
378   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
379   g_object_unref (socket);
380   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
381   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
382   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
383   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
384   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
385   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
386   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
387
388   /* we keep these elements, we will further configure them when the
389    * client told us to really use the UDP ports. */
390   stream->udpsrc[0] = udpsrc0;
391   stream->udpsrc[1] = udpsrc1;
392   stream->udpsink[0] = udpsink0;
393   stream->udpsink[1] = udpsink1;
394   stream->server_port.min = rtpport;
395   stream->server_port.max = rtcpport;
396
397   return TRUE;
398
399   /* ERRORS */
400 no_udp_protocol:
401   {
402     goto cleanup;
403   }
404 no_ports:
405   {
406     goto cleanup;
407   }
408 no_udp_rtcp_protocol:
409   {
410     goto cleanup;
411   }
412 port_error:
413   {
414     goto cleanup;
415   }
416 cleanup:
417   {
418     if (udpsrc0) {
419       gst_element_set_state (udpsrc0, GST_STATE_NULL);
420       gst_object_unref (udpsrc0);
421     }
422     if (udpsrc1) {
423       gst_element_set_state (udpsrc1, GST_STATE_NULL);
424       gst_object_unref (udpsrc1);
425     }
426     if (udpsink0) {
427       gst_element_set_state (udpsink0, GST_STATE_NULL);
428       gst_object_unref (udpsink0);
429     }
430     if (udpsink1) {
431       gst_element_set_state (udpsink1, GST_STATE_NULL);
432       gst_object_unref (udpsink1);
433     }
434     return FALSE;
435   }
436 }
437
438 /* executed from streaming thread */
439 static void
440 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
441 {
442   GstCaps *newcaps, *oldcaps;
443
444   newcaps = gst_pad_get_current_caps (pad);
445
446   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
447       newcaps);
448
449   g_mutex_lock (&stream->lock);
450   oldcaps = stream->caps;
451   stream->caps = newcaps;
452   g_mutex_unlock (&stream->lock);
453
454   if (oldcaps)
455     gst_caps_unref (oldcaps);
456 }
457
458 static void
459 dump_structure (const GstStructure * s)
460 {
461   gchar *sstr;
462
463   sstr = gst_structure_to_string (s);
464   GST_INFO ("structure: %s", sstr);
465   g_free (sstr);
466 }
467
468 static GstRTSPStreamTransport *
469 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
470 {
471   GList *walk;
472   GstRTSPStreamTransport *result = NULL;
473   const gchar *tmp;
474   gchar *dest;
475   guint port;
476
477   if (rtcp_from == NULL)
478     return NULL;
479
480   tmp = g_strrstr (rtcp_from, ":");
481   if (tmp == NULL)
482     return NULL;
483
484   port = atoi (tmp + 1);
485   dest = g_strndup (rtcp_from, tmp - rtcp_from);
486
487   g_mutex_lock (&stream->lock);
488   GST_INFO ("finding %s:%d in %d transports", dest, port,
489       g_list_length (stream->transports));
490
491   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
492     GstRTSPStreamTransport *trans = walk->data;
493     gint min, max;
494
495     min = trans->transport->client_port.min;
496     max = trans->transport->client_port.max;
497
498     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
499             || max == port)) {
500       result = trans;
501       break;
502     }
503   }
504   g_mutex_unlock (&stream->lock);
505
506   g_free (dest);
507
508   return result;
509 }
510
511 static GstRTSPStreamTransport *
512 check_transport (GObject * source, GstRTSPStream * stream)
513 {
514   GstStructure *stats;
515   GstRTSPStreamTransport *trans;
516
517   /* see if we have a stream to match with the origin of the RTCP packet */
518   trans = g_object_get_qdata (source, ssrc_stream_map_key);
519   if (trans == NULL) {
520     g_object_get (source, "stats", &stats, NULL);
521     if (stats) {
522       const gchar *rtcp_from;
523
524       dump_structure (stats);
525
526       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
527       if ((trans = find_transport (stream, rtcp_from))) {
528         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
529             source);
530
531         /* keep ref to the source */
532         trans->rtpsource = source;
533
534         g_object_set_qdata (source, ssrc_stream_map_key, trans);
535       }
536       gst_structure_free (stats);
537     }
538   }
539
540   return trans;
541 }
542
543
544 static void
545 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
546 {
547   GstRTSPStreamTransport *trans;
548
549   GST_INFO ("%p: new source %p", stream, source);
550
551   trans = check_transport (source, stream);
552
553   if (trans)
554     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
555 }
556
557 static void
558 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
559 {
560   GST_INFO ("%p: new SDES %p", stream, source);
561 }
562
563 static void
564 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
565 {
566   GstRTSPStreamTransport *trans;
567
568   trans = check_transport (source, stream);
569
570   if (trans) {
571     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
572     gst_rtsp_stream_transport_keep_alive (trans);
573   }
574 #ifdef DUMP_STATS
575   {
576     GstStructure *stats;
577     g_object_get (source, "stats", &stats, NULL);
578     if (stats) {
579       dump_structure (stats);
580       gst_structure_free (stats);
581     }
582   }
583 #endif
584 }
585
586 static void
587 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
588 {
589   GST_INFO ("%p: source %p bye", stream, source);
590 }
591
592 static void
593 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
594 {
595   GstRTSPStreamTransport *trans;
596
597   GST_INFO ("%p: source %p bye timeout", stream, source);
598
599   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
600     trans->rtpsource = NULL;
601     trans->timeout = TRUE;
602   }
603 }
604
605 static void
606 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
607 {
608   GstRTSPStreamTransport *trans;
609
610   GST_INFO ("%p: source %p timeout", stream, source);
611
612   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
613     trans->rtpsource = NULL;
614     trans->timeout = TRUE;
615   }
616 }
617
618 static GstFlowReturn
619 handle_new_sample (GstAppSink * sink, gpointer user_data)
620 {
621   GList *walk;
622   GstSample *sample;
623   GstBuffer *buffer;
624   GstRTSPStream *stream;
625
626   sample = gst_app_sink_pull_sample (sink);
627   if (!sample)
628     return GST_FLOW_OK;
629
630   stream = (GstRTSPStream *) user_data;
631   buffer = gst_sample_get_buffer (sample);
632
633   g_mutex_lock (&stream->lock);
634   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
635     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
636
637     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
638       gst_rtsp_stream_transport_send_rtp (tr, buffer);
639     } else {
640       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
641     }
642   }
643   g_mutex_unlock (&stream->lock);
644
645   gst_sample_unref (sample);
646
647   return GST_FLOW_OK;
648 }
649
650 static GstAppSinkCallbacks sink_cb = {
651   NULL,                         /* not interested in EOS */
652   NULL,                         /* not interested in preroll samples */
653   handle_new_sample,
654 };
655
656 /**
657  * gst_rtsp_stream_join_bin:
658  * @stream: a #GstRTSPStream
659  * @bin: a #GstBin to join
660  * @rtpbin: a rtpbin element in @bin
661  * @state: the target state of the new elements
662  *
663  * Join the #Gstbin @bin that contains the element @rtpbin.
664  *
665  * @stream will link to @rtpbin, which must be inside @bin. The elements
666  * added to @bin will be set to the state given in @state.
667  *
668  * Returns: %TRUE on success.
669  */
670 gboolean
671 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
672     GstElement * rtpbin, GstState state)
673 {
674   gint i, idx;
675   gchar *name;
676   GstPad *pad, *teepad, *queuepad, *selpad;
677   GstPadLinkReturn ret;
678
679   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
680   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
681   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
682
683   g_mutex_lock (&stream->lock);
684   if (stream->is_joined)
685     goto was_joined;
686
687   /* create a session with the same index as the stream */
688   idx = stream->idx;
689
690   GST_INFO ("stream %p joining bin as session %d", stream, idx);
691
692   if (!alloc_ports (stream))
693     goto no_ports;
694
695   /* get a pad for sending RTP */
696   name = g_strdup_printf ("send_rtp_sink_%u", idx);
697   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
698   g_free (name);
699   /* link the RTP pad to the session manager, it should not really fail unless
700    * this is not really an RTP pad */
701   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
702   if (ret != GST_PAD_LINK_OK)
703     goto link_failed;
704
705   /* get pads from the RTP session element for sending and receiving
706    * RTP/RTCP*/
707   name = g_strdup_printf ("send_rtp_src_%u", idx);
708   stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
709   g_free (name);
710   name = g_strdup_printf ("send_rtcp_src_%u", idx);
711   stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
712   g_free (name);
713   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
714   stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
715   g_free (name);
716   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
717   stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
718   g_free (name);
719
720   /* get the session */
721   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
722
723   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
724       stream);
725   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
726       stream);
727   g_signal_connect (stream->session, "on-ssrc-active",
728       (GCallback) on_ssrc_active, stream);
729   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
730       stream);
731   g_signal_connect (stream->session, "on-bye-timeout",
732       (GCallback) on_bye_timeout, stream);
733   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
734       stream);
735
736   for (i = 0; i < 2; i++) {
737     /* For the sender we create this bit of pipeline for both
738      * RTP and RTCP. Sync and preroll are enabled on udpsink so
739      * we need to add a queue before appsink to make the pipeline
740      * not block. For the TCP case, we want to pump data to the
741      * client as fast as possible anyway.
742      *
743      * .--------.      .-----.    .---------.
744      * | rtpbin |      | tee |    | udpsink |
745      * |       send->sink   src->sink       |
746      * '--------'      |     |    '---------'
747      *                 |     |    .---------.    .---------.
748      *                 |     |    |  queue  |    | appsink |
749      *                 |    src->sink      src->sink       |
750      *                 '-----'    '---------'    '---------'
751      */
752     /* make tee for RTP/RTCP */
753     stream->tee[i] = gst_element_factory_make ("tee", NULL);
754     gst_bin_add (bin, stream->tee[i]);
755
756     /* and link to rtpbin send pad */
757     pad = gst_element_get_static_pad (stream->tee[i], "sink");
758     gst_pad_link (stream->send_src[i], pad);
759     gst_object_unref (pad);
760
761     /* add udpsink */
762     gst_bin_add (bin, stream->udpsink[i]);
763
764     /* link tee to udpsink */
765     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
766     pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
767     gst_pad_link (teepad, pad);
768     gst_object_unref (pad);
769     gst_object_unref (teepad);
770
771     /* make queue */
772     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
773     gst_bin_add (bin, stream->appqueue[i]);
774     /* and link to tee */
775     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
776     pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
777     gst_pad_link (teepad, pad);
778     gst_object_unref (pad);
779     gst_object_unref (teepad);
780
781     /* make appsink */
782     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
783     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
784     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
785     gst_bin_add (bin, stream->appsink[i]);
786     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
787         &sink_cb, stream, NULL);
788     /* and link to queue */
789     queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
790     pad = gst_element_get_static_pad (stream->appsink[i], "sink");
791     gst_pad_link (queuepad, pad);
792     gst_object_unref (pad);
793     gst_object_unref (queuepad);
794
795     /* For the receiver we create this bit of pipeline for both
796      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
797      * and it is all funneled into the rtpbin receive pad.
798      *
799      * .--------.     .--------.    .--------.
800      * | udpsrc |     | funnel |    | rtpbin |
801      * |       src->sink      src->sink      |
802      * '--------'     |        |    '--------'
803      * .--------.     |        |
804      * | appsrc |     |        |
805      * |       src->sink       |
806      * '--------'     '--------'
807      */
808     /* make funnel for the RTP/RTCP receivers */
809     stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
810     gst_bin_add (bin, stream->funnel[i]);
811
812     pad = gst_element_get_static_pad (stream->funnel[i], "src");
813     gst_pad_link (pad, stream->recv_sink[i]);
814     gst_object_unref (pad);
815
816     /* add udpsrc */
817     gst_bin_add (bin, stream->udpsrc[i]);
818     /* and link to the funnel */
819     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
820     pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
821     gst_pad_link (pad, selpad);
822     gst_object_unref (pad);
823     gst_object_unref (selpad);
824
825     /* make and add appsrc */
826     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
827     gst_bin_add (bin, stream->appsrc[i]);
828     /* and link to the funnel */
829     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
830     pad = gst_element_get_static_pad (stream->appsrc[i], "src");
831     gst_pad_link (pad, selpad);
832     gst_object_unref (pad);
833     gst_object_unref (selpad);
834
835     /* check if we need to set to a special state */
836     if (state != GST_STATE_NULL) {
837       gst_element_set_state (stream->udpsink[i], state);
838       gst_element_set_state (stream->appsink[i], state);
839       gst_element_set_state (stream->appqueue[i], state);
840       gst_element_set_state (stream->tee[i], state);
841       gst_element_set_state (stream->funnel[i], state);
842       gst_element_set_state (stream->appsrc[i], state);
843     }
844     /* we set and keep these to playing so that they don't cause NO_PREROLL return
845      * values */
846     gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
847     gst_element_set_locked_state (stream->udpsrc[i], TRUE);
848   }
849
850   /* be notified of caps changes */
851   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
852       (GCallback) caps_notify, stream);
853
854   stream->is_joined = TRUE;
855   g_mutex_unlock (&stream->lock);
856
857   return TRUE;
858
859   /* ERRORS */
860 was_joined:
861   {
862     g_mutex_unlock (&stream->lock);
863     return TRUE;
864   }
865 no_ports:
866   {
867     g_mutex_unlock (&stream->lock);
868     GST_WARNING ("failed to allocate ports %d", idx);
869     return FALSE;
870   }
871 link_failed:
872   {
873     GST_WARNING ("failed to link stream %d", idx);
874     gst_object_unref (stream->send_rtp_sink);
875     stream->send_rtp_sink = NULL;
876     g_mutex_unlock (&stream->lock);
877     return FALSE;
878   }
879 }
880
881 /**
882  * gst_rtsp_stream_leave_bin:
883  * @stream: a #GstRTSPStream
884  * @bin: a #GstBin
885  * @rtpbin: a rtpbin #GstElement
886  *
887  * Remove the elements of @stream from @bin. @bin must be set
888  * to the NULL state before calling this.
889  *
890  * Return: %TRUE on success.
891  */
892 gboolean
893 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
894     GstElement * rtpbin)
895 {
896   gint i;
897
898   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
899   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
900   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
901
902   g_mutex_lock (&stream->lock);
903   if (!stream->is_joined)
904     goto was_not_joined;
905
906   /* all transports must be removed by now */
907   g_return_val_if_fail (stream->transports == NULL, FALSE);
908
909   GST_INFO ("stream %p leaving bin", stream);
910
911   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
912   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
913   gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
914   gst_object_unref (stream->send_rtp_sink);
915   stream->send_rtp_sink = NULL;
916
917   for (i = 0; i < 2; i++) {
918     /* and set udpsrc to NULL now before removing */
919     gst_element_set_locked_state (stream->udpsrc[i], FALSE);
920     gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
921
922     /* removing them should also nicely release the request
923      * pads when they finalize */
924     gst_bin_remove (bin, stream->udpsrc[i]);
925     gst_bin_remove (bin, stream->udpsink[i]);
926     gst_bin_remove (bin, stream->appsrc[i]);
927     gst_bin_remove (bin, stream->appsink[i]);
928     gst_bin_remove (bin, stream->appqueue[i]);
929     gst_bin_remove (bin, stream->tee[i]);
930     gst_bin_remove (bin, stream->funnel[i]);
931
932     gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
933     gst_object_unref (stream->recv_sink[i]);
934     stream->recv_sink[i] = NULL;
935
936     stream->udpsrc[i] = NULL;
937     stream->udpsink[i] = NULL;
938     stream->appsrc[i] = NULL;
939     stream->appsink[i] = NULL;
940     stream->appqueue[i] = NULL;
941     stream->tee[i] = NULL;
942     stream->funnel[i] = NULL;
943   }
944   gst_object_unref (stream->send_src[0]);
945   stream->send_src[0] = NULL;
946
947   gst_element_release_request_pad (rtpbin, stream->send_src[1]);
948   gst_object_unref (stream->send_src[1]);
949   stream->send_src[1] = NULL;
950
951   g_object_unref (stream->session);
952   if (stream->caps)
953     gst_caps_unref (stream->caps);
954
955   stream->is_joined = FALSE;
956   g_mutex_unlock (&stream->lock);
957
958   return TRUE;
959
960 was_not_joined:
961   {
962     return TRUE;
963   }
964 }
965
966 /**
967  * gst_rtsp_stream_get_rtpinfo:
968  * @stream: a #GstRTSPStream
969  * @rtptime: result RTP timestamp
970  * @seq: result RTP seqnum
971  *
972  * Retrieve the current rtptime and seq. This is used to
973  * construct a RTPInfo reply header.
974  *
975  * Returns: %TRUE when rtptime and seq could be determined.
976  */
977 gboolean
978 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
979     guint * rtptime, guint * seq)
980 {
981   GObjectClass *payobjclass;
982
983   payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
984
985   if (!g_object_class_find_property (payobjclass, "seqnum") ||
986       !g_object_class_find_property (payobjclass, "timestamp"))
987     return FALSE;
988
989   g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
990
991   return TRUE;
992 }
993
994 /**
995  * gst_rtsp_stream_recv_rtp:
996  * @stream: a #GstRTSPStream
997  * @buffer: (transfer full): a #GstBuffer
998  *
999  * Handle an RTP buffer for the stream. This method is usually called when a
1000  * message has been received from a client using the TCP transport.
1001  *
1002  * This function takes ownership of @buffer.
1003  *
1004  * Returns: a GstFlowReturn.
1005  */
1006 GstFlowReturn
1007 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1008 {
1009   GstFlowReturn ret;
1010   GstElement *element;
1011
1012   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1013   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1014   g_return_val_if_fail (stream->is_joined, FALSE);
1015
1016   g_mutex_lock (&stream->lock);
1017   element = gst_object_ref (stream->appsrc[0]);
1018   g_mutex_unlock (&stream->lock);
1019
1020   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1021
1022   gst_object_unref (element);
1023
1024   return ret;
1025 }
1026
1027 /**
1028  * gst_rtsp_stream_recv_rtcp:
1029  * @stream: a #GstRTSPStream
1030  * @buffer: (transfer full): a #GstBuffer
1031  *
1032  * Handle an RTCP buffer for the stream. This method is usually called when a
1033  * message has been received from a client using the TCP transport.
1034  *
1035  * This function takes ownership of @buffer.
1036  *
1037  * Returns: a GstFlowReturn.
1038  */
1039 GstFlowReturn
1040 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1041 {
1042   GstFlowReturn ret;
1043   GstElement *element;
1044
1045   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1046   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1047   g_return_val_if_fail (stream->is_joined, FALSE);
1048
1049   g_mutex_lock (&stream->lock);
1050   element = gst_object_ref (stream->appsrc[1]);
1051   g_mutex_unlock (&stream->lock);
1052
1053   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1054
1055   gst_object_unref (element);
1056
1057   return ret;
1058 }
1059
1060 /* must be called with lock */
1061 static gboolean
1062 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1063     gboolean add)
1064 {
1065   GstRTSPTransport *tr;
1066   gboolean updated;
1067
1068   updated = FALSE;
1069
1070   tr = trans->transport;
1071
1072   switch (tr->lower_transport) {
1073     case GST_RTSP_LOWER_TRANS_UDP:
1074     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1075     {
1076       gchar *dest;
1077       gint min, max;
1078       guint ttl = 0;
1079
1080       dest = tr->destination;
1081       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1082         min = tr->port.min;
1083         max = tr->port.max;
1084         ttl = tr->ttl;
1085       } else {
1086         min = tr->client_port.min;
1087         max = tr->client_port.max;
1088       }
1089
1090       if (add && !trans->active) {
1091         GST_INFO ("adding %s:%d-%d", dest, min, max);
1092         g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1093         g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1094         if (ttl > 0) {
1095           GST_INFO ("setting ttl-mc %d", ttl);
1096           g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
1097           g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
1098         }
1099         stream->transports = g_list_prepend (stream->transports, trans);
1100         trans->active = TRUE;
1101         updated = TRUE;
1102       } else if (trans->active) {
1103         GST_INFO ("removing %s:%d-%d", dest, min, max);
1104         g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1105         g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1106         stream->transports = g_list_remove (stream->transports, trans);
1107         trans->active = FALSE;
1108         updated = TRUE;
1109       }
1110       break;
1111     }
1112     case GST_RTSP_LOWER_TRANS_TCP:
1113       if (add && !trans->active) {
1114         GST_INFO ("adding TCP %s", tr->destination);
1115         stream->transports = g_list_prepend (stream->transports, trans);
1116         trans->active = TRUE;
1117         updated = TRUE;
1118       } else if (trans->active) {
1119         GST_INFO ("removing TCP %s", tr->destination);
1120         stream->transports = g_list_remove (stream->transports, trans);
1121         trans->active = FALSE;
1122         updated = TRUE;
1123       }
1124       break;
1125     default:
1126       GST_INFO ("Unknown transport %d", tr->lower_transport);
1127       break;
1128   }
1129   return updated;
1130 }
1131
1132
1133 /**
1134  * gst_rtsp_stream_add_transport:
1135  * @stream: a #GstRTSPStream
1136  * @trans: a #GstRTSPStreamTransport
1137  *
1138  * Add the transport in @trans to @stream. The media of @stream will
1139  * then also be send to the values configured in @trans.
1140  *
1141  * @stream must be joined to a bin.
1142  *
1143  * @trans must contain a valid #GstRTSPTransport.
1144  *
1145  * Returns: %TRUE if @trans was added
1146  */
1147 gboolean
1148 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1149     GstRTSPStreamTransport * trans)
1150 {
1151   gboolean res;
1152
1153   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1154   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1155   g_return_val_if_fail (stream->is_joined, FALSE);
1156   g_return_val_if_fail (trans->transport != NULL, FALSE);
1157
1158   g_mutex_lock (&stream->lock);
1159   res = update_transport (stream, trans, TRUE);
1160   g_mutex_unlock (&stream->lock);
1161
1162   return res;
1163 }
1164
1165 /**
1166  * gst_rtsp_stream_remove_transport:
1167  * @stream: a #GstRTSPStream
1168  * @trans: a #GstRTSPStreamTransport
1169  *
1170  * Remove the transport in @trans from @stream. The media of @stream will
1171  * not be sent to the values configured in @trans.
1172  *
1173  * @stream must be joined to a bin.
1174  *
1175  * @trans must contain a valid #GstRTSPTransport.
1176  *
1177  * Returns: %TRUE if @trans was removed
1178  */
1179 gboolean
1180 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1181     GstRTSPStreamTransport * trans)
1182 {
1183   gboolean res;
1184
1185   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1186   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1187   g_return_val_if_fail (stream->is_joined, FALSE);
1188   g_return_val_if_fail (trans->transport != NULL, FALSE);
1189
1190   g_mutex_lock (&stream->lock);
1191   res = update_transport (stream, trans, FALSE);
1192   g_mutex_unlock (&stream->lock);
1193
1194   return res;
1195 }