Limit queued TCP data messages to one per stream
authorDavid Svensson Fors <davidsf@axis.com>
Thu, 28 Jun 2018 09:22:21 +0000 (11:22 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 23 Jul 2018 14:45:00 +0000 (17:45 +0300)
Before, the watch backlog size in GstRTSPClient was changed
dynamically between unlimited and a fixed size, trying to avoid both
unlimited memory usage and deadlocks while waiting for place in the
queue. (Some of the deadlocks were described in a long comment in
handle_request().)

In the previous commit, we changed to a fixed backlog size of 100.
This is possible, because we now handle RTP/RTCP data messages differently
from RTSP request/response messages.

The data messages are messages tunneled over TCP. We allow at most one
queued data message per stream in GstRTSPClient at a time, and
successfully sent data messages are acked by sending a "message-sent"
callback from the GstStreamTransport. Until that ack comes, the
GstRTSPStream does not call pull_sample() on its appsink, and
therefore the streaming thread in the pipeline will not be blocked
inside GstRTSPClient, waiting for a place in the queue.

pull_sample() is called when we have both an ack and a "new-sample"
signal from the appsink. Then, we know there is a buffer to write.

RTSP request/response messages are not acked in the same way as data
messages. The rest of the 100 places in the queue are used for
them. If the queue becomes full of request/response messages, we
return an error and close the connection to the client.

Change-Id: I275310bc90a219ceb2473c098261acc78be84c97

gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-stream-transport.c
gst/rtsp-server/rtsp-stream-transport.h
gst/rtsp-server/rtsp-stream.c

index a48e918..1b33f03 100644 (file)
@@ -70,13 +70,15 @@ struct _GstRTSPClientPrivate
   GstRTSPConnection *connection;
   GstRTSPWatch *watch;
   GMainContext *watch_context;
-  guint close_seq;
   gchar *server_ip;
   gboolean is_ipv6;
 
-  GstRTSPClientSendFunc send_func;      /* protected by send_lock */
-  gpointer send_data;           /* protected by send_lock */
-  GDestroyNotify send_notify;   /* protected by send_lock */
+  /* protected by send_lock */
+  GstRTSPClientSendFunc send_func;
+  gpointer send_data;
+  GDestroyNotify send_notify;
+  guint close_seq;
+  GArray *data_seqs;
 
   GstRTSPSessionPool *session_pool;
   gulong session_removed_id;
@@ -105,11 +107,15 @@ struct _GstRTSPClientPrivate
   GstRTSPTunnelState tstate;
 };
 
+typedef struct
+{
+  guint8 channel;
+  guint seq;
+} DataSeq;
+
 static GMutex tunnels_lock;
 static GHashTable *tunnels;     /* protected by tunnels_lock */
 
-/* FIXME make this configurable. We don't want to do this yet because it will
- * be superceeded by a cache object later */
 #define WATCH_BACKLOG_SIZE              100
 
 #define DEFAULT_SESSION_POOL            NULL
@@ -587,6 +593,7 @@ gst_rtsp_client_init (GstRTSPClient * client)
   g_mutex_init (&priv->send_lock);
   g_mutex_init (&priv->watch_lock);
   priv->close_seq = 0;
+  priv->data_seqs = g_array_new (FALSE, FALSE, sizeof (DataSeq));
   priv->drop_backlog = DEFAULT_DROP_BACKLOG;
   priv->transports =
       g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
@@ -754,6 +761,7 @@ gst_rtsp_client_finalize (GObject * obj)
   g_assert (priv->sessions == NULL);
   g_assert (priv->session_removed_id == 0);
 
+  g_array_unref (priv->data_seqs);
   g_hash_table_unref (priv->transports);
   g_hash_table_unref (priv->pipelined_requests);
 
@@ -1055,6 +1063,82 @@ no_prepare:
   }
 }
 
