From 4e05100e8c882ac9aa0b670935d8ac2584609056 Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Wed, 8 Jun 2022 16:35:54 +0200 Subject: [PATCH] srt: Clean up error handling - Make the srt_epoll_wait loops more uniform. - Error only via GError when possible; let the element send the error message. Avoids a second error message. - Return 0 when cancelled. Avoids an error message from the element. - Don't send an error message from send_headers when we're a server sink. Part-of: --- subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c | 188 +++++++++++++-------- 1 file changed, 114 insertions(+), 74 deletions(-) diff --git a/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c b/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c index ba19cae..11ddd76 100644 --- a/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c +++ b/subprojects/gst-plugins-bad/ext/srt/gstsrtobject.c @@ -38,11 +38,13 @@ GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject); #define GST_CAT_DEFAULT gst_debug_srtobject #if SRT_VERSION_VALUE > 0x10402 -#define SRTSOCK_ERROR_DEBUG ("libsrt reported: %s", srt_rejectreason_str (reason)) +#define REASON_FORMAT "s" +#define REASON_ARGS(reason) srt_rejectreason_str (reason) #else /* srt_rejectreason_str() is unavailable in libsrt 1.4.2 and prior due to * unexported symbol. See https://github.com/Haivision/srt/pull/1728. */ -#define SRTSOCK_ERROR_DEBUG ("libsrt reported reject reason code %d", reason) +#define REASON_FORMAT "s %d" +#define REASON_ARGS(reason) "reject reason code", (reason) #endif /* Define options added in later revisions */ @@ -52,10 +54,6 @@ GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject); #define SRTO_RETRANSMITALGO 61 #endif -#define ELEMENT_WARNING_SRTSOCK_ERROR(code, reason) \ - GST_ELEMENT_WARNING (srtobject->element, RESOURCE, code, \ - ("Error on SRT socket. Trying to reconnect."), SRTSOCK_ERROR_DEBUG) - enum { PROP_URI = 1, @@ -924,19 +922,19 @@ thread_func (gpointer data) GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller"); - if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, - &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) { + if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, &rsocklen, 0, 0, + poll_timeout, NULL, 0, NULL, 0) < 0) { gint srt_errno = srt_getlasterror (NULL); if (srtobject->listener_poll_id == SRT_ERROR) return NULL; - if (srt_errno == SRT_ETIMEOUT) { + + if (srt_errno == SRT_ETIMEOUT) continue; - } else { - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED, - ("abort polling: %s", srt_getlasterror_str ()), (NULL)); - return NULL; - } + + GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED, + ("abort polling: %s", srt_getlasterror_str ()), (NULL)); + return NULL; } caller_sock = @@ -1433,33 +1431,25 @@ gst_srt_object_close (GstSRTObject * srtobject) static gboolean gst_srt_object_wait_caller (GstSRTObject * srtobject, - GCancellable * cancellable, GError ** error) + GCancellable * cancellable) { gboolean ret; g_mutex_lock (&srtobject->sock_lock); - if (srtobject->callers == NULL) { + ret = (srtobject->callers != NULL); + if (!ret) { GST_INFO_OBJECT (srtobject->element, "Waiting for connection"); - - while (!g_cancellable_is_cancelled (cancellable)) { - ret = (srtobject->callers != NULL); - if (ret) { - GST_DEBUG_OBJECT (srtobject->element, "Got a connection"); - break; - } - + while (!ret && !g_cancellable_is_cancelled (cancellable)) { g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock); + ret = (srtobject->callers != NULL); } - } else { - ret = TRUE; } g_mutex_unlock (&srtobject->sock_lock); - if (!ret) { - g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED, - "Canceled waiting for a connection."); + if (ret) { + GST_DEBUG_OBJECT (srtobject->element, "Got a connection"); } return ret; @@ -1492,8 +1482,8 @@ gst_srt_object_read (GstSRTObject * srtobject, GST_OBJECT_UNLOCK (srtobject->element); if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { - if (!gst_srt_object_wait_caller (srtobject, cancellable, error)) - return -1; + if (!gst_srt_object_wait_caller (srtobject, cancellable)) + return 0; g_mutex_lock (&srtobject->sock_lock); if (srtobject->callers) { @@ -1519,34 +1509,48 @@ gst_srt_object_read (GstSRTObject * srtobject, poll_timeout, NULL, 0, NULL, 0) < 0) { gint srt_errno = srt_getlasterror (NULL); - if (srt_errno != SRT_ETIMEOUT) { +#if SRT_VERSION_VALUE >= 0x010402 + if (srt_errno == SRT_EPOLLEMPTY) return 0; - } - continue; +#endif + + if (srt_errno == SRT_ETIMEOUT) + continue; + + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + "Failed to poll socket: %s", srt_getlasterror_str ()); + return -1; } if (wsocklen == 1 && rsocklen == 1) { /* Socket reported in wsock AND rsock signifies an error. */ gint reason = srt_getrejectreason (wsock); - gboolean is_auth_error = (reason == SRT_REJ_BADSECRET - || reason == SRT_REJ_UNSECURE); - if (is_auth_error) { - ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason); - } + if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) { + if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { + GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED, + ("Caller failed to authenticate: %" REASON_FORMAT, + REASON_ARGS (reason)), (NULL)); + return 0; + } - if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { - /* Caller has disappeared. */ - return 0; + GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED, + ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect", + REASON_ARGS (reason)), (NULL)); } else { - if (!is_auth_error) { - ELEMENT_WARNING_SRTSOCK_ERROR (READ, reason); + if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { + /* Caller has disappeared. */ + return 0; } - gst_srt_object_close (srtobject); - if (!gst_srt_object_open_internal (srtobject, cancellable, error)) { - return -1; - } + GST_ELEMENT_WARNING (srtobject->element, RESOURCE, READ, + ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect", + REASON_ARGS (reason)), (NULL)); + } + + gst_srt_object_close (srtobject); + if (!gst_srt_object_open_internal (srtobject, cancellable, error)) { + return -1; } continue; } @@ -1598,7 +1602,7 @@ gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable) static gboolean gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock, gint poll_id, gint poll_timeout, GstBufferList * headers, - GCancellable * cancellable) + GCancellable * cancellable, GError ** error) { guint size, i; @@ -1618,27 +1622,39 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock, GstMapInfo mapinfo; if (g_cancellable_is_cancelled (cancellable)) { - return FALSE; + return TRUE; } - if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, - &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) { - continue; + if (poll_id >= 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, &wsocklen, + poll_timeout, NULL, 0, NULL, 0) < 0) { + gint srt_errno = srt_getlasterror (NULL); + +#if SRT_VERSION_VALUE >= 0x010402 + if (srt_errno == SRT_EPOLLEMPTY) + return TRUE; +#endif + + if (srt_errno == SRT_ETIMEOUT) + continue; + + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + "Failed to poll socket: %s", srt_getlasterror_str ()); + return FALSE; } GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT, i, buffer); if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) { - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ, - ("Could not map the input stream"), (NULL)); + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + "Failed to map header buffer"); return FALSE; } sent = srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size, 0); if (sent == SRT_ERROR) { - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL, - ("%s", srt_getlasterror_str ())); + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s", + srt_getlasterror_str ()); gst_buffer_unmap (buffer, &mapinfo); return FALSE; } @@ -1654,7 +1670,7 @@ gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock, static gssize gst_srt_object_write_to_callers (GstSRTObject * srtobject, GstBufferList * headers, - const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error) + const GstMapInfo * mapinfo, GCancellable * cancellable) { GList *callers; @@ -1674,10 +1690,17 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject, } if (!caller->sent_headers) { - if (!gst_srt_object_send_headers (srtobject, caller->sock, -1, - -1, headers, cancellable)) { + GError *error = NULL; + + if (!gst_srt_object_send_headers (srtobject, caller->sock, -1, 0, + headers, cancellable, &error)) { + GST_WARNING_OBJECT (srtobject->element, + "Failed to send headers to caller %d: %s", + caller->sock, error->message); + g_error_free (error); goto err; } + caller->sent_headers = TRUE; } @@ -1712,7 +1735,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject, cancelled: g_mutex_unlock (&srtobject->sock_lock); - return -1; + return 0; } static gssize @@ -1736,9 +1759,10 @@ gst_srt_object_write_one (GstSRTObject * srtobject, if (!srtobject->sent_headers) { if (!gst_srt_object_send_headers (srtobject, srtobject->sock, - srtobject->poll_id, poll_timeout, headers, cancellable)) { + srtobject->poll_id, poll_timeout, headers, cancellable, error)) { return -1; } + srtobject->sent_headers = TRUE; } @@ -1764,7 +1788,19 @@ gst_srt_object_write_one (GstSRTObject * srtobject, if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock, &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) { - continue; + gint srt_errno = srt_getlasterror (NULL); + +#if SRT_VERSION_VALUE >= 0x010402 + if (srt_errno == SRT_EPOLLEMPTY) + return 0; +#endif + + if (srt_errno == SRT_ETIMEOUT) + continue; + + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, + "Failed to poll socket: %s", srt_getlasterror_str ()); + return -1; } if (wsocklen == 1 && rsocklen == 1) { @@ -1772,9 +1808,13 @@ gst_srt_object_write_one (GstSRTObject * srtobject, gint reason = srt_getrejectreason (wsock); if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) { - ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason); + GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED, + ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect", + REASON_ARGS (reason)), (NULL)); } else { - ELEMENT_WARNING_SRTSOCK_ERROR (WRITE, reason); + GST_ELEMENT_WARNING (srtobject->element, RESOURCE, WRITE, + ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect", + REASON_ARGS (reason)), (NULL)); } gst_srt_object_close (srtobject); @@ -1785,18 +1825,18 @@ gst_srt_object_write_one (GstSRTObject * srtobject, } if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) { - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL, - ("%s", srt_getlasterror_str ())); - break; + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s", + srt_getlasterror_str ()); + return -1; } rest = MIN (mapinfo->size - len, payload_size); sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0); if (sent < 0) { - GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL, - ("%s", srt_getlasterror_str ())); - break; + g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s", + srt_getlasterror_str ()); + return -1; } len += sent; srtobject->bytes += sent; @@ -1826,12 +1866,12 @@ gst_srt_object_write (GstSRTObject * srtobject, if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) { if (wait_for_connection) { - if (!gst_srt_object_wait_caller (srtobject, cancellable, error)) - return -1; + if (!gst_srt_object_wait_caller (srtobject, cancellable)) + return 0; } len = gst_srt_object_write_to_callers (srtobject, headers, mapinfo, - cancellable, error); + cancellable); } else { len = gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable, -- 2.7.4