rtspconnection: GstRTSPWatch func for tunnel GET response
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
index f8c6f75..f988c0d 100644 (file)
  * SECTION:gstrtspconnection
  * @short_description: manage RTSP connections
  * @see_also: gstrtspurl
- *  
+ *
  * This object manages the RTSP connection to the server. It provides function
  * to receive and send bytes and messages.
- *  
+ *
  * Last reviewed on 2007-07-24 (0.10.14)
  */
 
 #include <glib.h>
 #include <gst/gst.h>
 
+/* necessary for IP_TOS define */
+#if GLIB_CHECK_VERSION(2, 36, 0)
+#include <gio/gnetworking.h>
+#endif
+
 #include "gstrtspconnection.h"
 
 #ifdef IP_TOS
@@ -108,18 +113,23 @@ struct _GstRTSPConnection
   /* URL for the remote connection */
   GstRTSPUrl *url;
 
+  gboolean server;
   GSocketClient *client;
   GIOStream *stream0;
   GIOStream *stream1;
 
   GInputStream *input_stream;
   GOutputStream *output_stream;
+  /* this is a read source we add on the write socket in tunneled mode to be
+   * able to detect when client disconnects the GET channel */
+  GInputStream *control_stream;
 
   /* connection state */
   GSocket *read_socket;
   GSocket *write_socket;
   GSocket *socket0, *socket1;
   gboolean manual_http;
+  gboolean may_cancel;
   GCancellable *cancellable;
 
   gchar tunnelid[TUNNELID_LEN];
@@ -149,6 +159,9 @@ struct _GstRTSPConnection
   gchar *passwd;
   GHashTable *auth_params;
 
+  /* TLS */
+  GTlsDatabase *tls_database;
+
   DecodeCtx ctx;
   DecodeCtx *ctxp;
 
@@ -193,9 +206,69 @@ build_reset (GstRTSPBuilder * builder)
   memset (builder, 0, sizeof (GstRTSPBuilder));
 }
 
+static gboolean
+tls_accept_certificate (GTlsConnection * conn, GTlsCertificate * peer_cert,
+    GTlsCertificateFlags errors, GstRTSPConnection * rtspconn)
+{
+  GError *error = NULL;
+  gboolean accept = FALSE;
+
+  if (rtspconn->tls_database) {
+    GSocketConnectable *peer_identity;
+    GTlsCertificateFlags validation_flags;
+
+    GST_DEBUG ("TLS peer certificate not accepted, checking user database...");
+
+    peer_identity =
+        g_tls_client_connection_get_server_identity (G_TLS_CLIENT_CONNECTION
+        (conn));
+
+    errors =
+        g_tls_database_verify_chain (rtspconn->tls_database, peer_cert,
+        G_TLS_DATABASE_PURPOSE_AUTHENTICATE_SERVER, peer_identity,
+        g_tls_connection_get_interaction (conn), G_TLS_DATABASE_VERIFY_NONE,
+        NULL, &error);
+
+    if (error)
+      goto verify_error;
+
+    validation_flags = gst_rtsp_connection_get_tls_validation_flags (rtspconn);
+
+    accept = ((errors & validation_flags) == 0);
+    if (accept)
+      GST_DEBUG ("Peer certificate accepted");
+    else
+      GST_DEBUG ("Peer certificate not accepted (errors: 0x%08X)", errors);
+  }
+
+  return accept;
+
+/* ERRORS */
+verify_error:
+  {
+    GST_ERROR ("An error occurred while verifying the peer certificate: %s",
+        error->message);
+    g_clear_error (&error);
+    return FALSE;
+  }
+}
+
+static void
+socket_client_event (GSocketClient * client, GSocketClientEvent event,
+    GSocketConnectable * connectable, GIOStream * connection,
+    GstRTSPConnection * rtspconn)
+{
+  if (event == G_SOCKET_CLIENT_TLS_HANDSHAKING) {
+    GST_DEBUG ("TLS handshaking about to start...");
+
+    g_signal_connect (connection, "accept-certificate",
+        (GCallback) tls_accept_certificate, rtspconn);
+  }
+}
+
 /**
  * gst_rtsp_connection_create:
- * @url: a #GstRTSPUrl 
+ * @url: a #GstRTSPUrl
  * @conn: (out) (transfer full): storage for a #GstRTSPConnection
  *
  * Create a newly allocated #GstRTSPConnection from @url and store it in @conn.
@@ -216,12 +289,16 @@ gst_rtsp_connection_create (const GstRTSPUrl * url, GstRTSPConnection ** conn)
 
   newconn = g_new0 (GstRTSPConnection, 1);
 
+  newconn->may_cancel = TRUE;
   newconn->cancellable = g_cancellable_new ();
   newconn->client = g_socket_client_new ();
 
   if (url->transports & GST_RTSP_LOWER_TRANS_TLS)
     g_socket_client_set_tls (newconn->client, TRUE);
 
+  g_signal_connect (newconn->client, "event", (GCallback) socket_client_event,
+      newconn);
+
   newconn->url = gst_rtsp_url_copy (url);
   newconn->timer = g_timer_new ();
   newconn->timeout = 60;
@@ -308,11 +385,13 @@ gst_rtsp_connection_create_from_socket (GSocket * socket, const gchar * ip,
   stream = G_IO_STREAM (g_socket_connection_factory_create_connection (socket));
 
   /* both read and write initially */