+static inline DataSeq *
+get_data_seq_element (GstRTSPClient * client, guint8 channel)
+{
+  GstRTSPClientPrivate *priv = client->priv;
+  GArray *data_seqs = priv->data_seqs;
+  gint i = 0;
+
+  while (i < data_seqs->len) {
+    DataSeq *data_seq = &g_array_index (data_seqs, DataSeq, i);
+    if (data_seq->channel == channel)
+      return data_seq;
+    i++;
+  }
+
+  return NULL;
+}
+
+static void
+add_data_seq (GstRTSPClient * client, guint8 channel)
+{
+  GstRTSPClientPrivate *priv = client->priv;
+  DataSeq data_seq = {.channel = channel,.seq = 0 };
+
+  if (get_data_seq_element (client, channel) == NULL)
+    g_array_append_val (priv->data_seqs, data_seq);
+}
+
+static void
+set_data_seq (GstRTSPClient * client, guint8 channel, guint seq)
+{
+  DataSeq *data_seq;
+
+  data_seq = get_data_seq_element (client, channel);
+  g_assert_nonnull (data_seq);
+  data_seq->seq = seq;
+}
+
+static guint
+get_data_seq (GstRTSPClient * client, guint8 channel)
+{
+  DataSeq *data_seq;
+
+  data_seq = get_data_seq_element (client, channel);
+  g_assert_nonnull (data_seq);
+  return data_seq->seq;
+}
+
+static gboolean
+get_data_channel (GstRTSPClient * client, guint seq, guint8 * channel)
+{
+  GstRTSPClientPrivate *priv = client->priv;
+  GArray *data_seqs = priv->data_seqs;
+  gint i = 0;
+
+  while (i < data_seqs->len) {
+    DataSeq *data_seq = &g_array_index (data_seqs, DataSeq, i);
+    if (data_seq->seq == seq) {
+      *channel = data_seq->channel;
+      return TRUE;
+    }
+    i++;
+  }
+
+  return FALSE;
+}
+
+static gboolean
+do_close (gpointer user_data)
+{
+  GstRTSPClient *client = user_data;
+
+  gst_rtsp_client_close (client);
+
+  return G_SOURCE_REMOVE;
+}
+
 static gboolean
 do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
 {
@@ -1074,6 +1158,11 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
   gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
 
   g_mutex_lock (&priv->send_lock);
+  if (get_data_seq (client, channel) != 0) {
+    GST_WARNING ("already a queued data message for channel %d", channel);
+    g_mutex_unlock (&priv->send_lock);
+    return FALSE;
+  }
   if (priv->send_func)
     ret = priv->send_func (client, &message, FALSE, priv->send_data);
   g_mutex_unlock (&priv->send_lock);
@@ -1083,6 +1172,16 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
 
   gst_rtsp_message_unset (&message);
 
+  if (!ret) {
+    GSource *idle_src;
+
+    /* close in watch context */
+    idle_src = g_idle_source_new ();
+    g_source_set_callback (idle_src, do_close, client, NULL);
+    g_source_attach (idle_src, priv->watch_context);
+    g_source_unref (idle_src);
+  }
+
   return ret;
 }
 
@@ -2355,6 +2454,8 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
     g_hash_table_insert (priv->transports,
         GINT_TO_POINTER (ct->interleaved.max), trans);
     g_object_ref (trans);
+    add_data_seq (client, ct->interleaved.min);
+    add_data_seq (client, ct->interleaved.max);
   }
 
   /* create and serialize the server transport */
