multihandlesink: introduce Handle union
authorThomas Vander Stichele <thomas (at) apestaart (dot) org>
Fri, 27 Jan 2012 20:28:05 +0000 (21:28 +0100)
committerThomas Vander Stichele <thomas (at) apestaart (dot) org>
Sun, 12 Feb 2012 21:23:44 +0000 (22:23 +0100)
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gstmultihandlesink.h
gst/tcp/gstmultisocketsink.c
gst/tcp/gstmultisocketsink.h
gst/tcp/gsttcpserversink.c

index 136b22d..006c344 100644 (file)
@@ -457,7 +457,7 @@ gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client)
 
 /* "add-full" signal implementation */
 void
-gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
+gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle,
     GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
     GstFormat max_format, guint64 max_value)
 {
@@ -467,11 +467,13 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
   gint flags;
   struct stat statbuf;
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
+  // FIXME: convert to a function so we can vfunc this
+  int fd = handle.fd;
 
-  GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, "
+  GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
       "min_format %d, min_value %" G_GUINT64_FORMAT
-      ", max_format %d, max_value %" G_GUINT64_FORMAT, fd, sync_method,
-      min_format, min_value, max_format, max_value);
+      ", max_format %d, max_value %" G_GUINT64_FORMAT, mhclient->debug,
+      sync_method, min_format, min_value, max_format, max_value);
 
   /* do limits check if we can */
   if (min_format == max_format) {
@@ -505,8 +507,8 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
 
   /* set the socket to non blocking */
   if (fcntl (fd, F_SETFL, O_NONBLOCK) < 0) {
-    GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s", fd,
-        g_strerror (errno));
+    GST_ERROR_OBJECT (sink, "failed to make socket %d non-blocking: %s",
+        mhclient->debug, g_strerror (errno));
   }
 
   /* we always read from a client */
@@ -538,18 +540,18 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd,
 wrong_limits:
   {
     GST_WARNING_OBJECT (sink,
-        "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%"
-        G_GUINT64_FORMAT ", unit %d specified when adding client", fd,
-        min_value, max_value, min_format);
+        "%s wrong values min =%" G_GUINT64_FORMAT ", max=%"
+        G_GUINT64_FORMAT ", unit %d specified when adding client",
+        mhclient->debug, min_value, max_value, min_format);
     return;
   }
 duplicate:
   {
     mhclient->status = GST_CLIENT_STATUS_DUPLICATE;
     CLIENTS_UNLOCK (sink);
-    GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd);
+    GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing", fd);
     g_signal_emit (G_OBJECT (sink),
-        gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd,
+        gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, mhclient->debug,
         mhclient->status);
     g_free (client);
     return;
@@ -558,26 +560,27 @@ duplicate:
 
 /* "add" signal implementation */
 void
-gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd)
+gst_multi_fd_sink_add (GstMultiFdSink * sink, GstMultiSinkHandle handle)
 {
-  GstMultiHandleSink *mhsink;
+  GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
 
-  mhsink = GST_MULTI_HANDLE_SINK (sink);
-  gst_multi_fd_sink_add_full (sink, fd, mhsink->def_sync_method,
+  gst_multi_fd_sink_add_full (sink, handle, mhsink->def_sync_method,
       mhsink->def_burst_format, mhsink->def_burst_value,
       mhsink->def_burst_format, -1);
 }
 
 /* "remove" signal implementation */
 void
-gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
+gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle)
 {
   GList *clink;
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
   GstMultiHandleSinkClass *mhsinkclass =
       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
+  // FIXME: convert to a function so we can vfunc this
+  int fd = handle.fd;
 
-  GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd);
+  GST_DEBUG_OBJECT (sink, "%s removing client", fd);
 
   CLIENTS_LOCK (sink);
   clink = g_hash_table_lookup (sink->fd_hash, &fd);
@@ -587,7 +590,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
 
     if (mhclient->status != GST_CLIENT_STATUS_OK) {
       GST_INFO_OBJECT (sink,
-          "[fd %5d] Client already disconnecting with status %d",
+          "%s Client already disconnecting with status %d",
           fd, mhclient->status);
       goto done;
     }
@@ -597,7 +600,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd)
     // FIXME: specific poll
     gst_poll_restart (sink->fdset);
   } else {
-    GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
+    GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd);
   }
 
 done:
@@ -606,11 +609,14 @@ done:
 
 /* "remove-flush" signal implementation */
 void
-gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
+gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink,
+    GstMultiSinkHandle handle)
 {
   GList *clink;
+  // FIXME: convert to a function so we can vfunc this
+  int fd = handle.fd;
 
-  GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd);
+  GST_DEBUG_OBJECT (sink, "%s flushing client", fd);
 
   CLIENTS_LOCK (sink);
   clink = g_hash_table_lookup (sink->fd_hash, &fd);
@@ -620,7 +626,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
 
     if (mhclient->status != GST_CLIENT_STATUS_OK) {
       GST_INFO_OBJECT (sink,
-          "[fd %5d] Client already disconnecting with status %d",
+          "%s Client already disconnecting with status %d",
           fd, mhclient->status);
       goto done;
     }
@@ -633,7 +639,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd)
      * it might have some buffers to flush in the ->sending queue. */
     mhclient->status = GST_CLIENT_STATUS_FLUSHING;
   } else {
-    GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd);
+    GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd);
   }
 done:
   CLIENTS_UNLOCK (sink);
@@ -661,11 +667,13 @@ gst_multi_fd_sink_clear_post (GstMultiHandleSink * mhsink)
  * guint64 : timestamp of the last buffer sent (in nanoseconds)
  */
 GValueArray *
-gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd)
+gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, GstMultiSinkHandle handle)
 {
   GstTCPClient *client;
   GValueArray *result = NULL;
   GList *clink;
+  // FIXME: convert to a function so we can vfunc this
+  int fd = handle.fd;
 
   CLIENTS_LOCK (sink);
   clink = g_hash_table_lookup (sink->fd_hash, &fd);
@@ -727,7 +735,7 @@ noclient:
 
   /* python doesn't like a NULL pointer yet */
   if (result == NULL) {
-    GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd);
+    GST_WARNING_OBJECT (sink, "%s no client with this found!", fd);
     result = g_value_array_new (0);
   }
 
