rtsp-stream: Fix crash on cleanup with shared media and multiple udpsrc
authorJake Foytik <jake.foytik@ipconfigure.com>
Mon, 25 Apr 2016 12:55:25 +0000 (08:55 -0400)
committerSebastian Dröge <sebastian@centricular.com>
Fri, 29 Apr 2016 08:49:14 +0000 (11:49 +0300)
 - Unicast udpsrcs are now managed in a hash table. This allows for proper cleanup in with shared streams and fixes a memory leak.
 - Unicast udpsrcs are now properly cleaned up when shared connections exit. See the update_transport() function.
 - Create unit test for shared media.

https://bugzilla.gnome.org/show_bug.cgi?id=764744

gst/rtsp-server/rtsp-stream.c
tests/check/gst/rtspserver.c
tests/check/gst/stream.c

index 810ee1c..27ad5ac 100644 (file)
 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
 
+/* Container for udpsrc elements created for a specific RTSPTransport. */
+typedef struct
+{
+  GstElement *udpsrc[2];
+} GstRTSPStreamUDPSrcs;
+
+static void
+destroy_udp_srcs_func (gpointer data)
+{
+  g_slice_free (GstRTSPStreamUDPSrcs, (GstRTSPStreamUDPSrcs *) data);
+}
+
 struct _GstRTSPStreamPrivate
 {
   GMutex lock;
@@ -95,16 +107,11 @@ struct _GstRTSPStreamPrivate
   GstElement *srtpdec;
   GHashTable *keys;
 
-  /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
-   * sockets */
-  GstElement *udpsrc_v4[2];
-  /* UDP sources for UDP multicast transports */
-  GstElement *udpsrc_mcast_v4[2];
+  /* Unicast UDP sources associated with RTSPTransports */
+  GHashTable *udpsrcs;
 
-  /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
-   * sockets */
-  GstElement *udpsrc_v6[2];
-  /* UDP sources for UDP multicast transports */
+  /* Only allow one set of IPV4 and IPV6 multicast udpsrcs */
+  GstElement *udpsrc_mcast_v4[2];
   GstElement *udpsrc_mcast_v6[2];
 
   GstElement *udpqueue[2];
@@ -127,12 +134,10 @@ struct _GstRTSPStreamPrivate
   /* server ports for sending/receiving over ipv4 */
   GstRTSPRange server_port_v4;
   GstRTSPAddress *server_addr_v4;
-  gboolean have_ipv4;
 
   /* server ports for sending/receiving over ipv6 */
   GstRTSPRange server_port_v6;
   GstRTSPAddress *server_addr_v6;
-  gboolean have_ipv6;
 
   /* multicast addresses */
   GstRTSPAddressPool *pool;
@@ -270,6 +275,8 @@ gst_rtsp_stream_init (GstRTSPStream * stream)
       NULL, (GDestroyNotify) gst_caps_unref);
   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
       (GDestroyNotify) gst_caps_unref);
+  priv->udpsrcs = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+      NULL, (GDestroyNotify) destroy_udp_srcs_func);
 }
 
 static void
@@ -312,6 +319,11 @@ gst_rtsp_stream_finalize (GObject * obj)
   g_hash_table_unref (priv->keys);
   g_hash_table_destroy (priv->ptmap);
 
+  /* We expect all udpsrcs to be cleaned up by this point. */
+  if (g_hash_table_size (priv->udpsrcs) > 0)
+    g_critical ("Unreffing udpsrcs hash table that contains elements.");
+  g_hash_table_unref (priv->udpsrcs);
+
   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
 }
 
@@ -1493,42 +1505,63 @@ gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
 
   g_mutex_lock (&priv->lock);
 
-  if (family == G_SOCKET_FAMILY_IPV4) {
-    if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
-      if (priv->have_ipv4_mcast)
+  if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
+    if (family == G_SOCKET_FAMILY_IPV4) {
+      /* Multicast IPV4 */
+      if (priv->have_ipv4_mcast) {
+        result = TRUE;
         goto done;
+      }
+
       priv->have_ipv4_mcast =
           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
           use_client_settings);
+      result = priv->have_ipv4_mcast;
+
     } else {
-      priv->have_ipv4 =
-          alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
-          &priv->server_port_v4, ct, &priv->server_addr_v4,
-          use_client_settings);
-    }
-  } else {
-    if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
-      if (priv->have_ipv6_mcast)
+      /* Multicast IPV6 */
+      if (priv->have_ipv6_mcast) {
+        result = TRUE;
         goto done;
+      }
+
       priv->have_ipv6_mcast =
           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
           use_client_settings);
