Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
index c45f639..2de21b9 100644 (file)
  * @short_description: manage RTSP connections
  * @see_also: gstrtspurl
  *  
- * <refsect2>
- * <para>
  * This object manages the RTSP connection to the server. It provides function
  * to receive and send bytes and messages.
- * </para>
- * </refsect2>
  *  
  * Last reviewed on 2007-07-24 (0.10.14)
  */
@@ -461,11 +457,20 @@ static gchar *
 do_resolve (const gchar * host)
 {
   static gchar ip[INET6_ADDRSTRLEN];
-  struct addrinfo *aires;
+  struct addrinfo *aires, hints;
   struct addrinfo *ai;
   gint aierr;
 
-  aierr = getaddrinfo (host, NULL, NULL, &aires);
+  memset (&hints, 0, sizeof (struct addrinfo));
+  hints.ai_family = AF_UNSPEC;  /* Allow IPv4 or IPv6 */
+  hints.ai_socktype = SOCK_DGRAM;       /* Datagram socket */
+  hints.ai_flags = AI_PASSIVE;  /* For wildcard IP address */
+  hints.ai_protocol = 0;        /* Any protocol */
+  hints.ai_canonname = NULL;
+  hints.ai_addr = NULL;
+  hints.ai_next = NULL;
+
+  aierr = getaddrinfo (host, NULL, &hints, &aires);
   if (aierr != 0)
     goto no_addrinfo;
 
@@ -587,6 +592,14 @@ do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
 #endif
     goto sys_error;
+  } else {
+#ifdef __APPLE__
+    /* osx wakes up select with POLLOUT if the connection is refused... */
+    socklen_t len = sizeof (errno);
+    getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
+    if (errno != 0)
+      goto sys_error;
+#endif
   }
 
   gst_poll_fd_ignored (fdset, fdout);
@@ -1013,20 +1026,26 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
 static void
 gen_date_string (gchar * date_string, guint len)
 {
-  GTimeVal tv;
+  static const char wkdays[7][4] =
+      { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
+  static const char months[12][4] =
+      { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
+    "Nov", "Dec"
+  };
+  struct tm tm;
   time_t t;
-#ifdef HAVE_GMTIME_R
-  struct tm tm_;
-#endif
 
-  g_get_current_time (&tv);
-  t = (time_t) tv.tv_sec;
+  time (&t);
 
 #ifdef HAVE_GMTIME_R
-  strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
+  gmtime_r (&t, &tm);
 #else
-  strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
+  tm = *gmtime (&t);
 #endif
+
+  g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
+      wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
+      tm.tm_hour, tm.tm_min, tm.tm_sec);
 }
 
 static GstRTSPResult
@@ -1825,7 +1844,7 @@ normalize_line (guint8 * buffer)
 
 /* returns:
  *  GST_RTSP_OK when a complete message was read.
- *  GST_RTSP_EEOF: when the socket is closed
+ *  GST_RTSP_EEOF: when the read socket is closed
  *  GST_RTSP_EINTR: when more data is needed.
  *  GST_RTSP_..: some other error occured.
  */
@@ -2258,7 +2277,9 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
     if (gst_poll_fd_has_error (conn->fdset, conn->writefd))
       goto socket_error;
 
-    gst_poll_set_controllable (conn->fdset, FALSE);
+    /* once we start reading the wait cannot be controlled */
+    if (builder.state != STATE_START)
+      gst_poll_set_controllable (conn->fdset, FALSE);
   }
 
   /* we have a message here */
@@ -2462,7 +2483,7 @@ stopped:
  * @timeout: a timeout
  *
  * Calculate the next timeout for @conn, storing the result in @timeout.
- * 
+ *
  * Returns: #GST_RTSP_OK.
  */
 GstRTSPResult
@@ -2471,16 +2492,34 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
   gdouble elapsed;
   glong sec;
   gulong usec;
+  gint ctimeout;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
 
+  ctimeout = conn->timeout;
+  if (ctimeout >= 20) {
+    /* Because we should act before the timeout we timeout 5
+     * seconds in advance. */
+    ctimeout -= 5;
+  } else if (ctimeout >= 5) {
+    /* else timeout 20% earlier */
+    ctimeout -= ctimeout / 5;
+  } else if (ctimeout >= 1) {
+    /* else timeout 1 second earlier */
+    ctimeout -= 1;
+  }
+
   elapsed = g_timer_elapsed (conn->timer, &usec);
