rtspclientsink: Use a mutex for protecting against concurrent send/receives
authorMathieu Duponchelle <mathieu.duponchelle@opencreed.com>
Mon, 18 Sep 2017 17:31:31 +0000 (19:31 +0200)
committerMathieu Duponchelle <mathieu.duponchelle@opencreed.com>
Mon, 18 Sep 2017 17:43:17 +0000 (19:43 +0200)
This is a simple port of:

a722f6e8329032c6eda4865d6a07f4ba5981d7ea
c438545dc9e2f14f657bc0ef261fff726449867b
cd17c71dcea5c9310d21f1347c7520983e5869ac

in gst-plugins-good.

gst/rtsp-sink/gstrtspclientsink.c
gst/rtsp-sink/gstrtspclientsink.h

index b2aaec8..3df5e0d 100644 (file)
@@ -666,6 +666,9 @@ gst_rtsp_client_sink_init (GstRTSPClientSink * sink)
 
   sink->state = GST_RTSP_STATE_INVALID;
 
+  g_mutex_init (&sink->conninfo.send_lock);
+  g_mutex_init (&sink->conninfo.recv_lock);
+
   sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
   gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
   gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
@@ -714,6 +717,9 @@ gst_rtsp_client_sink_finalize (GObject * object)
   g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
   g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
 
+  g_mutex_clear (&rtsp_client_sink->conninfo.send_lock);
+  g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock);
+
   g_mutex_clear (&rtsp_client_sink->send_lock);
 
   g_mutex_clear (&rtsp_client_sink->preroll_lock);
@@ -1137,6 +1143,9 @@ gst_rtsp_client_sink_request_new_pad (GstElement * element,
 
   (void) gst_rtsp_client_sink_get_factories ();
 
+  g_mutex_init (&context->conninfo.send_lock);
+  g_mutex_init (&context->conninfo.recv_lock);
+
   GST_RTSP_STATE_LOCK (sink);
   sink->contexts = g_list_prepend (sink->contexts, context);
   GST_RTSP_STATE_UNLOCK (sink);
@@ -1182,6 +1191,9 @@ gst_rtsp_client_sink_release_pad (GstElement * element, GstPad * pad)
   g_free (context->conninfo.location);
   context->conninfo.location = NULL;
 
+  g_mutex_clear (&context->conninfo.send_lock);
+  g_mutex_clear (&context->conninfo.recv_lock);
+
   g_free (context);
 
   gst_element_remove_pad (element, pad);
@@ -1603,28 +1615,34 @@ gst_rtsp_client_sink_cleanup (GstRTSPClientSink * sink)
 
 static GstRTSPResult
 gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
-    GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout)
+    GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
 {
   GstRTSPResult ret;
 
-  if (conn)
-    ret = gst_rtsp_connection_send (conn, message, timeout);
-  else
+  if (conninfo->connection) {
+    g_mutex_lock (&conninfo->send_lock);
+    ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
+    g_mutex_unlock (&conninfo->send_lock);
+  } else {
     ret = GST_RTSP_ERROR;
+  }
 
   return ret;
 }
 
 static GstRTSPResult
 gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
-    GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout)
+    GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
 {
   GstRTSPResult ret;
 
-  if (conn)
-    ret = gst_rtsp_connection_receive (conn, message, timeout);
-  else
+  if (conninfo->connection) {
+    g_mutex_lock (&conninfo->recv_lock);
+    ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
+    g_mutex_unlock (&conninfo->recv_lock);
+  } else {
     ret = GST_RTSP_ERROR;
+  }
 
   return ret;
 }
@@ -1793,7 +1811,7 @@ gst_rtsp_client_sink_init_request (GstRTSPClientSink * sink,
 /* FIXME, handle server request, reply with OK, for now */
 static GstRTSPResult
 gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
-    GstRTSPConnection * conn, GstRTSPMessage * request)
+    GstRTSPConnInfo * conninfo, GstRTSPMessage * request)
 {
   GstRTSPMessage response = { 0 };
   GstRTSPResult res;
@@ -1818,7 +1836,7 @@ gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
   if (sink->debug)
     gst_rtsp_message_dump (&response);
 
-  res = gst_rtsp_client_sink_connection_send (sink, conn, &response, NULL);
+  res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, NULL);
   if (res < 0)
     goto send_error;
 
