X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtsp-server%2Frtsp-client.c;h=c41d019523e09f7127ea99ca3bd4ffdc8e35cd59;hb=08e0c79cee11daf7872828c9c60e1b78b3f7d256;hp=e442209bed7ce14200cb8a513e67257309519b11;hpb=5e2afcefdda5066641c01d243444c4f670054de0;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/rtsp-server/rtsp-client.c b/gst/rtsp-server/rtsp-client.c index e442209..c41d019 100644 --- a/gst/rtsp-server/rtsp-client.c +++ b/gst/rtsp-server/rtsp-client.c @@ -1,5 +1,7 @@ /* GStreamer * Copyright (C) 2008 Wim Taymans + * Copyright (C) 2015 Centricular Ltd + * Author: Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -59,6 +61,7 @@ struct _GstRTSPClientPrivate { GMutex lock; /* protects everything else */ GMutex send_lock; + GMutex watch_lock; GstRTSPConnection *connection; GstRTSPWatch *watch; GMainContext *watch_context; @@ -81,8 +84,9 @@ struct _GstRTSPClientPrivate gchar *path; GstRTSPMedia *media; - GList *transports; + GHashTable *transports; GList *sessions; + guint sessions_cookie; gboolean drop_backlog; }; @@ -90,6 +94,10 @@ struct _GstRTSPClientPrivate static GMutex tunnels_lock; static GHashTable *tunnels; /* protected by tunnels_lock */ +/* FIXME make this configurable. We don't want to do this yet because it will + * be superceeded by a cache object later */ +#define WATCH_BACKLOG_SIZE 100 + #define DEFAULT_SESSION_POOL NULL #define DEFAULT_MOUNT_POINTS NULL #define DEFAULT_DROP_BACKLOG TRUE @@ -117,6 +125,8 @@ enum SIGNAL_GET_PARAMETER_REQUEST, SIGNAL_HANDLE_RESPONSE, SIGNAL_SEND_MESSAGE, + SIGNAL_ANNOUNCE_REQUEST, + SIGNAL_RECORD_REQUEST, SIGNAL_LAST }; @@ -132,8 +142,8 @@ static void gst_rtsp_client_set_property (GObject * object, guint propid, static void gst_rtsp_client_finalize (GObject * obj); static GstSDPMessage *create_sdp (GstRTSPClient * client, GstRTSPMedia * media); -static void unlink_session_transports (GstRTSPClient * client, - GstRTSPSession * session, GstRTSPSessionMedia * sessmedia); +static gboolean handle_sdp (GstRTSPClient * client, GstRTSPContext * ctx, + GstRTSPMedia * media, GstSDPMessage * sdp); static gboolean default_configure_client_media (GstRTSPClient * client, GstRTSPMedia * media, GstRTSPStream * stream, GstRTSPContext * ctx); static gboolean default_configure_client_transport (GstRTSPClient * client, @@ -144,6 +154,8 @@ static GstRTSPResult default_params_get (GstRTSPClient * client, GstRTSPContext * ctx); static gchar *default_make_path_from_uri (GstRTSPClient * client, const GstRTSPUrl * uri); +static void client_session_removed (GstRTSPSessionPool * pool, + GstRTSPSession * session, GstRTSPClient * client); G_DEFINE_TYPE (GstRTSPClient, gst_rtsp_client, G_TYPE_OBJECT); @@ -161,6 +173,7 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass) gobject_class->finalize = gst_rtsp_client_finalize; klass->create_sdp = create_sdp; + klass->handle_sdp = handle_sdp; klass->configure_client_media = default_configure_client_media; klass->configure_client_transport = default_configure_client_transport; klass->params_set = default_params_set; @@ -256,9 +269,22 @@ gst_rtsp_client_class_init (GstRTSPClientClass * klass) */ gst_rtsp_client_signals[SIGNAL_SEND_MESSAGE] = g_signal_new ("send-message", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic, + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTSPClientClass, + send_message), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, GST_TYPE_RTSP_CONTEXT, G_TYPE_POINTER); + gst_rtsp_client_signals[SIGNAL_ANNOUNCE_REQUEST] = + g_signal_new ("announce-request", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTSPClientClass, announce_request), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, + GST_TYPE_RTSP_CONTEXT); + + gst_rtsp_client_signals[SIGNAL_RECORD_REQUEST] = + g_signal_new ("record-request", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTSPClientClass, record_request), + NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, + GST_TYPE_RTSP_CONTEXT); + tunnels = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref); g_mutex_init (&tunnels_lock); @@ -275,20 +301,20 @@ gst_rtsp_client_init (GstRTSPClient * client) g_mutex_init (&priv->lock); g_mutex_init (&priv->send_lock); + g_mutex_init (&priv->watch_lock); priv->close_seq = 0; priv->drop_backlog = DEFAULT_DROP_BACKLOG; + priv->transports = + g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + g_object_unref); } static GstRTSPFilterResult filter_session_media (GstRTSPSession * sess, GstRTSPSessionMedia * sessmedia, gpointer user_data) { - GstRTSPClient *client = GST_RTSP_CLIENT (user_data); - gst_rtsp_session_media_set_state (sessmedia, GST_STATE_NULL); - unlink_session_transports (client, sess, sessmedia); - /* unmanage the media in the session */ return GST_RTSP_FILTER_REMOVE; } @@ -299,9 +325,18 @@ client_watch_session (GstRTSPClient * client, GstRTSPSession * session) g_mutex_lock (&priv->lock); /* check if we already know about this session */ - if (g_list_find (priv->sessions, session) = NULL) { + if (g_list_find (priv->sessions, session) == NULL) { GST_INFO ("watching session %p", session); + priv->sessions = g_list_prepend (priv->sessions, g_object_ref (session)); + priv->sessions_cookie++; + + /* connect removed session handler, it will be disconnected when the last + * session gets removed */ + if (priv->session_removed_id == 0) + priv->session_removed_id = g_signal_connect_data (priv->session_pool, + "session-removed", G_CALLBACK (client_session_removed), + g_object_ref (client), (GClosureNotify) g_object_unref, 0); } g_mutex_unlock (&priv->lock); @@ -322,10 +357,16 @@ client_unwatch_session (GstRTSPClient * client, GstRTSPSession * session, if (link == NULL) return; } + priv->sessions = g_list_delete_link (priv->sessions, link); + priv->sessions_cookie++; - /* unlink all media managed in this session */ - gst_rtsp_session_filter (session, filter_session_media, client); + /* if this was the last session, disconnect the handler. + * This will also drop the extra client ref */ + if (!priv->sessions) { + g_signal_handler_disconnect (priv->session_pool, priv->session_removed_id); + priv->session_removed_id = 0; + } /* remove the session */ g_object_unref (session); @@ -335,9 +376,30 @@ static GstRTSPFilterResult cleanup_session (GstRTSPClient * client, GstRTSPSession * sess, gpointer user_data) { + /* unlink all media managed in this session. This needs to happen + * without the client lock, so we really want to do it here. */ + gst_rtsp_session_filter (sess, filter_session_media, client); + return GST_RTSP_FILTER_REMOVE; } +static void +clean_cached_media (GstRTSPClient * client, gboolean unprepare) +{ + GstRTSPClientPrivate *priv = client->priv; + + if (priv->path) { + g_free (priv->path); + priv->path = NULL; + } + if (priv->media) { + if (unprepare) + gst_rtsp_media_unprepare (priv->media); + g_object_unref (priv->media); + priv->media = NULL; + } +} + /* A client is finalized when the connection is broken */ static void gst_rtsp_client_finalize (GObject * obj) @@ -357,12 +419,17 @@ gst_rtsp_client_finalize (GObject * obj) if (priv->watch_context) g_main_context_unref (priv->watch_context); - gst_rtsp_client_session_filter (client, cleanup_session, NULL); + /* all sessions should have been removed by now. We keep a ref to + * the client object for the session removed handler. The ref is + * dropped when the last session is removed from the list. */ + g_assert (priv->sessions == NULL); + g_assert (priv->session_removed_id == 0); + + g_hash_table_unref (priv->transports); if (priv->connection) gst_rtsp_connection_free (priv->connection); if (priv->session_pool) { - g_signal_handler_disconnect (priv->session_pool, priv->session_removed_id); g_object_unref (priv->session_pool); } if (priv->mount_points) @@ -372,16 +439,12 @@ gst_rtsp_client_finalize (GObject * obj) if (priv->thread_pool) g_object_unref (priv->thread_pool); - if (priv->path) - g_free (priv->path); - if (priv->media) { - gst_rtsp_media_unprepare (priv->media); - g_object_unref (priv->media); - } + clean_cached_media (client, TRUE); g_free (priv->server_ip); g_mutex_clear (&priv->lock); g_mutex_clear (&priv->send_lock); + g_mutex_clear (&priv->watch_lock); G_OBJECT_CLASS (gst_rtsp_client_parent_class)->finalize (obj); } @@ -562,18 +625,9 @@ find_media (GstRTSPClient * client, GstRTSPContext * ctx, gchar * path, path_len = strlen (path); if (!paths_are_equal (priv->path, path, path_len)) { - GstRTSPThread *thread; - /* remove any previously cached values before we try to construct a new * media for uri */ - if (priv->path) - g_free (priv->path); - priv->path = NULL; - if (priv->media) { - gst_rtsp_media_unprepare (priv->media); - g_object_unref (priv->media); - } - priv->media = NULL; + clean_cached_media (client, TRUE); /* prepare the media and add it to the pipeline */ if (!(media = gst_rtsp_media_factory_construct (factory, ctx->uri))) @@ -581,14 +635,19 @@ find_media (GstRTSPClient * client, GstRTSPContext * ctx, gchar * path, ctx->media = media; - thread = gst_rtsp_thread_pool_get_thread (priv->thread_pool, - GST_RTSP_THREAD_TYPE_MEDIA, ctx); - if (thread == NULL) - goto no_thread; + if (!(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_RECORD)) { + GstRTSPThread *thread; - /* prepare the media */ - if (!(gst_rtsp_media_prepare (media, thread))) - goto no_prepare; + thread = gst_rtsp_thread_pool_get_thread (priv->thread_pool, + GST_RTSP_THREAD_TYPE_MEDIA, ctx); + if (thread == NULL) + goto no_thread; + + /* prepare the media */ + if (!gst_rtsp_media_prepare (media, thread)) + goto no_prepare; + } /* now keep track of the uri and the media */ priv->path = g_strndup (path, path_len); @@ -663,6 +722,7 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client) { GstRTSPClientPrivate *priv = client->priv; GstRTSPMessage message = { 0 }; + GstRTSPResult res = GST_RTSP_OK; GstMapInfo map_info; guint8 *data; guint usize; @@ -677,7 +737,7 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client) g_mutex_lock (&priv->send_lock); if (priv->send_func) - priv->send_func (client, &message, FALSE, priv->send_data); + res = priv->send_func (client, &message, FALSE, priv->send_data); g_mutex_unlock (&priv->send_lock); gst_rtsp_message_steal_body (&message, &data, &usize); @@ -685,97 +745,19 @@ do_send_data (GstBuffer * buffer, guint8 channel, GstRTSPClient * client) gst_rtsp_message_unset (&message); - return TRUE; -} - -static void -link_transport (GstRTSPClient * client, GstRTSPSession * session, - GstRTSPStreamTransport * trans) -{ - GstRTSPClientPrivate *priv = client->priv; - - GST_DEBUG ("client %p: linking transport %p", client, trans); - - gst_rtsp_stream_transport_set_callbacks (trans, - (GstRTSPSendFunc) do_send_data, - (GstRTSPSendFunc) do_send_data, client, NULL); - - priv->transports = g_list_prepend (priv->transports, trans); - - /* make sure our session can't expire */ - gst_rtsp_session_prevent_expire (session); -} - -static void -link_session_transports (GstRTSPClient * client, GstRTSPSession * session, - GstRTSPSessionMedia * sessmedia) -{ - guint n_streams, i; - - n_streams = - gst_rtsp_media_n_streams (gst_rtsp_session_media_get_media (sessmedia)); - for (i = 0; i < n_streams; i++) { - GstRTSPStreamTransport *trans; - const GstRTSPTransport *tr; - - /* get the transport, if there is no transport configured, skip this stream */ - trans = gst_rtsp_session_media_get_transport (sessmedia, i); - if (trans == NULL) - continue; - - tr = gst_rtsp_stream_transport_get_transport (trans); - - if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { - /* for TCP, link the stream to the TCP connection of the client */ - link_transport (client, session, trans); - } - } + return res == GST_RTSP_OK; } -static void -unlink_transport (GstRTSPClient * client, GstRTSPSession * session, - GstRTSPStreamTransport * trans) -{ - GstRTSPClientPrivate *priv = client->priv; - - GST_DEBUG ("client %p: unlinking transport %p", client, trans); - - gst_rtsp_stream_transport_set_callbacks (trans, NULL, NULL, NULL, NULL); - - priv->transports = g_list_remove (priv->transports, trans); - - /* our session can now expire */ - gst_rtsp_session_allow_expire (session); -} - -static void -unlink_session_transports (GstRTSPClient * client, GstRTSPSession * session, - GstRTSPSessionMedia * sessmedia) -{ - guint n_streams, i; - - n_streams = - gst_rtsp_media_n_streams (gst_rtsp_session_media_get_media (sessmedia)); - for (i = 0; i < n_streams; i++) { - GstRTSPStreamTransport *trans; - const GstRTSPTransport *tr; - - /* get the transport, if there is no transport configured, skip this stream */ - trans = gst_rtsp_session_media_get_transport (sessmedia, i); - if (trans == NULL) - continue; - - tr = gst_rtsp_stream_transport_get_transport (trans); - - if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { - /* for TCP, unlink the stream from the TCP connection of the client */ - unlink_transport (client, session, trans); - } - } -} - -static void -close_connection (GstRTSPClient * client) +/** + * gst_rtsp_client_close: + * @client: a #GstRTSPClient + * + * Close the connection of @client and remove all media it was managing. + * + * Since: 1.4 + */ +void +gst_rtsp_client_close (GstRTSPClient * client) { GstRTSPClientPrivate *priv = client->priv; const gchar *tunnelid; @@ -799,6 +781,8 @@ close_connection (GstRTSPClient * client) g_source_destroy ((GSource *) priv->watch); priv->watch = NULL; gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); + g_main_context_unref (priv->watch_context); + priv->watch_context = NULL; } } @@ -855,18 +839,8 @@ handle_teardown_request (GstRTSPClient * client, GstRTSPContext * ctx) g_signal_emit (client, gst_rtsp_client_signals[SIGNAL_TEARDOWN_REQUEST], 0, ctx); - /* make sure we unblock the backlog and don't accept new messages - * on the watch */ - gst_rtsp_watch_set_flushing (priv->watch, TRUE); - - /* unlink the all TCP callbacks */ - unlink_session_transports (client, session, sessmedia); - gst_rtsp_session_media_set_state (sessmedia, GST_STATE_NULL); - /* allow messages again so that we can send the reply */ - gst_rtsp_watch_set_flushing (priv->watch, FALSE); - /* unmanage the media in the session, returns false if all media session * are torn down. */ keep_session = gst_rtsp_session_release_media (session, sessmedia); @@ -1047,9 +1021,6 @@ handle_pause_request (GstRTSPClient * client, GstRTSPContext * ctx) rtspstate != GST_RTSP_STATE_RECORDING) goto invalid_state; - /* unlink the all TCP callbacks */ - unlink_session_transports (client, session, sessmedia); - /* then pause sending */ gst_rtsp_session_media_set_state (sessmedia, GST_STATE_PAUSED); @@ -1166,6 +1137,10 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx) ctx->sessmedia = sessmedia; ctx->media = media = gst_rtsp_session_media_get_media (sessmedia); + if (!(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_PLAY)) + goto unsupported_mode; + /* the session state must be playing or ready */ rtspstate = gst_rtsp_session_media_get_rtsp_state (sessmedia); if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY) @@ -1179,16 +1154,19 @@ handle_play_request (GstRTSPClient * client, GstRTSPContext * ctx) res = gst_rtsp_message_get_header (ctx->request, GST_RTSP_HDR_RANGE, &str, 0); if (res == GST_RTSP_OK) { if (gst_rtsp_range_parse (str, &range) == GST_RTSP_OK) { + GstRTSPMediaStatus media_status; + /* we have a range, seek to the position */ unit = range->unit; gst_rtsp_media_seek (media, range); gst_rtsp_range_free (range); + + media_status = gst_rtsp_media_get_status (media); + if (media_status == GST_RTSP_MEDIA_STATUS_ERROR) + goto seek_failed; } } - /* link the all TCP callbacks */ - link_session_transports (client, session, sessmedia); - /* grab RTPInfo from the media now */ rtpinfo = gst_rtsp_session_media_get_rtpinfo (sessmedia); @@ -1258,6 +1236,18 @@ unsuspend_failed: send_generic_response (client, GST_RTSP_STS_SERVICE_UNAVAILABLE, ctx); return FALSE; } +seek_failed: + { + GST_ERROR ("client %p: seek failed", client); + send_generic_response (client, GST_RTSP_STS_SERVICE_UNAVAILABLE, ctx); + return FALSE; + } +unsupported_mode: + { + GST_ERROR ("client %p: media does not support PLAY", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_ALLOWED, ctx); + return FALSE; + } } static void @@ -1445,8 +1435,8 @@ no_address: } static GstRTSPTransport * -make_server_transport (GstRTSPClient * client, GstRTSPContext * ctx, - GstRTSPTransport * ct) +make_server_transport (GstRTSPClient * client, GstRTSPMedia * media, + GstRTSPContext * ctx, GstRTSPTransport * ct) { GstRTSPTransport *st; GInetAddress *addr; @@ -1458,6 +1448,8 @@ make_server_transport (GstRTSPClient * client, GstRTSPContext * ctx, st->trans = ct->trans; st->profile = ct->profile; st->lower_transport = ct->lower_transport; + st->mode_play = ct->mode_play; + st->mode_record = ct->mode_record; addr = g_inet_address_new_from_string (ct->destination); @@ -1488,7 +1480,9 @@ make_server_transport (GstRTSPClient * client, GstRTSPContext * ctx, break; } - gst_rtsp_stream_get_ssrc (ctx->stream, &st->ssrc); + if ((gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_PLAY)) + gst_rtsp_stream_get_ssrc (ctx->stream, &st->ssrc); return st; } @@ -1659,7 +1653,8 @@ handle_mikey_data (GstRTSPClient * client, GstRTSPContext * ctx, gst_rtsp_stream_update_crypto (ctx->stream, map->ssrc, caps); gst_caps_unref (caps); } - gst_mikey_message_free (msg); + gst_mikey_message_unref (msg); + gst_buffer_unref (key); return TRUE; @@ -1691,7 +1686,7 @@ unsupported_encryption: } cleanup_message: { - gst_mikey_message_free (msg); + gst_mikey_message_unref (msg); return FALSE; } } @@ -1747,7 +1742,9 @@ handle_keymgmt (GstRTSPClient * client, GstRTSPContext * ctx, gchar * keymgmt) handle_mikey_data (client, ctx, data, size); } } + g_strfreev (split); } + g_strfreev (specs); return TRUE; } @@ -1768,8 +1765,9 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) GstRTSPStream *stream; GstRTSPState rtspstate; GstRTSPClientClass *klass; - gchar *path, *control; + gchar *path, *control = NULL; gint matched; + gboolean new_session = FALSE; if (!ctx->uri) goto no_uri; @@ -1816,16 +1814,22 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) if (media == NULL) goto media_not_found_no_reply; - if (path[matched] == '\0') - goto control_not_found; + if (path[matched] == '\0') { + if (gst_rtsp_media_n_streams (media) == 1) { + stream = gst_rtsp_media_get_stream (media, 0); + } else { + goto control_not_found; + } + } else { + /* path is what matched. */ + path[matched] = '\0'; + /* control is remainder */ + control = &path[matched + 1]; - /* path is what matched. */ - path[matched] = '\0'; - /* control is remainder */ - control = &path[matched + 1]; + /* find the stream now using the control part */ + stream = gst_rtsp_media_find_stream (media, control); + } - /* find the stream now using the control part */ - stream = gst_rtsp_media_find_stream (media, control); if (stream == NULL) goto stream_not_found; @@ -1842,6 +1846,7 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) /* make sure this client is closed when the session is closed */ client_watch_session (client, session); + new_session = TRUE; /* signal new session */ g_signal_emit (client, gst_rtsp_client_signals[SIGNAL_NEW_SESSION], 0, session); @@ -1849,18 +1854,6 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) ctx->session = session; } - if (sessmedia == NULL) { - /* manage the media in our session now, if not done already */ - sessmedia = gst_rtsp_session_manage_media (session, path, media); - /* if we stil have no media, error */ - if (sessmedia == NULL) - goto sessmedia_unavailable; - } else { - g_object_unref (media); - } - - ctx->sessmedia = sessmedia; - if (!klass->configure_client_media (client, media, stream, ctx)) goto configure_media_failed_no_reply; @@ -1870,9 +1863,12 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) if (!parse_transport (transport, stream, ct)) goto unsupported_transports; - /* update the client transport */ - if (!klass->configure_client_transport (client, ctx, ct)) - goto unsupported_client_transport; + if ((ct->mode_play + && !(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_PLAY)) || (ct->mode_record + && !(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_RECORD))) + goto unsupported_mode; /* parse the keymgmt */ if (gst_rtsp_message_get_header (ctx->request, GST_RTSP_HDR_KEYMGMT, @@ -1881,19 +1877,53 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx) goto keymgmt_error; } + if (sessmedia == NULL) { + /* manage the media in our session now, if not done already */ + sessmedia = gst_rtsp_session_manage_media (session, path, media); + /* if we stil have no media, error */ + if (sessmedia == NULL) + goto sessmedia_unavailable; + + /* don't cache media anymore */ + clean_cached_media (client, FALSE); + } else { + g_object_unref (media); + } + + ctx->sessmedia = sessmedia; + + /* update the client transport */ + if (!klass->configure_client_transport (client, ctx, ct)) + goto unsupported_client_transport; + /* set in the session media transport */ trans = gst_rtsp_session_media_set_transport (sessmedia, stream, ct); + ctx->trans = trans; + /* configure the url used to set this transport, this we will use when * generating the response for the PLAY request */ gst_rtsp_stream_transport_set_url (trans, uri); - /* configure keepalive for this transport */ gst_rtsp_stream_transport_set_keepalive (trans, (GstRTSPKeepAliveFunc) do_keepalive, session, NULL); + if (ct->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { + /* our callbacks to send data on this TCP connection */ + gst_rtsp_stream_transport_set_callbacks (trans, + (GstRTSPSendFunc) do_send_data, + (GstRTSPSendFunc) do_send_data, client, NULL); + + g_hash_table_insert (priv->transports, + GINT_TO_POINTER (ct->interleaved.min), trans); + g_object_ref (trans); + g_hash_table_insert (priv->transports, + GINT_TO_POINTER (ct->interleaved.max), trans); + g_object_ref (trans); + } + /* create and serialize the server transport */ - st = make_server_transport (client, ctx, ct); + st = make_server_transport (client, media, ctx, ct); trans_str = gst_rtsp_transport_as_text (st); gst_rtsp_transport_free (st); @@ -1967,7 +1997,8 @@ control_not_found: } stream_not_found: { - GST_ERROR ("client %p: stream '%s' not found", client, control); + GST_ERROR ("client %p: stream '%s' not found", client, + GST_STR_NULL (control)); send_generic_response (client, GST_RTSP_STS_NOT_FOUND, ctx); g_object_unref (media); goto cleanup_path; @@ -2004,6 +2035,17 @@ unsupported_client_transport: send_generic_response (client, GST_RTSP_STS_UNSUPPORTED_TRANSPORT, ctx); goto cleanup_transport; } +unsupported_mode: + { + GST_ERROR ("client %p: unsupported mode (media play: %d, media record: %d, " + "mode play: %d, mode record: %d)", client, + ! !(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_PLAY), + ! !(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_RECORD), ct->mode_play, ct->mode_record); + send_generic_response (client, GST_RTSP_STS_UNSUPPORTED_TRANSPORT, ctx); + goto cleanup_transport; + } keymgmt_error: { GST_ERROR ("client %p: keymgmt error", client); @@ -2014,6 +2056,8 @@ keymgmt_error: cleanup_transport: gst_rtsp_transport_free (ct); cleanup_session: + if (new_session) + gst_rtsp_session_pool_remove (priv->session_pool, session); g_object_unref (session); cleanup_path: g_free (path); @@ -2028,6 +2072,8 @@ create_sdp (GstRTSPClient * client, GstRTSPMedia * media) GstSDPMessage *sdp; GstSDPInfo info; const gchar *proto; + guint64 session_id_tmp; + gchar session_id[21]; gst_sdp_message_new (&sdp); @@ -2039,7 +2085,11 @@ create_sdp (GstRTSPClient * client, GstRTSPMedia * media) else proto = "IP4"; - gst_sdp_message_set_origin (sdp, "-", "1188340656180883", "1", "IN", proto, + session_id_tmp = (((guint64) g_random_int ()) << 32) | g_random_int (); + g_snprintf (session_id, sizeof (session_id), "%" G_GUINT64_FORMAT, + session_id_tmp); + + gst_sdp_message_set_origin (sdp, "-", session_id, "1", "IN", proto, priv->server_ip); gst_sdp_message_set_session_name (sdp, "Session streamed with GStreamer"); @@ -2109,6 +2159,10 @@ handle_describe_request (GstRTSPClient * client, GstRTSPContext * ctx) if (!(media = find_media (client, ctx, path, NULL))) goto no_media; + if (!(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_PLAY)) + goto unsupported_mode; + /* create an SDP for the media object on this client */ if (!(sdp = klass->create_sdp (client, media))) goto no_sdp; @@ -2168,6 +2222,14 @@ no_media: /* error reply is already sent */ return FALSE; } +unsupported_mode: + { + GST_ERROR ("client %p: media does not support DESCRIBE", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_ALLOWED, ctx); + g_free (path); + g_object_unref (media); + return FALSE; + } no_sdp: { GST_ERROR ("client %p: can't create SDP", client); @@ -2179,6 +2241,294 @@ no_sdp: } static gboolean +handle_sdp (GstRTSPClient * client, GstRTSPContext * ctx, GstRTSPMedia * media, + GstSDPMessage * sdp) +{ + GstRTSPClientPrivate *priv = client->priv; + GstRTSPThread *thread; + + /* create an SDP for the media object */ + if (!gst_rtsp_media_handle_sdp (media, sdp)) + goto unhandled_sdp; + + thread = gst_rtsp_thread_pool_get_thread (priv->thread_pool, + GST_RTSP_THREAD_TYPE_MEDIA, ctx); + if (thread == NULL) + goto no_thread; + + /* prepare the media */ + if (!gst_rtsp_media_prepare (media, thread)) + goto no_prepare; + + return TRUE; + + /* ERRORS */ +unhandled_sdp: + { + GST_ERROR ("client %p: could not handle SDP", client); + return FALSE; + } +no_thread: + { + GST_ERROR ("client %p: can't create thread", client); + return FALSE; + } +no_prepare: + { + GST_ERROR ("client %p: can't prepare media", client); + return FALSE; + } +} + +static gboolean +handle_announce_request (GstRTSPClient * client, GstRTSPContext * ctx) +{ + GstRTSPClientPrivate *priv = client->priv; + GstRTSPClientClass *klass; + GstSDPResult sres; + GstSDPMessage *sdp; + GstRTSPMedia *media; + gchar *path, *cont = NULL; + guint8 *data; + guint size; + + klass = GST_RTSP_CLIENT_GET_CLASS (client); + + if (!ctx->uri) + goto no_uri; + + if (!priv->mount_points) + goto no_mount_points; + + /* check if reply is SDP */ + gst_rtsp_message_get_header (ctx->request, GST_RTSP_HDR_CONTENT_TYPE, &cont, + 0); + /* could not be set but since the request returned OK, we assume it + * was SDP, else check it. */ + if (cont) { + if (g_ascii_strcasecmp (cont, "application/sdp") != 0) + goto wrong_content_type; + } + + /* get message body and parse as SDP */ + gst_rtsp_message_get_body (ctx->request, &data, &size); + if (data == NULL || size == 0) + goto no_message; + + GST_DEBUG ("client %p: parse SDP...", client); + gst_sdp_message_new (&sdp); + sres = gst_sdp_message_parse_buffer (data, size, sdp); + if (sres != GST_SDP_OK) + goto sdp_parse_failed; + + if (!(path = gst_rtsp_mount_points_make_path (priv->mount_points, ctx->uri))) + goto no_path; + + /* find the media object for the uri */ + if (!(media = find_media (client, ctx, path, NULL))) + goto no_media; + + if (!(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_RECORD)) + goto unsupported_mode; + + /* Tell client subclass about the media */ + if (!klass->handle_sdp (client, ctx, media, sdp)) + goto unhandled_sdp; + + /* we suspend after the announce */ + gst_rtsp_media_suspend (media); + g_object_unref (media); + + gst_rtsp_message_init_response (ctx->response, GST_RTSP_STS_OK, + gst_rtsp_status_as_text (GST_RTSP_STS_OK), ctx->request); + + send_message (client, ctx, ctx->response, FALSE); + + g_signal_emit (client, gst_rtsp_client_signals[SIGNAL_ANNOUNCE_REQUEST], + 0, ctx); + + gst_sdp_message_free (sdp); + g_free (path); + return TRUE; + +no_uri: + { + GST_ERROR ("client %p: no uri", client); + send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, ctx); + return FALSE; + } +no_mount_points: + { + GST_ERROR ("client %p: no mount points configured", client); + send_generic_response (client, GST_RTSP_STS_NOT_FOUND, ctx); + return FALSE; + } +no_path: + { + GST_ERROR ("client %p: can't find path for url", client); + send_generic_response (client, GST_RTSP_STS_NOT_FOUND, ctx); + gst_sdp_message_free (sdp); + return FALSE; + } +wrong_content_type: + { + GST_ERROR ("client %p: unknown content type", client); + send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, ctx); + return FALSE; + } +no_message: + { + GST_ERROR ("client %p: can't find SDP message", client); + send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, ctx); + return FALSE; + } +sdp_parse_failed: + { + GST_ERROR ("client %p: failed to parse SDP message", client); + send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, ctx); + gst_sdp_message_free (sdp); + return FALSE; + } +no_media: + { + GST_ERROR ("client %p: no media", client); + g_free (path); + /* error reply is already sent */ + gst_sdp_message_free (sdp); + return FALSE; + } +unsupported_mode: + { + GST_ERROR ("client %p: media does not support ANNOUNCE", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_ALLOWED, ctx); + g_free (path); + g_object_unref (media); + gst_sdp_message_free (sdp); + return FALSE; + } +unhandled_sdp: + { + GST_ERROR ("client %p: can't handle SDP", client); + send_generic_response (client, GST_RTSP_STS_UNSUPPORTED_MEDIA_TYPE, ctx); + g_free (path); + g_object_unref (media); + gst_sdp_message_free (sdp); + return FALSE; + } +} + +static gboolean +handle_record_request (GstRTSPClient * client, GstRTSPContext * ctx) +{ + GstRTSPSession *session; + GstRTSPClientClass *klass; + GstRTSPSessionMedia *sessmedia; + GstRTSPMedia *media; + GstRTSPUrl *uri; + GstRTSPState rtspstate; + gchar *path; + gint matched; + + if (!(session = ctx->session)) + goto no_session; + + if (!(uri = ctx->uri)) + goto no_uri; + + klass = GST_RTSP_CLIENT_GET_CLASS (client); + path = klass->make_path_from_uri (client, uri); + + /* get a handle to the configuration of the media in the session */ + sessmedia = gst_rtsp_session_get_media (session, path, &matched); + if (!sessmedia) + goto not_found; + + if (path[matched] != '\0') + goto no_aggregate; + + g_free (path); + + ctx->sessmedia = sessmedia; + ctx->media = media = gst_rtsp_session_media_get_media (sessmedia); + + if (!(gst_rtsp_media_get_transport_mode (media) & + GST_RTSP_TRANSPORT_MODE_RECORD)) + goto unsupported_mode; + + /* the session state must be playing or ready */ + rtspstate = gst_rtsp_session_media_get_rtsp_state (sessmedia); + if (rtspstate != GST_RTSP_STATE_PLAYING && rtspstate != GST_RTSP_STATE_READY) + goto invalid_state; + + /* in play we first unsuspend, media could be suspended from SDP or PAUSED */ + if (!gst_rtsp_media_unsuspend (media)) + goto unsuspend_failed; + + gst_rtsp_message_init_response (ctx->response, GST_RTSP_STS_OK, + gst_rtsp_status_as_text (GST_RTSP_STS_OK), ctx->request); + + send_message (client, ctx, ctx->response, FALSE); + + /* start playing after sending the response */ + gst_rtsp_session_media_set_state (sessmedia, GST_STATE_PLAYING); + + gst_rtsp_session_media_set_rtsp_state (sessmedia, GST_RTSP_STATE_PLAYING); + + g_signal_emit (client, gst_rtsp_client_signals[SIGNAL_RECORD_REQUEST], 0, + ctx); + + return TRUE; + + /* ERRORS */ +no_session: + { + GST_ERROR ("client %p: no session", client); + send_generic_response (client, GST_RTSP_STS_SESSION_NOT_FOUND, ctx); + return FALSE; + } +no_uri: + { + GST_ERROR ("client %p: no uri supplied", client); + send_generic_response (client, GST_RTSP_STS_BAD_REQUEST, ctx); + return FALSE; + } +not_found: + { + GST_ERROR ("client %p: media not found", client); + send_generic_response (client, GST_RTSP_STS_NOT_FOUND, ctx); + return FALSE; + } +no_aggregate: + { + GST_ERROR ("client %p: no aggregate path %s", client, path); + send_generic_response (client, + GST_RTSP_STS_ONLY_AGGREGATE_OPERATION_ALLOWED, ctx); + g_free (path); + return FALSE; + } +unsupported_mode: + { + GST_ERROR ("client %p: media does not support RECORD", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_ALLOWED, ctx); + return FALSE; + } +invalid_state: + { + GST_ERROR ("client %p: not PLAYING or READY", client); + send_generic_response (client, GST_RTSP_STS_METHOD_NOT_VALID_IN_THIS_STATE, + ctx); + return FALSE; + } +unsuspend_failed: + { + GST_ERROR ("client %p: unsuspend failed", client); + send_generic_response (client, GST_RTSP_STS_SERVICE_UNAVAILABLE, ctx); + return FALSE; + } +} + +static gboolean handle_options_request (GstRTSPClient * client, GstRTSPContext * ctx) { GstRTSPMethod options; @@ -2244,7 +2594,11 @@ client_session_removed (GstRTSPSessionPool * pool, GstRTSPSession * session, GST_INFO ("client %p: session %p removed", client, session); g_mutex_lock (&priv->lock); + if (priv->watch != NULL) + gst_rtsp_watch_set_send_backlog (priv->watch, 0, 0); client_unwatch_session (client, session, NULL); + if (priv->watch != NULL) + gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE); g_mutex_unlock (&priv->lock); } @@ -2401,6 +2755,37 @@ handle_request (GstRTSPClient * client, GstRTSPMessage * request) if (!check_request_requirements (ctx->request, &unsupported_reqs)) goto unsupported_requirement; + /* the backlog must be unlimited while processing requests. + * the causes of this are two cases of deadlocks while streaming over TCP: + * + * 1. consider the scenario where the media pipeline's streaming thread + * is blocking in the appsink (taking the appsink's preroll lock) because + * the backlog is full. when a PAUSE request is received by the RTSP + * client thread then the the state of the session media ought to change + * to PAUSED. while most elements in the pipeline can change state this + * can never happen for the appsink since its preroll lock is taken by + * another thread. + * + * 2. consider the scenario where the media pipeline's streaming thread + * is blocking in the appsink new_sample callback (taking the send lock + * in RTSP client) because the backlog is full. when e.g. a GET request + * is received by the RTSP client thread then a response ought to be sent + * but this can never happen since it requires taking the send lock + * already taken by another thread. + * + * the reason that the backlog is never emptied is that the source used + * for dequeing messages from the backlog is never dispatched because it + * is attached to the same mainloop as the source receving RTSP requests and + * therefore run by the RTSP client thread which is alreayd blocking. + * + * without significant changes the easiest way to cope with this is to + * not block indefinitely when the backlog is full, but rather let the + * backlog grow in size. this in effect means that there can not be any + * upper boundary on its size. + */ + if (priv->watch != NULL) + gst_rtsp_watch_set_send_backlog (priv->watch, 0, 0); + /* now see what is asked and dispatch to a dedicated handler */ switch (method) { case GST_RTSP_OPTIONS: @@ -2428,14 +2813,25 @@ handle_request (GstRTSPClient * client, GstRTSPMessage * request) handle_get_param_request (client, ctx); break; case GST_RTSP_ANNOUNCE: + handle_announce_request (client, ctx); + break; case GST_RTSP_RECORD: + handle_record_request (client, ctx); + break; case GST_RTSP_REDIRECT: + if (priv->watch != NULL) + gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE); goto not_implemented; case GST_RTSP_INVALID: default: + if (priv->watch != NULL) + gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE); goto bad_request; } + if (priv->watch != NULL) + gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE); + done: if (ctx == &sctx) gst_rtsp_context_pop_current (ctx); @@ -2569,48 +2965,47 @@ handle_data (GstRTSPClient * client, GstRTSPMessage * message) GstRTSPClientPrivate *priv = client->priv; GstRTSPResult res; guint8 channel; - GList *walk; guint8 *data; guint size; GstBuffer *buffer; - gboolean handled; + GstRTSPStreamTransport *trans; /* find the stream for this message */ res = gst_rtsp_message_parse_data (message, &channel); if (res != GST_RTSP_OK) return; + gst_rtsp_message_get_body (message, &data, &size); + if (size < 2) + goto invalid_length; + gst_rtsp_message_steal_body (message, &data, &size); - buffer = gst_buffer_new_wrapped (data, size); + /* Strip trailing \0 (which GstRTSPConnection adds) */ + --size; - handled = FALSE; - for (walk = priv->transports; walk; walk = g_list_next (walk)) { - GstRTSPStreamTransport *trans; - GstRTSPStream *stream; - const GstRTSPTransport *tr; + buffer = gst_buffer_new_wrapped (data, size); - trans = walk->data; + trans = + g_hash_table_lookup (priv->transports, GINT_TO_POINTER ((gint) channel)); + if (trans) { + /* dispatch to the stream based on the channel number */ + GST_LOG_OBJECT (client, "%u bytes of data on channel %u", size, channel); + gst_rtsp_stream_transport_recv_data (trans, channel, buffer); + } else { + GST_DEBUG_OBJECT (client, "received %u bytes of data for " + "unknown channel %u", size, channel); + gst_buffer_unref (buffer); + } - tr = gst_rtsp_stream_transport_get_transport (trans); - stream = gst_rtsp_stream_transport_get_stream (trans); + return; - /* check for TCP transport */ - if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) { - /* dispatch to the stream based on the channel number */ - if (tr->interleaved.min == channel) { - gst_rtsp_stream_recv_rtp (stream, buffer); - handled = TRUE; - break; - } else if (tr->interleaved.max == channel) { - gst_rtsp_stream_recv_rtcp (stream, buffer); - handled = TRUE; - break; - } - } +/* ERRORS */ +invalid_length: + { + GST_DEBUG ("client %p: Short message received, ignoring", client); + return; } - if (!handled) - gst_buffer_unref (buffer); } /** @@ -2640,13 +3035,10 @@ gst_rtsp_client_set_session_pool (GstRTSPClient * client, old = priv->session_pool; priv->session_pool = pool; - if (priv->session_removed_id) + if (priv->session_removed_id) { g_signal_handler_disconnect (old, priv->session_removed_id); - if (pool) - priv->session_removed_id = g_signal_connect (pool, "session-removed", - G_CALLBACK (client_session_removed), client); - else priv->session_removed_id = 0; + } g_mutex_unlock (&priv->lock); /* FIXME, should remove all sessions from the old pool for this client */ @@ -3106,7 +3498,7 @@ message_sent (GstRTSPWatch * watch, guint cseq, gpointer user_data) if (priv->close_seq && priv->close_seq == cseq) { GST_INFO ("client %p: send close message", client); priv->close_seq = 0; - close_connection (client); + gst_rtsp_client_close (client); } return GST_RTSP_OK; @@ -3129,7 +3521,9 @@ closed (GstRTSPWatch * watch, gpointer user_data) } gst_rtsp_watch_set_flushing (watch, TRUE); + g_mutex_lock (&priv->watch_lock); gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); + g_mutex_unlock (&priv->watch_lock); return GST_RTSP_OK; } @@ -3249,6 +3643,7 @@ handle_tunnel (GstRTSPClient * client) opriv = oclient->priv; + g_mutex_lock (&opriv->watch_lock); if (opriv->watch == NULL) goto tunnel_closed; @@ -3258,13 +3653,12 @@ handle_tunnel (GstRTSPClient * client) gst_rtsp_connection_do_tunnel (opriv->connection, priv->connection); gst_rtsp_watch_reset (priv->watch); gst_rtsp_watch_reset (opriv->watch); + g_mutex_unlock (&opriv->watch_lock); g_object_unref (oclient); /* the old client owns the tunnel now, the new one will be freed */ g_source_destroy ((GSource *) priv->watch); priv->watch = NULL; - g_main_context_unref (priv->watch_context); - priv->watch_context = NULL; gst_rtsp_client_set_send_func (client, NULL, NULL, NULL); } @@ -3279,6 +3673,7 @@ no_tunnelid: tunnel_closed: { GST_ERROR ("client %p: tunnel session %s was closed", client, tunnelid); + g_mutex_unlock (&opriv->watch_lock); g_object_unref (oclient); return FALSE; } @@ -3349,8 +3744,8 @@ client_watch_notify (GstRTSPClient * client) GST_INFO ("client %p: watch destroyed", client); priv->watch = NULL; - g_main_context_unref (priv->watch_context); - priv->watch_context = NULL; + /* remove all sessions and so drop the extra client ref */ + gst_rtsp_client_session_filter (client, cleanup_session, NULL); g_signal_emit (client, gst_rtsp_client_signals[SIGNAL_CLOSED], 0, NULL); g_object_unref (client); } @@ -3389,11 +3784,9 @@ gst_rtsp_client_attach (GstRTSPClient * client, GMainContext * context) gst_rtsp_client_set_send_func (client, do_send_message, priv->watch, (GDestroyNotify) gst_rtsp_watch_unref); - /* FIXME make this configurable. We don't want to do this yet because it will - * be superceeded by a cache object later */ - gst_rtsp_watch_set_send_backlog (priv->watch, 0, 100); + gst_rtsp_watch_set_send_backlog (priv->watch, 0, WATCH_BACKLOG_SIZE); - GST_INFO ("attaching to context %p", context); + GST_INFO ("client %p: attaching to context %p", client, context); res = gst_rtsp_watch_attach (priv->watch, context); return res; @@ -3430,29 +3823,50 @@ gst_rtsp_client_session_filter (GstRTSPClient * client, { GstRTSPClientPrivate *priv; GList *result, *walk, *next; + GHashTable *visited; + guint cookie; g_return_val_if_fail (GST_IS_RTSP_CLIENT (client), NULL); priv = client->priv; result = NULL; + if (func) + visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL); g_mutex_lock (&priv->lock); +restart: + cookie = priv->sessions_cookie; for (walk = priv->sessions; walk; walk = next) { GstRTSPSession *sess = walk->data; GstRTSPFilterResult res; + gboolean changed; next = g_list_next (walk); - if (func) + if (func) { + /* only visit each session once */ + if (g_hash_table_contains (visited, sess)) + continue; + + g_hash_table_add (visited, g_object_ref (sess)); + g_mutex_unlock (&priv->lock); + res = func (client, sess, user_data); - else + + g_mutex_lock (&priv->lock); + } else res = GST_RTSP_FILTER_REF; + changed = (cookie != priv->sessions_cookie); + switch (res) { case GST_RTSP_FILTER_REMOVE: - /* stop watching the session and pretent it went away */ - client_unwatch_session (client, sess, walk); + /* stop watching the session and pretend it went away, if the list was + * changed, we can't use the current list position, try to see if we + * still have the session */ + client_unwatch_session (client, sess, changed ? NULL : walk); + cookie = priv->sessions_cookie; break; case GST_RTSP_FILTER_REF: result = g_list_prepend (result, g_object_ref (sess)); @@ -3461,8 +3875,13 @@ gst_rtsp_client_session_filter (GstRTSPClient * client, default: break; } + if (changed) + goto restart; } g_mutex_unlock (&priv->lock); + if (func) + g_hash_table_unref (visited); + return result; }