Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / gst-libs / gst / rtsp / gstrtspconnection.c
index c78da0b..2de21b9 100644 (file)
  * @short_description: manage RTSP connections
  * @see_also: gstrtspurl
  *  
- * <refsect2>
- * <para>
  * This object manages the RTSP connection to the server. It provides function
  * to receive and send bytes and messages.
- * </para>
- * </refsect2>
  *  
  * Last reviewed on 2007-07-24 (0.10.14)
  */
@@ -461,11 +457,20 @@ static gchar *
 do_resolve (const gchar * host)
 {
   static gchar ip[INET6_ADDRSTRLEN];
-  struct addrinfo *aires;
+  struct addrinfo *aires, hints;
   struct addrinfo *ai;
   gint aierr;
 
-  aierr = getaddrinfo (host, NULL, NULL, &aires);
+  memset (&hints, 0, sizeof (struct addrinfo));
+  hints.ai_family = AF_UNSPEC;  /* Allow IPv4 or IPv6 */
+  hints.ai_socktype = SOCK_DGRAM;       /* Datagram socket */
+  hints.ai_flags = AI_PASSIVE;  /* For wildcard IP address */
+  hints.ai_protocol = 0;        /* Any protocol */
+  hints.ai_canonname = NULL;
+  hints.ai_addr = NULL;
+  hints.ai_next = NULL;
+
+  aierr = getaddrinfo (host, NULL, &hints, &aires);
   if (aierr != 0)
     goto no_addrinfo;
 
@@ -587,6 +592,14 @@ do_connect (const gchar * ip, guint16 port, GstPollFD * fdout,
     getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
 #endif
     goto sys_error;
+  } else {
+#ifdef __APPLE__
+    /* osx wakes up select with POLLOUT if the connection is refused... */
+    socklen_t len = sizeof (errno);
+    getsockopt (fd, SOL_SOCKET, SO_ERROR, (char *) &errno, &len);
+    if (errno != 0)
+      goto sys_error;
+#endif
   }
 
   gst_poll_fd_ignored (fdset, fdout);
@@ -710,7 +723,7 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
     goto wrong_result;
 
   if (gst_rtsp_message_get_header (&response, GST_RTSP_HDR_X_SERVER_IP_ADDRESS,
-          &value, 0) != GST_RTSP_OK) {
+          &value, 0) == GST_RTSP_OK) {
     if (conn->proxy_host) {
       /* if we use a proxy we need to change the destination url */
       g_free (url->host);
@@ -719,7 +732,7 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
       hostparam = g_strdup_printf ("%s:%d", url->host, url_port);
     } else {
       /* and resolve the new ip address */
-      if (!(ip = do_resolve (conn->ip)))
+      if (!(ip = do_resolve (value)))
         goto not_resolved;
       g_free (conn->ip);
       conn->ip = ip;
@@ -946,6 +959,9 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
       gchar *user_pass64;
       gchar *auth_string;
 
+      if (conn->username == NULL || conn->passwd == NULL)
+        break;
+
       user_pass = g_strdup_printf ("%s:%s", conn->username, conn->passwd);
       user_pass64 = g_base64_encode ((guchar *) user_pass, strlen (user_pass));
       auth_string = g_strdup_printf ("Basic %s", user_pass64);
@@ -967,7 +983,8 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
       const gchar *method;
 
       /* we need to have some params set */
-      if (conn->auth_params == NULL)
+      if (conn->auth_params == NULL || conn->username == NULL ||
+          conn->passwd == NULL)
         break;
 
       /* we need the realm and nonce */
@@ -1009,20 +1026,26 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
 static void
 gen_date_string (gchar * date_string, guint len)
 {
-  GTimeVal tv;
+  static const char wkdays[7][4] =
+      { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" };
+  static const char months[12][4] =
+      { "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct",
+    "Nov", "Dec"
+  };
+  struct tm tm;
   time_t t;
-#ifdef HAVE_GMTIME_R
-  struct tm tm_;
-#endif
 
-  g_get_current_time (&tv);
-  t = (time_t) tv.tv_sec;
+  time (&t);
 
 #ifdef HAVE_GMTIME_R
-  strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
+  gmtime_r (&t, &tm);
 #else
-  strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
+  tm = *gmtime (&t);
 #endif
+
+  g_snprintf (date_string, len, "%s, %02d %s %04d %02d:%02d:%02d GMT",
+      wkdays[tm.tm_wday], tm.tm_mday, months[tm.tm_mon], tm.tm_year + 1900,
+      tm.tm_hour, tm.tm_min, tm.tm_sec);
 }
 
 static GstRTSPResult
@@ -1821,7 +1844,7 @@ normalize_line (guint8 * buffer)
 
 /* returns:
  *  GST_RTSP_OK when a complete message was read.
- *  GST_RTSP_EEOF: when the socket is closed
+ *  GST_RTSP_EEOF: when the read socket is closed
  *  GST_RTSP_EINTR: when more data is needed.
  *  GST_RTSP_..: some other error occured.
  */
@@ -1834,22 +1857,31 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
   while (TRUE) {
     switch (builder->state) {
       case STATE_START:
+      {
+        guint8 c;
+
         builder->offset = 0;
         res =
             read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1);
         if (res != GST_RTSP_OK)
           goto done;
 
+        c = builder->buffer[0];
+
         /* we have 1 bytes now and we can see if this is a data message or
          * not */
-        if (builder->buffer[0] == '$') {
+        if (c == '$') {
           /* data message, prepare for the header */
           builder->state = STATE_DATA_HEADER;
+        } else if (c == '\n' || c == '\r') {
+          /* skip \n and \r */
+          builder->offset = 0;
         } else {
           builder->line = 0;
           builder->state = STATE_READ_LINES;
         }
         break;
+      }
       case STATE_DATA_HEADER:
       {
         res =
@@ -1906,7 +1938,13 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
                       GST_RTSP_HDR_X_SESSIONCOOKIE, NULL, 0) != GST_RTSP_OK)) {
             /* there is, prepare to read the body */
             builder->body_len = atol (hdrval);
-            builder->body_data = g_malloc (builder->body_len + 1);
+            builder->body_data = g_try_malloc (builder->body_len + 1);
+            /* we can't do much here, we need the length to know how many bytes
+             * we need to read next and when allocation fails, something is
+             * probably wrong with the length. */
+            if (builder->body_data == NULL)
+              goto invalid_body_len;
+
             builder->body_data[builder->body_len] = '\0';
             builder->offset = 0;
             builder->state = STATE_DATA_BODY;
@@ -1999,6 +2037,13 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
   }
 done:
   return res;
+
+  /* ERRORS */
+invalid_body_len:
+  {
+    GST_DEBUG ("could not allocate body");
+    return GST_RTSP_ERROR;
+  }
 }
 
 /**
@@ -2232,7 +2277,9 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
     if (gst_poll_fd_has_error (conn->fdset, conn->writefd))
       goto socket_error;
 
-    gst_poll_set_controllable (conn->fdset, FALSE);
+    /* once we start reading the wait cannot be controlled */
+    if (builder.state != STATE_START)
+      gst_poll_set_controllable (conn->fdset, FALSE);
   }
 
   /* we have a message here */
@@ -2436,7 +2483,7 @@ stopped:
  * @timeout: a timeout
  *
  * Calculate the next timeout for @conn, storing the result in @timeout.
- * 
+ *
  * Returns: #GST_RTSP_OK.
  */
 GstRTSPResult
@@ -2445,16 +2492,34 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
   gdouble elapsed;
   glong sec;
   gulong usec;
+  gint ctimeout;
 
   g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (timeout != NULL, GST_RTSP_EINVAL);
 
+  ctimeout = conn->timeout;
+  if (ctimeout >= 20) {
+    /* Because we should act before the timeout we timeout 5
+     * seconds in advance. */
+    ctimeout -= 5;
+  } else if (ctimeout >= 5) {
+    /* else timeout 20% earlier */
+    ctimeout -= ctimeout / 5;
+  } else if (ctimeout >= 1) {
+    /* else timeout 1 second earlier */
+    ctimeout -= 1;
+  }
+
   elapsed = g_timer_elapsed (conn->timer, &usec);
-  if (elapsed >= conn->timeout) {
+  if (elapsed >= ctimeout) {
     sec = 0;
     usec = 0;
   } else {
-    sec = conn->timeout - elapsed;
+    sec = ctimeout - elapsed;
+    if (usec <= G_USEC_PER_SEC)
+      usec = G_USEC_PER_SEC - usec;
+    else
+      usec = 0;
   }
 
   timeout->tv_sec = sec;
@@ -2468,7 +2533,7 @@ gst_rtsp_connection_next_timeout (GstRTSPConnection * conn, GTimeVal * timeout)
  * @conn: a #GstRTSPConnection
  *
  * Reset the timeout of @conn.
- * 
+ *
  * Returns: #GST_RTSP_OK.
  */
 GstRTSPResult
@@ -2967,8 +3032,10 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
   return GST_RTSP_OK;
 }
 
-#define READ_COND   (G_IO_IN | G_IO_HUP | G_IO_ERR)
-#define WRITE_COND  (G_IO_OUT | G_IO_ERR)
+#define READ_ERR    (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define READ_COND   (G_IO_IN | READ_ERR)
+#define WRITE_ERR   (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
+#define WRITE_COND  (G_IO_OUT | WRITE_ERR)
 
 typedef struct
 {
@@ -2989,7 +3056,6 @@ struct _GstRTSPWatch
 
   GPollFD readfd;
   GPollFD writefd;
-  gboolean write_added;
 
   /* queued message for transmission */
   guint id;
@@ -3038,17 +3104,35 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
     gpointer user_data G_GNUC_UNUSED)
 {
   GstRTSPWatch *watch = (GstRTSPWatch *) source;
-  GstRTSPResult res;
+  GstRTSPResult res = GST_RTSP_ERROR;
+  gboolean keep_running = TRUE;
 
   /* first read as much as we can */
   if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
     do {
+      if (watch->readfd.revents & READ_ERR)
+        goto read_error;
+
       res = build_next (&watch->builder, &watch->message, watch->conn);
       if (res == GST_RTSP_EINTR)
         break;
-      else if (G_UNLIKELY (res == GST_RTSP_EEOF))
-        goto eof;
-      else if (G_LIKELY (res == GST_RTSP_OK)) {
+      else if (G_UNLIKELY (res == GST_RTSP_EEOF)) {
+        watch->readfd.events = 0;
+        watch->readfd.revents = 0;
+        g_source_remove_poll ((GSource *) watch, &watch->readfd);
+        /* When we are in tunnelled mode, the read socket can be closed and we
+         * should be prepared for a new POST method to reopen it */
+        if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) {
+          /* remove the read connection for the tunnel */
+          /* we accept a new POST request */
+          watch->conn->tstate = TUNNEL_STATE_GET;
+          /* and signal that we lost our tunnel */
+          if (watch->funcs.tunnel_lost)
+            res = watch->funcs.tunnel_lost (watch, watch->user_data);
+          goto read_done;
+        } else
+          goto eof;
+      } else if (G_LIKELY (res == GST_RTSP_OK)) {
         if (!watch->conn->manual_http &&
             watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) {
           if (watch->conn->tstate == TUNNEL_STATE_NONE &&
@@ -3105,11 +3189,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
           watch->funcs.message_received (watch, &watch->message,
               watch->user_data);
       } else {
-        if (watch->funcs.error_full)
-          GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
-                  0, watch->user_data), error);
-        else
-          goto error;
+        goto read_error;
       }
 
     read_done:
@@ -3119,6 +3199,9 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
   }
 
   if (watch->writefd.revents & WRITE_COND) {
+    if (watch->writefd.revents & WRITE_ERR)
+      goto write_error;
+
     g_mutex_lock (watch->mutex);
     do {
       if (watch->write_data == NULL) {
@@ -3127,7 +3210,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
         /* get a new message from the queue */
         rec = g_queue_pop_tail (watch->messages);
         if (rec == NULL)
-          goto done;
+          break;
 
         watch->write_off = 0;
         watch->write_data = rec->data;
@@ -3140,17 +3223,14 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
       res = write_bytes (watch->writefd.fd, watch->write_data,
           &watch->write_off, watch->write_size);
       g_mutex_unlock (watch->mutex);
+
       if (res == GST_RTSP_EINTR)
         goto write_blocked;
       else if (G_LIKELY (res == GST_RTSP_OK)) {
         if (watch->funcs.message_sent)
           watch->funcs.message_sent (watch, watch->write_id, watch->user_data);
       } else {
-        if (watch->funcs.error_full)
-          GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
-                  watch->write_id, watch->user_data), error);
-        else
-          goto error;
+        goto write_error;
       }
       g_mutex_lock (watch->mutex);
 
@@ -3158,31 +3238,61 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
       watch->write_data = NULL;
     } while (TRUE);
 
-  done:
-    if (watch->write_added) {
-      g_source_remove_poll ((GSource *) watch, &watch->writefd);
-      watch->write_added = FALSE;
-      watch->writefd.revents = 0;
-    }
+    watch->writefd.events = WRITE_ERR;
 
     g_mutex_unlock (watch->mutex);
   }
 
 write_blocked:
-  return TRUE;
+  return keep_running;
 
   /* ERRORS */
 eof:
   {
     if (watch->funcs.closed)
       watch->funcs.closed (watch, watch->user_data);
+
+    /* always stop when the readfd returns EOF in non-tunneled mode */
     return FALSE;
   }
+read_error:
+  {
+    watch->readfd.events = 0;
+    watch->readfd.revents = 0;
+    g_source_remove_poll ((GSource *) watch, &watch->readfd);
+    keep_running = (watch->writefd.events != 0);
+
+    if (keep_running) {
+      if (watch->funcs.error_full)
+        GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message,
+                0, watch->user_data), error);
+      else
+        goto error;
+    } else
+      goto eof;
+  }
+write_error:
+  {
+    watch->writefd.events = 0;
+    watch->writefd.revents = 0;
+    g_source_remove_poll ((GSource *) watch, &watch->writefd);
+    keep_running = (watch->readfd.events != 0);
+
+    if (keep_running) {
+      if (watch->funcs.error_full)
+        GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL,
+                watch->write_id, watch->user_data), error);
+      else
+        goto error;
+    } else
+      goto eof;
+  }
 error:
   {
     if (watch->funcs.error)
       watch->funcs.error (watch, res, watch->user_data);
-    return FALSE;
+
+    return keep_running;
   }
 }
 