@@ -1869,7 +1887,7 @@ gst_rtsp_client_sink_send_keep_alive (GstRTSPClientSink * sink)
     gst_rtsp_message_dump (&request);
 
   res =
-      gst_rtsp_client_sink_connection_send (sink, sink->conninfo.connection,
+      gst_rtsp_client_sink_connection_send (sink, &sink->conninfo,
       &request, NULL);
   if (res < 0)
     goto send_error;
@@ -1920,7 +1938,7 @@ gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
      * keep-alive request to keep the session open. */
     res =
         gst_rtsp_client_sink_connection_receive (sink,
-        sink->conninfo.connection, &message, &tv_timeout);
+        &sink->conninfo, &message, &tv_timeout);
 
     switch (res) {
       case GST_RTSP_OK:
@@ -1964,7 +1982,7 @@ gst_rtsp_client_sink_loop_rx (GstRTSPClientSink * sink)
         /* server sends us a request message, handle it */
         res =
             gst_rtsp_client_sink_handle_request (sink,
-            sink->conninfo.connection, &message);
+            &sink->conninfo, &message);
         if (res == GST_RTSP_EEOF)
           goto server_eof;
         else if (res < 0)
@@ -2479,7 +2497,7 @@ no_user_pass:
 
 static GstRTSPResult
 gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
-    GstRTSPConnection * conn, GstRTSPMessage * request,
+    GstRTSPConnInfo * conninfo, GstRTSPMessage * request,
     GstRTSPMessage * response, GstRTSPStatusCode * code)
 {
   GstRTSPResult res;
@@ -2496,14 +2514,14 @@ again:
   g_mutex_lock (&sink->send_lock);
 
   res =
-      gst_rtsp_client_sink_connection_send (sink, conn, request,
+      gst_rtsp_client_sink_connection_send (sink, conninfo, request,
       sink->ptcp_timeout);
   if (res < 0) {
     g_mutex_unlock (&sink->send_lock);
     goto send_error;
   }
 
-  gst_rtsp_connection_reset_timeout (conn);
+  gst_rtsp_connection_reset_timeout (conninfo->connection);
 
   /* See if we should handle the response */
   if (response == NULL) {
@@ -2512,7 +2530,7 @@ again:
   }
 next:
   res =
-      gst_rtsp_client_sink_connection_receive (sink, conn, response,
+      gst_rtsp_client_sink_connection_receive (sink, conninfo, response,
       sink->ptcp_timeout);
 
   g_mutex_unlock (&sink->send_lock);
@@ -2526,7 +2544,7 @@ next:
 
   switch (response->type) {
     case GST_RTSP_MESSAGE_REQUEST:
-      res = gst_rtsp_client_sink_handle_request (sink, conn, response);
+      res = gst_rtsp_client_sink_handle_request (sink, conninfo, response);
       if (res == GST_RTSP_EEOF)
         goto server_eof;
       else if (res < 0)
@@ -2663,7 +2681,7 @@ gst_rtsp_client_sink_set_state (GstRTSPClientSink * sink, GstState state)
  * Returns: #GST_RTSP_OK if the processing was successful.
  */
 static GstRTSPResult
-gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnection * conn,
+gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo,
     GstRTSPMessage * request, GstRTSPMessage * response,
     GstRTSPStatusCode * code)
 {
@@ -2685,7 +2703,7 @@ gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnection * conn,
     method = request->type_data.request.method;
 
     if ((res =
-            gst_rtsp_client_sink_try_send (sink, conn, request, response,
+            gst_rtsp_client_sink_try_send (sink, conninfo, request, response,
                 &int_code)) < 0)
       goto error;
 
@@ -2892,7 +2910,7 @@ gst_rtsp_client_sink_connect_to_server (GstRTSPClientSink * sink,
         ("Retrieving server options"));
 
   if ((res =
-          gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
+          gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
               &response, NULL)) < 0)
     goto send_error;
 
@@ -3074,7 +3092,7 @@ gst_rtsp_client_sink_close (GstRTSPClientSink * sink, gboolean async,
       GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
 
     if ((res =
-            gst_rtsp_client_sink_send (sink, info->connection, &request,
+            gst_rtsp_client_sink_send (sink, info, &request,
                 &response, NULL)) < 0)
       goto send_error;
 
@@ -3486,7 +3504,7 @@ do_send_data (GstBuffer * buffer, guint8 channel,
   gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
 
   res =
-      gst_rtsp_client_sink_try_send (sink, sink->conninfo.connection, &message,
+      gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message,
       NULL, NULL);
 
   gst_rtsp_message_steal_body (&message, &data, &usize);
@@ -3534,7 +3552,7 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
     GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
     GstRTSPStream *stream;
 
-    GstRTSPConnection *conn;
+    GstRTSPConnInfo *info;
     GstRTSPProfile profiles;
     GstRTSPProfile cur_profile;
     gchar *transports;
@@ -3571,14 +3589,14 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
             stream);
         continue;
       }
-      conn = context->conninfo.connection;
+      info = &context->conninfo;
     } else {
-      conn = sink->conninfo.connection;
+      info = &sink->conninfo;
     }
     GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
         context->conninfo.location);
 
-    conn_socket = gst_rtsp_connection_get_read_socket (conn);
+    conn_socket = gst_rtsp_connection_get_read_socket (info->connection);
     sa = g_socket_get_local_address (conn_socket, NULL);
     family = g_socket_address_get_family (sa);
     g_object_unref (sa);
@@ -3649,7 +3667,7 @@ gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
               context->index));
 
     /* handle the code ourselves */
-    res = gst_rtsp_client_sink_send (sink, conn, &request, &response, &code);
+    res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code);
     if (res < 0)
       goto send_error;
 
