X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtsp-server%2Frtsp-media.c;h=d8191bbf24f638cfd22d32e100eb1c6ae5448a96;hb=01973c924d3fc850b812a7aaf1b2691a8fd01ae0;hp=38a37d710f4ad5657cc0a054f8743e5e056fc465;hpb=ec2201a3a8adee4463ac2d52194c814f48b28f20;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/rtsp-server/rtsp-media.c b/gst/rtsp-server/rtsp-media.c index 38a37d7..d8191bb 100644 --- a/gst/rtsp-server/rtsp-media.c +++ b/gst/rtsp-server/rtsp-media.c @@ -13,8 +13,8 @@ * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. */ #include @@ -23,15 +23,14 @@ #include #include -#include "rtsp-funnel.h" #include "rtsp-media.h" -#define DEFAULT_SHARED FALSE -#define DEFAULT_REUSABLE FALSE -#define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP +#define DEFAULT_SHARED FALSE +#define DEFAULT_REUSABLE FALSE +#define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP //#define DEFAULT_PROTOCOLS GST_RTSP_LOWER_TRANS_UDP_MCAST -#define DEFAULT_EOS_SHUTDOWN FALSE -#define DEFAULT_BUFFER_SIZE 0x80000 +#define DEFAULT_EOS_SHUTDOWN FALSE +#define DEFAULT_BUFFER_SIZE 0x80000 /* define to dump received RTCP packets */ #undef DUMP_STATS @@ -49,6 +48,7 @@ enum enum { + SIGNAL_NEW_STREAM, SIGNAL_PREPARED, SIGNAL_UNPREPARED, SIGNAL_NEW_STATE, @@ -58,8 +58,6 @@ enum GST_DEBUG_CATEGORY_STATIC (rtsp_media_debug); #define GST_CAT_DEFAULT rtsp_media_debug -static GQuark ssrc_stream_map_key; - static void gst_rtsp_media_get_property (GObject * object, guint propid, GValue * value, GParamSpec * pspec); static void gst_rtsp_media_set_property (GObject * object, guint propid, @@ -69,8 +67,8 @@ static void gst_rtsp_media_finalize (GObject * obj); static gpointer do_loop (GstRTSPMediaClass * klass); static gboolean default_handle_message (GstRTSPMedia * media, GstMessage * message); +static void finish_unprepare (GstRTSPMedia * media); static gboolean default_unprepare (GstRTSPMedia * media); -static void unlock_streams (GstRTSPMedia * media); static guint gst_rtsp_media_signals[SIGNAL_LAST] = { 0 }; @@ -80,7 +78,6 @@ static void gst_rtsp_media_class_init (GstRTSPMediaClass * klass) { GObjectClass *gobject_class; - GError *error = NULL; gobject_class = G_OBJECT_CLASS (klass); @@ -113,6 +110,11 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass) "The kernel UDP buffer size to use", 0, G_MAXUINT, DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gst_rtsp_media_signals[SIGNAL_NEW_STREAM] = + g_signal_new ("new-stream", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstRTSPMediaClass, new_stream), NULL, NULL, + g_cclosure_marshal_generic, G_TYPE_NONE, 1, GST_TYPE_RTSP_STREAM); + gst_rtsp_media_signals[SIGNAL_PREPARED] = g_signal_new ("prepared", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTSPMediaClass, prepared), NULL, NULL, @@ -133,25 +135,19 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass) GST_DEBUG_CATEGORY_INIT (rtsp_media_debug, "rtspmedia", 0, "GstRTSPMedia"); - klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error); - if (error != NULL) { - g_critical ("could not start bus thread: %s", error->message); - } + klass->thread = g_thread_new ("Bus Thread", (GThreadFunc) do_loop, klass); + klass->handle_message = default_handle_message; klass->unprepare = default_unprepare; - - ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream"); - - gst_element_register (NULL, "rtspfunnel", GST_RANK_NONE, RTSP_TYPE_FUNNEL); - } static void gst_rtsp_media_init (GstRTSPMedia * media) { - media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *)); - media->lock = g_mutex_new (); - media->cond = g_cond_new (); + media->streams = g_ptr_array_new_with_free_func (g_object_unref); + g_mutex_init (&media->lock); + g_cond_init (&media->cond); + g_rec_mutex_init (&media->state_lock); media->shared = DEFAULT_SHARED; media->reusable = DEFAULT_REUSABLE; @@ -160,117 +156,32 @@ gst_rtsp_media_init (GstRTSPMedia * media) media->buffer_size = DEFAULT_BUFFER_SIZE; } -/* FIXME. this should be done in multiudpsink */ -typedef struct -{ - gint count; - gchar *dest; - gint min, max; -} RTSPDestination; - -static gint -dest_compare (RTSPDestination * a, RTSPDestination * b) -{ - if ((a->min == b->min) && (a->max == b->max) - && (strcmp (a->dest, b->dest) == 0)) - return 0; - - return 1; -} - -static RTSPDestination * -create_destination (const gchar * dest, gint min, gint max) -{ - RTSPDestination *res; - - res = g_slice_new (RTSPDestination); - res->count = 1; - res->dest = g_strdup (dest); - res->min = min; - res->max = max; - - return res; -} - -static void -free_destination (RTSPDestination * dest) -{ - g_free (dest->dest); - g_slice_free (RTSPDestination, dest); -} - -void -gst_rtsp_media_trans_cleanup (GstRTSPMediaTrans * trans) -{ - if (trans->transport) { - gst_rtsp_transport_free (trans->transport); - trans->transport = NULL; - } - if (trans->rtpsource) { - g_object_set_qdata (trans->rtpsource, ssrc_stream_map_key, NULL); - trans->rtpsource = NULL; - } -} - -static void -gst_rtsp_media_stream_free (GstRTSPMediaStream * stream) -{ - if (stream->session) - g_object_unref (stream->session); - - if (stream->caps) - gst_caps_unref (stream->caps); - - if (stream->send_rtp_sink) - gst_object_unref (stream->send_rtp_sink); - if (stream->send_rtp_src) - gst_object_unref (stream->send_rtp_src); - if (stream->send_rtcp_src) - gst_object_unref (stream->send_rtcp_src); - if (stream->recv_rtcp_sink) - gst_object_unref (stream->recv_rtcp_sink); - if (stream->recv_rtp_sink) - gst_object_unref (stream->recv_rtp_sink); - - g_list_free (stream->transports); - - g_free (stream); -} - static void gst_rtsp_media_finalize (GObject * obj) { GstRTSPMedia *media; - guint i; media = GST_RTSP_MEDIA (obj); GST_INFO ("finalize media %p", media); - if (media->pipeline) { - unlock_streams (media); - gst_element_set_state (media->pipeline, GST_STATE_NULL); - gst_object_unref (media->pipeline); - } - - for (i = 0; i < media->streams->len; i++) { - GstRTSPMediaStream *stream; - - stream = g_array_index (media->streams, GstRTSPMediaStream *, i); + gst_rtsp_media_unprepare (media); - gst_rtsp_media_stream_free (stream); - } - g_array_free (media->streams, TRUE); + g_ptr_array_unref (media->streams); - g_list_foreach (media->dynamic, (GFunc) gst_object_unref, NULL); - g_list_free (media->dynamic); + g_list_free_full (media->dynamic, gst_object_unref); if (media->source) { g_source_destroy (media->source); g_source_unref (media->source); } - g_mutex_free (media->lock); - g_cond_free (media->cond); + if (media->auth) + g_object_unref (media->auth); + if (media->pool) + g_object_unref (media->pool); + g_mutex_clear (&media->lock); + g_cond_clear (&media->cond); + g_rec_mutex_clear (&media->state_lock); G_OBJECT_CLASS (gst_rtsp_media_parent_class)->finalize (obj); } @@ -339,14 +250,16 @@ do_loop (GstRTSPMediaClass * klass) return NULL; } +/* must be called with state lock */ static void collect_media_stats (GstRTSPMedia * media) { - GstFormat format; gint64 position, duration; media->range.unit = GST_RTSP_RANGE_NPT; + GST_INFO ("collect media stats"); + if (media->is_live) { media->range.min.type = GST_RTSP_TIME_NOW; media->range.min.seconds = -1; @@ -354,15 +267,15 @@ collect_media_stats (GstRTSPMedia * media) media->range.max.seconds = -1; } else { /* get the position */ - format = GST_FORMAT_TIME; - if (!gst_element_query_position (media->pipeline, &format, &position)) { + if (!gst_element_query_position (media->pipeline, GST_FORMAT_TIME, + &position)) { GST_INFO ("position query failed"); position = 0; } /* get the duration */ - format = GST_FORMAT_TIME; - if (!gst_element_query_duration (media->pipeline, &format, &duration)) { + if (!gst_element_query_duration (media->pipeline, GST_FORMAT_TIME, + &duration)) { GST_INFO ("duration query failed"); duration = -1; } @@ -391,7 +304,7 @@ collect_media_stats (GstRTSPMedia * media) * gst_rtsp_media_new: * * Create a new #GstRTSPMedia instance. The #GstRTSPMedia object contains the - * element to produde RTP data for one or more related (audio/video/..) + * element to produce RTP data for one or more related (audio/video/..) * streams. * * Returns: a new #GstRTSPMedia object. @@ -420,7 +333,9 @@ gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared) { g_return_if_fail (GST_IS_RTSP_MEDIA (media)); + g_mutex_lock (&media->lock); media->shared = shared; + g_mutex_unlock (&media->lock); } /** @@ -434,9 +349,15 @@ gst_rtsp_media_set_shared (GstRTSPMedia * media, gboolean shared) gboolean gst_rtsp_media_is_shared (GstRTSPMedia * media) { + gboolean res; + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - return media->shared; + g_mutex_lock (&media->lock); + res = media->shared; + g_mutex_unlock (&media->lock); + + return res; } /** @@ -452,7 +373,9 @@ gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable) { g_return_if_fail (GST_IS_RTSP_MEDIA (media)); + g_mutex_lock (&media->lock); media->reusable = reusable; + g_mutex_unlock (&media->lock); } /** @@ -466,9 +389,15 @@ gst_rtsp_media_set_reusable (GstRTSPMedia * media, gboolean reusable) gboolean gst_rtsp_media_is_reusable (GstRTSPMedia * media) { + gboolean res; + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - return media->reusable; + g_mutex_lock (&media->lock); + res = media->reusable; + g_mutex_unlock (&media->lock); + + return res; } /** @@ -483,7 +412,9 @@ gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols) { g_return_if_fail (GST_IS_RTSP_MEDIA (media)); + g_mutex_lock (&media->lock); media->protocols = protocols; + g_mutex_unlock (&media->lock); } /** @@ -497,10 +428,16 @@ gst_rtsp_media_set_protocols (GstRTSPMedia * media, GstRTSPLowerTrans protocols) GstRTSPLowerTrans gst_rtsp_media_get_protocols (GstRTSPMedia * media) { + GstRTSPLowerTrans res; + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), GST_RTSP_LOWER_TRANS_UNKNOWN); - return media->protocols; + g_mutex_lock (&media->lock); + res = media->protocols; + g_mutex_unlock (&media->lock); + + return res; } /** @@ -516,7 +453,9 @@ gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown) { g_return_if_fail (GST_IS_RTSP_MEDIA (media)); + g_mutex_lock (&media->lock); media->eos_shutdown = eos_shutdown; + g_mutex_unlock (&media->lock); } /** @@ -531,9 +470,15 @@ gst_rtsp_media_set_eos_shutdown (GstRTSPMedia * media, gboolean eos_shutdown) gboolean gst_rtsp_media_is_eos_shutdown (GstRTSPMedia * media) { + gboolean res; + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - return media->eos_shutdown; + g_mutex_lock (&media->lock); + res = media->eos_shutdown; + g_mutex_unlock (&media->lock); + + return res; } /** @@ -548,7 +493,11 @@ gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size) { g_return_if_fail (GST_IS_RTSP_MEDIA (media)); + GST_LOG_OBJECT (media, "set buffer size %u", size); + + g_mutex_lock (&media->lock); media->buffer_size = size; + g_mutex_unlock (&media->lock); } /** @@ -562,9 +511,15 @@ gst_rtsp_media_set_buffer_size (GstRTSPMedia * media, guint size) guint gst_rtsp_media_get_buffer_size (GstRTSPMedia * media) { + guint res; + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); - return media->buffer_size; + g_mutex_unlock (&media->lock); + res = media->buffer_size; + g_mutex_unlock (&media->lock); + + return res; } /** @@ -581,15 +536,17 @@ gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth) g_return_if_fail (GST_IS_RTSP_MEDIA (media)); - old = media->auth; + GST_LOG_OBJECT (media, "set auth %p", auth); - if (old != auth) { - if (auth) - g_object_ref (auth); - media->auth = auth; - if (old) - g_object_unref (old); - } + g_mutex_lock (&media->lock); + if ((old = media->auth) != auth) + media->auth = auth ? g_object_ref (auth) : NULL; + else + old = NULL; + g_mutex_unlock (&media->lock); + + if (old) + g_object_unref (old); } /** @@ -598,7 +555,7 @@ gst_rtsp_media_set_auth (GstRTSPMedia * media, GstRTSPAuth * auth) * * Get the #GstRTSPAuth used as the authentication manager of @media. * - * Returns: the #GstRTSPAuth of @media. g_object_unref() after + * Returns: (transfer full): the #GstRTSPAuth of @media. g_object_unref() after * usage. */ GstRTSPAuth * @@ -608,12 +565,175 @@ gst_rtsp_media_get_auth (GstRTSPMedia * media) g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL); + g_mutex_lock (&media->lock); if ((result = media->auth)) g_object_ref (result); + g_mutex_unlock (&media->lock); return result; } +/** + * gst_rtsp_media_set_address_pool: + * @media: a #GstRTSPMedia + * @pool: a #GstRTSPAddressPool + * + * configure @pool to be used as the address pool of @media. + */ +void +gst_rtsp_media_set_address_pool (GstRTSPMedia * media, + GstRTSPAddressPool * pool) +{ + GstRTSPAddressPool *old; + + g_return_if_fail (GST_IS_RTSP_MEDIA (media)); + + GST_LOG_OBJECT (media, "set address pool %p", pool); + + g_mutex_lock (&media->lock); + if ((old = media->pool) != pool) + media->pool = pool ? g_object_ref (pool) : NULL; + else + old = NULL; + g_ptr_array_foreach (media->streams, (GFunc) gst_rtsp_stream_set_address_pool, + pool); + g_mutex_unlock (&media->lock); + + if (old) + g_object_unref (old); +} + +/** + * gst_rtsp_media_get_address_pool: + * @media: a #GstRTSPMedia + * + * Get the #GstRTSPAddressPool used as the address pool of @media. + * + * Returns: (transfer full): the #GstRTSPAddressPool of @media. g_object_unref() after + * usage. + */ +GstRTSPAddressPool * +gst_rtsp_media_get_address_pool (GstRTSPMedia * media) +{ + GstRTSPAddressPool *result; + + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL); + + g_mutex_lock (&media->lock); + if ((result = media->pool)) + g_object_ref (result); + g_mutex_unlock (&media->lock); + + return result; +} + +/** + * gst_rtsp_media_collect_streams: + * @media: a #GstRTSPMedia + * + * Find all payloader elements, they should be named pay%d in the + * element of @media, and create #GstRTSPStreams for them. + * + * Collect all dynamic elements, named dynpay%d, and add them to + * the list of dynamic elements. + */ +void +gst_rtsp_media_collect_streams (GstRTSPMedia * media) +{ + GstElement *element, *elem; + GstPad *pad; + gint i; + gboolean have_elem; + + g_return_if_fail (GST_IS_RTSP_MEDIA (media)); + + element = media->element; + + have_elem = TRUE; + for (i = 0; have_elem; i++) { + gchar *name; + + have_elem = FALSE; + + name = g_strdup_printf ("pay%d", i); + if ((elem = gst_bin_get_by_name (GST_BIN (element), name))) { + GST_INFO ("found stream %d with payloader %p", i, elem); + + /* take the pad of the payloader */ + pad = gst_element_get_static_pad (elem, "src"); + /* create the stream */ + gst_rtsp_media_create_stream (media, elem, pad); + g_object_unref (pad); + + gst_object_unref (elem); + + have_elem = TRUE; + } + g_free (name); + + name = g_strdup_printf ("dynpay%d", i); + if ((elem = gst_bin_get_by_name (GST_BIN (element), name))) { + /* a stream that will dynamically create pads to provide RTP packets */ + + GST_INFO ("found dynamic element %d, %p", i, elem); + + g_mutex_lock (&media->lock); + media->dynamic = g_list_prepend (media->dynamic, elem); + g_mutex_unlock (&media->lock); + + have_elem = TRUE; + } + g_free (name); + } +} + +/** + * gst_rtsp_media_create_stream: + * @media: a #GstRTSPMedia + * @payloader: a #GstElement + * @srcpad: a source #GstPad + * + * Create a new stream in @media that provides RTP data on @srcpad. + * @srcpad should be a pad of an element inside @media->element. + * + * Returns: (transfer none): a new #GstRTSPStream that remains valid for as long + * as @media exists. + */ +GstRTSPStream * +gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader, + GstPad * pad) +{ + GstRTSPStream *stream; + GstPad *srcpad; + gchar *name; + gint idx; + + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL); + g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL); + g_return_val_if_fail (GST_IS_PAD (pad), NULL); + g_return_val_if_fail (GST_PAD_IS_SRC (pad), NULL); + + g_mutex_lock (&media->lock); + idx = media->streams->len; + + name = g_strdup_printf ("src_%u", idx); + srcpad = gst_ghost_pad_new (name, pad); + gst_pad_set_active (srcpad, TRUE); + gst_element_add_pad (media->element, srcpad); + g_free (name); + + stream = gst_rtsp_stream_new (idx, payloader, srcpad); + if (media->pool) + gst_rtsp_stream_set_address_pool (stream, media->pool); + + g_ptr_array_add (media->streams, stream); + g_mutex_unlock (&media->lock); + + g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STREAM], 0, stream, + NULL); + + return stream; +} /** * gst_rtsp_media_n_streams: @@ -626,9 +746,15 @@ gst_rtsp_media_get_auth (GstRTSPMedia * media) guint gst_rtsp_media_n_streams (GstRTSPMedia * media) { + guint res; + g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), 0); - return media->streams->len; + g_mutex_lock (&media->lock); + res = media->streams->len; + g_mutex_unlock (&media->lock); + + return res; } /** @@ -638,20 +764,22 @@ gst_rtsp_media_n_streams (GstRTSPMedia * media) * * Retrieve the stream with index @idx from @media. * - * Returns: the #GstRTSPMediaStream at index @idx or %NULL when a stream with + * Returns: (transfer none): the #GstRTSPStream at index @idx or %NULL when a stream with * that index did not exist. */ -GstRTSPMediaStream * +GstRTSPStream * gst_rtsp_media_get_stream (GstRTSPMedia * media, guint idx) { - GstRTSPMediaStream *res; + GstRTSPStream *res; g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL); + g_mutex_lock (&media->lock); if (idx < media->streams->len) - res = g_array_index (media->streams, GstRTSPMediaStream *, idx); + res = g_ptr_array_index (media->streams, idx); else res = NULL; + g_mutex_unlock (&media->lock); return res; } @@ -671,13 +799,15 @@ gst_rtsp_media_get_range_string (GstRTSPMedia * media, gboolean play) gchar *result; GstRTSPTimeRange range; + g_mutex_lock (&media->lock); /* make copy */ range = media->range; - if (!play && media->active > 0) { + if (!play && media->n_active > 0) { range.min.type = GST_RTSP_TIME_NOW; range.min.seconds = -1; } + g_mutex_unlock (&media->lock); result = gst_rtsp_range_to_string (&range); @@ -704,6 +834,10 @@ gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range) g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); g_return_val_if_fail (range != NULL, FALSE); + g_rec_mutex_lock (&media->state_lock); + if (!media->seekable) + goto not_seekable; + if (range->unit != GST_RTSP_RANGE_NPT) goto not_supported; @@ -766,708 +900,55 @@ gst_rtsp_media_seek (GstRTSPMedia * media, GstRTSPTimeRange * range) GST_INFO ("no seek needed"); res = TRUE; } + g_rec_mutex_unlock (&media->state_lock); return res; /* ERRORS */ +not_seekable: + { + g_rec_mutex_unlock (&media->state_lock); + GST_INFO ("pipeline is not seekable"); + return TRUE; + } not_supported: { + g_rec_mutex_unlock (&media->state_lock); GST_WARNING ("seek unit %d not supported", range->unit); return FALSE; } weird_type: { + g_rec_mutex_unlock (&media->state_lock); GST_WARNING ("weird range type %d not supported", range->min.type); return FALSE; } } -/** - * gst_rtsp_media_stream_rtp: - * @stream: a #GstRTSPMediaStream - * @buffer: a #GstBuffer - * - * Handle an RTP buffer for the stream. This method is usually called when a - * message has been received from a client using the TCP transport. - * - * This function takes ownership of @buffer. - * - * Returns: a GstFlowReturn. - */ -GstFlowReturn -gst_rtsp_media_stream_rtp (GstRTSPMediaStream * stream, GstBuffer * buffer) -{ - GstFlowReturn ret; - - ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer); - - return ret; -} - -/** - * gst_rtsp_media_stream_rtcp: - * @stream: a #GstRTSPMediaStream - * @buffer: a #GstBuffer - * - * Handle an RTCP buffer for the stream. This method is usually called when a - * message has been received from a client using the TCP transport. - * - * This function takes ownership of @buffer. - * - * Returns: a GstFlowReturn. - */ -GstFlowReturn -gst_rtsp_media_stream_rtcp (GstRTSPMediaStream * stream, GstBuffer * buffer) -{ - GstFlowReturn ret; - - ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer); - - return ret; -} - -/* Allocate the udp ports and sockets */ -static gboolean -alloc_udp_ports (GstRTSPMedia * media, GstRTSPMediaStream * stream) -{ - GstStateChangeReturn ret; - GstElement *udpsrc0, *udpsrc1; - GstElement *udpsink0, *udpsink1; - gint tmp_rtp, tmp_rtcp; - guint count; - gint rtpport, rtcpport, sockfd; - const gchar *host; - - udpsrc0 = NULL; - udpsrc1 = NULL; - udpsink0 = NULL; - udpsink1 = NULL; - count = 0; - - /* Start with random port */ - tmp_rtp = 0; - - if (media->is_ipv6) - host = "udp://[::0]"; - else - host = "udp://0.0.0.0"; - - /* try to allocate 2 UDP ports, the RTP port should be an even - * number and the RTCP port should be the next (uneven) port */ -again: - udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL); - if (udpsrc0 == NULL) - goto no_udp_protocol; - g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL); - - ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED); - if (ret == GST_STATE_CHANGE_FAILURE) { - if (tmp_rtp != 0) { - tmp_rtp += 2; - if (++count > 20) - goto no_ports; - - gst_element_set_state (udpsrc0, GST_STATE_NULL); - gst_object_unref (udpsrc0); - - goto again; - } - goto no_udp_protocol; - } - - g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL); - - /* check if port is even */ - if ((tmp_rtp & 1) != 0) { - /* port not even, close and allocate another */ - if (++count > 20) - goto no_ports; - - gst_element_set_state (udpsrc0, GST_STATE_NULL); - gst_object_unref (udpsrc0); - - tmp_rtp++; - goto again; - } - - /* allocate port+1 for RTCP now */ - udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL); - if (udpsrc1 == NULL) - goto no_udp_rtcp_protocol; - - /* set port */ - tmp_rtcp = tmp_rtp + 1; - g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL); - - ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED); - /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */ - if (ret == GST_STATE_CHANGE_FAILURE) { - - if (++count > 20) - goto no_ports; - - gst_element_set_state (udpsrc0, GST_STATE_NULL); - gst_object_unref (udpsrc0); - - gst_element_set_state (udpsrc1, GST_STATE_NULL); - gst_object_unref (udpsrc1); - - tmp_rtp += 2; - goto again; - } - - /* all fine, do port check */ - g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL); - g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL); - - /* this should not happen... */ - if (rtpport != tmp_rtp || rtcpport != tmp_rtcp) - goto port_error; - - udpsink0 = gst_element_factory_make ("multiudpsink", NULL); - if (!udpsink0) - goto no_udp_protocol; - - g_object_get (G_OBJECT (udpsrc0), "sock", &sockfd, NULL); - g_object_set (G_OBJECT (udpsink0), "sockfd", sockfd, NULL); - g_object_set (G_OBJECT (udpsink0), "closefd", FALSE, NULL); - - udpsink1 = gst_element_factory_make ("multiudpsink", NULL); - if (!udpsink1) - goto no_udp_protocol; - - if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0), - "send-duplicates")) { - g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL); - } else { - g_warning - ("old multiudpsink version found without send-duplicates property"); - } - - if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0), - "buffer-size")) { - g_object_set (G_OBJECT (udpsink0), "buffer-size", media->buffer_size, NULL); - } else { - GST_WARNING ("multiudpsink version found without buffer-size property"); - } - - g_object_get (G_OBJECT (udpsrc1), "sock", &sockfd, NULL); - g_object_set (G_OBJECT (udpsink1), "sockfd", sockfd, NULL); - g_object_set (G_OBJECT (udpsink1), "closefd", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL); - - g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL); - g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL); - - /* we keep these elements, we configure all in configure_transport when the - * server told us to really use the UDP ports. */ - stream->udpsrc[0] = udpsrc0; - stream->udpsrc[1] = udpsrc1; - stream->udpsink[0] = udpsink0; - stream->udpsink[1] = udpsink1; - stream->server_port.min = rtpport; - stream->server_port.max = rtcpport; - - return TRUE; - - /* ERRORS */ -no_udp_protocol: - { - goto cleanup; - } -no_ports: - { - goto cleanup; - } -no_udp_rtcp_protocol: - { - goto cleanup; - } -port_error: - { - goto cleanup; - } -cleanup: - { - if (udpsrc0) { - gst_element_set_state (udpsrc0, GST_STATE_NULL); - gst_object_unref (udpsrc0); - } - if (udpsrc1) { - gst_element_set_state (udpsrc1, GST_STATE_NULL); - gst_object_unref (udpsrc1); - } - if (udpsink0) { - gst_element_set_state (udpsink0, GST_STATE_NULL); - gst_object_unref (udpsink0); - } - if (udpsink1) { - gst_element_set_state (udpsink1, GST_STATE_NULL); - gst_object_unref (udpsink1); - } - return FALSE; - } -} - -/* executed from streaming thread */ -static void -caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPMediaStream * stream) -{ - gchar *capsstr; - GstCaps *newcaps, *oldcaps; - - if ((newcaps = GST_PAD_CAPS (pad))) - gst_caps_ref (newcaps); - - oldcaps = stream->caps; - stream->caps = newcaps; - - if (oldcaps) - gst_caps_unref (oldcaps); - - capsstr = gst_caps_to_string (newcaps); - GST_INFO ("stream %p received caps %p, %s", stream, newcaps, capsstr); - g_free (capsstr); -} - -static void -dump_structure (const GstStructure * s) -{ - gchar *sstr; - - sstr = gst_structure_to_string (s); - GST_INFO ("structure: %s", sstr); - g_free (sstr); -} - -static GstRTSPMediaTrans * -find_transport (GstRTSPMediaStream * stream, const gchar * rtcp_from) -{ - GList *walk; - GstRTSPMediaTrans *result = NULL; - const gchar *tmp; - gchar *dest; - guint port; - - if (rtcp_from == NULL) - return NULL; - - tmp = g_strrstr (rtcp_from, ":"); - if (tmp == NULL) - return NULL; - - port = atoi (tmp + 1); - dest = g_strndup (rtcp_from, tmp - rtcp_from); - - GST_INFO ("finding %s:%d", dest, port); - - for (walk = stream->transports; walk; walk = g_list_next (walk)) { - GstRTSPMediaTrans *trans = walk->data; - gint min, max; - - min = trans->transport->client_port.min; - max = trans->transport->client_port.max; - - if ((strcmp (trans->transport->destination, dest) == 0) && (min == port - || max == port)) { - result = trans; - break; - } - } - g_free (dest); - - return result; -} - -static void -on_new_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream) -{ - GstStructure *stats; - GstRTSPMediaTrans *trans; - - GST_INFO ("%p: new source %p", stream, source); - - /* see if we have a stream to match with the origin of the RTCP packet */ - trans = g_object_get_qdata (source, ssrc_stream_map_key); - if (trans == NULL) { - g_object_get (source, "stats", &stats, NULL); - if (stats) { - const gchar *rtcp_from; - - dump_structure (stats); - - rtcp_from = gst_structure_get_string (stats, "rtcp-from"); - if ((trans = find_transport (stream, rtcp_from))) { - GST_INFO ("%p: found transport %p for source %p", stream, trans, - source); - - /* keep ref to the source */ - trans->rtpsource = source; - - g_object_set_qdata (source, ssrc_stream_map_key, trans); - } - gst_structure_free (stats); - } - } else { - GST_INFO ("%p: source %p for transport %p", stream, source, trans); - } -} - -static void -on_ssrc_sdes (GObject * session, GObject * source, GstRTSPMediaStream * stream) -{ - GST_INFO ("%p: new SDES %p", stream, source); -} - -static void -on_ssrc_active (GObject * session, GObject * source, - GstRTSPMediaStream * stream) -{ - GstRTSPMediaTrans *trans; - - trans = g_object_get_qdata (source, ssrc_stream_map_key); - - GST_INFO ("%p: source %p in transport %p is active", stream, source, trans); - - if (trans && trans->keep_alive) - trans->keep_alive (trans->ka_user_data); - -#ifdef DUMP_STATS - { - GstStructure *stats; - g_object_get (source, "stats", &stats, NULL); - if (stats) { - dump_structure (stats); - gst_structure_free (stats); - } - } -#endif -} - -static void -on_bye_ssrc (GObject * session, GObject * source, GstRTSPMediaStream * stream) -{ - GST_INFO ("%p: source %p bye", stream, source); -} - -static void -on_bye_timeout (GObject * session, GObject * source, - GstRTSPMediaStream * stream) -{ - GstRTSPMediaTrans *trans; - - GST_INFO ("%p: source %p bye timeout", stream, source); - - if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) { - trans->rtpsource = NULL; - trans->timeout = TRUE; - } -} - -static void -on_timeout (GObject * session, GObject * source, GstRTSPMediaStream * stream) -{ - GstRTSPMediaTrans *trans; - - GST_INFO ("%p: source %p timeout", stream, source); - - if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) { - trans->rtpsource = NULL; - trans->timeout = TRUE; - } -} - -static GstFlowReturn -handle_new_buffer (GstAppSink * sink, gpointer user_data) -{ - GList *walk; - GstBuffer *buffer; - GstRTSPMediaStream *stream; - - buffer = gst_app_sink_pull_buffer (sink); - if (!buffer) - return GST_FLOW_OK; - - stream = (GstRTSPMediaStream *) user_data; - - for (walk = stream->transports; walk; walk = g_list_next (walk)) { - GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data; - - if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) { - if (tr->send_rtp) - tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data); - } else { - if (tr->send_rtcp) - tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data); - } - } - gst_buffer_unref (buffer); - - return GST_FLOW_OK; -} - -static GstFlowReturn -handle_new_buffer_list (GstAppSink * sink, gpointer user_data) -{ - GList *walk; - GstBufferList *blist; - GstRTSPMediaStream *stream; - - blist = gst_app_sink_pull_buffer_list (sink); - if (!blist) - return GST_FLOW_OK; - - stream = (GstRTSPMediaStream *) user_data; - - for (walk = stream->transports; walk; walk = g_list_next (walk)) { - GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data; - - if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) { - if (tr->send_rtp_list) - tr->send_rtp_list (blist, tr->transport->interleaved.min, - tr->user_data); - } else { - if (tr->send_rtcp_list) - tr->send_rtcp_list (blist, tr->transport->interleaved.max, - tr->user_data); - } - } - gst_buffer_list_unref (blist); - - return GST_FLOW_OK; -} - -static GstAppSinkCallbacks sink_cb = { - NULL, /* not interested in EOS */ - NULL, /* not interested in preroll buffers */ - handle_new_buffer, - handle_new_buffer_list -}; - -/* prepare the pipeline objects to handle @stream in @media */ -static gboolean -setup_stream (GstRTSPMediaStream * stream, guint idx, GstRTSPMedia * media) -{ - gchar *name; - GstPad *pad, *teepad, *selpad; - GstPadLinkReturn ret; - gint i; - - /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2 - * for sending RTP/RTCP. The sender and receiver ports are shared between the - * elements */ - if (!alloc_udp_ports (media, stream)) - return FALSE; - - /* add the ports to the pipeline */ - for (i = 0; i < 2; i++) { - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]); - } - - /* create elements for the TCP transfer */ - for (i = 0; i < 2; i++) { - stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL); - stream->appsink[i] = gst_element_factory_make ("appsink", NULL); - g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL); - g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL); - g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]); - gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]), - &sink_cb, stream, NULL); - } - - /* hook up the stream to the RTP session elements. */ - name = g_strdup_printf ("send_rtp_sink_%d", idx); - stream->send_rtp_sink = gst_element_get_request_pad (media->rtpbin, name); - g_free (name); - name = g_strdup_printf ("send_rtp_src_%d", idx); - stream->send_rtp_src = gst_element_get_static_pad (media->rtpbin, name); - g_free (name); - name = g_strdup_printf ("send_rtcp_src_%d", idx); - stream->send_rtcp_src = gst_element_get_request_pad (media->rtpbin, name); - g_free (name); - name = g_strdup_printf ("recv_rtcp_sink_%d", idx); - stream->recv_rtcp_sink = gst_element_get_request_pad (media->rtpbin, name); - g_free (name); - name = g_strdup_printf ("recv_rtp_sink_%d", idx); - stream->recv_rtp_sink = gst_element_get_request_pad (media->rtpbin, name); - g_free (name); - - /* get the session */ - g_signal_emit_by_name (media->rtpbin, "get-internal-session", idx, - &stream->session); - - g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc, - stream); - g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes, - stream); - g_signal_connect (stream->session, "on-ssrc-active", - (GCallback) on_ssrc_active, stream); - g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc, - stream); - g_signal_connect (stream->session, "on-bye-timeout", - (GCallback) on_bye_timeout, stream); - g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout, - stream); - - /* link the RTP pad to the session manager */ - ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink); - if (ret != GST_PAD_LINK_OK) - goto link_failed; - - /* make tee for RTP and link to stream */ - stream->tee[0] = gst_element_factory_make ("tee", NULL); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[0]); - - pad = gst_element_get_static_pad (stream->tee[0], "sink"); - gst_pad_link (stream->send_rtp_src, pad); - gst_object_unref (pad); - - /* link RTP sink, we're pretty sure this will work. */ - teepad = gst_element_get_request_pad (stream->tee[0], "src%d"); - pad = gst_element_get_static_pad (stream->udpsink[0], "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); - - teepad = gst_element_get_request_pad (stream->tee[0], "src%d"); - pad = gst_element_get_static_pad (stream->appsink[0], "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); - - /* make tee for RTCP */ - stream->tee[1] = gst_element_factory_make ("tee", NULL); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->tee[1]); - - pad = gst_element_get_static_pad (stream->tee[1], "sink"); - gst_pad_link (stream->send_rtcp_src, pad); - gst_object_unref (pad); - - /* link RTCP elements */ - teepad = gst_element_get_request_pad (stream->tee[1], "src%d"); - pad = gst_element_get_static_pad (stream->udpsink[1], "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); - - teepad = gst_element_get_request_pad (stream->tee[1], "src%d"); - pad = gst_element_get_static_pad (stream->appsink[1], "sink"); - gst_pad_link (teepad, pad); - gst_object_unref (pad); - gst_object_unref (teepad); - - /* make selector for the RTP receivers */ - stream->selector[0] = gst_element_factory_make ("rtspfunnel", NULL); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[0]); - - pad = gst_element_get_static_pad (stream->selector[0], "src"); - gst_pad_link (pad, stream->recv_rtp_sink); - gst_object_unref (pad); - - selpad = gst_element_get_request_pad (stream->selector[0], "sink%d"); - pad = gst_element_get_static_pad (stream->udpsrc[0], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - - selpad = gst_element_get_request_pad (stream->selector[0], "sink%d"); - pad = gst_element_get_static_pad (stream->appsrc[0], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - - /* make selector for the RTCP receivers */ - stream->selector[1] = gst_element_factory_make ("rtspfunnel", NULL); - gst_bin_add (GST_BIN_CAST (media->pipeline), stream->selector[1]); - - pad = gst_element_get_static_pad (stream->selector[1], "src"); - gst_pad_link (pad, stream->recv_rtcp_sink); - gst_object_unref (pad); - - selpad = gst_element_get_request_pad (stream->selector[1], "sink%d"); - pad = gst_element_get_static_pad (stream->udpsrc[1], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - - selpad = gst_element_get_request_pad (stream->selector[1], "sink%d"); - pad = gst_element_get_static_pad (stream->appsrc[1], "src"); - gst_pad_link (pad, selpad); - gst_object_unref (pad); - gst_object_unref (selpad); - - /* we set and keep these to playing so that they don't cause NO_PREROLL return - * values */ - gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING); - gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING); - gst_element_set_locked_state (stream->udpsrc[0], TRUE); - gst_element_set_locked_state (stream->udpsrc[1], TRUE); - - /* be notified of caps changes */ - stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps", - (GCallback) caps_notify, stream); - - stream->prepared = TRUE; - - return TRUE; - - /* ERRORS */ -link_failed: - { - GST_WARNING ("failed to link stream %d", idx); - return FALSE; - } -} - -static void -unlock_streams (GstRTSPMedia * media) -{ - guint i, n_streams; - - /* unlock the udp src elements */ - n_streams = gst_rtsp_media_n_streams (media); - for (i = 0; i < n_streams; i++) { - GstRTSPMediaStream *stream; - - stream = gst_rtsp_media_get_stream (media, i); - - gst_element_set_locked_state (stream->udpsrc[0], FALSE); - gst_element_set_locked_state (stream->udpsrc[1], FALSE); - } -} - static void gst_rtsp_media_set_status (GstRTSPMedia * media, GstRTSPMediaStatus status) { - g_mutex_lock (media->lock); + g_mutex_lock (&media->lock); /* never overwrite the error status */ if (media->status != GST_RTSP_MEDIA_STATUS_ERROR) media->status = status; GST_DEBUG ("setting new status to %d", status); - g_cond_broadcast (media->cond); - g_mutex_unlock (media->lock); + g_cond_broadcast (&media->cond); + g_mutex_unlock (&media->lock); } static GstRTSPMediaStatus gst_rtsp_media_get_status (GstRTSPMedia * media) { GstRTSPMediaStatus result; - GTimeVal timeout; + gint64 end_time; - g_mutex_lock (media->lock); - g_get_current_time (&timeout); - g_time_val_add (&timeout, 20 * G_USEC_PER_SEC); + g_mutex_lock (&media->lock); + end_time = g_get_monotonic_time () + 20 * G_TIME_SPAN_SECOND; /* while we are preparing, wait */ while (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) { GST_DEBUG ("waiting for status change"); - if (!g_cond_timed_wait (media->cond, media->lock, &timeout)) { + if (!g_cond_wait_until (&media->cond, &media->lock, end_time)) { GST_DEBUG ("timeout, assuming error status"); media->status = GST_RTSP_MEDIA_STATUS_ERROR; } @@ -1475,11 +956,12 @@ gst_rtsp_media_get_status (GstRTSPMedia * media) /* could be success or error */ result = media->status; GST_DEBUG ("got status %d", result); - g_mutex_unlock (media->lock); + g_mutex_unlock (&media->lock); return result; } +/* called with state-lock */ static gboolean default_handle_message (GstRTSPMedia * media, GstMessage * message) { @@ -1573,10 +1055,10 @@ default_handle_message (GstRTSPMedia * media, GstMessage * message) break; case GST_MESSAGE_EOS: GST_INFO ("%p: got EOS", media); - if (media->eos_pending) { + + if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARING) { GST_DEBUG ("shutting down after EOS"); - gst_element_set_state (media->pipeline, GST_STATE_NULL); - media->eos_pending = FALSE; + finish_unprepare (media); g_object_unref (media); } break; @@ -1596,64 +1078,65 @@ bus_message (GstBus * bus, GstMessage * message, GstRTSPMedia * media) klass = GST_RTSP_MEDIA_GET_CLASS (media); + g_rec_mutex_lock (&media->state_lock); if (klass->handle_message) ret = klass->handle_message (media, message); else ret = FALSE; + g_rec_mutex_unlock (&media->state_lock); return ret; } +static void +watch_destroyed (GstRTSPMedia * media) +{ + GST_DEBUG_OBJECT (media, "source destroyed"); + gst_object_unref (media); +} + /* called from streaming threads */ static void pad_added_cb (GstElement * element, GstPad * pad, GstRTSPMedia * media) { - GstRTSPMediaStream *stream; - gchar *name; - gint i; - - i = media->streams->len + 1; + GstRTSPStream *stream; - GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), i); + /* FIXME, element is likely not a payloader, find the payloader here */ + stream = gst_rtsp_media_create_stream (media, element, pad); - stream = g_new0 (GstRTSPMediaStream, 1); - stream->payloader = element; - - name = g_strdup_printf ("dynpay%d", i); + GST_INFO ("pad added %s:%s, stream %d", GST_DEBUG_PAD_NAME (pad), + stream->idx); + g_rec_mutex_lock (&media->state_lock); + /* we will be adding elements below that will cause ASYNC_DONE to be + * posted in the bus. We want to ignore those messages until the + * pipeline really prerolled. */ media->adding = TRUE; - /* ghost the pad of the payloader to the element */ - stream->srcpad = gst_ghost_pad_new (name, pad); - gst_pad_set_active (stream->srcpad, TRUE); - gst_element_add_pad (media->element, stream->srcpad); - g_free (name); + /* join the element in the PAUSED state because this callback is + * called from the streaming thread and it is PAUSED */ + gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline), + media->rtpbin, GST_STATE_PAUSED); - /* add stream now */ - g_array_append_val (media->streams, stream); - - setup_stream (stream, i, media); - - for (i = 0; i < 2; i++) { - gst_element_set_state (stream->udpsink[i], GST_STATE_PAUSED); - gst_element_set_state (stream->appsink[i], GST_STATE_PAUSED); - gst_element_set_state (stream->tee[i], GST_STATE_PAUSED); - gst_element_set_state (stream->selector[i], GST_STATE_PAUSED); - gst_element_set_state (stream->appsrc[i], GST_STATE_PAUSED); - } media->adding = FALSE; + g_rec_mutex_unlock (&media->state_lock); } static void no_more_pads_cb (GstElement * element, GstRTSPMedia * media) { + GstElement *fakesink; + + g_mutex_lock (&media->lock); GST_INFO ("no more pads"); - if (media->fakesink) { - gst_object_ref (media->fakesink); - gst_bin_remove (GST_BIN (media->pipeline), media->fakesink); - gst_element_set_state (media->fakesink, GST_STATE_NULL); - gst_object_unref (media->fakesink); + if ((fakesink = media->fakesink)) { + gst_object_ref (fakesink); media->fakesink = NULL; + g_mutex_unlock (&media->lock); + + gst_bin_remove (GST_BIN (media->pipeline), fakesink); + gst_element_set_state (fakesink, GST_STATE_NULL); + gst_object_unref (fakesink); GST_INFO ("removed fakesink"); } } @@ -1675,21 +1158,33 @@ gst_rtsp_media_prepare (GstRTSPMedia * media) { GstStateChangeReturn ret; GstRTSPMediaStatus status; - guint i, n_streams; + guint i; GstRTSPMediaClass *klass; GstBus *bus; GList *walk; + g_rec_mutex_lock (&media->state_lock); if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED) goto was_prepared; + if (media->status == GST_RTSP_MEDIA_STATUS_PREPARING) + goto wait_status; + + if (media->status != GST_RTSP_MEDIA_STATUS_UNPREPARED) + goto not_unprepared; + if (!media->reusable && media->reused) goto is_reused; + media->rtpbin = gst_element_factory_make ("rtpbin", NULL); + if (media->rtpbin == NULL) + goto no_rtpbin; + GST_INFO ("preparing media %p", media); /* reset some variables */ media->is_live = FALSE; + media->seekable = FALSE; media->buffering = FALSE; /* we're preparing now */ media->status = GST_RTSP_MEDIA_STATUS_PREPARING; @@ -1700,25 +1195,24 @@ gst_rtsp_media_prepare (GstRTSPMedia * media) media->source = gst_bus_create_watch (bus); gst_object_unref (bus); - g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL); + g_source_set_callback (media->source, (GSourceFunc) bus_message, + gst_object_ref (media), (GDestroyNotify) watch_destroyed); klass = GST_RTSP_MEDIA_GET_CLASS (media); media->id = g_source_attach (media->source, klass->context); - media->rtpbin = gst_element_factory_make ("gstrtpbin", NULL); - /* add stuff to the bin */ gst_bin_add (GST_BIN (media->pipeline), media->rtpbin); /* link streams we already have, other streams might appear when we have * dynamic elements */ - n_streams = gst_rtsp_media_n_streams (media); - for (i = 0; i < n_streams; i++) { - GstRTSPMediaStream *stream; + for (i = 0; i < media->streams->len; i++) { + GstRTSPStream *stream; - stream = gst_rtsp_media_get_stream (media, i); + stream = g_ptr_array_index (media->streams, i); - setup_stream (stream, i, media); + gst_rtsp_stream_join_bin (stream, GST_BIN (media->pipeline), + media->rtpbin, GST_STATE_NULL); } for (walk = media->dynamic; walk; walk = g_list_next (walk)) { @@ -1743,13 +1237,18 @@ gst_rtsp_media_prepare (GstRTSPMedia * media) switch (ret) { case GST_STATE_CHANGE_SUCCESS: GST_INFO ("SUCCESS state change for media %p", media); + media->seekable = TRUE; break; case GST_STATE_CHANGE_ASYNC: GST_INFO ("ASYNC state change for media %p", media); + media->seekable = TRUE; break; case GST_STATE_CHANGE_NO_PREROLL: /* we need to go to PLAYING */ GST_INFO ("NO_PREROLL state change: live media %p", media); + /* FIXME we disable seeking for live streams for now. We should perform a + * seeking query in preroll instead */ + media->seekable = FALSE; media->is_live = TRUE; ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING); if (ret == GST_STATE_CHANGE_FAILURE) @@ -1758,8 +1257,11 @@ gst_rtsp_media_prepare (GstRTSPMedia * media) case GST_STATE_CHANGE_FAILURE: goto state_failed; } +wait_status: + g_rec_mutex_unlock (&media->state_lock); - /* now wait for all pads to be prerolled */ + /* now wait for all pads to be prerolled, FIXME, we should somehow be + * able to do this async so that we don't block the server thread. */ status = gst_rtsp_media_get_status (media); if (status == GST_RTSP_MEDIA_STATUS_ERROR) goto state_failed; @@ -1773,62 +1275,76 @@ gst_rtsp_media_prepare (GstRTSPMedia * media) /* OK */ was_prepared: { + GST_LOG ("media %p was prepared", media); + g_rec_mutex_unlock (&media->state_lock); return TRUE; } /* ERRORS */ +not_unprepared: + { + GST_WARNING ("media %p was not unprepared", media); + g_rec_mutex_unlock (&media->state_lock); + return FALSE; + } is_reused: { + g_rec_mutex_unlock (&media->state_lock); GST_WARNING ("can not reuse media %p", media); return FALSE; } +no_rtpbin: + { + g_rec_mutex_unlock (&media->state_lock); + GST_WARNING ("no rtpbin element"); + g_warning ("failed to create element 'rtpbin', check your installation"); + return FALSE; + } state_failed: { GST_WARNING ("failed to preroll pipeline"); - unlock_streams (media); - gst_element_set_state (media->pipeline, GST_STATE_NULL); gst_rtsp_media_unprepare (media); + g_rec_mutex_unlock (&media->state_lock); return FALSE; } } -/** - * gst_rtsp_media_unprepare: - * @media: a #GstRTSPMedia - * - * Unprepare @media. After this call, the media should be prepared again before - * it can be used again. If the media is set to be non-reusable, a new instance - * must be created. - * - * Returns: %TRUE on success. - */ -gboolean -gst_rtsp_media_unprepare (GstRTSPMedia * media) +/* must be called with state-lock */ +static void +finish_unprepare (GstRTSPMedia * media) { - GstRTSPMediaClass *klass; - gboolean success; + gint i; - if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED) - return TRUE; + GST_DEBUG ("shutting down"); - GST_INFO ("unprepare media %p", media); - media->target_state = GST_STATE_NULL; + gst_element_set_state (media->pipeline, GST_STATE_NULL); - klass = GST_RTSP_MEDIA_GET_CLASS (media); - if (klass->unprepare) - success = klass->unprepare (media); - else - success = TRUE; + for (i = 0; i < media->streams->len; i++) { + GstRTSPStream *stream; + + GST_INFO ("Removing elements of stream %d from pipeline", i); + + stream = g_ptr_array_index (media->streams, i); + + gst_rtsp_stream_leave_bin (stream, GST_BIN (media->pipeline), + media->rtpbin); + } + g_ptr_array_set_size (media->streams, 0); + + gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin); + media->rtpbin = NULL; + + gst_object_unref (media->pipeline); + media->pipeline = NULL; - media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED; media->reused = TRUE; + media->status = GST_RTSP_MEDIA_STATUS_UNPREPARED; /* when the media is not reusable, this will effectively unref the media and * recreate it */ g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_UNPREPARED], 0, NULL); - - return success; } +/* called with state-lock */ static gboolean default_unprepare (GstRTSPMedia * media) { @@ -1836,48 +1352,71 @@ default_unprepare (GstRTSPMedia * media) GST_DEBUG ("sending EOS for shutdown"); /* ref so that we don't disappear */ g_object_ref (media); - media->eos_pending = TRUE; gst_element_send_event (media->pipeline, gst_event_new_eos ()); /* we need to go to playing again for the EOS to propagate, normally in this * state, nothing is receiving data from us anymore so this is ok. */ gst_element_set_state (media->pipeline, GST_STATE_PLAYING); + media->status = GST_RTSP_MEDIA_STATUS_UNPREPARING; } else { - GST_DEBUG ("shutting down"); - gst_element_set_state (media->pipeline, GST_STATE_NULL); + finish_unprepare (media); } return TRUE; } -static void -add_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream, - gchar * dest, gint min, gint max) +/** + * gst_rtsp_media_unprepare: + * @media: a #GstRTSPMedia + * + * Unprepare @media. After this call, the media should be prepared again before + * it can be used again. If the media is set to be non-reusable, a new instance + * must be created. + * + * Returns: %TRUE on success. + */ +gboolean +gst_rtsp_media_unprepare (GstRTSPMedia * media) { - gboolean do_add = TRUE; - RTSPDestination *ndest; + gboolean success; - GST_INFO ("adding %s:%d-%d", dest, min, max); - g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL); - g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL); -} + g_rec_mutex_lock (&media->state_lock); + if (media->status == GST_RTSP_MEDIA_STATUS_UNPREPARED) + goto was_unprepared; -static void -remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream, - gchar * dest, gint min, gint max) -{ - gboolean do_remove = TRUE; - RTSPDestination *ndest = NULL; - GList *find = NULL; + GST_INFO ("unprepare media %p", media); + media->target_state = GST_STATE_NULL; + success = TRUE; + + if (media->status == GST_RTSP_MEDIA_STATUS_PREPARED) { + GstRTSPMediaClass *klass; + + klass = GST_RTSP_MEDIA_GET_CLASS (media); + if (klass->unprepare) + success = klass->unprepare (media); + } else { + finish_unprepare (media); + } + if (media->source) { + g_source_destroy (media->source); + g_source_unref (media->source); + media->source = NULL; + } + g_rec_mutex_unlock (&media->state_lock); + + return success; - GST_INFO ("removing %s:%d-%d", dest, min, max); - g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL); - g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL); +was_unprepared: + { + g_rec_mutex_unlock (&media->state_lock); + GST_INFO ("media %p was already unprepared", media); + return TRUE; + } } /** * gst_rtsp_media_set_state: * @media: a #GstRTSPMedia * @state: the target state of the media - * @transports: a #GArray of #GstRTSPMediaTrans pointers + * @transports: a #GPtrArray of #GstRTSPStreamTransport pointers * * Set the state of @media to @state and for the transports in @transports. * @@ -1885,16 +1424,17 @@ remove_udp_destination (GstRTSPMedia * media, GstRTSPMediaStream * stream, */ gboolean gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state, - GArray * transports) + GPtrArray * transports) { gint i; - GstStateChangeReturn ret; gboolean add, remove, do_state; gint old_active; g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE); g_return_val_if_fail (transports != NULL, FALSE); + g_rec_mutex_lock (&media->state_lock); + /* NULL and READY are the same */ if (state == GST_STATE_READY) state = GST_STATE_NULL; @@ -1906,9 +1446,6 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state, switch (state) { case GST_STATE_NULL: - /* unlock the streams so that they follow the state changes from now on */ - unlock_streams (media); - /* fallthrough */ case GST_STATE_PAUSED: /* we're going from PLAYING to PAUSED, READY or NULL, remove */ if (media->target_state == GST_STATE_PLAYING) @@ -1921,69 +1458,26 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state, default: break; } - old_active = media->active; + old_active = media->n_active; for (i = 0; i < transports->len; i++) { - GstRTSPMediaTrans *tr; - GstRTSPMediaStream *stream; - GstRTSPTransport *trans; + GstRTSPStreamTransport *trans; /* we need a non-NULL entry in the array */ - tr = g_array_index (transports, GstRTSPMediaTrans *, i); - if (tr == NULL) + trans = g_ptr_array_index (transports, i); + if (trans == NULL) continue; /* we need a transport */ - if (!(trans = tr->transport)) + if (!trans->transport) continue; - /* get the stream and add the destinations */ - stream = gst_rtsp_media_get_stream (media, tr->idx); - switch (trans->lower_transport) { - case GST_RTSP_LOWER_TRANS_UDP: - case GST_RTSP_LOWER_TRANS_UDP_MCAST: - { - gchar *dest; - gint min, max; - - dest = trans->destination; - if (trans->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) { - min = trans->port.min; - max = trans->port.max; - } else { - min = trans->client_port.min; - max = trans->client_port.max; - } - - if (add && !tr->active) { - add_udp_destination (media, stream, dest, min, max); - stream->transports = g_list_prepend (stream->transports, tr); - tr->active = TRUE; - media->active++; - } else if (remove && tr->active) { - remove_udp_destination (media, stream, dest, min, max); - stream->transports = g_list_remove (stream->transports, tr); - tr->active = FALSE; - media->active--; - } - break; - } - case GST_RTSP_LOWER_TRANS_TCP: - if (add && !tr->active) { - GST_INFO ("adding TCP %s", trans->destination); - stream->transports = g_list_prepend (stream->transports, tr); - tr->active = TRUE; - media->active++; - } else if (remove && tr->active) { - GST_INFO ("removing TCP %s", trans->destination); - stream->transports = g_list_remove (stream->transports, tr); - tr->active = FALSE; - media->active--; - } - break; - default: - GST_INFO ("Unknown transport %d", trans->lower_transport); - break; + if (add) { + if (gst_rtsp_stream_add_transport (trans->stream, trans)) + media->n_active++; + } else if (remove) { + if (gst_rtsp_stream_remove_transport (trans->stream, trans)) + media->n_active--; } } @@ -1991,12 +1485,12 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state, if (old_active == 0 && add) do_state = TRUE; /* if we have no more active media, do the downward state changes */ - else if (media->active == 0) + else if (media->n_active == 0) do_state = TRUE; else do_state = FALSE; - GST_INFO ("state %d active %d media %p do_state %d", state, media->active, + GST_INFO ("state %d active %d media %p do_state %d", state, media->n_active, media, do_state); if (media->target_state != state) { @@ -2007,7 +1501,7 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state, GST_INFO ("state %s media %p", gst_element_state_get_name (state), media); media->target_state = state; - ret = gst_element_set_state (media->pipeline, state); + gst_element_set_state (media->pipeline, state); } } g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STATE], 0, state, @@ -2015,61 +1509,11 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state, } /* remember where we are */ - if (state == GST_STATE_PAUSED || old_active != media->active) + if (state != GST_STATE_NULL && (state == GST_STATE_PAUSED || + old_active != media->n_active)) collect_media_stats (media); - return TRUE; -} - -/** - * gst_rtsp_media_remove_elements: - * @media: a #GstRTSPMedia - * - * Remove all elements and the pipeline controlled by @media. - */ -void -gst_rtsp_media_remove_elements (GstRTSPMedia * media) -{ - gint i, j; - - unlock_streams (media); - - for (i = 0; i < media->streams->len; i++) { - GstRTSPMediaStream *stream; - - GST_INFO ("Removing elements of stream %d from pipeline", i); - - stream = g_array_index (media->streams, GstRTSPMediaStream *, i); + g_rec_mutex_unlock (&media->state_lock); - gst_pad_unlink (stream->srcpad, stream->send_rtp_sink); - - g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig); - - for (j = 0; j < 2; j++) { - gst_element_set_state (stream->udpsrc[j], GST_STATE_NULL); - gst_element_set_state (stream->udpsink[j], GST_STATE_NULL); - gst_element_set_state (stream->appsrc[j], GST_STATE_NULL); - gst_element_set_state (stream->appsink[j], GST_STATE_NULL); - gst_element_set_state (stream->tee[j], GST_STATE_NULL); - gst_element_set_state (stream->selector[j], GST_STATE_NULL); - - gst_bin_remove (GST_BIN (media->pipeline), stream->udpsrc[j]); - gst_bin_remove (GST_BIN (media->pipeline), stream->udpsink[j]); - gst_bin_remove (GST_BIN (media->pipeline), stream->appsrc[j]); - gst_bin_remove (GST_BIN (media->pipeline), stream->appsink[j]); - gst_bin_remove (GST_BIN (media->pipeline), stream->tee[j]); - gst_bin_remove (GST_BIN (media->pipeline), stream->selector[j]); - } - if (stream->caps) - gst_caps_unref (stream->caps); - stream->caps = NULL; - gst_rtsp_media_stream_free (stream); - } - g_array_remove_range (media->streams, 0, media->streams->len); - - gst_element_set_state (media->rtpbin, GST_STATE_NULL); - gst_bin_remove (GST_BIN (media->pipeline), media->rtpbin); - - gst_object_unref (media->pipeline); - media->pipeline = NULL; + return TRUE; }