@@ -4058,33 +4159,48 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message,
     gboolean close, gpointer user_data)
 {
   GstRTSPClientPrivate *priv = client->priv;
+  guint id = 0;
   GstRTSPResult ret;
-  GTimeVal time;
 
-  time.tv_sec = 1;
-  time.tv_usec = 0;
+  /* send the message */
+  ret = gst_rtsp_watch_send_message (priv->watch, message, &id);
+  if (ret != GST_RTSP_OK)
+    goto error;
 
-  do {
-    /* send the response and store the seq number so we can wait until it's
-     * written to the client to close the connection */
-    ret =
-        gst_rtsp_watch_send_message (priv->watch, message,
-        close ? &priv->close_seq : NULL);
-    if (ret == GST_RTSP_OK)
-      break;
+  /* if close flag is set, store the seq number so we can wait until it's
+   * written to the client to close the connection */
+  if (close)
+    priv->close_seq = id;
 
-    if (ret != GST_RTSP_ENOMEM)
-      goto error;
+  if (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA) {
+    guint8 channel = 0;
+    GstRTSPResult r;
 
-    /* drop backlog */
-    if (priv->drop_backlog)
-      break;
+    r = gst_rtsp_message_parse_data (message, &channel);
+    if (r != GST_RTSP_OK) {
+      ret = r;
+      goto error;
+    }
 
-    /* queue was full, wait for more space */
-    GST_DEBUG_OBJECT (client, "waiting for backlog");
-    ret = gst_rtsp_watch_wait_backlog (priv->watch, &time);
-    GST_DEBUG_OBJECT (client, "Resend due to backlog full");
-  } while (ret != GST_RTSP_EINTR);
+    /* check if the message has been queued for transmission in watch */
+    if (id) {
+      /* store the seq number so we can wait until it has been sent */
+      GST_DEBUG_OBJECT (client, "wait for message %d, channel %d", id, channel);
+      set_data_seq (client, channel, id);
+    } else {
+      GstRTSPStreamTransport *trans;
+
+      trans =
+          g_hash_table_lookup (priv->transports,
+          GINT_TO_POINTER ((gint) channel));
+      if (trans) {
+        GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
+        g_mutex_unlock (&priv->send_lock);
+        gst_rtsp_stream_transport_message_sent (trans);
+        g_mutex_lock (&priv->send_lock);
+      }
+    }
+  }
 
   return ret == GST_RTSP_OK;
 
@@ -4092,7 +4208,7 @@ do_send_message (GstRTSPClient * client, GstRTSPMessage * message,
 error:
   {
     GST_DEBUG_OBJECT (client, "got error %d", ret);
-    return ret == GST_RTSP_OK;
+    return FALSE;
   }
 }
 
@@ -4108,13 +4224,33 @@ message_sent (GstRTSPWatch * watch, guint cseq, gpointer user_data)
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
   GstRTSPClientPrivate *priv = client->priv;
+  GstRTSPStreamTransport *trans = NULL;
+  guint8 channel = 0;
+  gboolean close = FALSE;
+
+  g_mutex_lock (&priv->send_lock);
+
+  if (get_data_channel (client, cseq, &channel)) {
+    trans = g_hash_table_lookup (priv->transports, GINT_TO_POINTER (channel));
+    set_data_seq (client, channel, 0);
+  }
 
   if (priv->close_seq && priv->close_seq == cseq) {
     GST_INFO ("client %p: send close message", client);
+    close = TRUE;
     priv->close_seq = 0;
-    gst_rtsp_client_close (client);
   }
 
+  g_mutex_unlock (&priv->send_lock);
+
+  if (trans) {
+    GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
+    gst_rtsp_stream_transport_message_sent (trans);
+  }
+
+  if (close)
+    gst_rtsp_client_close (client);
+
   return GST_RTSP_OK;
 }
 
index 7a6ba12..99af1ae 100644 (file)
@@ -35,6 +35,9 @@
  * is received from the client. It will also call
  * gst_rtsp_stream_transport_set_timed_out() when a receiver has timed out.
  *
+ * A #GstRTSPClient will call gst_rtsp_stream_transport_message_sent() when it
+ * has sent a data message for the transport.
+ *
  * Last reviewed on 2013-07-16 (1.0.0)
  */
 
@@ -58,6 +61,10 @@ struct _GstRTSPStreamTransportPrivate
   gboolean active;
   gboolean timed_out;
 
+  GstRTSPMessageSentFunc message_sent;
+  gpointer ms_user_data;
+  GDestroyNotify ms_notify;
+
   GstRTSPTransport *transport;
   GstRTSPUrl *url;
 
@@ -109,6 +116,7 @@ gst_rtsp_stream_transport_finalize (GObject * obj)
   /* remove callbacks now */
   gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL);
   gst_rtsp_stream_transport_set_keepalive (trans, NULL, NULL, NULL);
+  gst_rtsp_stream_transport_set_message_sent (trans, NULL, NULL, NULL);
 
   if (priv->stream)
     g_object_unref (priv->stream);
@@ -223,6 +231,33 @@ gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport * trans,
   priv->ka_notify = notify;
 }
 
