g_free (caller);
}
+/* called with sock_lock */
static void
srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
{
{
struct srt_constant_params *params = srt_params;
+ GST_OBJECT_LOCK (srtobject->element);
+
for (; params->name != NULL; params++) {
if (srt_setsockopt (sock, 0, params->param, ¶ms->val, sizeof (gint))) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set %s (reason: %s)", params->name,
srt_getlasterror_str ());
- return FALSE;
+ goto err;
}
}
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set passphrase (reason: %s)", srt_getlasterror_str ());
- return FALSE;
+ goto err;
}
if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) {
if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
- return FALSE;
+ goto err;
}
}
if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
"failed to set latency (reason: %s)", srt_getlasterror_str ());
- return FALSE;
+ goto err;
}
}
}
}
+ GST_OBJECT_UNLOCK (srtobject->element);
return TRUE;
+
+err:
+ GST_OBJECT_UNLOCK (srtobject->element);
+ return FALSE;
}
GstSRTObject *
gst_srt_object_set_property_helper (GstSRTObject * srtobject,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
+ GST_OBJECT_LOCK (srtobject->element);
+
switch (prop_id) {
- case PROP_URI:{
- const gchar *uri = g_value_get_string (value);
- gst_srt_object_set_uri (srtobject, uri, NULL);
+ case PROP_URI:
+ gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL);
break;
- }
case PROP_MODE:
gst_structure_set_value (srtobject->parameters, "mode", value);
break;
gst_structure_set_value (srtobject->parameters, "pbkeylen", value);
break;
case PROP_WAIT_FOR_CONNECTION:
- GST_OBJECT_LOCK (srtobject->element);
srtobject->wait_for_connection = g_value_get_boolean (value);
- GST_OBJECT_UNLOCK (srtobject->element);
break;
default:
- return FALSE;
+ goto err;
}
+
+ GST_OBJECT_UNLOCK (srtobject->element);
return TRUE;
+
+err:
+ GST_OBJECT_UNLOCK (srtobject->element);
+ return FALSE;
}
gboolean
gst_srt_object_get_property_helper (GstSRTObject * srtobject,
guint prop_id, GValue * value, GParamSpec * pspec)
{
+ GST_OBJECT_LOCK (srtobject->element);
+
switch (prop_id) {
case PROP_URI:
g_value_take_string (value, gst_uri_to_string (srtobject->uri));
case PROP_STATS:
g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
break;
- case PROP_WAIT_FOR_CONNECTION:{
+ case PROP_WAIT_FOR_CONNECTION:
g_value_set_boolean (value, srtobject->wait_for_connection);
break;
- }
default:
- return FALSE;
+ goto err;
}
+ GST_OBJECT_UNLOCK (srtobject->element);
return TRUE;
+
+err:
+ GST_OBJECT_UNLOCK (srtobject->element);
+ return FALSE;
}
void
}
}
+/* called with GST_OBJECT_LOCK (srtobject->element) held */
gboolean
gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
GError ** err)
gint rsocklen = 1;
for (;;) {
+ GST_OBJECT_LOCK (srtobject->element);
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
}
+ GST_OBJECT_UNLOCK (srtobject->element);
GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
continue;
}
- GST_OBJECT_LOCK (srtobject->element);
+ g_mutex_lock (&srtobject->sock_lock);
srtobject->callers = g_list_append (srtobject->callers, caller);
g_cond_signal (&srtobject->sock_cond);
- GST_OBJECT_UNLOCK (srtobject->element);
+ g_mutex_unlock (&srtobject->sock_lock);
/* notifying caller-added */
if (srtobject->caller_added_closure != NULL) {
gsize bind_sa_len;
GSocketAddress *bind_addr;
+ GST_OBJECT_LOCK (srtobject->element);
+
gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
local_address =
if (local_address == NULL)
local_address = GST_SRT_DEFAULT_LOCALADDRESS;
+ GST_OBJECT_UNLOCK (srtobject->element);
+
bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
bind_sa_len = g_socket_address_get_native_size (bind_addr);
bind_sa = g_alloca (bind_sa_len);
goto failed;
}
+ GST_OBJECT_LOCK (srtobject->element);
gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
local_address =
gst_structure_get_string (srtobject->parameters, "localaddress");
+ GST_OBJECT_UNLOCK (srtobject->element);
+
/* According to SRT norm, bind local address and port if specified */
if (local_address != NULL && local_port != 0) {
gpointer bind_sa;
gpointer sa;
size_t sa_len;
const gchar *addr_str;
+ guint port;
+ gboolean ret = FALSE;
+
+ GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = FALSE;
}
addr_str = gst_uri_get_host (srtobject->uri);
-
if (addr_str == NULL) {
addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
GST_DEBUG_OBJECT (srtobject->element,
" setting listener mode", addr_str);
}
- socket_address =
- g_inet_socket_address_new_from_string (addr_str,
- gst_uri_get_port (srtobject->uri));
+ port = gst_uri_get_port (srtobject->uri);
+
+ GST_DEBUG_OBJECT (srtobject->element,
+ "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
+ srtobject->parameters);
+
+ if (!gst_structure_get_enum (srtobject->parameters,
+ "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
+ GST_WARNING_OBJECT (srtobject->element,
+ "Cannot get connection mode information." " Use default mode");
+ connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
+ }
+
+ GST_OBJECT_UNLOCK (srtobject->element);
+
+ socket_address = g_inet_socket_address_new_from_string (addr_str, port);
if (socket_address == NULL) {
g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
goto out;
}
- GST_DEBUG_OBJECT (srtobject->element,
- "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
- srtobject->parameters);
-
- if (!gst_structure_get_enum (srtobject->parameters,
- "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
- GST_WARNING_OBJECT (srtobject->element,
- "Cannot get connection mode information." " Use default mode");
- connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
- }
-
srtobject->listener_poll_id = srt_epoll_create ();
- srtobject->opened =
+ ret =
gst_srt_object_open_connection
(srtobject, cancellable, connection_mode, sa, sa_len, error);
+ GST_OBJECT_LOCK (srtobject->element);
+ srtobject->opened = ret;
+ GST_OBJECT_UNLOCK (srtobject->element);
+
out:
g_clear_object (&socket_address);
- return srtobject->opened;
+ return ret;
}
void
gst_srt_object_close (GstSRTObject * srtobject)
{
- GST_OBJECT_LOCK (srtobject->element);
+ g_mutex_lock (&srtobject->sock_lock);
if (srtobject->poll_id != SRT_ERROR) {
srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
}
}
if (srtobject->thread) {
GThread *thread = g_steal_pointer (&srtobject->thread);
- GST_OBJECT_UNLOCK (srtobject->element);
+ g_mutex_unlock (&srtobject->sock_lock);
g_thread_join (thread);
- GST_OBJECT_LOCK (srtobject->element);
+ g_mutex_lock (&srtobject->sock_lock);
}
if (srtobject->listener_sock != SRT_INVALID_SOCK) {
if (srtobject->callers) {
GList *callers = g_steal_pointer (&srtobject->callers);
- GST_OBJECT_UNLOCK (srtobject->element);
g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
srtobject);
- GST_OBJECT_LOCK (srtobject->element);
g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
}
g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
+ g_mutex_unlock (&srtobject->sock_lock);
+
+ GST_OBJECT_LOCK (srtobject->element);
srtobject->opened = FALSE;
GST_OBJECT_UNLOCK (srtobject->element);
}
GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");
- GST_OBJECT_LOCK (srtobject->element);
+ g_mutex_lock (&srtobject->sock_lock);
while (!g_cancellable_is_cancelled (cancellable)) {
ret = (srtobject->callers != NULL);
if (ret)
break;
- g_cond_wait (&srtobject->sock_cond,
- GST_OBJECT_GET_LOCK (srtobject->element));
+ g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
}
- GST_OBJECT_UNLOCK (srtobject->element);
+ g_mutex_unlock (&srtobject->sock_lock);
GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no");
g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
(srtobject->element)) == GST_URI_SRC, -1);
+ GST_OBJECT_LOCK (srtobject->element);
+
gst_structure_get_enum (srtobject->parameters, "mode",
GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
- if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
- SRTCaller *caller;
+ if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
+ &poll_timeout)) {
+ poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
+ }
+ GST_OBJECT_UNLOCK (srtobject->element);
+
+ if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
- GST_OBJECT_LOCK (srtobject->element);
- caller = srtobject->callers->data;
- if (srtobject->callers)
+ g_mutex_lock (&srtobject->sock_lock);
+ if (srtobject->callers) {
+ SRTCaller *caller = srtobject->callers->data;
poll_id = caller->poll_id;
- GST_OBJECT_UNLOCK (srtobject->element);
+ }
+ g_mutex_unlock (&srtobject->sock_lock);
+
if (poll_id == SRT_ERROR)
return 0;
} else {
poll_id = srtobject->poll_id;
}
- if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
- &poll_timeout)) {
- poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
- }
-
while (!g_cancellable_is_cancelled (cancellable)) {
SRTSOCKET rsock;
/* connection is only waited for in listener mode,
* but there is no harm in raising signal in any case */
- GST_OBJECT_LOCK (srtobject->element);
+ g_mutex_lock (&srtobject->sock_lock);
/* however, a race might be harmful ...
* the cancellation is used as 'flushing' flag here,
* so make sure it is so detected by the intended part at proper time */
g_cancellable_cancel (cancellable);
g_cond_signal (&srtobject->sock_cond);
- GST_OBJECT_UNLOCK (srtobject->element);
+ g_mutex_unlock (&srtobject->sock_lock);
}
static gboolean
{
GList *callers;
- GST_OBJECT_LOCK (srtobject->element);
+ g_mutex_lock (&srtobject->sock_lock);
callers = srtobject->callers;
while (callers != NULL) {
gssize len = 0;
callers = callers->next;
if (g_cancellable_is_cancelled (cancellable)) {
- GST_OBJECT_UNLOCK (srtobject->element);
- return -1;
+ goto cancelled;
}
if (!caller->sent_headers) {
srt_caller_free (caller);
}
- GST_OBJECT_UNLOCK (srtobject->element);
-
+ g_mutex_unlock (&srtobject->sock_lock);
return mapinfo->size;
+
+cancelled:
+ g_mutex_unlock (&srtobject->sock_lock);
+ return -1;
}
static gssize
const guint8 *msg = mapinfo->data;
gint payload_size, optlen = 1;
+ GST_OBJECT_LOCK (srtobject->element);
if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
&poll_timeout)) {
poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
}
+ GST_OBJECT_UNLOCK (srtobject->element);
if (!srtobject->sent_headers) {
if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
{
gssize len = 0;
GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
+ gboolean wait_for_connection;
/* Only sink element can write data */
g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
(srtobject->element)) == GST_URI_SINK, -1);
+ GST_OBJECT_LOCK (srtobject->element);
gst_structure_get_enum (srtobject->parameters, "mode",
GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
+ wait_for_connection = srtobject->wait_for_connection;
+ GST_OBJECT_UNLOCK (srtobject->element);
if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
- if (srtobject->wait_for_connection) {
+ if (wait_for_connection) {
if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
return -1;
}
GstStructure *s = NULL;
gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
- GST_OBJECT_LOCK (srtobject->element);
+ g_mutex_lock (&srtobject->sock_lock);
if (srtobject->sock != SRT_INVALID_SOCK) {
s = get_stats_for_srtsock (srtobject->sock, is_sender);
goto done;
}
done:
- GST_OBJECT_UNLOCK (srtobject->element);
+ g_mutex_unlock (&srtobject->sock_lock);
return s;
}