*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/*
* Unless otherwise indicated, Source Code is licensed under MIT license.
* SECTION:gstrtspconnection
* @short_description: manage RTSP connections
* @see_also: gstrtspurl
- *
+ *
* This object manages the RTSP connection to the server. It provides function
* to receive and send bytes and messages.
- *
+ *
* Last reviewed on 2007-07-24 (0.10.14)
*/
#include <glib.h>
#include <gst/gst.h>
-#include "gstrtspconnection.h"
+/* necessary for IP_TOS define */
+#if GLIB_CHECK_VERSION(2, 36, 0)
+#include <gio/gnetworking.h>
+#endif
-#include "gst/glib-compat-private.h"
+#include "gstrtspconnection.h"
#ifdef IP_TOS
union gst_sockaddr
struct _GstRTSPConnection
{
/*< private > */
- /* URL for the connection */
+ /* URL for the remote connection */
GstRTSPUrl *url;
+ gboolean server;
+ GSocketClient *client;
+ GIOStream *stream0;
+ GIOStream *stream1;
+
+ GInputStream *input_stream;
+ GOutputStream *output_stream;
+ /* this is a read source we add on the write socket in tunneled mode to be
+ * able to detect when client disconnects the GET channel */
+ GInputStream *control_stream;
+
/* connection state */
GSocket *read_socket;
GSocket *write_socket;
- gboolean manual_http;
GSocket *socket0, *socket1;
+ gboolean manual_http;
+ gboolean may_cancel;
GCancellable *cancellable;
gchar tunnelid[TUNNELID_LEN];
gboolean tunneled;
GstRTSPTunnelState tstate;
- gchar *ip;
+ /* the remote and local ip */
+ gchar *remote_ip;
+ gchar *local_ip;
gint read_ahead;
gchar *initial_buffer;
gsize initial_buffer_offset;
+ gboolean remember_session_id; /* remember the session id or not */
+
/* Session state */
gint cseq; /* sequence number */
gchar session_id[512]; /* session id */
gchar *passwd;
GHashTable *auth_params;
+ /* TLS */
+ GTlsDatabase *tls_database;
+
DecodeCtx ctx;
DecodeCtx *ctxp;
memset (builder, 0, sizeof (GstRTSPBuilder));
}
+static gboolean
+tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
+ GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
+{
+ GError *error = NULL;
+ gboolean accept = FALSE;
+
+ if (rtspconn->tls_database) {
+ GSocketConnectable *peer_identity;
+ GTlsCertificateFlags validation_flags;
+
+ GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
+
+ peer_identity =
+ g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
+ (conn));
+
+ errors =
+ g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
+ G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
+ g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
+ NULL, &error);
+
+ if (error)
+ goto verify_error;
+
+ validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
+
+ accept = ((errors & validation_flags) == 0);
+ if (accept)
+ GST_DEBUG ("Peer certificate accepted");
+ else
+ GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
+ }
+
+ return accept;
+
+/* ERRORS */
+verify_error:
+ {
+ GST_ERROR ("An error occurred while verifying the peer certificate: %s",
+ error->message);
+ g_clear_error (&error);
+ return FALSE;
+ }
+}
+
+static void
+socket_client_event (GSocketClient * client, GSocketClientEvent event,
+ GSocketConnectable * connectable, GIOStream * connection,
+ GstRTSPConnection * rtspconn)
+{
+ if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
+ GST_DEBUG ("TLS handshaking about to start...");
+
+ g_signal_connect (connection, "accept-certificate",
+ (GCallback) tls_accept_certificate, rtspconn);
+ }
+}
+
/**
* gst_rtsp_connection_create:
- * @url: a #GstRTSPUrl
- * @conn: storage for a #GstRTSPConnection
+ * @url: a #GstRTSPUrl
+ * @conn: (out) (transfer full): storage for a #GstRTSPConnection
*
* Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
* The connection will not yet attempt to connect to @url, use
GstRTSPConnection *newconn;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
+ g_return_val_if_fail (url != NULL, GST_RTSP_EINVAL);
newconn = g_new0 (GstRTSPConnection, 1);
+ newconn->may_cancel = TRUE;
newconn->cancellable = g_cancellable_new ();
+ newconn->client = g_socket_client_new ();
+
+ if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
+ g_socket_client_set_tls (newconn->client, TRUE);
+
+ g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
+ newconn);
newconn->url = gst_rtsp_url_copy (url);
newconn->timer = g_timer_new ();
newconn->timeout = 60;
newconn->cseq = 1;
+ newconn->remember_session_id = TRUE;
+
newconn->auth_method = GST_RTSP_AUTH_NONE;
newconn->username = NULL;
newconn->passwd = NULL;
return GST_RTSP_OK;
}
+static gboolean
+collect_addresses (GSocket * socket, gchar ** ip, guint16 * port,
+ gboolean remote, GError ** error)
+{
+ GSocketAddress *addr;
+
+ if (remote)
+ addr = g_socket_get_remote_address (socket, error);
+ else
+ addr = g_socket_get_local_address (socket, error);
+ if (!addr)
+ return FALSE;
+
+ if (ip)
+ *ip = g_inet_address_to_string (g_inet_socket_address_get_address
+ (G_INET_SOCKET_ADDRESS (addr)));
+ if (port)
+ *port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
+
+ g_object_unref (addr);
+
+ return TRUE;
+}
+
+
/**
* gst_rtsp_connection_create_from_socket:
* @socket: a #GSocket
* @ip: the IP address of the other end
* @port: the port used by the other end
* @initial_buffer: data already read from @fd
- * @conn: storage for a #GstRTSPConnection
+ * @conn: (out) (transfer full): storage for a #GstRTSPConnection
*
* Create a new #GstRTSPConnection for handling communication on the existing
- * socket @socket. The @initial_buffer contains any data already read from
- * @socket which should be used before starting to read new data.
+ * socket @socket. The @initial_buffer contains zero terminated data already
+ * read from @socket which should be used before starting to read new data.
*
* Returns: #GST_RTSP_OK when @conn contains a valid connection.
- *
- * Since: 0.10.25
*/
GstRTSPResult
gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
GstRTSPConnection *newconn = NULL;
GstRTSPUrl *url;
GstRTSPResult res;
+ GError *err = NULL;
+ gchar *local_ip;
+ GIOStream *stream;
g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
g_return_val_if_fail (ip != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
- /* set to non-blocking mode so that we can cancel the communication */
- g_socket_set_blocking (socket, FALSE);
+ if (!collect_addresses (socket, &local_ip, NULL, FALSE, &err))
+ goto getnameinfo_failed;
/* create a url for the client address */
url = g_new0 (GstRTSPUrl, 1);
GST_RTSP_CHECK (gst_rtsp_connection_create (url, &newconn), newconn_failed);
gst_rtsp_url_free (url);
+ stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
+
/* both read and write initially */
- newconn->socket0 = G_SOCKET (g_object_ref (socket));
- newconn->socket1 = G_SOCKET (g_object_ref (socket));
+ newconn->server = TRUE;
+ newconn->socket0 = socket;
+ newconn->stream0 = stream;
newconn->write_socket = newconn->read_socket = newconn->socket0;
-
- newconn->ip = g_strdup (ip);
-
+ newconn->input_stream = g_io_stream_get_input_stream (stream);
+ newconn->output_stream = g_io_stream_get_output_stream (stream);
+ newconn->control_stream = NULL;
+ newconn->remote_ip = g_strdup (ip);
+ newconn->local_ip = local_ip;
newconn->initial_buffer = g_strdup (initial_buffer);
*conn = newconn;
return GST_RTSP_OK;
/* ERRORS */
+getnameinfo_failed:
+ {
+ GST_ERROR ("failed to get local address: %s", err->message);
+ g_clear_error (&err);
+ return GST_RTSP_ERROR;
+ }
newconn_failed:
{
+ GST_ERROR ("failed to make connection");
+ g_free (local_ip);
gst_rtsp_url_free (url);
return res;
}
/**
* gst_rtsp_connection_accept:
* @socket: a socket
- * @conn: storage for a #GstRTSPConnection
+ * @conn: (out) (transfer full): storage for a #GstRTSPConnection
* @cancellable: a #GCancellable to cancel the operation
*
* Accept a new connection on @socket and create a new #GstRTSPConnection for
* handling communication on new socket.
*
* Returns: #GST_RTSP_OK when @conn contains a valid connection.
- *
- * Since: 0.10.23
*/
GstRTSPResult
gst_rtsp_connection_accept (GSocket * socket, GstRTSPConnection ** conn,
gchar *ip;
guint16 port;
GSocket *client_sock;
- GSocketAddress *addr;
GstRTSPResult ret;
g_return_val_if_fail (G_IS_SOCKET (socket), GST_RTSP_EINVAL);
if (!client_sock)
goto accept_failed;
- addr = g_socket_get_remote_address (client_sock, &err);
- if (!addr)
+ /* get the remote ip address and port */
+ if (!collect_addresses (client_sock, &ip, &port, TRUE, &err))
goto getnameinfo_failed;
- ip = g_inet_address_to_string (g_inet_socket_address_get_address
- (G_INET_SOCKET_ADDRESS (addr)));
- port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (addr));
-
ret =
gst_rtsp_connection_create_from_socket (client_sock, ip, port, NULL,
conn);
+ g_object_unref (client_sock);
g_free (ip);
return ret;
}
getnameinfo_failed:
{
+ GST_DEBUG ("getnameinfo failed: %s", err->message);
+ g_clear_error (&err);
if (!g_socket_close (client_sock, &err)) {
GST_DEBUG ("Closing socket failed: %s", err->message);
g_clear_error (&err);
}
}
-static gchar *
-do_resolve (const gchar * host, GCancellable * cancellable)
-{
- GResolver *resolver;
- GInetAddress *addr;
- GError *err = NULL;
- gchar *ip;
-
- addr = g_inet_address_new_from_string (host);
- if (!addr) {
- GList *results, *l;
-
- resolver = g_resolver_get_default ();
-
- results = g_resolver_lookup_by_name (resolver, host, cancellable, &err);
- if (!results)
- goto name_resolve;
-
- for (l = results; l; l = l->next) {
- GInetAddress *tmp = l->data;
-
- if (g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV4 ||
- g_inet_address_get_family (tmp) == G_SOCKET_FAMILY_IPV6) {
- addr = G_INET_ADDRESS (g_object_ref (tmp));
- break;
- }
+/**
+ * gst_rtsp_connection_get_tls:
+ * @conn: a #GstRTSPConnection
+ * @error: #GError for error reporting, or NULL to ignore.
+ *
+ * Get the TLS connection of @conn.
+ *
+ * For client side this will return the #GTlsClientConnection when connected
+ * over TLS.
+ *
+ * For server side connections, this function will create a GTlsServerConnection
+ * when called the first time and will return that same connection on subsequent
+ * calls. The server is then responsible for configuring the TLS connection.
+ *
+ * Returns: (transfer none): the TLS connection for @conn.
+ *
+ * Since: 1.2
+ */
+GTlsConnection *
+gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
+{
+ GTlsConnection *result;
+
+ if (G_IS_TLS_CONNECTION (conn->stream0)) {
+ /* we already had one, return it */
+ result = G_TLS_CONNECTION (conn->stream0);
+ } else if (conn->server) {
+ /* no TLS connection but we are server, make one */
+ result = (GTlsConnection *)
+ g_tls_server_connection_new (conn->stream0, NULL, error);
+ if (result) {
+ g_object_unref (conn->stream0);
+ conn->stream0 = G_IO_STREAM (result);
+ conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
+ conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
}
-
- g_resolver_free_addresses (results);
- g_object_unref (resolver);
+ } else {
+ /* client */
+ result = NULL;
+ g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
+ "client not connected with TLS");
}
+ return result;
+}
- if (!addr)
- return NULL;
+/**
+ * gst_rtsp_connection_set_tls_validation_flags:
+ * @conn: a #GstRTSPConnection
+ * @flags: the validation flags.
+ *
+ * Sets the TLS validation flags to be used to verify the peer
+ * certificate when a TLS connection is established.
+ *
+ * Returns: TRUE if the validation flags are set correctly, or FALSE if
+ * @conn is NULL or is not a TLS connection.
+ *
+ * Since: 1.2.1
+ */
+gboolean
+gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
+ GTlsCertificateFlags flags)
+{
+ gboolean res = FALSE;
- ip = g_inet_address_to_string (addr);
- g_object_unref (addr);
+ g_return_val_if_fail (conn != NULL, FALSE);
- return ip;
+ res = g_socket_client_get_tls (conn->client);
+ if (res)
+ g_socket_client_set_tls_validation_flags (conn->client, flags);
- /* ERRORS */
-name_resolve:
- {
- GST_ERROR ("failed to resolve %s: %s", host, err->message);
- g_clear_error (&err);
- g_object_unref (resolver);
- return NULL;
- }
+ return res;
}
-static GstRTSPResult
-do_connect (const gchar * ip, guint16 port, GSocket ** socket_out,
- GTimeVal * timeout, GCancellable * cancellable)
+/**
+ * gst_rtsp_connection_get_tls_validation_flags:
+ * @conn: a #GstRTSPConnection
+ *
+ * Gets the TLS validation flags used to verify the peer certificate
+ * when a TLS connection is established.
+ *
+ * Returns: the validationg flags.
+ *
+ * Since: 1.2.1
+ */
+GTlsCertificateFlags
+gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
{
- GSocket *socket;
- GstClockTime to;
- GInetAddress *addr;
- GSocketAddress *saddr;
- GError *err = NULL;
+ g_return_val_if_fail (conn != NULL, 0);
- addr = g_inet_address_new_from_string (ip);
- g_assert (addr);
- saddr = g_inet_socket_address_new (addr, port);
- g_object_unref (addr);
-
- socket =
- g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
- G_SOCKET_PROTOCOL_TCP, &err);
- if (socket == NULL)
- goto no_socket;
+ return g_socket_client_get_tls_validation_flags (conn->client);
+}
- /* set to non-blocking mode so that we can cancel the connect */
- g_socket_set_blocking (socket, FALSE);
+/**
+ * gst_rtsp_connection_set_tls_database:
+ * @conn: a #GstRTSPConnection
+ * @database: a #GTlsDatabase
+ *
+ * Sets the anchor certificate authorities database. This certificate
+ * database will be used to verify the server's certificate in case it
+ * can't be verified with the default certificate database first.
+ *
+ * Since: 1.4
+ */
+void
+gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
+ GTlsDatabase * database)
+{
+ GTlsDatabase *old_db;
- /* we are going to connect ASYNC now */
- if (!g_socket_connect (socket, saddr, cancellable, &err)) {
- if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_PENDING))
- goto sys_error;
- g_clear_error (&err);
- } else {
- goto done;
- }
+ g_return_if_fail (conn != NULL);
- /* wait for connect to complete up to the specified timeout or until we got
- * interrupted. */
- to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
+ if (database)
+ g_object_ref (database);
- g_socket_set_timeout (socket, (to + GST_SECOND - 1) / GST_SECOND);
- if (!g_socket_condition_wait (socket, G_IO_OUT, cancellable, &err)) {
- g_socket_set_timeout (socket, 0);
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT))
- goto timeout;
- else
- goto sys_error;
- }
- g_socket_set_timeout (socket, 0);
+ old_db = conn->tls_database;
+ conn->tls_database = database;
- if (!g_socket_check_connect_result (socket, &err))
- goto sys_error;
+ if (old_db)
+ g_object_unref (old_db);
+}
-done:
- g_object_unref (saddr);
+/**
+ * gst_rtsp_connection_get_tls_database:
+ * @conn: a #GstRTSPConnection
+ *
+ * Gets the anchor certificate authorities database that will be used
+ * after a server certificate can't be verified with the default
+ * certificate database.
+ *
+ * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
+ * database has been previously set. Use g_object_unref() to release the
+ * certificate database.
+ *
+ * Since: 1.4
+ */
+GTlsDatabase *
+gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
+{
+ GTlsDatabase *result;
- *socket_out = socket;
+ g_return_val_if_fail (conn != NULL, NULL);
- return GST_RTSP_OK;
+ if ((result = conn->tls_database))
+ g_object_ref (result);
- /* ERRORS */
-no_socket:
- {
- GST_ERROR ("no socket: %s", err->message);
- g_clear_error (&err);
- g_object_unref (saddr);
- return GST_RTSP_ESYS;
- }
-sys_error:
- {
- GST_ERROR ("system error: %s", err->message);
- g_clear_error (&err);
- g_object_unref (saddr);
- g_object_unref (socket);
- return GST_RTSP_ESYS;
- }
-timeout:
- {
- GST_ERROR ("timeout");
- g_clear_error (&err);
- g_object_unref (saddr);
- g_object_unref (socket);
- return GST_RTSP_ETIMEOUT;
- }
+ return result;
}
static GstRTSPResult
-setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
+setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri)
{
gint i;
GstRTSPResult res;
- gchar *ip;
- gchar *uri;
gchar *value;
- guint16 port, url_port;
- GstRTSPUrl *url;
- gchar *hostparam;
+ guint16 url_port;
GstRTSPMessage *msg;
GstRTSPMessage response;
gboolean old_http;
+ GstRTSPUrl *url;
+ GError *error = NULL;
+ GSocketConnection *connection;
+ GSocket *socket;
memset (&response, 0, sizeof (response));
gst_rtsp_message_init (&response);
+ url = conn->url;
+
/* create a random sessionid */
for (i = 0; i < TUNNELID_LEN; i++)
conn->tunnelid[i] = g_random_int_range ('a', 'z');
conn->tunnelid[TUNNELID_LEN - 1] = '\0';
- url = conn->url;
- /* get the port from the url */
- gst_rtsp_url_get_port (url, &url_port);
-
- if (conn->proxy_host) {
- uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
- url->abspath, url->query ? "?" : "", url->query ? url->query : "");
- hostparam = g_strdup_printf ("%s:%d", url->host, url_port);
- ip = conn->proxy_host;
- port = conn->proxy_port;
- } else {
- uri = g_strdup_printf ("%s%s%s", url->abspath, url->query ? "?" : "",
- url->query ? url->query : "");
- hostparam = NULL;
- ip = conn->ip;
- port = url_port;
- }
-
/* create the GET request for the read connection */
GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_GET, uri),
no_message);
msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
- if (hostparam != NULL)
- gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam);
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
conn->tunnelid);
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CACHE_CONTROL, "no-cache");
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
- /* we start by writing to this fd */
- conn->write_socket = conn->socket0;
-
/* we need to temporarily set conn->tunneled to FALSE to prevent the HTTP
* request from being base64 encoded */
conn->tunneled = FALSE;
if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
&value, 0) == GST_RTSP_OK) {
- if (conn->proxy_host) {
- /* if we use a proxy we need to change the destination url */
- g_free (url->host);
- url->host = g_strdup (value);
- g_free (hostparam);
- hostparam = g_strdup_printf ("%s:%d", url->host, url_port);
- } else {
- /* and resolve the new ip address */
- if (!(ip = do_resolve (value, conn->cancellable)))
- goto not_resolved;
- g_free (conn->ip);
- conn->ip = ip;
- }
+ g_free (url->host);
+ url->host = g_strdup (value);
+ g_free (conn->remote_ip);
+ conn->remote_ip = g_strdup (value);
}
+ gst_rtsp_url_get_port (url, &url_port);
+ uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
+ url->abspath, url->query ? "?" : "", url->query ? url->query : "");
+
/* connect to the host/port */
- res = do_connect (ip, port, &conn->socket1, timeout, conn->cancellable);
- if (res != GST_RTSP_OK)
+ if (conn->proxy_host) {
+ connection = g_socket_client_connect_to_host (conn->client,
+ conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+ } else {
+ connection = g_socket_client_connect_to_uri (conn->client,
+ uri, 0, conn->cancellable, &error);
+ }
+ if (connection == NULL)
goto connect_failed;
+ socket = g_socket_connection_get_socket (connection);
+
+ /* get remote address */
+ g_free (conn->remote_ip);
+ conn->remote_ip = NULL;
+
+ if (!collect_addresses (socket, &conn->remote_ip, NULL, TRUE, &error))
+ goto remote_address_failed;
+
/* this is now our writing socket */
+ conn->stream1 = G_IO_STREAM (connection);
+ conn->socket1 = socket;
conn->write_socket = conn->socket1;
+ conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
+ conn->control_stream = NULL;
/* create the POST request for the write connection */
GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, uri),
no_message);
msg->type = GST_RTSP_MESSAGE_HTTP_REQUEST;
- if (hostparam != NULL)
- gst_rtsp_message_add_header (msg, GST_RTSP_HDR_HOST, hostparam);
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SESSIONCOOKIE,
conn->tunnelid);
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_ACCEPT,
exit:
gst_rtsp_message_unset (&response);
- g_free (hostparam);
g_free (uri);
return res;
res = GST_RTSP_ERROR;
goto exit;
}
-not_resolved:
+connect_failed:
{
- GST_ERROR ("could not resolve %s", conn->ip);
- res = GST_RTSP_ENET;
+ GST_ERROR ("failed to connect: %s", error->message);
+ res = GST_RTSP_ERROR;
+ g_clear_error (&error);
goto exit;
}
-connect_failed:
+remote_address_failed:
{
- GST_ERROR ("failed to connect");
- goto exit;
+ GST_ERROR ("failed to resolve address: %s", error->message);
+ g_object_unref (connection);
+ g_clear_error (&error);
+ return GST_RTSP_ERROR;
}
}
/**
* gst_rtsp_connection_connect:
- * @conn: a #GstRTSPConnection
+ * @conn: a #GstRTSPConnection
* @timeout: a #GTimeVal timeout
*
* Attempt to connect to the url of @conn made with
gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
{
GstRTSPResult res;
- gchar *ip;
- guint16 port;
+ GSocketConnection *connection;
+ GSocket *socket;
+ GError *error = NULL;
+ gchar *uri, *remote_ip;
+ GstClockTime to;
+ guint16 url_port;
GstRTSPUrl *url;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
- g_return_val_if_fail (conn->socket0 == NULL, GST_RTSP_EINVAL);
+ g_return_val_if_fail (conn->stream0 == NULL, GST_RTSP_EINVAL);
+
+ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
+ g_socket_client_set_timeout (conn->client,
+ (to + GST_SECOND - 1) / GST_SECOND);
url = conn->url;
- if (conn->proxy_host && conn->tunneled) {
- if (!(ip = do_resolve (conn->proxy_host, conn->cancellable))) {
- GST_ERROR ("could not resolve %s", conn->proxy_host);
- goto not_resolved;
- }
- port = conn->proxy_port;
- g_free (conn->proxy_host);
- conn->proxy_host = ip;
- } else {
- if (!(ip = do_resolve (url->host, conn->cancellable))) {
- GST_ERROR ("could not resolve %s", url->host);
- goto not_resolved;
- }
- /* get the port from the url */
- gst_rtsp_url_get_port (url, &port);
+ gst_rtsp_url_get_port (url, &url_port);
- g_free (conn->ip);
- conn->ip = ip;
+ if (conn->tunneled) {
+ uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
+ url->abspath, url->query ? "?" : "", url->query ? url->query : "");
+ } else {
+ uri = gst_rtsp_url_get_request_uri (url);
}
- /* connect to the host/port */
- res = do_connect (ip, port, &conn->socket0, timeout, conn->cancellable);
- if (res != GST_RTSP_OK)
+ if (conn->proxy_host) {
+ connection = g_socket_client_connect_to_host (conn->client,
+ conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+ } else {
+ connection = g_socket_client_connect_to_uri (conn->client,
+ uri, url_port, conn->cancellable, &error);
+ }
+ if (connection == NULL)
goto connect_failed;
- /* this is our read URL */
+ /* get remote address */
+ socket = g_socket_connection_get_socket (connection);
+
+ if (!collect_addresses (socket, &remote_ip, NULL, TRUE, &error))
+ goto remote_address_failed;
+
+ g_free (conn->remote_ip);
+ conn->remote_ip = remote_ip;
+ conn->stream0 = G_IO_STREAM (connection);
+ conn->socket0 = socket;
+ /* this is our read socket */
conn->read_socket = conn->socket0;
+ conn->write_socket = conn->socket0;
+ conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
+ conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
+ conn->control_stream = NULL;
if (conn->tunneled) {
- res = setup_tunneling (conn, timeout);
+ res = setup_tunneling (conn, timeout, uri);
if (res != GST_RTSP_OK)
goto tunneling_failed;
- } else {
- conn->write_socket = conn->socket0;
}
+ g_free (uri);
return GST_RTSP_OK;
-not_resolved:
+ /* ERRORS */
+connect_failed:
{
- return GST_RTSP_ENET;
+ GST_ERROR ("failed to connect: %s", error->message);
+ g_clear_error (&error);
+ return GST_RTSP_ERROR;
}
-connect_failed:
+remote_address_failed:
{
- GST_ERROR ("failed to connect");
- return res;
+ GST_ERROR ("failed to connect: %s", error->message);
+ g_object_unref (connection);
+ g_clear_error (&error);
+ return GST_RTSP_ERROR;
}
tunneling_failed:
{
}
static GstRTSPResult
-write_bytes (GSocket * socket, const guint8 * buffer, guint * idx, guint size,
- GCancellable * cancellable)
+write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
+ guint size, gboolean block, GCancellable * cancellable)
{
guint left;
+ gssize r;
+ GError *err = NULL;
if (G_UNLIKELY (*idx > size))
return GST_RTSP_ERROR;
left = size - *idx;
while (left) {
- GError *err = NULL;
- gssize r;
+ if (block)
+ r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
+ cancellable, &err);
+ else
+ r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
+ (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
+ if (G_UNLIKELY (r < 0))
+ goto error;
+
+ left -= r;
+ *idx += r;
+ }
+ return GST_RTSP_OK;
+
+ /* ERRORS */
+error:
+ {
+ if (G_UNLIKELY (r == 0))
+ return GST_RTSP_EEOF;
- r = g_socket_send (socket, (gchar *) & buffer[*idx], left, cancellable,
- &err);
- if (G_UNLIKELY (r == 0)) {
+ GST_DEBUG ("%s", err->message);
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+ g_clear_error (&err);
return GST_RTSP_EINTR;
- } else if (G_UNLIKELY (r < 0)) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_clear_error (&err);
- return GST_RTSP_EINTR;
- }
+ } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&err);
- return GST_RTSP_ESYS;
- } else {
- left -= r;
- *idx += r;
+ return GST_RTSP_EINTR;
+ } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+ g_clear_error (&err);
+ return GST_RTSP_ETIMEOUT;
}
+ g_clear_error (&err);
+ return GST_RTSP_ESYS;
}
- return GST_RTSP_OK;
}
static gint
fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
- GError ** err)
+ gboolean block, GError ** err)
{
gint out = 0;
if (G_LIKELY (size > (guint) out)) {
gssize r;
+ gsize count = size - out;
+ if (block)
+ r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
+ count, conn->may_cancel ? conn->cancellable : NULL, err);
+ else
+ r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
+ (conn->input_stream), (gchar *) & buffer[out], count,
+ conn->may_cancel ? conn->cancellable : NULL, err);
- r = g_socket_receive (conn->read_socket, (gchar *) & buffer[out],
- size - out, conn->cancellable, err);
- if (r <= 0) {
- if (out == 0)
+ if (G_UNLIKELY (r < 0)) {
+ if (out == 0) {
+ /* propagate the error */
out = r;
+ } else {
+ /* we have some data ignore error */
+ g_clear_error (err);
+ }
} else
out += r;
}
static gint
fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
- GError ** err)
+ gboolean block, GError ** err)
{
DecodeCtx *ctx = conn->ctxp;
gint out = 0;
break;
/* try to read more bytes */
- r = fill_raw_bytes (conn, in, sizeof (in), err);
+ r = fill_raw_bytes (conn, in, sizeof (in), block, err);
if (r <= 0) {
if (out == 0)
out = r;
&ctx->save);
}
} else {
- out = fill_raw_bytes (conn, buffer, size, err);
+ out = fill_raw_bytes (conn, buffer, size, block, err);
}
return out;
}
static GstRTSPResult
-read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
+read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
+ gboolean block)
{
guint left;
+ gint r;
GError *err = NULL;
if (G_UNLIKELY (*idx > size))
left = size - *idx;
while (left) {
- gint r;
+ r = fill_bytes (conn, &buffer[*idx], left, block, &err);
+ if (G_UNLIKELY (r <= 0))
+ goto error;
+
+ left -= r;
+ *idx += r;
+ }
+ return GST_RTSP_OK;
- r = fill_bytes (conn, &buffer[*idx], left, &err);
- if (G_UNLIKELY (r == 0)) {
+ /* ERRORS */
+error:
+ {
+ if (G_UNLIKELY (r == 0))
return GST_RTSP_EEOF;
- } else if (G_UNLIKELY (r < 0)) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_clear_error (&err);
- return GST_RTSP_EINTR;
- }
+
+ GST_DEBUG ("%s", err->message);
+ if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
g_clear_error (&err);
- return GST_RTSP_ESYS;
- } else {
- left -= r;
- *idx += r;
+ return GST_RTSP_EINTR;
+ } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&err);
+ return GST_RTSP_EINTR;
+ } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+ g_clear_error (&err);
+ return GST_RTSP_ETIMEOUT;
}
+ g_clear_error (&err);
+ return GST_RTSP_ESYS;
}
- return GST_RTSP_OK;
}
/* The code below tries to handle clients using \r, \n or \r\n to indicate the
* the method used in RTSP (and HTTP) to break long lines.
*/
static GstRTSPResult
-read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
+read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
+ gboolean block)
{
- GError *err = NULL;
+ GstRTSPResult res;
while (TRUE) {
guint8 c;
- gint r;
+ guint i;
if (conn->read_ahead == READ_AHEAD_EOH) {
/* the last call to read_line() already determined that we have reached
conn->read_ahead = 0;
} else {
/* read the next character */
- r = fill_bytes (conn, &c, 1, &err);
- if (G_UNLIKELY (r == 0)) {
- return GST_RTSP_EEOF;
- } else if (G_UNLIKELY (r < 0)) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_clear_error (&err);
- return GST_RTSP_EINTR;
- }
-
- g_clear_error (&err);
- return GST_RTSP_ESYS;
- }
+ i = 0;
+ res = read_bytes (conn, &c, &i, 1, block);
+ if (G_UNLIKELY (res != GST_RTSP_OK))
+ return res;
}
/* special treatment of line endings */
retry:
/* need to read ahead one more character to know what to do... */
- r = fill_bytes (conn, &read_ahead, 1, &err);
- if (G_UNLIKELY (r == 0)) {
- return GST_RTSP_EEOF;
- } else if (G_UNLIKELY (r < 0)) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- /* remember the original character we read and try again next time */
- if (conn->read_ahead == 0)
- conn->read_ahead = c;
- return GST_RTSP_EINTR;
- g_clear_error (&err);
- return GST_RTSP_EINTR;
- }
-
- g_clear_error (&err);
- return GST_RTSP_ESYS;
- }
+ i = 0;
+ res = read_bytes (conn, &read_ahead, &i, 1, block);
+ if (G_UNLIKELY (res != GST_RTSP_OK))
+ return res;
if (read_ahead == ' ' || read_ahead == '\t') {
if (conn->read_ahead == READ_AHEAD_CRLFCR) {
* Attempt to write @size bytes of @data to the connected @conn, blocking up to
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
- *
+ *
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
guint offset;
GstClockTime to;
GstRTSPResult res;
- GError *err = NULL;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
- g_return_val_if_fail (conn->write_socket != NULL, GST_RTSP_EINVAL);
-
- to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
+ g_return_val_if_fail (conn->output_stream != NULL, GST_RTSP_EINVAL);
offset = 0;
-
- while (TRUE) {
- /* try to write */
- res =
- write_bytes (conn->write_socket, data, &offset, size,
- conn->cancellable);
- if (G_LIKELY (res == GST_RTSP_OK))
- break;
- if (G_UNLIKELY (res != GST_RTSP_EINTR))
- goto write_error;
-
- /* not all is written, wait until we can write more */
- g_socket_set_timeout (conn->write_socket,
- (to + GST_SECOND - 1) / GST_SECOND);
- if (!g_socket_condition_wait (conn->write_socket,
- G_IO_OUT | G_IO_ERR | G_IO_HUP, conn->cancellable, &err)) {
- g_socket_set_timeout (conn->write_socket, 0);
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) {
- g_clear_error (&err);
- goto stopped;
- } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
- g_clear_error (&err);
- goto timeout;
- }
- g_clear_error (&err);
- goto select_error;
- }
- g_socket_set_timeout (conn->write_socket, 0);
- }
- return GST_RTSP_OK;
-
- /* ERRORS */
-timeout:
- {
- return GST_RTSP_ETIMEOUT;
- }
-select_error:
- {
- return GST_RTSP_ESYS;
- }
-stopped:
- {
- return GST_RTSP_EINTR;
- }
-write_error:
- {
- return res;
- }
+
+ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
+
+ g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
+ res =
+ write_bytes (conn->output_stream, data, &offset, size, TRUE,
+ conn->cancellable);
+ g_socket_set_timeout (conn->write_socket, 0);
+
+ return res;
}
static GString *
* Attempt to send @message to the connected @conn, blocking up to
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
- *
+ *
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
*/
static GstRTSPResult
build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
- GstRTSPConnection * conn)
+ GstRTSPConnection * conn, gboolean block)
{
GstRTSPResult res;
builder->offset = 0;
res =
- read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1);
+ read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1,
+ block);
if (res != GST_RTSP_OK)
goto done;
if (c == '$') {
/* data message, prepare for the header */
builder->state = STATE_DATA_HEADER;
+ conn->may_cancel = FALSE;
} else if (c == '\n' || c == '\r') {
/* skip \n and \r */
builder->offset = 0;
} else {
builder->line = 0;
builder->state = STATE_READ_LINES;
+ conn->may_cancel = FALSE;
}
break;
}
case STATE_DATA_HEADER:
{
res =
- read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4);
+ read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4,
+ block);
if (res != GST_RTSP_OK)
goto done;
{
res =
read_bytes (conn, builder->body_data, &builder->offset,
- builder->body_len);
+ builder->body_len, block);
if (res != GST_RTSP_OK)
goto done;
case STATE_READ_LINES:
{
res = read_line (conn, builder->buffer, &builder->offset,
- sizeof (builder->buffer));
+ sizeof (builder->buffer), block);
if (res != GST_RTSP_OK)
goto done;
gchar *session_cookie;
gchar *session_id;
+ conn->may_cancel = TRUE;
+
if (message->type == GST_RTSP_MESSAGE_DATA) {
/* data messages don't have headers */
res = GST_RTSP_OK;
}
/* make sure to not overflow */
- strncpy (conn->session_id, session_id, maxlen);
- conn->session_id[maxlen] = '\0';
+ if (conn->remember_session_id) {
+ strncpy (conn->session_id, session_id, maxlen);
+ conn->session_id[maxlen] = '\0';
+ }
}
res = builder->status;
goto done;
guint offset;
GstClockTime to;
GstRTSPResult res;
- GError *err = NULL;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
offset = 0;
/* configure timeout if any */
- to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
+ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
- while (TRUE) {
- res = read_bytes (conn, data, &offset, size);
- if (G_UNLIKELY (res == GST_RTSP_EEOF))
- goto eof;
- if (G_LIKELY (res == GST_RTSP_OK))
- break;
- if (G_UNLIKELY (res != GST_RTSP_EINTR))
- goto read_error;
-
- g_socket_set_timeout (conn->read_socket,
- (to + GST_SECOND - 1) / GST_SECOND);
- if (!g_socket_condition_wait (conn->read_socket,
- G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable,
- &err)) {
- g_socket_set_timeout (conn->read_socket, 0);
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) {
- g_clear_error (&err);
- goto stopped;
- } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
- g_clear_error (&err);
- goto select_timeout;
- }
- g_clear_error (&err);
- goto select_error;
- }
- g_socket_set_timeout (conn->read_socket, 0);
- }
- return GST_RTSP_OK;
+ g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
+ res = read_bytes (conn, data, &offset, size, TRUE);
+ g_socket_set_timeout (conn->read_socket, 0);
- /* ERRORS */
-select_error:
- {
- return GST_RTSP_ESYS;
- }
-select_timeout:
- {
- return GST_RTSP_ETIMEOUT;
- }
-stopped:
- {
- return GST_RTSP_EINTR;
- }
-eof:
- {
- return GST_RTSP_EEOF;
- }
-read_error:
- {
- return res;
- }
+ return res;
}
static GstRTSPMessage *
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_PRAGMA, "no-cache");
if (code == GST_RTSP_STS_OK) {
- if (conn->ip)
+ /* add the local ip address to the tunnel reply, this is where the client
+ * should send the POST request to */
+ if (conn->local_ip)
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
- conn->ip);
+ conn->local_ip);
gst_rtsp_message_add_header (msg, GST_RTSP_HDR_CONTENT_TYPE,
"application/x-rtsp-tunnelled");
}
* Attempt to read into @message from the connected @conn, blocking up to
* the specified @timeout. @timeout can be #NULL, in which case this function
* might block forever.
- *
+ *
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
GstRTSPResult res;
GstRTSPBuilder builder;
GstClockTime to;
- GError *err = NULL;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->read_socket != NULL, GST_RTSP_EINVAL);
/* configure timeout if any */
- to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : GST_CLOCK_TIME_NONE;
+ to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
+ g_socket_set_timeout (conn->read_socket, (to + GST_SECOND - 1) / GST_SECOND);
memset (&builder, 0, sizeof (GstRTSPBuilder));
- while (TRUE) {
- res = build_next (&builder, message, conn);
- if (G_UNLIKELY (res == GST_RTSP_EEOF))
- goto eof;
- else if (G_LIKELY (res == GST_RTSP_OK)) {
- if (!conn->manual_http) {
- if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
- if (conn->tstate == TUNNEL_STATE_NONE &&
- message->type_data.request.method == GST_RTSP_GET) {
- GstRTSPMessage *response;
-
- conn->tstate = TUNNEL_STATE_GET;
-
- /* tunnel GET request, we can reply now */
- response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
- res = gst_rtsp_connection_send (conn, response, timeout);
- gst_rtsp_message_free (response);
- if (res == GST_RTSP_OK)
- res = GST_RTSP_ETGET;
- goto cleanup;
- } else if (conn->tstate == TUNNEL_STATE_NONE &&
- message->type_data.request.method == GST_RTSP_POST) {
- conn->tstate = TUNNEL_STATE_POST;
-
- /* tunnel POST request, the caller now has to link the two
- * connections. */
- res = GST_RTSP_ETPOST;
- goto cleanup;
- } else {
- res = GST_RTSP_EPARSE;
- goto cleanup;
- }
- } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
- res = GST_RTSP_EPARSE;
- goto cleanup;
- }
- }
-
- break;
- } else if (G_UNLIKELY (res != GST_RTSP_EINTR))
- goto read_error;
-
- g_socket_set_timeout (conn->read_socket,
- (to + GST_SECOND - 1) / GST_SECOND);
- if (!g_socket_condition_wait (conn->read_socket,
- G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, conn->cancellable,
- &err)) {
- g_socket_set_timeout (conn->read_socket, 0);
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)) {
- g_clear_error (&err);
- goto stopped;
- } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
- g_clear_error (&err);
- goto select_timeout;
+ res = build_next (&builder, message, conn, TRUE);
+ g_socket_set_timeout (conn->read_socket, 0);
+
+ if (G_UNLIKELY (res != GST_RTSP_OK))
+ goto read_error;
+
+ if (!conn->manual_http) {
+ if (message->type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
+ if (conn->tstate == TUNNEL_STATE_NONE &&
+ message->type_data.request.method == GST_RTSP_GET) {
+ GstRTSPMessage *response;
+
+ conn->tstate = TUNNEL_STATE_GET;
+
+ /* tunnel GET request, we can reply now */
+ response = gen_tunnel_reply (conn, GST_RTSP_STS_OK, message);
+ res = gst_rtsp_connection_send (conn, response, timeout);
+ gst_rtsp_message_free (response);
+ if (res == GST_RTSP_OK)
+ res = GST_RTSP_ETGET;
+ goto cleanup;
+ } else if (conn->tstate == TUNNEL_STATE_NONE &&
+ message->type_data.request.method == GST_RTSP_POST) {
+ conn->tstate = TUNNEL_STATE_POST;
+
+ /* tunnel POST request, the caller now has to link the two
+ * connections. */
+ res = GST_RTSP_ETPOST;
+ goto cleanup;
+ } else {
+ res = GST_RTSP_EPARSE;
+ goto cleanup;
}
- g_clear_error (&err);
- goto select_error;
+ } else if (message->type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
+ res = GST_RTSP_EPARSE;
+ goto cleanup;
}
- g_socket_set_timeout (conn->read_socket, 0);
}
/* we have a message here */
return GST_RTSP_OK;
/* ERRORS */
-select_error:
- {
- res = GST_RTSP_ESYS;
- goto cleanup;
- }
-select_timeout:
- {
- res = GST_RTSP_ETIMEOUT;
- goto cleanup;
- }
-stopped:
- {
- res = GST_RTSP_EINTR;
- goto cleanup;
- }
-eof:
- {
- res = GST_RTSP_EEOF;
- goto cleanup;
- }
read_error:
cleanup:
{
*
* Close the connected @conn. After this call, the connection is in the same
* state as when it was first created.
- *
+ *
* Returns: #GST_RTSP_OK on success.
*/
GstRTSPResult
/* last unref closes the connection we don't want to explicitly close here
* because these sockets might have been provided at construction */
- if (conn->socket0) {
- g_object_unref (conn->socket0);
+ if (conn->stream0) {
+ g_object_unref (conn->stream0);
+ conn->stream0 = NULL;
conn->socket0 = NULL;
}
- if (conn->socket1) {
- g_object_unref (conn->socket1);
+ if (conn->stream1) {
+ g_object_unref (conn->stream1);
+ conn->stream1 = NULL;
conn->socket1 = NULL;
}
- g_free (conn->ip);
- conn->ip = NULL;
+ /* these were owned by the stream */
+ conn->input_stream = NULL;
+ conn->output_stream = NULL;
+
+ g_free (conn->remote_ip);
+ conn->remote_ip = NULL;
+ g_free (conn->local_ip);
+ conn->local_ip = NULL;
conn->read_ahead = 0;
* @conn: a #GstRTSPConnection
*
* Close and free @conn.
- *
+ *
* Returns: #GST_RTSP_OK on success.
*/
GstRTSPResult
if (conn->cancellable)
g_object_unref (conn->cancellable);
+ if (conn->client)
+ g_object_unref (conn->client);
+ if (conn->tls_database)
+ g_object_unref (conn->tls_database);
g_timer_destroy (conn->timer);
gst_rtsp_url_free (conn->url);
* gst_rtsp_connection_poll:
* @conn: a #GstRTSPConnection
* @events: a bitmask of #GstRTSPEvent flags to check
- * @revents: location for result flags
+ * @revents: location for result flags
* @timeout: a timeout
*
* Wait up to the specified @timeout for the connection to become available for
* @timeout can be #NULL, in which case this function might block forever.
*
* This function can be cancelled with gst_rtsp_connection_flush().
- *
- * Returns: #GST_RTSP_OK on success.
*
- * Since: 0.10.15
+ * Returns: #GST_RTSP_OK on success.
*/
GstRTSPResult
gst_rtsp_connection_poll (GstRTSPConnection * conn, GstRTSPEvent events,
* Start or stop the flushing action on @conn. When flushing, all current
* and future actions on @conn will return #GST_RTSP_EINTR until the connection
* is set to non-flushing mode again.
- *
+ *
* Returns: #GST_RTSP_OK.
*/
GstRTSPResult
* @port: the proxy port
*
* Set the proxy host and port.
- *
- * Returns: #GST_RTSP_OK.
*
- * Since: 0.10.23
+ * Returns: #GST_RTSP_OK.
*/
GstRTSPResult
gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
*
* Configure @conn for authentication mode @method with @user and @pass as the
* user and password respectively.
- *
+ *
* Returns: #GST_RTSP_OK.
*/
GstRTSPResult
* #GST_RTSP_AUTH_DIGEST, directives should be taken from the digest challenge
* in the WWW-Authenticate response header and can include realm, domain,
* nonce, opaque, stale, algorithm, qop as per RFC2617.
- *
- * Since: 0.10.20
*/
void
gst_rtsp_connection_set_auth_param (GstRTSPConnection * conn,
* @conn: a #GstRTSPConnection
*
* Clear the list of authentication directives stored in @conn.
- *
- * Since: 0.10.20
*/
void
gst_rtsp_connection_clear_auth_params (GstRTSPConnection * conn)
set_qos_dscp (GSocket * socket, guint qos_dscp)
{
#ifndef IP_TOS
+ GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
return GST_RTSP_OK;
#else
gint fd;
* Configure @conn to use the specified DSCP value.
*
* Returns: #GST_RTSP_OK on success.
- *
- * Since: 0.10.20
*/
GstRTSPResult
gst_rtsp_connection_set_qos_dscp (GstRTSPConnection * conn, guint qos_dscp)
*
* Returns: The URL. This value remains valid until the
* connection is freed.
- *
- * Since: 0.10.23
*/
GstRTSPUrl *
gst_rtsp_connection_get_url (const GstRTSPConnection * conn)
*
* Returns: The IP address as a string. this value remains valid until the
* connection is closed.
- *
- * Since: 0.10.20
*/
const gchar *
gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
{
g_return_val_if_fail (conn != NULL, NULL);
- return conn->ip;
+ return conn->remote_ip;
}
/**
* @ip: an ip address
*
* Set the IP address of the server.
- *
- * Since: 0.10.23
*/
void
gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
{
g_return_if_fail (conn != NULL);
- g_free (conn->ip);
- conn->ip = g_strdup (ip);
+ g_free (conn->remote_ip);
+ conn->remote_ip = g_strdup (ip);
}
/**
*
* Get the file descriptor for reading.
*
- * Returns: the file descriptor used for reading or %NULL on error. The file
- * descriptor remains valid until the connection is closed.
- *
- * Since: 0.10.23
+ * Returns: (transfer none): the file descriptor used for reading or %NULL on
+ * error. The file descriptor remains valid until the connection is closed.
*/
GSocket *
gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
*
* Get the file descriptor for writing.
*
- * Returns: the file descriptor used for writing or NULL on error. The file
- * descriptor remains valid until the connection is closed.
- *
- * Since: 0.10.23
+ * Returns: (transfer none): the file descriptor used for writing or NULL on
+ * error. The file descriptor remains valid until the connection is closed.
*/
GSocket *
gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
* By setting the HTTP mode to %TRUE the message parsing will support HTTP
* messages in addition to the RTSP messages. It will also disable the
* automatic handling of setting up an HTTP tunnel.
- *
- * Since: 0.10.25
*/
void
gst_rtsp_connection_set_http_mode (GstRTSPConnection * conn, gboolean enable)
*
* Set the HTTP tunneling state of the connection. This must be configured before
* the @conn is connected.
- *
- * Since: 0.10.23
*/
void
gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
* gst_rtsp_connection_is_tunneled:
* @conn: a #GstRTSPConnection
*
- * Get the tunneling state of the connection.
+ * Get the tunneling state of the connection.
*
* Returns: if @conn is using HTTP tunneling.
- *
- * Since: 0.10.23
*/
gboolean
gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
* gst_rtsp_connection_get_tunnelid:
* @conn: a #GstRTSPConnection
*
- * Get the tunnel session id the connection.
+ * Get the tunnel session id the connection.
*
* Returns: returns a non-empty string if @conn is being tunneled over HTTP.
- *
- * Since: 0.10.23
*/
const gchar *
gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
* @conn.
*
* Returns: return GST_RTSP_OK on success.
- *
- * Since: 0.10.23
*/
GstRTSPResult
gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
/* both connections have socket0 as the read/write socket. start by taking the
* socket from conn2 and set it as the socket in conn */
conn->socket1 = conn2->socket0;
+ conn->stream1 = conn2->stream0;
+ conn->input_stream = conn2->input_stream;
+ conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
/* clean up some of the state of conn2 */
g_cancellable_cancel (conn2->cancellable);
- conn2->socket0 = 0;
- g_object_unref (conn2->socket1);
- conn2->socket1 = NULL;
conn2->write_socket = conn2->read_socket = NULL;
+ conn2->socket0 = NULL;
+ conn2->stream0 = NULL;
+ conn2->input_stream = NULL;
+ conn2->output_stream = NULL;
+ conn2->control_stream = NULL;
g_cancellable_reset (conn2->cancellable);
/* We make socket0 the write socket and socket1 the read socket. */
conn->read_socket = conn->socket1;
conn->tstate = TUNNEL_STATE_COMPLETE;
+
+ g_free (conn->initial_buffer);
+ conn->initial_buffer = conn2->initial_buffer;
+ conn2->initial_buffer = NULL;
+ conn->initial_buffer_offset = conn2->initial_buffer_offset;
}
/* we need base64 decoding for the readfd */
return GST_RTSP_OK;
}
+/**
+ * gst_rtsp_connection_set_remember_session_id:
+ * @conn: a #GstRTSPConnection
+ * @remember: %TRUE if the connection should remember the session id
+ *
+ * Sets if the #GstRTSPConnection should remember the session id from the last
+ * response received and force it onto any further requests.
+ *
+ * The default value is %TRUE
+ */
+
+void
+gst_rtsp_connection_set_remember_session_id (GstRTSPConnection * conn,
+ gboolean remember)
+{
+ conn->remember_session_id = remember;
+ if (!remember)
+ conn->session_id[0] = '\0';
+}
+
+/**
+ * gst_rtsp_connection_get_remember_session_id:
+ * @conn: a #GstRTSPConnection
+ *
+ * Returns: %TRUE if the #GstRTSPConnection remembers the session id in the
+ * last response to set it on any further request.
+ */
+
+gboolean
+gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
+{
+ return conn->remember_session_id;
+}
+
+
#define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
#define READ_COND (G_IO_IN | READ_ERR)
#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
GstRTSPBuilder builder;
GstRTSPMessage message;
- GPollFD readfd;
- GPollFD writefd;
+ GSource *readsrc;
+ GSource *writesrc;
+ GSource *controlsrc;
+
+ gboolean keep_running;
/* queued message for transmission */
guint id;
- GMutex *mutex;
+ GMutex mutex;
GQueue *messages;
+ gsize messages_bytes;
guint8 *write_data;
guint write_off;
guint write_size;
guint write_id;
+ gsize max_bytes;
+ guint max_messages;
GstRTSPWatchFuncs funcs;
static gboolean
gst_rtsp_source_check (GSource * source)
{
- GstRTSPWatch *watch = (GstRTSPWatch *) source;
+ return FALSE;
+}
- if (watch->readfd.revents & READ_COND)
- return TRUE;
+static gboolean
+gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
+ GstRTSPWatch * watch)
+{
+ gssize count;
+ guint8 buffer[1024];
+ GError *error = NULL;
+
+ /* try to read in order to be able to detect errors, we read 1k in case some
+ * client actually decides to send data on the GET channel */
+ count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
+ &error);
+ if (count == 0) {
+ /* other end closed the socket */
+ goto eof;
+ }
+
+ if (count < 0) {
+ GST_DEBUG ("%s", error->message);
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
+ g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+ g_clear_error (&error);
+ goto done;
+ }
+ g_clear_error (&error);
+ goto read_error;
+ }
- if (watch->writefd.revents & WRITE_COND)
- return TRUE;
+ /* client sent data on the GET channel, ignore it */
- return FALSE;
+done:
+ return TRUE;
+
+ /* ERRORS */
+eof:
+ {
+ if (watch->funcs.closed)
+ watch->funcs.closed (watch, watch->user_data);
+
+ /* the read connection was closed, stop the watch now */
+ watch->keep_running = FALSE;
+
+ return FALSE;
+ }
+read_error:
+ {
+ if (watch->funcs.error_full)
+ watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
+ 0, watch->user_data);
+ else if (watch->funcs.error)
+ watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
+
+ goto eof;
+ }
}
static gboolean
-gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
- gpointer user_data G_GNUC_UNUSED)
+gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
+ GstRTSPWatch * watch)
{
- GstRTSPWatch *watch = (GstRTSPWatch *) source;
GstRTSPResult res = GST_RTSP_ERROR;
- gboolean keep_running = TRUE;
+ GstRTSPConnection *conn = watch->conn;
- /* first read as much as we can */
- if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
- do {
- if (watch->readfd.revents & READ_ERR)
- goto read_error;
+ /* if this connection was already closed, stop now */
+ if (G_POLLABLE_INPUT_STREAM (conn->input_stream) != stream)
+ goto eof;
- res = build_next (&watch->builder, &watch->message, watch->conn);
- if (res == GST_RTSP_EINTR)
- break;
- else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
- watch->readfd.events = 0;
- watch->readfd.revents = 0;
- g_source_remove_poll ((GSource *) watch, &watch->readfd);
- /* When we are in tunnelled mode, the read socket can be closed and we
- * should be prepared for a new POST method to reopen it */
- if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
- /* remove the read connection for the tunnel */
- /* we accept a new POST request */
- watch->conn->tstate = TUNNEL_STATE_GET;
- /* and signal that we lost our tunnel */
- if (watch->funcs.tunnel_lost)
- res = watch->funcs.tunnel_lost (watch, watch->user_data);
- goto read_done;
- } else
- goto eof;
- } else if (G_LIKELY (res == GST_RTSP_OK)) {
- if (!watch->conn->manual_http &&
- watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
- if (watch->conn->tstate == TUNNEL_STATE_NONE &&
- watch->message.type_data.request.method == GST_RTSP_GET) {
- GstRTSPMessage *response;
- GstRTSPStatusCode code;
-
- watch->conn->tstate = TUNNEL_STATE_GET;
-
- if (watch->funcs.tunnel_start)
- code = watch->funcs.tunnel_start (watch, watch->user_data);
- else
- code = GST_RTSP_STS_OK;
-
- /* queue the response */
- response = gen_tunnel_reply (watch->conn, code, &watch->message);
- gst_rtsp_watch_send_message (watch, response, NULL);
- gst_rtsp_message_free (response);
- goto read_done;
- } else if (watch->conn->tstate == TUNNEL_STATE_NONE &&
- watch->message.type_data.request.method == GST_RTSP_POST) {
- watch->conn->tstate = TUNNEL_STATE_POST;
-
- /* in the callback the connection should be tunneled with the
- * GET connection */
- if (watch->funcs.tunnel_complete)
- watch->funcs.tunnel_complete (watch, watch->user_data);
- goto read_done;
- }
- }
- }
+ res = build_next (&watch->builder, &watch->message, conn, FALSE);
+ if (res == GST_RTSP_EINTR)
+ goto done;
+ else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
+ if (watch->readsrc) {
+ g_source_remove_child_source ((GSource *) watch, watch->readsrc);
+ g_source_unref (watch->readsrc);
+ watch->readsrc = NULL;
+ }
- if (!watch->conn->manual_http) {
- /* if manual HTTP support is not enabled, then restore the message to
- * what it would have looked like without the support for parsing HTTP
- * messages being present */
- if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
- watch->message.type = GST_RTSP_MESSAGE_REQUEST;
- watch->message.type_data.request.method = GST_RTSP_INVALID;
- if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
- watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
- res = GST_RTSP_EPARSE;
- } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
- watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
- if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
- watch->message.type_data.response.version =
- GST_RTSP_VERSION_INVALID;
- res = GST_RTSP_EPARSE;
- }
- }
+ if (conn->stream1) {
+ g_object_unref (conn->stream1);
+ conn->stream1 = NULL;
+ conn->socket1 = NULL;
+ conn->input_stream = NULL;
+ }
- if (G_LIKELY (res == GST_RTSP_OK)) {
- if (watch->funcs.message_received)
- watch->funcs.message_received (watch, &watch->message,
+ /* When we are in tunnelled mode, the read socket can be closed and we
+ * should be prepared for a new POST method to reopen it */
+ if (conn->tstate == TUNNEL_STATE_COMPLETE) {
+ /* remove the read connection for the tunnel */
+ /* we accept a new POST request */
+ conn->tstate = TUNNEL_STATE_GET;
+ /* and signal that we lost our tunnel */
+ if (watch->funcs.tunnel_lost)
+ res = watch->funcs.tunnel_lost (watch, watch->user_data);
+ goto read_done;
+ } else
+ goto eof;
+ } else if (G_LIKELY (res == GST_RTSP_OK)) {
+ if (!conn->manual_http &&
+ watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
+ if (conn->tstate == TUNNEL_STATE_NONE &&
+ watch->message.type_data.request.method == GST_RTSP_GET) {
+ GstRTSPMessage *response;
+ GstRTSPStatusCode code;
+
+ conn->tstate = TUNNEL_STATE_GET;
+
+ if (watch->funcs.tunnel_start)
+ code = watch->funcs.tunnel_start (watch, watch->user_data);
+ else
+ code = GST_RTSP_STS_OK;
+
+ /* queue the response */
+ response = gen_tunnel_reply (conn, code, &watch->message);
+ if (watch->funcs.tunnel_http_response)
+ watch->funcs.tunnel_http_response (watch, &watch->message, response,
watch->user_data);
- } else {
- goto read_error;
+ gst_rtsp_watch_send_message (watch, response, NULL);
+ gst_rtsp_message_free (response);
+ goto read_done;
+ } else if (conn->tstate == TUNNEL_STATE_NONE &&
+ watch->message.type_data.request.method == GST_RTSP_POST) {
+ conn->tstate = TUNNEL_STATE_POST;
+
+ /* in the callback the connection should be tunneled with the
+ * GET connection */
+ if (watch->funcs.tunnel_complete) {
+ watch->funcs.tunnel_complete (watch, watch->user_data);
+ }
+ goto read_done;
}
+ }
+ } else
+ goto read_error;
+
+ if (!conn->manual_http) {
+ /* if manual HTTP support is not enabled, then restore the message to
+ * what it would have looked like without the support for parsing HTTP
+ * messages being present */
+ if (watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
+ watch->message.type = GST_RTSP_MESSAGE_REQUEST;
+ watch->message.type_data.request.method = GST_RTSP_INVALID;
+ if (watch->message.type_data.request.version != GST_RTSP_VERSION_1_0)
+ watch->message.type_data.request.version = GST_RTSP_VERSION_INVALID;
+ res = GST_RTSP_EPARSE;
+ } else if (watch->message.type == GST_RTSP_MESSAGE_HTTP_RESPONSE) {
+ watch->message.type = GST_RTSP_MESSAGE_RESPONSE;
+ if (watch->message.type_data.response.version != GST_RTSP_VERSION_1_0)
+ watch->message.type_data.response.version = GST_RTSP_VERSION_INVALID;
+ res = GST_RTSP_EPARSE;
+ }
+ }
+ if (G_LIKELY (res != GST_RTSP_OK))
+ goto read_error;
+
+ if (watch->funcs.message_received)
+ watch->funcs.message_received (watch, &watch->message, watch->user_data);
+
+read_done:
+ gst_rtsp_message_unset (&watch->message);
+ build_reset (&watch->builder);
+
+done:
+ return TRUE;
+
+ /* ERRORS */
+eof:
+ {
+ if (watch->funcs.closed)
+ watch->funcs.closed (watch, watch->user_data);
- read_done:
- gst_rtsp_message_unset (&watch->message);
- build_reset (&watch->builder);
- } while (FALSE);
+ /* we closed the read connection, stop the watch now */
+ watch->keep_running = FALSE;
+
+ /* always stop when the input returns EOF in non-tunneled mode */
+ return FALSE;
}
+read_error:
+ {
+ if (watch->funcs.error_full)
+ watch->funcs.error_full (watch, res, &watch->message,
+ 0, watch->user_data);
+ else if (watch->funcs.error)
+ watch->funcs.error (watch, res, watch->user_data);
- if (watch->writefd.revents & WRITE_COND) {
- if (watch->writefd.revents & WRITE_ERR)
- goto write_error;
+ goto eof;
+ }
+}
- g_mutex_lock (watch->mutex);
- do {
- if (watch->write_data == NULL) {
- GstRTSPRec *rec;
+static gboolean
+gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
+ gpointer user_data G_GNUC_UNUSED)
+{
+ GstRTSPWatch *watch = (GstRTSPWatch *) source;
+ GstRTSPConnection *conn = watch->conn;
- /* get a new message from the queue */
- rec = g_queue_pop_tail (watch->messages);
- if (rec == NULL)
- break;
+ if (conn->initial_buffer != NULL) {
+ gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
+ watch);
+ }
+ return watch->keep_running;
+}
- watch->write_off = 0;
- watch->write_data = rec->data;
- watch->write_size = rec->size;
- watch->write_id = rec->id;
+static gboolean
+gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
+ GstRTSPWatch * watch)
+{
+ GstRTSPResult res = GST_RTSP_ERROR;
+ GstRTSPConnection *conn = watch->conn;
+
+ /* if this connection was already closed, stop now */
+ if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream)
+ goto eof;
- g_slice_free (GstRTSPRec, rec);
+ g_mutex_lock (&watch->mutex);
+ do {
+ if (watch->write_data == NULL) {
+ GstRTSPRec *rec;
+
+ /* get a new message from the queue */
+ rec = g_queue_pop_tail (watch->messages);
+ if (rec == NULL) {
+ if (watch->writesrc) {
+ g_source_remove_child_source ((GSource *) watch, watch->writesrc);
+ g_source_unref (watch->writesrc);
+ watch->writesrc = NULL;
+ /* we create and add the write source again when we actually have
+ * something to write */
+
+ /* since write source is now removed we add read source on the write
+ * socket instead to be able to detect when client closes get channel
+ * in tunneled mode */
+ if (watch->conn->control_stream) {
+ watch->controlsrc =
+ g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
+ (watch->conn->control_stream), NULL);
+ g_source_set_callback (watch->controlsrc,
+ (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
+ NULL);
+ g_source_add_child_source ((GSource *) watch, watch->controlsrc);
+ } else {
+ watch->controlsrc = NULL;
+ }
+ }
+ break;
}
- res = write_bytes (watch->conn->write_socket, watch->write_data,
- &watch->write_off, watch->write_size, watch->conn->cancellable);
- g_mutex_unlock (watch->mutex);
+ watch->messages_bytes -= rec->size;
- if (res == GST_RTSP_EINTR)
- goto write_blocked;
- else if (G_LIKELY (res == GST_RTSP_OK)) {
- if (watch->funcs.message_sent)
- watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
- } else {
- goto write_error;
- }
- g_mutex_lock (watch->mutex);
+ watch->write_off = 0;
+ watch->write_data = rec->data;
+ watch->write_size = rec->size;
+ watch->write_id = rec->id;
- g_free (watch->write_data);
- watch->write_data = NULL;
- } while (TRUE);
+ g_slice_free (GstRTSPRec, rec);
+ }
- watch->writefd.events = WRITE_ERR;
+ res = write_bytes (conn->output_stream, watch->write_data,
+ &watch->write_off, watch->write_size, FALSE, conn->cancellable);
+ g_mutex_unlock (&watch->mutex);
- g_mutex_unlock (watch->mutex);
- }
+ if (res == GST_RTSP_EINTR)
+ goto write_blocked;
+ else if (G_LIKELY (res == GST_RTSP_OK)) {
+ if (watch->funcs.message_sent)
+ watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
+ } else {
+ goto write_error;
+ }
+ g_mutex_lock (&watch->mutex);
+
+ g_free (watch->write_data);
+ watch->write_data = NULL;
+ } while (TRUE);
+ g_mutex_unlock (&watch->mutex);
write_blocked:
- return keep_running;
+ return TRUE;
/* ERRORS */
eof:
{
- if (watch->funcs.closed)
- watch->funcs.closed (watch, watch->user_data);
-
- /* always stop when the readfd returns EOF in non-tunneled mode */
return FALSE;
}
-read_error:
- {
- watch->readfd.events = 0;
- watch->readfd.revents = 0;
- g_source_remove_poll ((GSource *) watch, &watch->readfd);
- keep_running = (watch->writefd.events != 0);
-
- if (keep_running) {
- if (watch->funcs.error_full)
- GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
- 0, watch->user_data), error);
- else
- goto error;
- } else
- goto eof;
- }
write_error:
{
- watch->writefd.events = 0;
- watch->writefd.revents = 0;
- g_source_remove_poll ((GSource *) watch, &watch->writefd);
- keep_running = (watch->readfd.events != 0);
-
- if (keep_running) {
- if (watch->funcs.error_full)
- GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
- watch->write_id, watch->user_data), error);
- else
- goto error;
- } else
- goto eof;
- }
-error:
- {
- if (watch->funcs.error)
+ if (watch->funcs.error_full)
+ watch->funcs.error_full (watch, res, NULL,
+ watch->write_id, watch->user_data);
+ else if (watch->funcs.error)
watch->funcs.error (watch, res, watch->user_data);
- return keep_running;
+ return FALSE;
}
}
g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
g_queue_free (watch->messages);
watch->messages = NULL;
+ watch->messages_bytes = 0;
g_free (watch->write_data);
- g_mutex_free (watch->mutex);
+ if (watch->readsrc)
+ g_source_unref (watch->readsrc);
+ if (watch->writesrc)
+ g_source_unref (watch->writesrc);
+ if (watch->controlsrc)
+ g_source_unref (watch->controlsrc);
+
+ g_mutex_clear (&watch->mutex);
if (watch->notify)
watch->notify (watch->user_data);
* called with @user_data when activity happened on the watch.
*
* The new watch is usually created so that it can be attached to a
- * maincontext with gst_rtsp_watch_attach().
+ * maincontext with gst_rtsp_watch_attach().
*
* @conn must exist for the entire lifetime of the watch.
*
* Returns: a #GstRTSPWatch that can be used for asynchronous RTSP
* communication. Free with gst_rtsp_watch_unref () after usage.
- *
- * Since: 0.10.23
*/
GstRTSPWatch *
gst_rtsp_watch_new (GstRTSPConnection * conn,
result->conn = conn;
result->builder.state = STATE_START;
- result->mutex = g_mutex_new ();
+ g_mutex_init (&result->mutex);
result->messages = g_queue_new ();
- result->readfd.fd = -1;
- result->writefd.fd = -1;
-
gst_rtsp_watch_reset (result);
+ result->keep_running = TRUE;
result->funcs = *funcs;
result->user_data = user_data;
*
* Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
* when the file descriptors of the connection might have changed.
- *
- * Since: 0.10.23
*/
void
gst_rtsp_watch_reset (GstRTSPWatch * watch)
{
- if (watch->readfd.fd != -1)
- g_source_remove_poll ((GSource *) watch, &watch->readfd);
- if (watch->writefd.fd != -1)
- g_source_remove_poll ((GSource *) watch, &watch->writefd);
-
- watch->readfd.fd = g_socket_get_fd (watch->conn->read_socket);
- watch->readfd.events = READ_COND;
- watch->readfd.revents = 0;
-
- watch->writefd.fd = g_socket_get_fd (watch->conn->write_socket);
- watch->writefd.events = WRITE_ERR;
- watch->writefd.revents = 0;
+ if (watch->readsrc) {
+ g_source_remove_child_source ((GSource *) watch, watch->readsrc);
+ g_source_unref (watch->readsrc);
+ }
+ if (watch->writesrc) {
+ g_source_remove_child_source ((GSource *) watch, watch->writesrc);
+ g_source_unref (watch->writesrc);
+ watch->writesrc = NULL;
+ }
+ if (watch->controlsrc) {
+ g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
+ g_source_unref (watch->controlsrc);
+ watch->controlsrc = NULL;
+ }
- if (watch->readfd.fd != -1)
- g_source_add_poll ((GSource *) watch, &watch->readfd);
- if (watch->writefd.fd != -1)
- g_source_add_poll ((GSource *) watch, &watch->writefd);
+ if (watch->conn->input_stream) {
+ watch->readsrc =
+ g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
+ (watch->conn->input_stream), NULL);
+ g_source_set_callback (watch->readsrc,
+ (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
+ g_source_add_child_source ((GSource *) watch, watch->readsrc);
+ } else {
+ watch->readsrc = NULL;
+ }
+
+ /* we create and add the write source when we actually have something to
+ * write */
+
+ /* when write source is not added we add read source on the write socket
+ * instead to be able to detect when client closes get channel in tunneled
+ * mode */
+ if (watch->conn->control_stream) {
+ watch->controlsrc =
+ g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
+ (watch->conn->control_stream), NULL);
+ g_source_set_callback (watch->controlsrc,
+ (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
+ g_source_add_child_source ((GSource *) watch, watch->controlsrc);
+ } else {
+ watch->controlsrc = NULL;
+ }
}
/**
*
* Adds a #GstRTSPWatch to a context so that it will be executed within that context.
*
- * Returns: the ID (greater than 0) for the watch within the GMainContext.
- *
- * Since: 0.10.23
+ * Returns: the ID (greater than 0) for the watch within the GMainContext.
*/
guint
gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
*
* Decreases the reference count of @watch by one. If the resulting reference
* count is zero the watch and associated memory will be destroyed.
- *
- * Since: 0.10.23
*/
void
gst_rtsp_watch_unref (GstRTSPWatch * watch)
}
/**
+ * gst_rtsp_watch_set_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: maximum bytes
+ * @messages: maximum messages
+ *
+ * Set the maximum amount of bytes and messages that will be queued in @watch.
+ * When the maximum amounts are exceeded, gst_rtsp_watch_write_data() and
+ * gst_rtsp_watch_send_message() will return #GST_RTSP_ENOMEM.
+ *
+ * A value of 0 for @bytes or @messages means no limits.
+ *
+ * Since: 1.2
+ */
+void
+gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
+ gsize bytes, guint messages)
+{
+ g_return_if_fail (watch != NULL);
+
+ g_mutex_lock (&watch->mutex);
+ watch->max_bytes = bytes;
+ watch->max_messages = messages;
+ g_mutex_unlock (&watch->mutex);
+
+ GST_DEBUG ("set backlog to bytes %" G_GSIZE_FORMAT ", messages %u",
+ bytes, messages);
+}
+
+/**
+ * gst_rtsp_watch_get_send_backlog:
+ * @watch: a #GstRTSPWatch
+ * @bytes: (out) (allow-none): maximum bytes
+ * @messages: (out) (allow-none): maximum messages
+ *
+ * Get the maximum amount of bytes and messages that will be queued in @watch.
+ * See gst_rtsp_watch_set_send_backlog().
+ *
+ * Since: 1.2
+ */
+void
+gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
+ gsize * bytes, guint * messages)
+{
+ g_return_if_fail (watch != NULL);
+
+ g_mutex_lock (&watch->mutex);
+ if (bytes)
+ *bytes = watch->max_bytes;
+ if (messages)
+ *messages = watch->max_messages;
+ g_mutex_unlock (&watch->mutex);
+}
+
+/**
* gst_rtsp_watch_write_data:
* @watch: a #GstRTSPWatch
- * @data: the data to queue
+ * @data: (array length=size) (transfer full): the data to queue
* @size: the size of @data
- * @id: location for a message ID or %NULL
+ * @id: (out) (allow-none): location for a message ID or %NULL
*
* Write @data using the connection of the @watch. If it cannot be sent
* immediately, it will be queued for transmission in @watch. The contents of
*
* This function will take ownership of @data and g_free() it after use.
*
- * Returns: #GST_RTSP_OK on success.
+ * If the amount of queued data exceeds the limits set with
+ * gst_rtsp_watch_set_send_backlog(), this function will return
+ * #GST_RTSP_ENOMEM.
*
- * Since: 0.10.25
+ * Returns: #GST_RTSP_OK on success. #GST_RTSP_ENOMEM when the backlog limits
+ * are reached.
*/
GstRTSPResult
gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
- g_mutex_lock (watch->mutex);
+ g_mutex_lock (&watch->mutex);
/* try to send the message synchronously first */
if (watch->messages->length == 0 && watch->write_data == NULL) {
res =
- write_bytes (watch->conn->write_socket, data, &off, size,
- watch->conn->cancellable);
+ write_bytes (watch->conn->output_stream, data, &off, size,
+ FALSE, watch->conn->cancellable);
if (res != GST_RTSP_EINTR) {
if (id != NULL)
*id = 0;
}
}
+ /* check limits */
+ if ((watch->max_bytes != 0 && watch->messages_bytes >= watch->max_bytes) ||
+ (watch->max_messages != 0
+ && watch->messages->length >= watch->max_messages))
+ goto too_much_backlog;
+
/* make a record with the data and id for sending async */
rec = g_slice_new (GstRTSPRec);
if (off == 0) {
rec->id = ++watch->id;
} while (G_UNLIKELY (rec->id == 0));
- /* add the record to a queue. FIXME we would like to have an upper limit here */
+ /* add the record to a queue. */
g_queue_push_head (watch->messages, rec);
+ watch->messages_bytes += rec->size;
/* make sure the main context will now also check for writability on the
* socket */
- if (watch->writefd.events != WRITE_COND) {
- watch->writefd.events = WRITE_COND;
- context = ((GSource *) watch)->context;
+ context = ((GSource *) watch)->context;
+ if (!watch->writesrc) {
+ /* remove the read source on the write socket, we will be able to detect
+ * errors while writing */
+ if (watch->controlsrc) {
+ g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
+ g_source_unref (watch->controlsrc);
+ watch->controlsrc = NULL;
+ }
+
+ watch->writesrc =
+ g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
+ (watch->conn->output_stream), NULL);
+ g_source_set_callback (watch->writesrc,
+ (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
+ g_source_add_child_source ((GSource *) watch, watch->writesrc);
}
if (id != NULL)
res = GST_RTSP_OK;
done:
- g_mutex_unlock (watch->mutex);
+ g_mutex_unlock (&watch->mutex);
if (context)
g_main_context_wakeup (context);
return res;
+
+ /* ERRORS */
+too_much_backlog:
+ {
+ GST_WARNING ("too much backlog: max_bytes %" G_GSIZE_FORMAT ", current %"
+ G_GSIZE_FORMAT ", max_messages %u, current %u", watch->max_bytes,
+ watch->messages_bytes, watch->max_messages, watch->messages->length);
+ g_mutex_unlock (&watch->mutex);
+ g_free ((gpointer) data);
+ return GST_RTSP_ENOMEM;
+ }
}
/**
* gst_rtsp_watch_send_message:
* @watch: a #GstRTSPWatch
* @message: a #GstRTSPMessage
- * @id: location for a message ID or %NULL
+ * @id: (out) (allow-none): location for a message ID or %NULL
*
* Send a @message using the connection of the @watch. If it cannot be sent
* immediately, it will be queued for transmission in @watch. The contents of
* callback.
*
* Returns: #GST_RTSP_OK on success.
- *
- * Since: 0.10.25
*/
GstRTSPResult
gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,