@@ -742,16 +750,12 @@ noclient:
 static void
 gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
 {
-  int fd;
   GTimeVal now;
   GstTCPClient *client = (GstTCPClient *) link->data;
   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
   GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (sink);
-  GstMultiFdSinkClass *fclass;
-
-  fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
-
-  fd = client->fd.fd;
+  GstMultiFdSinkClass *fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
+  int fd = client->fd.fd;
 
   if (mhclient->currently_removing) {
     GST_WARNING_OBJECT (sink, "%s client is already being removed",
@@ -822,7 +826,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
    * remove it from the hashtable here */
   if (!g_hash_table_remove (mfsink->fd_hash, &client->fd.fd)) {
     GST_WARNING_OBJECT (sink,
-        "[fd %5d] error removing client %p from hash", client->fd.fd, client);
+        "%s error removing client %p from hash", client->fd.fd, client);
   }
   /* after releasing the lock above, the link could be invalid, more
    * precisely, the next and prev pointers could point to invalid list
@@ -833,7 +837,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link)
   sink->clients_cookie++;
 
   if (fclass->removed)
-    fclass->removed (mfsink, client->fd.fd);
+    fclass->removed (mfsink, (GstMultiSinkHandle) client->fd.fd);
 
   g_free (client);
   CLIENTS_UNLOCK (sink);
@@ -861,18 +865,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
   if (ioctl (fd, FIONREAD, &avail) < 0)
     goto ioctl_failed;
 
-  GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes",
+  GST_DEBUG_OBJECT (sink, "%s select reports client read of %d bytes",
       fd, avail);
 
   ret = TRUE;
 
   if (avail == 0) {
     /* client sent close, so remove it */
-    GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd);
+    GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", fd);
     mhclient->status = GST_CLIENT_STATUS_CLOSED;
     ret = FALSE;
   } else if (avail < 0) {
-    GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd);
+    GST_WARNING_OBJECT (sink, "%s avail < 0, removing", fd);
     mhclient->status = GST_CLIENT_STATUS_ERROR;
     ret = FALSE;
   } else {
@@ -886,18 +890,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
       /* this is the maximum we can read */
       gint to_read = MIN (avail, 512);
 
-      GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes",
+      GST_DEBUG_OBJECT (sink, "%s client wants us to read %d bytes",
           fd, to_read);
 
       nread = read (fd, dummy, to_read);
       if (nread < -1) {
-        GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)",
+        GST_WARNING_OBJECT (sink, "%s could not read %d bytes: %s (%d)",
             fd, to_read, g_strerror (errno), errno);
         mhclient->status = GST_CLIENT_STATUS_ERROR;
         ret = FALSE;
         break;
       } else if (nread == 0) {
-        GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd);
+        GST_WARNING_OBJECT (sink, "%s 0 bytes in read, removing", fd);
         mhclient->status = GST_CLIENT_STATUS_ERROR;
         ret = FALSE;
         break;
@@ -911,7 +915,7 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink,
   /* ERRORS */
 ioctl_failed:
   {
-    GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)",
+    GST_WARNING_OBJECT (sink, "%s ioctl failed: %s (%d)",
         fd, g_strerror (errno), errno);
     mhclient->status = GST_CLIENT_STATUS_ERROR;
     return FALSE;
@@ -929,7 +933,6 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
   gboolean send_streamheader = FALSE;
   GstStructure *s;
   GstMultiFdSink *sink = GST_MULTI_FD_SINK (mhsink);
-  GstTCPClient *client = (GstTCPClient *) mhclient;
 
   /* before we queue the buffer, we check if we need to queue streamheader
    * buffers (because it's a new client, or because they changed) */
@@ -937,8 +940,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
 
   if (!mhclient->caps) {
     GST_DEBUG_OBJECT (sink,
-        "[fd %5d] no previous caps for this client, send streamheader",
-        client->fd.fd);
+        "%s no previous caps for this client, send streamheader",
+        mhclient->debug);
     send_streamheader = TRUE;
     mhclient->caps = gst_caps_ref (caps);
   } else {
@@ -951,23 +954,23 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
       if (!gst_structure_has_field (s, "streamheader")) {
         /* no new streamheader, so nothing new to send */
         GST_DEBUG_OBJECT (sink,
-            "[fd %5d] new caps do not have streamheader, not sending",
-            client->fd.fd);
+            "%s new caps do not have streamheader, not sending",
+            mhclient->debug);
       } else {
         /* there is a new streamheader */
         s = gst_caps_get_structure (mhclient->caps, 0);
         if (!gst_structure_has_field (s, "streamheader")) {
           /* no previous streamheader, so send the new one */
           GST_DEBUG_OBJECT (sink,
-              "[fd %5d] previous caps did not have streamheader, sending",
-              client->fd.fd);
+              "%s previous caps did not have streamheader, sending",
+              mhclient->debug);
           send_streamheader = TRUE;
         } else {
           /* both old and new caps have streamheader set */
           if (!mhsink->resend_streamheader) {
             GST_DEBUG_OBJECT (sink,
-                "[fd %5d] asked to not resend the streamheader, not sending",
-                client->fd.fd);
+                "%s asked to not resend the streamheader, not sending",
+                mhclient->debug);
             send_streamheader = FALSE;
           } else {
             sh1 = gst_structure_get_value (s, "streamheader");
@@ -975,8 +978,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
             sh2 = gst_structure_get_value (s, "streamheader");
             if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
               GST_DEBUG_OBJECT (sink,
-                  "[fd %5d] new streamheader different from old, sending",
-                  client->fd.fd);
+                  "%s new streamheader different from old, sending",
+                  mhclient->debug);
               send_streamheader = TRUE;
             }
           }