@@ -3206,11 +3316,10 @@ gst_rtsp_source_finalize (GSource * source)
   g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL);
   g_queue_free (watch->messages);
   watch->messages = NULL;
+  g_free (watch->write_data);
 
   g_mutex_free (watch->mutex);
 
-  g_free (watch->write_data);
-
   if (watch->notify)
     watch->notify (watch->user_data);
 }
@@ -3273,10 +3382,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
   result->user_data = user_data;
   result->notify = notify;
 
-  /* only add the read fd, the write fd is only added when we have data
-   * to send. */
-  g_source_add_poll ((GSource *) result, &result->readfd);
-
   return result;
 }
 
@@ -3302,11 +3407,13 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch)
   watch->readfd.revents = 0;
 
   watch->writefd.fd = watch->conn->writefd->fd;
-  watch->writefd.events = WRITE_COND;
+  watch->writefd.events = WRITE_ERR;
   watch->writefd.revents = 0;
-  watch->write_added = FALSE;
 
-  g_source_add_poll ((GSource *) watch, &watch->readfd);
+  if (watch->readfd.fd != -1)
+    g_source_add_poll ((GSource *) watch, &watch->readfd);
+  if (watch->writefd.fd != -1)
+    g_source_add_poll ((GSource *) watch, &watch->writefd);
 }
 
 /**
@@ -3372,6 +3479,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
   GstRTSPResult res;
   GstRTSPRec *rec;
   guint off = 0;
+  GMainContext *context = NULL;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@@ -3379,6 +3487,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
   g_mutex_lock (watch->mutex);
 
+  /* try to send the message synchronously first */
   if (watch->messages->length == 0) {
     res = write_bytes (watch->writefd.fd, data, &off, size);
     if (res != GST_RTSP_EINTR) {
@@ -3389,7 +3498,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
     }
   }
 
