#define GST_CAT_DEFAULT gst_debug_srtobject
#if SRT_VERSION_VALUE > 0x10402
-#define SRTSOCK_ERROR_DEBUG ("libsrt reported: %s", srt_rejectreason_str (reason))
+#define REASON_FORMAT "s"
+#define REASON_ARGS(reason) srt_rejectreason_str (reason)
#else
/* srt_rejectreason_str() is unavailable in libsrt 1.4.2 and prior due to
* unexported symbol. See https://github.com/Haivision/srt/pull/1728. */
-#define SRTSOCK_ERROR_DEBUG ("libsrt reported reject reason code %d", reason)
+#define REASON_FORMAT "s %d"
+#define REASON_ARGS(reason) "reject reason code", (reason)
#endif
/* Define options added in later revisions */
#define SRTO_RETRANSMITALGO 61
#endif
-#define ELEMENT_WARNING_SRTSOCK_ERROR(code, reason) \
- GST_ELEMENT_WARNING (srtobject->element, RESOURCE, code, \
- ("Error on SRT socket. Trying to reconnect."), SRTSOCK_ERROR_DEBUG)
-
enum
{
PROP_URI = 1,
GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
- if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
- &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
+ if (srt_epoll_wait (srtobject->listener_poll_id, &rsock, &rsocklen, 0, 0,
+ poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
if (srtobject->listener_poll_id == SRT_ERROR)
return NULL;
- if (srt_errno == SRT_ETIMEOUT) {
+
+ if (srt_errno == SRT_ETIMEOUT)
continue;
- } else {
- GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
- ("abort polling: %s", srt_getlasterror_str ()), (NULL));
- return NULL;
- }
+
+ GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
+ ("abort polling: %s", srt_getlasterror_str ()), (NULL));
+ return NULL;
}
caller_sock =
static gboolean
gst_srt_object_wait_caller (GstSRTObject * srtobject,
- GCancellable * cancellable, GError ** error)
+ GCancellable * cancellable)
{
gboolean ret;
g_mutex_lock (&srtobject->sock_lock);
- if (srtobject->callers == NULL) {
+ ret = (srtobject->callers != NULL);
+ if (!ret) {
GST_INFO_OBJECT (srtobject->element, "Waiting for connection");
-
- while (!g_cancellable_is_cancelled (cancellable)) {
- ret = (srtobject->callers != NULL);
- if (ret) {
- GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
- break;
- }
-
+ while (!ret && !g_cancellable_is_cancelled (cancellable)) {
g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
+ ret = (srtobject->callers != NULL);
}
- } else {
- ret = TRUE;
}
g_mutex_unlock (&srtobject->sock_lock);
- if (!ret) {
- g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
- "Canceled waiting for a connection.");
+ if (ret) {
+ GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
}
return ret;
GST_OBJECT_UNLOCK (srtobject->element);
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
- if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
- return -1;
+ if (!gst_srt_object_wait_caller (srtobject, cancellable))
+ return 0;
g_mutex_lock (&srtobject->sock_lock);
if (srtobject->callers) {
poll_timeout, NULL, 0, NULL, 0) < 0) {
gint srt_errno = srt_getlasterror (NULL);
- if (srt_errno != SRT_ETIMEOUT) {
+#if SRT_VERSION_VALUE >= 0x010402
+ if (srt_errno == SRT_EPOLLEMPTY)
return 0;
- }
- continue;
+#endif
+
+ if (srt_errno == SRT_ETIMEOUT)
+ continue;
+
+ g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Failed to poll socket: %s", srt_getlasterror_str ());
+ return -1;
}
if (wsocklen == 1 && rsocklen == 1) {
/* Socket reported in wsock AND rsock signifies an error. */
gint reason = srt_getrejectreason (wsock);
- gboolean is_auth_error = (reason == SRT_REJ_BADSECRET
- || reason == SRT_REJ_UNSECURE);
- if (is_auth_error) {
- ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
- }
+ if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
+ if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
+ GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
+ ("Caller failed to authenticate: %" REASON_FORMAT,
+ REASON_ARGS (reason)), (NULL));
+ return 0;
+ }
- if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
- /* Caller has disappeared. */
- return 0;
+ GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
+ ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
+ REASON_ARGS (reason)), (NULL));
} else {
- if (!is_auth_error) {
- ELEMENT_WARNING_SRTSOCK_ERROR (READ, reason);
+ if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
+ /* Caller has disappeared. */
+ return 0;
}
- gst_srt_object_close (srtobject);
- if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
- return -1;
- }
+ GST_ELEMENT_WARNING (srtobject->element, RESOURCE, READ,
+ ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
+ REASON_ARGS (reason)), (NULL));
+ }
+
+ gst_srt_object_close (srtobject);
+ if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
+ return -1;
}
continue;
}
static gboolean
gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
gint poll_id, gint poll_timeout, GstBufferList * headers,
- GCancellable * cancellable)
+ GCancellable * cancellable, GError ** error)
{
guint size, i;
GstMapInfo mapinfo;
if (g_cancellable_is_cancelled (cancellable)) {
- return FALSE;
+ return TRUE;
}
- if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
- &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
- continue;
+ if (poll_id >= 0 && srt_epoll_wait (poll_id, 0, 0, &wsock, &wsocklen,
+ poll_timeout, NULL, 0, NULL, 0) < 0) {
+ gint srt_errno = srt_getlasterror (NULL);
+
+#if SRT_VERSION_VALUE >= 0x010402
+ if (srt_errno == SRT_EPOLLEMPTY)
+ return TRUE;
+#endif
+
+ if (srt_errno == SRT_ETIMEOUT)
+ continue;
+
+ g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Failed to poll socket: %s", srt_getlasterror_str ());
+ return FALSE;
}
GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
i, buffer);
if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
- GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
- ("Could not map the input stream"), (NULL));
+ g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Failed to map header buffer");
return FALSE;
}
sent = srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size, 0);
if (sent == SRT_ERROR) {
- GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
- ("%s", srt_getlasterror_str ()));
+ g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
+ srt_getlasterror_str ());
gst_buffer_unmap (buffer, &mapinfo);
return FALSE;
}
static gssize
gst_srt_object_write_to_callers (GstSRTObject * srtobject,
GstBufferList * headers,
- const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
+ const GstMapInfo * mapinfo, GCancellable * cancellable)
{
GList *callers;
}
if (!caller->sent_headers) {
- if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
- -1, headers, cancellable)) {
+ GError *error = NULL;
+
+ if (!gst_srt_object_send_headers (srtobject, caller->sock, -1, 0,
+ headers, cancellable, &error)) {
+ GST_WARNING_OBJECT (srtobject->element,
+ "Failed to send headers to caller %d: %s",
+ caller->sock, error->message);
+ g_error_free (error);
goto err;
}
+
caller->sent_headers = TRUE;
}
cancelled:
g_mutex_unlock (&srtobject->sock_lock);
- return -1;
+ return 0;
}
static gssize
if (!srtobject->sent_headers) {
if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
- srtobject->poll_id, poll_timeout, headers, cancellable)) {
+ srtobject->poll_id, poll_timeout, headers, cancellable, error)) {
return -1;
}
+
srtobject->sent_headers = TRUE;
}
if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
&wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
- continue;
+ gint srt_errno = srt_getlasterror (NULL);
+
+#if SRT_VERSION_VALUE >= 0x010402
+ if (srt_errno == SRT_EPOLLEMPTY)
+ return 0;
+#endif
+
+ if (srt_errno == SRT_ETIMEOUT)
+ continue;
+
+ g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE,
+ "Failed to poll socket: %s", srt_getlasterror_str ());
+ return -1;
}
if (wsocklen == 1 && rsocklen == 1) {
gint reason = srt_getrejectreason (wsock);
if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
- ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
+ GST_ELEMENT_WARNING (srtobject->element, RESOURCE, NOT_AUTHORIZED,
+ ("Failed to authenticate: %" REASON_FORMAT ". Trying to reconnect",
+ REASON_ARGS (reason)), (NULL));
} else {
- ELEMENT_WARNING_SRTSOCK_ERROR (WRITE, reason);
+ GST_ELEMENT_WARNING (srtobject->element, RESOURCE, WRITE,
+ ("Error on SRT socket: %" REASON_FORMAT ". Trying to reconnect",
+ REASON_ARGS (reason)), (NULL));
}
gst_srt_object_close (srtobject);
}
if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
- GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
- ("%s", srt_getlasterror_str ()));
- break;
+ g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
+ srt_getlasterror_str ());
+ return -1;
}
rest = MIN (mapinfo->size - len, payload_size);
sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
if (sent < 0) {
- GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
- ("%s", srt_getlasterror_str ()));
- break;
+ g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_WRITE, "%s",
+ srt_getlasterror_str ());
+ return -1;
}
len += sent;
srtobject->bytes += sent;
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (wait_for_connection) {
- if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
- return -1;
+ if (!gst_srt_object_wait_caller (srtobject, cancellable))
+ return 0;
}
len =
gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
- cancellable, error);
+ cancellable);
} else {
len =
gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,