+      result = priv->have_ipv6_mcast;
+    }
+  } else {
+    /* We allow multiple unicast transports, so we must maintain a table of the 
+     * udpsrcs created for them. */
+    GstRTSPStreamUDPSrcs *transport_udpsrcs =
+        g_slice_new0 (GstRTSPStreamUDPSrcs);
+
+    if (family == G_SOCKET_FAMILY_IPV4) {
+      /* Unicast IPV4 */
+      result =
+          alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
+          transport_udpsrcs->udpsrc, &priv->server_port_v4, ct,
+          &priv->server_addr_v4, use_client_settings);
     } else {
-      if (priv->have_ipv6)
-        goto done;
-      priv->have_ipv6 =
-          alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
-          &priv->server_port_v6, ct, &priv->server_addr_v6,
-          use_client_settings);
+      /* Unicast IPV6 */
+      result =
+          alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
+          transport_udpsrcs->udpsrc, &priv->server_port_v6, ct,
+          &priv->server_addr_v6, use_client_settings);
     }
+
+    /* If we didn't create any unicast udpsrcs, free the transport_udpsrcs struct. 
+     * Otherwise, add it to the hash table */
+    if (transport_udpsrcs->udpsrc[0] == NULL
+        && transport_udpsrcs->udpsrc[1] == NULL)
+      g_slice_free (GstRTSPStreamUDPSrcs, transport_udpsrcs);
+    else
+      g_hash_table_insert (priv->udpsrcs, ct, transport_udpsrcs);
   }
 
 done:
-  result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
-      priv->have_ipv6_mcast;
-
   g_mutex_unlock (&priv->lock);
 
   return result;
@@ -2586,39 +2619,6 @@ create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
       gst_pad_link (pad, priv->recv_sink[i]);
       gst_object_unref (pad);
 
-      if (priv->udpsrc_v4[i]) {
-        if (priv->srcpad) {
-          /* we set and keep these to playing so that they don't cause NO_PREROLL return
-           * values. This is only relevant for PLAY pipelines */
-          gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
-          gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
-        }
-        /* add udpsrc */
-        gst_bin_add (bin, priv->udpsrc_v4[i]);
-
-        /* and link to the funnel v4 */
-        selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
-        pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
-        gst_pad_link (pad, selpad);
-        gst_object_unref (pad);
-        gst_object_unref (selpad);
-      }
-
-      if (priv->udpsrc_v6[i]) {
-        if (priv->srcpad) {
-          gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
-          gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
-        }
-        gst_bin_add (bin, priv->udpsrc_v6[i]);
-
-        /* and link to the funnel v6 */
-        selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
-        pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
-        gst_pad_link (pad, selpad);
-        gst_object_unref (pad);
-        gst_object_unref (selpad);
-      }
-
       if (is_tcp) {
         /* make and add appsrc */
         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
@@ -2808,51 +2808,50 @@ no_udp_protocol:
       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
     if (priv->udpsink[1])
       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
-    if (priv->udpsrc_v4[0]) {
-      gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_v4[0]);
-      priv->udpsrc_v4[0] = NULL;
-    }
-    if (priv->udpsrc_v4[1]) {
-      gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_v4[1]);
-      priv->udpsrc_v4[1] = NULL;
-    }
-    if (priv->udpsrc_mcast_v4[0]) {
-      gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_mcast_v4[0]);
-      priv->udpsrc_mcast_v4[0] = NULL;
-    }
-    if (priv->udpsrc_mcast_v4[1]) {
-      gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_mcast_v4[1]);
-      priv->udpsrc_mcast_v4[1] = NULL;
-    }
-    if (priv->udpsrc_v6[0]) {
-      gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_v6[0]);
-      priv->udpsrc_v6[0] = NULL;
-    }
-    if (priv->udpsrc_v6[1]) {
-      gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_v6[1]);
-      priv->udpsrc_v6[1] = NULL;
-    }
-    if (priv->udpsrc_mcast_v6[0]) {
-      gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_mcast_v6[0]);
-      priv->udpsrc_mcast_v6[0] = NULL;
-    }
-    if (priv->udpsrc_mcast_v6[1]) {
-      gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
-      gst_object_unref (priv->udpsrc_mcast_v6[1]);
-      priv->udpsrc_mcast_v6[1] = NULL;
-    }
+
     g_mutex_unlock (&priv->lock);
     return FALSE;
   }
 }
 
