multisocketsink: keep on reading when we stop sending
authorWim Taymans <wtaymans@redhat.com>
Wed, 2 Dec 2015 08:52:19 +0000 (09:52 +0100)
committerWim Taymans <wtaymans@redhat.com>
Wed, 2 Dec 2015 09:26:03 +0000 (10:26 +0100)
When we stop sending because we need more data, still keep a GSource
around to receive data from the clients.
Also handle read and write in the same go.

gst/tcp/gstmultisocketsink.c
gst/tcp/gstmultisocketsink.h

index cf5a4a5..489cba2 100644 (file)
@@ -181,6 +181,8 @@ static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
     GstMultiHandleClient * mhclient);
 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
     GstMultiHandleClient * mhclient);
+static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
+    GstSocketClient * client);
 
 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
     handle, GIOCondition condition, GstMultiSocketSink * sink);
@@ -764,12 +766,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
       if (mhclient->bufpos == -1) {
         /* client is too fast, remove from write queue until new buffer is
          * available */
-        /* FIXME: specific */
-        if (client->source) {
-          g_source_destroy (client->source);
-          g_source_unref (client->source);
-          client->source = NULL;
-        }
+        gst_multi_socket_sink_stop_sending (sink, client);
 
         /* if we flushed out all of the client buffers, we can stop */
         if (mhclient->flushcount == 0)
@@ -793,13 +790,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
             mhclient->bufpos = position;
           } else {
             /* cannot send data to this client yet */
-            /* FIXME: specific */
-            if (client->source) {
-              g_source_destroy (client->source);
-              g_source_unref (client->source);
-              client->source = NULL;
-            }
-
+            gst_multi_socket_sink_stop_sending (sink, client);
             return TRUE;
           }
         }
@@ -909,37 +900,58 @@ write_error:
 }
 
 static void
-gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
-    GstMultiHandleClient * mhclient)
+ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
+    GIOCondition condition)
 {
-  GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
-  GstSocketClient *client = (GstSocketClient *) (mhclient);
+  GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
 
-  if (!sink->main_context)
+  if (client->condition == condition)
     return;
 
-  if (!client->source) {
-    client->source =
-        g_socket_create_source (mhclient->handle.socket,
-        G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
+  if (client->source) {
+    g_source_destroy (client->source);
+    g_source_unref (client->source);
+  }
+  if (condition && sink->main_context) {
+    client->source = g_socket_create_source (mhclient->handle.socket,
+        condition, sink->cancellable);
     g_source_set_callback (client->source,
         (GSourceFunc) gst_multi_socket_sink_socket_condition,
         gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
     g_source_attach (client->source, sink->main_context);
+  } else {
+    client->source = NULL;
+    condition = 0;
   }
+  client->condition = condition;
+}
+
+static void
+gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
+    GstMultiHandleClient * mhclient)
+{
+  GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
+  GstSocketClient *client = (GstSocketClient *) (mhclient);
+
+  ensure_condition (sink, client,
+      G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
 }
 
 static void
 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
     GstMultiHandleClient * mhclient)
 {
+  GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
   GstSocketClient *client = (GstSocketClient *) (mhclient);
 
-  if (client->source) {
-    g_source_destroy (client->source);
-    g_source_unref (client->source);
-    client->source = NULL;
-  }
+  ensure_condition (sink, client, 0);
+}
+
+static void
+gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
+    GstSocketClient * client)
+{
+  ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
 }
 
 /* Handle the clients. This is called when a socket becomes ready
@@ -987,14 +999,16 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
     gst_multi_handle_sink_remove_client_link (mhsink, clink);
     ret = FALSE;
     goto done;
-  } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
+  }
+  if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
     /* handle client read */
     if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
       gst_multi_handle_sink_remove_client_link (mhsink, clink);
       ret = FALSE;
       goto done;
     }
-  } else if ((condition & G_IO_OUT)) {
+  }
+  if ((condition & G_IO_OUT)) {
     /* handle client write */
     if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
       gst_multi_handle_sink_remove_client_link (mhsink, clink);
index e1d25ec..29edf1b 100644 (file)
@@ -54,6 +54,7 @@ typedef struct {
   GstMultiHandleClient client;
 
   GSource *source;
+  GIOCondition condition;
 } GstSocketClient;
 
 /**