Add TCP transports
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 11 Mar 2009 15:45:12 +0000 (16:45 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 11 Mar 2009 15:45:12 +0000 (16:45 +0100)
Use appsrc and appsink to send and receive RTP/RTCP packets in the TCP
connection.

gst/rtsp-server/rtsp-client.c
gst/rtsp-server/rtsp-client.h
gst/rtsp-server/rtsp-media.c
gst/rtsp-server/rtsp-media.h
gst/rtsp-server/rtsp-server.c
gst/rtsp-server/rtsp-session.c
gst/rtsp-server/rtsp-session.h

index 811ae03..70b0e23 100644 (file)
@@ -190,9 +190,6 @@ send_response (GstRTSPClient *client, GstRTSPSession *session, GstRTSPMessage *r
     gst_rtsp_message_remove_header (response, GST_RTSP_HDR_SESSION, -1);
   }
 
-#if 0
-  gst_rtsp_connection_send (client->connection, response, &timeout);
-#endif
   gst_rtsp_watch_queue_message (client->watch, response);
   gst_rtsp_message_unset (response);
 }
@@ -297,6 +294,81 @@ no_prepare:
 }
 
 static gboolean
+do_send_data (GstBuffer *buffer, guint8 channel, GstRTSPClient *client)
+{
+  GstRTSPMessage message = { 0 };
+  guint8 *data;
+  guint size;
+
+  gst_rtsp_message_init_data (&message, channel);
+
+  data = GST_BUFFER_DATA (buffer);
+  size = GST_BUFFER_SIZE (buffer);
+  gst_rtsp_message_take_body (&message, data, size);
+
+  gst_rtsp_watch_queue_message (client->watch, &message);
+
+  gst_rtsp_message_steal_body (&message, &data, &size);
+  gst_rtsp_message_unset (&message);
+
+  return TRUE;
+}
+
+static void
+link_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
+{
+  gst_rtsp_session_stream_set_callbacks (stream, (GstRTSPSendFunc) do_send_data,
+       (GstRTSPSendFunc) do_send_data, client, NULL);
+  client->streams = g_list_prepend (client->streams, stream);
+}
+
+static void
+unlink_stream (GstRTSPClient *client, GstRTSPSessionStream *stream)
+{
+  gst_rtsp_session_stream_set_callbacks (stream, NULL,
+       NULL, client, g_object_unref);
+  client->streams = g_list_remove (client->streams, stream);
+}
+
+static void
+unlink_streams (GstRTSPClient *client)
+{
+  GList *walk;
+
+  for (walk = client->streams; walk; walk = g_list_next (walk)) {
+    GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data;
+
+    gst_rtsp_session_stream_set_callbacks (stream, NULL,
+         NULL, NULL, NULL);
+  }
+  g_list_free (client->streams);
+  client->streams = NULL;
+}
+
+static void
+unlink_session_streams (GstRTSPClient *client, GstRTSPSessionMedia *media)
+{
+  guint n_streams, i;
+
+  n_streams = gst_rtsp_media_n_streams (media->media);
+  for (i = 0; i < n_streams; i++) {
+    GstRTSPSessionStream *sstream;
+    GstRTSPTransport *tr;
+
+    /* get the stream as configured in the session */
+    sstream = gst_rtsp_session_media_get_stream (media, i);
+    /* get the transport, if there is no transport configured, skip this stream */
+    if (!(tr = sstream->trans.transport))
+      continue;
+
+    if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
+      /* for TCP, unlink the stream from the TCP connection of the client */
+      unlink_stream (client, sstream);
+    }
+  }
+}
+
+static gboolean
 handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *session, GstRTSPMessage *request)
 {
   GstRTSPSessionMedia *media;
@@ -311,7 +383,10 @@ handle_teardown_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession
   if (!media)
     goto not_found;
 
-  gst_rtsp_session_media_stop (media);
+  /* unlink the all TCP callbacks */
+  unlink_session_streams (client, media);
+
+  gst_rtsp_session_media_set_state (media, GST_STATE_NULL);
 
   /* unmanage the media in the session, returns false if all media session
    * are torn down. */
@@ -360,7 +435,11 @@ handle_pause_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
       media->state != GST_RTSP_STATE_RECORDING)
     goto invalid_state;
 
-  gst_rtsp_session_media_pause (media);
+  /* unlink the all TCP callbacks */
+  unlink_session_streams (client, media);
+
+  /* then pause sending */
+  gst_rtsp_session_media_set_state (media, GST_STATE_PAUSED);
 
   /* construct the response now */
   code = GST_RTSP_STS_OK;