+/**
+ * gst_rtsp_stream_transport_set_message_sent:
+ * @trans: a #GstRTSPStreamTransport
+ * @message_sent: (scope notified): a callback called when a message has been sent
+ * @user_data: (closure): user data passed to callback
+ * @notify: (allow-none): called with the user_data when no longer needed
+ *
+ * Install a callback that will be called when a message has been sent on @trans.
+ */
+void
+gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport * trans,
+    GstRTSPMessageSentFunc message_sent, gpointer user_data,
+    GDestroyNotify notify)
+{
+  GstRTSPStreamTransportPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
+
+  priv = trans->priv;
+
+  priv->message_sent = message_sent;
+  if (priv->ms_notify)
+    priv->ms_notify (priv->ms_user_data);
+  priv->ms_user_data = user_data;
+  priv->ms_notify = notify;
+}
+
 
 /**
  * gst_rtsp_stream_transport_set_transport:
@@ -511,6 +546,23 @@ gst_rtsp_stream_transport_keep_alive (GstRTSPStreamTransport * trans)
 }
 
 /**
+ * gst_rtsp_stream_transport_message_sent:
+ * @trans: a #GstRTSPStreamTransport
+ *
+ * Signal the installed message_sent callback for @trans.
+ */
+void
+gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans)
+{
+  GstRTSPStreamTransportPrivate *priv;
+
+  priv = trans->priv;
+
+  if (priv->message_sent)
+    priv->message_sent (priv->ms_user_data);
+}
+
+/**
  * gst_rtsp_stream_transport_recv_data:
  * @trans: a #GstRTSPStreamTransport
  * @channel: a channel
index f608b8a..48c565a 100644 (file)
@@ -66,6 +66,15 @@ typedef gboolean (*GstRTSPSendFunc)      (GstBuffer *buffer, guint8 channel, gpo
 typedef void     (*GstRTSPKeepAliveFunc) (gpointer user_data);
 
 /**
+ * GstRTSPMessageSentFunc:
+ * @user_data: user data
+ *
+ * Function registered with gst_rtsp_stream_transport_set_message_sent()
+ * and called when a message has been sent on the transport.
+ */
+typedef void     (*GstRTSPMessageSentFunc) (gpointer user_data);
+
+/**
  * GstRTSPStreamTransport:
  * @parent: parent instance
  *
@@ -129,9 +138,18 @@ void                     gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamT
                                                                   GDestroyNotify  notify);
 
 GST_RTSP_SERVER_API
+void                     gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport *trans,
+                                                                  GstRTSPMessageSentFunc message_sent,
+                                                                  gpointer user_data,
+                                                                  GDestroyNotify  notify);
+
+GST_RTSP_SERVER_API
 void                     gst_rtsp_stream_transport_keep_alive    (GstRTSPStreamTransport *trans);
 
 GST_RTSP_SERVER_API
+void                     gst_rtsp_stream_transport_message_sent  (GstRTSPStreamTransport *trans);
+
+GST_RTSP_SERVER_API
 gboolean                 gst_rtsp_stream_transport_set_active    (GstRTSPStreamTransport *trans,
                                                                   gboolean active);
 
index 9537357..bc6917d 100644 (file)
@@ -158,6 +158,9 @@ struct _GstRTSPStreamPrivate
   GList *tr_cache_rtcp;
   guint tr_cache_cookie_rtp;
   guint tr_cache_cookie_rtcp;
+  guint n_tcp_transports;
+  gboolean have_buffer[2];
+  guint n_outstanding;
 
   gint dscp_qos;
 
@@ -208,6 +211,10 @@ static void gst_rtsp_stream_set_property (GObject * object, guint propid,
 
 static void gst_rtsp_stream_finalize (GObject * obj);
 
+static gboolean
+update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
+    gboolean add);
+
 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
 
 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
@@ -2115,32 +2122,52 @@ clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
   }
 }
 
-static GstFlowReturn
-handle_new_sample (GstAppSink * sink, gpointer user_data)
+static void
+send_tcp_message (GstRTSPStream * stream, gint idx)
 {
-  GstRTSPStreamPrivate *priv;
+  GstRTSPStreamPrivate *priv = stream->priv;
+  GstAppSink *sink;
   GList *walk;
   GstSample *sample;
   GstBuffer *buffer;
-  GstRTSPStream *stream;
   gboolean is_rtp;
 
+  g_mutex_lock (&priv->lock);
+
+  if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
+    g_mutex_unlock (&priv->lock);
+    return;
+  }
+
+  priv->have_buffer[idx] = FALSE;
+
+  if (priv->appsink[idx] == NULL) {
+    /* session expired */
+    g_mutex_unlock (&priv->lock);
+    return;
+  }
+
+  sink = GST_APP_SINK (priv->appsink[idx]);
   sample = gst_app_sink_pull_sample (sink);
-  if (!sample)
-    return GST_FLOW_OK;
+  if (!sample) {
+    g_mutex_unlock (&priv->lock);
+    return;
+  }
 
-  stream = (GstRTSPStream *) user_data;
-  priv = stream->priv;
   buffer = gst_sample_get_buffer (sample);
 
-  is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
+  is_rtp = (idx == 0);
 
-  g_mutex_lock (&priv->lock);
   if (is_rtp) {
     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
       clear_tr_cache (priv, is_rtp);
       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
+        const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
+
+        if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
+          continue;
+
         priv->tr_cache_rtp =
             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
       }