+  newconn->server = TRUE;
   newconn->socket0 = socket;
   newconn->stream0 = stream;
   newconn->write_socket = newconn->read_socket = newconn->socket0;
   newconn->input_stream = g_io_stream_get_input_stream (stream);
   newconn->output_stream = g_io_stream_get_output_stream (stream);
+  newconn->control_stream = NULL;
   newconn->remote_ip = g_strdup (ip);
   newconn->local_ip = local_ip;
   newconn->initial_buffer = g_strdup (initial_buffer);
@@ -397,6 +476,154 @@ getnameinfo_failed:
   }
 }
 
+/**
+ * gst_rtsp_connection_get_tls:
+ * @conn: a #GstRTSPConnection
+ * @error: #GError for error reporting, or NULL to ignore.
+ *
+ * Get the TLS connection of @conn.
+ *
+ * For client side this will return the #GTlsClientConnection when connected
+ * over TLS.
+ *
+ * For server side connections, this function will create a GTlsServerConnection
+ * when called the first time and will return that same connection on subsequent
+ * calls. The server is then responsible for configuring the TLS connection.
+ *
+ * Returns: (transfer none): the TLS connection for @conn.
+ *
+ * Since: 1.2
+ */
+GTlsConnection *
+gst_rtsp_connection_get_tls (GstRTSPConnection * conn, GError ** error)
+{
+  GTlsConnection *result;
+
+  if (G_IS_TLS_CONNECTION (conn->stream0)) {
+    /* we already had one, return it */
+    result = G_TLS_CONNECTION (conn->stream0);
+  } else if (conn->server) {
+    /* no TLS connection but we are server, make one */
+    result = (GTlsConnection *)
+        g_tls_server_connection_new (conn->stream0, NULL, error);
+    if (result) {
+      g_object_unref (conn->stream0);
+      conn->stream0 = G_IO_STREAM (result);
+      conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
+      conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
+    }
+  } else {
+    /* client */
+    result = NULL;
+    g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_FAILED,
+        "client not connected with TLS");
+  }
+  return result;
+}
+
+/**
+ * gst_rtsp_connection_set_tls_validation_flags:
+ * @conn: a #GstRTSPConnection
+ * @flags: the validation flags.
+ *
+ * Sets the TLS validation flags to be used to verify the peer
+ * certificate when a TLS connection is established.
+ *
+ * Returns: TRUE if the validation flags are set correctly, or FALSE if
+ * @conn is NULL or is not a TLS connection.
+ *
+ * Since: 1.2.1
+ */
+gboolean
+gst_rtsp_connection_set_tls_validation_flags (GstRTSPConnection * conn,
+    GTlsCertificateFlags flags)
+{
+  gboolean res = FALSE;
+
+  g_return_val_if_fail (conn != NULL, FALSE);
+
+  res = g_socket_client_get_tls (conn->client);
+  if (res)
+    g_socket_client_set_tls_validation_flags (conn->client, flags);
+
+  return res;
+}
+
+/**
+ * gst_rtsp_connection_get_tls_validation_flags:
+ * @conn: a #GstRTSPConnection
+ *
+ * Gets the TLS validation flags used to verify the peer certificate
+ * when a TLS connection is established.
+ *
+ * Returns: the validationg flags.
+ *
+ * Since: 1.2.1
+ */
+GTlsCertificateFlags
+gst_rtsp_connection_get_tls_validation_flags (GstRTSPConnection * conn)
+{
+  g_return_val_if_fail (conn != NULL, 0);
+
+  return g_socket_client_get_tls_validation_flags (conn->client);
+}
+
+/**
+ * gst_rtsp_connection_set_tls_database:
+ * @conn: a #GstRTSPConnection
+ * @database: a #GTlsDatabase
+ *
+ * Sets the anchor certificate authorities database. This certificate
+ * database will be used to verify the server's certificate in case it
+ * can't be verified with the default certificate database first.
+ *
+ * Since: 1.4
+ */
+void
+gst_rtsp_connection_set_tls_database (GstRTSPConnection * conn,
+    GTlsDatabase * database)
+{
+  GTlsDatabase *old_db;
+
+  g_return_if_fail (conn != NULL);
+
+  if (database)
+    g_object_ref (database);
+
+  old_db = conn->tls_database;
+  conn->tls_database = database;
+
+  if (old_db)
+    g_object_unref (old_db);
+}
+
+/**
+ * gst_rtsp_connection_get_tls_database:
+ * @conn: a #GstRTSPConnection
+ *
+ * Gets the anchor certificate authorities database that will be used
+ * after a server certificate can't be verified with the default
+ * certificate database.
+ *
+ * Returns: (transfer full): the anchor certificate authorities database, or NULL if no
+ * database has been previously set. Use g_object_unref() to release the
+ * certificate database.
+ *
+ * Since: 1.4
+ */
+GTlsDatabase *
+gst_rtsp_connection_get_tls_database (GstRTSPConnection * conn)
+{
+  GTlsDatabase *result;
+
+  g_return_val_if_fail (conn != NULL, NULL);
+
+  if ((result = conn->tls_database))
+    g_object_ref (result);
+
+  return result;
+}
+
 static GstRTSPResult
 setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri)
 {
@@ -468,8 +695,13 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri)
       url->abspath, url->query ? "?" : "", url->query ? url->query : "");
 
   /* connect to the host/port */
