rtspconnection: Handle closed POST socket in tunneling
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 6 Apr 2010 08:55:42 +0000 (10:55 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 6 Apr 2010 08:59:39 +0000 (10:59 +0200)
Catch more socket errors.
Rework how sockets are managed in the GSource, wake up the maincontext instead
of adding/removing the sockets from the source.
Add callback for when the tunnel connection is lost. Some clients (Quicktime
Player) close the POST connection in tunneled mode and reopen the socket when
needed.

See #612915

gst-libs/gst/rtsp/gstrtspconnection.c
gst-libs/gst/rtsp/gstrtspconnection.h

index b2a30d8..18f8389 100644 (file)
@@ -1834,7 +1834,7 @@ normalize_line (guint8 * buffer)
 
 /* returns:
  *  GST_RTSP_OK when a complete message was read.
- *  GST_RTSP_EEOF: when the socket is closed
+ *  GST_RTSP_EEOF: when the read socket is closed
  *  GST_RTSP_EINTR: when more data is needed.
  *  GST_RTSP_..: some other error occured.
  */
@@ -3006,8 +3006,10 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
   return GST_RTSP_OK;
 }
 
-#define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
-#define WRITE_COND  (G_IO_OUT | G_IO_ERR)
+#define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define READ_COND   (G_IO_IN | READ_ERR)
+#define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define WRITE_COND  (G_IO_OUT | WRITE_ERR)
 
 typedef struct
 {
@@ -3028,7 +3030,6 @@ struct _GstRTSPWatch
 
   GPollFD readfd;
   GPollFD writefd;
-  gboolean write_added;
 
   /* queued message for transmission */
   guint id;
@@ -3077,17 +3078,35 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
     gpointer user_data G_GNUC_UNUSED)
 {
   GstRTSPWatch *watch = (GstRTSPWatch *) source;
-  GstRTSPResult res;
+  GstRTSPResult res = GST_RTSP_ERROR;
+  gboolean keep_running = TRUE;
 
   /* first read as much as we can */
   if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
     do {
+      if (watch->readfd.revents & READ_ERR)
+        goto read_error;
+
       res = build_next (&watch->builder, &watch->message, watch->conn);
       if (res == GST_RTSP_EINTR)
         break;
-      else if (G_UNLIKELY (res == GST_RTSP_EEOF))
-        goto eof;
-      else if (G_LIKELY (res == GST_RTSP_OK)) {
+      else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
+        watch->readfd.events = 0;
+        watch->readfd.revents = 0;
+        g_source_remove_poll ((GSource *) watch, &watch->readfd);
+        /* When we are in tunnelled mode, the read socket can be closed and we
+         * should be prepared for a new POST method to reopen it */
+        if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
+          /* remove the read connection for the tunnel */
+          /* we accept a new POST request */
+          watch->conn->tstate = TUNNEL_STATE_GET;
+          /* and signal that we lost our tunnel */
+          if (watch->funcs.tunnel_lost)
+            res = watch->funcs.tunnel_lost (watch, watch->user_data);
+          goto read_done;
+        } else
+          goto eof;
+      } else if (G_LIKELY (res == GST_RTSP_OK)) {
         if (!watch->conn->manual_http &&
             watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
           if (watch->conn->tstate == TUNNEL_STATE_NONE &&
@@ -3144,11 +3163,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
           watch->funcs.message_received (watch, &watch->message,
               watch->user_data);
       } else {
-        if (watch->funcs.error_full)
-          GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
-                  0, watch->user_data), error);
-        else
-          goto error;
+        goto read_error;
       }
 
     read_done:
@@ -3158,6 +3173,9 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
   }
 
   if (watch->writefd.revents & WRITE_COND) {
+    if (watch->writefd.revents & WRITE_ERR)
+      goto write_error;
+
     g_mutex_lock (watch->mutex);
     do {
       if (watch->write_data == NULL) {
@@ -3166,7 +3184,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
         /* get a new message from the queue */
         rec = g_queue_pop_tail (watch->messages);
         if (rec == NULL)
-          goto done;
+          break;
 
         watch->write_off = 0;
         watch->write_data = rec->data;
@@ -3179,17 +3197,14 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
       res = write_bytes (watch->writefd.fd, watch->write_data,
           &watch->write_off, watch->write_size);
       g_mutex_unlock (watch->mutex);
+
       if (res == GST_RTSP_EINTR)
         goto write_blocked;
       else if (G_LIKELY (res == GST_RTSP_OK)) {
         if (watch->funcs.message_sent)
           watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
       } else {
-        if (watch->funcs.error_full)
-          GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
-                  watch->write_id, watch->user_data), error);
-        else
-          goto error;
+        goto write_error;
       }
       g_mutex_lock (watch->mutex);
 
@@ -3197,31 +3212,61 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
       watch->write_data = NULL;
     } while (TRUE);
 
-  done:
-    if (watch->write_added) {
-      g_source_remove_poll ((GSource *) watch, &watch->writefd);
-      watch->write_added = FALSE;
-      watch->writefd.revents = 0;
-    }
+    watch->writefd.events = WRITE_ERR;
 
     g_mutex_unlock (watch->mutex);
   }
 
 write_blocked:
-  return TRUE;
+  return keep_running;
 
   /* ERRORS */
 eof:
   {
     if (watch->funcs.closed)
       watch->funcs.closed (watch, watch->user_data);
+
+    /* always stop when the readfd returns EOF in non-tunneled mode */
     return FALSE;
   }
