2 * Copyright (C) 2018, Collabora Ltd.
3 * Copyright (C) 2018, SK Telecom, Co., Ltd.
4 * Author: Jeongseok Kim <jeongseok.kim@sk.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
26 /* Needed for GValueArray */
27 #define GLIB_DISABLE_DEPRECATION_WARNINGS
29 #include "gstsrtobject.h"
31 #include <gst/base/gstbasesink.h>
32 #include <gio/gnetworking.h>
35 GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
36 #define GST_CAT_DEFAULT gst_debug_srtobject
57 GSocketAddress *sockaddr;
58 gboolean sent_headers;
64 SRTCaller *caller = g_new0 (SRTCaller, 1);
65 caller->sock = SRT_INVALID_SOCK;
66 caller->poll_id = SRT_ERROR;
67 caller->sent_headers = FALSE;
73 srt_caller_free (SRTCaller * caller)
75 g_return_if_fail (caller != NULL);
77 g_clear_object (&caller->sockaddr);
79 if (caller->sock != SRT_INVALID_SOCK) {
80 srt_close (caller->sock);
83 if (caller->poll_id != SRT_ERROR) {
84 srt_epoll_release (caller->poll_id);
91 srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
93 GValue values[2] = { G_VALUE_INIT };
95 if (srtobject->caller_removed_closure == NULL) {
99 g_value_init (&values[0], G_TYPE_INT);
100 g_value_set_int (&values[0], caller->sock);
102 g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
103 g_value_set_object (&values[1], caller->sockaddr);
105 g_closure_invoke (srtobject->caller_removed_closure, NULL, 2, values, NULL);
107 g_value_unset (&values[0]);
108 g_value_unset (&values[1]);
111 struct srt_constant_params
118 static struct srt_constant_params srt_params[] = {
119 {"SRTO_SNDSYN", SRTO_SNDSYN, 0}, /* 0: non-blocking */
120 {"SRTO_RCVSYN", SRTO_RCVSYN, 0}, /* 0: non-blocking */
121 {"SRTO_LINGER", SRTO_LINGER, 0},
122 {"SRTO_TSBPMODE", SRTO_TSBPDMODE, 1}, /* Timestamp-based Packet Delivery mode must be enabled */
126 static gint srt_init_refcount = 0;
129 gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
132 struct srt_constant_params *params = srt_params;
134 for (; params->name != NULL; params++) {
135 if (srt_setsockopt (sock, 0, params->param, ¶ms->val, sizeof (gint))) {
136 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
137 "failed to set %s (reason: %s)", params->name,
138 srt_getlasterror_str ());
143 if (srtobject->passphrase != NULL && srtobject->passphrase[0] != '\0') {
146 if (srt_setsockopt (sock, 0, SRTO_PASSPHRASE, srtobject->passphrase,
147 strlen (srtobject->passphrase))) {
148 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
149 "failed to set passphrase (reason: %s)", srt_getlasterror_str ());
154 if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) {
155 pbkeylen = GST_SRT_DEFAULT_PBKEYLEN;
158 if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
159 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
160 "failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
168 if (!gst_structure_get_int (srtobject->parameters, "latency", &latency))
169 latency = GST_SRT_DEFAULT_LATENCY;
170 if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
171 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
172 "failed to set latency (reason: %s)", srt_getlasterror_str ());
177 if (gst_structure_has_field (srtobject->parameters, "streamid")) {
178 const gchar *streamid;
180 streamid = gst_structure_get_string (srtobject->parameters, "streamid");
181 if (streamid != NULL && streamid[0] != '\0') {
182 if (srt_setsockopt (sock, 0, SRTO_STREAMID, streamid, strlen (streamid))) {
183 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
184 "failed to set stream ID (reason: %s)", srt_getlasterror_str ());
193 gst_srt_object_new (GstElement * element)
195 GstSRTObject *srtobject;
197 if (g_atomic_int_get (&srt_init_refcount) == 0) {
198 GST_DEBUG_OBJECT (element, "Starting up SRT");
199 if (srt_startup () != 0) {
200 g_warning ("Failed to initialize SRT (reason: %s)",
201 srt_getlasterror_str ());
205 g_atomic_int_inc (&srt_init_refcount);
207 srtobject = g_new0 (GstSRTObject, 1);
208 srtobject->element = element;
209 srtobject->parameters = gst_structure_new ("application/x-srt-params",
210 "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
211 "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY,
212 "mode", GST_TYPE_SRT_CONNECTION_MODE, GST_SRT_DEFAULT_MODE, NULL);
214 srtobject->sock = SRT_INVALID_SOCK;
215 srtobject->poll_id = srt_epoll_create ();
216 srtobject->listener_sock = SRT_INVALID_SOCK;
217 srtobject->listener_poll_id = SRT_ERROR;
218 srtobject->sent_headers = FALSE;
220 g_cond_init (&srtobject->sock_cond);
225 gst_srt_object_destroy (GstSRTObject * srtobject)
227 g_return_if_fail (srtobject != NULL);
229 if (srtobject->poll_id != SRT_ERROR) {
230 srt_epoll_release (srtobject->poll_id);
231 srtobject->poll_id = SRT_ERROR;
234 g_cond_clear (&srtobject->sock_cond);
236 GST_DEBUG_OBJECT (srtobject->element, "Destroying srtobject");
237 gst_structure_free (srtobject->parameters);
239 g_free (srtobject->passphrase);
241 if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
243 GST_DEBUG_OBJECT (srtobject->element, "Cleaning up SRT");
246 g_clear_pointer (&srtobject->uri, gst_uri_unref);
252 gst_srt_object_set_property_helper (GstSRTObject * srtobject,
253 guint prop_id, const GValue * value, GParamSpec * pspec)
257 const gchar *uri = g_value_get_string (value);
258 gst_srt_object_set_uri (srtobject, uri, NULL);
262 gst_structure_set_value (srtobject->parameters, "mode", value);
264 case PROP_POLL_TIMEOUT:
265 gst_structure_set_value (srtobject->parameters, "poll-timeout", value);
268 gst_structure_set_value (srtobject->parameters, "latency", value);
270 case PROP_LOCALADDRESS:
271 gst_structure_set_value (srtobject->parameters, "localaddress", value);
274 gst_structure_set_value (srtobject->parameters, "localport", value);
276 case PROP_PASSPHRASE:
277 g_free (srtobject->passphrase);
278 srtobject->passphrase = g_value_dup_string (value);
281 gst_structure_set_value (srtobject->parameters, "pbkeylen", value);
290 gst_srt_object_get_property_helper (GstSRTObject * srtobject,
291 guint prop_id, GValue * value, GParamSpec * pspec)
295 g_value_take_string (value, gst_uri_to_string (srtobject->uri));
298 GstSRTConnectionMode v;
299 if (!gst_structure_get_enum (srtobject->parameters, "mode",
300 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & v)) {
301 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
302 v = GST_SRT_CONNECTION_MODE_NONE;
304 g_value_set_enum (value, v);
307 case PROP_LOCALADDRESS:
308 g_value_set_string (value,
309 gst_structure_get_string (srtobject->parameters, "localaddress"));
311 case PROP_LOCALPORT:{
313 if (!gst_structure_get_uint (srtobject->parameters, "localport", &v)) {
314 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
315 v = GST_SRT_DEFAULT_PORT;
317 g_value_set_uint (value, v);
322 if (!gst_structure_get_enum (srtobject->parameters, "pbkeylen",
323 GST_TYPE_SRT_KEY_LENGTH, (gint *) & v)) {
324 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
325 v = GST_SRT_KEY_LENGTH_NO_KEY;
327 g_value_set_enum (value, v);
330 case PROP_POLL_TIMEOUT:{
332 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &v)) {
333 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
334 v = GST_SRT_DEFAULT_POLL_TIMEOUT;
336 g_value_set_int (value, v);
341 if (!gst_structure_get_int (srtobject->parameters, "latency", &v)) {
342 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
343 v = GST_SRT_DEFAULT_LATENCY;
345 g_value_set_int (value, v);
349 g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
359 gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
364 * The URI used by SRT connection. User can specify SRT specific options by URI parameters.
365 * Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Mediun: SRT</a>
367 g_object_class_install_property (gobject_class, PROP_URI,
368 g_param_spec_string ("uri", "URI",
369 "URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
370 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
371 G_PARAM_STATIC_STRINGS));
376 * The SRT connection mode.
377 * This property can be set by URI parameters.
379 g_object_class_install_property (gobject_class, PROP_MODE,
380 g_param_spec_enum ("mode", "Connection mode",
381 "SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
382 GST_SRT_CONNECTION_MODE_CALLER,
383 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
384 G_PARAM_STATIC_STRINGS));
387 * GstSRTSrc:localaddress:
389 * The address to bind when #GstSRTSrc:mode is listener or rendezvous.
390 * This property can be set by URI parameters.
392 g_object_class_install_property (gobject_class, PROP_LOCALADDRESS,
393 g_param_spec_string ("localaddress", "Local address",
394 "Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS,
395 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
396 G_PARAM_STATIC_STRINGS));
399 * GstSRTSrc:localport:
401 * The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
402 * This property can be set by URI parameters.
404 g_object_class_install_property (gobject_class, PROP_LOCALPORT,
405 g_param_spec_uint ("localport", "Local port",
406 "Local port to bind", 0,
407 65535, GST_SRT_DEFAULT_PORT,
408 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
409 G_PARAM_STATIC_STRINGS));
412 * GstSRTSrc:passphrase:
414 * The password for the encrypted transmission.
415 * This property can be set by URI parameters.
417 g_object_class_install_property (gobject_class, PROP_PASSPHRASE,
418 g_param_spec_string ("passphrase", "Passphrase",
419 "Password for the encrypted transmission", "",
420 G_PARAM_WRITABLE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS));
423 * GstSRTSrc:pbkeylen:
425 * The crypto key length.
426 * This property can be set by URI parameters.
428 g_object_class_install_property (gobject_class, PROP_PBKEYLEN,
429 g_param_spec_enum ("pbkeylen", "Crypto key length",
430 "Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
431 GST_SRT_DEFAULT_PBKEYLEN,
432 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
433 G_PARAM_STATIC_STRINGS));
436 * GstSRTSrc:poll-timeout:
438 * The polling timeout used when srt poll is started.
439 * Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
440 * This property can be set by URI parameters.
442 g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT,
443 g_param_spec_int ("poll-timeout", "Poll timeout",
444 "Return poll wait after timeout milliseconds (-1 = infinite)", -1,
445 G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT,
446 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
447 G_PARAM_STATIC_STRINGS));
452 * The maximum accepted transmission latency.
454 g_object_class_install_property (gobject_class, PROP_LATENCY,
455 g_param_spec_int ("latency", "latency",
456 "Minimum latency (milliseconds)", 0,
457 G_MAXINT32, GST_SRT_DEFAULT_LATENCY,
458 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
463 * The statistics from SRT.
465 g_object_class_install_property (gobject_class, PROP_STATS,
466 g_param_spec_boxed ("stats", "Statistics",
467 "SRT Statistics", GST_TYPE_STRUCTURE,
468 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
473 gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
474 gconstpointer key, gconstpointer value)
476 GEnumClass *enum_class;
477 GEnumValue *enum_value;
479 enum_class = g_type_class_ref (enum_type);
480 enum_value = g_enum_get_value_by_nick (enum_class, value);
483 GValue v = G_VALUE_INIT;
484 g_value_init (&v, enum_type);
485 g_value_set_enum (&v, enum_value->value);
486 gst_structure_set_value (s, key, &v);
489 g_type_class_unref (enum_class);
493 gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
496 GValue v = G_VALUE_INIT;
497 g_value_init (&v, G_TYPE_STRING);
498 g_value_set_static_string (&v, value);
499 gst_structure_set_value (s, key, &v);
504 gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
507 GValue v = G_VALUE_INIT;
508 g_value_init (&v, G_TYPE_UINT);
509 g_value_set_uint (&v, (guint) strtoul (value, NULL, 10));
510 gst_structure_set_value (s, key, &v);
515 gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
517 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
519 gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
520 (gint *) & connection_mode);
522 if (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS ||
523 connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
525 const gchar *local_address = gst_structure_get_string (s, "localaddress");
527 if (local_address == NULL) {
529 gst_uri_get_host (uri) ==
530 NULL ? GST_SRT_DEFAULT_LOCALADDRESS : gst_uri_get_host (uri);
531 gst_srt_object_set_string_value (s, "localaddress", local_address);
534 if (!gst_structure_get_uint (s, "localport", &local_port)) {
536 gst_uri_get_port (uri) ==
537 GST_URI_NO_PORT ? GST_SRT_DEFAULT_PORT : gst_uri_get_port (uri);
538 gst_structure_set (s, "localport", G_TYPE_UINT, local_port, NULL);
544 gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
547 GHashTable *query_table = NULL;
550 const char *addr_str;
552 if (srtobject->opened) {
554 ("It's not supported to change the 'uri' property when SRT socket is opened.");
555 g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
556 "It's not supported to change the 'uri' property when SRT socket is opened");
561 if (!g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
562 g_warning ("Given uri cannot be used for SRT connection.");
563 g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
564 "Invalid SRT URI scheme");
568 g_clear_pointer (&srtobject->uri, gst_uri_unref);
569 srtobject->uri = gst_uri_from_string (uri);
571 query_table = gst_uri_get_query_table (srtobject->uri);
573 GST_DEBUG_OBJECT (srtobject->element,
574 "set uri to (host: %s, port: %d) with %d query strings",
575 gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
576 query_table == NULL ? 0 : g_hash_table_size (query_table));
578 addr_str = gst_uri_get_host (srtobject->uri);
580 gst_srt_object_set_enum_value (srtobject->parameters,
581 GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
583 gst_srt_object_set_enum_value (srtobject->parameters,
584 GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");
587 g_hash_table_iter_init (&iter, query_table);
588 while (g_hash_table_iter_next (&iter, &key, &value)) {
589 if (!g_strcmp0 ("mode", key)) {
590 gst_srt_object_set_enum_value (srtobject->parameters,
591 GST_TYPE_SRT_CONNECTION_MODE, key, value);
592 } else if (!g_strcmp0 ("localaddress", key)) {
593 gst_srt_object_set_string_value (srtobject->parameters, key, value);
594 } else if (!g_strcmp0 ("localport", key)) {
595 gst_srt_object_set_uint_value (srtobject->parameters, key, value);
596 } else if (!g_strcmp0 ("passphrase", key)) {
597 g_free (srtobject->passphrase);
598 srtobject->passphrase = g_strdup (value);
599 } else if (!g_strcmp0 ("pbkeylen", key)) {
600 gst_srt_object_set_enum_value (srtobject->parameters,
601 GST_TYPE_SRT_KEY_LENGTH, key, value);
602 } else if (!g_strcmp0 ("streamid", key)) {
603 gst_srt_object_set_string_value (srtobject->parameters, key, value);
607 g_hash_table_unref (query_table);
610 gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);
/*
 * Listener thread body (started from gst_srt_object_wait_connect()).
 *
 * Waits on the listener epoll set for incoming connections, accepts them and
 * registers every accepted socket as an SRTCaller with its own epoll set.
 * New callers are appended to srtobject->callers under the element's object
 * lock, waiters on sock_cond are signalled, and the "caller added" closure is
 * invoked when one is installed.
 *
 * NOTE(review): several statements of this function (loop header, exits,
 * some declarations) are not visible in this view; the comments below only
 * describe the visible lines.
 */
616 thread_func (gpointer data)
618 GstSRTObject *srtobject = data;
619 SRTSOCKET caller_sock;
622 struct sockaddr_storage ss;
/* Fall back to the default poll timeout when the parameter is missing. */
633 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
635 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
638 GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
/* Wait for the listener socket to become readable (a pending connection). */
640 if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
641 &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
642 gint srt_errno = srt_getlasterror (NULL);
/* gst_srt_object_close() resets listener_poll_id to SRT_ERROR. */
644 if (srtobject->listener_poll_id == SRT_ERROR)
646 if (srt_errno == SRT_ETIMEOUT) {
/* Any other polling failure is posted as an element error. */
649 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
650 ("abort polling: %s", srt_getlasterror_str ()), (NULL));
656 srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
658 if (caller_sock != SRT_INVALID_SOCK) {
660 gint flag = SRT_EPOLL_ERR;
662 caller = srt_caller_new ();
/* Peer address of the accepted connection, as a GSocketAddress. */
664 g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
665 caller->poll_id = srt_epoll_create ();
666 caller->sock = caller_sock;
/* Source elements poll the caller for input, other elements for output. */
668 if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
669 (srtobject->element)) == GST_URI_SRC) {
670 flag |= SRT_EPOLL_IN;
672 flag |= SRT_EPOLL_OUT;
675 if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
677 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
678 ("%s", srt_getlasterror_str ()), (NULL));
/* The caller record owns the accepted socket, so freeing it closes it. */
680 srt_caller_free (caller);
/* Publish the caller and wake up gst_srt_object_wait_caller(). */
686 GST_OBJECT_LOCK (srtobject->element);
687 srtobject->callers = g_list_append (srtobject->callers, caller);
688 g_cond_signal (&srtobject->sock_cond);
689 GST_OBJECT_UNLOCK (srtobject->element);
691 /* notifying caller-added */
692 if (srtobject->caller_added_closure != NULL) {
693 GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
695 g_value_init (&values[0], G_TYPE_INT);
696 g_value_set_int (&values[0], caller->sock);
698 g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
699 g_value_set_object (&values[1], caller->sockaddr);
701 g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
/* Only the object-typed value holds a reference that must be dropped. */
704 g_value_unset (&values[1]);
707 GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
709 if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
717 gst_srt_object_wait_connect (GstSRTObject * srtobject,
718 GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
720 SRTSOCKET sock = SRT_INVALID_SOCK;
721 const gchar *local_address = NULL;
722 guint local_port = 0;
723 gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
727 GSocketAddress *bind_addr;
729 gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
732 gst_structure_get_string (srtobject->parameters, "localaddress");
733 if (local_address == NULL)
734 local_address = GST_SRT_DEFAULT_LOCALADDRESS;
736 bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
737 bind_sa_len = g_socket_address_get_native_size (bind_addr);
738 bind_sa = g_alloca (bind_sa_len);
740 if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
744 g_clear_object (&bind_addr);
746 sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
747 if (sock == SRT_INVALID_SOCK) {
748 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
749 srt_getlasterror_str ());
753 if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
757 GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
758 local_address, local_port);
760 if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
761 g_set_error (error, GST_RESOURCE_ERROR,
762 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
763 local_address, local_port, srt_getlasterror_str ());
767 if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
768 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
769 srt_getlasterror_str ());
773 GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
774 if (srt_listen (sock, 1) == SRT_ERROR) {
775 g_set_error (error, GST_RESOURCE_ERROR,
776 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
777 srt_getlasterror_str ());
782 srtobject->listener_sock = sock;
785 g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
787 if (*error != NULL) {
795 if (srtobject->listener_poll_id != SRT_ERROR) {
796 srt_epoll_release (srtobject->listener_poll_id);
799 if (sock != SRT_INVALID_SOCK) {
803 g_clear_object (&bind_addr);
805 srtobject->listener_poll_id = SRT_ERROR;
806 srtobject->listener_sock = SRT_INVALID_SOCK;
812 gst_srt_object_connect (GstSRTObject * srtobject,
813 GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
817 gint option_val = -1;
818 gint sock_flags = SRT_EPOLL_ERR;
819 guint local_port = 0;
820 const gchar *local_address = NULL;
822 sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
823 if (sock == SRT_INVALID_SOCK) {
824 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
825 srt_getlasterror_str ());
829 if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
833 switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
836 sock_flags |= SRT_EPOLL_IN;
840 sock_flags |= SRT_EPOLL_OUT;
843 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
844 "Cannot determine stream direction");
848 if (srt_setsockopt (sock, 0, SRTO_SENDER, &option_val, sizeof (gint))) {
849 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
850 srt_getlasterror_str ());
854 option_val = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
855 if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &option_val, sizeof (gint))) {
856 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
857 srt_getlasterror_str ());
861 gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
863 gst_structure_get_string (srtobject->parameters, "localaddress");
864 /* According to SRT norm, bind local address and port if specified */
865 if (local_address != NULL && local_port != 0) {
869 GSocketAddress *bind_addr =
870 g_inet_socket_address_new_from_string (local_address,
873 bind_sa_len = g_socket_address_get_native_size (bind_addr);
874 bind_sa = g_alloca (bind_sa_len);
876 if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
877 g_clear_object (&bind_addr);
881 g_clear_object (&bind_addr);
883 GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
884 local_address, local_port);
886 if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
887 g_set_error (error, GST_RESOURCE_ERROR,
888 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
889 local_address, local_port, srt_getlasterror_str ());
894 if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
895 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
896 srt_getlasterror_str ());
900 if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
901 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
902 srt_getlasterror_str ());
906 srtobject->sock = sock;
912 if (srtobject->poll_id != SRT_ERROR) {
913 srt_epoll_release (srtobject->poll_id);
916 if (sock != SRT_INVALID_SOCK) {
920 srtobject->poll_id = SRT_ERROR;
921 srtobject->sock = SRT_INVALID_SOCK;
927 gst_srt_object_open_connection (GstSRTObject * srtobject,
928 GCancellable * cancellable, GstSRTConnectionMode connection_mode,
929 gpointer sa, size_t sa_len, GError ** error)
931 gboolean ret = FALSE;
933 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
935 gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len, error);
938 gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error);
945 gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
948 return gst_srt_object_open_full (srtobject, NULL, NULL, cancellable, error);
952 gst_srt_object_open_full (GstSRTObject * srtobject,
953 GstSRTObjectCallerAdded caller_added_func,
954 GstSRTObjectCallerRemoved caller_removed_func,
955 GCancellable * cancellable, GError ** error)
957 GSocketAddress *socket_address = NULL;
958 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
962 const gchar *addr_str;
964 srtobject->opened = FALSE;
966 if (caller_added_func != NULL) {
967 srtobject->caller_added_closure =
968 g_cclosure_new (G_CALLBACK (caller_added_func), srtobject, NULL);
969 g_closure_set_marshal (srtobject->caller_added_closure,
970 g_cclosure_marshal_generic);
973 if (caller_removed_func != NULL) {
974 srtobject->caller_removed_closure =
975 g_cclosure_new (G_CALLBACK (caller_removed_func), srtobject, NULL);
976 g_closure_set_marshal (srtobject->caller_removed_closure,
977 g_cclosure_marshal_generic);
980 addr_str = gst_uri_get_host (srtobject->uri);
982 if (addr_str == NULL) {
983 addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
984 GST_DEBUG_OBJECT (srtobject->element,
985 "Given uri doesn't have hostname or address. Use any (%s) and"
986 " setting listener mode", addr_str);
990 g_inet_socket_address_new_from_string (addr_str,
991 gst_uri_get_port (srtobject->uri));
993 if (socket_address == NULL) {
994 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
999 /* FIXME: Unfortunately, SRT doesn't support IPv6 currently. */
1000 if (g_socket_address_get_family (socket_address) != G_SOCKET_FAMILY_IPV4) {
1001 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
1002 "SRT supports IPv4 only");
1006 sa_len = g_socket_address_get_native_size (socket_address);
1007 sa = g_alloca (sa_len);
1009 if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
1013 GST_DEBUG_OBJECT (srtobject->element,
1014 "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
1015 srtobject->parameters);
1017 if (!gst_structure_get_enum (srtobject->parameters,
1018 "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
1019 GST_WARNING_OBJECT (srtobject->element,
1020 "Cannot get connection mode information." " Use default mode");
1021 connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
1024 srtobject->listener_poll_id = srt_epoll_create ();
1027 gst_srt_object_open_connection
1028 (srtobject, cancellable, connection_mode, sa, sa_len, error);
1031 g_clear_object (&socket_address);
1033 return srtobject->opened;
/*
 * Closes the connection: the main socket, the listener socket and thread,
 * and every accepted caller. The caller-added / caller-removed closures are
 * dropped and the object is marked as not opened.
 *
 * The element's object lock is held throughout, except while joining the
 * listener thread and while invoking the caller-removed closure.
 */
1037 gst_srt_object_close (GstSRTObject * srtobject)
1039 GST_OBJECT_LOCK (srtobject->element);
1040 if (srtobject->poll_id != SRT_ERROR) {
1041 srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1044 if (srtobject->sock != SRT_INVALID_SOCK) {
1046 GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
1049 srt_close (srtobject->sock);
1050 srtobject->sock = SRT_INVALID_SOCK;
/* NOTE(review): the listener epoll id is reset to SRT_ERROR without a visible
 * srt_epoll_release(), while gst_srt_object_open_full() creates a new id on
 * every open; confirm the id is not leaked. */
1053 if (srtobject->listener_poll_id != SRT_ERROR) {
1054 srt_epoll_remove_usock (srtobject->listener_poll_id,
1055 srtobject->listener_sock);
1056 srtobject->listener_poll_id = SRT_ERROR;
/* Join the listener thread with the lock released. */
1058 if (srtobject->thread) {
1059 GThread *thread = g_steal_pointer (&srtobject->thread);
1060 GST_OBJECT_UNLOCK (srtobject->element);
1061 g_thread_join (thread);
1062 GST_OBJECT_LOCK (srtobject->element);
1065 if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1066 GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
1067 srtobject->listener_sock);
1069 srt_close (srtobject->listener_sock);
1070 srtobject->listener_sock = SRT_INVALID_SOCK;
/* Detach the caller list first so the removed-closure can run unlocked. */
1073 if (srtobject->callers) {
1074 GList *callers = g_steal_pointer (&srtobject->callers);
1075 GST_OBJECT_UNLOCK (srtobject->element);
1076 g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
1078 GST_OBJECT_LOCK (srtobject->element);
1079 g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
1082 g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
1083 g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
1085 srtobject->opened = FALSE;
1086 GST_OBJECT_UNLOCK (srtobject->element);
1090 gst_srt_object_wait_caller (GstSRTObject * srtobject,
1091 GCancellable * cancellable, GError ** errorj)
1093 gboolean ret = FALSE;
1095 GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");
1097 GST_OBJECT_LOCK (srtobject->element);
1098 while (!g_cancellable_is_cancelled (cancellable)) {
1099 ret = (srtobject->callers != NULL);
1102 g_cond_wait (&srtobject->sock_cond,
1103 GST_OBJECT_GET_LOCK (srtobject->element));
1105 GST_OBJECT_UNLOCK (srtobject->element);
1107 GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no");
/*
 * Reads one SRT message of at most @size bytes into @data.
 *
 * In listener mode the first connected caller is polled (after waiting for
 * one to connect); in the other modes the object's own epoll set is polled.
 * Polling repeats until data is readable or @cancellable is cancelled.
 *
 * NOTE(review): the return statements and parts of the loop are not visible
 * in this view, so the exact return values are not documented here.
 */
1113 gst_srt_object_read (GstSRTObject * srtobject,
1114 guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
1118 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1119 gint poll_id = SRT_ERROR;
1121 /* Only source element can read data */
1122 g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1123 (srtobject->element)) == GST_URI_SRC, -1);
1125 gst_structure_get_enum (srtobject->parameters, "mode",
1126 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1128 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1131 if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
/* NOTE(review): callers->data is dereferenced before the NULL check on the
 * following line. The list can become empty between wait_caller() dropping
 * the lock and this point, so the check should come first. */
1134 GST_OBJECT_LOCK (srtobject->element);
1135 caller = srtobject->callers->data;
1136 if (srtobject->callers)
1137 poll_id = caller->poll_id;
1138 GST_OBJECT_UNLOCK (srtobject->element);
1139 if (poll_id == SRT_ERROR)
1142 poll_id = srtobject->poll_id;
/* Fall back to the default poll timeout when the parameter is missing. */
1145 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1147 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1150 while (!g_cancellable_is_cancelled (cancellable)) {
/* Wait for a readable socket in the selected epoll set. */
1156 pollret = srt_epoll_wait (poll_id, &rsock,
1157 &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
1159 gint srt_errno = srt_getlasterror (NULL);
1161 if (srt_errno != SRT_ETIMEOUT) {
1168 GST_WARNING_OBJECT (srtobject->element,
1169 "abnormal SRT socket is detected");
1173 switch (srt_getsockstate (rsock)) {
1177 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1178 /* Caller has been disappeared. */
/* Outside listener mode the connection is closed and reopened. */
1181 GST_WARNING_OBJECT (srtobject->element,
1182 "Invalid SRT socket. Trying to reconnect");
1183 gst_srt_object_close (srtobject);
1184 if (!gst_srt_object_open (srtobject, cancellable, error)) {
1189 case SRTS_CONNECTED:
1198 len = srt_recvmsg (rsock, (char *) (data), size);
1206 gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
1208 GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
1210 /* Removing all socket descriptors from the monitoring list
1211 * wakes up SRT's threads. We only have one to remove. */
1212 srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1214 /* connection is only waited for in listener mode,
1215 * but there is no harm in raising signal in any case */
1216 GST_OBJECT_LOCK (srtobject->element);
1217 /* however, a race might be harmful ...
1218 * the cancellation is used as 'flushing' flag here,
1219 * so make sure it is so detected by the intended part at proper time */
1220 g_cancellable_cancel (cancellable);
1221 g_cond_signal (&srtobject->sock_cond);
1222 GST_OBJECT_UNLOCK (srtobject->element);
/*
 * Sends every buffer of @headers, in order, over @sock.
 *
 * When @poll_id is positive the function waits on that epoll set, for up to
 * @poll_timeout, before each send. Sending stops early when @cancellable is
 * cancelled or when mapping / sending a buffer fails; those failures are
 * posted as element errors.
 *
 * NOTE(review): the return statements are not visible in this view; callers
 * treat the result as a boolean success flag.
 */
1226 gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
1227 gint poll_id, gint poll_timeout, GstBufferList * headers,
1228 GCancellable * cancellable)
1235 size = gst_buffer_list_length (headers);
1237 GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);
1239 for (i = 0; i < size; i++) {
1240 SRTSOCKET wsock = sock;
1243 GstBuffer *buffer = gst_buffer_list_get (headers, i);
1246 if (g_cancellable_is_cancelled (cancellable)) {
/* Polling is skipped when the caller passes a non-positive poll id. */
1250 if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
1251 &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1255 GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
1258 if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
1259 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
1260 ("Could not map the input stream"), (NULL));
1264 if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size,
1266 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1267 ("%s", srt_getlasterror_str ()));
/* The buffer is unmapped on the failure path as well as on success. */
1268 gst_buffer_unmap (buffer, &mapinfo);
1272 gst_buffer_unmap (buffer, &mapinfo);
/*
 * gst_srt_object_write_to_callers:
 * @srtobject: SRT object owning the list of connected callers
 * @headers: stream headers, sent once to every caller that has not yet
 *     received them
 * @mapinfo: mapped payload to send
 * @cancellable: checked once per caller
 * @error: error location (not used in the lines visible in this excerpt)
 *
 * Walks srtobject->callers with the element's object lock held and writes
 * @mapinfo to each caller's socket in chunks of at most the socket's
 * SRTO_PAYLOADSIZE.
 *
 * The final return statement yields mapinfo->size.
 * NOTE(review): that value does not depend on how many callers actually
 * received the data - confirm this is intended.
 */
1279 gst_srt_object_write_to_callers (GstSRTObject * srtobject,
1280 GstBufferList * headers,
1281 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1285 GST_OBJECT_LOCK (srtobject->element);
1286 callers = srtobject->callers;
1287 while (callers != NULL) {
1289 const guint8 *msg = mapinfo->data;
/* NOTE(review): optlen starts at 1 rather than sizeof (payload_size);
 * confirm srt_getsockflag() treats it as output-only. */
1291 gint payload_size, optlen = 1;
/* The iterator is advanced before the caller is processed, so the current
 * list node can be removed further down without breaking the walk. */
1293 SRTCaller *caller = callers->data;
1294 callers = callers->next;
/* On cancellation the lock is dropped before leaving the function. */
1296 if (g_cancellable_is_cancelled (cancellable)) {
1297 GST_OBJECT_UNLOCK (srtobject->element);
/* Headers go out once per caller. -1 is passed as poll id and timeout, so
 * gst_srt_object_send_headers() does not poll before sending. */
1301 if (!caller->sent_headers) {
1302 if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
1303 -1, headers, cancellable)) {
1306 caller->sent_headers = TRUE;
/* Query the maximum payload size of this caller's socket; a failure is
 * logged as a warning. */
1309 if (srt_getsockflag (caller->sock, SRTO_PAYLOADSIZE, &payload_size,
1311 GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
/* Send the payload in pieces no larger than payload_size. */
1315 while (len < mapinfo->size) {
1316 gint rest = MIN (mapinfo->size - len, payload_size);
1317 sent = srt_sendmsg2 (caller->sock, (char *) (msg + len), rest, 0);
/* Drop the caller: unlink it from the list, run the "caller removed"
 * closure and free it.
 * NOTE(review): the control flow leading here is not visible in this
 * excerpt; presumably this is the failure path for the sends above. */
1327 srtobject->callers = g_list_remove (srtobject->callers, caller);
1328 srt_caller_invoke_removed_closure (caller, srtobject);
1329 srt_caller_free (caller);
1332 GST_OBJECT_UNLOCK (srtobject->element);
1334 return mapinfo->size;
/*
 * gst_srt_object_write_one:
 * @srtobject: SRT object owning the single socket / epoll id used here
 * @headers: stream headers, sent once before the first payload
 * @mapinfo: mapped payload to send
 * @cancellable: checked on every iteration of the send loop
 * @error: error location passed on to gst_srt_object_open() on reconnect
 *
 * Writes @mapinfo to the socket reported ready by srtobject->poll_id, in
 * chunks of at most that socket's SRTO_PAYLOADSIZE.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
1338 gst_srt_object_write_one (GstSRTObject * srtobject,
1339 GstBufferList * headers,
1340 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1344 const guint8 *msg = mapinfo->data;
/* NOTE(review): optlen starts at 1 rather than sizeof (payload_size);
 * confirm srt_getsockflag() treats it as output-only. */
1345 gint payload_size, optlen = 1;
/* Use the "poll-timeout" parameter when present, else the default. */
1347 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1349 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
/* Stream headers are sent only once per srtobject. */
1352 if (!srtobject->sent_headers) {
1353 if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
1354 srtobject->poll_id, poll_timeout, headers, cancellable)) {
1357 srtobject->sent_headers = TRUE;
1360 while (len < mapinfo->size) {
1367 if (g_cancellable_is_cancelled (cancellable)) {
/* Wait for a socket that is ready for writing; it is returned in wsock. */
1371 if (srt_epoll_wait (srtobject->poll_id, 0, 0, &wsock,
1372 &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
/* The chunk size is bounded by the ready socket's maximum payload size. */
1376 if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
1377 GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
1381 rest = MIN (mapinfo->size - len, payload_size);
/* Act on the socket state. The case labels for the first branch are not
 * visible in this excerpt; that branch closes and re-opens the object. */
1383 switch (srt_getsockstate (wsock)) {
1387 GST_WARNING_OBJECT (srtobject->element,
1388 "Invalid SRT socket. Trying to reconnect");
1389 gst_srt_object_close (srtobject);
1390 if (!gst_srt_object_open (srtobject, cancellable, error)) {
1394 case SRTS_CONNECTED:
1396 GST_LOG_OBJECT (srtobject->element, "good to go");
1399 GST_WARNING_OBJECT (srtobject->element, "not ready");
/* Send the next chunk; a failure is posted as an element error carrying
 * the SRT error string. */
1404 sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
1406 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1407 ("%s", srt_getlasterror_str ()));
/*
 * gst_srt_object_write:
 * @srtobject: SRT object to write through
 * @headers: stream headers forwarded to the mode-specific writer
 * @mapinfo: mapped payload to send
 * @cancellable: forwarded to the wait and write helpers
 * @error: error location forwarded to the wait and write helpers
 *
 * Dispatches a write according to the "mode" parameter: in listener mode it
 * first waits for a caller and then writes to all callers, otherwise it
 * writes to the single socket.
 *
 * Returns -1 (through g_return_val_if_fail) when the element is not a sink
 * URI handler.
 * NOTE(review): the other return statements are not visible in this
 * excerpt.
 */
1417 gst_srt_object_write (GstSRTObject * srtobject,
1418 GstBufferList * headers,
1419 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
/* Stays at NONE if the "mode" parameter cannot be read below. */
1422 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1424 /* Only sink element can write data */
1425 g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1426 (srtobject->element)) == GST_URI_SINK, -1);
1428 gst_structure_get_enum (srtobject->parameters, "mode",
1429 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
/* Listener mode: wait for a caller first, then fan out to all callers. */
1431 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1432 if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1436 gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
1437 cancellable, error);
/* Any other mode: write to the object's own socket. */
1440 gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
1447 static GstStructure *
1448 get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
1450 GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
1452 SRT_TRACEBSTATS stats;
1454 ret = srt_bstats (srtsock, &stats, 0);
1458 gst_structure_set (s,
1459 /* number of sent data packets, including retransmissions */
1460 "packets-sent", G_TYPE_INT64, stats.pktSent,
1461 /* number of lost packets (sender side) */
1462 "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
1463 /* number of retransmitted packets */
1464 "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
1465 /* number of received ACK packets */
1466 "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
1467 /* number of received NAK packets */
1468 "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
1469 /* time duration when UDT is sending data (idle time exclusive) */
1470 "send-duration-us", G_TYPE_INT64, stats.usSndDuration,
1471 /* number of sent data bytes, including retransmissions */
1472 "bytes-sent", G_TYPE_UINT64, stats.byteSent,
1473 /* number of retransmitted bytes */
1474 "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
1475 /* number of too-late-to-send dropped bytes */
1476 "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
1477 /* number of too-late-to-send dropped packets */
1478 "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
1479 /* sending rate in Mb/s */
1480 "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
1481 /* busy sending time (i.e., idle time exclusive) */
1482 "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
1483 "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
1485 gst_structure_set (s,
1486 "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
1487 "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
1488 /* number of sent ACK packets */
1489 "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
1490 /* number of sent NAK packets */
1491 "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
1492 "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
1493 "bytes-received-lost", G_TYPE_INT, stats.byteRcvLossTotal,
1494 "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
1495 "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
1497 gst_structure_set (s,
1498 /* estimated bandwidth, in Mb/s */
1499 "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
1500 "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);
1508 gst_srt_object_get_stats (GstSRTObject * srtobject)
1510 GstStructure *s = NULL;
1511 gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1513 GST_OBJECT_LOCK (srtobject->element);
1514 if (srtobject->sock != SRT_INVALID_SOCK) {
1515 s = get_stats_for_srtsock (srtobject->sock, is_sender);
1519 s = gst_structure_new_empty ("application/x-srt-statistics");
1521 if (srtobject->callers) {
1522 GValueArray *callers_stats = g_value_array_new (1);
1523 GValue callers_stats_v = G_VALUE_INIT;
1526 for (item = srtobject->callers; item; item = item->next) {
1527 SRTCaller *caller = item->data;
1528 GstStructure *tmp = get_stats_for_srtsock (caller->sock, is_sender);
1531 g_value_array_append (callers_stats, NULL);
1532 v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
1533 g_value_init (v, GST_TYPE_STRUCTURE);
1534 g_value_take_boxed (v, tmp);
1537 g_value_init (&callers_stats_v, G_TYPE_VALUE_ARRAY);
1538 g_value_take_boxed (&callers_stats_v, callers_stats);
1539 gst_structure_take_value (s, "callers", &callers_stats_v);
1543 GST_OBJECT_UNLOCK (srtobject->element);