From fa14beb88bcb34eb9d01cbba72fdaa6c2c569cd0 Mon Sep 17 00:00:00 2001 From: Thomas Vander Stichele Date: Sat, 28 Jan 2012 11:02:21 +0100 Subject: [PATCH] multihandlesink: rework to use Handle --- gst/tcp/gstmultifdsink.c | 129 +++++++++++++++++++++++++++---------------- gst/tcp/gstmultifdsink.h | 4 +- gst/tcp/gstmultihandlesink.h | 3 + gst/tcp/gstmultisocketsink.c | 115 ++++++++++++++++++++++++-------------- gst/tcp/gstmultisocketsink.h | 3 +- 5 files changed, 162 insertions(+), 92 deletions(-) diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 006c344..d2ea37d 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -110,6 +110,8 @@ #include #endif +// FIXME: remove +#include #include #include #include @@ -195,6 +197,8 @@ static void gst_multi_fd_sink_queue_buffer (GstMultiHandleSink * mhsink, static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiHandleSink * mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer); static int gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client); +static void +gst_multi_fd_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30]); static void gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link); @@ -413,6 +417,8 @@ gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_queue_buffer); gstmultihandlesink_class->client_get_fd = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_client_get_fd); + gstmultihandlesink_class->handle_debug = + GST_DEBUG_FUNCPTR (gst_multi_fd_sink_handle_debug); gstmultihandlesink_class->remove_client_link = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_client_link); @@ -432,7 +438,7 @@ gst_multi_fd_sink_init (GstMultiFdSink * this) { this->mode = DEFAULT_MODE; - this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal); + this->handle_hash = g_hash_table_new (g_int_hash, g_int_equal); this->handle_read = DEFAULT_HANDLE_READ; } @@ -442,7 +448,7 @@ gst_multi_fd_sink_finalize (GObject * object) { GstMultiFdSink *this = GST_MULTI_FD_SINK (object); - g_hash_table_destroy (this->fd_hash); + g_hash_table_destroy (this->handle_hash); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -452,7 +458,13 @@ gst_multi_fd_sink_client_get_fd (GstMultiHandleClient * client) { GstTCPClient *tclient = (GstTCPClient *) client; - return tclient->fd.fd; + return tclient->gfd.fd; +} + +static void +gst_multi_fd_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30]) +{ + g_snprintf (debug, 30, "[fd %5d]", handle.fd); } /* "add-full" signal implementation */ @@ -469,10 +481,14 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle, GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); // FIXME: convert to a function so we can vfunc this int fd = handle.fd; + gchar debug[30]; + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + mhsinkclass->handle_debug (handle, debug); GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, " "min_format %d, min_value %" G_GUINT64_FORMAT - ", max_format %d, max_value %" G_GUINT64_FORMAT, mhclient->debug, + ", max_format %d, max_value %" G_GUINT64_FORMAT, debug, sync_method, min_format, min_value, max_format, max_value); /* do limits check if we can */ @@ -485,9 +501,11 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle, client = g_new0 (GstTCPClient, 1); mhclient = (GstMultiHandleClient *) client; gst_multi_handle_sink_client_init (mhclient, sync_method); - g_snprintf (mhclient->debug, 30, "[fd %5d]", fd); + strncpy (mhclient->debug, debug, 30); - client->fd.fd = fd; + gst_poll_fd_init (&client->gfd); + client->gfd.fd = fd; + mhclient->handle.fd = fd; mhclient->burst_min_format = min_format; mhclient->burst_min_value = min_value; mhclient->burst_max_format = max_format; @@ -496,13 +514,13 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle, CLIENTS_LOCK (sink); /* check the hash to find a duplicate fd */ - clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd); + clink = g_hash_table_lookup (sink->handle_hash, &client->gfd.fd); if (clink != NULL) goto duplicate; /* we can add the fd now */ clink = mhsink->clients = g_list_prepend (mhsink->clients, client); - g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink); + g_hash_table_insert (sink->handle_hash, &client->gfd.fd, clink); mhsink->clients_cookie++; /* set the socket to non blocking */ @@ -512,13 +530,13 @@ gst_multi_fd_sink_add_full (GstMultiFdSink * sink, GstMultiSinkHandle handle, } /* we always read from a client */ - gst_poll_add_fd (sink->fdset, &client->fd); + gst_poll_add_fd (sink->fdset, &client->gfd); /* we don't try to read from write only fds */ if (sink->handle_read) { flags = fcntl (fd, F_GETFL, 0); if ((flags & O_ACCMODE) != O_WRONLY) { - gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE); + gst_poll_fd_ctl_read (sink->fdset, &client->gfd, TRUE); } } /* figure out the mode, can't use send() for non sockets */ @@ -578,12 +596,14 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle) GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); // FIXME: convert to a function so we can vfunc this + gchar debug[30]; int fd = handle.fd; - GST_DEBUG_OBJECT (sink, "%s removing client", fd); + mhsinkclass->handle_debug (handle, debug); + GST_DEBUG_OBJECT (sink, "%s removing client", debug); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); + clink = g_hash_table_lookup (sink->handle_hash, &fd); if (clink != NULL) { GstTCPClient *client = (GstTCPClient *) clink->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; @@ -591,7 +611,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle) if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, "%s Client already disconnecting with status %d", - fd, mhclient->status); + debug, mhclient->status); goto done; } @@ -600,7 +620,7 @@ gst_multi_fd_sink_remove (GstMultiFdSink * sink, GstMultiSinkHandle handle) // FIXME: specific poll gst_poll_restart (sink->fdset); } else { - GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd); + GST_WARNING_OBJECT (sink, "%s no client with this fd found!", debug); } done: @@ -614,12 +634,18 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, { GList *clink; // FIXME: convert to a function so we can vfunc this + gchar debug[30]; int fd = handle.fd; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + + mhsinkclass->handle_debug (handle, debug); - GST_DEBUG_OBJECT (sink, "%s flushing client", fd); + GST_DEBUG_OBJECT (sink, "%s flushing client", debug); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); + clink = g_hash_table_lookup (sink->handle_hash, &fd); if (clink != NULL) { GstTCPClient *client = (GstTCPClient *) clink->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; @@ -627,7 +653,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, "%s Client already disconnecting with status %d", - fd, mhclient->status); + debug, mhclient->status); goto done; } @@ -639,7 +665,7 @@ gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, * it might have some buffers to flush in the ->sending queue. */ mhclient->status = GST_CLIENT_STATUS_FLUSHING; } else { - GST_WARNING_OBJECT (sink, "%s no client with this fd found!", fd); + GST_WARNING_OBJECT (sink, "%s no client with this fd found!", debug); } done: CLIENTS_UNLOCK (sink); @@ -673,10 +699,16 @@ gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, GstMultiSinkHandle handle) GValueArray *result = NULL; GList *clink; // FIXME: convert to a function so we can vfunc this + gchar debug[30]; int fd = handle.fd; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + + mhsinkclass->handle_debug (handle, debug); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); + clink = g_hash_table_lookup (sink->handle_hash, &fd); if (clink == NULL) goto noclient; @@ -735,7 +767,7 @@ noclient: /* python doesn't like a NULL pointer yet */ if (result == NULL) { - GST_WARNING_OBJECT (sink, "%s no client with this found!", fd); + GST_WARNING_OBJECT (sink, "%s no client with this found!", debug); result = g_value_array_new (0); } @@ -755,7 +787,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; GstMultiFdSink *mfsink = GST_MULTI_FD_SINK (sink); GstMultiFdSinkClass *fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); - int fd = client->fd.fd; + int fd = client->gfd.fd; if (mhclient->currently_removing) { GST_WARNING_OBJECT (sink, "%s client is already being removed", @@ -797,7 +829,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) break; } - gst_poll_remove_fd (mfsink->fdset, &client->fd); + gst_poll_remove_fd (mfsink->fdset, &client->gfd); g_get_current_time (&now); mhclient->disconnect_time = GST_TIMEVAL_TO_TIME (now); @@ -824,9 +856,9 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) /* fd cannot be reused in the above signal callback so we can safely * remove it from the hashtable here */ - if (!g_hash_table_remove (mfsink->fd_hash, &client->fd.fd)) { + if (!g_hash_table_remove (mfsink->handle_hash, &client->gfd.fd)) { GST_WARNING_OBJECT (sink, - "%s error removing client %p from hash", client->fd.fd, client); + "%s error removing client %p from hash", mhclient->debug, client); } /* after releasing the lock above, the link could be invalid, more * precisely, the next and prev pointers could point to invalid list @@ -837,7 +869,7 @@ gst_multi_fd_sink_remove_client_link (GstMultiHandleSink * sink, GList * link) sink->clients_cookie++; if (fclass->removed) - fclass->removed (mfsink, (GstMultiSinkHandle) client->fd.fd); + fclass->removed (mfsink, (GstMultiSinkHandle) client->gfd.fd); g_free (client); CLIENTS_UNLOCK (sink); @@ -860,23 +892,24 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, gboolean ret; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - fd = client->fd.fd; + fd = client->gfd.fd; if (ioctl (fd, FIONREAD, &avail) < 0) goto ioctl_failed; GST_DEBUG_OBJECT (sink, "%s select reports client read of %d bytes", - fd, avail); + mhclient->debug, avail); ret = TRUE; if (avail == 0) { /* client sent close, so remove it */ - GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", fd); + GST_DEBUG_OBJECT (sink, "%s client asked for close, removing", + mhclient->debug); mhclient->status = GST_CLIENT_STATUS_CLOSED; ret = FALSE; } else if (avail < 0) { - GST_WARNING_OBJECT (sink, "%s avail < 0, removing", fd); + GST_WARNING_OBJECT (sink, "%s avail < 0, removing", mhclient->debug); mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; } else { @@ -891,17 +924,18 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, gint to_read = MIN (avail, 512); GST_DEBUG_OBJECT (sink, "%s client wants us to read %d bytes", - fd, to_read); + mhclient->debug, to_read); nread = read (fd, dummy, to_read); if (nread < -1) { GST_WARNING_OBJECT (sink, "%s could not read %d bytes: %s (%d)", - fd, to_read, g_strerror (errno), errno); + mhclient->debug, to_read, g_strerror (errno), errno); mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; } else if (nread == 0) { - GST_WARNING_OBJECT (sink, "%s 0 bytes in read, removing", fd); + GST_WARNING_OBJECT (sink, "%s 0 bytes in read, removing", + mhclient->debug); mhclient->status = GST_CLIENT_STATUS_ERROR; ret = FALSE; break; @@ -916,7 +950,7 @@ gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, ioctl_failed: { GST_WARNING_OBJECT (sink, "%s ioctl failed: %s (%d)", - fd, g_strerror (errno), errno); + mhclient->debug, g_strerror (errno), errno); mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; } @@ -1066,7 +1100,7 @@ static gboolean gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, GstTCPClient * client) { - GstMultiSinkHandle handle = (GstMultiSinkHandle) client->fd.fd; + GstMultiSinkHandle handle = (GstMultiSinkHandle) client->gfd.fd; gboolean more; gboolean flushing; GstClockTime now; @@ -1092,7 +1126,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* client is too fast, remove from write queue until new buffer is * available */ // FIXME: specific - gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); + gst_poll_fd_ctl_write (sink->fdset, &client->gfd, FALSE); // /* if we flushed out all of the client buffers, we can stop */ if (mhclient->flushcount == 0) @@ -1116,7 +1150,7 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, } else { /* cannot send data to this client yet */ // FIXME: specific - gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); + gst_poll_fd_ctl_write (sink->fdset, &client->gfd, FALSE); return TRUE; } } @@ -1218,13 +1252,14 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, /* ERRORS */ flushed: { - GST_DEBUG_OBJECT (sink, "%s flushed, removing", fd); + GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug); mhclient->status = GST_CLIENT_STATUS_REMOVED; return FALSE; } connection_reset: { - GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", fd); + GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", + mhclient->debug); mhclient->status = GST_CLIENT_STATUS_CLOSED; return FALSE; } @@ -1347,7 +1382,7 @@ restart: } else if (mhclient->bufpos == 0 || mhclient->new_connection) { /* can send data to this client now. need to signal the select thread that * the fd_set changed */ - gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE); + gst_poll_fd_ctl_write (sink->fdset, &client->gfd, TRUE); need_signal = TRUE; } /* keep track of maximum buffer usage */ @@ -1516,7 +1551,7 @@ gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) mhclient = (GstMultiHandleClient *) client; next = g_list_next (clients); - fd = client->fd.fd; + fd = client->gfd.fd; res = fcntl (fd, F_GETFL, &flags); if (res == -1) { @@ -1578,25 +1613,25 @@ restart2: continue; } - if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) { + if (gst_poll_fd_has_closed (sink->fdset, &client->gfd)) { mhclient->status = GST_CLIENT_STATUS_CLOSED; mhsinkclass->remove_client_link (mhsink, clients); continue; } - if (gst_poll_fd_has_error (sink->fdset, &client->fd)) { - GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd); + if (gst_poll_fd_has_error (sink->fdset, &client->gfd)) { + GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->gfd.fd); mhclient->status = GST_CLIENT_STATUS_ERROR; mhsinkclass->remove_client_link (mhsink, clients); continue; } - if (gst_poll_fd_can_read (sink->fdset, &client->fd)) { + if (gst_poll_fd_can_read (sink->fdset, &client->gfd)) { /* handle client read */ if (!gst_multi_fd_sink_handle_client_read (sink, client)) { mhsinkclass->remove_client_link (mhsink, clients); continue; } } - if (gst_poll_fd_can_write (sink->fdset, &client->fd)) { + if (gst_poll_fd_can_write (sink->fdset, &client->gfd)) { /* handle client write */ if (!gst_multi_fd_sink_handle_client_write (sink, client)) { mhsinkclass->remove_client_link (mhsink, clients); @@ -1658,7 +1693,7 @@ gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, g_value_set_boolean (value, multifdsink->handle_read); break; case PROP_NUM_FDS: - g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash)); + g_value_set_uint (value, g_hash_table_size (multifdsink->handle_hash)); break; default: @@ -1711,6 +1746,6 @@ gst_multi_fd_sink_stop_post (GstMultiHandleSink * mhsink) gst_poll_free (mfsink->fdset); mfsink->fdset = NULL; } - g_hash_table_foreach_remove (mfsink->fd_hash, multifdsink_hash_remove, + g_hash_table_foreach_remove (mfsink->handle_hash, multifdsink_hash_remove, mfsink); } diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 5f6a5d9..3d3a6db 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -52,7 +52,7 @@ typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; typedef struct { GstMultiHandleClient client; - GstPollFD fd; + GstPollFD gfd; gboolean is_socket; } GstTCPClient; @@ -66,7 +66,7 @@ struct _GstMultiFdSink { GstMultiHandleSink element; /*< private >*/ - GHashTable *fd_hash; /* index on fd to client */ + GHashTable *handle_hash; /* index on fd to client */ gint mode; GstPoll *fdset; diff --git a/gst/tcp/gstmultihandlesink.h b/gst/tcp/gstmultihandlesink.h index bf273ba..535c226 100644 --- a/gst/tcp/gstmultihandlesink.h +++ b/gst/tcp/gstmultihandlesink.h @@ -130,6 +130,8 @@ typedef union /* structure for a client */ typedef struct { + GstMultiSinkHandle handle; + gchar debug[30]; /* a debug string used in debug calls to identify the client */ gint bufpos; /* position of this client in the global queue */ @@ -278,6 +280,7 @@ struct _GstMultiHandleSinkClass { GstBuffer *buffer); int (*client_get_fd) (GstMultiHandleClient *client); + void (*handle_debug) (GstMultiSinkHandle handle, gchar debug[30]); GstStructure* (*get_stats) (GstMultiHandleSink *sink, GstMultiSinkHandle handle); diff --git a/gst/tcp/gstmultisocketsink.c b/gst/tcp/gstmultisocketsink.c index d3f640c..f3b94d4 100644 --- a/gst/tcp/gstmultisocketsink.c +++ b/gst/tcp/gstmultisocketsink.c @@ -106,6 +106,8 @@ #include +#include + #include "gstmultisocketsink.h" #include "gsttcp-marshal.h" @@ -156,6 +158,8 @@ static void gst_multi_socket_sink_queue_buffer (GstMultiHandleSink * mhsink, static gboolean gst_multi_socket_sink_client_queue_buffer (GstMultiHandleSink * mhsink, GstMultiHandleClient * mhclient, GstBuffer * buffer); static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client); +static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, + gchar debug[30]); static void gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, @@ -358,6 +362,8 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_queue_buffer); gstmultihandlesink_class->client_get_fd = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd); + gstmultihandlesink_class->handle_debug = + GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug); gstmultihandlesink_class->remove_client_link = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_client_link); @@ -375,7 +381,7 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass) static void gst_multi_socket_sink_init (GstMultiSocketSink * this) { - this->socket_hash = g_hash_table_new (g_direct_hash, g_int_equal); + this->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal); this->cancellable = g_cancellable_new (); } @@ -385,7 +391,7 @@ gst_multi_socket_sink_finalize (GObject * object) { GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object); - g_hash_table_destroy (this->socket_hash); + g_hash_table_destroy (this->handle_hash); if (this->cancellable) { g_object_unref (this->cancellable); this->cancellable = NULL; @@ -397,11 +403,16 @@ gst_multi_socket_sink_finalize (GObject * object) static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client) { - GstSocketClient *msclient = (GstSocketClient *) client; + return g_socket_get_fd (client->handle.socket); +} - return g_socket_get_fd (msclient->handle.socket); +static void +gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30]) +{ + g_snprintf (debug, 30, "[socket %p]", handle.socket); } + /* "add-full" signal implementation */ void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, @@ -412,12 +423,17 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GstMultiHandleClient *mhclient; GList *clink; GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + gchar debug[30]; + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + // FIXME: remove assert g_assert (G_IS_SOCKET (handle.socket)); + mhsinkclass->handle_debug (handle, debug); GST_DEBUG_OBJECT (sink, "%s adding client, sync_method %d, " "min_format %d, min_value %" G_GUINT64_FORMAT - ", max_format %d, max_value %" G_GUINT64_FORMAT, handle.socket, + ", max_format %d, max_value %" G_GUINT64_FORMAT, debug, sync_method, min_format, min_value, max_format, max_value); /* do limits check if we can */ @@ -430,8 +446,8 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, client = g_new0 (GstSocketClient, 1); mhclient = (GstMultiHandleClient *) client; gst_multi_handle_sink_client_init (mhclient, sync_method); - g_snprintf (mhclient->debug, 30, "[socket %p]", handle.socket); - client->handle.socket = G_SOCKET (g_object_ref (handle.socket)); + strncpy (mhclient->debug, debug, 30); + mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket)); mhclient->burst_min_format = min_format; mhclient->burst_min_value = min_value; @@ -441,13 +457,13 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, CLIENTS_LOCK (sink); /* check the hash to find a duplicate fd */ - clink = g_hash_table_lookup (sink->socket_hash, handle.socket); + clink = g_hash_table_lookup (sink->handle_hash, handle.socket); if (clink != NULL) goto duplicate; /* we can add the fd now */ clink = mhsink->clients = g_list_prepend (mhsink->clients, client); - g_hash_table_insert (sink->socket_hash, handle.socket, clink); + g_hash_table_insert (sink->handle_hash, handle.socket, clink); mhsink->clients_cookie++; /* set the socket to non blocking */ @@ -456,7 +472,7 @@ gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, /* we always read from a client */ if (sink->main_context) { client->source = - g_socket_create_source (client->handle.socket, + g_socket_create_source (mhclient->handle.socket, G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable); g_source_set_callback (client->source, (GSourceFunc) gst_multi_socket_sink_socket_condition, @@ -517,12 +533,14 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); GstMultiHandleSinkClass *mhsinkclass = GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + gchar debug[30]; + mhsinkclass->handle_debug (handle, debug); // FIXME; how to vfunc this ? - GST_DEBUG_OBJECT (sink, "[socket %p] removing client", handle.socket); + GST_DEBUG_OBJECT (sink, "%s removing client", debug); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, handle.socket); + clink = g_hash_table_lookup (sink->handle_hash, handle.socket); if (clink != NULL) { GstSocketClient *client = clink->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; @@ -537,8 +555,7 @@ gst_multi_socket_sink_remove (GstMultiSocketSink * sink, mhclient->status = GST_CLIENT_STATUS_REMOVED; mhsinkclass->remove_client_link (GST_MULTI_HANDLE_SINK (sink), clink); } else { - GST_WARNING_OBJECT (sink, "[socket %p] no client with this socket found!", - handle.socket); + GST_WARNING_OBJECT (sink, "%s no client with this socket found!", debug); } done: @@ -551,11 +568,17 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GstMultiSinkHandle handle) { GList *clink; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + gchar debug[30]; + + mhsinkclass->handle_debug (handle, debug); - GST_DEBUG_OBJECT (sink, "[socket %p] flushing client", handle.socket); + GST_DEBUG_OBJECT (sink, "%s flushing client", debug); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, handle.socket); + clink = g_hash_table_lookup (sink->handle_hash, handle.socket); if (clink != NULL) { GstSocketClient *client = clink->data; GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; @@ -563,7 +586,7 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, if (mhclient->status != GST_CLIENT_STATUS_OK) { GST_INFO_OBJECT (sink, "%s Client already disconnecting with status %d", - socket, mhclient->status); + mhclient->debug, mhclient->status); goto done; } @@ -575,7 +598,7 @@ gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, * it might have some buffers to flush in the ->sending queue. */ mhclient->status = GST_CLIENT_STATUS_FLUSHING; } else { - GST_WARNING_OBJECT (sink, "%s no client with this fd found!", socket); + GST_WARNING_OBJECT (sink, "%s no client with this fd found!", debug); } done: CLIENTS_UNLOCK (sink); @@ -590,9 +613,15 @@ gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GstSocketClient *client; GstStructure *result = NULL; GList *clink; + GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink); + GstMultiHandleSinkClass *mhsinkclass = + GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); + gchar debug[30]; + + mhsinkclass->handle_debug (handle, debug); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, handle.socket); + clink = g_hash_table_lookup (sink->handle_hash, handle.socket); if (clink == NULL) goto noclient; @@ -629,7 +658,7 @@ noclient: /* python doesn't like a NULL pointer yet */ if (result == NULL) { - GST_WARNING_OBJECT (sink, "%s no client with this found!", socket); + GST_WARNING_OBJECT (sink, "%s no client with this found!", debug); result = gst_structure_new_empty ("multisocketsink-stats"); } @@ -716,7 +745,7 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, CLIENTS_UNLOCK (sink); g_signal_emit (G_OBJECT (sink), - gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, client->handle, + gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED], 0, mhclient->handle, mhclient->status); /* lock again before we remove the client completely */ @@ -724,9 +753,9 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, /* fd cannot be reused in the above signal callback so we can safely * remove it from the hashtable here */ - if (!g_hash_table_remove (mssink->socket_hash, client->handle.socket)) { + if (!g_hash_table_remove (mssink->handle_hash, mhclient->handle.socket)) { GST_WARNING_OBJECT (sink, - "%s error removing client %p from hash", mhclient, client); + "%s error removing client %p from hash", mhclient->debug, client); } /* after releasing the lock above, the link could be invalid, more * precisely, the next and prev pointers could point to invalid list @@ -737,16 +766,18 @@ gst_multi_socket_sink_remove_client_link (GstMultiHandleSink * sink, sink->clients_cookie++; if (fclass->removed) - fclass->removed (sink, client->handle); + fclass->removed (sink, mhclient->handle); - g_free (client); CLIENTS_UNLOCK (sink); /* and the fd is really gone now */ g_signal_emit (G_OBJECT (sink), gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0, - client->handle); - g_object_unref (client->handle.socket); + mhclient->handle); + g_assert (G_IS_SOCKET (mhclient->handle.socket)); + g_object_unref (mhclient->handle.socket); + + g_free (client); CLIENTS_LOCK (sink); } @@ -777,12 +808,12 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink, GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug); - navail = g_socket_get_available_bytes (client->handle.socket); + navail = g_socket_get_available_bytes (mhclient->handle.socket); if (navail < 0) break; nread = - g_socket_receive (client->handle.socket, dummy, MIN (navail, + g_socket_receive (mhclient->handle.socket, dummy, MIN (navail, sizeof (dummy)), sink->cancellable, &err); if (first && nread == 0) { /* client sent close, so remove it */ @@ -1059,7 +1090,7 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, /* try to write the complete buffer */ wrote = - g_socket_send (client->handle.socket, + g_socket_send (mhclient->handle.socket, (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable, &err); gst_buffer_unmap (head, &map); @@ -1100,13 +1131,14 @@ gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink, /* ERRORS */ flushed: { - GST_DEBUG_OBJECT (sink, "%s flushed, removing", socket); + GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug); mhclient->status = GST_CLIENT_STATUS_REMOVED; return FALSE; } connection_reset: { - GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", socket); + GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing", + mhclient->debug); mhclient->status = GST_CLIENT_STATUS_CLOSED; g_clear_error (&err); return FALSE; @@ -1114,7 +1146,8 @@ connection_reset: write_error: { GST_WARNING_OBJECT (sink, - "%s could not write, removing client: %s", socket, err->message); + "%s could not write, removing client: %s", mhclient->debug, + err->message); g_clear_error (&err); mhclient->status = GST_CLIENT_STATUS_ERROR; return FALSE; @@ -1232,7 +1265,7 @@ restart: * the fd_set changed */ if (!client->source) { client->source = - g_socket_create_source (client->handle.socket, + g_socket_create_source (mhclient->handle.socket, G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, sink->cancellable); g_source_set_callback (client->source, @@ -1333,7 +1366,7 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle, GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink); CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->socket_hash, handle.socket); + clink = g_hash_table_lookup (sink->handle_hash, handle.socket); if (clink == NULL) { ret = FALSE; goto done; @@ -1350,7 +1383,7 @@ gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle, } if ((condition & G_IO_ERR)) { - GST_WARNING_OBJECT (sink, "Socket %p has error", mhclient->debug); + GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug); mhclient->status = GST_CLIENT_STATUS_ERROR; mhsinkclass->remove_client_link (mhsink, clink); ret = FALSE; @@ -1469,7 +1502,7 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_NUM_SOCKETS: g_value_set_uint (value, - g_hash_table_size (multisocketsink->socket_hash)); + g_hash_table_size (multisocketsink->handle_hash)); break; default: @@ -1490,13 +1523,13 @@ gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink) CLIENTS_LOCK (mssink); for (clients = mhsink->clients; clients; clients = clients->next) { - GstSocketClient *client; + GstSocketClient *client = clients->data; + GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client; - client = clients->data; if (client->source) continue; client->source = - g_socket_create_source (client->handle.socket, + g_socket_create_source (mhclient->handle.socket, G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP, mssink->cancellable); g_source_set_callback (client->source, @@ -1535,7 +1568,7 @@ gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink) mssink->main_context = NULL; } - g_hash_table_foreach_remove (mssink->socket_hash, multisocketsink_hash_remove, + g_hash_table_foreach_remove (mssink->handle_hash, multisocketsink_hash_remove, mssink); } diff --git a/gst/tcp/gstmultisocketsink.h b/gst/tcp/gstmultisocketsink.h index 2da288f..01e821d 100644 --- a/gst/tcp/gstmultisocketsink.h +++ b/gst/tcp/gstmultisocketsink.h @@ -55,7 +55,6 @@ typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass; typedef struct { GstMultiHandleClient client; - GstMultiSinkHandle handle; GSource *source; } GstSocketClient; @@ -68,7 +67,7 @@ struct _GstMultiSocketSink { GstMultiHandleSink element; /*< private >*/ - GHashTable *socket_hash; /* index on socket to client */ + GHashTable *handle_hash; /* index on socket to client */ GMainContext *main_context; GCancellable *cancellable; -- 2.7.4