-  connection = g_socket_client_connect_to_uri (conn->client,
-      uri, 0, conn->cancellable, &error);
+  if (conn->proxy_host) {
+    connection = g_socket_client_connect_to_host (conn->client,
+        conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+  } else {
+    connection = g_socket_client_connect_to_uri (conn->client,
+        uri, 0, conn->cancellable, &error);
+  }
   if (connection == NULL)
     goto connect_failed;
 
@@ -487,6 +719,7 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout, gchar * uri)
   conn->socket1 = socket;
   conn->write_socket = conn->socket1;
   conn->output_stream = g_io_stream_get_output_stream (conn->stream1);
+  conn->control_stream = NULL;
 
   /* create the POST request for the write connection */
   GST_RTSP_CHECK (gst_rtsp_message_new_request (&msg, GST_RTSP_POST, uri),
@@ -560,7 +793,7 @@ remote_address_failed:
 
 /**
  * gst_rtsp_connection_connect:
- * @conn: a #GstRTSPConnection 
+ * @conn: a #GstRTSPConnection
  * @timeout: a #GTimeVal timeout
  *
  * Attempt to connect to the url of @conn made with
@@ -600,10 +833,16 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
     uri = g_strdup_printf ("http://%s:%d%s%s%s", url->host, url_port,
         url->abspath, url->query ? "?" : "", url->query ? url->query : "");
   } else {
-    uri = gst_rtsp_url_get_request_uri (conn->url);
+    uri = gst_rtsp_url_get_request_uri (url);
+  }
+
+  if (conn->proxy_host) {
+    connection = g_socket_client_connect_to_host (conn->client,
+        conn->proxy_host, conn->proxy_port, conn->cancellable, &error);
+  } else {
+    connection = g_socket_client_connect_to_uri (conn->client,
+        uri, url_port, conn->cancellable, &error);
   }
-  connection = g_socket_client_connect_to_uri (conn->client,
-      uri, 0, conn->cancellable, &error);
   if (connection == NULL)
     goto connect_failed;
 
@@ -622,6 +861,7 @@ gst_rtsp_connection_connect (GstRTSPConnection * conn, GTimeVal * timeout)
   conn->write_socket = conn->socket0;
   conn->input_stream = g_io_stream_get_input_stream (conn->stream0);
   conn->output_stream = g_io_stream_get_output_stream (conn->stream0);
+  conn->control_stream = NULL;
 
   if (conn->tunneled) {
     res = setup_tunneling (conn, timeout, uri);
@@ -816,8 +1056,12 @@ write_bytes (GOutputStream * stream, const guint8 * buffer, guint * idx,
   left = size - *idx;
 
   while (left) {
-    r = g_pollable_stream_write (stream, (gchar *) & buffer[*idx], left,
-        block, cancellable, &err);
+    if (block)
+      r = g_output_stream_write (stream, (gchar *) & buffer[*idx], left,
+          cancellable, &err);
+    else
+      r = g_pollable_output_stream_write_nonblocking (G_POLLABLE_OUTPUT_STREAM
+          (stream), (gchar *) & buffer[*idx], left, cancellable, &err);
     if (G_UNLIKELY (r < 0))
       goto error;
 
@@ -870,9 +1114,14 @@ fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
 
   if (G_LIKELY (size > (guint) out)) {
     gssize r;
-
-    r = g_pollable_stream_read (conn->input_stream,
-        (gchar *) & buffer[out], size - out, block, conn->cancellable, err);
+    gsize count = size - out;
+    if (block)
+      r = g_input_stream_read (conn->input_stream, (gchar *) & buffer[out],
+          count, conn->may_cancel ? conn->cancellable : NULL, err);
+    else
+      r = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM
+          (conn->input_stream), (gchar *) & buffer[out], count,
+          conn->may_cancel ? conn->cancellable : NULL, err);
 
     if (G_UNLIKELY (r < 0)) {
       if (out == 0) {
@@ -1098,7 +1347,7 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
  * Attempt to write @size bytes of @data to the connected @conn, blocking up to
  * the specified @timeout. @timeout can be #NULL, in which case this function
  * might block forever.
- * 
+ *
  * This function can be cancelled with gst_rtsp_connection_flush().
  *
  * Returns: #GST_RTSP_OK on success.
@@ -1238,7 +1487,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
  * Attempt to send @message to the connected @conn, blocking up to
  * the specified @timeout. @timeout can be #NULL, in which case this function
  * might block forever.
- * 
+ *
  * This function can be cancelled with gst_rtsp_connection_flush().
  *
  * Returns: #GST_RTSP_OK on success.
@@ -1606,12 +1855,14 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
         if (c == '$') {
           /* data message, prepare for the header */
           builder->state = STATE_DATA_HEADER;
+          conn->may_cancel = FALSE;
         } else if (c == '\n' || c == '\r') {
           /* skip \n and \r */
           builder->offset = 0;
         } else {
           builder->line = 0;
           builder->state = STATE_READ_LINES;
+          conn->may_cancel = FALSE;
         }
         break;
       }