@@ -420,10 +499,23 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
 
   n_streams = gst_rtsp_media_n_streams (media->media);
   for (i = 0; i < n_streams; i++) {
+    GstRTSPSessionStream *sstream;
     GstRTSPMediaStream *stream;
+    GstRTSPTransport *tr;
     gchar *uristr;
 
-    stream = gst_rtsp_media_get_stream (media->media, i);
+    /* get the stream as configured in the session */
+    sstream = gst_rtsp_session_media_get_stream (media, i);
+    /* get the transport, if there is no transport configured, skip this stream */
+    if (!(tr = sstream->trans.transport))
+      continue;
+
+    if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
+      /* for TCP, link the stream to the TCP connection of the client */
+      link_stream (client, sstream);
+    }
+
+    stream = sstream->media_stream;
 
     g_object_get (G_OBJECT (stream->payloader), "seqnum", &seqnum, NULL);
     g_object_get (G_OBJECT (stream->payloader), "timestamp", &timestamp, NULL);
@@ -451,7 +543,7 @@ handle_play_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *ses
   send_response (client, session, &response);
 
   /* start playing after sending the request */
-  gst_rtsp_session_media_play (media);
+  gst_rtsp_session_media_set_state (media, GST_STATE_PLAYING);
 
   media->state = GST_RTSP_STATE_PLAYING;
 
@@ -594,12 +686,6 @@ handle_setup_request (GstRTSPClient *client, GstRTSPUrl *uri, GstRTSPSession *se
   if (!(stream = gst_rtsp_session_media_get_stream (media, streamid)))
     goto no_stream;
 
-  /* setup the server transport from the client transport */
-  if (ct->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
-    ct->port.min = gst_rtsp_connection_get_readfd (client->connection);
-    ct->port.max = gst_rtsp_connection_get_writefd (client->connection);
-  }
-
   st = gst_rtsp_session_stream_set_transport (stream, ct);
 
   /* serialize the server transport */
@@ -806,6 +892,8 @@ handle_request (GstRTSPClient *client, GstRTSPMessage *request)
   gst_rtsp_message_dump (request);
 #endif
 
+  g_message ("client %p: received a request", client);
+
   gst_rtsp_message_parse_request (request, &method, &uristr, &version);
 
   if (version != GST_RTSP_VERSION_1_0) {
@@ -889,6 +977,54 @@ session_not_found:
   }
 }
 
+static void
+handle_data (GstRTSPClient *client, GstRTSPMessage *message)
+{
+  GstRTSPResult res;
+  guint8 channel;
+  GList *walk;
+  guint8 *data;
+  guint size;
+  GstBuffer *buffer;
+
+  /* find the stream for this message */ 
+  res = gst_rtsp_message_parse_data (message, &channel);
+  if (res != GST_RTSP_OK)
+    return;
+
+  gst_rtsp_message_steal_body (message, &data, &size);
+
+  buffer = gst_buffer_new ();
+  GST_BUFFER_DATA (buffer) = data;
+  GST_BUFFER_MALLOCDATA (buffer) = data;
+  GST_BUFFER_SIZE (buffer) = size;
+
+  for (walk = client->streams; walk; walk = g_list_next (walk)) {
+    GstRTSPSessionStream *stream = (GstRTSPSessionStream *) walk->data;
+    GstRTSPMediaStream *mstream;
+    GstRTSPTransport *tr;
+
+    /* get the transport, if there is no transport configured, skip this stream */
+    if (!(tr = stream->trans.transport))
+      continue;
+
+    /* we also need a media stream */
+    if (!(mstream = stream->media_stream))
+      continue;
+
+    /* check for TCP transport */
+    if (tr->lower_transport == GST_RTSP_LOWER_TRANS_TCP) {
+      /* dispatch to the stream based on the channel number */
+      if (tr->interleaved.min == channel) {
+       gst_rtsp_media_stream_rtp (mstream, buffer);
+      } else if (tr->interleaved.max == channel) {
+       gst_rtsp_media_stream_rtcp (mstream, buffer);
+      }
+    }
+  }
+  gst_buffer_unref (buffer);
+}
+
 /**
  * gst_rtsp_client_set_timeout:
  * @client: a #GstRTSPClient
@@ -1008,8 +1144,6 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
 {
   GstRTSPClient *client = GST_RTSP_CLIENT (user_data);
 
-  g_message ("client %p: received a message", client);
-
   switch (message->type) {
     case GST_RTSP_MESSAGE_REQUEST:
       handle_request (client, message);
@@ -1017,6 +1151,7 @@ message_received (GstRTSPWatch *watch, GstRTSPMessage *message, gpointer user_da
     case GST_RTSP_MESSAGE_RESPONSE:
       break;
     case GST_RTSP_MESSAGE_DATA:
+      handle_data (client, message);
       break;
     default:
       break;
@@ -1048,6 +1183,9 @@ closed (GstRTSPWatch *watch, gpointer user_data)
     g_mutex_unlock (tunnels_lock);
   }
 
+  /* remove all streams that are streaming over this client connection */
+  unlink_streams (client);
+
   return GST_RTSP_OK;
 }
 