@@ -994,16 +997,16 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
     int i;
 
     GST_LOG_OBJECT (sink,
-        "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
-        client->fd.fd, caps);
+        "%s sending streamheader from caps %" GST_PTR_FORMAT,
+        mhclient->debug, caps);
     s = gst_caps_get_structure (caps, 0);
     if (!gst_structure_has_field (s, "streamheader")) {
       GST_DEBUG_OBJECT (sink,
-          "[fd %5d] no new streamheader, so nothing to send", client->fd.fd);
+          "%s no new streamheader, so nothing to send", mhclient->debug);
     } else {
       GST_LOG_OBJECT (sink,
-          "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
-          client->fd.fd, caps);
+          "%s sending streamheader from caps %" GST_PTR_FORMAT,
+          mhclient->debug, caps);
       sh = gst_structure_get_value (s, "streamheader");
       g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
       buffers = g_value_peek_pointer (sh);
@@ -1016,8 +1019,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
         g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
         buffer = g_value_peek_pointer (bufval);
         GST_DEBUG_OBJECT (sink,
-            "[fd %5d] queueing streamheader buffer of length %" G_GSIZE_FORMAT,
-            client->fd.fd, gst_buffer_get_size (buffer));
+            "%s queueing streamheader buffer of length %" G_GSIZE_FORMAT,
+            mhclient->debug, gst_buffer_get_size (buffer));
         gst_buffer_ref (buffer);
 
         mhclient->sending = g_slist_append (mhclient->sending, buffer);
@@ -1028,8 +1031,8 @@ gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
   gst_caps_unref (caps);
   caps = NULL;
 
-  GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %" G_GSIZE_FORMAT,
-      client->fd.fd, gst_buffer_get_size (buffer));
+  GST_LOG_OBJECT (sink, "%s queueing buffer of length %" G_GSIZE_FORMAT,
+      mhclient->debug, gst_buffer_get_size (buffer));
 
   gst_buffer_ref (buffer);
   mhclient->sending = g_slist_append (mhclient->sending, buffer);
@@ -1063,7 +1066,7 @@ static gboolean
 gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
     GstTCPClient * client)
 {
-  int fd = client->fd.fd;
+  GstMultiSinkHandle handle = (GstMultiSinkHandle) client->fd.fd;
   gboolean more;
   gboolean flushing;
   GstClockTime now;
@@ -1072,7 +1075,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
   GstMultiHandleSinkClass *mhsinkclass =
       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
-
+  int fd = handle.fd;
 
   g_get_current_time (&nowtv);
   now = GST_TIMEVAL_TO_TIME (nowtv);
@@ -1137,8 +1140,8 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
         if (mhclient->flushcount != -1)
           mhclient->flushcount--;
 
-        GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
-            fd, client, mhclient->bufpos);
+        GST_LOG_OBJECT (sink, "%s client %p at position %d",
+            mhclient->debug, client, mhclient->bufpos);
 
         /* queueing a buffer will ref it */
         mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
@@ -1191,7 +1194,8 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
           /* partial write means that the client cannot read more and we should
            * stop sending more */
           GST_LOG_OBJECT (sink,
-              "partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote);
+              "partial write on %d of %" G_GSSIZE_FORMAT " bytes",
+              mhclient->debug, wrote);
           mhclient->bufoffset += wrote;
           more = FALSE;
         } else {
@@ -1214,20 +1218,20 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
   /* ERRORS */
 flushed:
   {
-    GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd);
+    GST_DEBUG_OBJECT (sink, "%s flushed, removing", fd);
     mhclient->status = GST_CLIENT_STATUS_REMOVED;
     return FALSE;
   }
 connection_reset:
   {
-    GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd);
+    GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", fd);
     mhclient->status = GST_CLIENT_STATUS_CLOSED;
     return FALSE;
   }
 write_error:
   {
     GST_WARNING_OBJECT (sink,
-        "[fd %5d] could not write, removing client: %s (%d)", fd,
+        "%s could not write, removing client: %s (%d)", mhclient->debug,
         g_strerror (errno), errno);
     mhclient->status = GST_CLIENT_STATUS_ERROR;
     return FALSE;
@@ -1307,8 +1311,8 @@ restart:
     next = g_list_next (clients);
 
     mhclient->bufpos++;
-    GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d",
-        client->fd.fd, client, mhclient->bufpos);
+    GST_LOG_OBJECT (sink, "%s client %p at position %d",
+        mhclient->debug, client, mhclient->bufpos);
     /* check soft max if needed, recover client */
     if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
       gint newpos;
@@ -1318,12 +1322,11 @@ restart:
         mhclient->dropped_buffers += mhclient->bufpos - newpos;
         mhclient->bufpos = newpos;
         mhclient->discont = TRUE;
-        GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d",
-            client->fd.fd, client, mhclient->bufpos);
+        GST_INFO_OBJECT (sink, "%s client %p position reset to %d",
+            mhclient->debug, client, mhclient->bufpos);
       } else {
         GST_INFO_OBJECT (sink,
-            "[fd %5d] client %p not recovering position",
-            client->fd.fd, client);
+            "%s client %p not recovering position", mhclient->debug, client);
       }
     }
     /* check hard max and timeout, remove client */
@@ -1331,8 +1334,8 @@ restart:
         (mhsink->timeout > 0
             && now - mhclient->last_activity_time > mhsink->timeout)) {
       /* remove client */
-      GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing",
-          client->fd.fd, client);
+      GST_WARNING_OBJECT (sink, "%s client %p is too slow, removing",
+          mhclient->debug, client);
       /* remove the client, the fd set will be cleared and the select thread
        * will be signaled */
       mhclient->status = GST_CLIENT_STATUS_SLOW;
@@ -1446,6 +1449,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
   GstMultiHandleSinkClass *mhsinkclass =
       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
+  int fd;
 
 
   fclass = GST_MULTI_FD_SINK_GET_CLASS (sink);