@@ -1713,6 +1964,8 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
         gchar *session_cookie;
         gchar *session_id;
 
+        conn->may_cancel = TRUE;
+
         if (message->type == GST_RTSP_MESSAGE_DATA) {
           /* data messages don't have headers */
           res = GST_RTSP_OK;
@@ -1953,7 +2206,7 @@ cleanup:
  *
  * Close the connected @conn. After this call, the connection is in the same
  * state as when it was first created.
- * 
+ *
  * Returns: #GST_RTSP_OK on success.
  */
 GstRTSPResult
@@ -1974,6 +2227,10 @@ gst_rtsp_connection_close (GstRTSPConnection * conn)
     conn->socket1 = NULL;
   }
 
+  /* these were owned by the stream */
+  conn->input_stream = NULL;
+  conn->output_stream = NULL;
+
   g_free (conn->remote_ip);
   conn->remote_ip = NULL;
   g_free (conn->local_ip);
@@ -2007,7 +2264,7 @@ gst_rtsp_connection_close (GstRTSPConnection * conn)
  * @conn: a #GstRTSPConnection
  *
  * Close and free @conn.
- * 
+ *
  * Returns: #GST_RTSP_OK on success.
  */
 GstRTSPResult
@@ -2023,6 +2280,8 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)
     g_object_unref (conn->cancellable);
   if (conn->client)
     g_object_unref (conn->client);