index 9949f1a..07b81d9 100644 (file)
@@ -59,10 +59,14 @@ typedef struct _GstRTSPClientClass GstRTSPClientClass;
  * GstRTSPClient:
  *
  * @connection: the connection object handling the client request.
- * @address: the address of the connection
- * @media: handle to the media handled by the client.
- * @pool: handle to the session pool used by the client.
- * @thread: thread to handle the client connection
+ * @watch: watch for the connection
+ * @watchid: id of the watch
+ * @timeout: the session timeout
+ * @session_pool: handle to the session pool used by the client.
+ * @media_mapping: handle to the media mapping used by the client.
+ * @uri: cached uri
+ * @media: cached media
+ * @streams: a list of streams using @connection.
  *
  * The client structure.
  */
@@ -79,6 +83,8 @@ struct _GstRTSPClient {
 
   GstRTSPUrl     *uri;
   GstRTSPMedia   *media;
+
+  GList *streams;
 };
 
 struct _GstRTSPClientClass {
index 6284612..b75d386 100644 (file)
@@ -69,7 +69,6 @@ static void
 gst_rtsp_media_init (GstRTSPMedia * media)
 {
   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
-  media->complete = FALSE;
   media->is_live = FALSE;
   media->buffering = FALSE;
 }
@@ -251,6 +250,46 @@ gst_rtsp_media_get_stream (GstRTSPMedia *media, guint idx)
   return res;
 }
 
+/**
+ * gst_rtsp_media_stream_rtp:
+ * @stream: a #GstRTSPMediaStream
+ * @buffer: a #GstBuffer
+ *
+ * Handle an RTP buffer for the stream. This method is usually called when a
+ * message has been received from a client using the TCP transport.
+ *
+ * Returns: a GstFlowReturn.
+ */
+GstFlowReturn
+gst_rtsp_media_stream_rtp (GstRTSPMediaStream *stream, GstBuffer *buffer)
+{
+  GstFlowReturn ret;
+
+  g_signal_emit_by_name (stream->appsrc[0], "push-buffer", buffer, &ret);
+
+  return ret;
+}
+
+/**
+ * gst_rtsp_media_stream_rtcp:
+ * @stream: a #GstRTSPMediaStream
+ * @buffer: a #GstBuffer
+ *
+ * Handle an RTCP buffer for the stream. This method is usually called when a
+ * message has been received from a client using the TCP transport.
+ *
+ * Returns: a GstFlowReturn.
+ */
+GstFlowReturn
+gst_rtsp_media_stream_rtcp (GstRTSPMediaStream *stream, GstBuffer *buffer)
+{
+  GstFlowReturn ret;
+
+  g_signal_emit_by_name (stream->appsrc[1], "push-buffer", buffer, &ret);
+
+  return GST_FLOW_ERROR;
+}
+
 /* Allocate the udp ports and sockets */
 static gboolean
 alloc_udp_ports (GstRTSPMediaStream * stream)
@@ -461,19 +500,67 @@ on_timeout (GObject *session, GObject *source, GstRTSPMedia *media)
   g_message ("%p: source %p timeout", media, source);
 }
 
+static void
+handle_new_buffer (GstElement *sink, GstRTSPMediaStream *stream)
+{
+  GList *walk;
+  GstBuffer *buffer;
+
+  g_signal_emit_by_name (sink, "pull-buffer", &buffer);
+  if (!buffer)
+    return;
+
+  for (walk = stream->transports; walk; walk = g_list_next (walk)) {
+    GstRTSPMediaTrans *tr = (GstRTSPMediaTrans *) walk->data;
+
+    if (sink == stream->appsink[0]) {
+      if (tr->send_rtp) 
+        tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
+    }
+    else {
+      if (tr->send_rtcp) 
+        tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
+    }
+  }
+  gst_buffer_unref (buffer);
+}
+
 /* prepare the pipeline objects to handle @stream in @media */
 static gboolean
 setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
 {
   gchar *name;
-  GstPad *pad;
+  GstPad *pad, *teepad, *selpad;
+  GstPadLinkReturn ret;
+  gint i;
+  GstElement *tee, *selector;
 
-  alloc_udp_ports (stream);
+  /* allocate udp ports, we will have 4 of them, 2 for receiving RTP/RTCP and 2
+   * for sending RTP/RTCP. The sender and receiver ports are shared between the
+   * elements */
+  if (!alloc_udp_ports (stream))
+    return FALSE;
 
-  gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[0]);
-  gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[1]);
-  gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[0]);
-  gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[1]);
+  /* add the ports to the pipeline */
+  for (i = 0; i < 2; i++) {
+    gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsink[i]);
+    gst_bin_add (GST_BIN_CAST (media->pipeline), stream->udpsrc[i]);
+  }
+
+  /* create elements for the TCP transfer */
+  for (i = 0; i < 2; i++) {
+    stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
+    stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
+    g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
+    g_object_set (stream->appsink[i], "emit-signals", TRUE, NULL);
+    g_object_set (stream->appsink[i], "preroll-queue-len", 1, NULL);
+    gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsink[i]);
+    gst_bin_add (GST_BIN_CAST (media->pipeline), stream->appsrc[i]);
+  }
+  g_signal_connect (stream->appsink[0], "new-buffer",
+       (GCallback) handle_new_buffer, stream);
+  g_signal_connect (stream->appsink[1], "new-buffer",
+       (GCallback) handle_new_buffer, stream);
 
   /* hook up the stream to the RTP session elements. */
   name = g_strdup_printf ("send_rtp_sink_%d", idx);
