5 * @defgroup mqtt MQTT client
7 * @verbinclude mqtt_client.txt
11 * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
12 * All rights reserved.
14 * Redistribution and use in source and binary forms, with or without modification,
15 * are permitted provided that the following conditions are met:
17 * 1. Redistributions of source code must retain the above copyright notice,
18 * this list of conditions and the following disclaimer.
19 * 2. Redistributions in binary form must reproduce the above copyright notice,
20 * this list of conditions and the following disclaimer in the documentation
21 * and/or other materials provided with the distribution.
22 * 3. The name of the author may not be used to endorse or promote products
23 * derived from this software without specific prior written permission.
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28 * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33 * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
36 * This file is part of the lwIP TCP/IP stack
38 * Author: Erik Andersson <erian747@gmail.com>
42 * - Handle large outgoing payloads for PUBLISH messages
43 * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44 * - Add support for legacy MQTT protocol version
46 * Please coordinate changes and requests with Erik Andersson
47 * Erik Andersson <erian747@gmail.com>
50 #include "lwip/apps/mqtt.h"
51 #include "lwip/timeouts.h"
52 #include "lwip/ip_addr.h"
55 #include "lwip/pbuf.h"
59 #if LWIP_TCP && LWIP_CALLBACK_API
/**
 * MQTT_DEBUG: Default is off.
 */
#if !defined MQTT_DEBUG || defined __DOXYGEN__
#define MQTT_DEBUG                  LWIP_DBG_OFF
#endif

/* Per-category debug selectors: MQTT_DEBUG OR-ed with the lwIP debug type/level flags */
#define MQTT_DEBUG_TRACE            (MQTT_DEBUG | LWIP_DBG_TRACE)
#define MQTT_DEBUG_STATE            (MQTT_DEBUG | LWIP_DBG_STATE)
#define MQTT_DEBUG_WARN             (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
#define MQTT_DEBUG_WARN_STATE       (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
#define MQTT_DEBUG_SERIOUS          (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
74 static void mqtt_cyclic_timer(void *arg);
77 * MQTT client connection states
/**
 * MQTT control message types.
 * The numeric value is what MQTT_CTL_PACKET_TYPE() extracts from the upper
 * nibble of the first fixed-header byte.
 */
enum mqtt_message_type {
  MQTT_MSG_TYPE_CONNECT     = 1,
  MQTT_MSG_TYPE_CONNACK     = 2,
  MQTT_MSG_TYPE_PUBLISH     = 3,
  MQTT_MSG_TYPE_PUBACK      = 4,
  MQTT_MSG_TYPE_PUBREC      = 5,
  MQTT_MSG_TYPE_PUBREL      = 6,
  MQTT_MSG_TYPE_PUBCOMP     = 7,
  MQTT_MSG_TYPE_SUBSCRIBE   = 8,
  MQTT_MSG_TYPE_SUBACK      = 9,
  MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
  MQTT_MSG_TYPE_UNSUBACK    = 11,
  MQTT_MSG_TYPE_PINGREQ     = 12,
  MQTT_MSG_TYPE_PINGRESP    = 13,
  MQTT_MSG_TYPE_DISCONNECT  = 14
};
/**
 * Helpers to extract control packet type and qos from first byte in fixed header.
 * The argument is parenthesized so that compound expressions (e.g. `a | b`)
 * are masked as a whole instead of binding to the `&` operator first.
 */
#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0xf0) >> 4)
#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0)  (((fixed_hdr_byte0) & 0x6) >> 1)
/**
 * MQTT connect flags, only used in CONNECT message.
 * Each constant is a single-bit mask (bit position given in the comment).
 */
