src->stream_rec_lock = g_new (GStaticRecMutex, 1);
g_static_rec_mutex_init (src->stream_rec_lock);
- src->loop_cond = g_cond_new ();
-
src->location = g_strdup (DEFAULT_LOCATION);
src->url = NULL;
g_static_rec_mutex_free (rtspsrc->stream_rec_lock);
g_free (rtspsrc->stream_rec_lock);
- g_cond_free (rtspsrc->loop_cond);
g_free (rtspsrc->location);
g_free (rtspsrc->req_location);
g_free (rtspsrc->content_base);
goto no_element;
/* take ownership */
- gst_object_ref (stream->udpsrc[0]);
- gst_object_sink (stream->udpsrc[0]);
+ gst_object_ref (stream->udpsrc[1]);
+ gst_object_sink (stream->udpsrc[1]);
gst_element_set_state (stream->udpsrc[1], GST_STATE_READY);
}
else
port = transport->server_port.max;
- destination = transport->destination;
+ /* first take the source, then the endpoint to figure out where to send
+ * the RTCP. */
+ destination = transport->source;
if (destination == NULL)
- destination = src->addr;
+ destination = src->connection->ip;
GST_DEBUG_OBJECT (src, "configure UDP sink for %s:%d", destination, port);
stream->rtcppad = gst_element_get_pad (stream->udpsink, "sink");
/* get session RTCP pad */
- name = g_strdup_printf ("rtcp_src_%d", stream->id);
+ name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
pad = gst_element_get_request_pad (src->session, name);
g_free (name);
gst_event_unref (event);
}
+/* FIXME, handle server request, reply with OK, for now */
+static RTSPResult
+gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request)
+{
+ RTSPMessage response = { 0 };
+ RTSPResult res;
+
+ GST_DEBUG_OBJECT (src, "got server request message");
+
+ if (src->debug)
+ rtsp_message_dump (request);
+
+ res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request);
+ if (res < 0)
+ goto send_error;
+
+ GST_DEBUG_OBJECT (src, "replying with OK");
+
+ if (src->debug)
+ rtsp_message_dump (&response);
+
+ if ((res = rtsp_connection_send (src->connection, &response, NULL)) < 0)
+ goto send_error;
+
+ return RTSP_OK;
+
+ /* ERRORS */
+send_error:
+ {
+ return res;
+ }
+}
+
static void
gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
{
- RTSPMessage response = { 0 };
+ RTSPMessage message = { 0 };
RTSPResult res;
gint channel;
GList *lstream;
guint size;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
- gboolean is_rtcp = FALSE;
+ gboolean is_rtcp, have_data;
+ have_data = FALSE;
do {
GST_DEBUG_OBJECT (src, "doing receive");
- if ((res = rtsp_connection_receive (src->connection, &response)) < 0)
+
+ if ((res = rtsp_connection_receive (src->connection, &message, NULL)) < 0)
goto receive_error;
- GST_DEBUG_OBJECT (src, "got packet type %d", response.type);
+ switch (message.type) {
+ case RTSP_MESSAGE_REQUEST:
+ /* server sends us a request message, handle it */
+ if ((res = gst_rtspsrc_handle_request (src, &message)) < 0)
+ goto handle_request_failed;
+ break;
+ case RTSP_MESSAGE_RESPONSE:
+ /* we ignore response messages */
+ GST_DEBUG_OBJECT (src, "ignoring response message");
+ break;
+ case RTSP_MESSAGE_DATA:
+ GST_DEBUG_OBJECT (src, "got data message");
+ have_data = TRUE;
+ break;
+ default:
+ GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
+ message.type);
+ break;
+ }
}
- while (response.type != RTSP_MESSAGE_DATA);
+ while (!have_data);
- channel = response.type_data.data.channel;
+ channel = message.type_data.data.channel;
lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (channel),
(GCompareFunc) find_stream_by_channel);
stream = (GstRTSPStream *) lstream->data;
if (channel == stream->channel[0]) {
outpad = stream->channelpad[0];
+ is_rtcp = FALSE;
} else if (channel == stream->channel[1]) {
outpad = stream->channelpad[1];
is_rtcp = TRUE;
+ } else {
+ is_rtcp = FALSE;
}
/* take a look at the body to figure out what we have */
- rtsp_message_get_body (&response, &data, &size);
+ rtsp_message_get_body (&message, &data, &size);
if (size < 2)
goto invalid_length;
if (outpad == NULL)
goto unknown_stream;
- /* and chain buffer to internal element */
- rtsp_message_steal_body (&response, &data, &size);
+ /* take the message body for further processing */
+ rtsp_message_steal_body (&message, &data, &size);
/* strip the trailing \0 */
size -= 1;
GST_BUFFER_SIZE (buf) = size;
/* don't need message anymore */
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size,
channel);
unknown_stream:
{
GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel);
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
return;
}
receive_error:
g_free (str);
if (src->debug)
- rtsp_message_dump (&response);
+ rtsp_message_dump (&message);
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
+ ret = GST_FLOW_UNEXPECTED;
+ goto need_pause;
+ }
+handle_request_failed:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
+ ("Could not send message. (%s)", str));
+ g_free (str);
+ rtsp_message_unset (&message);
ret = GST_FLOW_UNEXPECTED;
goto need_pause;
}
{
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("Short message received."));
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
return;
}
need_pause:
}
}
+/* send server keep-alive */
+static RTSPResult
+gst_rtspsrc_send_keep_alive (GstRTSPSrc * src)
+{
+ RTSPMessage request = { 0 };
+ RTSPMessage response = { 0 };
+ RTSPResult res;
+
+ GST_DEBUG_OBJECT (src, "creating server keep-alive");
+
+ res =
+ rtsp_message_init_request (&request, RTSP_GET_PARAMETER,
+ src->req_location);
+ if (res < 0)
+ goto send_error;
+
+ if (!gst_rtspsrc_send (src, &request, &response, NULL))
+ goto send_error;
+
+ rtsp_message_unset (&request);
+
+ return RTSP_OK;
+
+ /* ERRORS */
+send_error:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ rtsp_message_unset (&request);
+ GST_ELEMENT_WARNING (src, RESOURCE, WRITE, (NULL),
+ ("Could not send keep-alive. (%s)", str));
+ g_free (str);
+ return res;
+ }
+}
+
static void
gst_rtspsrc_loop_udp (GstRTSPSrc * src)
{
gboolean restart = FALSE;
+ RTSPResult res;
GST_OBJECT_LOCK (src);
if (src->loop_cmd == CMD_STOP)
goto stopping;
- /* FIXME, we should continue reading the TCP socket because the server might
- * send us requests */
while (src->loop_cmd == CMD_WAIT) {
- GST_DEBUG_OBJECT (src, "waiting");
- GST_RTSP_LOOP_WAIT (src);
- GST_DEBUG_OBJECT (src, "waiting done");
+ GTimeVal tv_timeout;
+ gint timeout;
+
+ GST_OBJECT_UNLOCK (src);
+
+ while (TRUE) {
+ RTSPMessage message = { 0 };
+
+ /* calculate the session timeout. We should send the keep-alive request a
+ * little earlier to compensate for the round trip time to the server. We
+ * subtract 1 second here. */
+ timeout = src->connection->timeout;
+ if (timeout > 1)
+ timeout -= 1;
+
+ /* use the session timeout for receiving data */
+ tv_timeout.tv_sec = timeout;
+ tv_timeout.tv_usec = 0;
+
+ GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds", timeout);
+
+ /* we should continue reading the TCP socket because the server might
+ * send us requests. When the session timeout expires, we need to send a
+ * keep-alive request to keep the session open. */
+ res = rtsp_connection_receive (src->connection, &message, &tv_timeout);
+
+ switch (res) {
+ case RTSP_OK:
+ GST_DEBUG_OBJECT (src, "we received a server message");
+ break;
+ case RTSP_EINTR:
+ /* we got interrupted, see what we have to do */
+ GST_DEBUG_OBJECT (src, "we got interrupted");
+ /* unset flushing so we can do something else */
+ rtsp_connection_flush (src->connection, FALSE);
+ goto interrupt;
+ case RTSP_ETIMEOUT:
+ /* ignore result, a warning was posted */
+ GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
+ res = gst_rtspsrc_send_keep_alive (src);
+ continue;
+ default:
+ goto receive_error;
+ }
+
+ switch (message.type) {
+ case RTSP_MESSAGE_REQUEST:
+ /* server sends us a request message, handle it */
+ if ((res = gst_rtspsrc_handle_request (src, &message)) < 0)
+ goto handle_request_failed;
+ break;
+ case RTSP_MESSAGE_RESPONSE:
+ case RTSP_MESSAGE_DATA:
+ /* we ignore response and data messages */
+ GST_DEBUG_OBJECT (src, "ignoring message");
+ break;
+ default:
+ GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
+ message.type);
+ break;
+ }
+ }
+ interrupt:
+ GST_OBJECT_LOCK (src);
+ GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd);
if (src->loop_cmd == CMD_STOP)
goto stopping;
}
gst_rtspsrc_close (src);
/* see if we have TCP left to try */
- if (!(src->cur_protocols & RTSP_LOWER_TRANS_TCP))
+ if (!(src->protocols & RTSP_LOWER_TRANS_TCP))
goto no_protocols;
/* open new connection using tcp */
gst_task_pause (src->task);
return;
}
+receive_error:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
+ ("Could not receive message. (%s)", str));
+ g_free (str);
+ return;
+ }
+handle_request_failed:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
+ ("Could not handle server message. (%s)", str));
+ g_free (str);
+ return;
+ }
no_protocols:
{
src->cur_protocols = 0;
{
GST_OBJECT_LOCK (src);
src->loop_cmd = cmd;
- GST_RTSP_LOOP_SIGNAL (src);
+ if (cmd != CMD_WAIT)
+ rtsp_connection_flush (src->connection, TRUE);
GST_OBJECT_UNLOCK (src);
}
gst_rtspsrc_loop_udp (src);
}
-static RTSPResult
-gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request)
-{
- RTSPMessage response = { 0 };
- RTSPResult res;
-
- res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request);
- if (res < 0)
- goto send_error;
-
- if (src->debug)
- rtsp_message_dump (&response);
-
- if ((res = rtsp_connection_send (src->connection, &response)) < 0)
- goto send_error;
-
- return RTSP_OK;
-
- /* ERRORS */
-send_error:
- {
- gchar *str = rtsp_strresult (res);
-
- GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
- ("Could not send message. (%s)", str));
- g_free (str);
- return res;
- }
-}
-
#ifndef GST_DISABLE_GST_DEBUG
const gchar *
rtsp_auth_method_to_string (RTSPAuthMethod method)
if (src->debug)
rtsp_message_dump (request);
- if ((res = rtsp_connection_send (src->connection, request)) < 0)
+ if ((res = rtsp_connection_send (src->connection, request, NULL)) < 0)
goto send_error;
next:
- if ((res = rtsp_connection_receive (src->connection, response)) < 0)
+ if ((res = rtsp_connection_receive (src->connection, response, NULL)) < 0)
goto receive_error;
if (src->debug)
switch (response->type) {
case RTSP_MESSAGE_REQUEST:
- /* FIXME, handle server request, reply with OK, for now */
if ((res = gst_rtspsrc_handle_request (src, response)) < 0)
goto handle_request_failed;
goto next;
case RTSP_MESSAGE_RESPONSE:
/* ok, a response is good */
+ GST_DEBUG_OBJECT (src, "received response message");
break;
default:
case RTSP_MESSAGE_DATA:
/* get next response */
+ GST_DEBUG_OBJECT (src, "ignoring data response message");
goto next;
}
/* connect */
GST_DEBUG_OBJECT (src, "connecting (%s)...", src->req_location);
- if ((res = rtsp_connection_connect (src->connection)) < 0)
+ if ((res = rtsp_connection_connect (src->connection, NULL)) < 0)
goto could_not_connect;
/* create OPTIONS */
gst_message_unref (message);
break;
- /* fatal our not our message, forward */
forward:
+ /* fatal but not our message, forward */
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
break;
}
}
RTSPResult
-rtsp_connection_connect (RTSPConnection * conn)
+rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout)
{
gint fd;
struct sockaddr_in sin;
goto sys_error;
conn->fd = fd;
+ conn->ip = ip;
return RTSP_OK;
}
RTSPResult
-rtsp_connection_send (RTSPConnection * conn, RTSPMessage * message)
+rtsp_connection_send (RTSPConnection * conn, RTSPMessage * message,
+ GTimeVal * timeout)
{
GString *str;
gint towrite;
}
RTSPResult
-rtsp_connection_read (RTSPConnection * conn, gpointer data, guint size)
+rtsp_connection_read (RTSPConnection * conn, gpointer data, guint size,
+ GTimeVal * timeout)
{
fd_set readfds;
guint toread;
gint retval;
+ struct timeval tv_timeout, *ptv_timeout = NULL;
#ifndef G_OS_WIN32
gint avail;
else if (avail >= toread)
goto do_read;
+ /* configure timeout if any */
+ if (timeout != NULL) {
+ tv_timeout.tv_sec = timeout->tv_sec;
+ tv_timeout.tv_usec = timeout->tv_usec;
+ ptv_timeout = &tv_timeout;
+ }
+
FD_ZERO (&readfds);
FD_SET (conn->fd, &readfds);
FD_SET (READ_SOCKET (conn), &readfds);
gint bytes;
do {
- retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL);
+ retval = select (FD_SETSIZE, &readfds, NULL, NULL, ptv_timeout);
} while ((retval == -1 && errno == EINTR));
if (retval == -1)
goto select_error;
+ /* check for timeout */
+ if (retval == 0)
+ goto select_timeout;
+
if (FD_ISSET (READ_SOCKET (conn), &readfds)) {
/* read all stop commands */
while (TRUE) {
{
return RTSP_ESYS;
}
+select_timeout:
+ {
+ return RTSP_ETIMEOUT;
+ }
stopped:
{
return RTSP_EINTR;
}
static RTSPResult
-read_body (RTSPConnection * conn, glong content_length, RTSPMessage * msg)
+read_body (RTSPConnection * conn, glong content_length, RTSPMessage * msg,
+ GTimeVal * timeout)
{
gchar *body;
RTSPResult res;
body = g_malloc (content_length + 1);
body[content_length] = '\0';
- RTSP_CHECK (rtsp_connection_read (conn, body, content_length), read_error);
+ RTSP_CHECK (rtsp_connection_read (conn, body, content_length, timeout),
+ read_error);
content_length += 1;
}
RTSPResult
-rtsp_connection_receive (RTSPConnection * conn, RTSPMessage * msg)
+rtsp_connection_receive (RTSPConnection * conn, RTSPMessage * msg,
+ GTimeVal * timeout)
{
gchar buffer[4096];
gint line;
guint8 c;
/* read first character, this identifies data messages */
- RTSP_CHECK (rtsp_connection_read (conn, &c, 1), read_error);
+ RTSP_CHECK (rtsp_connection_read (conn, &c, 1, timeout), read_error);
/* check for data packet, first character is $ */
if (c == '$') {
/* data packets are $<1 byte channel><2 bytes length,BE><data bytes> */
/* read channel, which is the next char */
- RTSP_CHECK (rtsp_connection_read (conn, &c, 1), read_error);
+ RTSP_CHECK (rtsp_connection_read (conn, &c, 1, timeout), read_error);
/* now we create a data message */
rtsp_message_init_data (msg, (gint) c);
/* next two bytes are the length of the data */
- RTSP_CHECK (rtsp_connection_read (conn, &size, 2), read_error);
+ RTSP_CHECK (rtsp_connection_read (conn, &size, 2, timeout), read_error);
size = GUINT16_FROM_BE (size);
/* and read the body */
- res = read_body (conn, size, msg);
+ res = read_body (conn, size, msg, timeout);
need_body = FALSE;
break;
} else {
&hdrval) == RTSP_OK) {
/* there is, read the body */
content_length = atol (hdrval);
- RTSP_CHECK (read_body (conn, content_length, msg), read_error);
+ RTSP_CHECK (read_body (conn, content_length, msg, timeout), read_error);
}
/* save session id in the connection for further use */
&session_id) == RTSP_OK) {
gint sesslen, maxlen, i;
+ /* default session timeout */
+ conn->timeout = 60;
+
sesslen = strlen (session_id);
maxlen = sizeof (conn->session_id) - 1;
/* the sessionid can have attributes marked with ;
* Make sure we strip them */
for (i = 0; i < sesslen; i++) {
- if (session_id[i] == ';')
+ if (session_id[i] == ';') {
maxlen = i;
+ /* parse timeout */
+ if (g_str_has_prefix (&session_id[i], ";timeout=")) {
+ gint timeout;
+
+ /* if we parsed something valid, configure */
+ if ((timeout = atoi (&session_id[i + 9])) > 0)
+ conn->timeout = timeout;
+ }
+ }
}
/* make sure to not overflow */
g_return_val_if_fail (conn != NULL, RTSP_EINVAL);
g_return_val_if_fail (conn->fd >= 0, RTSP_EINVAL);
- res = CLOSE_SOCKET (conn->fd);
+ if (conn->fd != -1) {
+ res = CLOSE_SOCKET (conn->fd);
#ifdef G_OS_WIN32
- WSACleanup ();
+ WSACleanup ();
#endif
- conn->fd = -1;
- if (res != 0)
- goto sys_error;
+ conn->fd = -1;
+ if (res != 0)
+ goto sys_error;
+ }
return RTSP_OK;