srt: Clean up error handling
authorJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>
Wed, 8 Jun 2022 14:35:54 +0000 (16:35 +0200)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 4 Nov 2022 13:07:34 +0000 (13:07 +0000)
- Make the srt_epoll_wait loops more uniform.

- Error only via GError when possible; let the element send the error
  message. Avoids a second error message.

- Return 0 when cancelled. Avoids an error message from the element.

- Don't send an error message from send_headers when we're a server
  sink.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3156>

subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c

index ba19cae..11ddd76 100644 (file)
@@ -38,11 +38,13 @@ GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
 #define GST_CAT_DEFAULT gst_debug_srtobject
 
 #if SRT_VERSION_VALUE > 0x10402
-#define SRTSOCK_ERROR_DEBUG ("libsrt reported: %s", srt_rejectreason_str (reason))
+#define REASON_FORMAT "s"
+#define REASON_ARGS(reason) srt_rejectreason_str (reason)
 #else
 /* srt_rejectreason_str() is unavailable in libsrt 1.4.2 and prior due to
  * unexported symbol. See https://github.com/Haivision/srt/pull/1728. */
-#define SRTSOCK_ERROR_DEBUG ("libsrt reported reject reason code %d", reason)
+#define REASON_FORMAT "s %d"
+#define REASON_ARGS(reason) "reject reason code", (reason)
 #endif
 
 /* Define options added in later revisions */
@@ -52,10 +54,6 @@ GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
 #define SRTO_RETRANSMITALGO 61
 #endif
 
-#define ELEMENT_WARNING_SRTSOCK_ERROR(code, reason) \
-  GST_ELEMENT_WARNING (srtobject->element, RESOURCE, code, \
-  ("Error on SRT socket. Trying to reconnect."), SRTSOCK_ERROR_DEBUG)
-
 enum
 {
   PROP_URI = 1,
@@ -924,19 +922,19 @@ thread_func (gpointer data)
 
     GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
 
-    if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
-            &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
+    if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, &rsocklen, 0, 0,
+            poll_timeout, NULL, 0, NULL, 0) < 0) {
       gint srt_errno = srt_getlasterror (NULL);
 
       if (srtobject->listener_poll_id == SRT_ERROR)
         return NULL;
-      if (srt_errno == SRT_ETIMEOUT) {
+
+      if (srt_errno == SRT_ETIMEOUT)
         continue;
-      } else {
-        GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
-            ("abort polling: %s", srt_getlasterror_str ()), (NULL));
-        return NULL;
-      }
+
+      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
+          ("abort polling: %s", srt_getlasterror_str ()), (NULL));
+      return NULL;
     }
 
     caller_sock =
@@ -1433,33 +1431,25 @@ gst_srt_object_close (GstSRTObject * srtobject)
 
 static gboolean
 gst_srt_object_wait_caller (GstSRTObject * srtobject,
-    GCancellable * cancellable, GError ** error)
+    GCancellable * cancellable)
 {
   gboolean ret;
 
   g_mutex_lock (&srtobject->sock_lock);
 
-  if (srtobject->callers == NULL) {
+  ret = (srtobject->callers != NULL);
+  if (!ret) {
     GST_INFO_OBJECT (srtobject->element, "Waiting for connection");
-
-    while (!g_cancellable_is_cancelled (cancellable)) {
-      ret = (srtobject->callers != NULL);
-      if (ret) {
-        GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
-        break;
-      }
-
+    while (!ret && !g_cancellable_is_cancelled (cancellable)) {
       g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
+      ret = (srtobject->callers != NULL);
     }
-  } else {
-    ret = TRUE;
   }
 
   g_mutex_unlock (&srtobject->sock_lock);
 
-  if (!ret) {
-    g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
-        "Canceled waiting for a connection.");
+  if (ret) {
+    GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
   }
 
   return ret;
@@ -1492,8 +1482,8 @@ gst_srt_object_read (GstSRTObject * srtobject,
   GST_OBJECT_UNLOCK (srtobject->element);
 
   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
-    if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
-      return -1;
+    if (!gst_srt_object_wait_caller (srtobject, cancellable))
+      return 0;
 
     g_mutex_lock (&srtobject->sock_lock);
     if (srtobject->callers) {
@@ -1519,34 +1509,48 @@ gst_srt_object_read (GstSRTObject * srtobject,
             poll_timeout, NULL, 0, NULL, 0) < 0) {
       gint srt_errno = srt_getlasterror (NULL);
 
-      if (srt_errno != SRT_ETIMEOUT) {
+#if SRT_VERSION_VALUE >= 0x010402
+      if (srt_errno == SRT_EPOLLEMPTY)
         return 0;
-      }
-      continue;
+#endif
+
+      if (srt_errno == SRT_ETIMEOUT)
+        continue;
+
+      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Failed to poll socket: %s", srt_getlasterror_str ());
+      return -1;
     }
 
     if (wsocklen == 1 && rsocklen == 1) {
       /* Socket reported in wsock AND rsock signifies an error. */
       gint reason = srt_getrejectreason (wsock);
-      gboolean is_auth_error = (reason == SRT_REJ_BADSECRET
-          || reason == SRT_REJ_UNSECURE);
 
-      if (is_auth_error) {
-        ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
-      }
+      if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
+        if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
+          GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
+              ("Caller failed to authenticate: %" REASON_FORMAT,
+                  REASON_ARGS (reason)), (NULL));
+          return 0;
+        }
 
-      if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
-        /* Caller has disappeared. */
-        return 0;
+        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
+            ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
+                REASON_ARGS (reason)), (NULL));
       } else {
-        if (!is_auth_error) {
-          ELEMENT_WARNING_SRTSOCK_ERROR (READ, reason);
+        if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
+          /* Caller has disappeared. */
+          return 0;
         }
 
-        gst_srt_object_close (srtobject);
-        if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
-          return -1;
-        }
+        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, READ,
+            ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
+                REASON_ARGS (reason)), (NULL));
+      }
+
+      gst_srt_object_close (srtobject);
+      if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
+        return -1;
       }
       continue;
     }
