ipcpipeline: use GstPoll instead of select() to watch for socket activity
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>
Mon, 4 Sep 2017 12:52:03 +0000 (15:52 +0300)
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>
Mon, 4 Sep 2017 12:52:03 +0000 (15:52 +0300)
... and make that code more readable in the process

https://bugzilla.gnome.org/show_bug.cgi?id=787208

sys/ipcpipeline/gstipcpipelinecomm.c
sys/ipcpipeline/gstipcpipelinecomm.h

index e21da99..6f17a11 100644 (file)
@@ -1605,15 +1605,16 @@ gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
       g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
       (GDestroyNotify) comm_request_free);
   comm->adapter = gst_adapter_new ();
-  g_atomic_int_set (&comm->thread_running, 0);
+  comm->poll = gst_poll_new (TRUE);
+  gst_poll_fd_init (&comm->pollFDin);
 }
 
 void
 gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
 {
-  g_assert (!g_atomic_int_get (&comm->thread_running));
   g_hash_table_destroy (comm->waiting_ids);
   gst_object_unref (comm->adapter);
+  gst_poll_free (comm->poll);
   g_mutex_clear (&comm->mutex);
 }
 
@@ -1698,91 +1699,75 @@ gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
   return TRUE;
 }
 
-static gboolean
+static gint
 update_adapter (GstIpcPipelineComm * comm)
 {
   GstMemory *mem = NULL;
-
-  for (;;) {
-    fd_set set;
-    struct timeval tv;
-    int sret;
-    ssize_t sz;
-    GstBuffer *buf;
-    GstMapInfo map;
-    int fdin = comm->fdin;
-    int fdclose = comm->reader_thread_stopping_pipe[0];
-    int fdmax;
-
-    FD_ZERO (&set);
-    FD_SET (fdclose, &set);
-    fdmax = fdclose;
-    if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) {
-      FD_SET (fdin, &set);
-      if (fdin > fdmax)
-        fdmax = fdin;
-    }
-    tv.tv_sec = 0;
-    tv.tv_usec = 100000;
-    sret = select (fdmax + 1, &set, NULL, NULL, &tv);
-    if (sret < 0) {
-      if (errno == EAGAIN)
-        continue;
-      if (errno == EINTR)
-        break;
-      if (mem)
-        gst_memory_unref (mem);
-      return FALSE;
+  GstBuffer *buf;
+  GstMapInfo map;
+  ssize_t sz;
+  gint ret = 0;
+
+again:
+  /* update pollFDin if necessary (fdin changed or we lost our parent).
+   * we do not allow a parent-less element to communicate with its peer
+   * in order to avoid race conditions where the slave tries to change
+   * the state of its parent pipeline while it is not yet added in that
+   * pipeline. */
+  if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
+    if (comm->pollFDin.fd != -1) {
+      GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
+          comm->pollFDin.fd);
+      gst_poll_remove_fd (comm->poll, &comm->pollFDin);
+      gst_poll_fd_init (&comm->pollFDin);
     }
-    if (FD_ISSET (fdclose, &set)) {
-      GST_INFO_OBJECT (comm->element, "data received on close notify pipe");
-      comm->reader_thread_stopping = TRUE;
-      break;
+    if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
+      GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
+      comm->pollFDin.fd = comm->fdin;
+      gst_poll_add_fd (comm->poll, &comm->pollFDin);
+      gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
     }
-    if (fdin < 0)
-      break;
-    if (!FD_ISSET (fdin, &set))
-      break;
-    if (mem == NULL)
+  }
+
+  /* wait for activity on fdin or a flush */
+  if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
+    if (errno == EAGAIN)
+      goto again;
+    /* error out, unless interrupted or flushing */
+    if (errno != EINTR)
+      ret = (errno == EBUSY) ? 2 : 1;
+  }
+
+  /* read from fdin if possible and push data to our adapter */
+  if (comm->pollFDin.fd >= 0
+      && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
+    if (!mem)
       mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
+
     gst_memory_map (mem, &map, GST_MAP_WRITE);
-    sz = read (fdin, map.data, map.size);
+    sz = read (comm->pollFDin.fd, map.data, map.size);
     gst_memory_unmap (mem, &map);
-    if (sz < 0) {
+
+    if (sz <= 0) {
       if (errno == EAGAIN)
-        continue;
+        goto again;
+      /* error out, unless interrupted */
+      if (errno != EINTR)
+        ret = 1;
+    } else {
+      gst_memory_resize (mem, 0, sz);
+      buf = gst_buffer_new ();
+      gst_buffer_append_memory (buf, mem);
       mem = NULL;
-      if (errno == EINTR)
-        break;
-      gst_memory_unref (mem);
-      return FALSE;
+      GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
+      gst_adapter_push (comm->adapter, buf);
     }
-    if (sz == 0) {
-      GST_INFO_OBJECT (comm->element, "fd closed");
-      comm->reader_thread_stopping = TRUE;
-      break;
-    }
-    gst_memory_resize (mem, 0, sz);
-    buf = gst_buffer_new ();
-    gst_buffer_append_memory (buf, mem);
-    mem = NULL;
-    GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
-    gst_adapter_push (comm->adapter, buf);
-
-    /* If we have more data, we loop, otherwise we break */
-    FD_ZERO (&set);
-    if (fdin >= 0)
-      FD_SET (comm->fdin, &set);
-    tv.tv_sec = 0;
-    tv.tv_usec = 0;
-    sret = select (fdin + 1, &set, NULL, NULL, &tv);
-    if (sret < 0 || !FD_ISSET (fdin, &set))
-      break;
   }
+
   if (mem)
     gst_memory_unref (mem);
 
-  return TRUE;
+  return ret;
 }
 
 static gboolean