+/* Must be called with priv->lock. */
+static void
+remove_all_unicast_udpsrcs (GstRTSPStream * stream, GstBin * bin)
+{
+  GstRTSPStreamPrivate *priv;
+  GHashTableIter iter;
+  gpointer iter_key, iter_value;
+
+  priv = stream->priv;
+
+  /* Remove all of the unicast udpsrcs */
+  g_hash_table_iter_init (&iter, priv->udpsrcs);
+  while (g_hash_table_iter_next (&iter, &iter_key, &iter_value)) {
+    GstRTSPStreamUDPSrcs *transport_udpsrcs =
+        (GstRTSPStreamUDPSrcs *) iter_value;
+
+    for (int i = 0; i < 2; i++) {
+      if (transport_udpsrcs->udpsrc[i]) {
+        if (priv->sinkpad || i == 1) {
+          /* Set udpsrc to NULL now before removing */
+          gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE);
+          gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
+
+          /* removing them should also nicely release the request
+           * pads when they finalize */
+          gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]);
+        } else {
+          /* we need to set the state to NULL before unref */
+          gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
+          gst_object_unref (transport_udpsrcs->udpsrc[i]);
+        }
+      }
+    }
+  }
+
+  g_hash_table_remove_all (priv->udpsrcs);
+}
+
 /**
  * gst_rtsp_stream_leave_bin:
  * @stream: a #GstRTSPStream
@@ -2910,6 +2909,7 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
 
+  remove_all_unicast_udpsrcs (stream, bin);
 
   for (i = 0; i < 2; i++) {
     if (priv->udpsink[i])
@@ -2927,21 +2927,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
     if (priv->appsrc[i])
       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
 
-    if (priv->udpsrc_v4[i]) {
-      if (priv->sinkpad || i == 1) {
-        /* and set udpsrc to NULL now before removing */
-        gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
-        gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
-        /* removing them should also nicely release the request
-         * pads when they finalize */
-        gst_bin_remove (bin, priv->udpsrc_v4[i]);
-      } else {
-        /* we need to set the state to NULL before unref */
-        gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
-        gst_object_unref (priv->udpsrc_v4[i]);
-      }
-    }
-
     if (priv->udpsrc_mcast_v4[i]) {
       if (priv->sinkpad || i == 1) {
         /* and set udpsrc to NULL now before removing */
@@ -2956,16 +2941,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
       }
     }
 
-    if (priv->udpsrc_v6[i]) {
-      if (priv->sinkpad || i == 1) {
-        gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
-        gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
-        gst_bin_remove (bin, priv->udpsrc_v6[i]);
-      } else {
-        gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
-        gst_object_unref (priv->udpsrc_v6[i]);
-      }
-    }
     if (priv->udpsrc_mcast_v6[i]) {
       if (priv->sinkpad || i == 1) {
         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
@@ -3006,8 +2981,6 @@ gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
       priv->recv_sink[i] = NULL;
     }
 
-    priv->udpsrc_v4[i] = NULL;
-    priv->udpsrc_v6[i] = NULL;
     priv->udpsrc_mcast_v4[i] = NULL;
     priv->udpsrc_mcast_v6[i] = NULL;
     priv->udpsink[i] = NULL;
@@ -3378,6 +3351,68 @@ gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
   return ret;
 }
 
+/* Properly dispose udpsrcs that were created for a given transport. */
+/* Must be called with priv->lock. */
+static void
+remove_transport_udpsrcs (GstRTSPStreamPrivate * priv,
+    const GstRTSPTransport * tr)
+{
+  /* Remove the udpsrcs associated with this transport. */
+  GstRTSPStreamUDPSrcs *transport_udpsrcs =
+      g_hash_table_lookup (priv->udpsrcs, tr);
+  if (transport_udpsrcs != NULL) {
+    for (int i = 0; i < 2; i++) {
+      if (transport_udpsrcs->udpsrc[i]) {
+        if (priv->sinkpad || i == 1) {
+          GstBin *bin;
+          GstPad *udpsrc_srcpad, *funnel_sinkpad;
+
+          /* We know these udpsrcs are all linked to funnels. Explicitely 
+           * get the funnel src pads so we can properly release them. */
+          udpsrc_srcpad =
+              gst_element_get_static_pad (transport_udpsrcs->udpsrc[i], "src");
+          funnel_sinkpad = gst_pad_get_peer (udpsrc_srcpad);
+
+          if (funnel_sinkpad != NULL) {
+            /* Unlink pads and release funnel's request pad. */
+            gst_pad_unlink (udpsrc_srcpad, funnel_sinkpad);
+            gst_element_release_request_pad (priv->funnel[i], funnel_sinkpad);
+            gst_object_unref (funnel_sinkpad);
+          }
+          gst_object_unref (udpsrc_srcpad);
+
+          /* Set udpsrc to NULL now before removing */
+          gst_element_set_locked_state (transport_udpsrcs->udpsrc[i], FALSE);
+          gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
+
+          /* This udpsrc is expected to be owned by a bin. Get the bin and 
+           * remove our element. */
+          bin = GST_BIN (gst_element_get_parent (transport_udpsrcs->udpsrc[i]));
+          if (bin != NULL) {
+            gst_bin_remove (bin, transport_udpsrcs->udpsrc[i]);
+            gst_object_unref (bin);
+          } else {
+            GST_ERROR ("Expected this udpsrc element to be part of a bin.");
+            gst_object_unref (transport_udpsrcs->udpsrc[i]);
+          }
+
+        } else {
+          /* we need to set the state to NULL before unref */
+          gst_element_set_state (transport_udpsrcs->udpsrc[i], GST_STATE_NULL);
+          gst_object_unref (transport_udpsrcs->udpsrc[i]);
+        }
+      }
+    }
+
+    /* The udpsrcs are now properly cleaned up. Remove them from the table */
+    g_hash_table_remove (priv->udpsrcs, tr);
+
+  } else {
+    /* This can happen if we're dealing with a multicast transport. */
+    GST_INFO ("Could not find udpsrcs associated with this transport.");
+  }
+}
+
 /* must be called with lock */
 static gboolean
 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
@@ -3426,6 +3461,8 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
         priv->transports = g_list_remove (priv->transports, trans);
+
+        remove_transport_udpsrcs (priv, tr);
       }
       priv->transports_cookie++;
       break;
index d21bf04..805b177 100644 (file)
@@ -149,7 +149,7 @@ get_client_ports (GstRTSPRange * range)
 
 /* start the tested rtsp server */
 static void
-start_server (void)
+start_server (gboolean set_shared_factory)
 {
   GstRTSPMountPoints *mounts;
   gchar *service;
@@ -172,6 +172,7 @@ start_server (void)
   gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV4,
       GST_RTSP_ADDRESS_POOL_ANY_IPV4, 6000, 6010, 0);
   gst_rtsp_media_factory_set_address_pool (factory, pool);
+  gst_rtsp_media_factory_set_shared (factory, set_shared_factory);
   gst_object_unref (pool);
 
   /* set port to any */