@@ -2151,26 +2178,72 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
       clear_tr_cache (priv, is_rtp);
       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
+        const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
+
+        if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
+          continue;
+
         priv->tr_cache_rtcp =
             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
       }
       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
     }
   }
+
+  priv->n_outstanding += priv->n_tcp_transports;
+
   g_mutex_unlock (&priv->lock);
 
   if (is_rtp) {
     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
-      gst_rtsp_stream_transport_send_rtp (tr, buffer);
+      if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
+        /* remove transport on send error */
+        g_mutex_lock (&priv->lock);
+        priv->n_outstanding--;
+        update_transport (stream, tr, FALSE);
+        g_mutex_unlock (&priv->lock);
+      }
     }
   } else {
     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
-      gst_rtsp_stream_transport_send_rtcp (tr, buffer);
+      if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
+        /* remove transport on send error */
+        g_mutex_lock (&priv->lock);
+        priv->n_outstanding--;
+        update_transport (stream, tr, FALSE);
+        g_mutex_unlock (&priv->lock);
+      }
     }
   }
   gst_sample_unref (sample);
+}
+
+static GstFlowReturn
+handle_new_sample (GstAppSink * sink, gpointer user_data)
+{
+  GstRTSPStream *stream = user_data;
+  GstRTSPStreamPrivate *priv = stream->priv;
+  int i;
+  int idx = -1;
+
+  g_mutex_lock (&priv->lock);
+
+  for (i = 0; i < 2; i++)
+    if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
+      priv->have_buffer[i] = TRUE;
+      if (priv->n_outstanding == 0) {
+        /* send message */
+        idx = i;
+      }
+      break;
+    }
+
+  g_mutex_unlock (&priv->lock);
+
+  if (idx != -1)
+    send_tcp_message (stream, idx);
 
   return GST_FLOW_OK;
 }
@@ -2971,7 +3044,8 @@ create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
     } else if (is_tcp && !priv->appsink[i]) {
       /* make appsink */
       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
-      g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
+      g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
+          NULL);
 
       /* we need to set sync and preroll to FALSE for the sink to avoid
        * deadlock. This is only needed for sink sending RTCP data. */
@@ -3913,9 +3987,11 @@ update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
       if (add) {
         GST_INFO ("adding TCP %s", tr->destination);
         priv->transports = g_list_prepend (priv->transports, trans);
+        priv->n_tcp_transports++;
       } else {
         GST_INFO ("removing TCP %s", tr->destination);
         priv->transports = g_list_remove (priv->transports, trans);
+        priv->n_tcp_transports--;
       }
       priv->transports_cookie++;
       break;
@@ -3936,6 +4012,51 @@ mcast_error:
   }
 }
 
+static void
+on_message_sent (gpointer user_data)
+{
+  GstRTSPStream *stream = user_data;
+  GstRTSPStreamPrivate *priv = stream->priv;
+  gint idx = -1;
+
+  GST_DEBUG_OBJECT (stream, "message send complete");
+
+  g_mutex_lock (&priv->lock);
+
+  g_assert (priv->n_outstanding >= 0);
+
+  if (priv->n_outstanding == 0)
+    goto no_outstanding;
+
+  priv->n_outstanding--;
+  if (priv->n_outstanding == 0) {
+    gint i;
+
+    /* iterate from 1 and down, so we prioritize RTCP over RTP */
+    for (i = 1; i >= 0; i--) {
+      if (priv->have_buffer[i]) {
+        /* send message */
+        idx = i;
+        break;
+      }
+    }
+  }
+
+  g_mutex_unlock (&priv->lock);
+
+  if (idx != -1)
+    send_tcp_message (stream, idx);
+
+  return;
+
+  /* ERRORS */
+no_outstanding:
+  {
+    GST_INFO ("no outstanding messages");
+    g_mutex_unlock (&priv->lock);
+    return;
+  }
+}
 
 /**
  * gst_rtsp_stream_add_transport:
@@ -3965,6 +4086,9 @@ gst_rtsp_stream_add_transport (GstRTSPStream * stream,
 
   g_mutex_lock (&priv->lock);
   res = update_transport (stream, trans, TRUE);
+  if (res)
+    gst_rtsp_stream_transport_set_message_sent (trans, on_message_sent, stream,
+        NULL);
   g_mutex_unlock (&priv->lock);
 
   return res;