@@ -505,19 +592,73 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
       media);
 
   /* link the RTP pad to the session manager */
-  gst_pad_link (stream->srcpad, stream->send_rtp_sink);
+  ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
+  if (ret != GST_PAD_LINK_OK)
+    goto link_failed;
 
-  /* link udp elements */
-  pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
+  /* make tee for RTP and link to stream */
+  tee = gst_element_factory_make ("tee", NULL);
+  gst_bin_add (GST_BIN_CAST (media->pipeline), tee);
+
+  pad = gst_element_get_static_pad (tee, "sink");
   gst_pad_link (stream->send_rtp_src, pad);
   gst_object_unref (pad);
-  pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
+
+  /* link RTP sink, we're pretty sure this will work. */
+  teepad = gst_element_get_request_pad (tee, "src%d");
+  pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
+  gst_pad_link (teepad, pad);
+  gst_object_unref (pad);
+  gst_object_unref (teepad);
+
+  teepad = gst_element_get_request_pad (tee, "src%d");
+  pad = gst_element_get_static_pad (stream->appsink[0], "sink");
+  gst_pad_link (teepad, pad);
+  gst_object_unref (pad);
+  gst_object_unref (teepad);
+
+  /* make tee for RTCP */
+  tee = gst_element_factory_make ("tee", NULL);
+  gst_bin_add (GST_BIN_CAST (media->pipeline), tee);
+
+  pad = gst_element_get_static_pad (tee, "sink");
   gst_pad_link (stream->send_rtcp_src, pad);
   gst_object_unref (pad);
-  pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
+
+  /* link RTCP elements */
+  teepad = gst_element_get_request_pad (tee, "src%d");
+  pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
+  gst_pad_link (teepad, pad);
+  gst_object_unref (pad);
+  gst_object_unref (teepad);
+
+  teepad = gst_element_get_request_pad (tee, "src%d");
+  pad = gst_element_get_static_pad (stream->appsink[1], "sink");
+  gst_pad_link (teepad, pad);
+  gst_object_unref (pad);
+  gst_object_unref (teepad);
+
+  /* make selector for the RTCP receivers */
+  selector = gst_element_factory_make ("input-selector", NULL);
+  g_object_set (selector, "select-all", TRUE, NULL);
+  gst_bin_add (GST_BIN_CAST (media->pipeline), selector);
+
+  pad = gst_element_get_static_pad (selector, "src");
   gst_pad_link (pad, stream->recv_rtcp_sink);
   gst_object_unref (pad);
 
+  selpad = gst_element_get_request_pad (selector, "sink%d");
+  pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
+  gst_pad_link (pad, selpad);
+  gst_object_unref (pad);
+  gst_object_unref (selpad);
+
+  selpad = gst_element_get_request_pad (selector, "sink%d");
+  pad = gst_element_get_static_pad (stream->appsrc[1], "src");
+  gst_pad_link (pad, selpad);
+  gst_object_unref (pad);
+  gst_object_unref (selpad);
+
   /* we set and keep these to playing so that they don't cause NO_PREROLL return
    * values */
   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
@@ -532,6 +673,13 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
   stream->prepared = TRUE;
 
   return TRUE;
+
+  /* ERRORS */
+link_failed:
+  {
+    g_warning ("failed to link stream %d", idx);
+    return FALSE;
+  }
 }
 
 static void
@@ -648,6 +796,17 @@ default_handle_message (GstRTSPMedia *media, GstMessage *message)
       g_free (debug);
       break;
     }
+    case GST_MESSAGE_WARNING:
+    {
+      GError *gerror;
+      gchar *debug;
+
+      gst_message_parse_warning (message, &gerror, &debug);
+      g_warning ("%p: got warning %s (%s)", media, gerror->message, debug);
+      g_error_free (gerror);
+      g_free (debug);
+      break;
+    }
     default:
       g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
       break;
