[Edge] Add wait connection prop
authorgichan2-jang <gichan2.jang@samsung.com>
Fri, 14 Jul 2023 02:16:26 +0000 (11:16 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Thu, 10 Aug 2023 00:26:23 +0000 (09:26 +0900)
If wait-connection prop is set true, wait until edgesrc is connected and
don't drop the buffers.

Signed-off-by: gichan2-jang <gichan2.jang@samsung.com>
gst/edge/edge_sink.c
gst/edge/edge_sink.h

index c597bec..b8b169c 100644 (file)
@@ -40,6 +40,8 @@ enum
   PROP_DEST_PORT,
   PROP_CONNECT_TYPE,
   PROP_TOPIC,
+  PROP_WAIT_CONNECTION,
+  PROP_CONNECTION_TIMEOUT,
 
   PROP_LAST
 };
@@ -118,6 +120,17 @@ gst_edgesink_class_init (GstEdgeSinkClass * klass)
           "The main topic of the host and option if necessary. "
           "(topic)/(optional topic for main topic).", "",
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_WAIT_CONNECTION,
+      g_param_spec_boolean ("wait-connection", "Wait connection to edgesrc",
+          "Wait until edgesink is connected to edgesrc. "
+          "In case of false(default), the buffers entering the edgesink are dropped.",
+          FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_CONNECTION_TIMEOUT,
+      g_param_spec_uint64 ("connection-timeout",
+          "Timeout for wating a connection",
+          "The timeout (in milliseconds) for waiting a connection to receiver. "
+          "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
@@ -146,6 +159,8 @@ gst_edgesink_init (GstEdgeSink * self)
   self->dest_port = DEFAULT_PORT;
   self->topic = NULL;
   self->connect_type = DEFAULT_CONNECT_TYPE;
+  self->wait_connection = FALSE;
+  self->connection_timeout = 0;
 }
 
 /**
@@ -186,6 +201,12 @@ gst_edgesink_set_property (GObject * object, guint prop_id,
       g_free (self->topic);
       self->topic = g_value_dup_string (value);
       break;
+    case PROP_WAIT_CONNECTION:
+      self->wait_connection = g_value_get_boolean (value);
+      break;
+    case PROP_CONNECTION_TIMEOUT:
+      self->connection_timeout = g_value_get_uint64 (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -220,6 +241,12 @@ gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_TOPIC:
       g_value_set_string (value, self->topic);
       break;
+    case PROP_WAIT_CONNECTION:
+      g_value_set_boolean (value, self->wait_connection);
+      break;
+    case PROP_CONNECTION_TIMEOUT:
+      g_value_set_uint64 (value, self->connection_timeout);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -295,6 +322,34 @@ gst_edgesink_start (GstBaseSink * basesink)
     return FALSE;
   }
 
+  if (self->wait_connection) {
+    guint64 remaining = self->connection_timeout;
+    if (0 == remaining)
+      remaining = G_MAXUINT64;
+
+    while (remaining >= 10 &&
+        NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
+      if (!self->wait_connection) {
+        nns_logi
+            ("Waiting for connection to edgesrc was canceled by the user.");
+        return FALSE;
+      }
+      g_usleep (10000);
+      remaining -= 10;
+    }
+
+    if (remaining > 0 &&
+        NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
+      g_usleep (remaining * 1000U);
+    }
+
+    if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
+      nns_loge ("Failed to connect to edgesrc within timeout: %ju ms",
+          self->connection_timeout);
+      return FALSE;
+    }
+  }
+
   return TRUE;
 }
 
index b326a9d..cb63bd5 100644 (file)
@@ -50,6 +50,8 @@ struct _GstEdgeSink
 
   nns_edge_connect_type_e connect_type;
   nns_edge_h edge_h;
+  gboolean wait_connection;
+  guint64 connection_timeout;
 };
 
 /**