@@ -2151,24 +2136,28 @@ static gpointer
 reader_thread (gpointer data)
 {
   GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
-
-  g_atomic_int_set (&comm->thread_running, 1);
-  while (!comm->reader_thread_stopping) {
-    if (!update_adapter (comm)) {
-      if (comm->reader_thread_stopping) {
+  gboolean running = TRUE;
+  gint ret = 0;
+
+  while (running) {
+    ret = update_adapter (comm);
+    switch (ret) {
+      case 1:
+        GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
+            ("Failed to read from socket"));
+        running = FALSE;
+        break;
+      case 2:
         GST_INFO_OBJECT (comm->element, "We're stopping, all good");
+        running = FALSE;
+        break;
+      default:
+        read_many (comm);
         break;
-      }
-      GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
-          ("Failed to read from socket"));
-      break;
     }
-    read_many (comm);
   }
 
   GST_INFO_OBJECT (comm->element, "Reader thread ending");
-  g_atomic_int_set (&comm->thread_running, 0);
-
   return NULL;
 }
 
@@ -2184,7 +2173,6 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
   if (comm->reader_thread)
     return FALSE;
 
-  comm->reader_thread_stopping = FALSE;
   comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
   comm->on_buffer = on_buffer;
   comm->on_event = on_event;
@@ -2193,10 +2181,7 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
   comm->on_state_lost = on_state_lost;
   comm->on_message = on_message;
   comm->user_data = user_data;
-  if (pipe (comm->reader_thread_stopping_pipe) < 0) {
-    GST_WARNING_OBJECT (comm->element, "Failed to create pipes");
-    return FALSE;
-  }
+  gst_poll_set_flushing (comm->poll, FALSE);
   comm->reader_thread =
       g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
   return TRUE;
@@ -2205,15 +2190,11 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
 void
 gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
 {
-  char dummy = 0;
-
   if (!comm->reader_thread)
     return;
-  while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0
-      && errno == EINTR);
+
+  gst_poll_set_flushing (comm->poll, TRUE);
   g_thread_join (comm->reader_thread);
-  close (comm->reader_thread_stopping_pipe[0]);
-  close (comm->reader_thread_stopping_pipe[1]);
   comm->reader_thread = NULL;
 }
 
index bd7335e..cebbadb 100644 (file)
@@ -64,9 +64,9 @@ typedef struct
   GHashTable *waiting_ids;
 
   GThread *reader_thread;
-  gboolean reader_thread_stopping;
-  volatile gint thread_running;
-  int reader_thread_stopping_pipe[2];
+  GstPoll *poll;
+  GstPollFD pollFDin;
+
   GstAdapter *adapter;
   guint8 state;
   guint32 send_id;