GstRTSPSendFunc
gst_rtsp_stream_transport_set_callbacks
+GstRTSPSendListFunc
+gst_rtsp_stream_transport_set_list_callbacks
GstRTSPKeepAliveFunc
gst_rtsp_stream_transport_set_keepalive
gst_rtsp_stream_transport_set_timed_out
gst_rtsp_stream_transport_is_timed_out
-gst_rtsp_stream_transport_send_rtcp
gst_rtsp_stream_transport_send_rtp
+gst_rtsp_stream_transport_send_rtp_list
+gst_rtsp_stream_transport_send_rtcp
+gst_rtsp_stream_transport_send_rtcp_list
<SUBSECTION Standard>
GST_RTSP_STREAM_TRANSPORT_CAST
return ret;
}
+static gboolean
+do_send_data_list (GstBufferList * buffer_list, guint8 channel,
+ GstRTSPClient * client)
+{
+ gboolean ret = TRUE;
+ guint i, n = gst_buffer_list_length (buffer_list);
+
+ /* TODO: Needs support for a) queueing up multiple messages on the
+ * GstRTSPWatch in do_send_data() above and b) for one message having a body
+ * consisting of multiple parts here */
+ for (i = 0; i < n; i++) {
+ GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+ ret = do_send_data (buffer, channel, client);
+ if (!ret)
+ break;
+ }
+
+ return ret;
+}
+
/**
* gst_rtsp_client_close:
* @client: a #GstRTSPClient
gst_rtsp_stream_transport_set_callbacks (trans,
(GstRTSPSendFunc) do_send_data,
(GstRTSPSendFunc) do_send_data, client, NULL);
+ gst_rtsp_stream_transport_set_list_callbacks (trans,
+ (GstRTSPSendListFunc) do_send_data_list,
+ (GstRTSPSendListFunc) do_send_data_list, client, NULL);
g_hash_table_insert (priv->transports,
GINT_TO_POINTER (ct->interleaved.min), trans);
/* create watch for the connection and attach */
priv->watch = gst_rtsp_watch_new (priv->connection, &watch_funcs,
g_object_ref (client), (GDestroyNotify) client_watch_notify);
- gst_rtsp_client_set_send_func (client, do_send_message, priv->watch,
+ gst_rtsp_client_set_send_func (client, do_send_message,
+ g_source_ref ((GSource *) priv->watch),
(GDestroyNotify) gst_rtsp_watch_unref);
gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE);
}
g_object_set (appsrc, "block", TRUE, "format", GST_FORMAT_TIME, "is-live",
- TRUE, NULL);
- g_object_set (appsink, "sync", FALSE, "async", FALSE, NULL);
+ TRUE, "emit-signals", FALSE, NULL);
+ g_object_set (appsink, "sync", FALSE, "async", FALSE, "emit-signals",
+ FALSE, "buffer-list", TRUE, NULL);
data = g_new0 (AppSinkSrcData, 1);
data->appsink = appsink;
gpointer user_data;
GDestroyNotify notify;
+ GstRTSPSendListFunc send_rtp_list;
+ GstRTSPSendListFunc send_rtcp_list;
+ gpointer list_user_data;
+ GDestroyNotify list_notify;
+
GstRTSPKeepAliveFunc keep_alive;
gpointer ka_user_data;
GDestroyNotify ka_notify;
}
/**
+ * gst_rtsp_stream_transport_set_list_callbacks:
+ * @trans: a #GstRTSPStreamTransport
+ * @send_rtp_list: (scope notified): a callback called when RTP should be sent
+ * @send_rtcp_list: (scope notified): a callback called when RTCP should be sent
+ * @user_data: (closure): user data passed to callbacks
+ * @notify: (allow-none): called with the user_data when no longer needed.
+ *
+ * Install callbacks that will be called when data for a stream should be sent
+ * to a client. This is usually used when sending RTP/RTCP over TCP.
+ *
+ * Since: 1.16
+ */
+void
+gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport * trans,
+ GstRTSPSendListFunc send_rtp_list, GstRTSPSendListFunc send_rtcp_list,
+ gpointer user_data, GDestroyNotify notify)
+{
+ GstRTSPStreamTransportPrivate *priv;
+
+ g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
+
+ priv = trans->priv;
+
+ priv->send_rtp_list = send_rtp_list;
+ priv->send_rtcp_list = send_rtcp_list;
+ if (priv->list_notify)
+ priv->list_notify (priv->list_user_data);
+ priv->list_user_data = user_data;
+ priv->list_notify = notify;
+}
+
+/**
* gst_rtsp_stream_transport_set_keepalive:
* @trans: a #GstRTSPStreamTransport
* @keep_alive: (scope notified): a callback called when the receiver is active
}
/**
+ * gst_rtsp_stream_transport_send_rtp_list:
+ * @trans: a #GstRTSPStreamTransport
+ * @buffer_list: (transfer none): a #GstBufferList
+ *
+ * Send @buffer_list to the installed RTP callback for @trans.
+ *
+ * Returns: %TRUE on success
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport * trans,
+ GstBufferList * buffer_list)
+{
+ GstRTSPStreamTransportPrivate *priv;
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);
+
+ priv = trans->priv;
+
+ if (priv->send_rtp_list) {
+ res =
+ priv->send_rtp_list (buffer_list, priv->transport->interleaved.min,
+ priv->list_user_data);
+ } else if (priv->send_rtp) {
+ guint n = gst_buffer_list_length (buffer_list), i;
+
+ for (i = 0; i < n; i++) {
+ GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+ res =
+ priv->send_rtp (buffer, priv->transport->interleaved.min,
+ priv->user_data);
+ if (!res)
+ break;
+ }
+ }
+
+ if (res)
+ gst_rtsp_stream_transport_keep_alive (trans);
+
+ return res;
+}
+
+/**
+ * gst_rtsp_stream_transport_send_rtcp_list:
+ * @trans: a #GstRTSPStreamTransport
+ * @buffer_list: (transfer none): a #GstBuffer
+ *
+ * Send @buffer_list to the installed RTCP callback for @trans.
+ *
+ * Returns: %TRUE on success
+ *
+ * Since: 1.16
+ */
+gboolean
+gst_rtsp_stream_transport_send_rtcp_list (GstRTSPStreamTransport * trans,
+ GstBufferList * buffer_list)
+{
+ GstRTSPStreamTransportPrivate *priv;
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (GST_IS_BUFFER_LIST (buffer_list), FALSE);
+
+ priv = trans->priv;
+
+ if (priv->send_rtcp_list) {
+ res =
+ priv->send_rtcp_list (buffer_list, priv->transport->interleaved.max,
+ priv->list_user_data);
+ } else if (priv->send_rtcp) {
+ guint n = gst_buffer_list_length (buffer_list), i;
+
+ for (i = 0; i < n; i++) {
+ GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+ res =
+ priv->send_rtcp (buffer, priv->transport->interleaved.max,
+ priv->user_data);
+ if (!res)
+ break;
+ }
+ }
+
+ if (res)
+ gst_rtsp_stream_transport_keep_alive (trans);
+
+ return res;
+}
+
+/**
* gst_rtsp_stream_transport_keep_alive:
* @trans: a #GstRTSPStreamTransport
*
* Returns: %TRUE on success
*/
typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data);
+
+/**
+ * GstRTSPSendListFunc:
+ * @buffer_list: a #GstBufferList
+ * @channel: a channel
+ * @user_data: user data
+ *
+ * Function registered with gst_rtsp_stream_transport_set_callbacks() and
+ * called when @buffer_list must be sent on @channel.
+ *
+ * Returns: %TRUE on success
+ *
+ * Since: 1.16
+ */
+typedef gboolean (*GstRTSPSendListFunc) (GstBufferList *buffer_list, guint8 channel, gpointer user_data);
+
/**
* GstRTSPKeepAliveFunc:
* @user_data: user data
GDestroyNotify notify);
GST_RTSP_SERVER_API
+void gst_rtsp_stream_transport_set_list_callbacks (GstRTSPStreamTransport *trans,
+ GstRTSPSendListFunc send_rtp_list,
+ GstRTSPSendListFunc send_rtcp_list,
+ gpointer user_data,
+ GDestroyNotify notify);
+
+GST_RTSP_SERVER_API
void gst_rtsp_stream_transport_set_keepalive (GstRTSPStreamTransport *trans,
GstRTSPKeepAliveFunc keep_alive,
gpointer user_data,
GstBuffer *buffer);
GST_RTSP_SERVER_API
+gboolean gst_rtsp_stream_transport_send_rtp_list (GstRTSPStreamTransport *trans,
+ GstBufferList *buffer_list);
+
+GST_RTSP_SERVER_API
+gboolean gst_rtsp_stream_transport_send_rtcp_list(GstRTSPStreamTransport *trans,
+ GstBufferList *buffer_list);
+
+GST_RTSP_SERVER_API
GstFlowReturn gst_rtsp_stream_transport_recv_data (GstRTSPStreamTransport *trans,
guint channel, GstBuffer *buffer);
GList *walk;
GstSample *sample;
GstBuffer *buffer;
+ GstBufferList *buffer_list;
+ guint n_messages = 0;
gboolean is_rtp;
if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
}
buffer = gst_sample_get_buffer (sample);
+ buffer_list = gst_sample_get_buffer_list (sample);
+
+ /* We will get one message-sent notification per message,
+ * i.e. per buffer that is actually sent out */
+ if (buffer)
+ n_messages += 1;
+ if (buffer_list)
+ n_messages += gst_buffer_list_length (buffer_list);
is_rtp = (idx == 0);
}
}
- priv->n_outstanding += priv->n_tcp_transports;
+ priv->n_outstanding += n_messages * priv->n_tcp_transports;
g_mutex_unlock (&priv->lock);
if (is_rtp) {
for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
- if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
+ gboolean send_ret = TRUE;
+
+ if (buffer)
+ send_ret = gst_rtsp_stream_transport_send_rtp (tr, buffer);
+ if (buffer_list)
+ send_ret = gst_rtsp_stream_transport_send_rtp_list (tr, buffer_list);
+
+ if (!send_ret) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
- priv->n_outstanding--;
+ priv->n_outstanding -= n_messages;
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
} else {
for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
- if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
+ gboolean send_ret = TRUE;
+
+ if (buffer)
+ send_ret = gst_rtsp_stream_transport_send_rtcp (tr, buffer);
+ if (buffer_list)
+ send_ret = gst_rtsp_stream_transport_send_rtcp_list (tr, buffer_list);
+
+ if (!send_ret) {
/* remove transport on send error */
g_mutex_lock (&priv->lock);
- priv->n_outstanding--;
+ priv->n_outstanding -= n_messages;
update_transport (stream, tr, FALSE);
g_mutex_unlock (&priv->lock);
}
} else if (is_tcp && !priv->appsink[i]) {
/* make appsink */
priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
- g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
- NULL);
+ g_object_set (priv->appsink[i], "emit-signals", FALSE, "buffer-list",
+ TRUE, "max-buffers", 1, NULL);
/* we need to set sync and preroll to FALSE for the sink to avoid
* deadlock. This is only needed for sink sending RTCP data. */
return res == GST_RTSP_OK;
}
+static gboolean
+do_send_data_list (GstBufferList * buffer_list, guint8 channel,
+ GstRTSPStreamContext * context)
+{
+ gboolean ret = TRUE;
+ guint i, n = gst_buffer_list_length (buffer_list);
+
+ /* TODO: Needs support for a) queueing up multiple messages on the
+ * GstRTSPWatch in do_send_data() above and b) for one message having a body
+ * consisting of multiple parts here */
+ for (i = 0; i < n; i++) {
+ GstBuffer *buffer = gst_buffer_list_get (buffer_list, i);
+
+ ret = do_send_data (buffer, channel, context);
+ if (!ret)
+ break;
+ }
+
+ return ret;
+}
+
static GstRTSPResult
gst_rtsp_client_sink_setup_streams (GstRTSPClientSink * sink, gboolean async)
{
gst_rtsp_stream_transport_set_callbacks (context->stream_transport,
(GstRTSPSendFunc) do_send_data,
(GstRTSPSendFunc) do_send_data, context, NULL);
+ gst_rtsp_stream_transport_set_list_callbacks
+ (context->stream_transport,
+ (GstRTSPSendListFunc) do_send_data_list,
+ (GstRTSPSendListFunc) do_send_data_list, context, NULL);
}
/* The stream_transport now owns the transport */