@@ -678,6 +837,9 @@ bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
  * Prepare @media for streaming. This function will create the pipeline and
  * other objects to manage the streaming.
  *
+ * It will preroll the pipeline and collect vital information about the streams
+ * such as the duration.
+ *
  * Returns: %TRUE on success.
  */
 gboolean
@@ -736,19 +898,22 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
       g_message ("live media %p", media);
       media->is_live = TRUE;
       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
+      if (ret == GST_STATE_CHANGE_FAILURE)
+        goto state_failed;
       break;
     case GST_STATE_CHANGE_FAILURE:
-    {
-      unlock_streams (media);
       goto state_failed;
-    }
   }
 
   /* now wait for all pads to be prerolled */
   ret = gst_element_get_state (media->pipeline, NULL, NULL, -1);
+  if (ret == GST_STATE_CHANGE_FAILURE)
+    goto state_failed;
 
   /* and back to PAUSED for live pipelines */
   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
+  if (ret == GST_STATE_CHANGE_FAILURE)
+    goto state_failed;
 
   /* collect stats about the media */
   collect_media_stats (media);
@@ -770,95 +935,58 @@ was_prepared:
   /* ERRORS */
 state_failed:
   {
+    g_warning ("failed to preroll pipeline");
+    unlock_streams (media);
     gst_element_set_state (media->pipeline, GST_STATE_NULL);
     return FALSE;
   }
 }
 
 /**
- * gst_rtsp_media_play:
+ * gst_rtsp_media_set_state:
  * @media: a #GstRTSPMedia
+ * @state: the target state of the media
  * @transports: a GArray of #GstRTSPMediaTrans pointers
  *
- * Start playing @media for to the transports in @transports.
+ * Set the state of @media to @state and for the transports in @transports.
  *
  * Returns: %TRUE on success.
  */
 gboolean
