GstRTSPClientSendFunc send_func;
gpointer send_data;
GDestroyNotify send_notify;
+ GstRTSPClientSendMessagesFunc send_messages_func;
+ gpointer send_messages_data;
+ GDestroyNotify send_messages_notify;
guint close_seq;
GArray *data_seqs;
if (priv->watch)
gst_rtsp_watch_set_flushing (priv->watch, TRUE);
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+ gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
if (priv->watch)
g_source_destroy ((GSource *) priv->watch);
GstRTSPClientPrivate *priv = client->priv;
GstRTSPMessage message = { 0 };
gboolean ret = TRUE;
- GstMapInfo map_info;
- guint8 *data;
- guint usize;
gst_rtsp_message_init_data (&message, channel);
- /* FIXME, need some sort of iovec RTSPMessage here */
- if (!gst_buffer_map (buffer, &map_info, GST_MAP_READ))
- return FALSE;
-
- gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
+ gst_rtsp_message_set_body_buffer (&message, buffer);
g_mutex_lock (&priv->send_lock);
if (get_data_seq (client, channel) != 0) {
ret = priv->send_func (client, &message, FALSE, priv->send_data);
g_mutex_unlock (&priv->send_lock);
- gst_rtsp_message_steal_body (&message, &data, &usize);
- gst_buffer_unmap (buffer, &map_info);
-
gst_rtsp_message_unset (&message);
if (!ret) {
do_send_data_list (GstBufferList * buffer_list, guint8 channel,
GstRTSPClient * client)
{
+ GstRTSPClientPrivate *priv = client->priv;
gboolean ret = TRUE;
guint i, n = gst_buffer_list_length (buffer_list);
+ GstRTSPMessage *messages;
+
+ g_mutex_lock (&priv->send_lock);
+ if (get_data_seq (client, channel) != 0) {
+ GST_WARNING ("already a queued data message for channel %d", channel);
+ g_mutex_unlock (&priv->send_lock);
+ return FALSE;
+ }
- /* TODO: Needs support for a) queueing up multiple messages on the
- * GstRTSPWatch in do_send_data() above and b) for one message having a body
- * consisting of multiple parts here */
+ messages = g_newa (GstRTSPMessage, n);
+ memset (messages, 0, sizeof (GstRTSPMessage) * n);
for (i = 0; i < n; i++) {
GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+ gst_rtsp_message_init_data (&messages[i], channel);
+ gst_rtsp_message_set_body_buffer (&messages[i], buffer);
+ }
- ret = do_send_data (buffer, channel, client);
- if (!ret)
- break;
+ if (priv->send_messages_func) {
+ ret =
+ priv->send_messages_func (client, messages, n, FALSE, priv->send_data);
+ } else if (priv->send_func) {
+ for (i = 0; i < n; i++) {
+ ret = priv->send_func (client, &messages[i], FALSE, priv->send_data);
+ if (!ret)
+ break;
+ }
+ }
+ g_mutex_unlock (&priv->send_lock);
+
+ for (i = 0; i < n; i++) {
+ gst_rtsp_message_unset (&messages[i]);
+ }
+
+ if (!ret) {
+ GSource *idle_src;
+
+ /* close in watch context */
+ idle_src = g_idle_source_new ();
+ g_source_set_callback (idle_src, do_close, client, NULL);
+ g_source_attach (idle_src, priv->watch_context);
+ g_source_unref (idle_src);
}
return ret;
g_source_destroy ((GSource *) priv->watch);
priv->watch = NULL;
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+ gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
rtsp_ctrl_timeout_remove (priv);
g_main_context_unref (priv->watch_context);
priv->watch_context = NULL;
}
/**
+ * gst_rtsp_client_set_send_messages_func:
+ * @client: a #GstRTSPClient
+ * @func: (scope notified): a #GstRTSPClientSendMessagesFunc
+ * @user_data: (closure): user data passed to @func
+ * @notify: (allow-none): called when @user_data is no longer in use
+ *
+ * Set @func as the callback that will be called when new messages needs to be
+ * sent to the client. @user_data is passed to @func and @notify is called when
+ * @user_data is no longer in use.
+ *
+ * By default, the client will send the messages on the #GstRTSPConnection that
+ * was configured with gst_rtsp_client_attach() was called.
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_client_set_send_messages_func (GstRTSPClient * client,
+ GstRTSPClientSendMessagesFunc func, gpointer user_data,
+ GDestroyNotify notify)
+{
+ GstRTSPClientPrivate *priv;
+ GDestroyNotify old_notify;
+ gpointer old_data;
+
+ g_return_if_fail (GST_IS_RTSP_CLIENT (client));
+
+ priv = client->priv;
+
+ g_mutex_lock (&priv->send_lock);
+ priv->send_messages_func = func;
+ old_notify = priv->send_messages_notify;
+ old_data = priv->send_messages_data;
+ priv->send_messages_notify = notify;
+ priv->send_messages_data = user_data;
+ g_mutex_unlock (&priv->send_lock);
+
+ if (old_notify)
+ old_notify (old_data);
+}
+
+/**
* gst_rtsp_client_handle_message:
* @client: a #GstRTSPClient
* @message: (transfer none): an #GstRTSPMessage
}
}
+static gboolean
+do_send_messages (GstRTSPClient * client, GstRTSPMessage * messages,
+ guint n_messages, gboolean close, gpointer user_data)
+{
+ GstRTSPClientPrivate *priv = client->priv;
+ guint id = 0;
+ GstRTSPResult ret;
+ guint i;
+
+ /* send the message */
+ ret = gst_rtsp_watch_send_messages (priv->watch, messages, n_messages, &id);
+ if (ret != GST_RTSP_OK)
+ goto error;
+
+ /* if close flag is set, store the seq number so we can wait until it's
+ * written to the client to close the connection */
+ if (close)
+ priv->close_seq = id;
+
+ for (i = 0; i < n_messages; i++) {
+ if (gst_rtsp_message_get_type (&messages[i]) == GST_RTSP_MESSAGE_DATA) {
+ guint8 channel = 0;
+ GstRTSPResult r;
+
+ /* We assume that all data messages in the list are for the
+ * same channel */
+ r = gst_rtsp_message_parse_data (&messages[i], &channel);
+ if (r != GST_RTSP_OK) {
+ ret = r;
+ goto error;
+ }
+
+ /* check if the message has been queued for transmission in watch */
+ if (id) {
+ /* store the seq number so we can wait until it has been sent */
+ GST_DEBUG_OBJECT (client, "wait for message %d, channel %d", id,
+ channel);
+ set_data_seq (client, channel, id);
+ } else {
+ GstRTSPStreamTransport *trans;
+
+ trans =
+ g_hash_table_lookup (priv->transports,
+ GINT_TO_POINTER ((gint) channel));
+ if (trans) {
+ GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
+ g_mutex_unlock (&priv->send_lock);
+ gst_rtsp_stream_transport_message_sent (trans);
+ g_mutex_lock (&priv->send_lock);
+ }
+ }
+ break;
+ }
+ }
+
+ return ret == GST_RTSP_OK;
+
+ /* ERRORS */
+error:
+ {
+ GST_DEBUG_OBJECT (client, "got error %d", ret);
+ return FALSE;
+ }
+}
+
static GstRTSPResult
message_received (GstRTSPWatch * watch, GstRTSPMessage * message,
gpointer user_data)
gst_rtsp_watch_set_flushing (watch, TRUE);
g_mutex_lock (&priv->watch_lock);
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+ gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
g_mutex_unlock (&priv->watch_lock);
return GST_RTSP_OK;
g_source_destroy ((GSource *) priv->watch);
priv->watch = NULL;
gst_rtsp_client_set_send_func (client, NULL, NULL, NULL);
+ gst_rtsp_client_set_send_messages_func (client, NULL, NULL, NULL);
}
return GST_RTSP_STS_OK;
gst_rtsp_client_set_send_func (client, do_send_message,
g_source_ref ((GSource *) priv->watch),
(GDestroyNotify) gst_rtsp_watch_unref);
+ gst_rtsp_client_set_send_messages_func (client, do_send_messages, priv->watch,
+ (GDestroyNotify) gst_rtsp_watch_unref);
gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE);