+
+#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR)
+#define WRITE_COND (G_IO_OUT | G_IO_ERR)
+
+typedef struct
+{
+ GString *str;
+ guint cseq;
+} GstRTSPRec;
+
+/* async functions */
+struct _GstRTSPChannel
+{
+ GSource source;
+
+ GstRTSPConnection *conn;
+
+ GstRTSPBuilder builder;
+ GstRTSPMessage message;
+
+ GPollFD readfd;
+ GPollFD writefd;
+ gboolean write_added;
+
+ /* queued message for transmission */
+ GList *messages;
+ guint8 *write_data;
+ guint write_off;
+ guint write_len;
+ guint write_cseq;
+
+ GstRTSPChannelFuncs funcs;
+
+ gpointer user_data;
+ GDestroyNotify notify;
+};
+
+static gboolean
+gst_rtsp_source_prepare (GSource * source, gint * timeout)
+{
+ GstRTSPChannel *channel = (GstRTSPChannel *) source;
+
+ *timeout = (channel->conn->timeout * 1000);
+
+ return FALSE;
+}
+
+static gboolean
+gst_rtsp_source_check (GSource * source)
+{
+ GstRTSPChannel *channel = (GstRTSPChannel *) source;
+
+ if (channel->readfd.revents & READ_COND)
+ return TRUE;
+
+ if (channel->writefd.revents & WRITE_COND)
+ return TRUE;
+
+ return FALSE;
+}
+
+static gboolean
+gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
+ gpointer user_data)
+{
+ GstRTSPChannel *channel = (GstRTSPChannel *) source;
+ GstRTSPResult res;
+
+ /* first read as much as we can */
+ if (channel->readfd.revents & READ_COND) {
+ do {
+ res = build_next (&channel->builder, &channel->message, channel->conn);
+ if (res == GST_RTSP_EINTR)
+ break;
+ if (res == GST_RTSP_EEOF)
+ goto eof;
+ if (res != GST_RTSP_OK)
+ goto error;
+
+ if (channel->funcs.message_received)
+ channel->funcs.message_received (channel, &channel->message,
+ channel->user_data);
+
+ gst_rtsp_message_unset (&channel->message);
+ build_reset (&channel->builder);
+ } while (FALSE);
+ }
+
+ if (channel->writefd.revents & WRITE_COND) {
+ do {
+ if (channel->write_data == NULL) {
+ GstRTSPRec *data;
+
+ if (!channel->messages)
+ goto done;
+
+ /* no data, get a new message from the queue */
+ data = channel->messages->data;
+ channel->messages =
+ g_list_delete_link (channel->messages, channel->messages);
+
+ channel->write_off = 0;
+ channel->write_len = data->str->len;
+ channel->write_data = (guint8 *) g_string_free (data->str, FALSE);
+ channel->write_cseq = data->cseq;
+
+ g_slice_free (GstRTSPRec, data);
+ }
+
+ res = write_bytes (channel->writefd.fd, channel->write_data,
+ &channel->write_off, channel->write_len);
+ if (res == GST_RTSP_EINTR)
+ break;
+ if (res != GST_RTSP_OK)
+ goto error;
+
+ if (channel->funcs.message_sent)
+ channel->funcs.message_sent (channel, channel->write_cseq,
+ channel->user_data);
+
+ done:
+ if (channel->messages == NULL && channel->write_added) {
+ g_source_remove_poll ((GSource *) channel, &channel->writefd);
+ channel->write_added = FALSE;
+ channel->writefd.revents = 0;
+ }
+ g_free (channel->write_data);
+ channel->write_data = NULL;
+ } while (FALSE);
+ }
+
+ return TRUE;
+
+ /* ERRORS */
+eof:
+ {
+ if (channel->funcs.closed)
+ channel->funcs.closed (channel, channel->user_data);
+ return FALSE;
+ }
+error:
+ {
+ if (channel->funcs.error)
+ channel->funcs.error (channel, res, channel->user_data);
+ return FALSE;
+ }
+}
+
+static void
+gst_rtsp_source_finalize (GSource * source)
+{
+ GstRTSPChannel *channel = (GstRTSPChannel *) source;
+ GList *walk;
+
+ build_reset (&channel->builder);
+
+ for (walk = channel->messages; walk; walk = g_list_next (walk)) {
+ GstRTSPRec *data = walk->data;
+
+ g_string_free (data->str, TRUE);
+ g_slice_free (GstRTSPRec, data);
+ }
+ g_list_free (channel->messages);
+ g_free (channel->write_data);
+
+ if (channel->notify)
+ channel->notify (channel->user_data);
+}
+
+static GSourceFuncs gst_rtsp_source_funcs = {
+ gst_rtsp_source_prepare,
+ gst_rtsp_source_check,
+ gst_rtsp_source_dispatch,
+ gst_rtsp_source_finalize
+};
+
+/**
+ * gst_rtsp_channel_new:
+ * @conn: a #GstRTSPConnection
+ * @funcs: channel functions
+ * @user_data: user data to pass to @funcs
+ *
+ * Create a channel object for @conn. The functions provided in @funcs will be
+ * called with @user_data when activity happened on the channel.
+ *
+ * The new channel is usually created so that it can be attached to a
+ * maincontext with gst_rtsp_channel_attach().
+ *
+ * @conn must exist for the entire lifetime of the channel.
+ *
+ * Returns: a #GstRTSPChannel that can be used for asynchronous RTSP
+ * communication. Free with gst_rtsp_channel_unref () after usage.
+ *
+ * Since: 0.10.23
+ */
+GstRTSPChannel *
+gst_rtsp_channel_new (GstRTSPConnection * conn,
+ GstRTSPChannelFuncs * funcs, gpointer user_data, GDestroyNotify notify)
+{
+ GstRTSPChannel *result;
+
+ g_return_val_if_fail (conn != NULL, NULL);
+ g_return_val_if_fail (funcs != NULL, NULL);
+
+ result = (GstRTSPChannel *) g_source_new (&gst_rtsp_source_funcs,
+ sizeof (GstRTSPChannel));
+
+ result->conn = conn;
+ result->builder.state = STATE_START;
+
+ result->readfd.fd = conn->fd.fd;
+ result->readfd.events = READ_COND;
+ result->readfd.revents = 0;
+
+ result->writefd.fd = conn->fd.fd;
+ result->writefd.events = WRITE_COND;
+ result->writefd.revents = 0;
+ result->write_added = FALSE;
+
+ result->funcs = *funcs;
+ result->user_data = user_data;
+ result->notify = notify;
+
+ /* only add the read fd, the write fd is only added when we have data
+ * to send. */
+ g_source_add_poll ((GSource *) result, &result->readfd);
+
+ return result;
+}
+
+/**
+ * gst_rtsp_channel_attach:
+ * @channel: a #GstRTSPChannel
+ * @context: a GMainContext (if NULL, the default context will be used)
+ *
+ * Adds a #GstRTSPChannel to a context so that it will be executed within that context.
+ *
+ * Returns: the ID (greater than 0) for the channel within the GMainContext.
+ *
+ * Since: 0.10.23
+ */
+guint
+gst_rtsp_channel_attach (GstRTSPChannel * channel, GMainContext * context)
+{
+ g_return_val_if_fail (channel != NULL, 0);
+
+ return g_source_attach ((GSource *) channel, context);
+}
+
+/**
+ * gst_rtsp_channel_free:
+ * @channel: a #GstRTSPChannel
+ *
+ * Decreases the reference count of @channel by one. If the resulting reference
+ * count is zero the channel and associated memory will be destroyed.
+ *
+ * Since: 0.10.23
+ */
+void
+gst_rtsp_channel_unref (GstRTSPChannel * channel)
+{
+ g_return_if_fail (channel != NULL);
+
+ g_source_unref ((GSource *) channel);
+}
+
+/**
+ * gst_rtsp_channel_queue_message:
+ * @channel: a #GstRTSPChannel
+ * @message: a #GstRTSPMessage
+ *
+ * Queue a @message for transmission in @channel. The contents of this
+ * message will be serialized and transmitted when the connection of the
+ * channel becomes writable.
+ *
+ * The return value of this function will be returned as the cseq argument in
+ * the message_sent callback.
+ *
+ * Returns: the sequence number of the message or -1 if the cseq could not be
+ * determined.
+ *
+ * Since: 0.10.23
+ */
+guint
+gst_rtsp_channel_queue_message (GstRTSPChannel * channel,
+ GstRTSPMessage * message)
+{
+ GstRTSPRec *data;
+ gchar *header;
+ guint cseq;
+
+ g_return_val_if_fail (channel != NULL, GST_RTSP_EINVAL);
+ g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
+
+ /* get the cseq from the message, when we finish writing this message on the
+ * socket we will have to pass the cseq to the callback. */
+ if (gst_rtsp_message_get_header (message, GST_RTSP_HDR_CSEQ, &header,
+ 0) == GST_RTSP_OK) {
+ cseq = atoi (header);
+ } else {
+ cseq = -1;
+ }
+
+ /* make a record with the message as a string ans cseq */
+ data = g_slice_new (GstRTSPRec);
+ data->str = message_to_string (channel->conn, message);
+ data->cseq = cseq;
+
+ /* add the record to a queue */
+ channel->messages = g_list_append (channel->messages, data);
+
+ /* make sure the main context will now also check for writability on the
+ * socket */
+ if (!channel->write_added) {
+ g_source_add_poll ((GSource *) channel, &channel->writefd);
+ channel->write_added = TRUE;
+ }
+ return cseq;
+}