+  if (conn->tls_database)
+    g_object_unref (conn->tls_database);
 
   g_timer_destroy (conn->timer);
   gst_rtsp_url_free (conn->url);
@@ -2036,7 +2295,7 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)
  * gst_rtsp_connection_poll:
  * @conn: a #GstRTSPConnection
  * @events: a bitmask of #GstRTSPEvent flags to check
- * @revents: location for result flags 
+ * @revents: location for result flags
  * @timeout: a timeout
  *
  * Wait up to the specified @timeout for the connection to become available for
@@ -2047,7 +2306,7 @@ gst_rtsp_connection_free (GstRTSPConnection * conn)
  * @timeout can be #NULL, in which case this function might block forever.
  *
  * This function can be cancelled with gst_rtsp_connection_flush().
- * 
+ *
  * Returns: #GST_RTSP_OK on success.
  */
 GstRTSPResult
@@ -2194,7 +2453,7 @@ gst_rtsp_connection_reset_timeout (GstRTSPConnection * conn)
  * Start or stop the flushing action on @conn. When flushing, all current
  * and future actions on @conn will return #GST_RTSP_EINTR until the connection
  * is set to non-flushing mode again.
- * 
+ *
  * Returns: #GST_RTSP_OK.
  */
 GstRTSPResult
@@ -2217,7 +2476,7 @@ gst_rtsp_connection_flush (GstRTSPConnection * conn, gboolean flush)
  * @port: the proxy port
  *
  * Set the proxy host and port.
- * 
+ *
  * Returns: #GST_RTSP_OK.
  */
 GstRTSPResult