@@ -1598,7 +1602,7 @@ gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
 static gboolean
 gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
     gint poll_id, gint poll_timeout, GstBufferList * headers,
-    GCancellable * cancellable)
+    GCancellable * cancellable, GError ** error)
 {
   guint size, i;
 
@@ -1618,27 +1622,39 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
     GstMapInfo mapinfo;
 
     if (g_cancellable_is_cancelled (cancellable)) {
-      return FALSE;
+      return TRUE;
     }
 
-    if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
-            &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
-      continue;
+    if (poll_id >= 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, &wsocklen,
+            poll_timeout, NULL, 0, NULL, 0) < 0) {
+      gint srt_errno = srt_getlasterror (NULL);
+
+#if SRT_VERSION_VALUE >= 0x010402
+      if (srt_errno == SRT_EPOLLEMPTY)
+        return TRUE;
+#endif
+
+      if (srt_errno == SRT_ETIMEOUT)
+        continue;
+
+      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Failed to poll socket: %s", srt_getlasterror_str ());
+      return FALSE;
     }
 
     GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
         i, buffer);
 
     if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
-      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
-          ("Could not map the input stream"), (NULL));
+      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Failed to map header buffer");
       return FALSE;
     }
 
     sent = srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size, 0);
     if (sent == SRT_ERROR) {
-      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
-          ("%s", srt_getlasterror_str ()));
+      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
+          srt_getlasterror_str ());
       gst_buffer_unmap (buffer, &mapinfo);
       return FALSE;
     }
@@ -1654,7 +1670,7 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
 static gssize
 gst_srt_object_write_to_callers (GstSRTObject * srtobject,
     GstBufferList * headers,
-    const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
+    const GstMapInfo * mapinfo, GCancellable * cancellable)
 {
   GList *callers;
 
@@ -1674,10 +1690,17 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
     }
 
     if (!caller->sent_headers) {
-      if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
-              -1, headers, cancellable)) {
+      GError *error = NULL;
+
+      if (!gst_srt_object_send_headers (srtobject, caller->sock, -1, 0,
+              headers, cancellable, &error)) {
+        GST_WARNING_OBJECT (srtobject->element,
+            "Failed to send headers to caller %d: %s",
+            caller->sock, error->message);
+        g_error_free (error);
         goto err;
       }
+
       caller->sent_headers = TRUE;
     }
 
@@ -1712,7 +1735,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
 
 cancelled:
   g_mutex_unlock (&srtobject->sock_lock);
-  return -1;
+  return 0;
 }
 
 static gssize
@@ -1736,9 +1759,10 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
 
   if (!srtobject->sent_headers) {
     if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
-            srtobject->poll_id, poll_timeout, headers, cancellable)) {
+            srtobject->poll_id, poll_timeout, headers, cancellable, error)) {
       return -1;
     }
+
     srtobject->sent_headers = TRUE;
   }
 
@@ -1764,7 +1788,19 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
 
     if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
             &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
-      continue;
+      gint srt_errno = srt_getlasterror (NULL);
+
+#if SRT_VERSION_VALUE >= 0x010402
+      if (srt_errno == SRT_EPOLLEMPTY)
+        return 0;
+#endif
+
+      if (srt_errno == SRT_ETIMEOUT)
+        continue;
+
+      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+          "Failed to poll socket: %s", srt_getlasterror_str ());
+      return -1;
     }
 
     if (wsocklen == 1 && rsocklen == 1) {
@@ -1772,9 +1808,13 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
       gint reason = srt_getrejectreason (wsock);
 
       if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
-        ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
+        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
+            ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
+                REASON_ARGS (reason)), (NULL));
       } else {
-        ELEMENT_WARNING_SRTSOCK_ERROR (WRITE, reason);
+        GST_ELEMENT_WARNING (srtobject->element, RESOURCE, WRITE,
+            ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
+                REASON_ARGS (reason)), (NULL));
       }
 
       gst_srt_object_close (srtobject);
@@ -1785,18 +1825,18 @@ gst_srt_object_write_one (GstSRTObject * srtobject,
     }
 
     if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
-      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
-          ("%s", srt_getlasterror_str ()));
-      break;
+      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
+          srt_getlasterror_str ());
+      return -1;
     }
 
     rest = MIN (mapinfo->size - len, payload_size);
 
     sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
     if (sent < 0) {
-      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
-          ("%s", srt_getlasterror_str ()));
-      break;
+      g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
+          srt_getlasterror_str ());
+      return -1;
     }
     len += sent;
     srtobject->bytes += sent;
@@ -1826,12 +1866,12 @@ gst_srt_object_write (GstSRTObject * srtobject,
 
   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
     if (wait_for_connection) {
-      if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
-        return -1;
+      if (!gst_srt_object_wait_caller (srtobject, cancellable))
+        return 0;
     }
     len =
         gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
-        cancellable, error);
+        cancellable);
   } else {
     len =
         gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,