-gst_rtsp_media_play (GstRTSPMedia *media, GArray *transports)
+gst_rtsp_media_set_state (GstRTSPMedia *media, GstState state, GArray *transports)
 {
   gint i;
   GstStateChangeReturn ret;
+  gboolean add, remove;
 
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
   g_return_val_if_fail (transports != NULL, FALSE);
   g_return_val_if_fail (media->prepared, FALSE);
 
-  if (media->target_state == GST_STATE_PLAYING)
-    return TRUE;
+  /* NULL and READY are the same */
+  if (state == GST_STATE_READY)
+    state = GST_STATE_NULL;
 
-  for (i = 0; i < transports->len; i++) {
-    GstRTSPMediaTrans *tr;
-    GstRTSPMediaStream *stream;
-    GstRTSPTransport *trans;
-
-    /* we need a non-NULL entry in the array */
-    tr = g_array_index (transports, GstRTSPMediaTrans *, i);
-    if (tr == NULL)
-      continue;
-
-    /* we need a transport */
-    if (!(trans = tr->transport))
-      continue;
+  if (media->target_state == state)
+    return TRUE;
 
-    /* get the stream and add the destinations */
-    stream = gst_rtsp_media_get_stream (media, tr->idx);
-    switch (trans->lower_transport) {
-      case GST_RTSP_LOWER_TRANS_UDP:
-      case GST_RTSP_LOWER_TRANS_UDP_MCAST:
-        g_message ("adding %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max);
+  add = remove = FALSE;
 
-        g_signal_emit_by_name (stream->udpsink[0], "add", trans->destination, trans->client_port.min, NULL);
-        g_signal_emit_by_name (stream->udpsink[1], "add", trans->destination, trans->client_port.max, NULL);
-        break;
-      case GST_RTSP_LOWER_TRANS_TCP:
-        g_message ("TCP transport not yet implemented");
-        break;
-      default:
-        g_message ("Unknown transport %d", trans->lower_transport);
-        break;
-    }
+  switch (state) {
+    case GST_STATE_NULL:
+    case GST_STATE_PAUSED:
+      /* we're going from PLAYING to READY or NULL, remove */
+      if (media->target_state == GST_STATE_PLAYING)
+       remove = TRUE;
+      break;
+    case GST_STATE_PLAYING:
+      /* we're going to PLAYING, add */
+      add = TRUE;
+      break;
+    default:
+      break;
   }
 
-  g_message ("playing media %p", media);
-  media->target_state = GST_STATE_PLAYING;
-  ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
-
-  return TRUE;
-}
-
-/**
- * gst_rtsp_media_pause:
- * @media: a #GstRTSPMedia
- * @transports: a array of #GstRTSPTransport pointers
- *
- * Pause playing @media for to the transports in @transports.
- *
- * Returns: %TRUE on success.
- */
-gboolean
-gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports)
-{
-  gint i;
-  GstStateChangeReturn ret;
-
-  g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
-  g_return_val_if_fail (transports != NULL, FALSE);
-  g_return_val_if_fail (media->prepared, FALSE);
-
-  if (media->target_state == GST_STATE_PAUSED)
-    return TRUE;
-
   for (i = 0; i < transports->len; i++) {
     GstRTSPMediaTrans *tr;
     GstRTSPMediaStream *stream;
@@ -875,56 +1003,35 @@ gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports)
 
     /* get the stream and add the destinations */
     stream = gst_rtsp_media_get_stream (media, tr->idx);
-
     switch (trans->lower_transport) {
       case GST_RTSP_LOWER_TRANS_UDP:
       case GST_RTSP_LOWER_TRANS_UDP_MCAST:
-        g_message ("removing %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max);
-
-        g_signal_emit_by_name (stream->udpsink[0], "remove", trans->destination, trans->client_port.min, NULL);
-        g_signal_emit_by_name (stream->udpsink[1], "remove", trans->destination, trans->client_port.max, NULL);
+       if (add) {
+          g_message ("adding %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max);
+          g_signal_emit_by_name (stream->udpsink[0], "add", trans->destination, trans->client_port.min, NULL);
+          g_signal_emit_by_name (stream->udpsink[1], "add", trans->destination, trans->client_port.max, NULL);
+       } else if (remove) {
+          g_message ("removing %s:%d-%d", trans->destination, trans->client_port.min, trans->client_port.max);
+          g_signal_emit_by_name (stream->udpsink[0], "remove", trans->destination, trans->client_port.min, NULL);
+          g_signal_emit_by_name (stream->udpsink[1], "remove", trans->destination, trans->client_port.max, NULL);
+       }
         break;
       case GST_RTSP_LOWER_TRANS_TCP:
-        g_message ("TCP transport not yet implemented");
+       if (add) {
+         stream->transports = g_list_prepend (stream->transports, tr);
+       } else if (remove) {
+         stream->transports = g_list_remove (stream->transports, tr);
+       }
         break;
       default:
         g_message ("Unknown transport %d", trans->lower_transport);
         break;
     }
   }
-  g_message ("pause media %p", media);
-  media->target_state = GST_STATE_PAUSED;
-  ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
-
-  return TRUE;
-}
-
-/**
- * gst_rtsp_media_stream_stop:
- * @media: a #GstRTSPMedia
- * @transports: a GArray of #GstRTSPMediaTrans pointers
- *
- * Stop playing @media for to the transports in @transports.
- *
- * Returns: %TRUE on success.
- */
-gboolean
-gst_rtsp_media_stop (GstRTSPMedia *media, GArray *transports)
-{
-  GstStateChangeReturn ret;
-
-  g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
-  g_return_val_if_fail (transports != NULL, FALSE);
-  g_return_val_if_fail (media->prepared, FALSE);
-
-  if (media->target_state == GST_STATE_NULL)
-    return TRUE;
-
-  gst_rtsp_media_pause (media, transports);
 
-  g_message ("stop media %p", media);
-  media->target_state = GST_STATE_NULL;
-  ret = gst_element_set_state (media->pipeline, GST_STATE_NULL);
+  g_message ("state %s media %p", gst_element_state_get_name (state), media);
+  media->target_state = state;
+  ret = gst_element_set_state (media->pipeline, state);
 
   return TRUE;
 }
index f746be6..4aaaa1d 100644 (file)
@@ -41,9 +41,15 @@ typedef struct _GstRTSPMedia GstRTSPMedia;
 typedef struct _GstRTSPMediaClass GstRTSPMediaClass;
 typedef struct _GstRTSPMediaTrans GstRTSPMediaTrans;
 
+typedef gboolean (*GstRTSPSendFunc) (GstBuffer *buffer, guint8 channel, gpointer user_data);
+
 /**
  * GstRTSPMediaTrans:
  * @idx: a stream index
+ * @send_rtp: callback for sending RTP messages
+ * @send_rtcp: callback for sending RTCP messages
+ * @user_data: user data passed in the callbacks
+ * @notify: free function for the user_data.
  * @transport: a transport description
  *
  * A Transport description for stream @idx
@@ -51,13 +57,17 @@ typedef struct _GstRTSPMediaTrans GstRTSPMediaTrans;
 struct _GstRTSPMediaTrans {
   guint idx;
 
+  GstRTSPSendFunc send_rtp;
+  GstRTSPSendFunc send_rtcp;
+  gpointer        user_data;
+  GDestroyNotify  notify;
+
   GstRTSPTransport *transport;
 };
 
 /**
  * GstRTSPMediaStream:
  *
- * @media: the owner #GstRTSPMedia
  * @srcpad: the srcpad of the stream
  * @payloader: the payloader of the format
  * @prepared: if the stream is prepared for streaming
@@ -68,8 +78,12 @@ struct _GstRTSPMediaTrans {
  * @recv_rtcp_src: srcpad for RTCP buffers
  * @udpsrc: the udp source elements for RTP/RTCP
  * @udpsink: the udp sink elements for RTP/RTCP
+ * @appsrc: the app source elements for RTP/RTCP
+ * @appsink: the app sink elements for RTP/RTCP
+ * @server_port: the server ports for this stream
  * @caps_sig: the signal id for detecting caps
  * @caps: the caps of the stream
+ * @tranports: the current transports being streamed
  *
  * The definition of a media stream. The streams are identified by @id.
  */
@@ -91,7 +105,9 @@ struct _GstRTSPMediaStream {
    * sockets */
   GstElement   *udpsrc[2];
   GstElement   *udpsink[2];
+  /* for TCP transport */
   GstElement   *appsrc[2];
+  GstElement   *appsink[2];
 
   /* server ports for sending/receiving */
   GstRTSPRange  server_port;
@@ -99,17 +115,25 @@ struct _GstRTSPMediaStream {
   /* the caps of the stream */
   gulong        caps_sig;
   GstCaps      *caps;
+
+  /* transports we stream to */
+  GList        *transports;
 };
 
 /**
  * GstRTSPMedia:
  * @shared: if this media can be shared between clients
  * @element: the data providing element
- * @stream: the different streams provided by @element
+ * @streams: the different streams provided by @element
  * @prepared: if the media is prepared for streaming
  * @pipeline: the toplevel pipeline
+ * @source: the bus watch for pipeline messages.
+ * @id: the id of the watch
+ * @is_live: if the pipeline is live
+ * @buffering: if the pipeline is buffering
+ * @target_state: the desired target state of the pipeline
  * @rtpbin: the rtpbin
- * @multifdsink: multifdsink element for TCP transport
+ * @range: the range of the media being streamed
  *
  * A class that contains the GStreamer element along with a list of
  * #GstRTSPediaStream objects that can produce data.
@@ -120,7 +144,6 @@ struct _GstRTSPMedia {
   GObject       parent;
 
   gboolean      shared;
-  gboolean      complete;
 
   GstElement   *element;
   GArray       *streams;
@@ -138,9 +161,6 @@ struct _GstRTSPMedia {
   /* RTP session manager */
   GstElement   *rtpbin;
 
-  /* for TCP transport */
-  GstElement   *multifdsink;
-
   /* the range of media */
   GstRTSPTimeRange range;
 };
@@ -180,9 +200,10 @@ gboolean              gst_rtsp_media_prepare          (GstRTSPMedia *media);
 guint                 gst_rtsp_media_n_streams        (GstRTSPMedia *media);
 GstRTSPMediaStream *  gst_rtsp_media_get_stream       (GstRTSPMedia *media, guint idx);
 
-gboolean              gst_rtsp_media_play             (GstRTSPMedia *media, GArray *trans);
-gboolean              gst_rtsp_media_pause            (GstRTSPMedia *media, GArray *trans);
-gboolean              gst_rtsp_media_stop             (GstRTSPMedia *media, GArray *trans);
+GstFlowReturn         gst_rtsp_media_stream_rtp       (GstRTSPMediaStream *stream, GstBuffer *buffer);
+GstFlowReturn         gst_rtsp_media_stream_rtcp      (GstRTSPMediaStream *stream, GstBuffer *buffer);
+
+gboolean              gst_rtsp_media_set_state        (GstRTSPMedia *media, GstState state, GArray *trans);
 
 G_END_DECLS
 
index b860563..e9c965e 100644 (file)
@@ -220,7 +220,6 @@ gst_rtsp_server_set_session_pool (GstRTSPServer *server, GstRTSPSessionPool *poo
   }
 }
 
-
 /**
  * gst_rtsp_server_get_session_pool:
  * @server: a #GstRTSPServer
index a80e280..b590146 100644 (file)
@@ -89,7 +89,7 @@ gst_rtsp_session_free_media (GstRTSPSessionMedia *media, GstRTSPSession *session
 
   g_message ("free session media %p", media);
 
-  gst_rtsp_session_media_stop (media);
+  gst_rtsp_session_media_set_state (media, GST_STATE_NULL);
 
   for (i = 0; i < size; i++) {
     GstRTSPSessionStream *stream;
@@ -476,75 +476,59 @@ gst_rtsp_session_stream_set_transport (GstRTSPSessionStream *stream,
   st->profile = ct->profile;
   st->lower_transport = ct->lower_transport;
   st->client_port = ct->client_port;
+  st->interleaved = ct->interleaved;
+  st->server_port.min = stream->media_stream->server_port.min;
+  st->server_port.max = stream->media_stream->server_port.max;
 
-  /* keep track of the transports */
+  /* keep track of the transports in the stream. */
   if (stream->trans.transport)
     gst_rtsp_transport_free (stream->trans.transport);
   stream->trans.transport = ct;
 
-  st->server_port.min = stream->media_stream->server_port.min;
-  st->server_port.max = stream->media_stream->server_port.max;
-
   return st;
 }
 
 /**
- * gst_rtsp_session_media_play:
- * @media: a #GstRTSPSessionMedia
- *
- * Tell the media object @media to start playing and streaming to the client.
- *
- * Returns: %TRUE on success.
- */
-gboolean
-gst_rtsp_session_media_play (GstRTSPSessionMedia *media)
-{
-  gboolean ret;
-
-  g_return_val_if_fail (media != NULL, FALSE);
-
-  ret = gst_rtsp_media_play (media->media, media->streams);
-
-  return ret;
-}
-
-/**
- * gst_rtsp_session_media_pause:
- * @media: a #GstRTSPSessionMedia
- *
- * Tell the media object @media to pause.
+ * gst_rtsp_session_stream_set_callbacks:
+ * @stream: a #GstRTSPSessionStream
+ * @send_rtp: a callback called when RTP should be send
+ * @send_rtcp: a callback called when RTCP should be send
+ * @user_data: user data passed to callbacks
+ * @notify: called with the user_data when no longer needed.
  *
- * Returns: %TRUE on success.
+ * Install callbacks that will be called when data for a stream should be sent
+ * to a client. This is usually used when sending RTP/RTCP over TCP.
  */
