Problem: When state changes from PAUSED to NULL,the tensor_query_serversrc cannot switch to NULL state because it keeps waiting for data pop.
Solution: Change the creation and destruction of server data and edge handle to be managed according to the state.
READY->PAUSED: Create query server common data
PAUSED->PLAYING: Create nns-edge handle
PLAYING->PAUSED: Rlease nns-edge handle
PAUSED->READY: Descruct qeury server common data
Releated issue: https://github.com/nnstreamer/api/issues/487
Signed-off-by: gichan2-jang <gichan2.jang@samsung.com>
* @brief Add nnstreamer edge server handle into hash table.
*/
edge_server_handle
-gst_tensor_query_server_add_data (const char *id,
- nns_edge_connect_type_e connect_type)
+gst_tensor_query_server_add_data (const char *id)
{
GstTensorQueryServer *data = NULL;
- int ret;
data = gst_tensor_query_server_get_handle (id);
data->id = g_strdup (id);
data->configured = FALSE;
- ret = nns_edge_create_handle (id, connect_type,
- NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
- if (NNS_EDGE_ERROR_NONE != ret) {
- GST_ERROR ("Failed to get nnstreamer edge handle.");
- _release_server_data (data);
- return NULL;
- }
-
G_LOCK (query_server_table);
g_hash_table_insert (_qs_table, g_strdup (id), data);
G_UNLOCK (query_server_table);
return data;
}
-
/**
- * @brief Get edge handle from server data.
+ * @brief Get nnstreamer edge handle of query server.
*/
nns_edge_h
-gst_tensor_query_server_get_edge_handle (edge_server_handle server_h)
+gst_tensor_query_server_get_edge_handle (const char *id,
+ nns_edge_connect_type_e connect_type)
+{
+ GstTensorQueryServer *data = NULL;
+ int ret;
+
+ data = gst_tensor_query_server_get_handle (id);
+ if (!data)
+ return NULL;
+
+ g_mutex_lock (&data->lock);
+ if (data->edge_h) {
+ goto done;
+ }
+
+ ret = nns_edge_create_handle (id, connect_type,
+ NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ GST_ERROR ("Failed to get nnstreamer edge handle.");
+ }
+
+done:
+ g_mutex_unlock (&data->lock);
+ return data->edge_h;
+}
+
+/**
+ * @brief Release nnstreamer edge handle of query server.
+ */
+void
+gst_tensor_query_server_release_edge_handle (edge_server_handle server_h)
{
GstTensorQueryServer *data = (GstTensorQueryServer *) server_h;
- return data ? data->edge_h : NULL;
+ g_mutex_lock (&data->lock);
+ nns_edge_release_handle (data->edge_h);
+ data->edge_h = NULL;
+ g_mutex_unlock (&data->lock);
}
/**
#include "tensor_meta.h"
G_BEGIN_DECLS
-
#define DEFAULT_SERVER_ID 0
#define DEFAULT_QUERY_INFO_TIMEOUT 5
-typedef void * edge_server_handle;
+typedef void *edge_server_handle;
/**
* @brief GstTensorQueryServer internal info data structure.
/**
* @brief Get nnstreamer edge server handle.
*/
-edge_server_handle
-gst_tensor_query_server_get_handle (const char *id);
+edge_server_handle gst_tensor_query_server_get_handle (const char *id);
/**
* @brief Add GstTensorQueryServer.
*/
-edge_server_handle
-gst_tensor_query_server_add_data (const char *id, nns_edge_connect_type_e connect_type);
+edge_server_handle gst_tensor_query_server_add_data (const char *id);
/**
* @brief Remove GstTensorQueryServer.
*/
-void
-gst_tensor_query_server_remove_data (edge_server_handle server_h);
+void gst_tensor_query_server_remove_data (edge_server_handle server_h);
/**
* @brief Wait until the sink is configured and get server info handle.
*/
-gboolean
-gst_tensor_query_server_wait_sink (edge_server_handle server_h);
+gboolean gst_tensor_query_server_wait_sink (edge_server_handle server_h);
/**
- * @brief Get edge handle from server data.
+ * @brief Get nnstreamer edge handle of query server.
*/
-nns_edge_h
-gst_tensor_query_server_get_edge_handle (edge_server_handle server_h);
+nns_edge_h gst_tensor_query_server_get_edge_handle (const char *id, nns_edge_connect_type_e connect_type);
/**
* @brief set query server sink configured.
*/
-void
-gst_tensor_query_server_set_configured (edge_server_handle server_h);
+void gst_tensor_query_server_set_configured (edge_server_handle server_h);
/**
* @brief set query server caps.
*/
void
-gst_tensor_query_server_set_caps (edge_server_handle server_h, const char *caps_str);
+gst_tensor_query_server_set_caps (edge_server_handle server_h,
+ const char *caps_str);
-G_END_DECLS
+/**
+ * @brief Release nnstreamer edge handle of query server.
+ */
+void gst_tensor_query_server_release_edge_handle (edge_server_handle server_h);
+G_END_DECLS
#endif /* __GST_TENSOR_QUERY_CLIENT_H__ */
#define gst_tensor_query_serversink_parent_class parent_class
G_DEFINE_TYPE (GstTensorQueryServerSink, gst_tensor_query_serversink,
GST_TYPE_BASE_SINK);
-
+static GstStateChangeReturn gst_tensor_query_serversink_change_state (GstElement
+ * element, GstStateChange transition);
static void gst_tensor_query_serversink_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_tensor_query_serversink_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
static void gst_tensor_query_serversink_finalize (GObject * object);
-static gboolean gst_tensor_query_serversink_start (GstBaseSink * bsink);
static GstFlowReturn gst_tensor_query_serversink_render (GstBaseSink * bsink,
GstBuffer * buf);
static gboolean gst_tensor_query_serversink_set_caps (GstBaseSink * basesink,
gstelement_class = (GstElementClass *) gstbasesink_class;
gobject_class = (GObjectClass *) gstelement_class;
+ gstelement_class->change_state = gst_tensor_query_serversink_change_state;
gobject_class->set_property = gst_tensor_query_serversink_set_property;
gobject_class->get_property = gst_tensor_query_serversink_get_property;
gobject_class->finalize = gst_tensor_query_serversink_finalize;
"Send tensor data as a server over the network",
"Samsung Electronics Co., Ltd.");
- gstbasesink_class->start = gst_tensor_query_serversink_start;
gstbasesink_class->set_caps = gst_tensor_query_serversink_set_caps;
gstbasesink_class->render = gst_tensor_query_serversink_render;
}
/**
+ * @brief start processing of query_serversink
+ */
+static void
+_gst_tensor_query_serversink_start (GstTensorQueryServerSink * sink)
+{
+ gchar *id_str = NULL;
+
+ id_str = g_strdup_printf ("%u", sink->sink_id);
+ sink->server_h = gst_tensor_query_server_add_data (id_str);
+ g_free (id_str);
+
+ gst_tensor_query_server_set_configured (sink->server_h);
+}
+
+/**
+ * @brief start processing of query_serversink
+ */
+static void
+_gst_tensor_query_serversink_playing (GstTensorQueryServerSink * sink)
+{
+ gchar *id_str = NULL;
+
+ id_str = g_strdup_printf ("%u", sink->sink_id);
+ sink->edge_h =
+ gst_tensor_query_server_get_edge_handle (id_str, sink->connect_type);
+ g_free (id_str);
+}
+
+/**
+ * @brief Change state of query server sink.
+ */
+static GstStateChangeReturn
+gst_tensor_query_serversink_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (element);
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ _gst_tensor_query_serversink_playing (sink);
+ if (!sink->edge_h) {
+ nns_loge ("Failed to change state from PAUSED to PLAYING.");
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ _gst_tensor_query_serversink_start (sink);
+ if (!sink->server_h) {
+ nns_loge ("Failed to change state from READY to PAUSED.");
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ if (ret == GST_STATE_CHANGE_FAILURE) {
+ nns_loge ("Failed to change state");
+ return ret;
+ }
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ gst_tensor_query_server_release_edge_handle (sink->server_h);
+ sink->edge_h = NULL;
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+/**
* @brief set property
*/
static void
}
/**
- * @brief start processing of query_serversink
- */
-static gboolean
-gst_tensor_query_serversink_start (GstBaseSink * bsink)
-{
- GstTensorQueryServerSink *sink = GST_TENSOR_QUERY_SERVERSINK (bsink);
- gchar *id_str = NULL;
-
- id_str = g_strdup_printf ("%u", sink->sink_id);
- sink->server_h =
- gst_tensor_query_server_add_data (id_str, sink->connect_type);
- g_free (id_str);
-
- sink->edge_h = gst_tensor_query_server_get_edge_handle (sink->server_h);
- gst_tensor_query_server_set_configured (sink->server_h);
-
- return TRUE;
-}
-
-/**
* @brief An implementation of the set_caps vmethod in GstBaseSinkClass
*/
static gboolean
#include "tensor_query_server.h"
#include "tensor_query_common.h"
G_BEGIN_DECLS
-
#define GST_TYPE_TENSOR_QUERY_SERVERSINK \
(gst_tensor_query_serversink_get_type())
#define GST_TENSOR_QUERY_SERVERSINK(obj) \
#define GST_IS_TENSOR_QUERY_SERVERSINK_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TENSOR_QUERY_SERVERSINK))
#define GST_TENSOR_QUERY_SERVERSINK_CAST(obj) ((GstTensorQueryServerSink *)(obj))
-
typedef struct _GstTensorQueryServerSink GstTensorQueryServerSink;
typedef struct _GstTensorQueryServerSinkClass GstTensorQueryServerSinkClass;
#define DEFAULT_IS_LIVE TRUE
#define DEFAULT_MQTT_HOST "127.0.0.1"
#define DEFAULT_MQTT_PORT 1883
+#define DEFAULT_DATA_POP_TIMEOUT 100000U
/**
* @brief the capabilities of the outputs
G_DEFINE_TYPE (GstTensorQueryServerSrc, gst_tensor_query_serversrc,
GST_TYPE_PUSH_SRC);
+static GstStateChangeReturn gst_tensor_query_serversrc_change_state (GstElement
+ * element, GstStateChange transition);
static void gst_tensor_query_serversrc_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_tensor_query_serversrc_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
static void gst_tensor_query_serversrc_finalize (GObject * object);
-
-static gboolean gst_tensor_query_serversrc_start (GstBaseSrc * bsrc);
static GstFlowReturn gst_tensor_query_serversrc_create (GstPushSrc * psrc,
GstBuffer ** buf);
gobject_class->set_property = gst_tensor_query_serversrc_set_property;
gobject_class->get_property = gst_tensor_query_serversrc_get_property;
gobject_class->finalize = gst_tensor_query_serversrc_finalize;
+ gstelement_class->change_state = gst_tensor_query_serversrc_change_state;
g_object_class_install_property (gobject_class, PROP_HOST,
g_param_spec_string ("host", "Host", "The hostname to listen as",
"Receive tensor data as a server over the network",
"Samsung Electronics Co., Ltd.");
- gstbasesrc_class->start = gst_tensor_query_serversrc_start;
gstpushsrc_class->create = gst_tensor_query_serversrc_create;
GST_DEBUG_CATEGORY_INIT (gst_tensor_query_serversrc_debug,
src->src_id = DEFAULT_SERVER_ID;
src->configured = FALSE;
src->msg_queue = g_async_queue_new ();
+ src->playing = FALSE;
gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
/** set the timestamps on each buffer */
}
/**
+ * @brief nnstreamer-edge event callback.
+ */
+static int
+_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
+{
+ nns_edge_event_e event_type;
+ int ret = NNS_EDGE_ERROR_NONE;
+
+ GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) user_data;
+ ret = nns_edge_event_get_type (event_h, &event_type);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_loge ("Failed to get event type!");
+ return ret;
+ }
+
+ switch (event_type) {
+ case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+ {
+ nns_edge_data_h data;
+
+ ret = nns_edge_event_parse_new_data (event_h, &data);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_loge ("Failed to parse new data received from new data event");
+ return ret;
+ }
+ g_async_queue_push (src->msg_queue, data);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+/**
+ * @brief start processing of query_serversrc, setting up the server
+ */
+static gboolean
+_gst_tensor_query_serversrc_start (GstTensorQueryServerSrc * src)
+{
+ gchar *id_str = NULL;
+
+ id_str = g_strdup_printf ("%u", src->src_id);
+ src->server_h = gst_tensor_query_server_add_data (id_str);
+ g_free (id_str);
+
+ if (!gst_tensor_query_server_wait_sink (src->server_h)) {
+ nns_loge ("Failed to get server information from query server.");
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/**
+ * @brief start processing of query_serversrc, setting up the server
+ */
+static gboolean
+_gst_tensor_query_serversrc_playing (GstTensorQueryServerSrc * src,
+ nns_edge_connect_type_e connect_type)
+{
+ gchar *id_str = NULL, *port = NULL, *dest_port = NULL;
+
+ id_str = g_strdup_printf ("%u", src->src_id);
+ src->edge_h = gst_tensor_query_server_get_edge_handle (id_str, connect_type);
+ g_free (id_str);
+
+ if (!src->edge_h) {
+ return FALSE;
+ }
+
+ if (src->host)
+ nns_edge_set_info (src->edge_h, "HOST", src->host);
+ if (src->port > 0) {
+ port = g_strdup_printf ("%u", src->port);
+ nns_edge_set_info (src->edge_h, "PORT", port);
+ g_free (port);
+ }
+ if (src->dest_host)
+ nns_edge_set_info (src->edge_h, "DEST_HOST", src->dest_host);
+ if (src->dest_port > 0) {
+ dest_port = g_strdup_printf ("%u", src->dest_port);
+ nns_edge_set_info (src->edge_h, "DEST_PORT", dest_port);
+ g_free (dest_port);
+ }
+ if (src->topic)
+ nns_edge_set_info (src->edge_h, "TOPIC", src->topic);
+
+ nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src);
+
+ if (NNS_EDGE_ERROR_NONE != nns_edge_start (src->edge_h)) {
+ nns_loge
+ ("Failed to start NNStreamer-edge. Please check server IP and port.");
+ return FALSE;
+ }
+
+ src->playing = TRUE;
+
+ return TRUE;
+}
+
+/**
+ * @brief Change state of query server src.
+ */
+static GstStateChangeReturn
+gst_tensor_query_serversrc_change_state (GstElement * element,
+ GstStateChange transition)
+{
+ GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (element);
+ GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ if (!_gst_tensor_query_serversrc_playing (src, src->connect_type)) {
+ nns_loge ("Failed to change state from PAUSED to PLAYING.");
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ src->playing = TRUE;
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ if (!_gst_tensor_query_serversrc_start (src)) {
+ nns_loge ("Failed to change state from READY to PAUSED.");
+ return GST_STATE_CHANGE_FAILURE;
+ }
+ break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ if (ret == GST_STATE_CHANGE_FAILURE) {
+ nns_loge ("Failed to change state");
+ return ret;
+ }
+
+ switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ src->playing = FALSE;
+ gst_tensor_query_server_release_edge_handle (src->server_h);
+ src->edge_h = NULL;
+ break;
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
+ gst_tensor_query_server_remove_data (src->server_h);
+ break;
+ default:
+ break;
+ }
+
+ return ret;
+}
+
+/**
* @brief set property of query_serversrc
*/
static void
}
/**
- * @brief nnstreamer-edge event callback.
- */
-static int
-_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
-{
- nns_edge_event_e event_type;
- int ret = NNS_EDGE_ERROR_NONE;
-
- GstTensorQueryServerSrc *src = (GstTensorQueryServerSrc *) user_data;
- if (0 != nns_edge_event_get_type (event_h, &event_type)) {
- nns_loge ("Failed to get event type!");
- return NNS_EDGE_ERROR_UNKNOWN;
- }
-
- switch (event_type) {
- case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
- {
- nns_edge_data_h data;
-
- nns_edge_event_parse_new_data (event_h, &data);
- g_async_queue_push (src->msg_queue, data);
- break;
- }
- default:
- break;
- }
-
- return ret;
-}
-
-/**
- * @brief start processing of query_serversrc, setting up the server
- */
-static gboolean
-gst_tensor_query_serversrc_start (GstBaseSrc * bsrc)
-{
- GstTensorQueryServerSrc *src = GST_TENSOR_QUERY_SERVERSRC (bsrc);
- char *id_str = NULL, *port = NULL;
-
- id_str = g_strdup_printf ("%d", src->src_id);
- src->server_h = gst_tensor_query_server_add_data (id_str, src->connect_type);
- g_free (id_str);
-
- src->edge_h = gst_tensor_query_server_get_edge_handle (src->server_h);
- if (src->host)
- nns_edge_set_info (src->edge_h, "HOST", src->host);
- if (src->port > 0) {
- port = g_strdup_printf ("%u", src->port);
- nns_edge_set_info (src->edge_h, "PORT", port);
- g_free (port);
- }
- if (src->dest_host)
- nns_edge_set_info (src->edge_h, "DEST_HOST", src->dest_host);
- if (src->dest_port > 0) {
- port = g_strdup_printf ("%u", src->dest_port);
- nns_edge_set_info (src->edge_h, "DEST_PORT", port);
- g_free (port);
- }
- if (src->topic)
- nns_edge_set_info (src->edge_h, "TOPIC", src->topic);
-
- nns_edge_set_event_callback (src->edge_h, _nns_edge_event_cb, src);
-
- if (NNS_EDGE_ERROR_NONE != nns_edge_start (src->edge_h)) {
- nns_loge
- ("Failed to start NNStreamer-edge. Please check server IP and port.");
- return FALSE;
- }
-
- if (!gst_tensor_query_server_wait_sink (src->server_h)) {
- nns_loge ("Failed to get server information from query server.");
- return FALSE;
- }
-
- return TRUE;
-}
-
-/**
* @brief Get buffer from message queue.
*/
static GstBuffer *
_gst_tensor_query_serversrc_get_buffer (GstTensorQueryServerSrc * src)
{
- nns_edge_data_h data_h;
+ nns_edge_data_h data_h = NULL;
GstBuffer *buffer = NULL;
guint i, num_data;
GstMetaQuery *meta_query;
int ret;
- data_h = g_async_queue_pop (src->msg_queue);
+ while (src->playing && !data_h)
+ data_h = g_async_queue_timeout_pop (src->msg_queue,
+ DEFAULT_DATA_POP_TIMEOUT);
if (!data_h) {
nns_loge ("Failed to get message from the server message queue");
#include "tensor_query_server.h"
G_BEGIN_DECLS
-
#define GST_TYPE_TENSOR_QUERY_SERVERSRC \
(gst_tensor_query_serversrc_get_type())
#define GST_TENSOR_QUERY_SERVERSRC(obj) \
#define GST_IS_TENSOR_QUERY_SERVERSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TENSOR_QUERY_SERVERSRC))
#define GST_TENSOR_QUERY_SERVERSRC_CAST(obj) ((GstTensorQueryServerSrc *)(obj))
-
typedef struct _GstTensorQueryServerSrc GstTensorQueryServerSrc;
typedef struct _GstTensorQueryServerSrcClass GstTensorQueryServerSrcClass;
edge_server_handle server_h;
nns_edge_h edge_h;
GAsyncQueue *msg_queue;
+ gboolean playing;
};
/**
EXPECT_NE (gstpipe, nullptr);
EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
- g_usleep (100000);
-
EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_PAUSED, UNITTEST_STATECHANGE_TIMEOUT), 0);
- g_usleep (100000);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_READY, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_READY, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_PAUSED, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_READY, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_PAUSED, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ EXPECT_EQ (setPipelineStateSync (gstpipe, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);
gst_object_unref (gstpipe);
g_free (pipeline);