@@ -1500,7 +1504,6 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink)
         for (clients = mhsink->clients; clients; clients = next) {
           GstTCPClient *client;
           GstMultiHandleClient *mhclient;
-          int fd;
           long flags;
           int res;
 
index d589539..5f6a5d9 100644 (file)
@@ -80,35 +80,35 @@ struct _GstMultiFdSinkClass {
   GstMultiHandleSinkClass parent_class;
 
   /* element methods */
-  void          (*add)          (GstMultiFdSink *sink, int fd);
-  void          (*add_full)     (GstMultiFdSink *sink, int fd, GstSyncMethod sync,
+  void          (*add)          (GstMultiFdSink *sink, GstMultiSinkHandle handle);
+  void          (*add_full)     (GstMultiFdSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
                                 GstFormat format, guint64 value, 
                                 GstFormat max_format, guint64 max_value);
-  void          (*remove)       (GstMultiFdSink *sink, int fd);
-  void          (*remove_flush) (GstMultiFdSink *sink, int fd);
+  void          (*remove)       (GstMultiFdSink *sink, GstMultiSinkHandle handle);
+  void          (*remove_flush) (GstMultiFdSink *sink, GstMultiSinkHandle handle);
   void          (*clear)        (GstMultiFdSink *sink);
-  GValueArray*  (*get_stats)    (GstMultiFdSink *sink, int fd);
+  GValueArray*  (*get_stats)    (GstMultiFdSink *sink, GstMultiSinkHandle handle);
 
   /* vtable */
   gboolean (*wait)   (GstMultiFdSink *sink, GstPoll *set);
-  void (*removed) (GstMultiFdSink *sink, int fd);
+  void (*removed) (GstMultiFdSink *sink, GstMultiSinkHandle handle);
 
   /* signals */
-  void (*client_added) (GstElement *element, gint fd);
-  void (*client_removed) (GstElement *element, gint fd, GstClientStatus status);
-  void (*client_fd_removed) (GstElement *element, gint fd);
+  void (*client_added) (GstElement *element, GstMultiSinkHandle handle);
+  void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status);
+  void (*client_fd_removed) (GstElement *element, GstMultiSinkHandle handle);
 };
 
 GType gst_multi_fd_sink_get_type (void);
 
-void          gst_multi_fd_sink_add          (GstMultiFdSink *sink, int fd);
-void          gst_multi_fd_sink_add_full     (GstMultiFdSink *sink, int fd, GstSyncMethod sync, 
+void          gst_multi_fd_sink_add          (GstMultiFdSink *sink, GstMultiSinkHandle handle);
+void          gst_multi_fd_sink_add_full     (GstMultiFdSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync, 
                                               GstFormat min_format, guint64 min_value,
                                               GstFormat max_format, guint64 max_value);
-void          gst_multi_fd_sink_remove       (GstMultiFdSink *sink, int fd);
-void          gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd);
+void          gst_multi_fd_sink_remove       (GstMultiFdSink *sink, GstMultiSinkHandle handle);
+void          gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, GstMultiSinkHandle handle);
 void          gst_multi_fd_sink_clear        (GstMultiHandleSink *sink);
-GValueArray*  gst_multi_fd_sink_get_stats    (GstMultiFdSink *sink, int fd);
+GValueArray*  gst_multi_fd_sink_get_stats    (GstMultiFdSink *sink, GstMultiSinkHandle handle);
 
 G_END_DECLS
 
index ba4e23b..bf273ba 100644 (file)
@@ -120,6 +120,13 @@ typedef enum
   GST_CLIENT_STATUS_FLUSHING    = 6
 } GstClientStatus;
 