+read_error:
+  {
+    watch->readfd.events = 0;
+    watch->readfd.revents = 0;
+    g_source_remove_poll ((GSource *) watch, &watch->readfd);
+    keep_running = (watch->writefd.events != 0);
+
+    if (keep_running) {
+      if (watch->funcs.error_full)
+        GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
+                0, watch->user_data), error);
+      else
+        goto error;
+    } else
+      goto eof;
+  }
+write_error:
+  {
+    watch->writefd.events = 0;
+    watch->writefd.revents = 0;
+    g_source_remove_poll ((GSource *) watch, &watch->writefd);
+    keep_running = (watch->readfd.events != 0);
+
+    if (keep_running) {
+      if (watch->funcs.error_full)
+        GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
+                watch->write_id, watch->user_data), error);
+      else
+        goto error;
+    } else
+      goto eof;
+  }
 error:
   {
     if (watch->funcs.error)
       watch->funcs.error (watch, res, watch->user_data);
-    return FALSE;
+
+    return keep_running;
   }
 }
 
@@ -3245,11 +3290,10 @@ gst_rtsp_source_finalize (GSource * source)
   g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
   g_queue_free (watch->messages);
   watch->messages = NULL;
+  g_free (watch->write_data);
 
   g_mutex_free (watch->mutex);
 
-  g_free (watch->write_data);
-
   if (watch->notify)
     watch->notify (watch->user_data);
 }
@@ -3312,10 +3356,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
   result->user_data = user_data;
   result->notify = notify;
 
-  /* only add the read fd, the write fd is only added when we have data
-   * to send. */
-  g_source_add_poll ((GSource *) result, &result->readfd);
-
   return result;
 }
 
@@ -3341,11 +3381,13 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
   watch->readfd.revents = 0;
 
   watch->writefd.fd = watch->conn->writefd->fd;
-  watch->writefd.events = WRITE_COND;
+  watch->writefd.events = WRITE_ERR;
   watch->writefd.revents = 0;
-  watch->write_added = FALSE;
 
-  g_source_add_poll ((GSource *) watch, &watch->readfd);
+  if (watch->readfd.fd != -1)
+    g_source_add_poll ((GSource *) watch, &watch->readfd);
+  if (watch->writefd.fd != -1)
+    g_source_add_poll ((GSource *) watch, &watch->writefd);
 }
 
 /**
@@ -3411,6 +3453,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
   GstRTSPResult res;
   GstRTSPRec *rec;
   guint off = 0;
+  GMainContext *context = NULL;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@@ -3418,6 +3461,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
   g_mutex_lock (watch->mutex);
 
+  /* try to send the message synchronously first */
   if (watch->messages->length == 0) {
     res = write_bytes (watch->writefd.fd, data, &off, size);
     if (res != GST_RTSP_EINTR) {
@@ -3428,7 +3472,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
     }
   }
 
-  /* make a record with the data and id */
+  /* make a record with the data and id for sending async */
   rec = g_slice_new (GstRTSPRec);
   if (off == 0) {
     rec->data = (guint8 *) data;
@@ -3449,9 +3493,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
   /* make sure the main context will now also check for writability on the
    * socket */
-  if (!watch->write_added) {
-    g_source_add_poll ((GSource *) watch, &watch->writefd);
-    watch->write_added = TRUE;
+  if (watch->writefd.events != WRITE_COND) {
+    watch->writefd.events = WRITE_COND;
+    context = ((GSource *) watch)->context;
   }
 
   if (id != NULL)
@@ -3460,6 +3504,10 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
 done:
   g_mutex_unlock (watch->mutex);
+
+  if (context)
+    g_main_context_wakeup (context);
+
   return res;
 }
 
@@ -3528,6 +3576,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
     guint size)
 {
   GstRTSPRec *rec;
+  GMainContext *context = NULL;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@@ -3549,12 +3598,15 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
 
   /* make sure the main context will now also check for writability on the
    * socket */
-  if (!watch->write_added) {
-    g_source_add_poll ((GSource *) watch, &watch->writefd);
-    watch->write_added = TRUE;
+  if (watch->writefd.events != WRITE_COND) {
+    watch->writefd.events = WRITE_COND;
+    context = ((GSource *) watch)->context;
   }
-
   g_mutex_unlock (watch->mutex);
+
+  if (context)
+    g_main_context_wakeup (context);
+
   return rec->id;
 }
 #endif /* GST_REMOVE_DEPRECATED */
index c811b12..2a34abd 100644 (file)
@@ -153,6 +153,8 @@ typedef struct _GstRTSPWatch GstRTSPWatch;
  *   gst_rtsp_connection_do_tunnel().
  * @error_full: callback when an error occured with more information than
  *   the @error callback. Since 0.10.25
+ * @tunnel_lost: callback when the post connection of a tunnel is closed.
+ *   Since 0.10.29
  *
  * Callback functions from a #GstRTSPWatch.
  *
@@ -171,9 +173,10 @@ typedef struct {
   GstRTSPResult     (*error_full)       (GstRTSPWatch *watch, GstRTSPResult result,
                                          GstRTSPMessage *message, guint id,
                                          gpointer user_data);
+  GstRTSPResult     (*tunnel_lost)      (GstRTSPWatch *watch, gpointer user_data);
 
   /*< private >*/
-  gpointer _gst_reserved[GST_PADDING - 1];
+  gpointer _gst_reserved[GST_PADDING - 2];
 } GstRTSPWatchFuncs;
 
 GstRTSPWatch *     gst_rtsp_watch_new                (GstRTSPConnection *conn,