-gboolean
-gst_rtsp_session_media_pause (GstRTSPSessionMedia *media)
+void
+gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStream *stream,
+    GstRTSPSendFunc send_rtp, GstRTSPSendFunc send_rtcp, gpointer user_data,
+    GDestroyNotify  notify)
 {
-  gboolean ret;
-
-  g_return_val_if_fail (media != NULL, FALSE);
-
-  ret = gst_rtsp_media_pause (media->media, media->streams);
-
-  return ret;
+  stream->trans.send_rtp = send_rtp;
+  stream->trans.send_rtcp = send_rtcp;
+  if (stream->trans.notify)
+    stream->trans.notify (stream->trans.user_data);
+  stream->trans.user_data = user_data;
+  stream->trans.notify = notify;
 }
 
 /**
- * gst_rtsp_session_media_stop:
+ * gst_rtsp_session_media_set_state:
  * @media: a #GstRTSPSessionMedia
+ * @state: the new state
  *
- * Tell the media object @media to stop playing. After this call the media
- * cannot be played or paused anymore
+ * Tell the media object @media to change to @state.
  *
  * Returns: %TRUE on success.
  */
 gboolean
-gst_rtsp_session_media_stop (GstRTSPSessionMedia *media)
+gst_rtsp_session_media_set_state (GstRTSPSessionMedia *media, GstState state)
 {
   gboolean ret;
 
   g_return_val_if_fail (media != NULL, FALSE);
 
-  ret = gst_rtsp_media_stop (media->media, media->streams);
+  ret = gst_rtsp_media_set_state (media->media, state, media->streams);
 
   return ret;
 }