@@ -3981,7 +3999,7 @@ gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
         ("Sending server stream info"));
 
   if ((res =
-          gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
+          gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
               &response, NULL)) < 0)
     goto send_error;
 
@@ -4016,7 +4034,7 @@ gst_rtsp_client_sink_record (GstRTSPClientSink * sink, gboolean async)
   if (async)
     GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
   if ((res =
-          gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
+          gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
               &response, NULL)) < 0)
     goto send_error;
 
@@ -4108,7 +4126,7 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
    * aggregate control */
   for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
     GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
-    GstRTSPConnection *conn;
+    GstRTSPConnInfo *info;
     const gchar *setup_url;
 
     /* try aggregate control first but do non-aggregate control otherwise */
@@ -4118,9 +4136,9 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
       continue;
 
     if (sink->conninfo.connection) {
-      conn = sink->conninfo.connection;
+      info = &sink->conninfo;
     } else if (stream->conninfo.connection) {
-      conn = stream->conninfo.connection;
+      info = &stream->conninfo;
     } else {
       continue;
     }
@@ -4135,7 +4153,7 @@ gst_rtsp_client_sink_pause (GstRTSPClientSink * sink, gboolean async)
       goto create_request_failed;
 
     if ((res =
-            gst_rtsp_client_sink_send (sink, conn, &request, &response,
+            gst_rtsp_client_sink_send (sink, info, &request, &response,
                 NULL)) < 0)
       goto send_error;
 
index a8aef5b..b27daf8 100644 (file)
@@ -86,6 +86,9 @@ struct _GstRTSPConnInfo {
   GstRTSPConnection  *connection;
   gboolean            connected;
   gboolean            flushing;
+
+  GMutex              send_lock;
+  GMutex              recv_lock;
 };
 
 typedef struct _GstRTSPStreamInfo GstRTSPStreamInfo;