rtsp: Added new API for sending using GstRTSPWatch.
authorPeter Kjellerstedt <pkj@axis.com>
Mon, 17 Aug 2009 09:53:43 +0000 (11:53 +0200)
committerPeter Kjellerstedt <pkj@axis.com>
Mon, 24 Aug 2009 11:19:46 +0000 (13:19 +0200)
The new API to send messages using GstRTSPWatch will first try to send the
message immediately. Then, if that failed (or the message was not sent
fully), it will queue the remaining message for later delivery. This avoids
unnecessary context switches, and makes it possible to keep track of
whether the connection is blocked (the unblocking of the connection is
indicated by the reception of the message_sent signal).

This also deprecates the old API (gst_rtsp_watch_queue_data() and
gst_rtsp_watch_queue_message().)

API: gst_rtsp_watch_write_data()
API: gst_rtsp_watch_send_message()

gst-libs/gst/rtsp/gstrtspconnection.c
gst-libs/gst/rtsp/gstrtspconnection.h

index 26972c1..6b27753 100644 (file)
@@ -3035,7 +3035,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
 
             /* queue the response */
             response = gen_tunnel_reply (watch->conn, code, &watch->message);
-            gst_rtsp_watch_queue_message (watch, response);
+            gst_rtsp_watch_send_message (watch, response, NULL);
             gst_rtsp_message_free (response);
             goto read_done;
           } else if (watch->conn->tstate == TUNNEL_STATE_NONE &&
@@ -3315,6 +3315,119 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
 }
 
 /**
+ * gst_rtsp_watch_write_data:
+ * @watch: a #GstRTSPWatch
+ * @data: the data to queue
+ * @size: the size of @data
+ * @id: location for a message ID or %NULL
+ *
+ * Write @data using the connection of the @watch. If it cannot be sent
+ * immediately, it will be queued for transmission in @watch. The contents of
+ * @message will then be serialized and transmitted when the connection of the
+ * @watch becomes writable. In case the @message is queued, the ID returned in
+ * @id will be non-zero and used as the ID argument in the message_sent
+ * callback.
+ *
+ * This function will take ownership of @data and g_free() it after use.
+ *
+ * Returns: #GST_RTSP_OK on success.
+ *
+ * Since: 0.10.25
+ */
+GstRTSPResult
+gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data,
+    guint size, guint * id)
+{
+  GstRTSPResult res;
+  GstRTSPRec *rec;
+  guint off = 0;
+
+  g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (size != 0, GST_RTSP_EINVAL);
+
+  g_mutex_lock (watch->mutex);
+
+  if (watch->messages->length == 0) {
+    res = write_bytes (watch->writefd.fd, data, &off, size);
+    if (res != GST_RTSP_EINTR) {
+      if (id != NULL)
+        *id = 0;
+      g_free ((gpointer) data);
+      goto done;
+    }
+  }
+
+  /* make a record with the data and id */
+  rec = g_slice_new (GstRTSPRec);
+  if (off == 0) {
+    rec->data = (guint8 *) data;
+    rec->size = size;
+  } else {
+    rec->data = g_memdup (data + off, size - off);
+    rec->size = size - off;
+    g_free ((gpointer) data);
+  }
+
+  do {
+    /* make sure rec->id is never 0 */
+    rec->id = ++watch->id;
+  } while (G_UNLIKELY (rec->id == 0));
+
+  /* add the record to a queue. FIXME we would like to have an upper limit here */
+  g_queue_push_head (watch->messages, rec);
+
+  /* make sure the main context will now also check for writability on the
+   * socket */
+  if (!watch->write_added) {
+    g_source_add_poll ((GSource *) watch, &watch->writefd);
+    watch->write_added = TRUE;
+  }
+
+  if (id != NULL)
+    *id = rec->id;
+  res = GST_RTSP_OK;
+
+done:
+  g_mutex_unlock (watch->mutex);
+  return res;
+}
+
+/**
+ * gst_rtsp_watch_send_message:
+ * @watch: a #GstRTSPWatch
+ * @message: a #GstRTSPMessage
+ * @id: location for a message ID or %NULL
+ *
+ * Send a @message using the connection of the @watch. If it cannot be sent
+ * immediately, it will be queued for transmission in @watch. The contents of
+ * @message will then be serialized and transmitted when the connection of the
+ * @watch becomes writable. In case the @message is queued, the ID returned in
+ * @id will be non-zero and used as the ID argument in the message_sent
+ * callback.
+ *
+ * Returns: #GST_RTSP_OK on success.
+ *
+ * Since: 0.10.25
+ */
+GstRTSPResult
+gst_rtsp_watch_send_message (GstRTSPWatch * watch, GstRTSPMessage * message,
+    guint * id)
+{
+  GString *str;
+  guint size;
+
+  g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL);
+  g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+
+  /* make a record with the message as a string and id */
+  str = message_to_string (watch->conn, message);
+  size = str->len;
+  return gst_rtsp_watch_write_data (watch,
+      (guint8 *) g_string_free (str, FALSE), size, id);
+}
+
+/**
  * gst_rtsp_watch_queue_data:
  * @watch: a #GstRTSPWatch
  * @data: the data to queue
@@ -3328,10 +3441,13 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
  * The return value of this function will be used as the id argument in the
  * message_sent callback.
  *
+ * Deprecated: Use gst_rtsp_watch_write_data()
+ *
  * Returns: an id.
  *
  * Since: 0.10.24
  */
+#ifndef GST_REMOVE_DEPRECATED
 guint
 gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
     guint size)
@@ -3366,6 +3482,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
   g_mutex_unlock (watch->mutex);
   return rec->id;
 }
+#endif /* GST_REMOVE_DEPRECATED */
 
 /**
  * gst_rtsp_watch_queue_message:
@@ -3379,10 +3496,13 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data,
  * The return value of this function will be used as the id argument in the
  * message_sent callback.
  *
+ * Deprecated: Use gst_rtsp_watch_send_message()
+ *
  * Returns: an id.
  *
  * Since: 0.10.23
  */
+#ifndef GST_REMOVE_DEPRECATED
 guint
 gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
 {
@@ -3398,3 +3518,4 @@ gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
   return gst_rtsp_watch_queue_data (watch,
       (guint8 *) g_string_free (str, FALSE), size);
 }
+#endif /* GST_REMOVE_DEPRECATED */
index f719be3..558994f 100644 (file)
@@ -186,11 +186,20 @@ void               gst_rtsp_watch_unref              (GstRTSPWatch *watch);
 guint              gst_rtsp_watch_attach             (GstRTSPWatch *watch,
                                                       GMainContext *context);
 
+GstRTSPResult      gst_rtsp_watch_write_data         (GstRTSPWatch *watch,
+                                                      const guint8 *data,
+                                                      guint size, guint *id);
+GstRTSPResult      gst_rtsp_watch_send_message       (GstRTSPWatch *watch,
+                                                      GstRTSPMessage *message,
+                                                      guint *id);
+
+#ifndef GST_DISABLE_DEPRECATED
 guint              gst_rtsp_watch_queue_data         (GstRTSPWatch * watch,
                                                       const guint8 * data,
                                                       guint size);
 guint              gst_rtsp_watch_queue_message      (GstRTSPWatch *watch,
                                                       GstRTSPMessage *message);
+#endif
 
 G_END_DECLS