+// FIXME: is it better to use GSocket * or a gpointer here ?
+typedef union
+{
+  int fd;
+  GSocket *socket;
+} GstMultiSinkHandle;
+
 /* structure for a client
  */
 typedef struct {
@@ -251,12 +258,12 @@ struct _GstMultiHandleSinkClass {
   GstBaseSinkClass parent_class;
 
   /* element methods */
-  void          (*add)          (GstMultiHandleSink *sink, GSocket *socket);
-  void          (*add_full)     (GstMultiHandleSink *sink, GSocket *socket, GstSyncMethod sync,
+  void          (*add)          (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
+  void          (*add_full)     (GstMultiHandleSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
                                 GstFormat format, guint64 value, 
                                 GstFormat max_format, guint64 max_value);
-  void          (*remove)       (GstMultiHandleSink *sink, GSocket *socket);
-  void          (*remove_flush) (GstMultiHandleSink *sink, GSocket *socket);
+  void          (*remove)       (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
+  void          (*remove_flush) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
   void          (*clear)        (GstMultiHandleSink *sink);
   void          (*clear_post)   (GstMultiHandleSink *sink);
   void          (*stop_pre)     (GstMultiHandleSink *sink);
@@ -273,18 +280,18 @@ struct _GstMultiHandleSinkClass {
                                 (GstMultiHandleClient *client);
 
 
-  GstStructure* (*get_stats)    (GstMultiHandleSink *sink, GSocket *socket);
+  GstStructure* (*get_stats)    (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
   void          (*remove_client_link) (GstMultiHandleSink * sink, GList * link);
 
   /* vtable */
   gboolean (*init)   (GstMultiHandleSink *sink);
   gboolean (*close)  (GstMultiHandleSink *sink);
-  void (*removed) (GstMultiHandleSink *sink, GSocket *socket);
+  void (*removed) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
 
   /* signals */
-  void (*client_added) (GstElement *element, GSocket *socket);
-  void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status);
-  void (*client_socket_removed) (GstElement *element, GSocket *socket);
+  void (*client_added) (GstElement *element, GstMultiSinkHandle handle);
+  void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status);
+  void (*client_socket_removed) (GstElement *element, GstMultiSinkHandle handle);
 };
 
 GType gst_multi_handle_sink_get_type (void);
index 831ffdf..d3f640c 100644 (file)
@@ -160,8 +160,8 @@ static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
 
 static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
     GList * link);
-static gboolean gst_multi_socket_sink_socket_condition (GSocket * socket,
-    GIOCondition condition, GstMultiSocketSink * sink);
+static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
+    handle, GIOCondition condition, GstMultiSocketSink * sink);
 
 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
@@ -399,23 +399,25 @@ gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
 {
   GstSocketClient *msclient = (GstSocketClient *) client;
 
-  return g_socket_get_fd (msclient->socket);
+  return g_socket_get_fd (msclient->handle.socket);
 }
 
 /* "add-full" signal implementation */
 void
-gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
-    GstSyncMethod sync_method, GstFormat min_format, guint64 min_value,
-    GstFormat max_format, guint64 max_value)
+gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
+    GstMultiSinkHandle handle, GstSyncMethod sync_method, GstFormat min_format,
+    guint64 min_value, GstFormat max_format, guint64 max_value)
 {
   GstSocketClient *client;
   GstMultiHandleClient *mhclient;
   GList *clink;
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
 
-  GST_DEBUG_OBJECT (sink, "[socket %p] adding client, sync_method %d, "
+  g_assert (G_IS_SOCKET (handle.socket));
+
+  GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, "
       "min_format %d, min_value %" G_GUINT64_FORMAT
-      ", max_format %d, max_value %" G_GUINT64_FORMAT, socket,
+      ", max_format %d, max_value %" G_GUINT64_FORMAT, handle.socket,
       sync_method, min_format, min_value, max_format, max_value);
 
   /* do limits check if we can */
@@ -428,8 +430,8 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
   client = g_new0 (GstSocketClient, 1);
   mhclient = (GstMultiHandleClient *) client;
   gst_multi_handle_sink_client_init (mhclient, sync_method);
-  g_snprintf (mhclient->debug, 30, "[socket %p]", socket);
-  client->socket = G_SOCKET (g_object_ref (socket));
+  g_snprintf (mhclient->debug, 30, "[socket %p]", handle.socket);
+  client->handle.socket = G_SOCKET (g_object_ref (handle.socket));
 
   mhclient->burst_min_format = min_format;
   mhclient->burst_min_value = min_value;
@@ -439,22 +441,22 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
   CLIENTS_LOCK (sink);
 
   /* check the hash to find a duplicate fd */
-  clink = g_hash_table_lookup (sink->socket_hash, socket);
+  clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
   if (clink != NULL)
     goto duplicate;
 
   /* we can add the fd now */
   clink = mhsink->clients = g_list_prepend (mhsink->clients, client);
-  g_hash_table_insert (sink->socket_hash, socket, clink);
+  g_hash_table_insert (sink->socket_hash, handle.socket, clink);
   mhsink->clients_cookie++;
 
   /* set the socket to non blocking */
-  g_socket_set_blocking (socket, FALSE);
+  g_socket_set_blocking (handle.socket, FALSE);
 
   /* we always read from a client */
   if (sink->main_context) {
     client->source =
-        g_socket_create_source (client->socket,
+        g_socket_create_source (client->handle.socket,
         G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable);
     g_source_set_callback (client->source,
         (GSourceFunc) gst_multi_socket_sink_socket_condition,
@@ -467,7 +469,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
   CLIENTS_UNLOCK (sink);
 
   g_signal_emit (G_OBJECT (sink),
-      gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, socket);
+      gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0, handle);
 
   return;
 
@@ -475,19 +477,19 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
 wrong_limits:
   {
     GST_WARNING_OBJECT (sink,
-        "[socket %p] wrong values min =%" G_GUINT64_FORMAT ", max=%"
-        G_GUINT64_FORMAT ", format %d specified when adding client", socket,
-        min_value, max_value, min_format);
+        "%s wrong values min =%" G_GUINT64_FORMAT ", max=%"
+        G_GUINT64_FORMAT ", format %d specified when adding client",
+        mhclient->debug, min_value, max_value, min_format);
     return;
   }
 duplicate:
   {
     mhclient->status = GST_CLIENT_STATUS_DUPLICATE;
     CLIENTS_UNLOCK (sink);
-    GST_WARNING_OBJECT (sink, "[socket %p] duplicate client found, refusing",
-        socket);
+    GST_WARNING_OBJECT (sink, "%s duplicate client found, refusing",
+        mhclient->debug);
     g_signal_emit (G_OBJECT (sink),
-        gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
+        gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, handle,
         mhclient->status);
     g_free (client);
     return;
@@ -496,37 +498,39 @@ duplicate:
 
 /* "add" signal implementation */
 void
-gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
+gst_multi_socket_sink_add (GstMultiSocketSink * sink, GstMultiSinkHandle handle)
 {
   GstMultiHandleSink *mhsink;
 
   mhsink = GST_MULTI_HANDLE_SINK (sink);
-  gst_multi_socket_sink_add_full (sink, socket, mhsink->def_sync_method,
+  gst_multi_socket_sink_add_full (sink, handle, mhsink->def_sync_method,
       mhsink->def_burst_format, mhsink->def_burst_value,
       mhsink->def_burst_format, -1);
 }
 
 /* "remove" signal implementation */
 void
-gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
+gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
+    GstMultiSinkHandle handle)
 {
   GList *clink;
   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
   GstMultiHandleSinkClass *mhsinkclass =
       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
 
-  GST_DEBUG_OBJECT (sink, "[socket %p] removing client", socket);
+  // FIXME; how to vfunc this ?
+  GST_DEBUG_OBJECT (sink, "[socket %p] removing client", handle.socket);
 
   CLIENTS_LOCK (sink);
-  clink = g_hash_table_lookup (sink->socket_hash, socket);
+  clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
   if (clink != NULL) {
     GstSocketClient *client = clink->data;
     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
 
     if (mhclient->status != GST_CLIENT_STATUS_OK) {
       GST_INFO_OBJECT (sink,
-          "[socket %p] Client already disconnecting with status %d",
-          socket, mhclient->status);
+          "%s Client already disconnecting with status %d",
+          mhclient->debug, mhclient->status);
       goto done;
     }
 
@@ -534,7 +538,7 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
     mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink);
   } else {
     GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!",
-        socket);
+        handle.socket);
   }
 
 done:
@@ -543,21 +547,22 @@ done:
 
 /* "remove-flush" signal implementation */
 void
-gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
+gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
+    GstMultiSinkHandle handle)
 {
   GList *clink;
 
-  GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", socket);
+  GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", handle.socket);
 
   CLIENTS_LOCK (sink);
-  clink = g_hash_table_lookup (sink->socket_hash, socket);
+  clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
   if (clink != NULL) {
     GstSocketClient *client = clink->data;
     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
 
     if (mhclient->status != GST_CLIENT_STATUS_OK) {
       GST_INFO_OBJECT (sink,
-          "[socket %p] Client already disconnecting with status %d",
+          "%s Client already disconnecting with status %d",
           socket, mhclient->status);
       goto done;
     }
@@ -570,8 +575,7 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
      * it might have some buffers to flush in the ->sending queue. */
     mhclient->status = GST_CLIENT_STATUS_FLUSHING;
   } else {
-    GST_WARNING_OBJECT (sink, "[socket %p] no client with this fd found!",
-        socket);
+    GST_WARNING_OBJECT (sink, "%s no client with this fd found!", socket);
   }
 done:
   CLIENTS_UNLOCK (sink);
@@ -580,14 +584,15 @@ done:
 /* "get-stats" signal implementation
  */
 GstStructure *
-gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
+gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
+    GstMultiSinkHandle handle)
 {
   GstSocketClient *client;
   GstStructure *result = NULL;
   GList *clink;
 
   CLIENTS_LOCK (sink);
-  clink = g_hash_table_lookup (sink->socket_hash, socket);
+  clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
   if (clink == NULL)
     goto noclient;
 
@@ -624,7 +629,7 @@ noclient:
 
   /* python doesn't like a NULL pointer yet */
   if (result == NULL) {
-    GST_WARNING_OBJECT (sink, "[socket %p] no client with this found!", socket);
+    GST_WARNING_OBJECT (sink, "%s no client with this found!", socket);
     result = gst_structure_new_empty ("multisocketsink-stats");
   }
 
@@ -640,7 +645,6 @@ static void
 gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
     GList * link)
 {
-  GSocket *socket;
   GTimeVal now;
   GstSocketClient *client = link->data;
   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
@@ -649,8 +653,6 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
 
   fclass = GST_MULTI_SOCKET_SINK_GET_CLASS (sink);
 
-  socket = client->socket;
-
   if (mhclient->currently_removing) {
     GST_WARNING_OBJECT (sink, "%s client is already being removed",
         mhclient->debug);
@@ -714,7 +716,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
   CLIENTS_UNLOCK (sink);
 
   g_signal_emit (G_OBJECT (sink),
-      gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, socket,
+      gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, client->handle,
       mhclient->status);
 
   /* lock again before we remove the client completely */
@@ -722,9 +724,9 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
 
   /* fd cannot be reused in the above signal callback so we can safely
    * remove it from the hashtable here */
-  if (!g_hash_table_remove (mssink->socket_hash, socket)) {
+  if (!g_hash_table_remove (mssink->socket_hash, client->handle.socket)) {
     GST_WARNING_OBJECT (sink,
-        "[socket %p] error removing client %p from hash", socket, client);
+        "%s error removing client %p from hash", mhclient, client);
   }
   /* after releasing the lock above, the link could be invalid, more
    * precisely, the next and prev pointers could point to invalid list
@@ -735,15 +737,16 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink,
   sink->clients_cookie++;
 
   if (fclass->removed)
-    fclass->removed (mssink, socket);
+    fclass->removed (sink, client->handle);
 
   g_free (client);
   CLIENTS_UNLOCK (sink);
 
   /* and the fd is really gone now */
   g_signal_emit (G_OBJECT (sink),
-      gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, socket);
-  g_object_unref (socket);
+      gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
+      client->handle);
+  g_object_unref (client->handle.socket);
 
   CLIENTS_LOCK (sink);
 }
@@ -762,8 +765,7 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
   gboolean first = TRUE;
   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
 
-  GST_DEBUG_OBJECT (sink, "[socket %p] select reports client read",
-      client->socket);
+  GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
 
   ret = TRUE;
 
@@ -773,25 +775,24 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
   do {
     gssize navail;
 
-    GST_DEBUG_OBJECT (sink, "[socket %p] client wants us to read",
-        client->socket);
+    GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
 
-    navail = g_socket_get_available_bytes (client->socket);
+    navail = g_socket_get_available_bytes (client->handle.socket);
     if (navail < 0)
       break;
 
     nread =
-        g_socket_receive (client->socket, dummy, MIN (navail, sizeof (dummy)),
-        sink->cancellable, &err);
+        g_socket_receive (client->handle.socket, dummy, MIN (navail,
+            sizeof (dummy)), sink->cancellable, &err);
     if (first && nread == 0) {
       /* client sent close, so remove it */
-      GST_DEBUG_OBJECT (sink, "[socket %p] client asked for close, removing",
-          client->socket);
+      GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
+          mhclient->debug);
       mhclient->status = GST_CLIENT_STATUS_CLOSED;
       ret = FALSE;
     } else if (nread < 0) {
-      GST_WARNING_OBJECT (sink, "[socket %p] could not read: %s",
-          client->socket, err->message);
+      GST_WARNING_OBJECT (sink, "%s could not read: %s",
+          mhclient->debug, err->message);
       mhclient->status = GST_CLIENT_STATUS_ERROR;
       ret = FALSE;
       break;
@@ -808,7 +809,6 @@ static gboolean
 gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
     GstMultiHandleClient * mhclient, GstBuffer * buffer)
 {
-  GstSocketClient *client = (GstSocketClient *) mhclient;
   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
   GstCaps *caps;
 
@@ -822,8 +822,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
 
   if (!mhclient->caps) {
     GST_DEBUG_OBJECT (sink,
-        "[socket %p] no previous caps for this client, send streamheader",
-        client->socket);
+        "%s no previous caps for this client, send streamheader",
+        mhclient->debug);
     send_streamheader = TRUE;
     mhclient->caps = gst_caps_ref (caps);
   } else {
@@ -836,23 +836,23 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
       if (!gst_structure_has_field (s, "streamheader")) {
         /* no new streamheader, so nothing new to send */
         GST_DEBUG_OBJECT (sink,
-            "[socket %p] new caps do not have streamheader, not sending",
-            client->socket);
+            "%s new caps do not have streamheader, not sending",
+            mhclient->debug);
       } else {
         /* there is a new streamheader */
         s = gst_caps_get_structure (mhclient->caps, 0);
         if (!gst_structure_has_field (s, "streamheader")) {
           /* no previous streamheader, so send the new one */
           GST_DEBUG_OBJECT (sink,
-              "[socket %p] previous caps did not have streamheader, sending",
-              client->socket);
+              "%s previous caps did not have streamheader, sending",
+              mhclient->debug);
           send_streamheader = TRUE;
         } else {
           /* both old and new caps have streamheader set */
           if (!mhsink->resend_streamheader) {
             GST_DEBUG_OBJECT (sink,
-                "[socket %p] asked to not resend the streamheader, not sending",
-                client->socket);
+                "%s asked to not resend the streamheader, not sending",
+                mhclient->debug);
             send_streamheader = FALSE;
           } else {
             sh1 = gst_structure_get_value (s, "streamheader");
@@ -860,8 +860,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
             sh2 = gst_structure_get_value (s, "streamheader");
             if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
               GST_DEBUG_OBJECT (sink,
-                  "[socket %p] new streamheader different from old, sending",
-                  client->socket);
+                  "%s new streamheader different from old, sending",
+                  mhclient->debug);
               send_streamheader = TRUE;
             }
           }