index b3797b1..495bc75 100644 (file)
@@ -132,9 +132,7 @@ gboolean               gst_rtsp_session_release_media        (GstRTSPSession *se
 GstRTSPSessionMedia *  gst_rtsp_session_get_media            (GstRTSPSession *sess,
                                                               const GstRTSPUrl *uri);
 /* control media */
-gboolean               gst_rtsp_session_media_play           (GstRTSPSessionMedia *media);
-gboolean               gst_rtsp_session_media_pause          (GstRTSPSessionMedia *media);
-gboolean               gst_rtsp_session_media_stop           (GstRTSPSessionMedia *media);
+gboolean               gst_rtsp_session_media_set_state      (GstRTSPSessionMedia *media, GstState state);
 
 /* get stream config */
 GstRTSPSessionStream * gst_rtsp_session_media_get_stream     (GstRTSPSessionMedia *media,
@@ -143,6 +141,11 @@ GstRTSPSessionStream * gst_rtsp_session_media_get_stream     (GstRTSPSessionMedi
 /* configure transport */
 GstRTSPTransport *     gst_rtsp_session_stream_set_transport (GstRTSPSessionStream *stream,
                                                               GstRTSPTransport *ct);
+void                   gst_rtsp_session_stream_set_callbacks (GstRTSPSessionStream *stream,
+                                                              GstRTSPSendFunc send_rtp,
+                                                              GstRTSPSendFunc send_rtcp,
+                                                              gpointer user_data,
+                                                              GDestroyNotify  notify);
 
 G_END_DECLS