[MQTT] Change timeout for getting message from mqtt broker
authorgichan <gichan2.jang@samsung.com>
Tue, 4 Apr 2023 01:59:07 +0000 (10:59 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Tue, 4 Apr 2023 03:48:22 +0000 (12:48 +0900)
Change timeout for getting message from mqtt broker.
 default timeout: 0 (infinite timeout)

Signed-off-by: gichan <gichan2.jang@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt-paho.c
src/libnnstreamer-edge/nnstreamer-edge-mqtt.h
tests/unittest_nnstreamer-edge-mqtt.cc

index 99d1e54880e31e0a326f43ccf521a0e827ace961..b18dd5995235816fb9c3d366c33dd2b1a3a6ac7e 100644 (file)
@@ -1502,7 +1502,8 @@ _mqtt_hybrid_direct_connection (nns_edge_handle_s * eh)
     int server_port = 0;
     nns_size_t msg_len = 0;
 
-    ret = nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len);
+    ret =
+        nns_edge_mqtt_get_message (eh->broker_h, (void **) &msg, &msg_len, 0U);
     if (ret != NNS_EDGE_ERROR_NONE || !msg || msg_len == 0)
       break;
 
index aebbd44cfaf194e20822384f7c5002ececb94a2c..b859a1c6f350213c2aec65cb861624335723c261 100644 (file)
@@ -365,11 +365,11 @@ nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h)
 }
 
 /**
- * @brief Get message from mqtt broker.
+ * @brief Get message from mqtt broker within timeout (0 for infinite timeout).
  */
 int
 nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
-    nns_size_t * msg_len)
+    nns_size_t * msg_len, unsigned int timeout)
 {
   nns_edge_broker_s *bh;
 
@@ -385,8 +385,11 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
 
   bh = (nns_edge_broker_s *) broker_h;
 
-  /* Wait for 1 second */
-  if (!nns_edge_queue_wait_pop (bh->message_queue, 1000U, msg, msg_len)) {
+  /*
+   * The time to wait for new data, in milliseconds.
+   * (Default: 0 for infinite timeout)
+   */
+  if (!nns_edge_queue_wait_pop (bh->message_queue, timeout, msg, msg_len)) {
     nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     return NNS_EDGE_ERROR_UNKNOWN;
   }
index 8a76cde73cbc49abe9be5c04e36213f27bb11875..658397d7fd07109ace453d0a9289e59e186b667b 100644 (file)
@@ -404,11 +404,11 @@ nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h)
 }
 
 /**
- * @brief Get message from mqtt broker.
+ * @brief Get message from mqtt broker within timeout (0 for infinite timeout).
  */
 int
 nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
-    nns_size_t * msg_len)
+    nns_size_t * msg_len, unsigned int timeout)
 {
   nns_edge_broker_s *bh;
 
@@ -424,8 +424,11 @@ nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg,
 
   bh = (nns_edge_broker_s *) broker_h;
 
-  /* Wait for 1 second */
-  if (!nns_edge_queue_wait_pop (bh->message_queue, 1000U, msg, msg_len)) {
+  /*
+   * The time to wait for new data, in milliseconds.
+   * (Default: 0 for infinite timeout)
+   */
+  if (!nns_edge_queue_wait_pop (bh->message_queue, timeout, msg, msg_len)) {
     nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
     return NNS_EDGE_ERROR_UNKNOWN;
   }
index 2a8d642c0cc96498592a30d347be176ff669dce4..df0a662218241956c630bad05b01e089e5b7d745 100644 (file)
@@ -53,9 +53,10 @@ int nns_edge_mqtt_subscribe (nns_edge_broker_h broker_h);
 bool nns_edge_mqtt_is_connected (nns_edge_broker_h broker_h);
 
 /**
- * @brief Get message from mqtt broker. If no message in the queue, it waits up to 1 second for new message.
+ * @brief Get message from mqtt broker with within timeout. (0 for inifinite timeout)
  */
-int nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t *msg_len);
+int
+nns_edge_mqtt_get_message (nns_edge_broker_h broker_h, void **msg, nns_size_t * msg_len,  unsigned int timeout);
 
 /**
  * @brief Internal util function to send edge-data via MQTT connection.
index 1a55a92120c256903f20b084225947834363ba0e..3f09fb26a2bdcb8f01215e047139430a818a3351 100644 (file)
@@ -438,7 +438,7 @@ TEST(edgeMqttHybrid, getMessageInvalidParam1_n)
   if (!_check_mqtt_broker ())
     return;
 
-  ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len);
+  ret = nns_edge_mqtt_get_message (NULL, &msg, &msg_len, 0U);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 }
 
@@ -457,7 +457,7 @@ TEST(edgeMqttHybrid, getMessageInvalidParam2_n)
   ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len);
+  ret = nns_edge_mqtt_get_message (broker_h, NULL, &msg_len, 0U);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
   ret = nns_edge_mqtt_close (broker_h);
@@ -479,7 +479,7 @@ TEST(edgeMqttHybrid, getMessageInvalidParam3_n)
   ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL);
+  ret = nns_edge_mqtt_get_message (broker_h, &msg, NULL, 0U);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
   ret = nns_edge_mqtt_close (broker_h);
@@ -502,7 +502,7 @@ TEST(edgeMqttHybrid, getMessageWithinTimeout_n)
   ret = nns_edge_mqtt_connect ("temp-mqtt-id", "temp-mqtt-topic", "127.0.0.1", 1883, &broker_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
 
-  ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len);
+  ret = nns_edge_mqtt_get_message (broker_h, &msg, &msg_len, 1000U);
   EXPECT_NE (ret, NNS_EDGE_ERROR_NONE);
 
   ret = nns_edge_mqtt_close (broker_h);