PROP_DEST_PORT,
PROP_CONNECT_TYPE,
PROP_TOPIC,
+ PROP_WAIT_CONNECTION,
+ PROP_CONNECTION_TIMEOUT,
PROP_LAST
};
"The main topic of the host and option if necessary. "
"(topic)/(optional topic for main topic).", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_WAIT_CONNECTION,
+ g_param_spec_boolean ("wait-connection", "Wait connection to edgesrc",
+ "Wait until edgesink is connected to edgesrc. "
+ "In case of false(default), the buffers entering the edgesink are dropped.",
+ FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_CONNECTION_TIMEOUT,
+ g_param_spec_uint64 ("connection-timeout",
+ "Timeout for wating a connection",
+ "The timeout (in milliseconds) for waiting a connection to receiver. "
+ "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
self->dest_port = DEFAULT_PORT;
self->topic = NULL;
self->connect_type = DEFAULT_CONNECT_TYPE;
+ self->wait_connection = FALSE;
+ self->connection_timeout = 0;
}
/**
g_free (self->topic);
self->topic = g_value_dup_string (value);
break;
+ case PROP_WAIT_CONNECTION:
+ self->wait_connection = g_value_get_boolean (value);
+ break;
+ case PROP_CONNECTION_TIMEOUT:
+ self->connection_timeout = g_value_get_uint64 (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_TOPIC:
g_value_set_string (value, self->topic);
break;
+ case PROP_WAIT_CONNECTION:
+ g_value_set_boolean (value, self->wait_connection);
+ break;
+ case PROP_CONNECTION_TIMEOUT:
+ g_value_set_uint64 (value, self->connection_timeout);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
return FALSE;
}
+ if (self->wait_connection) {
+ guint64 remaining = self->connection_timeout;
+ if (0 == remaining)
+ remaining = G_MAXUINT64;
+
+ while (remaining >= 10 &&
+ NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
+ if (!self->wait_connection) {
+ nns_logi
+ ("Waiting for connection to edgesrc was canceled by the user.");
+ return FALSE;
+ }
+ g_usleep (10000);
+ remaining -= 10;
+ }
+
+ if (remaining > 0 &&
+ NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
+ g_usleep (remaining * 1000U);
+ }
+
+ if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
+ nns_loge ("Failed to connect to edgesrc within timeout: %ju ms",
+ self->connection_timeout);
+ return FALSE;
+ }
+ }
+
return TRUE;
}