From: Wonsang Ryou Date: Thu, 1 Jun 2017 06:23:29 +0000 (+0900) Subject: netutils/mqtt: allow concurrent sub,unsub,pub on same mqtt instance X-Git-Tag: 1.1_Public_Release~453^2~48 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=91d35dd0f6f83faf0fb8945a2c6ca4207d9aaf10;p=rtos%2Ftinyara.git netutils/mqtt: allow concurrent sub,unsub,pub on same mqtt instance This patch allows users to operate subscribe, unsubscribe and publish concurrently on same mqtt client instance. The subscribe, unsubscribe and publish operations had been able to be operated only when there is no other operation. Change-Id: I0a24b7f6a882e190874163103c1b81de8283a192 Signed-off-by: Wonsang Ryou --- diff --git a/apps/include/netutils/mqtt_api.h b/apps/include/netutils/mqtt_api.h index e25ecd7..6caa82f 100644 --- a/apps/include/netutils/mqtt_api.h +++ b/apps/include/netutils/mqtt_api.h @@ -52,9 +52,6 @@ enum mqtt_client_state_e { MQTT_CLIENT_STATE_NOT_CONNECTED = 0, MQTT_CLIENT_STATE_CONNECTED, MQTT_CLIENT_STATE_CONNECT_REQUEST, - MQTT_CLIENT_STATE_SUBSCRIBE_REQUEST, - MQTT_CLIENT_STATE_UNSUBSCRIBE_REQUEST, - MQTT_CLIENT_STATE_PUBLISH_REQUEST, MQTT_CLIENT_STATE_DISCONNECT_REQUEST, }; diff --git a/apps/netutils/mqtt/mqtt_api.c b/apps/netutils/mqtt/mqtt_api.c index 566b53c..300e5a4 100644 --- a/apps/netutils/mqtt/mqtt_api.c +++ b/apps/netutils/mqtt/mqtt_api.c @@ -135,7 +135,6 @@ static void on_publish_callback(struct mosquitto *client, void *data, int msg_id mqtt_client_t *mqtt_client = (mqtt_client_t *)data; if (mqtt_client) { - mqtt_client->state = MQTT_CLIENT_STATE_CONNECTED; if (mqtt_client->config && mqtt_client->config->on_publish) { mqtt_client->config->on_publish(mqtt_client, msg_id); } @@ -147,7 +146,6 @@ static void on_subscribe_callback(struct mosquitto *client, void *data, int msg_ mqtt_client_t *mqtt_client = (mqtt_client_t *)data; if (mqtt_client) { - mqtt_client->state = MQTT_CLIENT_STATE_CONNECTED; if (mqtt_client->config && mqtt_client->config->on_subscribe) { mqtt_client->config->on_subscribe(mqtt_client, msg_id, qos_count, granted_qos); } @@ -159,7 +157,6 @@ static void on_unsubscribe_callback(struct mosquitto *client, void *data, int ms mqtt_client_t *mqtt_client = (mqtt_client_t *)data; if (mqtt_client) { - mqtt_client->state = MQTT_CLIENT_STATE_CONNECTED; if (mqtt_client->config && mqtt_client->config->on_unsubscribe) { mqtt_client->config->on_unsubscribe(mqtt_client, msg_id); } @@ -183,15 +180,6 @@ static void get_mqtt_client_state_string(int state, char *result_str) case MQTT_CLIENT_STATE_CONNECT_REQUEST: snprintf(result_str, 20, "CONNECT_REQUEST"); break; - case MQTT_CLIENT_STATE_SUBSCRIBE_REQUEST: - snprintf(result_str, 20, "SUBSCRIBE_REQUEST"); - break; - case MQTT_CLIENT_STATE_UNSUBSCRIBE_REQUEST: - snprintf(result_str, 20, "UNSUBSCRIBE_REQUEST"); - break; - case MQTT_CLIENT_STATE_PUBLISH_REQUEST: - snprintf(result_str, 20, "PUBLISH_REQUEST"); - break; case MQTT_CLIENT_STATE_DISCONNECT_REQUEST: snprintf(result_str, 20, "DISCONNECT_REQUEST"); break; @@ -361,6 +349,7 @@ int mqtt_connect(mqtt_client_t *handle, char *addr, int port, int keep_alive) int ret = 0; struct mosquitto *mosq = NULL; mqtt_client_config_t *mqtt_config = NULL; + int prev_state = 0; if (handle == NULL) { ndbg("ERROR: mqtt_client handle is null.\n"); @@ -401,11 +390,12 @@ int mqtt_connect(mqtt_client_t *handle, char *addr, int port, int keep_alive) port = MQTT_DEFAULT_BROKER_PORT; } + prev_state = handle->state; handle->state = MQTT_CLIENT_STATE_CONNECT_REQUEST; ret = mosquitto_connect(mosq, addr, port, keep_alive); if (ret != 0) { ndbg("ERROR: mosquitto_connect() failed. (ret: %d)\n", ret); - handle->state = MQTT_CLIENT_STATE_NOT_CONNECTED; + handle->state = prev_state; goto done; } @@ -446,6 +436,7 @@ int mqtt_disconnect(mqtt_client_t *handle) int result = -1; int ret = 0; struct mosquitto *mosq = NULL; + int prev_state = 0; if (handle == NULL) { ndbg("ERROR: mqtt_client handle is null.\n"); @@ -464,18 +455,19 @@ int mqtt_disconnect(mqtt_client_t *handle) goto done; } - if (handle->state > MQTT_CLIENT_STATE_CONNECTED) { + if (handle->state == MQTT_CLIENT_STATE_DISCONNECT_REQUEST) { char state_str[20]; get_mqtt_client_state_string(handle->state, state_str); ndbg("ERROR: mqtt_client is busy. (current state: %s)\n", state_str); goto done; } + prev_state = handle->state; handle->state = MQTT_CLIENT_STATE_DISCONNECT_REQUEST; ret = mosquitto_disconnect(mosq); if (ret != 0) { ndbg("ERROR: mosquitto_disconnect() failed.\n"); - handle->state = MQTT_CLIENT_STATE_CONNECTED; + handle->state = prev_state; goto done; } @@ -543,7 +535,6 @@ int mqtt_publish(mqtt_client_t *handle, char *topic, char *data, uint32_t data_l goto done; } - handle->state = MQTT_CLIENT_STATE_PUBLISH_REQUEST; ret = mosquitto_publish(mosq, NULL, (const char *)topic, data_len, data, qos, retain != 0 ? true : false); if (ret != 0) { ndbg("ERROR: mosquitto_publish() failed. (ret: %d)\n", ret); @@ -612,7 +603,6 @@ int mqtt_subscribe(mqtt_client_t *handle, char *topic, uint8_t qos) goto done; } - handle->state = MQTT_CLIENT_STATE_SUBSCRIBE_REQUEST; ret = mosquitto_subscribe(mosq, NULL, (const char *)topic, qos); if (ret != 0) { ndbg("ERROR: mqtt_subscribe() failed. (ret: %d)\n", ret); @@ -675,7 +665,6 @@ int mqtt_unsubscribe(mqtt_client_t *handle, char *topic) goto done; } - handle->state = MQTT_CLIENT_STATE_UNSUBSCRIBE_REQUEST; ret = mosquitto_unsubscribe(mosq, NULL, (const char *)topic); if (ret != 0) { ndbg("ERROR: mosquitto_unsubscribe() failed. (ret: %d)\n");