enum mqtt_connect_flag {
  MQTT_CONNECT_FLAG_USERNAME      = 0x80, /* bit 7 */
  MQTT_CONNECT_FLAG_PASSWORD      = 0x40, /* bit 6 */
  MQTT_CONNECT_FLAG_WILL_RETAIN   = 0x20, /* bit 5 */
  MQTT_CONNECT_FLAG_WILL          = 0x04, /* bit 2 */
  MQTT_CONNECT_FLAG_CLEAN_SESSION = 0x02  /* bit 1 */
};
122 #if defined(LWIP_DEBUG)
123 static const char * const mqtt_message_type_str[15] =
143 * Message type value to string
144 * @param msg_type see enum mqtt_message_type
146 * @return Control message type text string
149 mqtt_msg_type_to_str(u8_t msg_type)
151 if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
154 return mqtt_message_type_str[msg_type];
161 * Generate MQTT packet identifier
162 * @param client MQTT client
163 * @return New packet identifier, range 1 to 65535
166 msg_generate_packet_id(mqtt_client_t *client)
168 client->pkt_id_seq++;
169 if (client->pkt_id_seq == 0) {
170 client->pkt_id_seq++;
172 return client->pkt_id_seq;
/*--------------------------------------------------------------------------------------------------------------------- */
/* Output ring buffer */

/** Index mask. Indices are reduced with `& MQTT_RINGBUF_IDX_MASK`, which only
 *  works when MQTT_OUTPUT_RINGBUF_SIZE is a power of two */
#define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1)

/** Add single item to ring buffer (expansion fully parenthesized so it can be
 *  used safely inside larger expressions; `rb` is evaluated more than once) */
#define mqtt_ringbuf_put(rb, item) (((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item))

/** Return number of bytes in ring buffer */
#define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get))

/** Return number of bytes free in ring buffer */
#define mqtt_ringbuf_free(rb) ((MQTT_OUTPUT_RINGBUF_SIZE) - mqtt_ringbuf_len(rb))

/** Return number of bytes possible to read without wrapping around */
#define mqtt_ringbuf_linear_read_length(rb) (LWIP_MIN(mqtt_ringbuf_len(rb), ((MQTT_OUTPUT_RINGBUF_SIZE) - ((rb)->get & MQTT_RINGBUF_IDX_MASK))))

/** Return pointer to ring buffer get position */
#define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK])

/** Advance the get (read) index by len bytes */
#define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len))
200 * Try send as many bytes as possible from output ring buffer
201 * @param rb Output ring buffer
202 * @param tpcb TCP connection handle
205 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
209 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
210 u16_t send_len = tcp_sndbuf(tpcb);
211 LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
213 if (send_len == 0 || ringbuf_lin_len == 0) {
217 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
218 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK)));
220 if (send_len > ringbuf_lin_len) {
221 /* Space in TCP output buffer is larger than available in ring buffer linear portion */
222 send_len = ringbuf_lin_len;
223 /* Wrap around if more data in ring buffer after linear portion */
224 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
226 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
227 if ((err == ERR_OK) && wrap) {
228 mqtt_ringbuf_advance_get_idx(rb, send_len);
229 /* Use the lesser one of ring buffer linear length and TCP send buffer size */
230 send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
231 err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
235 mqtt_ringbuf_advance_get_idx(rb, send_len);
239 LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
245 /*--------------------------------------------------------------------------------------------------------------------- */
249 * Create request item
250 * @param r_objs Pointer to request objects
251 * @param pkt_id Packet identifier of request
252 * @param cb Packet callback to call when requests lifetime ends
253 * @param arg Parameter following callback
254 * @return Request or NULL if failed to create
256 static struct mqtt_request_t *
257 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
259 struct mqtt_request_t *r = NULL;
261 LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
262 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
263 /* Item point to itself if not in use */
264 if (r_objs[n].next == &r_objs[n]) {
278 * Append request to pending request queue
279 * @param tail Pointer to request queue tail pointer
280 * @param r Request to append
283 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
285 struct mqtt_request_t *head = NULL;
286 s16_t time_before = 0;
287 struct mqtt_request_t *iter;
289 LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
291 /* Iterate through queue to find head, and count total timeout time */
292 for (iter = *tail; iter != NULL; iter = iter->next) {
293 time_before += iter->timeout_diff;
297 LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
298 r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
308 * Delete request item
309 * @param r Request item to delete
312 mqtt_delete_request(struct mqtt_request_t *r)
320 * Remove a request item with a specific packet identifier from request queue
321 * @param tail Pointer to request queue tail pointer
322 * @param pkt_id Packet identifier of request to take
323 * @return Request item if found, NULL if not
325 static struct mqtt_request_t *
326 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
328 struct mqtt_request_t *iter = NULL, *prev = NULL;
329 LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
330 /* Search all request for pkt_id */
331 for (iter = *tail; iter != NULL; iter = iter->next) {
332 if (iter->pkt_id == pkt_id) {
338 /* If request was found */
344 prev->next = iter->next;
346 /* If exists, add remaining timeout time for the request to next */
347 if (iter->next != NULL) {
348 iter->next->timeout_diff += iter->timeout_diff;
356 * Handle requests timeout
357 * @param tail Pointer to request queue tail pointer
358 * @param t Time since last call in seconds
361 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
363 struct mqtt_request_t *r;
364 LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
366 while (t > 0 && r != NULL) {
367 if (t >= r->timeout_diff) {
368 t -= (u8_t)r->timeout_diff;
371 /* Notify upper layer about timeout */
373 r->cb(r->arg, ERR_TIMEOUT);
375 mqtt_delete_request(r);
376 /* Tail might be modified in callback, so re-read it in every iteration */
377 r = *(struct mqtt_request_t * const volatile *)tail;
379 r->timeout_diff -= t;
386 * Free all request items
387 * @param tail Pointer to request queue tail pointer
390 mqtt_clear_requests(struct mqtt_request_t **tail)
392 struct mqtt_request_t *iter, *next;
393 LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
394 for (iter = *tail; iter != NULL; iter = next) {
396 mqtt_delete_request(iter);
401 * Initialize all request items
402 * @param r_objs Pointer to request objects
405 mqtt_init_requests(struct mqtt_request_t *r_objs)
408 LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
409 for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
410 /* Item pointing to itself indicates unused */
411 r_objs[n].next = &r_objs[n];
415 /*--------------------------------------------------------------------------------------------------------------------- */
416 /* Output message build helpers */
420 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
422 mqtt_ringbuf_put(rb, value);
426 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
428 mqtt_ringbuf_put(rb, value >> 8);
429 mqtt_ringbuf_put(rb, value & 0xff);
433 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
436 for (n = 0; n < length; n++) {
437 mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
442 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
445 mqtt_ringbuf_put(rb, length >> 8);
446 mqtt_ringbuf_put(rb, length & 0xff);
447 for (n = 0; n < length; n++) {
448 mqtt_ringbuf_put(rb, str[n]);
453 * Append fixed header
454 * @param rb Output ring buffer
455 * @param msg_type see enum mqtt_message_type
456 * @param dup MQTT DUP flag
457 * @param qos MQTT QoS field
458 * @param retain MQTT retain flag
459 * @param r_length Remaining length after fixed header
463 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
464 u8_t qos, u8_t retain, u16_t r_length)
466 /* Start with control byte */
467 mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
468 /* Encode remaining length field */
470 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
472 } while (r_length > 0);
477 * Check output buffer space
478 * @param rb Output ring buffer
479 * @param r_length Remaining length after fixed header
480 * @return 1 if message will fit, 0 if not enough buffer space
483 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
485 /* Start with length of type byte + remaining length */
486 u16_t total_len = 1 + r_length;
488 LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
490 /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
494 } while (r_length > 0);
496 return (total_len <= mqtt_ringbuf_free(rb));
501 * Close connection to server
502 * @param client MQTT client
503 * @param reason Reason for disconnection
506 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
508 LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
510 /* Bring down TCP connection if not already done */
511 if (client->conn != NULL) {
513 tcp_recv(client->conn, NULL);
514 tcp_err(client->conn, NULL);
515 tcp_sent(client->conn, NULL);
516 res = tcp_close(client->conn);
518 tcp_abort(client->conn);
519 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res)));
524 /* Remove all pending requests */
525 mqtt_clear_requests(&client->pend_req_queue);
526 /* Stop cyclic timer */
527 sys_untimeout(mqtt_cyclic_timer, client);
529 /* Notify upper layer of disconnection if changed state */
530 if (client->conn_state != TCP_DISCONNECTED) {
532 client->conn_state = TCP_DISCONNECTED;
533 if (client->connect_cb != NULL) {
534 client->connect_cb(client, client->connect_arg, reason);
541 * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
542 * @param arg MQTT client
545 mqtt_cyclic_timer(void *arg)
547 u8_t restart_timer = 1;
548 mqtt_client_t *client = (mqtt_client_t *)arg;
549 LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
551 if (client->conn_state == MQTT_CONNECTING) {
552 client->cyclic_tick++;
553 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
554 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
556 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
559 } else if (client->conn_state == MQTT_CONNECTED) {
560 /* Handle timeout for pending requests */
561 mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
563 /* keep_alive > 0 means keep alive functionality shall be used */
564 if (client->keep_alive > 0) {
566 client->server_watchdog++;
567 /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
568 if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) {
569 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
570 mqtt_close(client, MQTT_CONNECT_TIMEOUT);
574 /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
575 if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
576 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
577 if (mqtt_output_check_space(&client->output, 0) != 0) {
578 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
579 client->cyclic_tick = 0;
582 client->cyclic_tick++;
586 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
590 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg);
596 * Send PUBACK, PUBREC or PUBREL response message
597 * @param client MQTT client
598 * @param msg PUBACK, PUBREC or PUBREL
599 * @param pkt_id Packet identifier
600 * @param qos QoS value
601 * @return ERR_OK if successful, ERR_MEM if out of memory
604 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
607 if (mqtt_output_check_space(&client->output, 2)) {
608 mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
609 mqtt_output_append_u16(&client->output, pkt_id);
610 mqtt_output_send(&client->output, client->conn);
612 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
613 mqtt_msg_type_to_str(msg), pkt_id));
620 * Subscribe response from server
621 * @param r Matching request
622 * @param result Result code from server
625 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
628 r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
634 * Complete MQTT message received or buffer full
635 * @param client MQTT client
636 * @param fixed_hdr_idx header index
637 * @param length length received part
638 * @param remaining_length Remaining length of complete message
640 static mqtt_connection_status_t
641 mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
643 mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
645 u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
647 /* Control packet type */
648 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
651 if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
652 if (client->conn_state == MQTT_CONNECTING) {
653 /* Get result code from CONNACK */
654 res = (mqtt_connection_status_t)var_hdr_payload[1];
655 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
656 if (res == MQTT_CONNECT_ACCEPTED) {
657 /* Reset cyclic_tick when changing to connected state */
658 client->cyclic_tick = 0;
659 client->conn_state = MQTT_CONNECTED;
660 /* Notify upper layer */
661 if (client->connect_cb != 0) {
662 client->connect_cb(client, client->connect_arg, res);
666 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n"));
668 } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
669 LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n"));
671 } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
672 u16_t payload_offset = 0;
673 u16_t payload_length = length;
674 u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
676 if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
677 /* Should have topic and pkt id*/
679 uint16_t after_topic;
681 u16_t topic_len = var_hdr_payload[0];
682 topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
684 topic = var_hdr_payload + 2;
685 after_topic = 2 + topic_len;
686 /* Check length, add one byte even for QoS 0 so that zero termination will fit */
687 if ((after_topic + (qos? 2 : 1)) > length) {
688 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
692 /* id for QoS 1 and 2 */
694 client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
697 client->inpub_pkt_id = 0;
699 /* Take backup of byte after topic */
700 bkp = topic[topic_len];
701 /* Zero terminate string */
702 topic[topic_len] = 0;
703 /* Payload data remaining in receive buffer */
704 payload_length = length - after_topic;
705 payload_offset = after_topic;
707 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n",
708 qos, topic, remaining_length + payload_length));
709 if (client->pub_cb != NULL) {
710 client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
712 /* Restore byte after topic */
713 topic[topic_len] = bkp;
715 if (payload_length > 0 || remaining_length == 0) {
716 client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
717 /* Reply if QoS > 0 */
718 if (remaining_length == 0 && qos > 0) {
719 /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
720 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
721 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
722 mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
723 pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
727 /* Get packet identifier */
728 pkt_id = (u16_t)var_hdr_payload[0] << 8;
729 pkt_id |= (u16_t)var_hdr_payload[1];
731 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
734 if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
735 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id));
736 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
738 } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
739 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id));
740 pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
742 } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
743 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
744 struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
746 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
747 if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
749 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n"));
752 mqtt_incomming_suback(r, var_hdr_payload[2]);
754 } else if (r->cb != NULL) {
755 r->cb(r->arg, ERR_OK);
757 mqtt_delete_request(r);
759 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
762 LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
768 return MQTT_CONNECT_DISCONNECTED;
773 * MQTT incoming message parser
774 * @param client MQTT client
775 * @param p PBUF chain of received data
776 * @return Connection status
778 static mqtt_connection_status_t
779 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
782 u32_t msg_rem_len = 0;
783 u8_t fixed_hdr_idx = 0;
786 while (p->tot_len > in_offset) {
787 if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
789 if (fixed_hdr_idx < client->msg_idx) {
790 b = client->rx_buffer[fixed_hdr_idx];
792 b = pbuf_get_at(p, in_offset++);
793 client->rx_buffer[client->msg_idx++] = b;
797 if (fixed_hdr_idx >= 2) {
798 msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
799 if ((b & 0x80) == 0) {
800 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len));
801 if (msg_rem_len == 0) {
802 /* Complete message with no extra headers of payload received */
803 mqtt_message_received(client, fixed_hdr_idx, 0, 0);
807 /* Bytes remaining in message */
808 msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
813 u16_t cpy_len, cpy_start, buffer_space;
815 cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
817 /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
818 cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
820 /* Limit to available space in buffer */
821 buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
822 if (cpy_len > buffer_space) {
823 cpy_len = buffer_space;
825 pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset);
827 /* Advance get and put indexes */
828 client->msg_idx += cpy_len;
829 in_offset += cpy_len;
830 msg_rem_len -= cpy_len;
832 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len));
833 if (msg_rem_len == 0 || cpy_len == buffer_space) {
834 /* Whole message received or buffer is full */
835 mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
836 if (res != MQTT_CONNECT_ACCEPTED) {
839 if (msg_rem_len == 0) {
840 /* Reset parser state */
842 /* msg_tot_len = 0; */
848 return MQTT_CONNECT_ACCEPTED;
853 * TCP received callback function. @see tcp_recv_fn
854 * @param arg MQTT client
855 * @param p PBUF chain of received data
856 * @param err Passed as return value if not ERR_OK
857 * @return ERR_OK or err passed into callback
860 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
862 mqtt_client_t *client = (mqtt_client_t *)arg;
863 LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
864 LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
867 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
868 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
870 mqtt_connection_status_t res;
872 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err));
877 /* Tell remote that data has been received */
878 tcp_recved(pcb, p->tot_len);
879 res = mqtt_parse_incoming(client, p);
882 if (res != MQTT_CONNECT_ACCEPTED) {
883 mqtt_close(client, res);
885 /* If keep alive functionality is used */
886 if (client->keep_alive != 0) {
887 /* Reset server alive watchdog */
888 client->server_watchdog = 0;
897 * TCP data sent callback function. @see tcp_sent_fn
898 * @param arg MQTT client
899 * @param tpcb TCP connection handle
900 * @param len Number of bytes sent
904 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
906 mqtt_client_t *client = (mqtt_client_t *)arg;
908 LWIP_UNUSED_ARG(tpcb);
909 LWIP_UNUSED_ARG(len);
911 if (client->conn_state == MQTT_CONNECTED) {
912 struct mqtt_request_t *r;
914 /* Reset keep-alive send timer and server watchdog */
915 client->cyclic_tick = 0;
916 client->server_watchdog = 0;
917 /* QoS 0 publish has no response from server, so call its callbacks here */
918 while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
919 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
921 r->cb(r->arg, ERR_OK);
923 mqtt_delete_request(r);
925 /* Try send any remaining buffers from output queue */
926 mqtt_output_send(&client->output, client->conn);
932 * TCP error callback function. @see tcp_err_fn
933 * @param arg MQTT client
934 * @param err Error encountered
937 mqtt_tcp_err_cb(void *arg, err_t err)
939 mqtt_client_t *client = (mqtt_client_t *)arg;
940 LWIP_UNUSED_ARG(err); /* only used for debug output */
941 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
942 LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
943 /* Set conn to null before calling close as pcb is already deallocated*/
945 mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
949 * TCP poll callback function. @see tcp_poll_fn
950 * @param arg MQTT client
951 * @param tpcb TCP connection handle
955 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
957 mqtt_client_t *client = (mqtt_client_t *)arg;
958 if (client->conn_state == MQTT_CONNECTED) {
959 /* Try send any remaining buffers from output queue */
960 mqtt_output_send(&client->output, tpcb);
966 * TCP connect callback function. @see tcp_connected_fn
967 * @param arg MQTT client
968 * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
972 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
974 mqtt_client_t* client = (mqtt_client_t *)arg;
977 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
981 /* Initiate receiver state */
984 /* Setup TCP callbacks */
985 tcp_recv(tpcb, mqtt_tcp_recv_cb);
986 tcp_sent(tpcb, mqtt_tcp_sent_cb);
987 tcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
989 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n"));
990 /* Enter MQTT connect state */
991 client->conn_state = MQTT_CONNECTING;
993 /* Start cyclic timer */
994 sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client);
995 client->cyclic_tick = 0;
997 /* Start transmission from output queue, connect message is the first one out*/
998 mqtt_output_send(&client->output, client->conn);
1005 /*---------------------------------------------------------------------------------------------------- */
1011 * MQTT publish function.
1012 * @param client MQTT client
1013 * @param topic Publish topic string
1014 * @param payload Data to publish (NULL is allowed)
1015 * @param payload_length: Length of payload (0 is allowed)
1016 * @param qos Quality of service, 0 1 or 2
1017 * @param retain MQTT retain flag
1018 * @param cb Callback to call when publish is complete or has timed out
1019 * @param arg User supplied argument to publish callback
1020 * @return ERR_OK if successful
1021 * ERR_CONN if client is disconnected
1022 * ERR_MEM if short on memory
1025 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1026 mqtt_request_cb_t cb, void *arg)
1028 struct mqtt_request_t *r;
1030 size_t topic_strlen;
1033 u16_t remaining_length;
1035 LWIP_ASSERT("mqtt_publish: client != NULL", client);
1036 LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1037 LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1039 topic_strlen = strlen(topic);
1040 LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1041 topic_len = (u16_t)topic_strlen;
1042 total_len = 2 + topic_len + payload_length;
1043 LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1044 remaining_length = (u16_t)total_len;
1046 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1049 remaining_length += 2;
1050 /* Generate pkt_id id for QoS1 and 2 */
1051 pkt_id = msg_generate_packet_id(client);
1053 /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1057 r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1062 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1063 mqtt_delete_request(r);
1066 /* Append fixed header */
1067 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1070 mqtt_output_append_string(&client->output, topic, topic_len);
1072 /* Append packet if for QoS 1 and 2*/
1074 mqtt_output_append_u16(&client->output, pkt_id);
1077 /* Append optional publish payload */
1078 if ((payload != NULL) && (payload_length > 0)) {
1079 mqtt_output_append_buf(&client->output, payload, payload_length);
1082 mqtt_append_request(&client->pend_req_queue, r);
1083 mqtt_output_send(&client->output, client->conn);
1090 * MQTT subscribe/unsubscribe function.
1091 * @param client MQTT client
1092 * @param topic topic to subscribe to
1093 * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1094 * @param cb Callback to call when subscribe/unsubscribe reponse is received
1095 * @param arg User supplied argument to publish callback
1096 * @param sub 1 for subscribe, 0 for unsubscribe
1097 * @return ERR_OK if successful, @see err_t enum for other results
/*
 * Shared worker for SUBSCRIBE and UNSUBSCRIBE (selected by `sub`): validates
 * the topic length, creates a request entry for cb/arg under a packet id from
 * msg_generate_packet_id(), serialises the message into client->output and
 * passes it to mqtt_output_send().
 */
1100 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
/* Raw strlen() result, kept as size_t until it has been range-checked */
1102 size_t topic_strlen;
/* total_len narrowed to 16 bits; this is the value passed to the fixed header */
1105 u16_t remaining_length;
/* Request entry that carries cb/arg for this packet id */
1107 struct mqtt_request_t *r;
1109 LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1110 LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1112 topic_strlen = strlen(topic);
/* Bound the topic to 0xFFFF - 2 bytes (handler: return ERR_ARG) so the u16_t cast below cannot truncate */
1113 LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1114 topic_len = (u16_t)topic_strlen;
1115 /* Topic string (topic_len + 2, presumably its length prefix), 2 bytes pkt_id, plus 1 qos byte for subscribe only */
1116 total_len = topic_len + 2 + 2 + (sub != 0);
1117 LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1118 remaining_length = (u16_t)total_len;
1120 LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
/* Calling this while the connection state is TCP_DISCONNECTED is logged as a warning */
1121 if (client->conn_state == TCP_DISCONNECTED) {
1122 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
/* The same pkt_id is stored in the request and written into the outgoing message below */
1126 pkt_id = msg_generate_packet_id(client);
1127 r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
/* If mqtt_output_check_space() reports 0 for remaining_length, the request created above is deleted again */
1132 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1133 mqtt_delete_request(r);
1137 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
/* Fixed header: message type chosen by `sub`; the header qos argument is the constant 1 (not the caller's qos), retain is 0 */
1139 mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
/* Packet id comes first, then the topic */
1141 mqtt_output_append_u16(&client->output, pkt_id);
1143 mqtt_output_append_string(&client->output, topic, topic_len);
/* Requested QoS byte, clamped to at most 2 */
1146 mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
/* Queue the request on pend_req_queue, then push the output buffer to the TCP connection */
1149 mqtt_append_request(&client->pend_req_queue, r);
1150 mqtt_output_send(&client->output, client->conn);
1157 * Set callback to handle incoming publish requests from server
1158 * @param client MQTT client
1159 * @param pub_cb Callback invoked when publish starts, contains topic and total length of payload
1160 * @param data_cb Callback for each fragment of payload that arrives
1161 * @param arg User supplied argument to both callbacks
/*
 * Stores the two incoming-publish callbacks and their shared argument in the
 * client structure. Nothing else is touched.
 */
1164 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1165 mqtt_incoming_data_cb_t data_cb, void *arg)
1167 LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
/* NULL callbacks are stored as given; no validation is done here */
1168 client->data_cb = data_cb;
1169 client->pub_cb = pub_cb;
/* The same arg is kept for both callbacks */
1170 client->inpub_arg = arg;
1175 * Create a new MQTT client instance
1176 * @return Pointer to instance on success, NULL otherwise
/*
 * Allocates one mqtt_client_t with mem_malloc() and, when the allocation
 * succeeds, clears it to all-zero bytes.
 */
1179 mqtt_client_new(void)
1181 mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t));
/* Only touch the memory if mem_malloc() returned a block */
1182 if (client != NULL) {
1183 memset(client, 0, sizeof(mqtt_client_t));
1191 * Connect to MQTT server
1192 * @param client MQTT client
1193 * @param ip_addr Server IP
1194 * @param port Server port
1195 * @param cb Connection state change callback
1196 * @param arg User supplied argument to connection callback
1197 * @param client_info Client identification and connection options
1198 * @return ERR_OK if successful, @see err_t enum for other results
/*
 * Resets the client, computes the size of the MQTT CONNECT message from
 * client_info, opens a TCP connection to ip_addr:port and writes the CONNECT
 * message into client->output.
 */
1201 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1202 const struct mqtt_connect_client_info_t *client_info)
1206 u16_t client_id_length;
1207 /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1208 u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
/* Will topic and message lengths are held in u8_t, hence the 0xFF limits checked below */
1209 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1211 LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1212 LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1213 LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1214 LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
/* Any state other than TCP_DISCONNECTED is reported as already connected */
1216 if (client->conn_state != TCP_DISCONNECTED) {
1217 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
/*
 * NOTE: this zeroes every field of the client, including data_cb, pub_cb and
 * inpub_arg stored earlier by mqtt_set_inpub_callback().
 */
1222 memset(client, 0, sizeof(mqtt_client_t));
1223 client->connect_arg = arg;
1224 client->connect_cb = cb;
1225 client->keep_alive = client_info->keep_alive;
1226 mqtt_init_requests(client->req_list);
1228 /* Build connect message */
/* A will is only included when both will_topic and will_msg are supplied */
1229 if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1230 flags |= MQTT_CONNECT_FLAG_WILL;
/* Will QoS is masked to two bits and placed at bit position 3 of the connect flags */
1231 flags |= (client_info->will_qos & 3) << 3;
1232 if (client_info->will_retain) {
1233 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
/* Will topic must be 1..255 bytes, otherwise the handler returns ERR_VAL */
1235 len = strlen(client_info->will_topic);
1236 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1237 LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1238 will_topic_len = (u8_t)len;
/* Will message may be empty but not longer than 255 bytes */
1239 len = strlen(client_info->will_msg);
1240 LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1241 will_msg_len = (u8_t)len;
/* Each of the two will strings adds its bytes plus 2 to the remaining length */
1242 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1243 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1244 remaining_length = (u16_t)len;
1247 /* Don't complicate things, always connect using clean session */
1248 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
/* Client id: range-check before narrowing to u16_t, then add its bytes plus 2 */
1250 len = strlen(client_info->client_id);
1251 LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1252 client_id_length = (u16_t)len;
1253 len = remaining_length + 2 + client_id_length;
1254 LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1255 remaining_length = (u16_t)len;
/* Check the output buffer for room before any TCP resources are created */
1257 if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1261 client->conn = tcp_new();
1262 if (client->conn == NULL) {
1266 /* Set arg pointer for callbacks */
1267 tcp_arg(client->conn, client);
1268 /* Any local address, pick random local port number */
1269 err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
1270 if (err != ERR_OK) {
1271 LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1274 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1276 /* Connect to server */
1277 err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1278 if (err != ERR_OK) {
1279 LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1282 /* Set error callback */
1283 tcp_err(client->conn, mqtt_tcp_err_cb);
1284 client->conn_state = TCP_CONNECTING;
/*
 * The CONNECT message is only written into client->output here; this function
 * does not call mqtt_output_send(). Presumably it is transmitted from
 * mqtt_tcp_connect_cb once the TCP connection is up - TODO confirm.
 */
1286 /* Append fixed header */
1287 mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1288 /* Append Protocol string */
1289 mqtt_output_append_string(&client->output, "MQTT", 4);
1290 /* Append Protocol level (4 is the level assigned to MQTT 3.1.1 by the MQTT specification) */
1291 mqtt_output_append_u8(&client->output, 4);
1292 /* Append connect flags */
1293 mqtt_output_append_u8(&client->output, flags);
1294 /* Append keep-alive */
1295 mqtt_output_append_u16(&client->output, client_info->keep_alive);
1296 /* Append client id */
1297 mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1298 /* Append will message if used */
1299 if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1300 mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1301 mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
/* Cleanup: abort the pcb and clear the pointer so the client no longer references it */
1306 tcp_abort(client->conn);
1307 client->conn = NULL;
1314 * Disconnect from MQTT server
1315 * @param client MQTT client
/*
 * Closes the connection through mqtt_close() unless the client is already in
 * the TCP_DISCONNECTED state.
 */
1318 mqtt_disconnect(mqtt_client_t *client)
1320 LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1321 /* Only act if the connection is not already closed */
1322 if (client->conn_state != TCP_DISCONNECTED) {
1323 /* Set conn_state before calling mqtt_close to prevent callback from being called */
1324 client->conn_state = TCP_DISCONNECTED;
/* The status argument is the literal 0 cast to mqtt_connection_status_t */
1325 mqtt_close(client, (mqtt_connection_status_t)0);
1331 * Check connection with server
1332 * @param client MQTT client
1333 * @return 1 if connected to server, 0 otherwise
/*
 * Reports whether the client's conn_state equals MQTT_CONNECTED. Every other
 * state, TCP_CONNECTING included, yields 0.
 */
1336 mqtt_client_is_connected(mqtt_client_t *client)
1338 LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
/* The comparison result (1 or 0) is returned directly */
1339 return client->conn_state == MQTT_CONNECTED;
1342 #endif /* LWIP_TCP && LWIP_CALLBACK_API */