From d09028b4c390a2f950448f91dac4d5db6e73e1a4 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 29 May 2013 17:44:30 +0200 Subject: [PATCH] rtsp: use child sources instead of using the sockets Use the source of the pollable input/output streams instead of accessing the sockets directly. --- gst-libs/gst/rtsp/gstrtspconnection.c | 369 ++++++++++++++++------------------ 1 file changed, 169 insertions(+), 200 deletions(-) diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index 5d467cc..b95fb5a 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -2730,8 +2730,8 @@ struct _GstRTSPWatch GstRTSPBuilder builder; GstRTSPMessage message; - GPollFD readfd; - GPollFD writefd; + GSource *readsrc; + GSource *writesrc; /* queued message for transmission */ guint id; @@ -2767,172 +2767,95 @@ gst_rtsp_source_prepare (GSource * source, gint * timeout) static gboolean gst_rtsp_source_check (GSource * source) { - GstRTSPWatch *watch = (GstRTSPWatch *) source; - - if (watch->readfd.revents & READ_COND) - return TRUE; - - if (watch->writefd.revents & WRITE_COND) - return TRUE; - return FALSE; } static gboolean -gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, - gpointer user_data G_GNUC_UNUSED) +gst_rtsp_source_dispatch_read (GPollableInputStream * stream, + GstRTSPWatch * watch) { - GstRTSPWatch *watch = (GstRTSPWatch *) source; GstRTSPResult res = GST_RTSP_ERROR; - gboolean keep_running = TRUE; - /* first read as much as we can */ - if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) { - do { - if (watch->readfd.revents & READ_ERR) - goto read_error; + res = build_next (&watch->builder, &watch->message, watch->conn, FALSE); + if (res == GST_RTSP_EINTR) + goto done; + else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { + /* When we are in tunnelled mode, the read socket can be closed and we + * should be prepared for a new POST method to reopen it */ + if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) { + /* remove the read connection for the tunnel */ + /* we accept a new POST request */ + watch->conn->tstate = TUNNEL_STATE_GET; + /* and signal that we lost our tunnel */ + if (watch->funcs.tunnel_lost) + res = watch->funcs.tunnel_lost (watch, watch->user_data); + goto read_done; + } else + goto eof; + } else if (G_LIKELY (res == GST_RTSP_OK)) { + if (!watch->conn->manual_http && + watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + if (watch->conn->tstate == TUNNEL_STATE_NONE && + watch->message.type_data.request.method == GST_RTSP_GET) { + GstRTSPMessage *response; + GstRTSPStatusCode code; + + watch->conn->tstate = TUNNEL_STATE_GET; - res = build_next (&watch->builder, &watch->message, watch->conn, FALSE); - if (res == GST_RTSP_EINTR) - break; - else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { - watch->readfd.events = 0; - watch->readfd.revents = 0; - g_source_remove_poll ((GSource *) watch, &watch->readfd); - /* When we are in tunnelled mode, the read socket can be closed and we - * should be prepared for a new POST method to reopen it */ - if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) { - /* remove the read connection for the tunnel */ - /* we accept a new POST request */ - watch->conn->tstate = TUNNEL_STATE_GET; - /* and signal that we lost our tunnel */ - if (watch->funcs.tunnel_lost) - res = watch->funcs.tunnel_lost (watch, watch->user_data); - goto read_done; - } else - goto eof; - } else if (G_LIKELY (res == GST_RTSP_OK)) { - if (!watch->conn->manual_http && - watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { - if (watch->conn->tstate == TUNNEL_STATE_NONE && - watch->message.type_data.request.method == GST_RTSP_GET) { - GstRTSPMessage *response; - GstRTSPStatusCode code; - - watch->conn->tstate = TUNNEL_STATE_GET; - - if (watch->funcs.tunnel_start) - code = watch->funcs.tunnel_start (watch, watch->user_data); - else - code = GST_RTSP_STS_OK; - - /* queue the response */ - response = gen_tunnel_reply (watch->conn, code, &watch->message); - gst_rtsp_watch_send_message (watch, response, NULL); - gst_rtsp_message_free (response); - goto read_done; - } else if (watch->conn->tstate == TUNNEL_STATE_NONE && - watch->message.type_data.request.method == GST_RTSP_POST) { - watch->conn->tstate = TUNNEL_STATE_POST; - - /* in the callback the connection should be tunneled with the - * GET connection */ - if (watch->funcs.tunnel_complete) { - watch->funcs.tunnel_complete (watch, watch->user_data); - keep_running = !(watch->conn->read_socket == NULL && - watch->conn->write_socket == NULL); - if (!keep_running) - goto done; - } - goto read_done; - } - } - } + if (watch->funcs.tunnel_start) + code = watch->funcs.tunnel_start (watch, watch->user_data); + else + code = GST_RTSP_STS_OK; - if (!watch->conn->manual_http) { - /* if manual HTTP support is not enabled, then restore the message to - * what it would have looked like without the support for parsing HTTP - * messages being present */ - if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { - watch->message.type = GST_RTSP_MESSAGE_REQUEST; - watch->message.type_data.request.method = GST_RTSP_INVALID; - if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0) - watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID; - res = GST_RTSP_EPARSE; - } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { - watch->message.type = GST_RTSP_MESSAGE_RESPONSE; - if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0) - watch->message.type_data.response.version = - GST_RTSP_VERSION_INVALID; - res = GST_RTSP_EPARSE; + /* queue the response */ + response = gen_tunnel_reply (watch->conn, code, &watch->message); + gst_rtsp_watch_send_message (watch, response, NULL); + gst_rtsp_message_free (response); + goto read_done; + } else if (watch->conn->tstate == TUNNEL_STATE_NONE && + watch->message.type_data.request.method == GST_RTSP_POST) { + watch->conn->tstate = TUNNEL_STATE_POST; + + /* in the callback the connection should be tunneled with the + * GET connection */ + if (watch->funcs.tunnel_complete) { + watch->funcs.tunnel_complete (watch, watch->user_data); } + goto read_done; } + } + } else + goto read_error; - if (G_LIKELY (res == GST_RTSP_OK)) { - if (watch->funcs.message_received) - watch->funcs.message_received (watch, &watch->message, - watch->user_data); - } else { - goto read_error; - } - - read_done: - gst_rtsp_message_unset (&watch->message); - build_reset (&watch->builder); - } while (FALSE); + if (!watch->conn->manual_http) { + /* if manual HTTP support is not enabled, then restore the message to + * what it would have looked like without the support for parsing HTTP + * messages being present */ + if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { + watch->message.type = GST_RTSP_MESSAGE_REQUEST; + watch->message.type_data.request.method = GST_RTSP_INVALID; + if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) { + watch->message.type = GST_RTSP_MESSAGE_RESPONSE; + if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0) + watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID; + res = GST_RTSP_EPARSE; + } } + if (G_LIKELY (res != GST_RTSP_OK)) + goto read_error; - if (watch->writefd.revents & WRITE_COND) { - if (watch->writefd.revents & WRITE_ERR) - goto write_error; - - g_mutex_lock (&watch->mutex); - do { - if (watch->write_data == NULL) { - GstRTSPRec *rec; - - /* get a new message from the queue */ - rec = g_queue_pop_tail (watch->messages); - if (rec == NULL) - break; - - watch->messages_bytes -= rec->size; - - watch->write_off = 0; - watch->write_data = rec->data; - watch->write_size = rec->size; - watch->write_id = rec->id; - - g_slice_free (GstRTSPRec, rec); - } - - res = write_bytes (watch->conn->output_stream, watch->write_data, - &watch->write_off, watch->write_size, FALSE, - watch->conn->cancellable); - g_mutex_unlock (&watch->mutex); - - if (res == GST_RTSP_EINTR) - goto write_blocked; - else if (G_LIKELY (res == GST_RTSP_OK)) { - if (watch->funcs.message_sent) - watch->funcs.message_sent (watch, watch->write_id, watch->user_data); - } else { - goto write_error; - } - g_mutex_lock (&watch->mutex); - - g_free (watch->write_data); - watch->write_data = NULL; - } while (TRUE); - - watch->writefd.events = WRITE_ERR; + if (watch->funcs.message_received) + watch->funcs.message_received (watch, &watch->message, watch->user_data); - g_mutex_unlock (&watch->mutex); - } +read_done: + gst_rtsp_message_unset (&watch->message); + build_reset (&watch->builder); done: -write_blocked: - return keep_running; + return TRUE; /* ERRORS */ eof: @@ -2945,42 +2868,87 @@ eof: } read_error: { - watch->readfd.events = 0; - watch->readfd.revents = 0; - g_source_remove_poll ((GSource *) watch, &watch->readfd); - keep_running = (watch->writefd.events != 0); - - if (keep_running) { - if (watch->funcs.error_full) - GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message, - 0, watch->user_data), error); - else - goto error; - } else - goto eof; + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message, + 0, watch->user_data), error); } +error: + { + if (watch->funcs.error) + watch->funcs.error (watch, res, watch->user_data); + + return FALSE; + } +} + +static gboolean +gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, + gpointer user_data G_GNUC_UNUSED) +{ + return TRUE; +} + +static gboolean +gst_rtsp_source_dispatch_write (GPollableInputStream * stream, + GstRTSPWatch * watch) +{ + GstRTSPResult res = GST_RTSP_ERROR; + + g_mutex_lock (&watch->mutex); + do { + if (watch->write_data == NULL) { + GstRTSPRec *rec; + + /* get a new message from the queue */ + rec = g_queue_pop_tail (watch->messages); + if (rec == NULL) + break; + + watch->messages_bytes -= rec->size; + + watch->write_off = 0; + watch->write_data = rec->data; + watch->write_size = rec->size; + watch->write_id = rec->id; + + g_slice_free (GstRTSPRec, rec); + } + + res = write_bytes (watch->conn->output_stream, watch->write_data, + &watch->write_off, watch->write_size, FALSE, watch->conn->cancellable); + g_mutex_unlock (&watch->mutex); + + if (res == GST_RTSP_EINTR) + goto write_blocked; + else if (G_LIKELY (res == GST_RTSP_OK)) { + if (watch->funcs.message_sent) + watch->funcs.message_sent (watch, watch->write_id, watch->user_data); + } else { + goto write_error; + } + g_mutex_lock (&watch->mutex); + + g_free (watch->write_data); + watch->write_data = NULL; + } while (TRUE); + g_mutex_unlock (&watch->mutex); + +write_blocked: + return TRUE; + + /* ERRORS */ write_error: { - watch->writefd.events = 0; - watch->writefd.revents = 0; - g_source_remove_poll ((GSource *) watch, &watch->writefd); - keep_running = (watch->readfd.events != 0); - - if (keep_running) { - if (watch->funcs.error_full) - GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL, - watch->write_id, watch->user_data), error); - else - goto error; - } else - goto eof; + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL, + watch->write_id, watch->user_data), error); } error: { if (watch->funcs.error) watch->funcs.error (watch, res, watch->user_data); - return keep_running; + return FALSE; } } @@ -3060,9 +3028,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, g_mutex_init (&result->mutex); result->messages = g_queue_new (); - result->readfd.fd = -1; - result->writefd.fd = -1; - gst_rtsp_watch_reset (result); result->funcs = *funcs; @@ -3082,23 +3047,30 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, void gst_rtsp_watch_reset (GstRTSPWatch * watch) { - if (watch->readfd.fd != -1) - g_source_remove_poll ((GSource *) watch, &watch->readfd); - if (watch->writefd.fd != -1) - g_source_remove_poll ((GSource *) watch, &watch->writefd); - - watch->readfd.fd = g_socket_get_fd (watch->conn->read_socket); - watch->readfd.events = READ_COND; - watch->readfd.revents = 0; + if (watch->readsrc) + g_source_remove_child_source ((GSource *) watch, watch->readsrc); + if (watch->writesrc) + g_source_remove_child_source ((GSource *) watch, watch->writesrc); - watch->writefd.fd = g_socket_get_fd (watch->conn->write_socket); - watch->writefd.events = WRITE_ERR; - watch->writefd.revents = 0; + watch->readsrc = + g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM + (watch->conn->input_stream), NULL); + g_source_set_callback (watch->readsrc, + (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL); + watch->writesrc = + g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM + (watch->conn->output_stream), NULL); + g_source_set_callback (watch->writesrc, + (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL); - if (watch->readfd.fd != -1) - g_source_add_poll ((GSource *) watch, &watch->readfd); - if (watch->writefd.fd != -1) - g_source_add_poll ((GSource *) watch, &watch->writefd); + if (watch->readsrc) { + g_source_add_child_source ((GSource *) watch, watch->readsrc); + g_source_unref (watch->readsrc); + } + if (watch->writesrc) { + g_source_add_child_source ((GSource *) watch, watch->writesrc); + g_source_unref (watch->writesrc); + } } /** @@ -3266,10 +3238,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, /* make sure the main context will now also check for writability on the * socket */ - if (watch->writefd.events != WRITE_COND) { - watch->writefd.events = WRITE_COND; - context = ((GSource *) watch)->context; - } + context = ((GSource *) watch)->context; if (id != NULL) *id = rec->id; -- 2.7.4