+2004-06-27 Wim Taymans <wim@fluendo.com>
+
+ * gst/tcp/gstmultifdsink.c: (gst_recover_policy_get_type),
+ (gst_multifdsink_class_init), (gst_multifdsink_add),
+ (gst_multifdsink_remove), (gst_multifdsink_clear),
+ (gst_multifdsink_client_remove),
+ (gst_multifdsink_handle_client_read),
+ (gst_multifdsink_client_queue_data),
+ (gst_multifdsink_client_queue_caps),
+ (gst_multifdsink_client_queue_buffer),
+ (gst_multifdsink_handle_client_write),
+ (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer),
+ (gst_multifdsink_handle_clients), (gst_multifdsink_thread),
+ (gst_multifdsink_init_send), (gst_multifdsink_close):
+ * gst/tcp/gstmultifdsink.h:
+ * gst/tcp/gsttcpserversink.c:
+ (gst_tcpserversink_handle_server_read),
+ (gst_tcpserversink_handle_select), (gst_tcpserversink_close):
+ More multifdsink fixes, more recovery policy fixes.
+ Removed stupid g_print
+
2004-06-26 Wim Taymans <wim@fluendo.com>
* gst/tcp/Makefile.am:
/* MultiFdSink signals and args */
enum
{
+ /* methods */
+ SIGNAL_ADD,
+ SIGNAL_REMOVE,
+ SIGNAL_CLEAR,
+ /* signals */
SIGNAL_CLIENT_ADDED,
SIGNAL_CLIENT_REMOVED,
LAST_SIGNAL
}
static void gst_multifdsink_base_init (gpointer g_class);
-static void gst_multifdsink_class_init (GstMultiFdSink * klass);
+static void gst_multifdsink_class_init (GstMultiFdSinkClass * klass);
static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
+static void gst_multifdsink_client_remove (GstMultiFdSink * sink,
+ GstTCPClient * client);
+
static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
static GstElementStateReturn gst_multifdsink_change_state (GstElement *
element);
}
static void
-gst_multifdsink_class_init (GstMultiFdSink * klass)
+gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
"How to recover when client reaches the soft max",
GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE));
+ gst_multifdsink_signals[SIGNAL_ADD] =
+ g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstMultiFdSinkClass, add),
+ NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+ gst_multifdsink_signals[SIGNAL_REMOVE] =
+ g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstMultiFdSinkClass, remove),
+ NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT);
+ gst_multifdsink_signals[SIGNAL_CLEAR] =
+ g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstMultiFdSinkClass, clear),
+ NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0);
gst_multifdsink_signals[SIGNAL_CLIENT_ADDED] =
g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
gstelement_class->change_state = gst_multifdsink_change_state;
+ klass->add = gst_multifdsink_add;
+ klass->remove = gst_multifdsink_remove;
+ klass->clear = gst_multifdsink_clear;
+
GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink");
}
}
}
+void
+gst_multifdsink_add (GstMultiFdSink * sink, int fd)
+{
+ GstTCPClient *client;
+
+ /* create client datastructure */
+ client = g_new0 (GstTCPClient, 1);
+ client->fd = fd;
+ client->bufpos = -1;
+ client->bufoffset = 0;
+ client->sending = NULL;
+
+ g_mutex_lock (sink->clientslock);
+ sink->clients = g_list_prepend (sink->clients, client);
+ g_mutex_unlock (sink->clientslock);
+
+ /* we always read from a client */
+ FD_SET (fd, &sink->readfds);
+
+ /* set the socket to non blocking */
+ fcntl (fd, F_SETFL, O_NONBLOCK);
+
+ g_signal_emit (G_OBJECT (sink),
+ gst_multifdsink_signals[SIGNAL_CLIENT_ADDED], 0, NULL, fd);
+}
+
+void
+gst_multifdsink_remove (GstMultiFdSink * sink, int fd)
+{
+ GList *clients;
+
+ g_mutex_lock (sink->clientslock);
+ /* loop over the clients to find the one with the fd */
+ for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ GstTCPClient *client;
+
+ client = (GstTCPClient *) clients->data;
+
+ if (client->fd == fd) {
+ gst_multifdsink_client_remove (sink, client);
+ break;
+ }
+ }
+ g_mutex_unlock (sink->clientslock);
+}
+
+void
+gst_multifdsink_clear (GstMultiFdSink * sink)
+{
+ GList *clients;
+
+ g_mutex_lock (sink->clientslock);
+ for (clients = sink->clients; clients; clients = g_list_next (clients)) {
+ GstTCPClient *client;
+
+ client = (GstTCPClient *) clients->data;
+ gst_multifdsink_client_remove (sink, client);
+ }
+ g_mutex_unlock (sink->clientslock);
+}
+
static void
gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client)
{
* another thread */
buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos);
client->bufpos--;
- gst_buffer_ref (buf);
+ /* queueing a buffer will ref it */
gst_multifdsink_client_queue_buffer (sink, client, buf);
- /* it is safe to unref now as queueing a buffer will ref it */
- gst_buffer_unref (buf);
+
/* need to start from the first byte for this new buffer */
client->bufoffset = 0;
}
return TRUE;
}
-static void
+/* calculate the new position for a client after recovery. This function
+ * does not update the client position but merely returns the required
+ * position.
+ */
+static gint
gst_multifdsink_recover_client (GstMultiFdSink * sink, GstTCPClient * client)
{
+ gint newbufpos;
+
/* FIXME: implement recover procedure here, like moving the position to
* the next keyframe, dropping buffers back to the beginning of the queue,
* stuff like that... */
case GST_RECOVER_POLICY_NONE:
/* do nothing, client will catch up or get kicked out when it reaches
* the hard max */
+ newbufpos = client->bufpos;
break;
case GST_RECOVER_POLICY_RESYNC_START:
/* move to beginning of queue */
- client->bufpos = -1;
+ newbufpos = -1;
break;
case GST_RECOVER_POLICY_RESYNC_SOFT:
/* move to beginning of soft max */
- client->bufpos = sink->buffers_soft_max;
+ newbufpos = sink->buffers_soft_max;
break;
case GST_RECOVER_POLICY_RESYNC_KEYFRAME:
/* FIXME, find keyframe in buffers */
- client->bufpos = sink->buffers_soft_max;
+ newbufpos = sink->buffers_soft_max;
+ break;
+ default:
+ newbufpos = sink->buffers_soft_max;
break;
}
+ return newbufpos;
}
/* Queue a buffer on the global queue.
client, client->fd, client->bufpos);
/* check soft max if needed, recover client */
if (sink->buffers_soft_max > 0 && client->bufpos >= sink->buffers_soft_max) {
- gst_multifdsink_recover_client (sink, client);
+ gint newpos;
+
+ newpos = gst_multifdsink_recover_client (sink, client);
+ if (newpos != client->bufpos) {
+ client->bufpos = newpos;
+ client->discont = TRUE;
+ }
}
/* check hard max, remove client */
if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) {
gst_buffer_unref (old);
}
sink->buffers_queued = max_buffer_usage;
- g_print ("%d\n", max_buffer_usage);
g_mutex_unlock (sink->clientslock);
/* and send a signal to thread if fd_set changed */