-  if (elapsed >= conn->timeout) {
+  if (elapsed >= ctimeout) {
     sec = 0;
     usec = 0;
   } else {
-    sec = conn->timeout - elapsed;
+    sec = ctimeout - elapsed;
+    if (usec <= G_USEC_PER_SEC)
+      usec = G_USEC_PER_SEC - usec;
+    else
+      usec = 0;
   }
 
   timeout->tv_sec = sec;
@@ -2494,7 +2533,7 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
  * @conn: a #GstRTSPConnection
  *
  * Reset the timeout of @conn.
- * 
+ *
  * Returns: #GST_RTSP_OK.
  */
 GstRTSPResult
@@ -2993,8 +3032,10 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
   return GST_RTSP_OK;
 }
 
-#define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
-#define WRITE_COND  (G_IO_OUT | G_IO_ERR)
+#define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define READ_COND   (G_IO_IN | READ_ERR)
+#define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define WRITE_COND  (G_IO_OUT | WRITE_ERR)
 
 typedef struct
 {
@@ -3015,7 +3056,6 @@ struct _GstRTSPWatch
 
   GPollFD readfd;
   GPollFD writefd;
-  gboolean write_added;
 
   /* queued message for transmission */
   guint id;
@@ -3064,17 +3104,35 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
     gpointer user_data G_GNUC_UNUSED)
 {
   GstRTSPWatch *watch = (GstRTSPWatch *) source;
-  GstRTSPResult res;
+  GstRTSPResult res = GST_RTSP_ERROR;
+  gboolean keep_running = TRUE;
 
   /* first read as much as we can */
   if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
     do {
+      if (watch->readfd.revents & READ_ERR)
+        goto read_error;
+
       res = build_next (&watch->builder, &watch->message, watch->conn);
       if (res == GST_RTSP_EINTR)
         break;
-      else if (G_UNLIKELY (res == GST_RTSP_EEOF))
-        goto eof;
-      else if (G_LIKELY (res == GST_RTSP_OK)) {
+      else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
+        watch->readfd.events = 0;
+        watch->readfd.revents = 0;
+        g_source_remove_poll ((GSource *) watch, &watch->readfd);
+        /* When we are in tunnelled mode, the read socket can be closed and we
+         * should be prepared for a new POST method to reopen it */
+        if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
+          /* remove the read connection for the tunnel */
+          /* we accept a new POST request */
+          watch->conn->tstate = TUNNEL_STATE_GET;
+          /* and signal that we lost our tunnel */
+          if (watch->funcs.tunnel_lost)
+            res = watch->funcs.tunnel_lost (watch, watch->user_data);
+          goto read_done;
+        } else
+          goto eof;
+      } else if (G_LIKELY (res == GST_RTSP_OK)) {
         if (!watch->conn->manual_http &&
             watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
           if (watch->conn->tstate == TUNNEL_STATE_NONE &&
@@ -3131,11 +3189,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
           watch->funcs.message_received (watch, &watch->message,
               watch->user_data);
       } else {
-        if (watch->funcs.error_full)
-          GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
-                  0, watch->user_data), error);
-        else
-          goto error;
+        goto read_error;
       }
 
     read_done:
@@ -3145,6 +3199,9 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
   }
 
   if (watch->writefd.revents & WRITE_COND) {
+    if (watch->writefd.revents & WRITE_ERR)
+      goto write_error;
+
     g_mutex_lock (watch->mutex);
     do {
       if (watch->write_data == NULL) {
@@ -3153,7 +3210,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
         /* get a new message from the queue */
         rec = g_queue_pop_tail (watch->messages);
         if (rec == NULL)
-          goto done;
+          break;
 
         watch->write_off = 0;
         watch->write_data = rec->data;
@@ -3166,17 +3223,14 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
       res = write_bytes (watch->writefd.fd, watch->write_data,
           &watch->write_off, watch->write_size);
       g_mutex_unlock (watch->mutex);
+
       if (res == GST_RTSP_EINTR)
         goto write_blocked;
       else if (G_LIKELY (res == GST_RTSP_OK)) {
         if (watch->funcs.message_sent)
           watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
       } else {
-        if (watch->funcs.error_full)
-          GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
-                  watch->write_id, watch->user_data), error);
-        else
-          goto error;
+        goto write_error;
       }
       g_mutex_lock (watch->mutex);
 
@@ -3184,31 +3238,61 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
       watch->write_data = NULL;
     } while (TRUE);
 
-  done:
-    if (watch->write_added) {
-      g_source_remove_poll ((GSource *) watch, &watch->writefd);
-      watch->write_added = FALSE;
-      watch->writefd.revents = 0;
-    }
+    watch->writefd.events = WRITE_ERR;
 
     g_mutex_unlock (watch->mutex);
   }
 
 write_blocked:
-  return TRUE;
+  return keep_running;
 
   /* ERRORS */
 eof:
   {
     if (watch->funcs.closed)
       watch->funcs.closed (watch, watch->user_data);
+
+    /* always stop when the readfd returns EOF in non-tunneled mode */
     return FALSE;
   }
+read_error:
+  {
+    watch->readfd.events = 0;
+    watch->readfd.revents = 0;
+    g_source_remove_poll ((GSource *) watch, &watch->readfd);
+    keep_running = (watch->writefd.events != 0);
+
+    if (keep_running) {
+      if (watch->funcs.error_full)
+        GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
+                0, watch->user_data), error);
+      else
+        goto error;
+    } else
+      goto eof;
+  }
+write_error:
+  {
+    watch->writefd.events = 0;
+    watch->writefd.revents = 0;
+    g_source_remove_poll ((GSource *) watch, &watch->writefd);
+    keep_running = (watch->readfd.events != 0);
+
+    if (keep_running) {
+      if (watch->funcs.error_full)
+        GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
+                watch->write_id, watch->user_data), error);
+      else
+        goto error;
+    } else
+      goto eof;
+  }
 error:
   {
     if (watch->funcs.error)
       watch->funcs.error (watch, res, watch->user_data);
-    return FALSE;
+
+    return keep_running;
   }
 }
 
@@ -3232,11 +3316,10 @@ gst_rtsp_source_finalize (GSource * source)
   g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
   g_queue_free (watch->messages);
   watch->messages = NULL;
+  g_free (watch->write_data);
 
   g_mutex_free (watch->mutex);
 
-  g_free (watch->write_data);
-
   if (watch->notify)
     watch->notify (watch->user_data);
 }
@@ -3299,10 +3382,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
   result->user_data = user_data;
   result->notify = notify;
 
-  /* only add the read fd, the write fd is only added when we have data
-   * to send. */
-  g_source_add_poll ((GSource *) result, &result->readfd);
-
   return result;
 }
 
@@ -3328,11 +3407,13 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
   watch->readfd.revents = 0;
 
   watch->writefd.fd = watch->conn->writefd->fd;
-  watch->writefd.events = WRITE_COND;
+  watch->writefd.events = WRITE_ERR;
   watch->writefd.revents = 0;
-  watch->write_added = FALSE;
 
-  g_source_add_poll ((GSource *) watch, &watch->readfd);
+  if (watch->readfd.fd != -1)
+    g_source_add_poll ((GSource *) watch, &watch->readfd);
+  if (watch->writefd.fd != -1)
+    g_source_add_poll ((GSource *) watch, &watch->writefd);
 }
 
 /**
@@ -3398,6 +3479,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
   GstRTSPResult res;
   GstRTSPRec *rec;
   guint off = 0;
+  GMainContext *context = NULL;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@@ -3405,6 +3487,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
   g_mutex_lock (watch->mutex);
 
+  /* try to send the message synchronously first */
   if (watch->messages->length == 0) {
     res = write_bytes (watch->writefd.fd, data, &off, size);
     if (res != GST_RTSP_EINTR) {
@@ -3415,7 +3498,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
     }
   }
 
-  /* make a record with the data and id */
+  /* make a record with the data and id for sending async */
   rec = g_slice_new (GstRTSPRec);
   if (off == 0) {
     rec->data = (guint8 *) data;
@@ -3436,9 +3519,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
   /* make sure the main context will now also check for writability on the
    * socket */
-  if (!watch->write_added) {
-    g_source_add_poll ((GSource *) watch, &watch->writefd);
-    watch->write_added = TRUE;
+  if (watch->writefd.events != WRITE_COND) {
+    watch->writefd.events = WRITE_COND;
+    context = ((GSource *) watch)->context;
   }
 
   if (id != NULL)
@@ -3447,6 +3530,10 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
 done:
   g_mutex_unlock (watch->mutex);
+
+  if (context)
+    g_main_context_wakeup (context);
+
   return res;
 }
 
@@ -3515,6 +3602,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
     guint size)
 {
   GstRTSPRec *rec;
+  GMainContext *context = NULL;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@@ -3536,12 +3624,15 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
 
   /* make sure the main context will now also check for writability on the
    * socket */
-  if (!watch->write_added) {
-    g_source_add_poll ((GSource *) watch, &watch->writefd);
-    watch->write_added = TRUE;
+  if (watch->writefd.events != WRITE_COND) {
+    watch->writefd.events = WRITE_COND;
+    context = ((GSource *) watch)->context;
   }
-
   g_mutex_unlock (watch->mutex);
+
+  if (context)
+    g_main_context_wakeup (context);
+
   return rec->id;
 }
 #endif /* GST_REMOVE_DEPRECATED */