@@ -2242,7 +2501,7 @@ gst_rtsp_connection_set_proxy (GstRTSPConnection * conn,
  *
  * Configure @conn for authentication mode @method with @user and @pass as the
  * user and password respectively.
- * 
+ *
  * Returns: #GST_RTSP_OK.
  */
 GstRTSPResult
@@ -2359,6 +2618,7 @@ static GstRTSPResult
 set_qos_dscp (GSocket * socket, guint qos_dscp)
 {
 #ifndef IP_TOS
+  GST_FIXME ("IP_TOS socket option is not defined, not setting dscp");
   return GST_RTSP_OK;
 #else
   gint fd;
@@ -2497,8 +2757,8 @@ gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
  *
  * Get the file descriptor for reading.
  *
- * Returns: the file descriptor used for reading or %NULL on error. The file
- * descriptor remains valid until the connection is closed.
+ * Returns: (transfer none): the file descriptor used for reading or %NULL on
+ * error. The file descriptor remains valid until the connection is closed.
  */
 GSocket *
 gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
@@ -2515,8 +2775,8 @@ gst_rtsp_connection_get_read_socket (const GstRTSPConnection * conn)
  *
  * Get the file descriptor for writing.
  *
- * Returns: the file descriptor used for writing or NULL on error. The file
- * descriptor remains valid until the connection is closed.
+ * Returns: (transfer none): the file descriptor used for writing or NULL on
+ * error. The file descriptor remains valid until the connection is closed.
  */
 GSocket *
 gst_rtsp_connection_get_write_socket (const GstRTSPConnection * conn)
@@ -2566,7 +2826,7 @@ gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
  * gst_rtsp_connection_is_tunneled:
  * @conn: a #GstRTSPConnection
  *
- * Get the tunneling state of the connection. 
+ * Get the tunneling state of the connection.
  *
  * Returns: if @conn is using HTTP tunneling.
  */
@@ -2582,7 +2842,7 @@ gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
  * gst_rtsp_connection_get_tunnelid:
  * @conn: a #GstRTSPConnection
  *
- * Get the tunnel session id the connection. 
+ * Get the tunnel session id the connection.
  *
  * Returns: returns a non-empty string if @conn is being tunneled over HTTP.
  */
@@ -2631,6 +2891,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
     conn->socket1 = conn2->socket0;
     conn->stream1 = conn2->stream0;
     conn->input_stream = conn2->input_stream;
+    conn->control_stream = g_io_stream_get_input_stream (conn->stream0);
 
     /* clean up some of the state of conn2 */
     g_cancellable_cancel (conn2->cancellable);
@@ -2639,6 +2900,7 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
     conn2->stream0 = NULL;
     conn2->input_stream = NULL;
     conn2->output_stream = NULL;
+    conn2->control_stream = NULL;
     g_cancellable_reset (conn2->cancellable);
 
     /* We make socket0 the write socket and socket1 the read socket. */
@@ -2722,6 +2984,7 @@ struct _GstRTSPWatch
 
   GSource *readsrc;
   GSource *writesrc;
+  GSource *controlsrc;
 
   gboolean keep_running;
 
@@ -2763,6 +3026,62 @@ gst_rtsp_source_check (GSource * source)
 }
 
 static gboolean
+gst_rtsp_source_dispatch_read_get_channel (GPollableInputStream * stream,
+    GstRTSPWatch * watch)
+{
+  gssize count;
+  guint8 buffer[1024];
+  GError *error = NULL;
+
+  /* try to read in order to be able to detect errors, we read 1k in case some
+   * client actually decides to send data on the GET channel */
+  count = g_pollable_input_stream_read_nonblocking (stream, buffer, 1024, NULL,
+      &error);
+  if (count == 0) {
+    /* other end closed the socket */
+    goto eof;
+  }
+
+  if (count < 0) {
+    GST_DEBUG ("%s", error->message);
+    if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
+        g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
+      g_clear_error (&error);
+      goto done;
+    }
+    g_clear_error (&error);
+    goto read_error;
+  }
+
+  /* client sent data on the GET channel, ignore it */
+
+done:
+  return TRUE;
+
+  /* ERRORS */
+eof:
+  {
+    if (watch->funcs.closed)
+      watch->funcs.closed (watch, watch->user_data);
+
+    /* the read connection was closed, stop the watch now */
+    watch->keep_running = FALSE;
+
+    return FALSE;
+  }
+read_error:
+  {
+    if (watch->funcs.error_full)
+      watch->funcs.error_full (watch, GST_RTSP_ESYS, &watch->message,
+          0, watch->user_data);
+    else if (watch->funcs.error)
+      watch->funcs.error (watch, GST_RTSP_ESYS, watch->user_data);
+
+    goto eof;
+  }
+}
+
+static gboolean
 gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
     GstRTSPWatch * watch)
 {
@@ -2777,6 +3096,19 @@ gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
   if (res == GST_RTSP_EINTR)
     goto done;
   else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
+    if (watch->readsrc) {
+      g_source_remove_child_source ((GSource *) watch, watch->readsrc);
+      g_source_unref (watch->readsrc);
+      watch->readsrc = NULL;
+    }
+
+    if (conn->stream1) {
+      g_object_unref (conn->stream1);
+      conn->stream1 = NULL;
+      conn->socket1 = NULL;
+      conn->input_stream = NULL;
+    }
+
     /* When we are in tunnelled mode, the read socket can be closed and we
      * should be prepared for a new POST method to reopen it */
     if (conn->tstate == TUNNEL_STATE_COMPLETE) {
@@ -2806,6 +3138,9 @@ gst_rtsp_source_dispatch_read (GPollableInputStream * stream,
 
         /* queue the response */
         response = gen_tunnel_reply (conn, code, &watch->message);
+        if (watch->funcs.tunnel_http_response)
+          watch->funcs.tunnel_http_response (watch, &watch->message, response,
+              watch->user_data);
         gst_rtsp_watch_send_message (watch, response, NULL);
         gst_rtsp_message_free (response);
         goto read_done;
@@ -2883,7 +3218,12 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
     gpointer user_data G_GNUC_UNUSED)
 {
   GstRTSPWatch *watch = (GstRTSPWatch *) source;
+  GstRTSPConnection *conn = watch->conn;
 
+  if (conn->initial_buffer != NULL) {
+    gst_rtsp_source_dispatch_read (G_POLLABLE_INPUT_STREAM (conn->input_stream),
+        watch);
+  }
   return watch->keep_running;
 }
 
@@ -2905,8 +3245,31 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
 
       /* get a new message from the queue */
       rec = g_queue_pop_tail (watch->messages);
-      if (rec == NULL)
+      if (rec == NULL) {
+        if (watch->writesrc) {
+          g_source_remove_child_source ((GSource *) watch, watch->writesrc);
+          g_source_unref (watch->writesrc);
+          watch->writesrc = NULL;
+          /* we create and add the write source again when we actually have
+           * something to write */
+
+          /* since write source is now removed we add read source on the write
+           * socket instead to be able to detect when client closes get channel
+           * in tunneled mode */
+          if (watch->conn->control_stream) {
+            watch->controlsrc =
+                g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
+                (watch->conn->control_stream), NULL);
+            g_source_set_callback (watch->controlsrc,
+                (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch,
+                NULL);
+            g_source_add_child_source ((GSource *) watch, watch->controlsrc);
+          } else {
+            watch->controlsrc = NULL;
+          }
+        }
         break;
+      }
 
       watch->messages_bytes -= rec->size;
 
@@ -2980,6 +3343,13 @@ gst_rtsp_source_finalize (GSource * source)
   watch->messages_bytes = 0;
   g_free (watch->write_data);
 
+  if (watch->readsrc)
+    g_source_unref (watch->readsrc);
+  if (watch->writesrc)
+    g_source_unref (watch->writesrc);
+  if (watch->controlsrc)
+    g_source_unref (watch->controlsrc);
+
   g_mutex_clear (&watch->mutex);
 
   if (watch->notify)
@@ -3006,7 +3376,7 @@ static GSourceFuncs gst_rtsp_source_funcs = {
  * called with @user_data when activity happened on the watch.
  *
  * The new watch is usually created so that it can be attached to a
- * maincontext with gst_rtsp_watch_attach(). 
+ * maincontext with gst_rtsp_watch_attach().
  *
  * @conn must exist for the entire lifetime of the watch.
  *
@@ -3053,10 +3423,20 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
 void
 gst_rtsp_watch_reset (GstRTSPWatch * watch)
 {
-  if (watch->readsrc)
+  if (watch->readsrc) {
     g_source_remove_child_source ((GSource *) watch, watch->readsrc);
-  if (watch->writesrc)
+    g_source_unref (watch->readsrc);
+  }
+  if (watch->writesrc) {
     g_source_remove_child_source ((GSource *) watch, watch->writesrc);
+    g_source_unref (watch->writesrc);
+    watch->writesrc = NULL;
+  }
+  if (watch->controlsrc) {
+    g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
+    g_source_unref (watch->controlsrc);
+    watch->controlsrc = NULL;
+  }
 
   if (watch->conn->input_stream) {
     watch->readsrc =
@@ -3065,21 +3445,25 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
     g_source_set_callback (watch->readsrc,
         (GSourceFunc) gst_rtsp_source_dispatch_read, watch, NULL);
     g_source_add_child_source ((GSource *) watch, watch->readsrc);
-    g_source_unref (watch->readsrc);
   } else {
     watch->readsrc = NULL;
   }
 
-  if (watch->conn->output_stream) {
-    watch->writesrc =
-        g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
-        (watch->conn->output_stream), NULL);
-    g_source_set_callback (watch->writesrc,
-        (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
-    g_source_add_child_source ((GSource *) watch, watch->writesrc);
-    g_source_unref (watch->writesrc);
+  /* we create and add the write source when we actually have something to
+   * write */
+
+  /* when write source is not added we add read source on the write socket
+   * instead to be able to detect when client closes get channel in tunneled
+   * mode */
+  if (watch->conn->control_stream) {
+    watch->controlsrc =
+        g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
+        (watch->conn->control_stream), NULL);
+    g_source_set_callback (watch->controlsrc,
+        (GSourceFunc) gst_rtsp_source_dispatch_read_get_channel, watch, NULL);
+    g_source_add_child_source ((GSource *) watch, watch->controlsrc);
   } else {
-    watch->writesrc = NULL;
+    watch->controlsrc = NULL;
   }
 }
 
@@ -3090,7 +3474,7 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
  *
  * Adds a #GstRTSPWatch to a context so that it will be executed within that context.
  *
- * Returns: the ID (greater than 0) for the watch within the GMainContext. 
+ * Returns: the ID (greater than 0) for the watch within the GMainContext.
  */
 guint
 gst_rtsp_watch_attach (GstRTSPWatch * watch, GMainContext * context)
@@ -3127,7 +3511,7 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
  *
  * A value of 0 for @bytes or @messages means no limits.
  *
- * Since: 1.1.1
+ * Since: 1.2
  */
 void
 gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
@@ -3153,7 +3537,7 @@ gst_rtsp_watch_set_send_backlog (GstRTSPWatch * watch,
  * Get the maximum amount of bytes and messages that will be queued in @watch.
  * See gst_rtsp_watch_set_send_backlog().
  *
- * Since: 1.1.1
+ * Since: 1.2
  */
 void
 gst_rtsp_watch_get_send_backlog (GstRTSPWatch * watch,
@@ -3249,6 +3633,22 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
   /* make sure the main context will now also check for writability on the
    * socket */
   context = ((GSource *) watch)->context;
+  if (!watch->writesrc) {
+    /* remove the read source on the write socket, we will be able to detect
+     * errors while writing */
+    if (watch->controlsrc) {
+      g_source_remove_child_source ((GSource *) watch, watch->controlsrc);
+      g_source_unref (watch->controlsrc);
+      watch->controlsrc = NULL;
+    }
+
+    watch->writesrc =
+        g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
+        (watch->conn->output_stream), NULL);
+    g_source_set_callback (watch->writesrc,
+        (GSourceFunc) gst_rtsp_source_dispatch_write, watch, NULL);
+    g_source_add_child_source ((GSource *) watch, watch->writesrc);
+  }
 
   if (id != NULL)
     *id = rec->id;