@@ -879,17 +879,16 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
     int i;
 
     GST_LOG_OBJECT (sink,
-        "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
-        client->socket, caps);
+        "%s sending streamheader from caps %" GST_PTR_FORMAT,
+        mhclient->debug, caps);
     s = gst_caps_get_structure (caps, 0);
     if (!gst_structure_has_field (s, "streamheader")) {
       GST_DEBUG_OBJECT (sink,
-          "[socket %p] no new streamheader, so nothing to send",
-          client->socket);
+          "%s no new streamheader, so nothing to send", mhclient->debug);
     } else {
       GST_LOG_OBJECT (sink,
-          "[socket %p] sending streamheader from caps %" GST_PTR_FORMAT,
-          client->socket, caps);
+          "%s sending streamheader from caps %" GST_PTR_FORMAT,
+          mhclient->debug, caps);
       sh = gst_structure_get_value (s, "streamheader");
       g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
       buffers = g_value_peek_pointer (sh);
@@ -902,8 +901,8 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
         g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
         buffer = g_value_peek_pointer (bufval);
         GST_DEBUG_OBJECT (sink,
-            "[socket %p] queueing streamheader buffer of length %"
-            G_GSIZE_FORMAT, client->socket, gst_buffer_get_size (buffer));
+            "%s queueing streamheader buffer of length %"
+            G_GSIZE_FORMAT, mhclient->debug, gst_buffer_get_size (buffer));
         gst_buffer_ref (buffer);
 
         mhclient->sending = g_slist_append (mhclient->sending, buffer);
@@ -915,7 +914,7 @@ gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink,
   caps = NULL;
 
   GST_LOG_OBJECT (sink,
-      "[socket %p] queueing buffer of length %" G_GSIZE_FORMAT, client->socket,
+      "%s queueing buffer of length %" G_GSIZE_FORMAT, mhclient->debug,
       gst_buffer_get_size (buffer));
 
   gst_buffer_ref (buffer);
