26bfe0bdb088218317980ef8c319860500efb123
[platform/upstream/gstreamer.git] / ext / srt / gstsrtobject.c
1 /* GStreamer
2  * Copyright (C) 2018, Collabora Ltd.
3  * Copyright (C) 2018, SK Telecom, Co., Ltd.
4  *   Author: Jeongseok Kim <jeongseok.kim@sk.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 /* Needed for GValueArray */
27 #define GLIB_DISABLE_DEPRECATION_WARNINGS
28
29 #include "gstsrtobject.h"
30
31 #include <gst/base/gstbasesink.h>
32 #include <gio/gnetworking.h>
33 #include <stdlib.h>
34
35 GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
36 #define GST_CAT_DEFAULT gst_debug_srtobject
37
38 enum
39 {
40   PROP_URI = 1,
41   PROP_MODE,
42   PROP_LOCALADDRESS,
43   PROP_LOCALPORT,
44   PROP_PASSPHRASE,
45   PROP_PBKEYLEN,
46   PROP_POLL_TIMEOUT,
47   PROP_LATENCY,
48   PROP_MSG_SIZE,
49   PROP_STATS,
50   PROP_LAST
51 };
52
53 typedef struct
54 {
55   SRTSOCKET sock;
56   gint poll_id;
57   GSocketAddress *sockaddr;
58   gboolean sent_headers;
59 } SRTCaller;
60
61 static SRTCaller *
62 srt_caller_new (void)
63 {
64   SRTCaller *caller = g_new0 (SRTCaller, 1);
65   caller->sock = SRT_INVALID_SOCK;
66   caller->poll_id = SRT_ERROR;
67   caller->sent_headers = FALSE;
68
69   return caller;
70 }
71
72 static void
73 srt_caller_free (SRTCaller * caller)
74 {
75   g_return_if_fail (caller != NULL);
76
77   g_clear_object (&caller->sockaddr);
78
79   if (caller->sock != SRT_INVALID_SOCK) {
80     srt_close (caller->sock);
81   }
82
83   if (caller->poll_id != SRT_ERROR) {
84     srt_epoll_release (caller->poll_id);
85   }
86
87   g_free (caller);
88 }
89
90 static void
91 srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
92 {
93   GValue values[2] = { G_VALUE_INIT };
94
95   if (srtobject->caller_removed_closure == NULL) {
96     return;
97   }
98
99   g_value_init (&values[0], G_TYPE_INT);
100   g_value_set_int (&values[0], caller->sock);
101
102   g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
103   g_value_set_object (&values[1], caller->sockaddr);
104
105   g_closure_invoke (srtobject->caller_removed_closure, NULL, 2, values, NULL);
106
107   g_value_unset (&values[0]);
108   g_value_unset (&values[1]);
109 }
110
111 struct srt_constant_params
112 {
113   const gchar *name;
114   gint param;
115   gint val;
116 };
117
118 static struct srt_constant_params srt_params[] = {
119   {"SRTO_SNDSYN", SRTO_SNDSYN, 0},      /* 0: non-blocking */
120   {"SRTO_RCVSYN", SRTO_RCVSYN, 0},      /* 0: non-blocking */
121   {"SRTO_LINGER", SRTO_LINGER, 0},
122   {"SRTO_TSBPMODE", SRTO_TSBPDMODE, 1}, /* Timestamp-based Packet Delivery mode must be enabled */
123   {NULL, -1, -1},
124 };
125
126 static gint srt_init_refcount = 0;
127
128 static gboolean
129 gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
130     GError ** error)
131 {
132   struct srt_constant_params *params = srt_params;
133
134   for (; params->name != NULL; params++) {
135     if (srt_setsockopt (sock, 0, params->param, &params->val, sizeof (gint))) {
136       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
137           "failed to set %s (reason: %s)", params->name,
138           srt_getlasterror_str ());
139       return FALSE;
140     }
141   }
142
143   if (srtobject->passphrase != NULL && srtobject->passphrase[0] != '\0') {
144     gint pbkeylen;
145
146     if (srt_setsockopt (sock, 0, SRTO_PASSPHRASE, srtobject->passphrase,
147             strlen (srtobject->passphrase))) {
148       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
149           "failed to set passphrase (reason: %s)", srt_getlasterror_str ());
150
151       return FALSE;
152     }
153
154     if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) {
155       pbkeylen = GST_SRT_DEFAULT_PBKEYLEN;
156     }
157
158     if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
159       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
160           "failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
161       return FALSE;
162     }
163   }
164
165   {
166     int latency;
167
168     if (!gst_structure_get_int (srtobject->parameters, "latency", &latency))
169       latency = GST_SRT_DEFAULT_LATENCY;
170     if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
171       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
172           "failed to set latency (reason: %s)", srt_getlasterror_str ());
173       return FALSE;
174     }
175   }
176
177   if (gst_structure_has_field (srtobject->parameters, "streamid")) {
178     const gchar *streamid;
179
180     streamid = gst_structure_get_string (srtobject->parameters, "streamid");
181     if (streamid != NULL && streamid[0] != '\0') {
182       if (srt_setsockopt (sock, 0, SRTO_STREAMID, streamid, strlen (streamid))) {
183         g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
184             "failed to set stream ID (reason: %s)", srt_getlasterror_str ());
185       }
186     }
187   }
188
189   return TRUE;
190 }
191
192 GstSRTObject *
193 gst_srt_object_new (GstElement * element)
194 {
195   GstSRTObject *srtobject;
196
197   if (g_atomic_int_get (&srt_init_refcount) == 0) {
198     GST_DEBUG_OBJECT (element, "Starting up SRT");
199     if (srt_startup () != 0) {
200       g_warning ("Failed to initialize SRT (reason: %s)",
201           srt_getlasterror_str ());
202     }
203   }
204
205   g_atomic_int_inc (&srt_init_refcount);
206
207   srtobject = g_new0 (GstSRTObject, 1);
208   srtobject->element = element;
209   srtobject->parameters = gst_structure_new ("application/x-srt-params",
210       "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
211       "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY,
212       "mode", GST_TYPE_SRT_CONNECTION_MODE, GST_SRT_DEFAULT_MODE, NULL);
213
214   srtobject->sock = SRT_INVALID_SOCK;
215   srtobject->poll_id = srt_epoll_create ();
216   srtobject->listener_sock = SRT_INVALID_SOCK;
217   srtobject->listener_poll_id = SRT_ERROR;
218   srtobject->sent_headers = FALSE;
219
220   g_cond_init (&srtobject->sock_cond);
221   return srtobject;
222 }
223
224 void
225 gst_srt_object_destroy (GstSRTObject * srtobject)
226 {
227   g_return_if_fail (srtobject != NULL);
228
229   if (srtobject->poll_id != SRT_ERROR) {
230     srt_epoll_release (srtobject->poll_id);
231     srtobject->poll_id = SRT_ERROR;
232   }
233
234   g_cond_clear (&srtobject->sock_cond);
235
236   GST_DEBUG_OBJECT (srtobject->element, "Destroying srtobject");
237   gst_structure_free (srtobject->parameters);
238
239   g_free (srtobject->passphrase);
240
241   if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
242     srt_cleanup ();
243     GST_DEBUG_OBJECT (srtobject->element, "Cleaning up SRT");
244   }
245
246   g_clear_pointer (&srtobject->uri, gst_uri_unref);
247
248   g_free (srtobject);
249 }
250
251 gboolean
252 gst_srt_object_set_property_helper (GstSRTObject * srtobject,
253     guint prop_id, const GValue * value, GParamSpec * pspec)
254 {
255   switch (prop_id) {
256     case PROP_URI:{
257       const gchar *uri = g_value_get_string (value);
258       gst_srt_object_set_uri (srtobject, uri, NULL);
259       break;
260     }
261     case PROP_MODE:
262       gst_structure_set_value (srtobject->parameters, "mode", value);
263       break;
264     case PROP_POLL_TIMEOUT:
265       gst_structure_set_value (srtobject->parameters, "poll-timeout", value);
266       break;
267     case PROP_LATENCY:
268       gst_structure_set_value (srtobject->parameters, "latency", value);
269       break;
270     case PROP_LOCALADDRESS:
271       gst_structure_set_value (srtobject->parameters, "localaddress", value);
272       break;
273     case PROP_LOCALPORT:
274       gst_structure_set_value (srtobject->parameters, "localport", value);
275       break;
276     case PROP_PASSPHRASE:
277       g_free (srtobject->passphrase);
278       srtobject->passphrase = g_value_dup_string (value);
279       break;
280     case PROP_PBKEYLEN:
281       gst_structure_set_value (srtobject->parameters, "pbkeylen", value);
282       break;
283     default:
284       return FALSE;
285   }
286   return TRUE;
287 }
288
289 gboolean
290 gst_srt_object_get_property_helper (GstSRTObject * srtobject,
291     guint prop_id, GValue * value, GParamSpec * pspec)
292 {
293   switch (prop_id) {
294     case PROP_URI:
295       g_value_take_string (value, gst_uri_to_string (srtobject->uri));
296       break;
297     case PROP_MODE:{
298       GstSRTConnectionMode v;
299       if (!gst_structure_get_enum (srtobject->parameters, "mode",
300               GST_TYPE_SRT_CONNECTION_MODE, (gint *) & v)) {
301         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
302         v = GST_SRT_CONNECTION_MODE_NONE;
303       }
304       g_value_set_enum (value, v);
305       break;
306     }
307     case PROP_LOCALADDRESS:
308       g_value_set_string (value,
309           gst_structure_get_string (srtobject->parameters, "localaddress"));
310       break;
311     case PROP_LOCALPORT:{
312       guint v;
313       if (!gst_structure_get_uint (srtobject->parameters, "localport", &v)) {
314         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
315         v = GST_SRT_DEFAULT_PORT;
316       }
317       g_value_set_uint (value, v);
318       break;
319     }
320     case PROP_PBKEYLEN:{
321       GstSRTKeyLength v;
322       if (!gst_structure_get_enum (srtobject->parameters, "pbkeylen",
323               GST_TYPE_SRT_KEY_LENGTH, (gint *) & v)) {
324         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
325         v = GST_SRT_KEY_LENGTH_NO_KEY;
326       }
327       g_value_set_enum (value, v);
328       break;
329     }
330     case PROP_POLL_TIMEOUT:{
331       gint v;
332       if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &v)) {
333         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
334         v = GST_SRT_DEFAULT_POLL_TIMEOUT;
335       }
336       g_value_set_int (value, v);
337       break;
338     }
339     case PROP_LATENCY:{
340       gint v;
341       if (!gst_structure_get_int (srtobject->parameters, "latency", &v)) {
342         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
343         v = GST_SRT_DEFAULT_LATENCY;
344       }
345       g_value_set_int (value, v);
346       break;
347     }
348     case PROP_STATS:
349       g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
350       break;
351     default:
352       return FALSE;
353   }
354
355   return TRUE;
356 }
357
358 void
359 gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
360 {
361   /**
362    * GstSRTSrc:uri:
363    *
364    * The URI used by SRT connection. User can specify SRT specific options by URI parameters.
365    * Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Mediun: SRT</a>
366    */
367   g_object_class_install_property (gobject_class, PROP_URI,
368       g_param_spec_string ("uri", "URI",
369           "URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
370           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
371           G_PARAM_STATIC_STRINGS));
372
373   /**
374    * GstSRTSrc:mode:
375    * 
376    * The SRT connection mode. 
377    * This property can be set by URI parameters.
378    */
379   g_object_class_install_property (gobject_class, PROP_MODE,
380       g_param_spec_enum ("mode", "Connection mode",
381           "SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
382           GST_SRT_CONNECTION_MODE_CALLER,
383           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
384           G_PARAM_STATIC_STRINGS));
385
386   /**
387    * GstSRTSrc:localaddress:
388    * 
389    * The address to bind when #GstSRTSrc:mode is listener or rendezvous.
390    * This property can be set by URI parameters.
391    */
392   g_object_class_install_property (gobject_class, PROP_LOCALADDRESS,
393       g_param_spec_string ("localaddress", "Local address",
394           "Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS,
395           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
396           G_PARAM_STATIC_STRINGS));
397
398   /**
399    * GstSRTSrc:localport:
400    *
401    * The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
402    * This property can be set by URI parameters.
403    */
404   g_object_class_install_property (gobject_class, PROP_LOCALPORT,
405       g_param_spec_uint ("localport", "Local port",
406           "Local port to bind", 0,
407           65535, GST_SRT_DEFAULT_PORT,
408           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
409           G_PARAM_STATIC_STRINGS));
410
411   /**
412    * GstSRTSrc:passphrase:
413    *
414    * The password for the encrypted transmission.
415    * This property can be set by URI parameters.
416    */
417   g_object_class_install_property (gobject_class, PROP_PASSPHRASE,
418       g_param_spec_string ("passphrase", "Passphrase",
419           "Password for the encrypted transmission", "",
420           G_PARAM_WRITABLE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS));
421
422   /**
423    * GstSRTSrc:pbkeylen:
424    * 
425    * The crypto key length.
426    * This property can be set by URI parameters.
427    */
428   g_object_class_install_property (gobject_class, PROP_PBKEYLEN,
429       g_param_spec_enum ("pbkeylen", "Crypto key length",
430           "Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
431           GST_SRT_DEFAULT_PBKEYLEN,
432           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
433           G_PARAM_STATIC_STRINGS));
434
435   /**
436    * GstSRTSrc:poll-timeout:
437    * 
438    * The polling timeout used when srt poll is started.
439    * Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
440    * This property can be set by URI parameters.
441    */
442   g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT,
443       g_param_spec_int ("poll-timeout", "Poll timeout",
444           "Return poll wait after timeout milliseconds (-1 = infinite)", -1,
445           G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT,
446           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
447           G_PARAM_STATIC_STRINGS));
448
449   /**
450    * GstSRTSrc:latency:
451    *
452    * The maximum accepted transmission latency.
453    */
454   g_object_class_install_property (gobject_class, PROP_LATENCY,
455       g_param_spec_int ("latency", "latency",
456           "Minimum latency (milliseconds)", 0,
457           G_MAXINT32, GST_SRT_DEFAULT_LATENCY,
458           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
459
460   /**
461    * GstSRTSrc:stats:
462    *
463    * The statistics from SRT.
464    */
465   g_object_class_install_property (gobject_class, PROP_STATS,
466       g_param_spec_boxed ("stats", "Statistics",
467           "SRT Statistics", GST_TYPE_STRUCTURE,
468           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
469
470 }
471
472 static void
473 gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
474     gconstpointer key, gconstpointer value)
475 {
476   GEnumClass *enum_class;
477   GEnumValue *enum_value;
478
479   enum_class = g_type_class_ref (enum_type);
480   enum_value = g_enum_get_value_by_nick (enum_class, value);
481
482   if (enum_value) {
483     GValue v = G_VALUE_INIT;
484     g_value_init (&v, enum_type);
485     g_value_set_enum (&v, enum_value->value);
486     gst_structure_set_value (s, key, &v);
487   }
488
489   g_type_class_unref (enum_class);
490 }
491
492 static void
493 gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
494     const gchar * value)
495 {
496   GValue v = G_VALUE_INIT;
497   g_value_init (&v, G_TYPE_STRING);
498   g_value_set_static_string (&v, value);
499   gst_structure_set_value (s, key, &v);
500   g_value_unset (&v);
501 }
502
503 static void
504 gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
505     const gchar * value)
506 {
507   GValue v = G_VALUE_INIT;
508   g_value_init (&v, G_TYPE_UINT);
509   g_value_set_uint (&v, (guint) strtoul (value, NULL, 10));
510   gst_structure_set_value (s, key, &v);
511   g_value_unset (&v);
512 }
513
514 static void
515 gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
516 {
517   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
518
519   gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
520       (gint *) & connection_mode);
521
522   if (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS ||
523       connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
524     guint local_port;
525     const gchar *local_address = gst_structure_get_string (s, "localaddress");
526
527     if (local_address == NULL) {
528       local_address =
529           gst_uri_get_host (uri) ==
530           NULL ? GST_SRT_DEFAULT_LOCALADDRESS : gst_uri_get_host (uri);
531       gst_srt_object_set_string_value (s, "localaddress", local_address);
532     }
533
534     if (!gst_structure_get_uint (s, "localport", &local_port)) {
535       local_port =
536           gst_uri_get_port (uri) ==
537           GST_URI_NO_PORT ? GST_SRT_DEFAULT_PORT : gst_uri_get_port (uri);
538       gst_structure_set (s, "localport", G_TYPE_UINT, local_port, NULL);
539     }
540   }
541 }
542
543 gboolean
544 gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
545     GError ** err)
546 {
547   GHashTable *query_table = NULL;
548   GHashTableIter iter;
549   gpointer key, value;
550   const char *addr_str;
551
552   if (srtobject->opened) {
553     g_warning
554         ("It's not supported to change the 'uri' property when SRT socket is opened.");
555     g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
556         "It's not supported to change the 'uri' property when SRT socket is opened");
557
558     return FALSE;
559   }
560
561   if (!g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
562     g_warning ("Given uri cannot be used for SRT connection.");
563     g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
564         "Invalid SRT URI scheme");
565     return FALSE;
566   }
567
568   g_clear_pointer (&srtobject->uri, gst_uri_unref);
569   srtobject->uri = gst_uri_from_string (uri);
570
571   query_table = gst_uri_get_query_table (srtobject->uri);
572
573   GST_DEBUG_OBJECT (srtobject->element,
574       "set uri to (host: %s, port: %d) with %d query strings",
575       gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
576       query_table == NULL ? 0 : g_hash_table_size (query_table));
577
578   addr_str = gst_uri_get_host (srtobject->uri);
579   if (addr_str)
580     gst_srt_object_set_enum_value (srtobject->parameters,
581         GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
582   else
583     gst_srt_object_set_enum_value (srtobject->parameters,
584         GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");
585
586   if (query_table) {
587     g_hash_table_iter_init (&iter, query_table);
588     while (g_hash_table_iter_next (&iter, &key, &value)) {
589       if (!g_strcmp0 ("mode", key)) {
590         gst_srt_object_set_enum_value (srtobject->parameters,
591             GST_TYPE_SRT_CONNECTION_MODE, key, value);
592       } else if (!g_strcmp0 ("localaddress", key)) {
593         gst_srt_object_set_string_value (srtobject->parameters, key, value);
594       } else if (!g_strcmp0 ("localport", key)) {
595         gst_srt_object_set_uint_value (srtobject->parameters, key, value);
596       } else if (!g_strcmp0 ("passphrase", key)) {
597         g_free (srtobject->passphrase);
598         srtobject->passphrase = g_strdup (value);
599       } else if (!g_strcmp0 ("pbkeylen", key)) {
600         gst_srt_object_set_enum_value (srtobject->parameters,
601             GST_TYPE_SRT_KEY_LENGTH, key, value);
602       } else if (!g_strcmp0 ("streamid", key)) {
603         gst_srt_object_set_string_value (srtobject->parameters, key, value);
604       }
605     }
606
607     g_hash_table_unref (query_table);
608   }
609
610   gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);
611
612   return TRUE;
613 }
614
615 static gpointer
616 thread_func (gpointer data)
617 {
618   GstSRTObject *srtobject = data;
619   SRTSOCKET caller_sock;
620   union
621   {
622     struct sockaddr_storage ss;
623     struct sockaddr sa;
624   } caller_sa;
625   int caller_sa_len;
626
627   gint poll_timeout;
628
629   SRTSOCKET rsock;
630   gint rsocklen = 1;
631
632   for (;;) {
633     if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
634             &poll_timeout)) {
635       poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
636     }
637
638     GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
639
640     if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
641             &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
642       gint srt_errno = srt_getlasterror (NULL);
643
644       if (srtobject->listener_poll_id == SRT_ERROR)
645         return NULL;
646       if (srt_errno == SRT_ETIMEOUT) {
647         continue;
648       } else {
649         GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
650             ("abort polling: %s", srt_getlasterror_str ()), (NULL));
651         return NULL;
652       }
653     }
654
655     caller_sock =
656         srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
657
658     if (caller_sock != SRT_INVALID_SOCK) {
659       SRTCaller *caller;
660       gint flag = SRT_EPOLL_ERR;
661
662       caller = srt_caller_new ();
663       caller->sockaddr =
664           g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
665       caller->poll_id = srt_epoll_create ();
666       caller->sock = caller_sock;
667
668       if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
669               (srtobject->element)) == GST_URI_SRC) {
670         flag |= SRT_EPOLL_IN;
671       } else {
672         flag |= SRT_EPOLL_OUT;
673       }
674
675       if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
676
677         GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
678             ("%s", srt_getlasterror_str ()), (NULL));
679
680         srt_caller_free (caller);
681
682         /* try-again */
683         continue;
684       }
685
686       GST_OBJECT_LOCK (srtobject->element);
687       srtobject->callers = g_list_append (srtobject->callers, caller);
688       g_cond_signal (&srtobject->sock_cond);
689       GST_OBJECT_UNLOCK (srtobject->element);
690
691       /* notifying caller-added */
692       if (srtobject->caller_added_closure != NULL) {
693         GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
694
695         g_value_init (&values[0], G_TYPE_INT);
696         g_value_set_int (&values[0], caller->sock);
697
698         g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
699         g_value_set_object (&values[1], caller->sockaddr);
700
701         g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
702             NULL);
703
704         g_value_unset (&values[1]);
705       }
706
707       GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
708
709       if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
710           GST_URI_SRC)
711         return NULL;
712     }
713   }
714 }
715
716 static gboolean
717 gst_srt_object_wait_connect (GstSRTObject * srtobject,
718     GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
719 {
720   SRTSOCKET sock = SRT_INVALID_SOCK;
721   const gchar *local_address = NULL;
722   guint local_port = 0;
723   gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
724
725   gpointer bind_sa;
726   gsize bind_sa_len;
727   GSocketAddress *bind_addr;
728
729   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
730
731   local_address =
732       gst_structure_get_string (srtobject->parameters, "localaddress");
733   if (local_address == NULL)
734     local_address = GST_SRT_DEFAULT_LOCALADDRESS;
735
736   bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
737   bind_sa_len = g_socket_address_get_native_size (bind_addr);
738   bind_sa = g_alloca (bind_sa_len);
739
740   if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
741     goto failed;
742   }
743
744   g_clear_object (&bind_addr);
745
746   sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
747   if (sock == SRT_INVALID_SOCK) {
748     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
749         srt_getlasterror_str ());
750     goto failed;
751   }
752
753   if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
754     goto failed;
755   }
756
757   GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
758       local_address, local_port);
759
760   if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
761     g_set_error (error, GST_RESOURCE_ERROR,
762         GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
763         local_address, local_port, srt_getlasterror_str ());
764     goto failed;
765   }
766
767   if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
768     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
769         srt_getlasterror_str ());
770     goto failed;
771   }
772
773   GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
774   if (srt_listen (sock, 1) == SRT_ERROR) {
775     g_set_error (error, GST_RESOURCE_ERROR,
776         GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
777         srt_getlasterror_str ());
778
779     goto failed;
780   }
781
782   srtobject->listener_sock = sock;
783
784   srtobject->thread =
785       g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
786
787   if (*error != NULL) {
788     goto failed;
789   }
790
791   return TRUE;
792
793 failed:
794
795   if (srtobject->listener_poll_id != SRT_ERROR) {
796     srt_epoll_release (srtobject->listener_poll_id);
797   }
798
799   if (sock != SRT_INVALID_SOCK) {
800     srt_close (sock);
801   }
802
803   g_clear_object (&bind_addr);
804
805   srtobject->listener_poll_id = SRT_ERROR;
806   srtobject->listener_sock = SRT_INVALID_SOCK;
807
808   return FALSE;
809 }
810
811 static gboolean
812 gst_srt_object_connect (GstSRTObject * srtobject,
813     GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
814     GError ** error)
815 {
816   SRTSOCKET sock;
817   gint option_val = -1;
818   gint sock_flags = SRT_EPOLL_ERR;
819   guint local_port = 0;
820   const gchar *local_address = NULL;
821
822   sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
823   if (sock == SRT_INVALID_SOCK) {
824     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
825         srt_getlasterror_str ());
826     goto failed;
827   }
828
829   if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
830     goto failed;
831   }
832
833   switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
834     case GST_URI_SRC:
835       option_val = 0;
836       sock_flags |= SRT_EPOLL_IN;
837       break;
838     case GST_URI_SINK:
839       option_val = 1;
840       sock_flags |= SRT_EPOLL_OUT;
841       break;
842     default:
843       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
844           "Cannot determine stream direction");
845       goto failed;
846   }
847
848   if (srt_setsockopt (sock, 0, SRTO_SENDER, &option_val, sizeof (gint))) {
849     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
850         srt_getlasterror_str ());
851     goto failed;
852   }
853
854   option_val = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
855   if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &option_val, sizeof (gint))) {
856     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
857         srt_getlasterror_str ());
858     goto failed;
859   }
860
861   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
862   local_address =
863       gst_structure_get_string (srtobject->parameters, "localaddress");
864   /* According to SRT norm, bind local address and port if specified */
865   if (local_address != NULL && local_port != 0) {
866     gpointer bind_sa;
867     gsize bind_sa_len;
868
869     GSocketAddress *bind_addr =
870         g_inet_socket_address_new_from_string (local_address,
871         local_port);
872
873     bind_sa_len = g_socket_address_get_native_size (bind_addr);
874     bind_sa = g_alloca (bind_sa_len);
875
876     if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
877       g_clear_object (&bind_addr);
878       goto failed;
879     }
880
881     g_clear_object (&bind_addr);
882
883     GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
884         local_address, local_port);
885
886     if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
887       g_set_error (error, GST_RESOURCE_ERROR,
888           GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
889           local_address, local_port, srt_getlasterror_str ());
890       goto failed;
891     }
892   }
893
894   if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
895     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
896         srt_getlasterror_str ());
897     goto failed;
898   }
899
900   if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
901     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
902         srt_getlasterror_str ());
903     goto failed;
904   }
905
906   srtobject->sock = sock;
907
908   return TRUE;
909
910 failed:
911
912   if (srtobject->poll_id != SRT_ERROR) {
913     srt_epoll_release (srtobject->poll_id);
914   }
915
916   if (sock != SRT_INVALID_SOCK) {
917     srt_close (sock);
918   }
919
920   srtobject->poll_id = SRT_ERROR;
921   srtobject->sock = SRT_INVALID_SOCK;
922
923   return FALSE;
924 }
925
926 static gboolean
927 gst_srt_object_open_connection (GstSRTObject * srtobject,
928     GCancellable * cancellable, GstSRTConnectionMode connection_mode,
929     gpointer sa, size_t sa_len, GError ** error)
930 {
931   gboolean ret = FALSE;
932
933   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
934     ret =
935         gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len, error);
936   } else {
937     ret =
938         gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error);
939   }
940
941   return ret;
942 }
943
944 gboolean
945 gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
946     GError ** error)
947 {
948   return gst_srt_object_open_full (srtobject, NULL, NULL, cancellable, error);
949 }
950
951 gboolean
952 gst_srt_object_open_full (GstSRTObject * srtobject,
953     GstSRTObjectCallerAdded caller_added_func,
954     GstSRTObjectCallerRemoved caller_removed_func,
955     GCancellable * cancellable, GError ** error)
956 {
957   GSocketAddress *socket_address = NULL;
958   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
959
960   gpointer sa;
961   size_t sa_len;
962   const gchar *addr_str;
963
964   srtobject->opened = FALSE;
965
966   if (caller_added_func != NULL) {
967     srtobject->caller_added_closure =
968         g_cclosure_new (G_CALLBACK (caller_added_func), srtobject, NULL);
969     g_closure_set_marshal (srtobject->caller_added_closure,
970         g_cclosure_marshal_generic);
971   }
972
973   if (caller_removed_func != NULL) {
974     srtobject->caller_removed_closure =
975         g_cclosure_new (G_CALLBACK (caller_removed_func), srtobject, NULL);
976     g_closure_set_marshal (srtobject->caller_removed_closure,
977         g_cclosure_marshal_generic);
978   }
979
980   addr_str = gst_uri_get_host (srtobject->uri);
981
982   if (addr_str == NULL) {
983     addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
984     GST_DEBUG_OBJECT (srtobject->element,
985         "Given uri doesn't have hostname or address. Use any (%s) and"
986         " setting listener mode", addr_str);
987   }
988
989   socket_address =
990       g_inet_socket_address_new_from_string (addr_str,
991       gst_uri_get_port (srtobject->uri));
992
993   if (socket_address == NULL) {
994     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
995         "Invalid host");
996     goto out;
997   }
998
999   /* FIXME: Unfortunately, SRT doesn't support IPv6 currently. */
1000   if (g_socket_address_get_family (socket_address) != G_SOCKET_FAMILY_IPV4) {
1001     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
1002         "SRT supports IPv4 only");
1003     goto out;
1004   }
1005
1006   sa_len = g_socket_address_get_native_size (socket_address);
1007   sa = g_alloca (sa_len);
1008
1009   if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
1010     goto out;
1011   }
1012
1013   GST_DEBUG_OBJECT (srtobject->element,
1014       "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
1015       srtobject->parameters);
1016
1017   if (!gst_structure_get_enum (srtobject->parameters,
1018           "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
1019     GST_WARNING_OBJECT (srtobject->element,
1020         "Cannot get connection mode information." " Use default mode");
1021     connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
1022   }
1023
1024   srtobject->listener_poll_id = srt_epoll_create ();
1025
1026   srtobject->opened =
1027       gst_srt_object_open_connection
1028       (srtobject, cancellable, connection_mode, sa, sa_len, error);
1029
1030 out:
1031   g_clear_object (&socket_address);
1032
1033   return srtobject->opened;
1034 }
1035
1036 void
1037 gst_srt_object_close (GstSRTObject * srtobject)
1038 {
1039   GST_OBJECT_LOCK (srtobject->element);
1040   if (srtobject->poll_id != SRT_ERROR) {
1041     srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1042   }
1043
1044   if (srtobject->sock != SRT_INVALID_SOCK) {
1045
1046     GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
1047         srtobject->sock);
1048
1049     srt_close (srtobject->sock);
1050     srtobject->sock = SRT_INVALID_SOCK;
1051   }
1052
1053   if (srtobject->listener_poll_id != SRT_ERROR) {
1054     srt_epoll_remove_usock (srtobject->listener_poll_id,
1055         srtobject->listener_sock);
1056     srtobject->listener_poll_id = SRT_ERROR;
1057   }
1058   if (srtobject->thread) {
1059     GThread *thread = g_steal_pointer (&srtobject->thread);
1060     GST_OBJECT_UNLOCK (srtobject->element);
1061     g_thread_join (thread);
1062     GST_OBJECT_LOCK (srtobject->element);
1063   }
1064
1065   if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1066     GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
1067         srtobject->listener_sock);
1068
1069     srt_close (srtobject->listener_sock);
1070     srtobject->listener_sock = SRT_INVALID_SOCK;
1071   }
1072
1073   if (srtobject->callers) {
1074     GList *callers = g_steal_pointer (&srtobject->callers);
1075     GST_OBJECT_UNLOCK (srtobject->element);
1076     g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
1077         srtobject);
1078     GST_OBJECT_LOCK (srtobject->element);
1079     g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
1080   }
1081
1082   g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
1083   g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
1084
1085   srtobject->opened = FALSE;
1086   GST_OBJECT_UNLOCK (srtobject->element);
1087 }
1088
1089 static gboolean
1090 gst_srt_object_wait_caller (GstSRTObject * srtobject,
1091     GCancellable * cancellable, GError ** errorj)
1092 {
1093   gboolean ret = FALSE;
1094
1095   GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");
1096
1097   GST_OBJECT_LOCK (srtobject->element);
1098   while (!g_cancellable_is_cancelled (cancellable)) {
1099     ret = (srtobject->callers != NULL);
1100     if (ret)
1101       break;
1102     g_cond_wait (&srtobject->sock_cond,
1103         GST_OBJECT_GET_LOCK (srtobject->element));
1104   }
1105   GST_OBJECT_UNLOCK (srtobject->element);
1106
1107   GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no");
1108
1109   return ret;
1110 }
1111
1112 gssize
1113 gst_srt_object_read (GstSRTObject * srtobject,
1114     guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
1115 {
1116   gssize len = 0;
1117   gint poll_timeout;
1118   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1119   gint poll_id = SRT_ERROR;
1120
1121   /* Only source element can read data */
1122   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1123           (srtobject->element)) == GST_URI_SRC, -1);
1124
1125   gst_structure_get_enum (srtobject->parameters, "mode",
1126       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1127
1128   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1129     SRTCaller *caller;
1130
1131     if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1132       return -1;
1133
1134     GST_OBJECT_LOCK (srtobject->element);
1135     caller = srtobject->callers->data;
1136     if (srtobject->callers)
1137       poll_id = caller->poll_id;
1138     GST_OBJECT_UNLOCK (srtobject->element);
1139     if (poll_id == SRT_ERROR)
1140       return 0;
1141   } else {
1142     poll_id = srtobject->poll_id;
1143   }
1144
1145   if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1146           &poll_timeout)) {
1147     poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1148   }
1149
1150   while (!g_cancellable_is_cancelled (cancellable)) {
1151
1152     SRTSOCKET rsock;
1153     gint rsocklen = 1;
1154     int pollret;
1155
1156     pollret = srt_epoll_wait (poll_id, &rsock,
1157         &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
1158     if (pollret < 0) {
1159       gint srt_errno = srt_getlasterror (NULL);
1160
1161       if (srt_errno != SRT_ETIMEOUT) {
1162         return 0;
1163       }
1164       continue;
1165     }
1166
1167     if (rsocklen < 0) {
1168       GST_WARNING_OBJECT (srtobject->element,
1169           "abnormal SRT socket is detected");
1170       srt_close (rsock);
1171     }
1172
1173     switch (srt_getsockstate (rsock)) {
1174       case SRTS_BROKEN:
1175       case SRTS_NONEXIST:
1176       case SRTS_CLOSED:
1177         if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1178           /* Caller has been disappeared. */
1179           return 0;
1180         } else {
1181           GST_WARNING_OBJECT (srtobject->element,
1182               "Invalid SRT socket. Trying to reconnect");
1183           gst_srt_object_close (srtobject);
1184           if (!gst_srt_object_open (srtobject, cancellable, error)) {
1185             return -1;
1186           }
1187           continue;
1188         }
1189       case SRTS_CONNECTED:
1190         /* good to go */
1191         break;
1192       default:
1193         /* not-ready */
1194         continue;
1195     }
1196
1197
1198     len = srt_recvmsg (rsock, (char *) (data), size);
1199     break;
1200   }
1201
1202   return len;
1203 }
1204
1205 void
1206 gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
1207 {
1208   GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
1209
1210   /* Removing all socket descriptors from the monitoring list
1211    * wakes up SRT's threads. We only have one to remove. */
1212   srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1213
1214   /* connection is only waited for in listener mode,
1215    * but there is no harm in raising signal in any case */
1216   GST_OBJECT_LOCK (srtobject->element);
1217   /* however, a race might be harmful ...
1218    * the cancellation is used as 'flushing' flag here,
1219    * so make sure it is so detected by the intended part at proper time */
1220   g_cancellable_cancel (cancellable);
1221   g_cond_signal (&srtobject->sock_cond);
1222   GST_OBJECT_UNLOCK (srtobject->element);
1223 }
1224
1225 static gboolean
1226 gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
1227     gint poll_id, gint poll_timeout, GstBufferList * headers,
1228     GCancellable * cancellable)
1229 {
1230   guint size, i;
1231
1232   if (!headers)
1233     return TRUE;
1234
1235   size = gst_buffer_list_length (headers);
1236
1237   GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);
1238
1239   for (i = 0; i < size; i++) {
1240     SRTSOCKET wsock = sock;
1241     gint wsocklen = 1;
1242
1243     GstBuffer *buffer = gst_buffer_list_get (headers, i);
1244     GstMapInfo mapinfo;
1245
1246     if (g_cancellable_is_cancelled (cancellable)) {
1247       return FALSE;
1248     }
1249
1250     if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
1251             &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1252       continue;
1253     }
1254
1255     GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
1256         i, buffer);
1257
1258     if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
1259       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
1260           ("Could not map the input stream"), (NULL));
1261       return FALSE;
1262     }
1263
1264     if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size,
1265             0) == SRT_ERROR) {
1266       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1267           ("%s", srt_getlasterror_str ()));
1268       gst_buffer_unmap (buffer, &mapinfo);
1269       return FALSE;
1270     }
1271
1272     gst_buffer_unmap (buffer, &mapinfo);
1273   }
1274
1275   return TRUE;
1276 }
1277
1278 static gssize
1279 gst_srt_object_write_to_callers (GstSRTObject * srtobject,
1280     GstBufferList * headers,
1281     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1282 {
1283   GList *callers;
1284
1285   GST_OBJECT_LOCK (srtobject->element);
1286   callers = srtobject->callers;
1287   while (callers != NULL) {
1288     gssize len = 0;
1289     const guint8 *msg = mapinfo->data;
1290     gint sent;
1291     gint payload_size, optlen = 1;
1292
1293     SRTCaller *caller = callers->data;
1294     callers = callers->next;
1295
1296     if (g_cancellable_is_cancelled (cancellable)) {
1297       GST_OBJECT_UNLOCK (srtobject->element);
1298       return -1;
1299     }
1300
1301     if (!caller->sent_headers) {
1302       if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
1303               -1, headers, cancellable)) {
1304         goto err;
1305       }
1306       caller->sent_headers = TRUE;
1307     }
1308
1309     if (srt_getsockflag (caller->sock, SRTO_PAYLOADSIZE, &payload_size,
1310             &optlen)) {
1311       GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
1312       goto err;
1313     }
1314
1315     while (len < mapinfo->size) {
1316       gint rest = MIN (mapinfo->size - len, payload_size);
1317       sent = srt_sendmsg2 (caller->sock, (char *) (msg + len), rest, 0);
1318       if (sent < 0) {
1319         goto err;
1320       }
1321       len += sent;
1322     }
1323
1324     continue;
1325
1326   err:
1327     srtobject->callers = g_list_remove (srtobject->callers, caller);
1328     srt_caller_invoke_removed_closure (caller, srtobject);
1329     srt_caller_free (caller);
1330   }
1331
1332   GST_OBJECT_UNLOCK (srtobject->element);
1333
1334   return mapinfo->size;
1335 }
1336
1337 static gssize
1338 gst_srt_object_write_one (GstSRTObject * srtobject,
1339     GstBufferList * headers,
1340     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1341 {
1342   gssize len = 0;
1343   gint poll_timeout;
1344   const guint8 *msg = mapinfo->data;
1345   gint payload_size, optlen = 1;
1346
1347   if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1348           &poll_timeout)) {
1349     poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1350   }
1351
1352   if (!srtobject->sent_headers) {
1353     if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
1354             srtobject->poll_id, poll_timeout, headers, cancellable)) {
1355       return -1;
1356     }
1357     srtobject->sent_headers = TRUE;
1358   }
1359
1360   while (len < mapinfo->size) {
1361     SRTSOCKET wsock;
1362     gint wsocklen = 1;
1363
1364     gint sent;
1365     gint rest;
1366
1367     if (g_cancellable_is_cancelled (cancellable)) {
1368       break;
1369     }
1370
1371     if (srt_epoll_wait (srtobject->poll_id, 0, 0, &wsock,
1372             &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1373       continue;
1374     }
1375
1376     if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
1377       GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
1378       break;
1379     }
1380
1381     rest = MIN (mapinfo->size - len, payload_size);
1382
1383     switch (srt_getsockstate (wsock)) {
1384       case SRTS_BROKEN:
1385       case SRTS_NONEXIST:
1386       case SRTS_CLOSED:
1387         GST_WARNING_OBJECT (srtobject->element,
1388             "Invalid SRT socket. Trying to reconnect");
1389         gst_srt_object_close (srtobject);
1390         if (!gst_srt_object_open (srtobject, cancellable, error)) {
1391           return -1;
1392         }
1393         continue;
1394       case SRTS_CONNECTED:
1395         /* good to go */
1396         GST_LOG_OBJECT (srtobject->element, "good to go");
1397         break;
1398       default:
1399         GST_WARNING_OBJECT (srtobject->element, "not ready");
1400         /* not-ready */
1401         continue;
1402     }
1403
1404     sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
1405     if (sent < 0) {
1406       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1407           ("%s", srt_getlasterror_str ()));
1408       break;
1409     }
1410     len += sent;
1411   }
1412
1413   return len;
1414 }
1415
1416 gssize
1417 gst_srt_object_write (GstSRTObject * srtobject,
1418     GstBufferList * headers,
1419     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1420 {
1421   gssize len = 0;
1422   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1423
1424   /* Only sink element can write data */
1425   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1426           (srtobject->element)) == GST_URI_SINK, -1);
1427
1428   gst_structure_get_enum (srtobject->parameters, "mode",
1429       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1430
1431   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1432     if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1433       return -1;
1434
1435     len =
1436         gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
1437         cancellable, error);
1438   } else {
1439     len =
1440         gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
1441         error);
1442   }
1443
1444   return len;
1445 }
1446
1447 static GstStructure *
1448 get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
1449 {
1450   GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
1451   int ret;
1452   SRT_TRACEBSTATS stats;
1453
1454   ret = srt_bstats (srtsock, &stats, 0);
1455
1456   if (ret >= 0) {
1457     if (is_sender)
1458       gst_structure_set (s,
1459           /* number of sent data packets, including retransmissions */
1460           "packets-sent", G_TYPE_INT64, stats.pktSent,
1461           /* number of lost packets (sender side) */
1462           "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
1463           /* number of retransmitted packets */
1464           "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
1465           /* number of received ACK packets */
1466           "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
1467           /* number of received NAK packets */
1468           "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
1469           /* time duration when UDT is sending data (idle time exclusive) */
1470           "send-duration-us", G_TYPE_INT64, stats.usSndDuration,
1471           /* number of sent data bytes, including retransmissions */
1472           "bytes-sent", G_TYPE_UINT64, stats.byteSent,
1473           /* number of retransmitted bytes */
1474           "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
1475           /* number of too-late-to-send dropped bytes */
1476           "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
1477           /* number of too-late-to-send dropped packets */
1478           "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
1479           /* sending rate in Mb/s */
1480           "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
1481           /* busy sending time (i.e., idle time exclusive) */
1482           "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
1483           "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
1484     else
1485       gst_structure_set (s,
1486           "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
1487           "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
1488           /* number of sent ACK packets */
1489           "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
1490           /* number of sent NAK packets */
1491           "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
1492           "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
1493           "bytes-received-lost", G_TYPE_INT, stats.byteRcvLossTotal,
1494           "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
1495           "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
1496
1497     gst_structure_set (s,
1498         /* estimated bandwidth, in Mb/s */
1499         "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
1500         "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);
1501
1502   }
1503
1504   return s;
1505 }
1506
1507 GstStructure *
1508 gst_srt_object_get_stats (GstSRTObject * srtobject)
1509 {
1510   GstStructure *s = NULL;
1511   gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1512
1513   GST_OBJECT_LOCK (srtobject->element);
1514   if (srtobject->sock != SRT_INVALID_SOCK) {
1515     s = get_stats_for_srtsock (srtobject->sock, is_sender);
1516     goto done;
1517   }
1518
1519   s = gst_structure_new_empty ("application/x-srt-statistics");
1520
1521   if (srtobject->callers) {
1522     GValueArray *callers_stats = g_value_array_new (1);
1523     GValue callers_stats_v = G_VALUE_INIT;
1524     GList *item;
1525
1526     for (item = srtobject->callers; item; item = item->next) {
1527       SRTCaller *caller = item->data;
1528       GstStructure *tmp = get_stats_for_srtsock (caller->sock, is_sender);
1529       GValue *v;
1530
1531       g_value_array_append (callers_stats, NULL);
1532       v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
1533       g_value_init (v, GST_TYPE_STRUCTURE);
1534       g_value_take_boxed (v, tmp);
1535     }
1536
1537     g_value_init (&callers_stats_v, G_TYPE_VALUE_ARRAY);
1538     g_value_take_boxed (&callers_stats_v, callers_stats);
1539     gst_structure_take_value (s, "callers", &callers_stats_v);
1540   }
1541
1542 done:
1543   GST_OBJECT_UNLOCK (srtobject->element);
1544
1545   return s;
1546 }