@@ -571,7 +572,7 @@ GST_START_TEST (test_connect)
 {
   GstRTSPConnection *conn;
 
-  start_server ();
+  start_server (FALSE);
 
   /* connect to server */
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
@@ -597,7 +598,7 @@ GST_START_TEST (test_describe)
   const gchar *control_video;
   const gchar *control_audio;
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
 
@@ -668,7 +669,7 @@ GST_START_TEST (test_describe_non_existing_mount_point)
 {
   GstRTSPConnection *conn;
 
-  start_server ();
+  start_server (FALSE);
 
   /* send DESCRIBE request for a non-existing mount point
    * and check that we get a 404 Not Found */
@@ -697,7 +698,7 @@ do_test_setup (GstRTSPLowerTrans lower_transport)
   GstRTSPTransport *video_transport = NULL;
   GstRTSPTransport *audio_transport = NULL;
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
 
@@ -782,7 +783,7 @@ GST_START_TEST (test_setup_twice)
   gchar *session1 = NULL;
   gchar *session2 = NULL;
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
 
@@ -854,7 +855,7 @@ GST_START_TEST (test_setup_with_require_header)
   gchar *unsupported = NULL;
   GstRTSPTransport *video_transport = NULL;
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
 
@@ -916,7 +917,7 @@ GST_START_TEST (test_setup_non_existing_stream)
   GstRTSPConnection *conn;
   GstRTSPRange client_ports;
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
 
@@ -1009,7 +1010,8 @@ done:
 }
 
 static void
-do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport)
+do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport,
+    GMutex * lock)
 {
   GstRTSPConnection *conn;
   GstSDPMessage *sdp_message = NULL;
@@ -1051,8 +1053,20 @@ do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport)
     fail_unless_equals_string (range, range_out);
   g_free (range_out);
 
-  receive_rtp (rtp_socket, NULL);
-  receive_rtcp (rtcp_socket, NULL, 0);
+  for (;;) {
+    receive_rtp (rtp_socket, NULL);
+    receive_rtcp (rtcp_socket, NULL, 0);
+
+    if (lock != NULL) {
+      if (g_mutex_trylock (lock) == TRUE) {
+        g_mutex_unlock (lock);
+        break;
+      }
+    } else {
+      break;
+    }
+
+  }
 
   /* send TEARDOWN request and check that we get 200 OK */
   fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN,
@@ -1076,12 +1090,12 @@ do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport)
 static void
 do_test_play (const gchar * range)
 {
-  do_test_play_full (range, GST_RTSP_LOWER_TRANS_UDP);
+  do_test_play_full (range, GST_RTSP_LOWER_TRANS_UDP, NULL);
 }
 
 GST_START_TEST (test_play)
 {
-  start_server ();
+  start_server (FALSE);
 
   do_test_play (NULL);
 
@@ -1095,7 +1109,7 @@ GST_START_TEST (test_play_without_session)
 {
   GstRTSPConnection *conn;
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
 
@@ -1155,7 +1169,7 @@ GST_START_TEST (test_play_multithreaded)
   gst_rtsp_thread_pool_set_max_threads (pool, 2);
   g_object_unref (pool);
 
-  start_server ();
+  start_server (FALSE);
 
   do_test_play (NULL);
 
@@ -1215,7 +1229,7 @@ GST_START_TEST (test_play_multithreaded_block_in_describe)
   gst_rtsp_mount_points_add_factory (mounts, TEST_MOUNT_POINT "2", factory);
   g_object_unref (mounts);
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT "2");
   iterate ();
@@ -1294,7 +1308,7 @@ GST_START_TEST (test_play_multithreaded_timeout_client)
   g_signal_connect (server, "client-connected",
       G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);
 
-  start_server ();
+  start_server (FALSE);
 
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
@@ -1367,7 +1381,7 @@ GST_START_TEST (test_play_multithreaded_timeout_session)
   g_signal_connect (server, "client-connected",
       G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);
 
-  start_server ();
+  start_server (FALSE);
 
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
@@ -1443,7 +1457,7 @@ GST_START_TEST (test_play_disconnect)
   g_signal_connect (server, "client-connected",
       G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);
 
-  start_server ();
+  start_server (FALSE);
 
   conn = connect_to_server (test_port, TEST_MOUNT_POINT);
 
@@ -1515,7 +1529,8 @@ GST_START_TEST (test_play_specific_server_port)
   factory = gst_rtsp_media_factory_new ();
   /* we have to suspend media after SDP in order to make sure that
    * we can reconfigure UDP sink with new UDP ports */
-  gst_rtsp_media_factory_set_suspend_mode (factory, GST_RTSP_SUSPEND_MODE_RESET);
+  gst_rtsp_media_factory_set_suspend_mode (factory,
+      GST_RTSP_SUSPEND_MODE_RESET);
   pool = gst_rtsp_address_pool_new ();
   gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV4,
       GST_RTSP_ADDRESS_POOL_ANY_IPV4, 7770, 7780, 0);
@@ -1603,7 +1618,7 @@ GST_END_TEST;
 
 GST_START_TEST (test_play_smpte_range)
 {
-  start_server ();
+  start_server (FALSE);
 
   do_test_play ("npt=5-");
   do_test_play ("smpte=0:00:00-");
@@ -1617,6 +1632,65 @@ GST_START_TEST (test_play_smpte_range)
 
 GST_END_TEST;
 
+static gpointer
+thread_func (gpointer data)
+{
+  do_test_play_full (NULL, GST_RTSP_LOWER_TRANS_UDP, (GMutex *) data);
+  return NULL;
+}
+
+/* Test adding and removing clients to a 'Shared' media. */
+GST_START_TEST (test_shared)
+{
+  GMutex lock1, lock2, lock3, lock4;
+  GThread *thread1, *thread2, *thread3, *thread4;
+
+  /* Locks for each thread. Each thread will keep reading data as long as the
+   * thread is locked. */
+  g_mutex_init (&lock1);
+  g_mutex_init (&lock2);
+  g_mutex_init (&lock3);
+  g_mutex_init (&lock4);
+
+  start_server (TRUE);
+
+  /* Start the first receiver thread. */
+  g_mutex_lock (&lock1);
+  thread1 = g_thread_new ("thread1", thread_func, &lock1);
+
+  /* Connect and disconnect another client. */
+  g_mutex_lock (&lock2);
+  thread2 = g_thread_new ("thread2", thread_func, &lock2);
+  g_mutex_unlock (&lock2);
+  g_mutex_clear (&lock2);
+  g_thread_join (thread2);
+
+  /* Do it again. */
+  g_mutex_lock (&lock3);
+  thread3 = g_thread_new ("thread3", thread_func, &lock3);
+  g_mutex_unlock (&lock3);
+  g_mutex_clear (&lock3);
+  g_thread_join (thread3);
+
+  /* Disconnect the last client. This will clean up the media. */
+  g_mutex_unlock (&lock1);
+  g_mutex_clear (&lock1);
+  g_thread_join (thread1);
+
+  /* Connect and disconnect another client. This will create and clean up the 
+   * media. */
+  g_mutex_lock (&lock4);
+  thread4 = g_thread_new ("thread4", thread_func, &lock4);
+  g_mutex_unlock (&lock4);
+  g_mutex_clear (&lock4);
+  g_thread_join (thread4);
+
+  stop_server ();
+  iterate ();
+}
+
+GST_END_TEST;
+
 GST_START_TEST (test_announce_without_sdp)
 {
   GstRTSPConnection *conn;
@@ -1749,7 +1823,8 @@ GST_START_TEST (test_record_tcp)
   gint i;
 
   mfactory =
-      start_record_server ("( rtppcmadepay name=depay0 ! appsink name=sink async=false )");
+      start_record_server
+      ("( rtppcmadepay name=depay0 ! appsink name=sink async=false )");
 
   g_signal_connect (mfactory, "media-constructed",
       G_CALLBACK (media_constructed_cb), &server_sink);
@@ -1926,6 +2001,7 @@ rtspserver_suite (void)
   tcase_add_test (tc, test_play_disconnect);
   tcase_add_test (tc, test_play_specific_server_port);
   tcase_add_test (tc, test_play_smpte_range);
+  tcase_add_test (tc, test_shared);
   tcase_add_test (tc, test_announce_without_sdp);
   tcase_add_test (tc, test_record_tcp);
   return s;
index 0fb7e59..26c2915 100644 (file)
@@ -52,18 +52,20 @@ GST_START_TEST (test_get_sockets)
 
   /* configure address pool for IPv4 and IPv6 unicast addresses */
   pool = gst_rtsp_address_pool_new ();
-  fail_unless (gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV4,
-        GST_RTSP_ADDRESS_POOL_ANY_IPV4, 50000, 60000, 0));
-  fail_unless (gst_rtsp_address_pool_add_range (pool, GST_RTSP_ADDRESS_POOL_ANY_IPV6,
-        GST_RTSP_ADDRESS_POOL_ANY_IPV6, 50000, 60000, 0));
+  fail_unless (gst_rtsp_address_pool_add_range (pool,
+          GST_RTSP_ADDRESS_POOL_ANY_IPV4, GST_RTSP_ADDRESS_POOL_ANY_IPV4, 50000,
+          60000, 0));
+  fail_unless (gst_rtsp_address_pool_add_range (pool,
+          GST_RTSP_ADDRESS_POOL_ANY_IPV6, GST_RTSP_ADDRESS_POOL_ANY_IPV6, 50000,
+          60000, 0));
   gst_rtsp_stream_set_address_pool (stream, pool);
 
   fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
 
   gst_rtsp_transport_new (&tr);
   tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP;
-  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4,
-        tr, FALSE));
+  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
+          G_SOCKET_FAMILY_IPV4, tr, FALSE));
 
   socket = gst_rtsp_stream_get_rtp_socket (stream, G_SOCKET_FAMILY_IPV4);
   have_ipv4 = (socket != NULL);
