From b7fe760d05cbcbb30e40df84afcfb40c062f7fc5 Mon Sep 17 00:00:00 2001 From: gichan2-jang Date: Fri, 14 Jul 2023 11:16:26 +0900 Subject: [PATCH] [Edge] Add wait connection prop If wait-connection prop is set true, wait until edgesrc is connected and don't drop the buffers. Signed-off-by: gichan2-jang --- gst/edge/edge_sink.c | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++ gst/edge/edge_sink.h | 2 ++ 2 files changed, 57 insertions(+) diff --git a/gst/edge/edge_sink.c b/gst/edge/edge_sink.c index c597bec..b8b169c 100644 --- a/gst/edge/edge_sink.c +++ b/gst/edge/edge_sink.c @@ -40,6 +40,8 @@ enum PROP_DEST_PORT, PROP_CONNECT_TYPE, PROP_TOPIC, + PROP_WAIT_CONNECTION, + PROP_CONNECTION_TIMEOUT, PROP_LAST }; @@ -118,6 +120,17 @@ gst_edgesink_class_init (GstEdgeSinkClass * klass) "The main topic of the host and option if necessary. " "(topic)/(optional topic for main topic).", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_WAIT_CONNECTION, + g_param_spec_boolean ("wait-connection", "Wait connection to edgesrc", + "Wait until edgesink is connected to edgesrc. " + "In case of false(default), the buffers entering the edgesink are dropped.", + FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_CONNECTION_TIMEOUT, + g_param_spec_uint64 ("connection-timeout", + "Timeout for wating a connection", + "The timeout (in milliseconds) for waiting a connection to receiver. " + "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&sinktemplate)); @@ -146,6 +159,8 @@ gst_edgesink_init (GstEdgeSink * self) self->dest_port = DEFAULT_PORT; self->topic = NULL; self->connect_type = DEFAULT_CONNECT_TYPE; + self->wait_connection = FALSE; + self->connection_timeout = 0; } /** @@ -186,6 +201,12 @@ gst_edgesink_set_property (GObject * object, guint prop_id, g_free (self->topic); self->topic = g_value_dup_string (value); break; + case PROP_WAIT_CONNECTION: + self->wait_connection = g_value_get_boolean (value); + break; + case PROP_CONNECTION_TIMEOUT: + self->connection_timeout = g_value_get_uint64 (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -220,6 +241,12 @@ gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_TOPIC: g_value_set_string (value, self->topic); break; + case PROP_WAIT_CONNECTION: + g_value_set_boolean (value, self->wait_connection); + break; + case PROP_CONNECTION_TIMEOUT: + g_value_set_uint64 (value, self->connection_timeout); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -295,6 +322,34 @@ gst_edgesink_start (GstBaseSink * basesink) return FALSE; } + if (self->wait_connection) { + guint64 remaining = self->connection_timeout; + if (0 == remaining) + remaining = G_MAXUINT64; + + while (remaining >= 10 && + NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) { + if (!self->wait_connection) { + nns_logi + ("Waiting for connection to edgesrc was canceled by the user."); + return FALSE; + } + g_usleep (10000); + remaining -= 10; + } + + if (remaining > 0 && + NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) { + g_usleep (remaining * 1000U); + } + + if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) { + nns_loge ("Failed to connect to edgesrc within timeout: %ju ms", + self->connection_timeout); + return FALSE; + } + } + return TRUE; } diff --git a/gst/edge/edge_sink.h b/gst/edge/edge_sink.h index b326a9d..cb63bd5 100644 --- a/gst/edge/edge_sink.h +++ b/gst/edge/edge_sink.h @@ -50,6 +50,8 @@ struct _GstEdgeSink nns_edge_connect_type_e connect_type; nns_edge_h edge_h; + gboolean wait_connection; + guint64 connection_timeout; }; /** -- 2.7.4