GstElement *udpsrc_v6[2];
GstElement *udpqueue[2];
GstElement *udpsink[2];
+ GSocket *socket_v4[2];
+ GSocket *socket_v6[2];
/* for UDP multicast */
GstElement *mcast_udpsrc_v4[2];
GstElement *mcast_udpsrc_v6[2];
GstElement *mcast_udpqueue[2];
GstElement *mcast_udpsink[2];
+ GSocket *mcast_socket_v4[2];
+ GSocket *mcast_socket_v6[2];
/* for TCP transport */
GstElement *appsrc[2];
if (priv->rtxsend)
g_object_unref (priv->rtxsend);
+ if (priv->socket_v4[0])
+ g_object_unref (priv->socket_v4[0]);
+ if (priv->socket_v4[1])
+ g_object_unref (priv->socket_v4[1]);
+ if (priv->socket_v6[0])
+ g_object_unref (priv->socket_v6[0]);
+ if (priv->socket_v6[1])
+ g_object_unref (priv->socket_v6[1]);
+ if (priv->mcast_socket_v4[0])
+ g_object_unref (priv->mcast_socket_v4[0]);
+ if (priv->mcast_socket_v4[1])
+ g_object_unref (priv->mcast_socket_v4[1]);
+ if (priv->mcast_socket_v6[0])
+ g_object_unref (priv->mcast_socket_v6[0]);
+ if (priv->mcast_socket_v6[1])
+ g_object_unref (priv->mcast_socket_v6[1]);
+
g_free (priv->multicast_iface);
gst_object_unref (priv->payloader);
/* Update the dscp qos property on the udp sinks */
static void
-update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
+update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
{
GstRTSPStreamPrivate *priv;
priv = stream->priv;
- if (udpsink[0]) {
- g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
- }
-
- if (udpsink[1]) {
- g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
+ if (*udpsink) {
+ g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
}
}
/* must be called with lock */
static void
-set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
- GSocket * rtcp_socket, GSocketFamily family)
+set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
+ GSocketFamily family)
{
const gchar *multisink_socket;
else
multisink_socket = "socket";
- g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
- g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
+ g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
+}
+
+/* must be called with lock */
+static void
+set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
+ GSocketFamily family, const gchar * multicast_iface,
+ const gchar * addr_str, gint port)
+{
+ set_socket_for_udpsink (udpsink, socket, family);
+
+ if (multicast_iface) {
+ g_object_set (G_OBJECT (udpsink), "multicast-iface",
+ multicast_iface, NULL);
+ }
+
+ g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL);
+}
+
+
+/* must be called with lock */
+static void
+set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
+ GSocketFamily family)
+{
+ set_socket_for_udpsink (udpsink, socket, family);
+}
+
+static guint16
+get_port_from_socket (GSocket * socket)
+{
+ guint16 port;
+ GSocketAddress *sockaddr;
+ GError *err;
+
+ GST_DEBUG ("socket: %p", socket);
+ sockaddr = g_socket_get_local_address (socket, &err);
+ if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
+ g_clear_object (&sockaddr);
+ GST_ERROR ("failed to get sockaddr: %s", err->message);
+ g_error_free (err);
+ return 0;
+ }
+
+ port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
+ g_object_unref (sockaddr);
+
+ return port;
}
+
static gboolean
-create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2])
+create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
+ GSocket *socket_v4, GSocket *socket_v6, gboolean multicast, gboolean is_rtp)
{
GstRTSPStreamPrivate *priv = stream->priv;
- GstElement *udpsink0, *udpsink1;
- udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
- udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
+ *udpsink = gst_element_factory_make ("multiudpsink", NULL);
- if (!udpsink0 || !udpsink1)
+ if (!*udpsink)
goto no_udp_protocol;
/* configure sinks */
- g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
+ g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
+ g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
+ if (is_rtp)
+ g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
+ else
+ g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
/* Needs to be async for RECORD streams, otherwise we will never go to
* PLAYING because the sinks will wait for data while the udpsrc can't
* provide data with timestamps in PAUSED. */
- if (priv->sinkpad)
- g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
-
- /* join multicast group when adding clients, so we'll start receiving from it.
- * We cannot rely on the udpsrc to join the group since its socket is always a
- * local unicast one. */
- g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
- g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
+ if (!is_rtp || priv->sinkpad)
+ g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
- g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
+ if (multicast) {
+ /* join multicast group when adding clients, so we'll start receiving from it.
+ * We cannot rely on the udpsrc to join the group since its socket is always a
+ * local unicast one. */
+ g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
- udpsink[0] = udpsink0;
- udpsink[1] = udpsink1;
+ g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
+ }
/* update the dscp qos field in the sinks */
update_dscp_qos (stream, udpsink);
+ if (priv->server_addr_v4) {
+ GST_DEBUG_OBJECT (stream,
+ "udp IPv4, configure udpsinks");
+ set_unicast_socket_for_udpsink (*udpsink, socket_v4,
+ G_SOCKET_FAMILY_IPV4);
+ }
+
+ if (priv->server_addr_v6) {
+ GST_DEBUG_OBJECT (stream,
+ "udp IPv6, configure udpsinks");
+ set_unicast_socket_for_udpsink (*udpsink, socket_v6,
+ G_SOCKET_FAMILY_IPV6);
+ }
+
+ if (multicast) {
+ gint port;
+ if (priv->mcast_addr_v4) {
+ GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
+ port = get_port_from_socket (socket_v4);
+ if (!port)
+ goto get_port_failed;
+ set_multicast_socket_for_udpsink (*udpsink, socket_v4,
+ G_SOCKET_FAMILY_IPV4, priv->multicast_iface, priv->mcast_addr_v4->address, port);
+ }
+
+ if (priv->mcast_addr_v6) {
+ GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
+ port = get_port_from_socket (socket_v6);
+ if (!port)
+ goto get_port_failed;
+ set_multicast_socket_for_udpsink (*udpsink, socket_v6,
+ G_SOCKET_FAMILY_IPV6, priv->multicast_iface, priv->mcast_addr_v6->address, port);
+ }
+
+ }
+
return TRUE;
/* ERRORS */
no_udp_protocol:
{
+ GST_ERROR_OBJECT (stream, "failed to create udpsink element");
+ return FALSE;
+ }
+get_port_failed:
+ {
+ GST_ERROR_OBJECT (stream, "failed to get udp port");
return FALSE;
}
}
/* must be called with lock */
static gboolean
-create_and_configure_udpsources (GstElement * udpsrc_out[2],
- GSocket * rtp_socket, GSocket * rtcp_socket)
+create_and_configure_udpsource (GstElement ** udpsrc,
+ GSocket * socket)
{
GstStateChangeReturn ret;
- udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
- udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
+ g_assert (socket != NULL);
- if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
+ *udpsrc = gst_element_factory_make ("udpsrc", NULL);
+ if (*udpsrc == NULL)
goto error;
- g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
- g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
+ g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
/* The udpsrc cannot do the join because its socket is always a local unicast
* one. The udpsink sharing the same socket will do it for us. */
- g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
- g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
+ g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
- g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
- g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
+ g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
- g_object_set (G_OBJECT (udpsrc_out[0]), "close-socket", FALSE, NULL);
- g_object_set (G_OBJECT (udpsrc_out[1]), "close-socket", FALSE, NULL);
+ g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
- ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
- if (ret == GST_STATE_CHANGE_FAILURE)
- goto error;
- ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
+ ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
if (ret == GST_STATE_CHANGE_FAILURE)
goto error;
/* ERRORS */
error:
{
- if (udpsrc_out[0]) {
- gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
- g_clear_object (&udpsrc_out[0]);
- }
- if (udpsrc_out[1]) {
- gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
- g_clear_object (&udpsrc_out[1]);
+ if (*udpsrc) {
+ gst_element_set_state (*udpsrc, GST_STATE_NULL);
+ g_clear_object (udpsrc);
}
return FALSE;
}
static gboolean
alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
- GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
- GstRTSPAddress ** server_addr_out, gboolean multicast)
+ GSocket *socket_out[2], GstRTSPAddress ** server_addr_out,
+ gboolean multicast, GstRTSPTransport * ct)
{
GstRTSPStreamPrivate *priv = stream->priv;
GSocket *rtp_socket = NULL;
GSocket *rtcp_socket;
gint tmp_rtp, tmp_rtcp;
guint count;
- gint rtpport, rtcpport;
GList *rejected_addresses = NULL;
GstRTSPAddress *addr = NULL;
GInetAddress *inetaddr = NULL;
- gchar *addr_str;
GSocketAddress *rtp_sockaddr = NULL;
GSocketAddress *rtcp_sockaddr = NULL;
GstRTSPAddressPool *pool;
- g_assert (!udpsrc_out[0]);
- g_assert (!udpsrc_out[1]);
- g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
- (udpsink_out[0] && udpsink_out[1]));
- g_assert (*server_addr_out == NULL);
-
pool = priv->pool;
count = 0;
rejected_addresses = g_list_prepend (rejected_addresses, addr);
if (!pool)
- goto no_ports;
+ goto no_pool;
flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
if (multicast)
addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
if (addr == NULL)
- goto no_ports;
+ goto no_address;
tmp_rtp = addr->port;
g_clear_object (&inetaddr);
+ /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
+ * socket control message set in udpsrc? */
if (multicast)
inetaddr = g_inet_address_new_any (family);
else
rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
+ GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
g_object_unref (rtp_sockaddr);
goto again;
}
rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
+ GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
g_object_unref (rtcp_sockaddr);
g_clear_object (&rtp_socket);
goto again;
addr->n_ports = 2;
}
- addr_str = addr->address;
g_clear_object (&inetaddr);
- if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
- goto no_udp_protocol;
- }
-
- g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
- g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
-
- /* this should not happen... */
- if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
- goto port_error;
-
- /* This function is called twice (for v4 and v6) but we create only one pair
- * of udpsinks. */
- if (!udpsink_out[0]
- && !create_and_configure_udpsinks (stream, udpsink_out))
- goto no_udp_protocol;
-
- if (multicast) {
- g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface",
- priv->multicast_iface, NULL);
- g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface",
- priv->multicast_iface, NULL);
-
- g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
- g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
- }
-
- set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
-
+ socket_out[0] = rtp_socket;
+ socket_out[1] = rtcp_socket;
*server_addr_out = addr;
- g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
+ GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d", addr->address, tmp_rtp, tmp_rtcp);
- g_object_unref (rtp_socket);
- g_object_unref (rtcp_socket);
+ g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
return TRUE;
/* ERRORS */
no_udp_protocol:
{
+ GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error");
goto cleanup;
}
-no_ports:
+no_pool:
{
+ GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no address pool specified");
goto cleanup;
}
-port_error:
+no_address:
{
+ GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
+ goto cleanup;
+ }
+no_ports:
+ {
+ GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports");
goto cleanup;
}
socket_error:
{
+ GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error");
goto cleanup;
}
cleanup:
* Allocates RTP and RTCP ports.
*
* Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
- * Deprecated: This function shouldn't have been made public
*/
gboolean
gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
- GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
+ GSocketFamily family, GstRTSPTransport * ct,
+ gboolean use_transport_settings)
{
- g_warn_if_reached ();
- return FALSE;
+ GstRTSPStreamPrivate *priv;
+ gboolean ret = FALSE;
+ GstRTSPLowerTrans transport;
+ gboolean allocated = FALSE;
+
+ g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
+ g_return_val_if_fail (ct != NULL, FALSE);
+ priv = stream->priv;
+
+ transport = ct->lower_transport;
+
+ g_mutex_lock (&priv->lock);
+
+ if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
+ if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_addr_v4)
+ allocated = TRUE;
+ else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_addr_v6)
+ allocated = TRUE;
+ } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
+ if (family == G_SOCKET_FAMILY_IPV4 && priv->server_addr_v4)
+ allocated = TRUE;
+ else if (family == G_SOCKET_FAMILY_IPV6 && priv->server_addr_v6)
+ allocated = TRUE;
+ }
+
+ if (allocated) {
+ g_mutex_unlock (&priv->lock);
+ return TRUE;
+ }
+
+ if (family == G_SOCKET_FAMILY_IPV4) {
+ /* IPv4 */
+ if (transport == GST_RTSP_LOWER_TRANS_UDP) {
+ /* UDP unicast */
+ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
+ ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
+ priv->socket_v4, &priv->server_addr_v4, FALSE, ct);
+ } else {
+ /* multicast */
+ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
+ ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
+ priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct);
+ }
+ } else {
+ /* IPv6 */
+ if (transport == GST_RTSP_LOWER_TRANS_UDP) {
+ /* unicast */
+ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
+ ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
+ priv->socket_v6, &priv->server_addr_v6, FALSE, ct);
+
+ } else {
+ /* multicast */
+ GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
+ ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
+ priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct);
+ }
+ }
+ g_mutex_unlock (&priv->lock);
+
+ return ret;
}
/**
return ret;
}
-/* must be called with lock */
-static gboolean
-alloc_ports (GstRTSPStream * stream)
-{
- GstRTSPStreamPrivate *priv = stream->priv;
- gboolean ret = TRUE;
-
- if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
- ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
- priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
-
- ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
- priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
- }
-
- /* FIXME: Maybe actually consider the return values? */
- if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
- ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
- priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
-
- ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
- priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
- }
-
- return ret;
-}
-
/**
* gst_rtsp_stream_get_server_port:
* @stream: a #GstRTSPStream
gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
}
+typedef struct _ProbeData ProbeData;
+
+struct _ProbeData
+{
+ GstRTSPStream *stream;
+ /* existing sink, already linked to tee */
+ GstElement *sink1;
+ /* new sink, about to be linked */
+ GstElement *sink2;
+ /* new queue element, that will be linked to tee and sink1 */
+ GstElement **queue1;
+ /* new queue element, that will be linked to tee and sink2 */
+ GstElement **queue2;
+ GstPad *sink_pad;
+ GstPad *tee_pad;
+ guint index;
+};
+
static void
-plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
- GstElement ** queue_out)
+free_cb_data (gpointer user_data)
{
- GstPad *pad;
- GstPad *teepad;
- GstPad *queuepad;
+ ProbeData *data = user_data;
+
+ gst_object_unref (data->stream);
+ gst_object_unref (data->sink1);
+ gst_object_unref (data->sink2);
+ gst_object_unref (data->sink_pad);
+ gst_object_unref (data->tee_pad);
+ g_free (data);
+}
+
- gst_bin_add (bin, sink);
+static void
+create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream, GstElement *tee,
+ GstElement *sink, GstElement ** queue)
+{
+ GstRTSPStreamPrivate *priv = stream->priv;
+ GstPad *tee_pad;
+ GstPad *queue_pad;
+ GstPad *sink_pad;
- *queue_out = gst_element_factory_make ("queue", NULL);
- g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
+ /* create queue for the new stream */
+ *queue = gst_element_factory_make ("queue", NULL);
+ g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
"max-size-time", G_GINT64_CONSTANT (0), NULL);
- gst_bin_add (bin, *queue_out);
+ gst_bin_add (priv->joined_bin, *queue);
/* link tee to queue */
- teepad = gst_element_get_request_pad (tee, "src_%u");
- pad = gst_element_get_static_pad (*queue_out, "sink");
- gst_pad_link (teepad, pad);
- gst_object_unref (pad);
- gst_object_unref (teepad);
+ tee_pad = gst_element_get_request_pad (tee, "src_%u");
+ queue_pad = gst_element_get_static_pad (*queue, "sink");
+ gst_pad_link (tee_pad, queue_pad);
+ gst_object_unref (queue_pad);
+ gst_object_unref (tee_pad);
/* link queue to sink */
- queuepad = gst_element_get_static_pad (*queue_out, "src");
- pad = gst_element_get_static_pad (sink, "sink");
- gst_pad_link (queuepad, pad);
- gst_object_unref (queuepad);
- gst_object_unref (pad);
+ queue_pad = gst_element_get_static_pad (*queue, "src");
+ sink_pad = gst_element_get_static_pad (sink, "sink");
+ gst_pad_link (queue_pad, sink_pad);
+ gst_object_unref (queue_pad);
+ gst_object_unref (sink_pad);
+
+ gst_element_sync_state_with_parent (sink);
+ gst_element_sync_state_with_parent (*queue);
+}
+
+static GstPadProbeReturn
+create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
+ GstPadProbeInfo * info, gpointer user_data)
+{
+ GstRTSPStreamPrivate *priv;
+ ProbeData *data = user_data;
+ GstRTSPStream *stream;
+ GstElement **queue1;
+ GstElement **queue2;
+ GstPad *sink_pad;
+ GstPad *tee_pad;
+ GstPad *queue_pad;
+ guint index;
+
+ stream = data->stream;
+ priv = stream->priv;
+ queue1 = data->queue1;
+ queue2 = data->queue2;
+ sink_pad = data->sink_pad;
+ tee_pad = data->tee_pad;
+ index = data->index;
+
+ /* unlink tee and the existing sink:
+ * .-----. .---------.
+ * | tee | | sink1 |
+ * sink src->sink |
+ * '-----' '---------'
+ */
+ g_assert (gst_pad_unlink (tee_pad, sink_pad));
+
+ /* add queue to the already existing stream */
+ *queue1 = gst_element_factory_make ("queue", NULL);
+ g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
+ "max-size-time", G_GINT64_CONSTANT (0), NULL);
+ gst_bin_add (priv->joined_bin, *queue1);
+
+ /* link tee, queue and sink:
+ * .-----. .---------. .---------.
+ * | tee | | queue1 | | sink1 |
+ * sink src->sink src->sink |
+ * '-----' '---------' '---------'
+ */
+ queue_pad = gst_element_get_static_pad (*queue1, "sink");
+ gst_pad_link (tee_pad, queue_pad);
+ gst_object_unref (queue_pad);
+ queue_pad = gst_element_get_static_pad (*queue1, "src");
+ gst_pad_link (queue_pad, sink_pad);
+ gst_object_unref (queue_pad);
+
+ gst_element_sync_state_with_parent (*queue1);
+
+ /* create queue and link it to tee and the new sink */
+ create_and_plug_queue_to_unlinked_stream (stream,
+ priv->tee[index], data->sink2, queue2);
+
+ /* the final stream:
+ *
+ * .-----. .---------. .---------.
+ * | tee | | queue1 | | sink1 |
+ * sink src->sink src->sink |
+ * | | '---------' '---------'
+ * | | .---------. .---------.
+ * | | | queue2 | | sink2 |
+ * | src->sink src->sink |
+ * '-----' '---------' '---------'
+ */
+
+ return GST_PAD_PROBE_REMOVE;
+}
+
+static void
+create_and_plug_queue_to_linked_stream (GstRTSPStream * stream, GstElement * sink1,
+ GstElement * sink2, guint index, GstElement ** queue1,
+ GstElement ** queue2)
+{
+ ProbeData *data;
+
+ data = g_new0 (ProbeData, 1);
+ data->stream = gst_object_ref (stream);
+ data->sink1 = gst_object_ref (sink1);
+ data->sink2 = gst_object_ref (sink2);
+ data->queue1 = queue1;
+ data->queue2 = queue2;
+ data->index = index;
+
+ data->sink_pad = gst_element_get_static_pad (sink1, "sink");
+ g_assert (data->sink_pad);
+ data->tee_pad = gst_pad_get_peer (data->sink_pad);
+ g_assert (data->tee_pad);
+
+ gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
+ create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
+}
+
+static void
+plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
+ GstElement ** queue_to_plug, guint index, gboolean is_mcast)
+{
+ GstRTSPStreamPrivate *priv = stream->priv;
+ GstElement *existing_sink;
+
+ if (is_mcast)
+ existing_sink = priv->udpsink[index];
+ else
+ existing_sink = priv->mcast_udpsink[index];
+
+ GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
+
+ /* add sink to the bin */
+ gst_bin_add (priv->joined_bin, sink_to_plug);
+
+ if (priv->appsink[index] && existing_sink) {
+
+ /* queues are already added for the existing stream, add one for
+ the newly added udp stream */
+ create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
+ sink_to_plug, queue_to_plug);
+
+ } else if (priv->appsink[index] || existing_sink) {
+ GstElement **queue;
+ GstElement *element;
+
+ /* add queue to the already existing stream plus the newly created udp
+ stream */
+ if (priv->appsink[index]) {
+ element = priv->appsink[index];
+ queue = &priv->appqueue[index];
+ } else {
+ element = existing_sink;
+ if (is_mcast)
+ queue = &priv->udpqueue[index];
+ else
+ queue = &priv->mcast_udpqueue[index];
+ }
+
+ create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug, index,
+ queue, queue_to_plug);
+
+ } else {
+ GstPad *tee_pad;
+ GstPad *sink_pad;
+
+ GST_DEBUG_OBJECT (stream, "creating first stream");
+
+ /* no need to add queues */
+ tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
+ sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
+ gst_pad_link (tee_pad, sink_pad);
+ gst_object_unref (tee_pad);
+ gst_object_unref (sink_pad);
+ }
+
+ gst_element_sync_state_with_parent (sink_to_plug);
}
-/* must be called with lock */
static void
-create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
+plug_tcp_sink (GstRTSPStream * stream, guint index)
+{
+ GstRTSPStreamPrivate *priv = stream->priv;
+
+ GST_DEBUG_OBJECT (stream, "plug tcp sink");
+
+ /* add sink to the bin */
+ gst_bin_add (priv->joined_bin, priv->appsink[index]);
+
+ if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
+
+ /* queues are already added for the existing stream, add one for
+ the newly added tcp stream */
+ create_and_plug_queue_to_unlinked_stream (stream,
+ priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
+
+ } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
+ GstElement **queue;
+ GstElement *element;
+
+ /* add queue to the already existing stream plus the newly created tcp
+ stream */
+ if (priv->mcast_udpsink[index]) {
+ element = priv->mcast_udpsink[index];
+ queue = &priv->mcast_udpqueue[index];
+ } else {
+ element = priv->udpsink[index];
+ queue = &priv->udpqueue[index];
+ }
+
+ create_and_plug_queue_to_linked_stream (stream, element, priv->appsink[index], index,
+ queue, &priv->appqueue[index]);
+
+ } else {
+ GstPad *tee_pad;
+ GstPad *sink_pad;
+
+ /* no need to add queues */
+ tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
+ sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
+ gst_pad_link (tee_pad, sink_pad);
+ gst_object_unref (tee_pad);
+ gst_object_unref (sink_pad);
+ }
+
+ gst_element_sync_state_with_parent (priv->appsink[index]);
+}
+
+static void
+plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
+ guint index)
+{
+ GstRTSPStreamPrivate *priv;
+ gboolean is_tcp, is_udp, is_mcast;
+ priv = stream->priv;
+
+ is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
+ is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
+ is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
+
+ if (is_udp)
+ plug_udp_sink (stream, priv->udpsink[index],
+ &priv->udpqueue[index], index, FALSE);
+
+ else if (is_mcast)
+ plug_udp_sink (stream, priv->mcast_udpsink[index],
+ &priv->mcast_udpqueue[index], index, TRUE);
+
+ else if (is_tcp)
+ plug_tcp_sink (stream, index);
+}
+
+/* must be called with lock */
+static gboolean
+create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
{
GstRTSPStreamPrivate *priv;
GstPad *pad;
- gboolean is_tcp, is_udp;
+ GstBin *bin;
+ gboolean is_tcp, is_udp, is_mcast;
gint i;
+ GST_DEBUG_OBJECT (stream, "create sender part");
priv = stream->priv;
+ bin = priv->joined_bin;
+
+ is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
+ is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
+ is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
- is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
- is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
- (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
+ GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d", is_tcp, is_udp,
+ is_mcast);
+
+ if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
+ GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
+ return FALSE;
+ }
+
+ if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
+ GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
+ return FALSE;
+ }
for (i = 0; i < 2; i++) {
+ gboolean link_tee = FALSE;
/* For the sender we create this bit of pipeline for both
- * RTP and RTCP. Sync and preroll are enabled on udpsink so
- * we need to add a queue before appsink and udpsink to make
- * the pipeline not block. For the TCP case, we want to pump
- * client as fast as possible anyway. This pipeline is used
- * when both TCP and UDP are present.
+ * RTP and RTCP.
+ * Initially there will be only one active transport for
+ * the stream, so the pipeline will look like this:
+ *
+ * .--------. .-----. .---------.
+ * | rtpbin | | tee | | sink |
+ * | send->sink src->sink |
+ * '--------' '-----' '---------'
+ *
+ * For each new transport, the already existing branch will
+ * be reconfigured by adding a queue element:
*
* .--------. .-----. .---------. .---------.
* | rtpbin | | tee | | queue | | udpsink |
* | send->sink src->sink src->sink |
* '--------' | | '---------' '---------'
* | | .---------. .---------.
+ * | | | queue | | udpsink |
+ * | src->sink src->sink |
+ * | | '---------' '---------'
+ * | | .---------. .---------.
* | | | queue | | appsink |
* | src->sink src->sink |
* '-----' '---------' '---------'
- *
- * When only UDP or only TCP is allowed, we skip the tee and queue
- * and link the udpsink (for UDP) or appsink (for TCP) directly to
- * the session.
*/
/* Only link the RTP send src if we're going to send RTP, link
if (!priv->srcpad && i == 0)
continue;
- if (is_tcp) {
+ if (!priv->tee[i]) {
+ /* make tee for RTP/RTCP */
+ priv->tee[i] = gst_element_factory_make ("tee", NULL);
+ gst_bin_add (bin, priv->tee[i]);
+ link_tee = TRUE;
+ }
+
+ if (is_udp && !priv->udpsink[i]) {
+ /* we create only one pair of udpsinks for IPv4 and IPv6 */
+ create_and_configure_udpsink (stream, &priv->udpsink[i], priv->socket_v4[i],
+ priv->socket_v6[i], FALSE, (i == 0));
+ plug_sink (stream, transport, i);
+ } else if (is_mcast && !priv->mcast_udpsink[i]) {
+ /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
+ create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
+ priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0));
+ plug_sink (stream, transport, i);
+ } else if (is_tcp && !priv->appsink[i]) {
/* make appsink */
priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
+
+ /* we need to set sync and preroll to FALSE for the sink to avoid
+ * deadlock. This is only needed for sink sending RTCP data. */
+ if (i == 1)
+ g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE,
+ NULL);
+
gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
&sink_cb, stream, NULL);
+ plug_sink (stream, transport, i);
}
- /* If we have udp always use a tee because we could have mcast clients
- * requesting different ports, in which case we'll have to plug more
- * udpsinks. */
- if (is_udp) {
- /* make tee for RTP/RTCP */
- priv->tee[i] = gst_element_factory_make ("tee", NULL);
- gst_bin_add (bin, priv->tee[i]);
-
+ if (link_tee) {
/* and link to rtpbin send pad */
+ gst_element_sync_state_with_parent (priv->tee[i]);
pad = gst_element_get_static_pad (priv->tee[i], "sink");
gst_pad_link (priv->send_src[i], pad);
gst_object_unref (pad);
-
- if (priv->udpsink[i])
- plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
-
- if (priv->mcast_udpsink[i])
- plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
- &priv->mcast_udpqueue[i]);
-
- if (is_tcp) {
- if (i == 1)
- g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
- plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
- }
- } else if (is_tcp) {
- /* only appsink needed, link it to the session */
- gst_bin_add (bin, priv->appsink[i]);
- pad = gst_element_get_static_pad (priv->appsink[i], "sink");
- gst_pad_link (priv->send_src[i], pad);
- gst_object_unref (pad);
-
- /* when its only TCP, we need to set sync and preroll to FALSE
- * for the sink to avoid deadlock. And this is only needed for
- * sink used for RTCP data, not the RTP data. */
- if (i == 1)
- g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
- }
-
- /* check if we need to set to a special state */
- if (state != GST_STATE_NULL) {
- if (priv->udpsink[i])
- gst_element_set_state (priv->udpsink[i], state);
- if (priv->mcast_udpsink[i])
- gst_element_set_state (priv->mcast_udpsink[i], state);
- if (priv->appsink[i])
- gst_element_set_state (priv->appsink[i], state);
- if (priv->appqueue[i])
- gst_element_set_state (priv->appqueue[i], state);
- if (priv->udpqueue[i])
- gst_element_set_state (priv->udpqueue[i], state);
- if (priv->mcast_udpqueue[i])
- gst_element_set_state (priv->mcast_udpqueue[i], state);
- if (priv->tee[i])
- gst_element_set_state (priv->tee[i], state);
}
}
+
+ return TRUE;
}
/* must be called with lock */
}
/* must be called with lock */
-static void
-create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
+static gboolean
+create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
+ transport)
{
GstRTSPStreamPrivate *priv;
GstPad *pad;
- gboolean is_tcp;
+ GstBin *bin;
+ gboolean tcp;
+ gboolean udp;
+ gboolean mcast;
gint i;
+ GST_DEBUG_OBJECT (stream, "create receiver part");
priv = stream->priv;
+ bin = priv->joined_bin;
- is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
+ tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
+ udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
+ mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
for (i = 0; i < 2; i++) {
/* For the receiver we create this bit of pipeline for both
* RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
* and it is all funneled into the rtpbin receive pad.
*
+ *
* .--------. .--------. .--------.
* | udpsrc | | funnel | | rtpbin |
- * | src->sink src->sink |
+ * | RTP src->sink src->sink |
+ * '--------' | | | |
+ * .--------. | | | |
+ * | appsrc | | | | |
+ * | RTP src->sink | | |
+ * '--------' '--------' | |
+ * | |
+ * .--------. .--------. | |
+ * | udpsrc | | funnel | | |
+ * | RTCP src->sink src->sink |
* '--------' | | '--------'
* .--------. | |
* | appsrc | | |
- * | src->sink |
+ * | RTCP src->sink |
* '--------' '--------'
*/
gst_pad_link (pad, priv->recv_sink[i]);
gst_object_unref (pad);
- if (priv->udpsrc_v4[i])
+ if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
+ GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
+ if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
+ priv->socket_v4[i]))
+ goto udpsrc_error;
+
plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
+ }
+
+ if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
+ GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
+ if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
+ priv->socket_v6[i]))
+ goto udpsrc_error;
- if (priv->udpsrc_v6[i])
plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
+ }
- if (priv->mcast_udpsrc_v4[i])
+ if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
+ GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
+ if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
+ priv->mcast_socket_v4[i]))
+ goto mcast_udpsrc_error;
plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
+ }
- if (priv->mcast_udpsrc_v6[i])
+ if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
+ GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
+ if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
+ priv->mcast_socket_v6[i]))
+ goto mcast_udpsrc_error;
plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
+ }
- if (is_tcp) {
+ if (tcp && !priv->appsrc[i]) {
/* make and add appsrc */
priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
priv->appsrc_base_time[i] = -1;
plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
}
- /* check if we need to set to a special state */
- if (state != GST_STATE_NULL) {
- gst_element_set_state (priv->funnel[i], state);
- }
+ gst_element_sync_state_with_parent (priv->funnel[i]);
}
+
+ return TRUE;
+
+mcast_udpsrc_error:
+udpsrc_error:
+ return FALSE;
}
static gboolean
GST_INFO ("stream %p joining bin as session %u", stream, idx);
- if (!alloc_ports (stream))
- goto no_ports;
-
if (priv->profiles & GST_RTSP_PROFILE_SAVP
|| priv->profiles & GST_RTSP_PROFILE_SAVPF) {
/* For SRTP */
priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
g_free (name);
} else {
- /* Need to connect our sinkpad from here */
+ /* RECORD case: need to connect our sinkpad from here */
g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
/* EOS */
g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
g_signal_connect (priv->session, "on-sender-ssrc-active",
(GCallback) on_sender_ssrc_active, stream);
- create_sender_part (stream, bin, state);
- create_receiver_part (stream, bin, state);
-
if (priv->srcpad) {
/* be notified of caps changes */
priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
}
priv->joined_bin = bin;
+ GST_DEBUG_OBJECT (stream, "successfully joined bin");
g_mutex_unlock (&priv->lock);
return TRUE;
g_mutex_unlock (&priv->lock);
return TRUE;
}
-no_ports:
- {
- g_mutex_unlock (&priv->lock);
- GST_WARNING ("failed to allocate ports %u", idx);
- return FALSE;
- }
link_failed:
{
GST_WARNING ("failed to link stream %u", idx);
return GST_PAD_PROBE_OK;
}
-/**
- * gst_rtsp_stream_set_blocked:
- * @stream: a #GstRTSPStream
- * @blocked: boolean indicating we should block or unblock
- *
- * Blocks or unblocks the dataflow on @stream.
- *
- * Returns: %TRUE on success
- */
-gboolean
-gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
+static void
+set_blocked (GstRTSPStream * stream, gboolean blocked)
{
GstRTSPStreamPrivate *priv;
int i;
- g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
+ GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
priv = stream->priv;
- g_mutex_lock (&priv->lock);
if (blocked) {
priv->blocking = FALSE;
for (i = 0; i < 2; i++) {
- if (priv->blocked_id[i] == 0) {
+ if (priv->blocked_id[i] != 0)
+ continue;
+ if (priv->send_src[i]) {
priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
}
priv->blocking = FALSE;
}
+}
+
+/**
+ * gst_rtsp_stream_set_blocked:
+ * @stream: a #GstRTSPStream
+ * @blocked: boolean indicating we should block or unblock
+ *
+ * Blocks or unblocks the dataflow on @stream.
+ *
+ * Returns: %TRUE on success
+ */
+gboolean
+gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
+{
+ GstRTSPStreamPrivate *priv;
+
+ g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
+
+ priv = stream->priv;
+ g_mutex_lock (&priv->lock);
+ set_blocked (stream, blocked);
+ g_mutex_unlock (&priv->lock);
+
+ return TRUE;
+}
+
+/**
+ * gst_rtsp_stream_ublock_linked:
+ * @stream: a #GstRTSPStream
+ *
+ * Unblocks the dataflow on @stream if it is linked.
+ *
+ * Returns: %TRUE on success
+ */
+gboolean
+gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
+{
+ GstRTSPStreamPrivate *priv;
+
+ g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
+
+ priv = stream->priv;
+ g_mutex_lock (&priv->lock);
+ if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
+ set_blocked (stream, FALSE);
g_mutex_unlock (&priv->lock);
return TRUE;
return TRUE;
}
+
+/**
+ * gst_rtsp_stream_complete_stream:
+ * @stream: a #GstRTSPStream
+ * @transport: a #GstRTSPTransport
+ *
+ * Add a receiver and sender part to the pipeline based on the transport from
+ * SETUP.
+ *
+ * Returns: %TRUE if the pipeline has been sucessfully updated.
+ */
+gboolean
+gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
+ const GstRTSPTransport * transport)
+{
+ GstRTSPStreamPrivate *priv;
+
+ g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
+
+ priv = stream->priv;
+ GST_DEBUG_OBJECT (stream, "complete stream");
+
+ g_mutex_lock (&priv->lock);
+
+ if (!(priv->protocols & transport->lower_transport))
+ goto unallowed_transport;
+
+ if (!create_receiver_part (stream, transport))
+ goto create_receiver_error;
+
+ /* in the RECORD case, we only add RTCP sender part */
+ if (!create_sender_part (stream, transport))
+ goto create_sender_error;
+
+ g_mutex_unlock (&priv->lock);
+
+ GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
+ return TRUE;
+
+create_receiver_error:
+create_sender_error:
+unallowed_transport:
+ {
+ g_mutex_unlock (&priv->lock);
+ return FALSE;
+ }
+}
}
static void
+do_test_play_tcp_full (const gchar * range)
+{
+ GstRTSPConnection *conn;
+ GstSDPMessage *sdp_message = NULL;
+ const GstSDPMedia *sdp_media;
+ const gchar *video_control;
+ const gchar *audio_control;
+ GstRTSPRange client_port;
+ gchar *session = NULL;
+ GstRTSPTransport *video_transport = NULL;
+ GstRTSPTransport *audio_transport = NULL;
+ gchar *range_out = NULL;
+ GstRTSPLowerTrans lower_transport = GST_RTSP_LOWER_TRANS_TCP;
+
+ conn = connect_to_server (test_port, TEST_MOUNT_POINT);
+
+ sdp_message = do_describe (conn, TEST_MOUNT_POINT);
+ get_client_ports (&client_port);
+
+ /* get control strings from DESCRIBE response */
+ fail_unless (gst_sdp_message_medias_len (sdp_message) == 2);
+ sdp_media = gst_sdp_message_get_media (sdp_message, 0);
+ video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
+ sdp_media = gst_sdp_message_get_media (sdp_message, 1);
+ audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
+
+ /* do SETUP for video and audio */
+ fail_unless (do_setup_full (conn, video_control, lower_transport,
+ &client_port, NULL, &session, &video_transport,
+ NULL) == GST_RTSP_STS_OK);
+ fail_unless (do_setup_full (conn, audio_control, lower_transport,
+ &client_port, NULL, &session, &audio_transport,
+ NULL) == GST_RTSP_STS_OK);
+
+ /* send PLAY request and check that we get 200 OK */
+ fail_unless (do_request (conn, GST_RTSP_PLAY, NULL, session, NULL, range,
+ NULL, NULL, NULL, NULL, NULL, &range_out) == GST_RTSP_STS_OK);
+
+ if (range)
+ fail_unless_equals_string (range, range_out);
+ g_free (range_out);
+
+ {
+ GstRTSPMessage *message;
+ fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK);
+ fail_unless (gst_rtsp_connection_receive (conn, message, NULL) == GST_RTSP_OK);
+ fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA);
+ gst_rtsp_message_free (message);
+ }
+
+ /* send TEARDOWN request and check that we get 200 OK */
+ fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN,
+ session) == GST_RTSP_STS_OK);
+
+ /* FIXME: The rtsp-server always disconnects the transport before
+ * sending the RTCP BYE
+ * receive_rtcp (rtcp_socket, NULL, GST_RTCP_TYPE_BYE);
+ */
+
+ /* clean up and iterate so the clean-up can finish */
+ g_free (session);
+ gst_rtsp_transport_free (video_transport);
+ gst_rtsp_transport_free (audio_transport);
+ gst_sdp_message_free (sdp_message);
+ gst_rtsp_connection_free (conn);
+}
+
+static void
do_test_play_full (const gchar * range, GstRTSPLowerTrans lower_transport,
GMutex * lock)
{
GST_END_TEST;
+/* media contains two streams: video and audio but only one
+ * stream is requested */
+GST_START_TEST (test_play_one_active_stream)
+{
+ GstRTSPConnection *conn;
+ GstSDPMessage *sdp_message = NULL;
+ const GstSDPMedia *sdp_media;
+ const gchar *video_control;
+ GstRTSPRange client_port;
+ gchar *session = NULL;
+ GstRTSPTransport *video_transport = NULL;
+ GstRTSPSessionPool *pool;
+ GstRTSPThreadPool *thread_pool;
+
+ thread_pool = gst_rtsp_server_get_thread_pool (server);
+ gst_rtsp_thread_pool_set_max_threads (thread_pool, 2);
+ g_object_unref (thread_pool);
+
+ pool = gst_rtsp_server_get_session_pool (server);
+ g_signal_connect (server, "client-connected",
+ G_CALLBACK (session_connected_new_session_cb), new_session_timeout_one);
+
+ start_server (FALSE);
+
+ conn = connect_to_server (test_port, TEST_MOUNT_POINT);
+
+ gst_rtsp_connection_set_remember_session_id (conn, FALSE);
+
+ sdp_message = do_describe (conn, TEST_MOUNT_POINT);
+
+ /* get control strings from DESCRIBE response */
+ fail_unless (gst_sdp_message_medias_len (sdp_message) == 2);
+ sdp_media = gst_sdp_message_get_media (sdp_message, 0);
+ video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
+
+ get_client_ports (&client_port);
+
+ /* do SETUP for video only */
+ fail_unless (do_setup (conn, video_control, &client_port, &session,
+ &video_transport) == GST_RTSP_STS_OK);
+
+ fail_unless (gst_rtsp_session_pool_get_n_sessions (pool) == 1);
+
+ /* send PLAY request and check that we get 200 OK */
+ fail_unless (do_simple_request (conn, GST_RTSP_PLAY,
+ session) == GST_RTSP_STS_OK);
+
+
+ /* send TEARDOWN request */
+ fail_unless (do_simple_request (conn, GST_RTSP_TEARDOWN,
+ session) == GST_RTSP_STS_OK);
+
+ /* clean up and iterate so the clean-up can finish */
+ g_object_unref (pool);
+ g_free (session);
+ gst_rtsp_transport_free (video_transport);
+ gst_sdp_message_free (sdp_message);
+ gst_rtsp_connection_free (conn);
+
+ stop_server ();
+ iterate ();
+}
+
+GST_END_TEST;
GST_START_TEST (test_play_disconnect)
{
GST_END_TEST;
+GST_START_TEST (test_play_smpte_range_tcp)
+{
+ start_tcp_server ();
+
+ do_test_play_tcp_full ("npt=5-");
+ do_test_play_tcp_full ("smpte=0:00:00-");
+ do_test_play_tcp_full ("smpte=1:00:00-");
+ do_test_play_tcp_full ("smpte=1:00:03-");
+ do_test_play_tcp_full ("clock=20120321T152256Z-");
+
+ stop_server ();
+ iterate ();
+}
+
+GST_END_TEST;
+
static gpointer
thread_func (gpointer data)
{
GST_END_TEST;
+static void
+do_test_multiple_transports (GstRTSPLowerTrans trans1, GstRTSPLowerTrans trans2)
+{
+ GstRTSPConnection *conn1;
+ GstRTSPConnection *conn2;
+ GstSDPMessage *sdp_message1 = NULL;
+ GstSDPMessage *sdp_message2 = NULL;
+ const GstSDPMedia *sdp_media;
+ const gchar *video_control;
+ const gchar *audio_control;
+ GstRTSPRange client_port1, client_port2;
+ gchar *session1 = NULL;
+ gchar *session2 = NULL;
+ GstRTSPTransport *video_transport = NULL;
+ GstRTSPTransport *audio_transport = NULL;
+ GSocket *rtp_socket, *rtcp_socket;
+
+ conn1 = connect_to_server (test_port, TEST_MOUNT_POINT);
+ conn2 = connect_to_server (test_port, TEST_MOUNT_POINT);
+
+ sdp_message1 = do_describe (conn1, TEST_MOUNT_POINT);
+
+ get_client_ports_full (&client_port1, &rtp_socket, &rtcp_socket);
+ /* get control strings from DESCRIBE response */
+ sdp_media = gst_sdp_message_get_media (sdp_message1, 0);
+ video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
+ sdp_media = gst_sdp_message_get_media (sdp_message1, 1);
+ audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
+
+ /* do SETUP for video and audio */
+ fail_unless (do_setup_full (conn1, video_control, trans1,
+ &client_port1, NULL, &session1, &video_transport,
+ NULL) == GST_RTSP_STS_OK);
+ fail_unless (do_setup_full (conn1, audio_control, trans1,
+ &client_port1, NULL, &session1, &audio_transport,
+ NULL) == GST_RTSP_STS_OK);
+
+ gst_rtsp_transport_free (video_transport);
+ gst_rtsp_transport_free (audio_transport);
+
+ sdp_message2 = do_describe (conn2, TEST_MOUNT_POINT);
+
+ /* get control strings from DESCRIBE response */
+ sdp_media = gst_sdp_message_get_media (sdp_message2, 0);
+ video_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
+ sdp_media = gst_sdp_message_get_media (sdp_message2, 1);
+ audio_control = gst_sdp_media_get_attribute_val (sdp_media, "control");
+
+ get_client_ports_full (&client_port2, NULL, NULL);
+ /* do SETUP for video and audio */
+ fail_unless (do_setup_full (conn2, video_control, trans2,
+ &client_port2, NULL, &session2, &video_transport,
+ NULL) == GST_RTSP_STS_OK);
+ fail_unless (do_setup_full (conn2, audio_control, trans2,
+ &client_port2, NULL, &session2, &audio_transport,
+ NULL) == GST_RTSP_STS_OK);
+
+ /* send PLAY request and check that we get 200 OK */
+ fail_unless (do_request (conn1, GST_RTSP_PLAY, NULL, session1, NULL, NULL,
+ NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK);
+ /* send PLAY request and check that we get 200 OK */
+ fail_unless (do_request (conn2, GST_RTSP_PLAY, NULL, session2, NULL, NULL,
+ NULL, NULL, NULL, NULL, NULL, NULL) == GST_RTSP_STS_OK);
+
+
+ /* receive UDP data */
+ receive_rtp (rtp_socket, NULL);
+ receive_rtcp (rtcp_socket, NULL, 0);
+
+ /* receive TCP data */
+ {
+ GstRTSPMessage *message;
+ fail_unless (gst_rtsp_message_new (&message) == GST_RTSP_OK);
+ fail_unless (gst_rtsp_connection_receive (conn2, message, NULL) == GST_RTSP_OK);
+ fail_unless (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA);
+ gst_rtsp_message_free (message);
+ }
+
+ /* send TEARDOWN request and check that we get 200 OK */
+ fail_unless (do_simple_request (conn1, GST_RTSP_TEARDOWN,
+ session1) == GST_RTSP_STS_OK);
+ /* send TEARDOWN request and check that we get 200 OK */
+ fail_unless (do_simple_request (conn2, GST_RTSP_TEARDOWN,
+ session2) == GST_RTSP_STS_OK);
+
+ /* clean up and iterate so the clean-up can finish */
+ g_object_unref (rtp_socket);
+ g_object_unref (rtcp_socket);
+ g_free (session1);
+ g_free (session2);
+ gst_rtsp_transport_free (video_transport);
+ gst_rtsp_transport_free (audio_transport);
+ gst_sdp_message_free (sdp_message1);
+ gst_sdp_message_free (sdp_message2);
+ gst_rtsp_connection_free (conn1);
+ gst_rtsp_connection_free (conn2);
+}
+
+GST_START_TEST (test_multiple_transports)
+{
+ start_server (TRUE);
+ do_test_multiple_transports (GST_RTSP_LOWER_TRANS_UDP, GST_RTSP_LOWER_TRANS_TCP);
+ stop_server ();
+}
+
+GST_END_TEST;
+
static Suite *
rtspserver_suite (void)
{
tcase_add_test (tc, test_play_multithreaded_timeout_client);
tcase_add_test (tc, test_play_multithreaded_timeout_session);
tcase_add_test (tc, test_no_session_timeout);
+ tcase_add_test (tc, test_play_one_active_stream);
tcase_add_test (tc, test_play_disconnect);
tcase_add_test (tc, test_play_specific_server_port);
tcase_add_test (tc, test_play_smpte_range);
+ tcase_add_test (tc, test_play_smpte_range_tcp);
tcase_add_test (tc, test_shared);
tcase_add_test (tc, test_announce_without_sdp);
tcase_add_test (tc, test_record_tcp);
+ tcase_add_test (tc, test_multiple_transports);
+
return s;
}