#include <gst/app/gstappsrc.h>
#include <gst/app/gstappsink.h>
+#include "rtsp-funnel.h"
#include "rtsp-media.h"
#define DEFAULT_SHARED FALSE
#define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
//#define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP_MCAST
#define DEFAULT_EOS_SHUTDOWN FALSE
+#define DEFAULT_BUFFER_SIZE 0x80000
/* define to dump received RTCP packets */
#undef DUMP_STATS
PROP_REUSABLE,
PROP_PROTOCOLS,
PROP_EOS_SHUTDOWN,
+ PROP_BUFFER_SIZE,
PROP_LAST
};
enum
{
+ SIGNAL_PREPARED,
SIGNAL_UNPREPARED,
+ SIGNAL_NEW_STATE,
SIGNAL_LAST
};
-GST_DEBUG_CATEGORY_EXTERN (rtsp_media_debug);
+GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug);
#define GST_CAT_DEFAULT rtsp_media_debug
static GQuark ssrc_stream_map_key;
"Send an EOS event to the pipeline before unpreparing",
DEFAULT_EOS_SHUTDOWN, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
+ g_param_spec_uint ("buffer-size", "Buffer Size",
+ "The kernel UDP buffer size to use", 0, G_MAXUINT,
+ DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ gst_rtsp_media_signals[SIGNAL_PREPARED] =
+ g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL,
+ g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+
gst_rtsp_media_signals[SIGNAL_UNPREPARED] =
g_signal_new ("unprepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstRTSPMediaClass, unprepared), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
+ gst_rtsp_media_signals[SIGNAL_NEW_STATE] =
+ g_signal_new ("new-state", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
+ G_STRUCT_OFFSET (GstRTSPMediaClass, new_state), NULL, NULL,
+ g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 0, G_TYPE_INT);
+
klass->context = g_main_context_new ();
klass->loop = g_main_loop_new (klass->context, TRUE);
+ GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia");
+
klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
if (error != NULL) {
g_critical ("could not start bus thread: %s", error->message);
klass->unprepare = default_unprepare;
ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
+
+ gst_element_register (NULL, "rtspfunnel", GST_RANK_NONE, RTSP_TYPE_FUNNEL);
+
}
static void
media->reusable = DEFAULT_REUSABLE;
media->protocols = DEFAULT_PROTOCOLS;
media->eos_shutdown = DEFAULT_EOS_SHUTDOWN;
-}
-
-/* FIXME. this should be done in multiudpsink */
-typedef struct
-{
- gint count;
- gchar *dest;
- gint min, max;
-} RTSPDestination;
-
-static gint
-dest_compare (RTSPDestination * a, RTSPDestination * b)
-{
- if ((a->min == b->min) && (a->max == b->max)
- && (strcmp (a->dest, b->dest) == 0))
- return 0;
-
- return 1;
-}
-
-static RTSPDestination *
-create_destination (const gchar * dest, gint min, gint max)
-{
- RTSPDestination *res;
-
- res = g_slice_new (RTSPDestination);
- res->count = 1;
- res->dest = g_strdup (dest);
- res->min = min;
- res->max = max;
-
- return res;
-}
-
-static void
-free_destination (RTSPDestination * dest)
-{
- g_free (dest->dest);
- g_slice_free (RTSPDestination, dest);
+ media->buffer_size = DEFAULT_BUFFER_SIZE;
}
void
g_list_free (stream->transports);
- g_list_foreach (stream->destinations, (GFunc) free_destination, NULL);
- g_list_free (stream->destinations);
-
g_free (stream);
}
case PROP_EOS_SHUTDOWN:
g_value_set_boolean (value, gst_rtsp_media_is_eos_shutdown (media));
break;
+ case PROP_BUFFER_SIZE:
+ g_value_set_uint (value, gst_rtsp_media_get_buffer_size (media));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
}
case PROP_EOS_SHUTDOWN:
gst_rtsp_media_set_eos_shutdown (media, g_value_get_boolean (value));
break;
+ case PROP_BUFFER_SIZE:
+ gst_rtsp_media_set_buffer_size (media, g_value_get_uint (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
}
GST_INFO ("stats: position %" GST_TIME_FORMAT ", duration %"
GST_TIME_FORMAT, GST_TIME_ARGS (position), GST_TIME_ARGS (duration));
- if (position == -1 || media->active > 0) {
+ if (position == -1) {
media->range.min.type = GST_RTSP_TIME_NOW;
media->range.min.seconds = -1;
} else {
}
/**
+ * gst_rtsp_media_set_buffer_size:
+ * @media: a #GstRTSPMedia
+ * @size: the new value
+ *
+ * Set the kernel UDP buffer size.
+ */
+void
+gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size)
+{
+ g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+ media->buffer_size = size;
+}
+
+/**
+ * gst_rtsp_media_get_buffer_size:
+ * @media: a #GstRTSPMedia
+ *
+ * Get the kernel UDP buffer size.
+ *
+ * Returns: the kernel UDP buffer size.
+ */
+guint
+gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
+{
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+ return media->buffer_size;
+}
+
+/**
+ * gst_rtsp_media_set_auth:
+ * @media: a #GstRTSPMedia
+ * @auth: a #GstRTSPAuth
+ *
+ * configure @auth to be used as the authentication manager of @media.
+ */
+void
+gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth)
+{
+ GstRTSPAuth *old;
+
+ g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+ old = media->auth;
+
+ if (old != auth) {
+ if (auth)
+ g_object_ref (auth);
+ media->auth = auth;
+ if (old)
+ g_object_unref (old);
+ }
+}
+
+/**
+ * gst_rtsp_media_get_auth:
+ * @media: a #GstRTSPMedia
+ *
+ * Get the #GstRTSPAuth used as the authentication manager of @media.
+ *
+ * Returns: the #GstRTSPAuth of @media. g_object_unref() after
+ * usage.
+ */
+GstRTSPAuth *
+gst_rtsp_media_get_auth (GstRTSPMedia * media)
+{
+ GstRTSPAuth *result;
+
+ g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
+
+ if ((result = media->auth))
+ g_object_ref (result);
+
+ return result;
+}
+
+
+/**
* gst_rtsp_media_n_streams:
* @media: a #GstRTSPMedia
*
}
/**
+ * gst_rtsp_media_get_range_string:
+ * @media: a #GstRTSPMedia
+ * @play: for the PLAY request
+ *
+ * Get the current range as a string.
+ *
+ * Returns: The range as a string, g_free() after usage.
+ */
+gchar *
+gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play)
+{
+ gchar *result;
+ GstRTSPTimeRange range;
+
+ /* make copy */
+ range = media->range;
+
+ if (!play && media->active > 0) {
+ range.min.type = GST_RTSP_TIME_NOW;
+ range.min.seconds = -1;
+ }
+
+ result = gst_rtsp_range_to_string (&range);
+
+ return result;
+}
+
+/**
* gst_rtsp_media_seek:
* @media: a #GstRTSPMedia
* @range: a #GstRTSPTimeRange
"send-duplicates")) {
g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
- stream->filter_duplicates = FALSE;
} else {
- GST_WARNING ("multiudpsink version found without send-duplicates property");
- stream->filter_duplicates = TRUE;
+ g_warning
+ ("old multiudpsink version found without send-duplicates property");
+ }
+
+ if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
+ "buffer-size")) {
+ g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL);
+ } else {
+ GST_WARNING ("multiudpsink version found without buffer-size property");
}
g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL);
return GST_FLOW_OK;
}
+static GstFlowReturn
+handle_new_buffer_list (GstAppSink * sink, gpointer user_data)
+{
+ GList *walk;
+ GstBufferList *blist;
+ GstRTSPMediaStream *stream;
+
+ blist = gst_app_sink_pull_buffer_list (sink);
+ if (!blist)
+ return GST_FLOW_OK;
+
+ stream = (GstRTSPMediaStream *) user_data;
+
+ for (walk = stream->transports; walk; walk = g_list_next (walk)) {
+ GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
+
+ if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
+ if (tr->send_rtp_list)
+ tr->send_rtp_list (blist, tr->transport->interleaved.min,
+ tr->user_data);
+ } else {
+ if (tr->send_rtcp_list)
+ tr->send_rtcp_list (blist, tr->transport->interleaved.max,
+ tr->user_data);
+ }
+ }
+ gst_buffer_list_unref (blist);
+
+ return GST_FLOW_OK;
+}
+
static GstAppSinkCallbacks sink_cb = {
NULL, /* not interested in EOS */
NULL, /* not interested in preroll buffers */
- handle_new_buffer
+ handle_new_buffer,
+ handle_new_buffer_list
};
/* prepare the pipeline objects to handle @stream in @media */
gst_object_unref (teepad);
/* make selector for the RTP receivers */
- stream->selector[0] = gst_element_factory_make ("input-selector", NULL);
- g_object_set (stream->selector[0], "select-all", TRUE, NULL);
+ stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL);
gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]);
pad = gst_element_get_static_pad (stream->selector[0], "src");
gst_object_unref (selpad);
/* make selector for the RTCP receivers */
- stream->selector[1] = gst_element_factory_make ("input-selector", NULL);
- g_object_set (stream->selector[1], "select-all", TRUE, NULL);
+ stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL);
gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]);
pad = gst_element_get_static_pad (stream->selector[1], "src");
if (status == GST_RTSP_MEDIA_STATUS_ERROR)
goto state_failed;
+ g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_PREPARED], 0, NULL);
+
GST_INFO ("object %p is prerolled", media);
return TRUE;
add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
gchar * dest, gint min, gint max)
{
- gboolean do_add = TRUE;
- RTSPDestination *ndest;
-
- if (stream->filter_duplicates) {
- RTSPDestination fdest;
- GList *find;
-
- fdest.dest = dest;
- fdest.min = min;
- fdest.max = max;
-
- /* first see if we already added this destination */
- find =
- g_list_find_custom (stream->destinations, &fdest,
- (GCompareFunc) dest_compare);
- if (find) {
- ndest = (RTSPDestination *) find->data;
-
- GST_INFO ("already streaming to %s:%d-%d with %d clients", dest, min, max,
- ndest->count);
- ndest->count++;
- do_add = FALSE;
- }
- }
-
- if (do_add) {
- GST_INFO ("adding %s:%d-%d", dest, min, max);
- g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
- g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
-
- if (stream->filter_duplicates) {
- ndest = create_destination (dest, min, max);
- stream->destinations = g_list_prepend (stream->destinations, ndest);
- }
- }
+ GST_INFO ("adding %s:%d-%d", dest, min, max);
+ g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
+ g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
}
static void
remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream,
gchar * dest, gint min, gint max)
{
- gboolean do_remove = TRUE;
- RTSPDestination *ndest = NULL;
- GList *find = NULL;
-
- if (stream->filter_duplicates) {
- RTSPDestination fdest;
-
- fdest.dest = dest;
- fdest.min = min;
- fdest.max = max;
-
- /* first see if we already added this destination */
- find =
- g_list_find_custom (stream->destinations, &fdest,
- (GCompareFunc) dest_compare);
- if (!find)
- return;
-
- ndest = (RTSPDestination *) find->data;
- if (--ndest->count > 0) {
- do_remove = FALSE;
- GST_INFO ("still streaming to %s:%d-%d with %d clients", dest, min, max,
- ndest->count);
- }
- }
-
- if (do_remove) {
- GST_INFO ("removing %s:%d-%d", dest, min, max);
- g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
- g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
-
- if (stream->filter_duplicates) {
- stream->destinations = g_list_delete_link (stream->destinations, find);
- free_destination (ndest);
- }
- }
+ GST_INFO ("removing %s:%d-%d", dest, min, max);
+ g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
+ g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
}
/**
else
do_state = FALSE;
- GST_INFO ("active %d media %p", media->active, media);
+ GST_INFO ("state %d active %d media %p do_state %d", state, media->active,
+ media, do_state);
- if (do_state && media->target_state != state) {
- if (state == GST_STATE_NULL) {
- gst_rtsp_media_unprepare (media);
- } else {
- GST_INFO ("state %s media %p", gst_element_state_get_name (state), media);
- media->target_state = state;
- ret = gst_element_set_state (media->pipeline, state);
+ if (media->target_state != state) {
+ if (do_state) {
+ if (state == GST_STATE_NULL) {
+ gst_rtsp_media_unprepare (media);
+ } else {
+ GST_INFO ("state %s media %p", gst_element_state_get_name (state),
+ media);
+ media->target_state = state;
+ ret = gst_element_set_state (media->pipeline, state);
+ }
}
+ g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state,
+ NULL);
}
/* remember where we are */