-  /* make a record with the data and id */
+  /* make a record with the data and id for sending async */
   rec = g_slice_new (GstRTSPRec);
   if (off == 0) {
     rec->data = (guint8 *) data;
@@ -3410,9 +3519,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
   /* make sure the main context will now also check for writability on the
    * socket */
-  if (!watch->write_added) {
-    g_source_add_poll ((GSource *) watch, &watch->writefd);
-    watch->write_added = TRUE;
+  if (watch->writefd.events != WRITE_COND) {
+    watch->writefd.events = WRITE_COND;
+    context = ((GSource *) watch)->context;
   }
 
   if (id != NULL)
@@ -3421,6 +3530,10 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
 
 done:
   g_mutex_unlock (watch->mutex);
+
+  if (context)
+    g_main_context_wakeup (context);
+
   return res;
 }
 
@@ -3479,11 +3592,17 @@ gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
  * Since: 0.10.24
  */
 #ifndef GST_REMOVE_DEPRECATED
+#ifdef GST_DISABLE_DEPRECATED
+guint
+gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
+    guint size);
+#endif
 guint
 gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
     guint size)
 {
   GstRTSPRec *rec;
+  GMainContext *context = NULL;
 
   g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
   g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
@@ -3505,12 +3624,15 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
 
   /* make sure the main context will now also check for writability on the
    * socket */
-  if (!watch->write_added) {
-    g_source_add_poll ((GSource *) watch, &watch->writefd);
-    watch->write_added = TRUE;
+  if (watch->writefd.events != WRITE_COND) {
+    watch->writefd.events = WRITE_COND;
+    context = ((GSource *) watch)->context;
   }
-
   g_mutex_unlock (watch->mutex);
+
+  if (context)
+    g_main_context_wakeup (context);
+
   return rec->id;
 }
 #endif /* GST_REMOVE_DEPRECATED */
@@ -3534,6 +3656,10 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
  * Since: 0.10.23
  */
 #ifndef GST_REMOVE_DEPRECATED
+#ifdef GST_DISABLE_DEPRECATED
+guint
+gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message);
+#endif
 guint
 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
 {