sink->state = GST_RTSP_STATE_INVALID;
+ g_mutex_init (&sink->conninfo.send_lock);
+ g_mutex_init (&sink->conninfo.recv_lock);
+
sink->internal_bin = (GstBin *) gst_bin_new ("rtspbin");
gst_element_set_locked_state (GST_ELEMENT_CAST (sink->internal_bin), TRUE);
gst_bin_add (GST_BIN (sink), GST_ELEMENT_CAST (sink->internal_bin));
g_rec_mutex_clear (&rtsp_client_sink->stream_rec_lock);
g_rec_mutex_clear (&rtsp_client_sink->state_rec_lock);
+ g_mutex_clear (&rtsp_client_sink->conninfo.send_lock);
+ g_mutex_clear (&rtsp_client_sink->conninfo.recv_lock);
+
g_mutex_clear (&rtsp_client_sink->send_lock);
g_mutex_clear (&rtsp_client_sink->preroll_lock);
(void) gst_rtsp_client_sink_get_factories ();
+ g_mutex_init (&context->conninfo.send_lock);
+ g_mutex_init (&context->conninfo.recv_lock);
+
GST_RTSP_STATE_LOCK (sink);
sink->contexts = g_list_prepend (sink->contexts, context);
GST_RTSP_STATE_UNLOCK (sink);
g_free (context->conninfo.location);
context->conninfo.location = NULL;
+ g_mutex_clear (&context->conninfo.send_lock);
+ g_mutex_clear (&context->conninfo.recv_lock);
+
g_free (context);
gst_element_remove_pad (element, pad);
static GstRTSPResult
gst_rtsp_client_sink_connection_send (GstRTSPClientSink * sink,
- GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout)
+ GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
{
GstRTSPResult ret;
- if (conn)
- ret = gst_rtsp_connection_send (conn, message, timeout);
- else
+ if (conninfo->connection) {
+ g_mutex_lock (&conninfo->send_lock);
+ ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
+ g_mutex_unlock (&conninfo->send_lock);
+ } else {
ret = GST_RTSP_ERROR;
+ }
return ret;
}
static GstRTSPResult
gst_rtsp_client_sink_connection_receive (GstRTSPClientSink * sink,
- GstRTSPConnection * conn, GstRTSPMessage * message, GTimeVal * timeout)
+ GstRTSPConnInfo * conninfo, GstRTSPMessage * message, GTimeVal * timeout)
{
GstRTSPResult ret;
- if (conn)
- ret = gst_rtsp_connection_receive (conn, message, timeout);
- else
+ if (conninfo->connection) {
+ g_mutex_lock (&conninfo->recv_lock);
+ ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
+ g_mutex_unlock (&conninfo->recv_lock);
+ } else {
ret = GST_RTSP_ERROR;
+ }
return ret;
}
/* FIXME, handle server request, reply with OK, for now */
static GstRTSPResult
gst_rtsp_client_sink_handle_request (GstRTSPClientSink * sink,
- GstRTSPConnection * conn, GstRTSPMessage * request)
+ GstRTSPConnInfo * conninfo, GstRTSPMessage * request)
{
GstRTSPMessage response = { 0 };
GstRTSPResult res;
if (sink->debug)
gst_rtsp_message_dump (&response);
- res = gst_rtsp_client_sink_connection_send (sink, conn, &response, NULL);
+ res = gst_rtsp_client_sink_connection_send (sink, conninfo, &response, NULL);
if (res < 0)
goto send_error;
gst_rtsp_message_dump (&request);
res =
- gst_rtsp_client_sink_connection_send (sink, sink->conninfo.connection,
+ gst_rtsp_client_sink_connection_send (sink, &sink->conninfo,
&request, NULL);
if (res < 0)
goto send_error;
* keep-alive request to keep the session open. */
res =
gst_rtsp_client_sink_connection_receive (sink,
- sink->conninfo.connection, &message, &tv_timeout);
+ &sink->conninfo, &message, &tv_timeout);
switch (res) {
case GST_RTSP_OK:
/* server sends us a request message, handle it */
res =
gst_rtsp_client_sink_handle_request (sink,
- sink->conninfo.connection, &message);
+ &sink->conninfo, &message);
if (res == GST_RTSP_EEOF)
goto server_eof;
else if (res < 0)
static GstRTSPResult
gst_rtsp_client_sink_try_send (GstRTSPClientSink * sink,
- GstRTSPConnection * conn, GstRTSPMessage * request,
+ GstRTSPConnInfo * conninfo, GstRTSPMessage * request,
GstRTSPMessage * response, GstRTSPStatusCode * code)
{
GstRTSPResult res;
g_mutex_lock (&sink->send_lock);
res =
- gst_rtsp_client_sink_connection_send (sink, conn, request,
+ gst_rtsp_client_sink_connection_send (sink, conninfo, request,
sink->ptcp_timeout);
if (res < 0) {
g_mutex_unlock (&sink->send_lock);
goto send_error;
}
- gst_rtsp_connection_reset_timeout (conn);
+ gst_rtsp_connection_reset_timeout (conninfo->connection);
/* See if we should handle the response */
if (response == NULL) {
}
next:
res =
- gst_rtsp_client_sink_connection_receive (sink, conn, response,
+ gst_rtsp_client_sink_connection_receive (sink, conninfo, response,
sink->ptcp_timeout);
g_mutex_unlock (&sink->send_lock);
switch (response->type) {
case GST_RTSP_MESSAGE_REQUEST:
- res = gst_rtsp_client_sink_handle_request (sink, conn, response);
+ res = gst_rtsp_client_sink_handle_request (sink, conninfo, response);
if (res == GST_RTSP_EEOF)
goto server_eof;
else if (res < 0)
* Returns: #GST_RTSP_OK if the processing was successful.
*/
static GstRTSPResult
-gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnection * conn,
+gst_rtsp_client_sink_send (GstRTSPClientSink * sink, GstRTSPConnInfo * conninfo,
GstRTSPMessage * request, GstRTSPMessage * response,
GstRTSPStatusCode * code)
{
method = request->type_data.request.method;
if ((res =
- gst_rtsp_client_sink_try_send (sink, conn, request, response,
+ gst_rtsp_client_sink_try_send (sink, conninfo, request, response,
&int_code)) < 0)
goto error;
("Retrieving server options"));
if ((res =
- gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
+ gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
&response, NULL)) < 0)
goto send_error;
GST_ELEMENT_PROGRESS (sink, CONTINUE, "close", ("Closing stream"));
if ((res =
- gst_rtsp_client_sink_send (sink, info->connection, &request,
+ gst_rtsp_client_sink_send (sink, info, &request,
&response, NULL)) < 0)
goto send_error;
gst_rtsp_message_take_body (&message, map_info.data, map_info.size);
res =
- gst_rtsp_client_sink_try_send (sink, sink->conninfo.connection, &message,
+ gst_rtsp_client_sink_try_send (sink, &sink->conninfo, &message,
NULL, NULL);
gst_rtsp_message_steal_body (&message, &data, &usize);
GstRTSPStreamContext *context = (GstRTSPStreamContext *) walk->data;
GstRTSPStream *stream;
- GstRTSPConnection *conn;
+ GstRTSPConnInfo *info;
GstRTSPProfile profiles;
GstRTSPProfile cur_profile;
gchar *transports;
stream);
continue;
}
- conn = context->conninfo.connection;
+ info = &context->conninfo;
} else {
- conn = sink->conninfo.connection;
+ info = &sink->conninfo;
}
GST_DEBUG_OBJECT (sink, "doing setup of stream %p with %s", stream,
context->conninfo.location);
- conn_socket = gst_rtsp_connection_get_read_socket (conn);
+ conn_socket = gst_rtsp_connection_get_read_socket (info->connection);
sa = g_socket_get_local_address (conn_socket, NULL);
family = g_socket_address_get_family (sa);
g_object_unref (sa);
context->index));
/* handle the code ourselves */
- res = gst_rtsp_client_sink_send (sink, conn, &request, &response, &code);
+ res = gst_rtsp_client_sink_send (sink, info, &request, &response, &code);
if (res < 0)
goto send_error;
("Sending server stream info"));
if ((res =
- gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
+ gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
&response, NULL)) < 0)
goto send_error;
if (async)
GST_ELEMENT_PROGRESS (sink, CONTINUE, "record", ("Starting recording"));
if ((res =
- gst_rtsp_client_sink_send (sink, sink->conninfo.connection, &request,
+ gst_rtsp_client_sink_send (sink, &sink->conninfo, &request,
&response, NULL)) < 0)
goto send_error;
* aggregate control */
for (walk = sink->contexts; walk; walk = g_list_next (walk)) {
GstRTSPStreamContext *stream = (GstRTSPStreamContext *) walk->data;
- GstRTSPConnection *conn;
+ GstRTSPConnInfo *info;
const gchar *setup_url;
/* try aggregate control first but do non-aggregate control otherwise */
continue;
if (sink->conninfo.connection) {
- conn = sink->conninfo.connection;
+ info = &sink->conninfo;
} else if (stream->conninfo.connection) {
- conn = stream->conninfo.connection;
+ info = &stream->conninfo;
} else {
continue;
}
goto create_request_failed;
if ((res =
- gst_rtsp_client_sink_send (sink, conn, &request, &response,
+ gst_rtsp_client_sink_send (sink, info, &request, &response,
NULL)) < 0)
goto send_error;