GstRTSPConnection *connection;
GstRTSPWatch *watch;
GMainContext *watch_context;
- guint close_seq;
gchar *server_ip;
gboolean is_ipv6;
- GstRTSPClientSendFunc send_func; /* protected by send_lock */
- gpointer send_data; /* protected by send_lock */
- GDestroyNotify send_notify; /* protected by send_lock */
+ /* protected by send_lock */
+ GstRTSPClientSendFunc send_func;
+ gpointer send_data;
+ GDestroyNotify send_notify;
+ guint close_seq;
+ GArray *data_seqs;
GstRTSPSessionPool *session_pool;
gulong session_removed_id;
GstRTSPTunnelState tstate;
};
+typedef struct
+{
+ guint8 channel;
+ guint seq;
+} DataSeq;
+
static GMutex tunnels_lock;
static GHashTable *tunnels; /* protected by tunnels_lock */
-/* FIXME make this configurable. We don't want to do this yet because it will
- * be superceeded by a cache object later */
#define WATCH_BACKLOG_SIZE 100
#define DEFAULT_SESSION_POOL NULL
g_mutex_init (&priv->send_lock);
g_mutex_init (&priv->watch_lock);
priv->close_seq = 0;
+ priv->data_seqs = g_array_new (FALSE, FALSE, sizeof (DataSeq));
priv->drop_backlog = DEFAULT_DROP_BACKLOG;
priv->transports =
g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
g_assert (priv->sessions == NULL);
g_assert (priv->session_removed_id == 0);
+ g_array_unref (priv->data_seqs);
g_hash_table_unref (priv->transports);
g_hash_table_unref (priv->pipelined_requests);
}
}
+static inline DataSeq *
+get_data_seq_element (GstRTSPClient * client, guint8 channel)
+{
+ GstRTSPClientPrivate *priv = client->priv;
+ GArray *data_seqs = priv->data_seqs;
+ gint i = 0;
+
+ while (i < data_seqs->len) {
+ DataSeq *data_seq = &g_array_index (data_seqs, DataSeq, i);
+ if (data_seq->channel == channel)
+ return data_seq;
+ i++;
+ }
+
+ return NULL;
+}
+
+static void
+add_data_seq (GstRTSPClient * client, guint8 channel)
+{
+ GstRTSPClientPrivate *priv = client->priv;
+ DataSeq data_seq = {.channel = channel,.seq = 0 };
+
+ if (get_data_seq_element (client, channel) == NULL)
+ g_array_append_val (priv->data_seqs, data_seq);
+}
+
+static void
+set_data_seq (GstRTSPClient * client, guint8 channel, guint seq)
+{
+ DataSeq *data_seq;
+
+ data_seq = get_data_seq_element (client, channel);
+ g_assert_nonnull (data_seq);
+ data_seq->seq = seq;
+}
+
+static guint
+get_data_seq (GstRTSPClient * client, guint8 channel)
+{
+ DataSeq *data_seq;
+
+ data_seq = get_data_seq_element (client, channel);
+ g_assert_nonnull (data_seq);
+ return data_seq->seq;
+}
+
+static gboolean
+get_data_channel (GstRTSPClient * client, guint seq, guint8 * channel)
+{
+ GstRTSPClientPrivate *priv = client->priv;
+ GArray *data_seqs = priv->data_seqs;
+ gint i = 0;
+
+ while (i < data_seqs->len) {
+ DataSeq *data_seq = &g_array_index (data_seqs, DataSeq, i);
+ if (data_seq->seq == seq) {
+ *channel = data_seq->channel;
+ return TRUE;
+ }
+ i++;
+ }
+
+ return FALSE;
+}
+
+static gboolean
+do_close (gpointer user_data)
+{
+ GstRTSPClient *client = user_data;
+
+ gst_rtsp_client_close (client);
+
+ return G_SOURCE_REMOVE;
+}
+
static gboolean
do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client)
{
gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
g_mutex_lock (&priv->send_lock);
+ if (get_data_seq (client, channel) != 0) {
+ GST_WARNING ("already a queued data message for channel %d", channel);
+ g_mutex_unlock (&priv->send_lock);
+ return FALSE;
+ }
if (priv->send_func)
ret = priv->send_func (client, &message, FALSE, priv->send_data);
g_mutex_unlock (&priv->send_lock);
gst_rtsp_message_unset (&message);
+ if (!ret) {
+ GSource *idle_src;
+
+ /* close in watch context */
+ idle_src = g_idle_source_new ();
+ g_source_set_callback (idle_src, do_close, client, NULL);
+ g_source_attach (idle_src, priv->watch_context);
+ g_source_unref (idle_src);
+ }
+
return ret;
}
g_hash_table_insert (priv->transports,
GINT_TO_POINTER (ct->interleaved.max), trans);
g_object_ref (trans);
+ add_data_seq (client, ct->interleaved.min);
+ add_data_seq (client, ct->interleaved.max);
}
/* create and serialize the server transport */
gboolean close, gpointer user_data)
{
GstRTSPClientPrivate *priv = client->priv;
+ guint id = 0;
GstRTSPResult ret;
- GTimeVal time;
- time.tv_sec = 1;
- time.tv_usec = 0;
+ /* send the message */
+ ret = gst_rtsp_watch_send_message (priv->watch, message, &id);
+ if (ret != GST_RTSP_OK)
+ goto error;
- do {
- /* send the response and store the seq number so we can wait until it's
- * written to the client to close the connection */
- ret =
- gst_rtsp_watch_send_message (priv->watch, message,
- close ? &priv->close_seq : NULL);
- if (ret == GST_RTSP_OK)
- break;
+ /* if close flag is set, store the seq number so we can wait until it's
+ * written to the client to close the connection */
+ if (close)
+ priv->close_seq = id;
- if (ret != GST_RTSP_ENOMEM)
- goto error;
+ if (gst_rtsp_message_get_type (message) == GST_RTSP_MESSAGE_DATA) {
+ guint8 channel = 0;
+ GstRTSPResult r;
- /* drop backlog */
- if (priv->drop_backlog)
- break;
+ r = gst_rtsp_message_parse_data (message, &channel);
+ if (r != GST_RTSP_OK) {
+ ret = r;
+ goto error;
+ }
- /* queue was full, wait for more space */
- GST_DEBUG_OBJECT (client, "waiting for backlog");
- ret = gst_rtsp_watch_wait_backlog (priv->watch, &time);
- GST_DEBUG_OBJECT (client, "Resend due to backlog full");
- } while (ret != GST_RTSP_EINTR);
+ /* check if the message has been queued for transmission in watch */
+ if (id) {
+ /* store the seq number so we can wait until it has been sent */
+ GST_DEBUG_OBJECT (client, "wait for message %d, channel %d", id, channel);
+ set_data_seq (client, channel, id);
+ } else {
+ GstRTSPStreamTransport *trans;
+
+ trans =
+ g_hash_table_lookup (priv->transports,
+ GINT_TO_POINTER ((gint) channel));
+ if (trans) {
+ GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
+ g_mutex_unlock (&priv->send_lock);
+ gst_rtsp_stream_transport_message_sent (trans);
+ g_mutex_lock (&priv->send_lock);
+ }
+ }
+ }
return ret == GST_RTSP_OK;
error:
{
GST_DEBUG_OBJECT (client, "got error %d", ret);
- return ret == GST_RTSP_OK;
+ return FALSE;
}
}
{
GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
GstRTSPClientPrivate *priv = client->priv;
+ GstRTSPStreamTransport *trans = NULL;
+ guint8 channel = 0;
+ gboolean close = FALSE;
+
+ g_mutex_lock (&priv->send_lock);
+
+ if (get_data_channel (client, cseq, &channel)) {
+ trans = g_hash_table_lookup (priv->transports, GINT_TO_POINTER (channel));
+ set_data_seq (client, channel, 0);
+ }
if (priv->close_seq && priv->close_seq == cseq) {
GST_INFO ("client %p: send close message", client);
+ close = TRUE;
priv->close_seq = 0;
- gst_rtsp_client_close (client);
}
+ g_mutex_unlock (&priv->send_lock);
+
+ if (trans) {
+ GST_DEBUG_OBJECT (client, "emit 'message-sent' signal");
+ gst_rtsp_stream_transport_message_sent (trans);
+ }
+
+ if (close)
+ gst_rtsp_client_close (client);
+
return GST_RTSP_OK;
}
* is received from the client. It will also call
* gst_rtsp_stream_transport_set_timed_out() when a receiver has timed out.
*
+ * A #GstRTSPClient will call gst_rtsp_stream_transport_message_sent() when it
+ * has sent a data message for the transport.
+ *
* Last reviewed on 2013-07-16 (1.0.0)
*/
gboolean active;
gboolean timed_out;
+ GstRTSPMessageSentFunc message_sent;
+ gpointer ms_user_data;
+ GDestroyNotify ms_notify;
+
GstRTSPTransport *transport;
GstRTSPUrl *url;
/* remove callbacks now */
gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL);
gst_rtsp_stream_transport_set_keepalive (trans, NULL, NULL, NULL);
+ gst_rtsp_stream_transport_set_message_sent (trans, NULL, NULL, NULL);
if (priv->stream)
g_object_unref (priv->stream);
priv->ka_notify = notify;
}
+/**
+ * gst_rtsp_stream_transport_set_message_sent:
+ * @trans: a #GstRTSPStreamTransport
+ * @message_sent: (scope notified): a callback called when a message has been sent
+ * @user_data: (closure): user data passed to callback
+ * @notify: (allow-none): called with the user_data when no longer needed
+ *
+ * Install a callback that will be called when a message has been sent on @trans.
+ */
+void
+gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport * trans,
+ GstRTSPMessageSentFunc message_sent, gpointer user_data,
+ GDestroyNotify notify)
+{
+ GstRTSPStreamTransportPrivate *priv;
+
+ g_return_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans));
+
+ priv = trans->priv;
+
+ priv->message_sent = message_sent;
+ if (priv->ms_notify)
+ priv->ms_notify (priv->ms_user_data);
+ priv->ms_user_data = user_data;
+ priv->ms_notify = notify;
+}
+
/**
* gst_rtsp_stream_transport_set_transport:
}
/**
+ * gst_rtsp_stream_transport_message_sent:
+ * @trans: a #GstRTSPStreamTransport
+ *
+ * Signal the installed message_sent callback for @trans.
+ */
+void
+gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport * trans)
+{
+ GstRTSPStreamTransportPrivate *priv;
+
+ priv = trans->priv;
+
+ if (priv->message_sent)
+ priv->message_sent (priv->ms_user_data);
+}
+
+/**
* gst_rtsp_stream_transport_recv_data:
* @trans: a #GstRTSPStreamTransport
* @channel: a channel
typedef void (*GstRTSPKeepAliveFunc) (gpointer user_data);
/**
+ * GstRTSPMessageSentFunc:
+ * @user_data: user data
+ *
+ * Function registered with gst_rtsp_stream_transport_set_message_sent()
+ * and called when a message has been sent on the transport.
+ */
+typedef void (*GstRTSPMessageSentFunc) (gpointer user_data);
+
+/**
* GstRTSPStreamTransport:
* @parent: parent instance
*
GDestroyNotify notify);
GST_RTSP_SERVER_API
+void gst_rtsp_stream_transport_set_message_sent (GstRTSPStreamTransport *trans,
+ GstRTSPMessageSentFunc message_sent,
+ gpointer user_data,
+ GDestroyNotify notify);
+
+GST_RTSP_SERVER_API
void gst_rtsp_stream_transport_keep_alive (GstRTSPStreamTransport *trans);
GST_RTSP_SERVER_API
+void gst_rtsp_stream_transport_message_sent (GstRTSPStreamTransport *trans);
+
+GST_RTSP_SERVER_API
gboolean gst_rtsp_stream_transport_set_active (GstRTSPStreamTransport *trans,
gboolean active);
GList *tr_cache_rtcp;
guint tr_cache_cookie_rtp;
guint tr_cache_cookie_rtcp;
+ guint n_tcp_transports;
+ gboolean have_buffer[2];
+ guint n_outstanding;
gint dscp_qos;
static void gst_rtsp_stream_finalize (GObject * obj);
+static gboolean
+update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
+ gboolean add);
+
static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
}
}
-static GstFlowReturn
-handle_new_sample (GstAppSink * sink, gpointer user_data)
+static void
+send_tcp_message (GstRTSPStream * stream, gint idx)
{
- GstRTSPStreamPrivate *priv;
+ GstRTSPStreamPrivate *priv = stream->priv;
+ GstAppSink *sink;
GList *walk;
GstSample *sample;
GstBuffer *buffer;
- GstRTSPStream *stream;
gboolean is_rtp;
+ g_mutex_lock (&priv->lock);
+
+ if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
+ g_mutex_unlock (&priv->lock);
+ return;
+ }
+
+ priv->have_buffer[idx] = FALSE;
+
+ if (priv->appsink[idx] == NULL) {
+ /* session expired */
+ g_mutex_unlock (&priv->lock);
+ return;
+ }
+
+ sink = GST_APP_SINK (priv->appsink[idx]);
sample = gst_app_sink_pull_sample (sink);
- if (!sample)
- return GST_FLOW_OK;
+ if (!sample) {
+ g_mutex_unlock (&priv->lock);
+ return;
+ }
- stream = (GstRTSPStream *) user_data;
- priv = stream->priv;
buffer = gst_sample_get_buffer (sample);
- is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
+ is_rtp = (idx == 0);
- g_mutex_lock (&priv->lock);
if (is_rtp) {
if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
clear_tr_cache (priv, is_rtp);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
+ const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
+
+ if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
+ continue;
+
priv->tr_cache_rtp =
g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
}
clear_tr_cache (priv, is_rtp);
for (walk = priv->transports; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
+ const GstRTSPTransport *t = gst_rtsp_stream_transport_get_transport (tr);
+
+ if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
+ continue;
+
priv->tr_cache_rtcp =
g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
}
priv->tr_cache_cookie_rtcp = priv->transports_cookie;
}
}
+
+ priv->n_outstanding += priv->n_tcp_transports;
+
g_mutex_unlock (&priv->lock);
if (is_rtp) {
for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
- gst_rtsp_stream_transport_send_rtp (tr, buffer);
+ if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
+ /* remove transport on send error */
+ g_mutex_lock (&priv->lock);
+ priv->n_outstanding--;
+ update_transport (stream, tr, FALSE);
+ g_mutex_unlock (&priv->lock);
+ }
}
} else {
for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
- gst_rtsp_stream_transport_send_rtcp (tr, buffer);
+ if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
+ /* remove transport on send error */
+ g_mutex_lock (&priv->lock);
+ priv->n_outstanding--;
+ update_transport (stream, tr, FALSE);
+ g_mutex_unlock (&priv->lock);
+ }
}
}
gst_sample_unref (sample);
+}
+
+static GstFlowReturn
+handle_new_sample (GstAppSink * sink, gpointer user_data)
+{
+ GstRTSPStream *stream = user_data;
+ GstRTSPStreamPrivate *priv = stream->priv;
+ int i;
+ int idx = -1;
+
+ g_mutex_lock (&priv->lock);
+
+ for (i = 0; i < 2; i++)
+ if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
+ priv->have_buffer[i] = TRUE;
+ if (priv->n_outstanding == 0) {
+ /* send message */
+ idx = i;
+ }
+ break;
+ }
+
+ g_mutex_unlock (&priv->lock);
+
+ if (idx != -1)
+ send_tcp_message (stream, idx);
return GST_FLOW_OK;
}
} else if (is_tcp && !priv->appsink[i]) {
/* make appsink */
priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
- g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
+ g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
+ NULL);
/* we need to set sync and preroll to FALSE for the sink to avoid
* deadlock. This is only needed for sink sending RTCP data. */
if (add) {
GST_INFO ("adding TCP %s", tr->destination);
priv->transports = g_list_prepend (priv->transports, trans);
+ priv->n_tcp_transports++;
} else {
GST_INFO ("removing TCP %s", tr->destination);
priv->transports = g_list_remove (priv->transports, trans);
+ priv->n_tcp_transports--;
}
priv->transports_cookie++;
break;
}
}
+static void
+on_message_sent (gpointer user_data)
+{
+ GstRTSPStream *stream = user_data;
+ GstRTSPStreamPrivate *priv = stream->priv;
+ gint idx = -1;
+
+ GST_DEBUG_OBJECT (stream, "message send complete");
+
+ g_mutex_lock (&priv->lock);
+
+ g_assert (priv->n_outstanding >= 0);
+
+ if (priv->n_outstanding == 0)
+ goto no_outstanding;
+
+ priv->n_outstanding--;
+ if (priv->n_outstanding == 0) {
+ gint i;
+
+ /* iterate from 1 and down, so we prioritize RTCP over RTP */
+ for (i = 1; i >= 0; i--) {
+ if (priv->have_buffer[i]) {
+ /* send message */
+ idx = i;
+ break;
+ }
+ }
+ }
+
+ g_mutex_unlock (&priv->lock);
+
+ if (idx != -1)
+ send_tcp_message (stream, idx);
+
+ return;
+
+ /* ERRORS */
+no_outstanding:
+ {
+ GST_INFO ("no outstanding messages");
+ g_mutex_unlock (&priv->lock);
+ return;
+ }
+}
/**
* gst_rtsp_stream_add_transport:
g_mutex_lock (&priv->lock);
res = update_transport (stream, trans, TRUE);
+ if (res)
+ gst_rtsp_stream_transport_set_message_sent (trans, on_message_sent, stream,
+ NULL);
g_mutex_unlock (&priv->lock);
return res;