srtobject: Remove pointless GMainLoop
authorOlivier Crête <olivier.crete@collabora.com>
Fri, 23 Aug 2019 20:21:47 +0000 (16:21 -0400)
committerTim-Philipp Müller <tim@centricular.com>
Sat, 31 Aug 2019 23:22:05 +0000 (00:22 +0100)
Just use srt's blocking epoll function and fix locking while we're at it.

ext/srt/gstsrtobject.c
ext/srt/gstsrtobject.h

index ea7bb6f..069f255 100644 (file)
@@ -586,102 +586,101 @@ static gpointer
 thread_func (gpointer data)
 {
   GstSRTObject *srtobject = data;
-
-  g_main_loop_run (srtobject->loop);
-
-  return NULL;
-}
-
-static gboolean
-idle_listen_source_cb (gpointer data)
-{
-  GstSRTObject *srtobject = data;
   SRTSOCKET caller_sock;
-  struct sockaddr caller_sa;
-  gsize caller_sa_len;
+  union
+  {
+    struct sockaddr_storage ss;
+    struct sockaddr sa;
+  } caller_sa;
+  int caller_sa_len;
 
   gint poll_timeout;
 
   SRTSOCKET rsock;
   gint rsocklen = 1;
 
-  if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
-          &poll_timeout)) {
-    poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
-  }
+  for (;;) {
+    if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
+            &poll_timeout)) {
+      poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
+    }
 
-  GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
+    GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
 
-  if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
-          &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
-    gint srt_errno = srt_getlasterror (NULL);
+    if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
+            &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
+      gint srt_errno = srt_getlasterror (NULL);
 
-    if (srt_errno == SRT_ETIMEOUT) {
-      return TRUE;
-    } else {
-      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
-          ("abort polling: %s", srt_getlasterror_str ()), (NULL));
-      return FALSE;
+      if (srtobject->listener_poll_id == SRT_ERROR)
+        return NULL;
+      if (srt_errno == SRT_ETIMEOUT) {
+        continue;
+      } else {
+        GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
+            ("abort polling: %s", srt_getlasterror_str ()), (NULL));
+        return NULL;
+      }
     }
-  }
 
-  caller_sock =
-      srt_accept (srtobject->listener_sock, &caller_sa, (int *) &caller_sa_len);
+    caller_sock =
+        srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
 
-  if (caller_sock != SRT_INVALID_SOCK) {
-    SRTCaller *caller;
-    gint flag = SRT_EPOLL_ERR;
-
-    caller = srt_caller_new ();
-    caller->sockaddr =
-        g_socket_address_new_from_native (&caller_sa, caller_sa_len);
-    caller->poll_id = srt_epoll_create ();
-    caller->sock = caller_sock;
-
-    if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
-            (srtobject->element)) == GST_URI_SRC) {
-      flag |= SRT_EPOLL_IN;
-    } else {
-      flag |= SRT_EPOLL_OUT;
-    }
+    if (caller_sock != SRT_INVALID_SOCK) {
+      SRTCaller *caller;
+      gint flag = SRT_EPOLL_ERR;
 
-    if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
+      caller = srt_caller_new ();
+      caller->sockaddr =
+          g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
+      caller->poll_id = srt_epoll_create ();
+      caller->sock = caller_sock;
 
-      GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
-          ("%s", srt_getlasterror_str ()), (NULL));
+      if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
+              (srtobject->element)) == GST_URI_SRC) {
+        flag |= SRT_EPOLL_IN;
+      } else {
+        flag |= SRT_EPOLL_OUT;
+      }
 
-      srt_caller_free (caller);
+      if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
 
-      /* try-again */
-      return TRUE;
-    }
+        GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
+            ("%s", srt_getlasterror_str ()), (NULL));
 
-    GST_OBJECT_LOCK (srtobject->element);
-    srtobject->callers = g_list_append (srtobject->callers, caller);
-    g_cond_signal (&srtobject->sock_cond);
-    GST_OBJECT_UNLOCK (srtobject->element);
+        srt_caller_free (caller);
 
-    /* notifying caller-added */
-    if (srtobject->caller_added_closure != NULL) {
-      GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
+        /* try-again */
+        continue;
+      }
 
-      g_value_init (&values[0], G_TYPE_INT);
-      g_value_set_int (&values[0], caller->sock);
+      GST_OBJECT_LOCK (srtobject->element);
+      srtobject->callers = g_list_append (srtobject->callers, caller);
+      g_cond_signal (&srtobject->sock_cond);
+      GST_OBJECT_UNLOCK (srtobject->element);
 
-      g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
-      g_value_set_object (&values[1], caller->sockaddr);
+      /* notifying caller-added */
+      if (srtobject->caller_added_closure != NULL) {
+        GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
 
-      g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values, NULL);
+        g_value_init (&values[0], G_TYPE_INT);
+        g_value_set_int (&values[0], caller->sock);
 
-      g_value_unset (&values[1]);
-    }
+        g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
+        g_value_set_object (&values[1], caller->sockaddr);
 
-    GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
-  }
+        g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
+            NULL);
+
+        g_value_unset (&values[1]);
+      }
 
-  /* only one caller is allowed if the element is source. */
-  return gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) !=
-      GST_URI_SRC;
+      GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
+
+      if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
+          GST_URI_SRC)
+        return NULL;
+    }
+  }
 }
 
 static gboolean
@@ -752,15 +751,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
 
   srtobject->listener_sock = sock;
 
-  srtobject->context = g_main_context_new ();
-  srtobject->loop = g_main_loop_new (srtobject->context, TRUE);
-
-  srtobject->listener_source = g_idle_source_new ();
-  g_source_set_callback (srtobject->listener_source,
-      (GSourceFunc) idle_listen_source_cb, srtobject, NULL);
-
-  g_source_attach (srtobject->listener_source, srtobject->context);
-
   srtobject->thread =
       g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
 
@@ -772,9 +762,6 @@ gst_srt_object_wait_connect (GstSRTObject * srtobject,
 
 failed:
 
-  g_clear_pointer (&srtobject->loop, g_main_loop_unref);
-  g_clear_pointer (&srtobject->context, g_main_context_unref);
-
   if (srtobject->listener_poll_id != SRT_ERROR) {
     srt_epoll_release (srtobject->listener_poll_id);
   }
@@ -1019,6 +1006,7 @@ out:
 void
 gst_srt_object_close (GstSRTObject * srtobject)
 {
+  GST_OBJECT_LOCK (srtobject->element);
   if (srtobject->poll_id != SRT_ERROR) {
     srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
   }
@@ -1032,20 +1020,16 @@ gst_srt_object_close (GstSRTObject * srtobject)
     srtobject->sock = SRT_INVALID_SOCK;
   }
 
-  if (srtobject->loop) {
-    g_main_loop_quit (srtobject->loop);
-
-    if (srtobject->listener_poll_id != SRT_ERROR) {
-      srt_epoll_remove_usock (srtobject->listener_poll_id,
-          srtobject->listener_sock);
-      srtobject->listener_poll_id = SRT_ERROR;
-    }
-
-    g_thread_join (srtobject->thread);
-
-    g_clear_pointer (&srtobject->thread, g_thread_unref);
-    g_clear_pointer (&srtobject->loop, g_main_loop_unref);
-    g_clear_pointer (&srtobject->context, g_main_context_unref);
+  if (srtobject->listener_poll_id != SRT_ERROR) {
+    srt_epoll_remove_usock (srtobject->listener_poll_id,
+        srtobject->listener_sock);
+    srtobject->listener_poll_id = SRT_ERROR;
+  }
+  if (srtobject->thread) {
+    GThread *thread = g_steal_pointer (&srtobject->thread);
+    GST_OBJECT_UNLOCK (srtobject->element);
+    g_thread_join (thread);
+    GST_OBJECT_LOCK (srtobject->element);
   }
 
   if (srtobject->listener_sock != SRT_INVALID_SOCK) {
@@ -1056,14 +1040,20 @@ gst_srt_object_close (GstSRTObject * srtobject)
     srtobject->listener_sock = SRT_INVALID_SOCK;
   }
 
-  g_list_foreach (srtobject->callers, (GFunc) srt_caller_invoke_removed_closure,
-      srtobject);
-  g_list_free_full (srtobject->callers, (GDestroyNotify) srt_caller_free);
+  if (srtobject->callers) {
+    GList *callers = g_steal_pointer (&srtobject->callers);
+    GST_OBJECT_UNLOCK (srtobject->element);
+    g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
+        srtobject);
+    GST_OBJECT_LOCK (srtobject->element);
+    g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
+  }
 
   g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
   g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
 
   srtobject->opened = FALSE;
+  GST_OBJECT_UNLOCK (srtobject->element);
 }
 
 static gboolean
@@ -1076,7 +1066,7 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
 
   GST_OBJECT_LOCK (srtobject->element);
   while (!g_cancellable_is_cancelled (cancellable)) {
-    ret = g_list_length (srtobject->callers) >= 1;
+    ret = (srtobject->callers != NULL);
     if (ret)
       break;
     g_cond_wait (&srtobject->sock_cond,
@@ -1096,7 +1086,7 @@ gst_srt_object_read (GstSRTObject * srtobject,
   gssize len = 0;
   gint poll_timeout;
   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
-  gint poll_id;
+  gint poll_id = SRT_ERROR;
 
   /* Only source element can read data */
   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
@@ -1111,9 +1101,13 @@ gst_srt_object_read (GstSRTObject * srtobject,
     if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
       return -1;
 
+    GST_OBJECT_LOCK (srtobject->element);
     caller = srtobject->callers->data;
-    poll_id = caller->poll_id;
-
+    if (srtobject->callers)
+      poll_id = caller->poll_id;
+    GST_OBJECT_UNLOCK (srtobject->element);
+    if (poll_id == SRT_ERROR)
+      return 0;
   } else {
     poll_id = srtobject->poll_id;
   }
@@ -1127,9 +1121,16 @@ gst_srt_object_read (GstSRTObject * srtobject,
 
     SRTSOCKET rsock;
     gint rsocklen = 1;
+    int pollret;
 
-    if (srt_epoll_wait (poll_id, &rsock,
-            &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
+    pollret = srt_epoll_wait (poll_id, &rsock,
+        &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
+    if (pollret < 0) {
+      gint srt_errno = srt_getlasterror (NULL);
+
+      if (srt_errno != SRT_ETIMEOUT) {
+        return 0;
+      }
       continue;
     }
 
@@ -1288,9 +1289,7 @@ gst_srt_object_write_to_callers (GstSRTObject * srtobject,
   err:
     srtobject->callers = g_list_remove (srtobject->callers, caller);
     srt_caller_invoke_removed_closure (caller, srtobject);
-    GST_OBJECT_UNLOCK (srtobject->element);
     srt_caller_free (caller);
-    GST_OBJECT_LOCK (srtobject->element);
   }
 
   GST_OBJECT_UNLOCK (srtobject->element);
index 86bc413..06d4598 100644 (file)
@@ -61,9 +61,6 @@ struct _GstSRTObject
   SRTSOCKET                     listener_sock;
   gint                          listener_poll_id;
 
-  GMainLoop                    *loop;
-  GMainContext                 *context;
-  GSource                      *listener_source;
   GThread                      *thread;
 
   GList                        *callers;