rtsp-media: Handle set state when preparing.
authorGöran Jönsson <goranjn@axis.com>
Thu, 14 Mar 2019 06:37:26 +0000 (07:37 +0100)
committerGöran Jönsson <goranjn@axis.com>
Wed, 20 Mar 2019 11:26:50 +0000 (12:26 +0100)
Handle the situation when  a call to gst_rtsp_media_set_state is done
when media status is preparing.

Also add unit test for this scenario.

The unit test simulate on a media level when two clients share a (live)
media.
Both clients have done SETUP and got responses. Now client 1 is doing
play and client 2 is just closing the connection.

Then without patch there are a problem when
client1 is calling gst_rtsp_media_unsuspend in handle_play_request.
And client2 is doing closing connection we can end up in a call
to gst_rtsp_media_set_state when
priv->status == GST_RTSP_MEDIA_STATUS_PREPARING and all the logic for
shut down media is jumped over .

With this patch and this scenario we wait until
priv->status == GST_RTSP_MEDIA_STATUS_PREPARED and then continue to
execute after that and now we will execute the logic for
shut down media.

gst/rtsp-server/rtsp-media.c
tests/check/gst/media.c

index be98afd..719f284 100644 (file)
@@ -4376,6 +4376,13 @@ gst_rtsp_media_set_state (GstRTSPMedia * media, GstState state,
   priv = media->priv;
 
   g_rec_mutex_lock (&priv->state_lock);
+
+  if (priv->status == GST_RTSP_MEDIA_STATUS_PREPARING
+      && gst_rtsp_media_is_shared (media)) {
+    g_rec_mutex_unlock (&priv->state_lock);
+    gst_rtsp_media_get_status (media);
+    g_rec_mutex_lock (&priv->state_lock);
+  }
   if (priv->status == GST_RTSP_MEDIA_STATUS_ERROR)
     goto error_status;
   if (priv->status != GST_RTSP_MEDIA_STATUS_PREPARED &&
index 20527fc..5ae704c 100644 (file)
@@ -402,6 +402,173 @@ GST_START_TEST (test_media_prepare)
 
 GST_END_TEST;
 
+enum _SyncState
+{
+  SYNC_STATE_INIT,
+  SYNC_STATE_1,
+  SYNC_STATE_2,
+  SYNC_STATE_RACE
+};
+typedef enum _SyncState SyncState;
+
+struct _help_thread_data
+{
+  GstRTSPThreadPool *pool;
+  GstRTSPMedia *media;
+  GstRTSPTransport *transport;
+  GstRTSPStream *stream;
+  SyncState *state;
+  GMutex *sync_mutex;
+  GCond *sync_cond;
+};
+typedef struct _help_thread_data help_thread_data;
+
+static gpointer
+help_thread_main (gpointer user_data)
+{
+  help_thread_data *data;
+  GstRTSPThread *thread;
+  GPtrArray *transports;
+  GstRTSPStreamTransport *stream_transport;
+
+  data = (help_thread_data *) user_data;
+  GST_INFO ("Another thread sharing media");
+
+  /* wait SYNC_STATE_1 */
+  g_mutex_lock (data->sync_mutex);
+  while (*data->state < SYNC_STATE_1)
+    g_cond_wait (data->sync_cond, data->sync_mutex);
+  g_mutex_unlock (data->sync_mutex);
+
+  /* prepare */
+  thread = gst_rtsp_thread_pool_get_thread (data->pool,
+      GST_RTSP_THREAD_TYPE_MEDIA, NULL);
+  fail_unless (gst_rtsp_media_prepare (data->media, thread));
+
+  /* set SYNC_STATE_2 */
+  g_mutex_lock (data->sync_mutex);
+  *data->state = SYNC_STATE_2;
+  g_cond_signal (data->sync_cond);
+  g_mutex_unlock (data->sync_mutex);
+
+  /* wait SYNC_STATE_RACE */
+  g_mutex_lock (data->sync_mutex);
+  while (*data->state < SYNC_STATE_RACE)
+    g_cond_wait (data->sync_cond, data->sync_mutex);
+  g_mutex_unlock (data->sync_mutex);
+
+  /* set state */
+  transports = g_ptr_array_new_with_free_func (g_object_unref);
+  fail_unless (transports != NULL);
+  stream_transport =
+      gst_rtsp_stream_transport_new (data->stream, data->transport);
+  fail_unless (stream_transport != NULL);
+  g_ptr_array_add (transports, stream_transport);
+  fail_unless (gst_rtsp_media_set_state (data->media, GST_STATE_NULL,
+          transports));
+
+  /* clean up */
+  GST_INFO ("Thread exit");
+  fail_unless (gst_rtsp_media_unprepare (data->media));
+  g_ptr_array_unref (transports);
+  return NULL;
+}
+
+GST_START_TEST (test_media_shared_race_test_unsuspend_vs_set_state_null)
+{
+  help_thread_data data;
+  GstRTSPMediaFactory *factory;
+  GstRTSPMedia *media;
+  GstRTSPUrl *url;
+  GstRTSPThreadPool *pool;
+  GstRTSPThread *thread;
+  GThread *sharing_media_thread;
+  GstRTSPTransport *transport;
+  GstRTSPStream *stream;
+  SyncState state = SYNC_STATE_INIT;
+  GMutex sync_mutex;
+  GCond sync_cond;
+
+  g_mutex_init (&sync_mutex);
+  g_cond_init (&sync_cond);
+
+  pool = gst_rtsp_thread_pool_new ();
+
+  /* test non-reusable media first */
+  factory = gst_rtsp_media_factory_new ();
+  gst_rtsp_media_factory_set_shared (factory, TRUE);
+  fail_unless (gst_rtsp_url_parse ("rtsp://localhost:8554/test",
+          &url) == GST_RTSP_OK);
+
+  gst_rtsp_media_factory_set_launch (factory,
+      "( videotestsrc ! rtpvrawpay pt=96 name=pay0 )");
+
+  media = gst_rtsp_media_factory_construct (factory, url);
+  fail_unless (GST_IS_RTSP_MEDIA (media));
+  fail_unless (gst_rtsp_media_n_streams (media) == 1);
+  gst_rtsp_media_set_suspend_mode (media, GST_RTSP_SUSPEND_MODE_RESET);
+
+  stream = gst_rtsp_media_get_stream (media, 0);
+  fail_unless (stream != NULL);
+
+  thread = gst_rtsp_thread_pool_get_thread (pool,
+      GST_RTSP_THREAD_TYPE_MEDIA, NULL);
+  fail_unless (gst_rtsp_media_prepare (media, thread));
+
+  /* help thread */
+  data.pool = pool;
+  data.media = media;
+  data.stream = stream;
+  data.state = &state;
+  data.sync_mutex = &sync_mutex;
+  data.sync_cond = &sync_cond;
+  sharing_media_thread = g_thread_new ("new thread", help_thread_main, &data);
+  fail_unless (sharing_media_thread != NULL);
+
+  /* set state SYNC_STATE_1 */
+  g_mutex_lock (&sync_mutex);
+  state = SYNC_STATE_1;
+  g_cond_signal (&sync_cond);
+  g_mutex_unlock (&sync_mutex);
+
+  /* wait SYNC_STATE_2 */
+  g_mutex_lock (&sync_mutex);
+  while (state < SYNC_STATE_2)
+    g_cond_wait (&sync_cond, &sync_mutex);
+  g_mutex_unlock (&sync_mutex);
+
+  gst_rtsp_media_suspend (media);
+
+  fail_unless (gst_rtsp_transport_new (&transport) == GST_RTSP_OK);
+  transport->lower_transport = GST_RTSP_LOWER_TRANS_TCP;
+  fail_unless (gst_rtsp_stream_complete_stream (stream, transport));
+  data.transport = transport;
+
+  /* set state SYNC_STATE_RACE let the race begin unsuspend <-> set state GST_STATE_NULL */
+  g_mutex_lock (&sync_mutex);
+  state = SYNC_STATE_RACE;
+  g_cond_signal (&sync_cond);
+  g_mutex_unlock (&sync_mutex);
+
+  fail_unless (gst_rtsp_media_unsuspend (media));
+
+  /* sync end of other thread */
+  g_thread_join (sharing_media_thread);
+
+  /* clean up */
+  g_cond_clear (&sync_cond);
+  g_mutex_clear (&sync_mutex);
+  fail_unless (gst_rtsp_media_unprepare (media));
+  g_object_unref (media);
+  gst_rtsp_url_free (url);
+  g_object_unref (factory);
+  g_object_unref (pool);
+  gst_rtsp_thread_pool_cleanup ();
+}
+
+GST_END_TEST;
+
+
 #define FLAG_HAVE_CAPS GST_ELEMENT_FLAG_LAST
 static void
 on_notify_caps (GstPad * pad, GParamSpec * pspec, GstElement * pay)
@@ -663,6 +830,7 @@ rtspmedia_suite (void)
   tcase_add_test (tc, test_media_seek_one_active_stream);
   tcase_add_test (tc, test_media);
   tcase_add_test (tc, test_media_prepare);
+  tcase_add_test (tc, test_media_shared_race_test_unsuspend_vs_set_state_null);
   tcase_add_test (tc, test_media_reusable);
   tcase_add_test (tc, test_media_dyn_prepare);
   tcase_add_test (tc, test_media_take_pipeline);