@@ -950,7 +949,6 @@ static gboolean
 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
     GstSocketClient * client)
 {
-  GSocket *socket = client->socket;
   gboolean more;
   gboolean flushing;
   GstClockTime now;
@@ -1034,7 +1032,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
         if (mhclient->flushcount != -1)
           mhclient->flushcount--;
 
-        GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
+        GST_LOG_OBJECT (sink, "%s client %p at position %d",
             socket, client, mhclient->bufpos);
 
         /* queueing a buffer will ref it */
@@ -1061,8 +1059,9 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
       /* try to write the complete buffer */
 
       wrote =
-          g_socket_send (socket, (gchar *) map.data + mhclient->bufoffset,
-          maxsize, sink->cancellable, &err);
+          g_socket_send (client->handle.socket,
+          (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
+          &err);
       gst_buffer_unmap (head, &map);
 
       if (wrote < 0) {
@@ -1101,14 +1100,13 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
   /* ERRORS */
 flushed:
   {
-    GST_DEBUG_OBJECT (sink, "[socket %p] flushed, removing", socket);
+    GST_DEBUG_OBJECT (sink, "%s flushed, removing", socket);
     mhclient->status = GST_CLIENT_STATUS_REMOVED;
     return FALSE;
   }
 connection_reset:
   {
-    GST_DEBUG_OBJECT (sink, "[socket %p] connection reset by peer, removing",
-        socket);
+    GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", socket);
     mhclient->status = GST_CLIENT_STATUS_CLOSED;
     g_clear_error (&err);
     return FALSE;
@@ -1116,8 +1114,7 @@ connection_reset:
 write_error:
   {
     GST_WARNING_OBJECT (sink,
-        "[socket %p] could not write, removing client: %s", socket,
-        err->message);
+        "%s could not write, removing client: %s", socket, err->message);
     g_clear_error (&err);
     mhclient->status = GST_CLIENT_STATUS_ERROR;
     return FALSE;
@@ -1198,8 +1195,8 @@ restart:
     next = g_list_next (clients);
 
     mhclient->bufpos++;
-    GST_LOG_OBJECT (sink, "[socket %p] client %p at position %d",
-        client->socket, client, mhclient->bufpos);
+    GST_LOG_OBJECT (sink, "%s client %p at position %d",
+        mhclient->debug, client, mhclient->bufpos);
     /* check soft max if needed, recover client */
     if (soft_max_buffers > 0 && mhclient->bufpos >= soft_max_buffers) {
       gint newpos;
@@ -1209,12 +1206,11 @@ restart:
         mhclient->dropped_buffers += mhclient->bufpos - newpos;
         mhclient->bufpos = newpos;
         mhclient->discont = TRUE;
-        GST_INFO_OBJECT (sink, "[socket %p] client %p position reset to %d",
-            client->socket, client, mhclient->bufpos);
+        GST_INFO_OBJECT (sink, "%s client %p position reset to %d",
+            mhclient->debug, client, mhclient->bufpos);
       } else {
         GST_INFO_OBJECT (sink,
-            "[socket %p] client %p not recovering position",
-            client->socket, client);
+            "%s client %p not recovering position", mhclient->debug, client);
       }
     }
     /* check hard max and timeout, remove client */
@@ -1222,8 +1218,8 @@ restart:
         (mhsink->timeout > 0
             && now - mhclient->last_activity_time > mhsink->timeout)) {
       /* remove client */
-      GST_WARNING_OBJECT (sink, "[socket %p] client %p is too slow, removing",
-          client->socket, client);
+      GST_WARNING_OBJECT (sink, "%s client %p is too slow, removing",
+          mhclient->debug, client);
       /* remove the client, the fd set will be cleared and the select thread
        * will be signaled */
       mhclient->status = GST_CLIENT_STATUS_SLOW;
@@ -1236,7 +1232,7 @@ restart:
        * the fd_set changed */
       if (!client->source) {
         client->source =
-            g_socket_create_source (client->socket,
+            g_socket_create_source (client->handle.socket,
             G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
             sink->cancellable);
         g_source_set_callback (client->source,
@@ -1325,7 +1321,7 @@ restart:
  * garbage list and removed.
  */
 static gboolean
-gst_multi_socket_sink_socket_condition (GSocket * socket,
+gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
     GIOCondition condition, GstMultiSocketSink * sink)
 {
   GList *clink;
@@ -1337,7 +1333,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket,
       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
 
   CLIENTS_LOCK (sink);
-  clink = g_hash_table_lookup (sink->socket_hash, socket);
+  clink = g_hash_table_lookup (sink->socket_hash, handle.socket);
   if (clink == NULL) {
     ret = FALSE;
     goto done;
@@ -1354,7 +1350,7 @@ gst_multi_socket_sink_socket_condition (GSocket * socket,
   }
 
   if ((condition & G_IO_ERR)) {
-    GST_WARNING_OBJECT (sink, "Socket %p has error", client->socket);
+    GST_WARNING_OBJECT (sink, "Socket %p has error", mhclient->debug);
     mhclient->status = GST_CLIENT_STATUS_ERROR;
     mhsinkclass->remove_client_link (mhsink, clink);
     ret = FALSE;
@@ -1500,7 +1496,7 @@ gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
     if (client->source)
       continue;
     client->source =
-        g_socket_create_source (client->socket,
+        g_socket_create_source (client->handle.socket,
         G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
         mssink->cancellable);
     g_source_set_callback (client->source,
index 1a0222b..2da288f 100644 (file)
@@ -55,7 +55,7 @@ typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass;
 typedef struct {
   GstMultiHandleClient client;
 
-  GSocket *socket;
+  GstMultiSinkHandle handle;
   GSource *source;
 } GstSocketClient;
 
@@ -80,34 +80,34 @@ struct _GstMultiSocketSinkClass {
   GstMultiHandleSinkClass parent_class;
 
   /* element methods */
-  void          (*add)          (GstMultiSocketSink *sink, GSocket *socket);
-  void          (*add_full)     (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
+  void          (*add)          (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
+  void          (*add_full)     (GstMultiSocketSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
                                 GstFormat format, guint64 value,
                                 GstFormat max_format, guint64 max_value);
-  void          (*remove)       (GstMultiSocketSink *sink, GSocket *socket);
-  void          (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket);
+  void          (*remove)       (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
+  void          (*remove_flush) (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
   void          (*clear)        (GstMultiSocketSink *sink);
-  GstStructure* (*get_stats)    (GstMultiSocketSink *sink, GSocket *socket);
+  GstStructure* (*get_stats)    (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
 
   /* vtable */
-  void (*removed) (GstMultiSocketSink *sink, GSocket *socket);
+  void (*removed) (GstMultiHandleSink *sink, GstMultiSinkHandle handle);
 
   /* signals */
-  void (*client_added) (GstElement *element, GSocket *socket);
-  void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status);
-  void (*client_socket_removed) (GstElement *element, GSocket *socket);
+  void (*client_added) (GstElement *element, GstMultiSinkHandle handle);
+  void (*client_removed) (GstElement *element, GstMultiSinkHandle handle, GstClientStatus status);
+  void (*client_socket_removed) (GstElement *element, GstMultiSinkHandle handle);
 };
 
 GType gst_multi_socket_sink_get_type (void);
 
-void          gst_multi_socket_sink_add          (GstMultiSocketSink *sink, GSocket *socket);
-void          gst_multi_socket_sink_add_full     (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
+void          gst_multi_socket_sink_add          (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
+void          gst_multi_socket_sink_add_full     (GstMultiSocketSink *sink, GstMultiSinkHandle handle, GstSyncMethod sync,
                                               GstFormat min_format, guint64 min_value,
                                               GstFormat max_format, guint64 max_value);
-void          gst_multi_socket_sink_remove       (GstMultiSocketSink *sink, GSocket *socket);
-void          gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GSocket *socket);
+void          gst_multi_socket_sink_remove       (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
+void          gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
 void          gst_multi_socket_sink_clear        (GstMultiHandleSink *sink);
-GstStructure*  gst_multi_socket_sink_get_stats    (GstMultiSocketSink *sink, GSocket *socket);
+GstStructure*  gst_multi_socket_sink_get_stats    (GstMultiSocketSink *sink, GstMultiSinkHandle handle);
 
 G_END_DECLS
 
index 62acaaf..451de74 100644 (file)
@@ -59,8 +59,8 @@ static void gst_tcp_server_sink_finalize (GObject * gobject);
 
 static gboolean gst_tcp_server_sink_init_send (GstMultiHandleSink * this);
 static gboolean gst_tcp_server_sink_close (GstMultiHandleSink * this);
-static void gst_tcp_server_sink_removed (GstMultiSocketSink * sink,
-    GSocket * socket);
+static void gst_tcp_server_sink_removed (GstMultiHandleSink * sink,
+    GstMultiSinkHandle handle);
 
 static void gst_tcp_server_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -147,7 +147,8 @@ gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink)
   if (!client_socket)
     goto accept_failed;
 
-  gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink), client_socket);
+  gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink),
+      (GstMultiSinkHandle) client_socket);
 
 #ifndef GST_DISABLE_GST_DEBUG
   {
@@ -178,7 +179,8 @@ accept_failed:
 }
 
 static void
-gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket)
+gst_tcp_server_sink_removed (GstMultiHandleSink * sink,
+    GstMultiSinkHandle handle)
 {
 #ifndef GST_DISABLE_GST_DEBUG
   GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink);
@@ -187,7 +189,7 @@ gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket)
 
   GST_DEBUG_OBJECT (this, "closing socket");
 
-  if (!g_socket_close (socket, &err)) {
+  if (!g_socket_close (handle.socket, &err)) {
     GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
     g_clear_error (&err);
   }