@@ -138,7 +140,7 @@ GST_START_TEST (test_allocate_udp_ports_fail)
 
   pool = gst_rtsp_address_pool_new ();
   fail_unless (gst_rtsp_address_pool_add_range (pool, "192.168.1.1",
-        "192.168.1.1", 6000, 6001, 0));
+          "192.168.1.1", 6000, 6001, 0));
   gst_rtsp_stream_set_address_pool (stream, pool);
 
   fail_unless (gst_rtsp_stream_join_bin (stream, bin, rtpbin, GST_STATE_NULL));
@@ -146,7 +148,7 @@ GST_START_TEST (test_allocate_udp_ports_fail)
   gst_rtsp_transport_new (&tr);
   tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP;
   fail_if (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4,
-        tr, FALSE));
+          tr, FALSE));
 
   gst_rtsp_transport_free (tr);
   g_object_unref (pool);
@@ -258,8 +260,8 @@ GST_START_TEST (test_multicast_address_and_unicast_udp)
   gst_rtsp_transport_new (&tr);
   /* unicast udp */
   tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP;
-  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4,
-        tr, FALSE));
+  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
+          G_SOCKET_FAMILY_IPV4, tr, FALSE));
 
   gst_rtsp_transport_free (tr);
   g_object_unref (pool);
@@ -309,8 +311,8 @@ GST_START_TEST (test_allocate_udp_ports_multicast)
   /* allocate udp multicast ports for IPv4 */
   gst_rtsp_transport_new (&tr);
   tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST;
-  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4,
-        tr, FALSE));
+  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
+          G_SOCKET_FAMILY_IPV4, tr, FALSE));
 
   /* check the multicast address and ports for IPv4 */
   addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV4);
@@ -320,9 +322,9 @@ GST_START_TEST (test_allocate_udp_ports_multicast)
   fail_unless_equals_int (addr->n_ports, 2);
   gst_rtsp_address_free (addr);
 
-  /* allocate upd multicast ports for IPv6 */
-  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV6,
-        tr, FALSE));
+  /* allocate udp multicast ports for IPv6 */
+  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
+          G_SOCKET_FAMILY_IPV6, tr, FALSE));
 
   /* check the multicast address and ports for IPv6 */
   addr = gst_rtsp_stream_get_multicast_address (stream, G_SOCKET_FAMILY_IPV6);
@@ -388,8 +390,8 @@ GST_START_TEST (test_allocate_udp_ports_client_settings)
   tr->port.min = 6002;
   tr->port.max = 6003;
   tr->lower_transport = GST_RTSP_LOWER_TRANS_UDP_MCAST;
-  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV4,
-        tr, FALSE));
+  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
+          G_SOCKET_FAMILY_IPV4, tr, FALSE));
 
   /* verify that the multicast address and ports correspond to the requested client
    * transport information for IPv4 */
@@ -405,8 +407,8 @@ GST_START_TEST (test_allocate_udp_ports_client_settings)
   tr->destination = g_strdup ("FF11:DB8::1");
   tr->port.min = 6006;
   tr->port.max = 6007;
-  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream, G_SOCKET_FAMILY_IPV6,
-        tr, FALSE));
+  fail_unless (gst_rtsp_stream_allocate_udp_sockets (stream,
+          G_SOCKET_FAMILY_IPV6, tr, FALSE));
 
   /* verify that the multicast address and ports correspond to the requested client
    * transport information for IPv6 */