GMutex socket_use_mutex;
gboolean manual_http;
gboolean may_cancel;
- GCancellable *cancellable;
+ GMutex cancellable_mutex;
+ GCancellable *cancellable; /* protected by cancellable_mutex */
gchar tunnelid[TUNNELID_LEN];
gboolean tunneled;
}
}
+/* transfer full */
+static GCancellable *
+get_cancellable (GstRTSPConnection * conn)
+{
+ GCancellable *cancellable = NULL;
+
+ g_mutex_lock (&conn->cancellable_mutex);
+ if (conn->cancellable)
+ cancellable = g_object_ref (conn->cancellable);
+ g_mutex_unlock (&conn->cancellable_mutex);
+
+ return cancellable;
+}
+
/**
* gst_rtsp_connection_create:
* @url: a #GstRTSPUrl
newconn->may_cancel = TRUE;
newconn->cancellable = g_cancellable_new ();
+ g_mutex_init (&newconn->cancellable_mutex);
newconn->client = g_socket_client_new ();
if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
gchar *connection_uri = NULL;
gchar *request_uri = NULL;
gchar *host = NULL;
+ GCancellable *cancellable;
url = conn->url;
connection_uri = get_tunneled_connection_uri_strdup (url, url_port);
+ cancellable = get_cancellable (conn);
+
/* connect to the host/port */
if (conn->proxy_host) {
connection = g_socket_client_connect_to_host (conn->client,
- conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+ conn->proxy_host, conn->proxy_port, cancellable, &error);
request_uri = g_strdup (connection_uri);
} else {
connection = g_socket_client_connect_to_uri (conn->client,
- connection_uri, 0, conn->cancellable, &error);
+ connection_uri, 0, cancellable, &error);
request_uri =
g_strdup_printf ("%s%s%s", url->abspath,
url->query ? "?" : "", url->query ? url->query : "");
}
+
+ g_clear_object (&cancellable);
+
if (connection == NULL)
goto connect_failed;
GstClockTime to;
guint16 url_port;
GstRTSPUrl *url;
+ GCancellable *cancellable;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->url != NULL, GST_RTSP_EINVAL);
connection_uri = gst_rtsp_url_get_request_uri (url);
}
+ cancellable = get_cancellable (conn);
+
if (conn->proxy_host) {
connection = g_socket_client_connect_to_host (conn->client,
- conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+ conn->proxy_host, conn->proxy_port, cancellable, &error);
request_uri = g_strdup (connection_uri);
} else {
connection = g_socket_client_connect_to_uri (conn->client,
- connection_uri, url_port, conn->cancellable, &error);
+ connection_uri, url_port, cancellable, &error);
/* use the relative component of the uri for non-proxy connections */
request_uri = g_strdup_printf ("%s%s%s", url->abspath,
url->query ? "?" : "", url->query ? url->query : "");
}
+
+ g_clear_object (&cancellable);
+
if (connection == NULL)
goto connect_failed;
/* ERRORS */
error:
{
+ g_object_unref (cancellable);
+
if (G_UNLIKELY (r == 0))
return GST_RTSP_EEOF;
if (G_LIKELY (size > (guint) out)) {
gssize r;
gsize count = size - out;
+ GCancellable *cancellable;
+
+ cancellable = conn->may_cancel ? get_cancellable (conn) : NULL;
+
if (block)
r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
- count, conn->may_cancel ? conn->cancellable : NULL, err);
+ count, cancellable, err);
else
r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
(conn->input_stream), (gchar *) & buffer[out], count,
- conn->may_cancel ? conn->cancellable : NULL, err);
+ cancellable, err);
+
+ g_clear_object (&cancellable);
if (G_UNLIKELY (r < 0)) {
if (out == 0) {
{
guint offset;
GstRTSPResult res;
+ GCancellable *cancellable;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (data != NULL || size == 0, GST_RTSP_EINVAL);
set_write_socket_timeout (conn, timeout);
+ cancellable = get_cancellable (conn);
res =
- write_bytes (conn->output_stream, data, &offset, size, TRUE,
- conn->cancellable);
+ write_bytes (conn->output_stream, data, &offset, size, TRUE, cancellable);
+ g_clear_object (&cancellable);
clear_write_socket_timeout (conn);
guint n_vectors, n_memories;
gint i, j, k;
gsize bytes_to_write, bytes_written;
+ GCancellable *cancellable;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
/* write request: this is synchronous */
set_write_socket_timeout (conn, timeout);
+ cancellable = get_cancellable (conn);
res =
writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
- TRUE, conn->cancellable);
+ TRUE, cancellable);
+ g_clear_object (&cancellable);
clear_write_socket_timeout (conn);
res = gst_rtsp_connection_close (conn);
- if (conn->cancellable)
- g_object_unref (conn->cancellable);
+ g_mutex_lock (&conn->cancellable_mutex);
+ g_clear_object (&conn->cancellable);
+ g_mutex_unlock (&conn->cancellable_mutex);
+ g_mutex_clear (&conn->cancellable_mutex);
if (conn->client)
g_object_unref (conn->client);
if (conn->tls_database)
GMainContext *ctx;
GSource *rs, *ws, *ts;
GIOCondition condition;
+ GCancellable *cancellable;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (events != 0, GST_RTSP_EINVAL);
g_source_unref (ts);
}
+ cancellable = get_cancellable (conn);
if (events & GST_RTSP_EV_READ) {
rs = g_socket_create_source (conn->read_socket, G_IO_IN | G_IO_PRI,
- conn->cancellable);
+ cancellable);
g_source_set_dummy_callback (rs);
g_source_attach (rs, ctx);
g_source_unref (rs);
}
if (events & GST_RTSP_EV_WRITE) {
- ws = g_socket_create_source (conn->write_socket, G_IO_OUT,
- conn->cancellable);
+ ws = g_socket_create_source (conn->write_socket, G_IO_OUT, cancellable);
g_source_set_dummy_callback (ws);
g_source_attach (ws, ctx);
g_source_unref (ws);
}
+ g_clear_object (&cancellable);
/* Returns after handling all pending events */
while (!g_main_context_iteration (ctx, TRUE));
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
if (flush) {
- g_cancellable_cancel (conn->cancellable);
+ GCancellable *cancellable = get_cancellable (conn);
+ g_cancellable_cancel (cancellable);
+ g_clear_object (&cancellable);
} else {
+ g_mutex_lock (&conn->cancellable_mutex);
g_object_unref (conn->cancellable);
conn->cancellable = g_cancellable_new ();
+ g_mutex_unlock (&conn->cancellable_mutex);
}
return GST_RTSP_OK;
}
/* clean up some of the state of conn2 */
+ g_mutex_lock (&conn2->cancellable_mutex);
g_cancellable_cancel (conn2->cancellable);
conn2->write_socket = conn2->read_socket = NULL;
conn2->socket0 = NULL;
conn2->control_stream = NULL;
g_object_unref (conn2->cancellable);
conn2->cancellable = NULL;
+ g_mutex_unlock (&conn2->cancellable_mutex);
/* We make socket0 the write socket and socket1 the read socket. */
conn->write_socket = conn->socket0;
guint n_vectors, n_memories, n_ids, drop_messages;
gint i, j, l, n_mmap;
GstRTSPSerializedMessage *msg;
+ GCancellable *cancellable;
/* if this connection was already closed, stop now */
if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
}
}
+ cancellable = get_cancellable (watch->conn);
+
res =
writev_bytes (watch->conn->output_stream, vectors, n_vectors,
- &bytes_written, FALSE, watch->conn->cancellable);
+ &bytes_written, FALSE, cancellable);
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
+ g_clear_object (&cancellable);
+
/* First unmap all memories here, this simplifies the code below
* as we don't have to skip all memories that were already written
* before */
{
GstRTSPResult res;
GMainContext *context = NULL;
+ GCancellable *cancellable;
gint i;
g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
}
}
+ cancellable = get_cancellable (watch->conn);
+
res =
writev_bytes (watch->conn->output_stream, vectors, n_vectors,
- &bytes_written, FALSE, watch->conn->cancellable);
+ &bytes_written, FALSE, cancellable);
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
+ g_clear_object (&cancellable);
+
/* At this point we sent everything we could without blocking or
* error and updated the offsets inside the message accordingly */