From 5dbe6c773ee22589f2790641df93720e1bd08929 Mon Sep 17 00:00:00 2001 From: gichan Date: Tue, 4 Apr 2023 10:59:07 +0900 Subject: [PATCH] [MQTT] Change timeout for getting message from mqtt broker Change timeout for getting message from mqtt broker. default timeout: 0 (infinite timeout) Signed-off-by: gichan --- src/libnnstreamer-edge/nnstreamer-edge-internal.c | 3 ++- .../nnstreamer-edge-mqtt-mosquitto.c | 11 +++++++---- src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c | 11 +++++++---- src/libnnstreamer-edge/nnstreamer-edge-mqtt.h | 5 +++-- tests/unittest_nnstreamer-edge-mqtt.cc | 8 ++++---- 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 99d1e54..b18dd59 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -1502,7 +1502,8 @@ _mqtt_hybrid_direct_connection (nns_edge_handle_s * eh) int server_port = 0; nns_size_t msg_len = 0; - ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len); + ret = + nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len, 0U); if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0) break; diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c index aebbd44..b859a1c 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c @@ -365,11 +365,11 @@ nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h) } /** - * @brief Get message from mqtt broker. + * @brief Get message from mqtt broker within timeout (0 for infinite timeout). */ int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, - nns_size_t * msg_len) + nns_size_t * msg_len, unsigned int timeout) { nns_edge_broker_s *bh; @@ -385,8 +385,11 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, bh = (nns_edge_broker_s *) broker_h; - /* Wait for 1 second */ - if (!nns_edge_queue_wait_pop (bh->message_queue, 1000U, msg, msg_len)) { + /* + * The time to wait for new data, in milliseconds. + * (Default: 0 for infinite timeout) + */ + if (!nns_edge_queue_wait_pop (bh->message_queue, timeout, msg, msg_len)) { nns_edge_loge ("Failed to get message from mqtt broker within timeout."); return NNS_EDGE_ERROR_UNKNOWN; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c index 8a76cde..658397d 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c @@ -404,11 +404,11 @@ nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h) } /** - * @brief Get message from mqtt broker. + * @brief Get message from mqtt broker within timeout (0 for infinite timeout). */ int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, - nns_size_t * msg_len) + nns_size_t * msg_len, unsigned int timeout) { nns_edge_broker_s *bh; @@ -424,8 +424,11 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, bh = (nns_edge_broker_s *) broker_h; - /* Wait for 1 second */ - if (!nns_edge_queue_wait_pop (bh->message_queue, 1000U, msg, msg_len)) { + /* + * The time to wait for new data, in milliseconds. + * (Default: 0 for infinite timeout) + */ + if (!nns_edge_queue_wait_pop (bh->message_queue, timeout, msg, msg_len)) { nns_edge_loge ("Failed to get message from mqtt broker within timeout."); return NNS_EDGE_ERROR_UNKNOWN; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h index 2a8d642..df0a662 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.h @@ -53,9 +53,10 @@ int nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h); bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h); /** - * @brief Get message from mqtt broker. If no message in the queue, it waits up to 1 second for new message. + * @brief Get message from mqtt broker with within timeout. (0 for inifinite timeout) */ -int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t *msg_len); +int +nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t * msg_len, unsigned int timeout); /** * @brief Internal util function to send edge-data via MQTT connection. diff --git a/tests/unittest_nnstreamer-edge-mqtt.cc b/tests/unittest_nnstreamer-edge-mqtt.cc index 1a55a92..3f09fb2 100644 --- a/tests/unittest_nnstreamer-edge-mqtt.cc +++ b/tests/unittest_nnstreamer-edge-mqtt.cc @@ -438,7 +438,7 @@ TEST(edgeMqttHybrid, getMessageInvalidParam1_n) if (!_check_mqtt_broker ()) return; - ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len); + ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len, 0U); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); } @@ -457,7 +457,7 @@ TEST(edgeMqttHybrid, getMessageInvalidParam2_n) ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len); + ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len, 0U); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); ret = nns_edge_mqtt_close (broker_h); @@ -479,7 +479,7 @@ TEST(edgeMqttHybrid, getMessageInvalidParam3_n) ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL); + ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL, 0U); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); ret = nns_edge_mqtt_close (broker_h); @@ -502,7 +502,7 @@ TEST(edgeMqttHybrid, getMessageWithinTimeout_n) ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h); EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE); - ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len); + ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len, 1000U); EXPECT_NE (ret, NNS_EDGE_ERROR_NONE); ret = nns_edge_mqtt_close (broker_h); -- 2.34.1