Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / lwip / repo / lwip / src / apps / mqtt / mqtt.c
1 /**
2  * @file
3  * MQTT client
4  *
5  * @defgroup mqtt MQTT client
6  * @ingroup apps
7  * @verbinclude mqtt_client.txt
8  */
9
10 /*
11  * Copyright (c) 2016 Erik Andersson <erian747@gmail.com>
12  * All rights reserved.
13  *
14  * Redistribution and use in source and binary forms, with or without modification,
15  * are permitted provided that the following conditions are met:
16  *
17  * 1. Redistributions of source code must retain the above copyright notice,
18  *    this list of conditions and the following disclaimer.
19  * 2. Redistributions in binary form must reproduce the above copyright notice,
20  *    this list of conditions and the following disclaimer in the documentation
21  *    and/or other materials provided with the distribution.
22  * 3. The name of the author may not be used to endorse or promote products
23  *    derived from this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
26  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
27  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
28  * SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
30  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
33  * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
34  * OF SUCH DAMAGE.
35  *
36  * This file is part of the lwIP TCP/IP stack
37  *
38  * Author: Erik Andersson <erian747@gmail.com>
39  *
40  *
41  * @todo:
42  * - Handle large outgoing payloads for PUBLISH messages
43  * - Fix restriction of a single topic in each (UN)SUBSCRIBE message (protocol has support for multiple topics)
44  * - Add support for legacy MQTT protocol version
45  *
46  * Please coordinate changes and requests with Erik Andersson
47  * Erik Andersson <erian747@gmail.com>
48  *
49  */
50 #include "lwip/apps/mqtt.h"
51 #include "lwip/timeouts.h"
52 #include "lwip/ip_addr.h"
53 #include "lwip/mem.h"
54 #include "lwip/err.h"
55 #include "lwip/pbuf.h"
56 #include "lwip/tcp.h"
57 #include <string.h>
58
59 #if LWIP_TCP && LWIP_CALLBACK_API
60
61 /**
62  * MQTT_DEBUG: Default is off.
63  */
64 #if !defined MQTT_DEBUG || defined __DOXYGEN__
65 #define MQTT_DEBUG                  LWIP_DBG_OFF
66 #endif
67
68 #define MQTT_DEBUG_TRACE        (MQTT_DEBUG | LWIP_DBG_TRACE)
69 #define MQTT_DEBUG_STATE        (MQTT_DEBUG | LWIP_DBG_STATE)
70 #define MQTT_DEBUG_WARN         (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
71 #define MQTT_DEBUG_WARN_STATE   (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
72 #define MQTT_DEBUG_SERIOUS      (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
73
74 static void mqtt_cyclic_timer(void *arg);
75
76 /**
77  * MQTT client connection states
78  */
79 enum {
80   TCP_DISCONNECTED,
81   TCP_CONNECTING,
82   MQTT_CONNECTING,
83   MQTT_CONNECTED
84 };
85
86 /**
87  * MQTT control message types
88  */
89 enum mqtt_message_type {
90   MQTT_MSG_TYPE_CONNECT = 1,
91   MQTT_MSG_TYPE_CONNACK = 2,
92   MQTT_MSG_TYPE_PUBLISH = 3,
93   MQTT_MSG_TYPE_PUBACK = 4,
94   MQTT_MSG_TYPE_PUBREC = 5,
95   MQTT_MSG_TYPE_PUBREL = 6,
96   MQTT_MSG_TYPE_PUBCOMP = 7,
97   MQTT_MSG_TYPE_SUBSCRIBE = 8,
98   MQTT_MSG_TYPE_SUBACK = 9,
99   MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
100   MQTT_MSG_TYPE_UNSUBACK = 11,
101   MQTT_MSG_TYPE_PINGREQ = 12,
102   MQTT_MSG_TYPE_PINGRESP = 13,
103   MQTT_MSG_TYPE_DISCONNECT = 14
104 };
105
106 /** Helpers to extract control packet type and qos from first byte in fixed header */
107 #define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0xf0) >> 4)
108 #define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) ((fixed_hdr_byte0 & 0x6) >> 1)
109
110 /**
111  * MQTT connect flags, only used in CONNECT message
112  */
113 enum mqtt_connect_flag {
114   MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
115   MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
116   MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
117   MQTT_CONNECT_FLAG_WILL = 1 << 2,
118   MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
119 };
120
121
122 #if defined(LWIP_DEBUG)
123 static const char * const mqtt_message_type_str[15] =
124 {
125   "UNDEFINED",
126   "CONNECT",
127   "CONNACK",
128   "PUBLISH",
129   "PUBACK",
130   "PUBREC",
131   "PUBREL",
132   "PUBCOMP",
133   "SUBSCRIBE",
134   "SUBACK",
135   "UNSUBSCRIBE",
136   "UNSUBACK",
137   "PINGREQ",
138   "PINGRESP",
139   "DISCONNECT"
140 };
141
142 /**
143  * Message type value to string
144  * @param msg_type see enum mqtt_message_type
145  * 
146  * @return Control message type text string
147  */
148 static const char *
149 mqtt_msg_type_to_str(u8_t msg_type)
150 {
151   if (msg_type >= LWIP_ARRAYSIZE(mqtt_message_type_str)) {
152     msg_type = 0;
153   }
154   return mqtt_message_type_str[msg_type];
155 }
156
157 #endif
158
159
160 /**
161  * Generate MQTT packet identifier
162  * @param client MQTT client
163  * @return New packet identifier, range 1 to 65535
164  */
165 static u16_t
166 msg_generate_packet_id(mqtt_client_t *client)
167 {
168   client->pkt_id_seq++;
169   if (client->pkt_id_seq == 0) {
170     client->pkt_id_seq++;
171   }
172   return client->pkt_id_seq;
173 }
174
175 /*--------------------------------------------------------------------------------------------------------------------- */
176 /* Output ring buffer */
177
178
179 #define MQTT_RINGBUF_IDX_MASK ((MQTT_OUTPUT_RINGBUF_SIZE) - 1)
180
181 /** Add single item to ring buffer */
182 #define mqtt_ringbuf_put(rb, item) ((rb)->buf)[(rb)->put++ & MQTT_RINGBUF_IDX_MASK] = (item)
183
184 /** Return number of bytes in ring buffer */
185 #define mqtt_ringbuf_len(rb) ((u16_t)((rb)->put - (rb)->get))
186
187 /** Return number of bytes free in ring buffer */
188 #define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb))
189
190 /** Return number of bytes possible to read without wrapping around */
191 #define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - ((rb)->get & MQTT_RINGBUF_IDX_MASK)))
192
193 /** Return pointer to ring buffer get position */
194 #define mqtt_ringbuf_get_ptr(rb) (&(rb)->buf[(rb)->get & MQTT_RINGBUF_IDX_MASK])
195
196 #define mqtt_ringbuf_advance_get_idx(rb, len) ((rb)->get += (len))
197
198
199 /**
200  * Try send as many bytes as possible from output ring buffer
201  * @param rb Output ring buffer
202  * @param tpcb TCP connection handle
203  */
204 static void
205 mqtt_output_send(struct mqtt_ringbuf_t *rb, struct tcp_pcb *tpcb)
206 {
207   err_t err;
208   u8_t wrap = 0;
209   u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
210   u16_t send_len = tcp_sndbuf(tpcb);
211   LWIP_ASSERT("mqtt_output_send: tpcb != NULL", tpcb != NULL);
212
213   if (send_len == 0 || ringbuf_lin_len == 0) {
214     return;
215   }
216
217   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
218                                 send_len, ringbuf_lin_len, ((rb)->get & MQTT_RINGBUF_IDX_MASK), ((rb)->put & MQTT_RINGBUF_IDX_MASK)));
219
220   if (send_len > ringbuf_lin_len) {
221     /* Space in TCP output buffer is larger than available in ring buffer linear portion */
222     send_len = ringbuf_lin_len;
223     /* Wrap around if more data in ring buffer after linear portion */
224     wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
225   }
226   err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (wrap ? TCP_WRITE_FLAG_MORE : 0));
227   if ((err == ERR_OK) && wrap) {
228     mqtt_ringbuf_advance_get_idx(rb, send_len);
229     /* Use the lesser one of ring buffer linear length and TCP send buffer size */
230     send_len = LWIP_MIN(tcp_sndbuf(tpcb), mqtt_ringbuf_linear_read_length(rb));
231     err = tcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
232   }
233
234   if (err == ERR_OK) {
235     mqtt_ringbuf_advance_get_idx(rb, send_len);
236     /* Flush */
237     tcp_output(tpcb);
238   } else {
239     LWIP_DEBUGF(MQTT_DEBUG_WARN, ("mqtt_output_send: Send failed with err %d (\"%s\")\n", err, lwip_strerr(err)));
240   }
241 }
242
243
244
245 /*--------------------------------------------------------------------------------------------------------------------- */
246 /* Request queue */
247
248 /**
249  * Create request item
250  * @param r_objs Pointer to request objects
251  * @param pkt_id Packet identifier of request
252  * @param cb Packet callback to call when requests lifetime ends
253  * @param arg Parameter following callback
254  * @return Request or NULL if failed to create
255  */
256 static struct mqtt_request_t *
257 mqtt_create_request(struct mqtt_request_t *r_objs, u16_t pkt_id, mqtt_request_cb_t cb, void *arg)
258 {
259   struct mqtt_request_t *r = NULL;
260   u8_t n;
261   LWIP_ASSERT("mqtt_create_request: r_objs != NULL", r_objs != NULL);
262   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
263     /* Item point to itself if not in use */
264     if (r_objs[n].next == &r_objs[n]) {
265       r = &r_objs[n];
266       r->next = NULL;
267       r->cb = cb;
268       r->arg = arg;
269       r->pkt_id = pkt_id;
270       break;
271     }
272   }
273   return r;
274 }
275
276
277 /**
278  * Append request to pending request queue
279  * @param tail Pointer to request queue tail pointer
280  * @param r Request to append
281  */
282 static void
283 mqtt_append_request(struct mqtt_request_t **tail, struct mqtt_request_t *r)
284 {
285   struct mqtt_request_t *head = NULL;
286   s16_t time_before = 0;
287   struct mqtt_request_t *iter;
288
289   LWIP_ASSERT("mqtt_append_request: tail != NULL", tail != NULL);
290
291   /* Iterate trough queue to find head, and count total timeout time */
292   for (iter = *tail; iter != NULL; iter = iter->next) {
293     time_before += iter->timeout_diff;
294     head = iter;
295   }
296
297   LWIP_ASSERT("mqtt_append_request: time_before <= MQTT_REQ_TIMEOUT", time_before <= MQTT_REQ_TIMEOUT);
298   r->timeout_diff = MQTT_REQ_TIMEOUT - time_before;
299   if (head == NULL) {
300     *tail = r;
301   } else {
302     head->next = r;
303   }
304 }
305
306
307 /**
308  * Delete request item
309  * @param r Request item to delete
310  */
311 static void
312 mqtt_delete_request(struct mqtt_request_t *r)
313 {
314   if (r != NULL) {
315     r->next = r;
316   }
317 }
318
319 /**
320  * Remove a request item with a specific packet identifier from request queue
321  * @param tail Pointer to request queue tail pointer
322  * @param pkt_id Packet identifier of request to take
323  * @return Request item if found, NULL if not
324  */
325 static struct mqtt_request_t *
326 mqtt_take_request(struct mqtt_request_t **tail, u16_t pkt_id)
327 {
328   struct mqtt_request_t *iter = NULL, *prev = NULL;
329   LWIP_ASSERT("mqtt_take_request: tail != NULL", tail != NULL);
330   /* Search all request for pkt_id */
331   for (iter = *tail; iter != NULL; iter = iter->next) {
332     if (iter->pkt_id == pkt_id) {
333       break;
334     }
335     prev = iter;
336   }
337
338   /* If request was found */
339   if (iter != NULL) {
340     /* unchain */
341     if (prev == NULL) {
342       *tail= iter->next;
343     } else {
344       prev->next = iter->next;
345     }
346     /* If exists, add remaining timeout time for the request to next */
347     if (iter->next != NULL) {
348       iter->next->timeout_diff += iter->timeout_diff;
349     }
350     iter->next = NULL;
351   }
352   return iter;
353 }
354
355 /**
356  * Handle requests timeout
357  * @param tail Pointer to request queue tail pointer
358  * @param t Time since last call in seconds
359  */
360 static void
361 mqtt_request_time_elapsed(struct mqtt_request_t **tail, u8_t t)
362 {
363   struct mqtt_request_t *r;
364   LWIP_ASSERT("mqtt_request_time_elapsed: tail != NULL", tail != NULL);
365   r = *tail;
366   while (t > 0 && r != NULL) {
367     if (t >= r->timeout_diff) {
368       t -= (u8_t)r->timeout_diff;
369       /* Unchain */
370       *tail = r->next;
371       /* Notify upper layer about timeout */
372       if (r->cb != NULL) {
373         r->cb(r->arg, ERR_TIMEOUT);
374       }
375       mqtt_delete_request(r);
376       /* Tail might be be modified in callback, so re-read it in every iteration */
377       r = *(struct mqtt_request_t * const volatile *)tail;
378     } else {
379       r->timeout_diff -= t;
380       t = 0;
381     }
382   }
383 }
384
385 /**
386  * Free all request items
387  * @param tail Pointer to request queue tail pointer
388  */
389 static void
390 mqtt_clear_requests(struct mqtt_request_t **tail)
391 {
392   struct mqtt_request_t *iter, *next;
393   LWIP_ASSERT("mqtt_clear_requests: tail != NULL", tail != NULL);
394   for (iter = *tail; iter != NULL; iter = next) {
395     next = iter->next;
396     mqtt_delete_request(iter);
397   }
398   *tail = NULL;
399 }
400 /**
401  * Initialize all request items
402  * @param r_objs Pointer to request objects
403  */
404 static void
405 mqtt_init_requests(struct mqtt_request_t *r_objs)
406 {
407   u8_t n;
408   LWIP_ASSERT("mqtt_init_requests: r_objs != NULL", r_objs != NULL);
409   for (n = 0; n < MQTT_REQ_MAX_IN_FLIGHT; n++) {
410     /* Item pointing to itself indicates unused */
411     r_objs[n].next = &r_objs[n];
412   }
413 }
414
415 /*--------------------------------------------------------------------------------------------------------------------- */
416 /* Output message build helpers */
417
418
419 static void
420 mqtt_output_append_u8(struct mqtt_ringbuf_t *rb, u8_t value)
421 {
422   mqtt_ringbuf_put(rb, value);
423 }
424
425 static
426 void mqtt_output_append_u16(struct mqtt_ringbuf_t *rb, u16_t value)
427 {
428   mqtt_ringbuf_put(rb, value >> 8);
429   mqtt_ringbuf_put(rb, value & 0xff);
430 }
431
432 static void
433 mqtt_output_append_buf(struct mqtt_ringbuf_t *rb, const void *data, u16_t length)
434 {
435   u16_t n;
436   for (n = 0; n < length; n++) {
437     mqtt_ringbuf_put(rb, ((const u8_t *)data)[n]);
438   }
439 }
440
441 static void
442 mqtt_output_append_string(struct mqtt_ringbuf_t *rb, const char *str, u16_t length)
443 {
444   u16_t n;
445   mqtt_ringbuf_put(rb, length >> 8);
446   mqtt_ringbuf_put(rb, length & 0xff);
447   for (n = 0; n < length; n++) {
448     mqtt_ringbuf_put(rb, str[n]);
449   }
450 }
451
452 /**
453  * Append fixed header
454  * @param rb Output ring buffer
455  * @param msg_type see enum mqtt_message_type
456  * @param dup MQTT DUP flag
457  * @param qos MQTT QoS field
458  * @param retain MQTT retain flag
459  * @param r_length Remaining length after fixed header
460  */
461
462 static void
463 mqtt_output_append_fixed_header(struct mqtt_ringbuf_t *rb, u8_t msg_type, u8_t dup,
464                  u8_t qos, u8_t retain, u16_t r_length)
465 {
466   /* Start with control byte */
467   mqtt_output_append_u8(rb, (((msg_type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1)));
468   /* Encode remaining length field */
469   do {
470     mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
471     r_length >>= 7;
472   } while (r_length > 0);
473 }
474
475
476 /**
477  * Check output buffer space
478  * @param rb Output ring buffer
479  * @param r_length Remaining length after fixed header
480  * @return 1 if message will fit, 0 if not enough buffer space
481  */
482 static u8_t
483 mqtt_output_check_space(struct mqtt_ringbuf_t *rb, u16_t r_length)
484 {
485   /* Start with length of type byte + remaining length */
486   u16_t total_len = 1 + r_length;
487
488   LWIP_ASSERT("mqtt_output_check_space: rb != NULL", rb != NULL);
489
490  /* Calculate number of required bytes to contain the remaining bytes field and add to total*/
491   do {
492     total_len++;
493     r_length >>= 7;
494   } while (r_length > 0);
495
496   return (total_len <= mqtt_ringbuf_free(rb));
497 }
498
499
500 /**
501  * Close connection to server
502  * @param client MQTT client
503  * @param reason Reason for disconnection
504  */
505 static void
506 mqtt_close(mqtt_client_t *client, mqtt_connection_status_t reason)
507 {
508   LWIP_ASSERT("mqtt_close: client != NULL", client != NULL);
509
510   /* Bring down TCP connection if not already done */
511   if (client->conn != NULL) {
512     err_t res;
513     tcp_recv(client->conn, NULL);
514     tcp_err(client->conn,  NULL);
515     tcp_sent(client->conn, NULL);
516     res = tcp_close(client->conn);
517     if (res != ERR_OK) {
518       tcp_abort(client->conn);
519       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_close: Close err=%s\n", lwip_strerr(res)));
520     }
521     client->conn = NULL;
522   }
523
524   /* Remove all pending requests */
525   mqtt_clear_requests(&client->pend_req_queue);
526   /* Stop cyclic timer */
527   sys_untimeout(mqtt_cyclic_timer, client);
528
529   /* Notify upper layer of disconnection if changed state */
530   if (client->conn_state != TCP_DISCONNECTED) {
531
532     client->conn_state = TCP_DISCONNECTED;
533     if (client->connect_cb != NULL) {
534       client->connect_cb(client, client->connect_arg, reason);
535     }
536   }
537 }
538
539
540 /**
541  * Interval timer, called every MQTT_CYCLIC_TIMER_INTERVAL seconds in MQTT_CONNECTING and MQTT_CONNECTED states
542  * @param arg MQTT client
543  */
544 static void
545 mqtt_cyclic_timer(void *arg)
546 {
547   u8_t restart_timer = 1;
548   mqtt_client_t *client = (mqtt_client_t *)arg;
549   LWIP_ASSERT("mqtt_cyclic_timer: client != NULL", client != NULL);
550
551   if (client->conn_state == MQTT_CONNECTING) {
552     client->cyclic_tick++;
553     if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= MQTT_CONNECT_TIMOUT) {
554       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
555       /* Disconnect TCP */
556       mqtt_close(client, MQTT_CONNECT_TIMEOUT);
557       restart_timer = 0;
558     }
559   } else if (client->conn_state == MQTT_CONNECTED) {
560     /* Handle timeout for pending requests */
561     mqtt_request_time_elapsed(&client->pend_req_queue, MQTT_CYCLIC_TIMER_INTERVAL);
562
563     /* keep_alive > 0 means keep alive functionality shall be used */
564     if (client->keep_alive > 0) {
565
566       client->server_watchdog++;
567       /* If reception from server has been idle for 1.5*keep_alive time, server is considered unresponsive */
568       if ((client->server_watchdog * MQTT_CYCLIC_TIMER_INTERVAL) > (client->keep_alive + client->keep_alive/2)) {
569         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
570         mqtt_close(client, MQTT_CONNECT_TIMEOUT);
571         restart_timer = 0;
572       }
573
574       /* If time for a keep alive message to be sent, transmission has been idle for keep_alive time */
575       if ((client->cyclic_tick * MQTT_CYCLIC_TIMER_INTERVAL) >= client->keep_alive) {
576         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_cyclic_timer: Sending keep-alive message to server\n"));
577         if (mqtt_output_check_space(&client->output, 0) != 0) {
578           mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
579           client->cyclic_tick = 0;
580         }
581       } else {
582         client->cyclic_tick++;
583       }
584     }
585   } else {
586     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_cyclic_timer: Timer should not be running in state %d\n", client->conn_state));
587     restart_timer = 0;
588   }
589   if (restart_timer) {
590     sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, arg);
591   }
592 }
593
594
595 /**
596  * Send PUBACK, PUBREC or PUBREL response message
597  * @param client MQTT client
598  * @param msg PUBACK, PUBREC or PUBREL
599  * @param pkt_id Packet identifier
600  * @param qos QoS value
601  * @return ERR_OK if successful, ERR_MEM if out of memory
602  */
603 static err_t
604 pub_ack_rec_rel_response(mqtt_client_t *client, u8_t msg, u16_t pkt_id, u8_t qos)
605 {
606   err_t err = ERR_OK;
607   if (mqtt_output_check_space(&client->output, 2)) {
608     mqtt_output_append_fixed_header(&client->output, msg, 0, qos, 0, 2);
609     mqtt_output_append_u16(&client->output, pkt_id);
610     mqtt_output_send(&client->output, client->conn);
611   } else {
612     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
613                                   mqtt_msg_type_to_str(msg), pkt_id));
614     err = ERR_MEM;
615   }
616   return err;
617 }
618
619 /**
620  * Subscribe response from server
621  * @param r Matching request
622  * @param result Result code from server
623  */
624 static void
625 mqtt_incomming_suback(struct mqtt_request_t *r, u8_t result)
626 {
627   if (r->cb != NULL) {
628     r->cb(r->arg, result < 3 ? ERR_OK : ERR_ABRT);
629   }
630 }
631
632
633 /**
634  * Complete MQTT message received or buffer full
635  * @param client MQTT client
636  * @param fixed_hdr_idx header index
637  * @param length length received part
638  * @param remaining_length Remaining length of complete message
639  */
640 static mqtt_connection_status_t
641   mqtt_message_received(mqtt_client_t *client, u8_t fixed_hdr_idx, u16_t length, u32_t remaining_length)
642 {
643   mqtt_connection_status_t res = MQTT_CONNECT_ACCEPTED;
644
645   u8_t *var_hdr_payload = client->rx_buffer + fixed_hdr_idx;
646
647   /* Control packet type */
648   u8_t pkt_type = MQTT_CTL_PACKET_TYPE(client->rx_buffer[0]);
649   u16_t pkt_id = 0;
650
651   if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
652     if (client->conn_state == MQTT_CONNECTING) {
653       /* Get result code from CONNACK */
654       res = (mqtt_connection_status_t)var_hdr_payload[1];
655       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: Connect response code %d\n", res));
656       if (res == MQTT_CONNECT_ACCEPTED) {
657         /* Reset cyclic_tick when changing to connected state */
658         client->cyclic_tick = 0;
659         client->conn_state = MQTT_CONNECTED;
660         /* Notify upper layer */
661         if (client->connect_cb != 0) {
662           client->connect_cb(client, client->connect_arg, res);
663         }
664       }
665     } else {
666       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Received CONNACK in connected state\n"));
667     }
668   } else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
669     LWIP_DEBUGF(MQTT_DEBUG_TRACE,( "mqtt_message_received: Received PINGRESP from server\n"));
670
671   } else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
672     u16_t payload_offset = 0;
673     u16_t payload_length = length;
674     u8_t qos = MQTT_CTL_PACKET_QOS(client->rx_buffer[0]);
675
676     if (client->msg_idx <= MQTT_VAR_HEADER_BUFFER_LEN) {
677       /* Should have topic and pkt id*/
678       uint8_t *topic;
679       uint16_t after_topic;
680       u8_t bkp;
681       u16_t topic_len = var_hdr_payload[0];
682       topic_len = (topic_len << 8) + (u16_t)(var_hdr_payload[1]);
683
684       topic = var_hdr_payload + 2;
685       after_topic = 2 + topic_len;
686       /* Check length, add one byte even for QoS 0 so that zero termination will fit */
687       if ((after_topic + (qos? 2 : 1)) > length) {
688         LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
689         goto out_disconnect;
690       }
691
692       /* id for QoS 1 and 2 */
693       if (qos > 0) {
694         client->inpub_pkt_id = ((u16_t)var_hdr_payload[after_topic] << 8) + (u16_t)var_hdr_payload[after_topic + 1];
695         after_topic += 2;
696       } else {
697         client->inpub_pkt_id = 0;
698       }
699       /* Take backup of byte after topic */
700       bkp = topic[topic_len];
701       /* Zero terminate string */
702       topic[topic_len] = 0;
703       /* Payload data remaining in receive buffer */
704       payload_length = length - after_topic;
705       payload_offset = after_topic;
706
707       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Received message with QoS %d at topic: %s, payload length %d\n",
708                                     qos, topic, remaining_length + payload_length));
709       if (client->pub_cb != NULL) {
710         client->pub_cb(client->inpub_arg, (const char *)topic, remaining_length + payload_length);
711       }
712       /* Restore byte after topic */
713       topic[topic_len] = bkp;
714     }
715     if (payload_length > 0 || remaining_length == 0) {
716       client->data_cb(client->inpub_arg, var_hdr_payload + payload_offset, payload_length, remaining_length == 0 ? MQTT_DATA_FLAG_LAST : 0);
717       /* Reply if QoS > 0 */
718       if (remaining_length == 0 && qos > 0) {
719         /* Send PUBACK for QoS 1 or PUBREC for QoS 2 */
720         u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
721         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_incomming_publish: Sending publish response: %s with pkt_id: %d\n",
722                                       mqtt_msg_type_to_str(resp_msg), client->inpub_pkt_id));
723         pub_ack_rec_rel_response(client, resp_msg, client->inpub_pkt_id, 0);
724       }
725     }
726   } else {
727     /* Get packet identifier */
728     pkt_id = (u16_t)var_hdr_payload[0] << 8;
729     pkt_id |= (u16_t)var_hdr_payload[1];
730     if (pkt_id == 0) {
731       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: Got message with illegal packet identifier: 0\n"));
732       goto out_disconnect;
733     }
734     if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
735       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",pkt_id));
736       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBREL, pkt_id, 1);
737
738     } else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
739       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",pkt_id));
740       pub_ack_rec_rel_response(client, MQTT_MSG_TYPE_PUBCOMP, pkt_id, 0);
741
742     } else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
743               pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
744       struct mqtt_request_t *r = mqtt_take_request(&client->pend_req_queue, pkt_id);
745       if (r != NULL) {
746         LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
747         if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
748           if (length < 3) {
749             LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_message_received: To small SUBACK packet\n"));
750             goto out_disconnect;
751           } else {
752             mqtt_incomming_suback(r, var_hdr_payload[2]);
753           }
754         } else if (r->cb != NULL) {
755           r->cb(r->arg, ERR_OK);
756         }
757         mqtt_delete_request(r);
758       } else {
759         LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type), pkt_id));
760       }
761     } else {
762       LWIP_DEBUGF(MQTT_DEBUG_WARN,( "mqtt_message_received: Received unknown message type: %d\n", pkt_type));
763       goto out_disconnect;
764     }
765   }
766   return res;
767 out_disconnect:
768   return MQTT_CONNECT_DISCONNECTED;
769 }
770
771
772 /**
773  * MQTT incoming message parser
774  * @param client MQTT client
775  * @param p PBUF chain of received data
776  * @return Connection status
777  */
778 static mqtt_connection_status_t
779 mqtt_parse_incoming(mqtt_client_t *client, struct pbuf *p)
780 {
781   u16_t in_offset = 0;
782   u32_t msg_rem_len = 0;
783   u8_t fixed_hdr_idx = 0;
784   u8_t b = 0;
785
786   while (p->tot_len > in_offset) {
787     if ((fixed_hdr_idx < 2) || ((b & 0x80) != 0)) {
788
789       if (fixed_hdr_idx < client->msg_idx) {
790         b = client->rx_buffer[fixed_hdr_idx];
791       } else {
792         b = pbuf_get_at(p, in_offset++);
793         client->rx_buffer[client->msg_idx++] = b;
794       }
795       fixed_hdr_idx++;
796
797       if (fixed_hdr_idx >= 2) {
798         msg_rem_len |= (u32_t)(b & 0x7f) << ((fixed_hdr_idx - 2) * 7);
799         if ((b & 0x80) == 0) {
800           LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: Remaining length after fixed header: %d\n", msg_rem_len));
801           if (msg_rem_len == 0) {
802             /* Complete message with no extra headers of payload received */
803             mqtt_message_received(client, fixed_hdr_idx, 0, 0);
804             client->msg_idx = 0;
805             fixed_hdr_idx = 0;
806           } else {
807             /* Bytes remaining in message */
808             msg_rem_len = (msg_rem_len + fixed_hdr_idx) - client->msg_idx;
809           }
810         }
811       }
812     } else {
813       u16_t cpy_len, cpy_start, buffer_space;
814
815       cpy_start = (client->msg_idx - fixed_hdr_idx) % (MQTT_VAR_HEADER_BUFFER_LEN - fixed_hdr_idx) + fixed_hdr_idx;
816
817       /* Allow to copy the lesser one of available length in input data or bytes remaining in message */
818       cpy_len = (u16_t)LWIP_MIN((u16_t)(p->tot_len - in_offset), msg_rem_len);
819
820       /* Limit to available space in buffer */
821       buffer_space = MQTT_VAR_HEADER_BUFFER_LEN - cpy_start;
822       if (cpy_len > buffer_space) {
823         cpy_len = buffer_space;
824       }
825       pbuf_copy_partial(p, client->rx_buffer+cpy_start, cpy_len, in_offset);
826
827       /* Advance get and put indexes  */
828       client->msg_idx += cpy_len;
829       in_offset += cpy_len;
830       msg_rem_len -= cpy_len;
831
832       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_parse_incoming: msg_idx: %d, cpy_len: %d, remaining %d\n", client->msg_idx, cpy_len, msg_rem_len));
833       if (msg_rem_len == 0 || cpy_len == buffer_space) {
834         /* Whole message received or buffer is full */
835         mqtt_connection_status_t res = mqtt_message_received(client, fixed_hdr_idx, (cpy_start + cpy_len) - fixed_hdr_idx, msg_rem_len);
836         if (res != MQTT_CONNECT_ACCEPTED) {
837           return res;
838         }
839         if (msg_rem_len == 0) {
840           /* Reset parser state */
841           client->msg_idx = 0;
842           /* msg_tot_len = 0; */
843           fixed_hdr_idx = 0;
844         }
845       }
846     }
847   }
848   return MQTT_CONNECT_ACCEPTED;
849 }
850
851
852 /**
853  * TCP received callback function. @see tcp_recv_fn
854  * @param arg MQTT client
855  * @param p PBUF chain of received data
856  * @param err Passed as return value if not ERR_OK
857  * @return ERR_OK or err passed into callback
858  */
859 static err_t
860 mqtt_tcp_recv_cb(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err)
861 {
862   mqtt_client_t *client = (mqtt_client_t *)arg;
863   LWIP_ASSERT("mqtt_tcp_recv_cb: client != NULL", client != NULL);
864   LWIP_ASSERT("mqtt_tcp_recv_cb: client->conn == pcb", client->conn == pcb);
865
866   if (p == NULL) {
867     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
868     mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
869   } else {
870     mqtt_connection_status_t res;
871     if (err != ERR_OK) {
872       LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_recv_cb: Recv err=%d\n", err));
873       pbuf_free(p);
874       return err;
875     }
876
877     /* Tell remote that data has been received */
878     tcp_recved(pcb, p->tot_len);
879     res = mqtt_parse_incoming(client, p);
880     pbuf_free(p);
881
882     if (res != MQTT_CONNECT_ACCEPTED) {
883       mqtt_close(client, res);
884     }
885     /* If keep alive functionality is used */
886     if (client->keep_alive != 0) {
887       /* Reset server alive watchdog */
888       client->server_watchdog = 0;
889     }
890
891   }
892   return ERR_OK;
893 }
894
895
896 /**
897  * TCP data sent callback function. @see tcp_sent_fn
898  * @param arg MQTT client
899  * @param tpcb TCP connection handle
900  * @param len Number of bytes sent
901  * @return ERR_OK
902  */
903 static err_t
904 mqtt_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb, u16_t len)
905 {
906   mqtt_client_t *client = (mqtt_client_t *)arg;
907
908   LWIP_UNUSED_ARG(tpcb);
909   LWIP_UNUSED_ARG(len);
910
911   if (client->conn_state == MQTT_CONNECTED) {
912     struct mqtt_request_t *r;
913
914     /* Reset keep-alive send timer and server watchdog */
915     client->cyclic_tick = 0;
916     client->server_watchdog = 0;
917     /* QoS 0 publish has no response from server, so call its callbacks here */
918     while ((r = mqtt_take_request(&client->pend_req_queue, 0)) != NULL) {
919       LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
920       if (r->cb != NULL) {
921         r->cb(r->arg, ERR_OK);
922       }
923       mqtt_delete_request(r);
924     }
925     /* Try send any remaining buffers from output queue */
926     mqtt_output_send(&client->output, client->conn);
927   }
928   return ERR_OK;
929 }
930
931 /**
932  * TCP error callback function. @see tcp_err_fn
933  * @param arg MQTT client
934  * @param err Error encountered
935  */
936 static void
937 mqtt_tcp_err_cb(void *arg, err_t err)
938 {
939   mqtt_client_t *client = (mqtt_client_t *)arg;
940   LWIP_UNUSED_ARG(err); /* only used for debug output */
941   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n", err, arg));
942   LWIP_ASSERT("mqtt_tcp_err_cb: client != NULL", client != NULL);
943   /* Set conn to null before calling close as pcb is already deallocated*/
944   client->conn = 0;
945   mqtt_close(client, MQTT_CONNECT_DISCONNECTED);
946 }
947
948 /**
949  * TCP poll callback function. @see tcp_poll_fn
950  * @param arg MQTT client
951  * @param tpcb TCP connection handle
952  * @return err ERR_OK
953  */
954 static err_t
955 mqtt_tcp_poll_cb(void *arg, struct tcp_pcb *tpcb)
956 {
957   mqtt_client_t *client = (mqtt_client_t *)arg;
958   if (client->conn_state == MQTT_CONNECTED) {
959     /* Try send any remaining buffers from output queue */
960     mqtt_output_send(&client->output, tpcb);
961   }
962   return ERR_OK;
963 }
964
965 /**
966  * TCP connect callback function. @see tcp_connected_fn
967  * @param arg MQTT client
968  * @param err Always ERR_OK, mqtt_tcp_err_cb is called in case of error
969  * @return ERR_OK
970  */
971 static err_t
972 mqtt_tcp_connect_cb(void *arg, struct tcp_pcb *tpcb, err_t err)
973 {
974   mqtt_client_t* client = (mqtt_client_t *)arg;
975
976   if (err != ERR_OK) {
977     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_tcp_connect_cb: TCP connect error %d\n", err));
978     return err;
979   }
980
981   /* Initiate receiver state */
982   client->msg_idx = 0;
983
984   /* Setup TCP callbacks */
985   tcp_recv(tpcb, mqtt_tcp_recv_cb);
986   tcp_sent(tpcb, mqtt_tcp_sent_cb);
987   tcp_poll(tpcb, mqtt_tcp_poll_cb, 2);
988
989   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_tcp_connect_cb: TCP connection established to server\n"));
990   /* Enter MQTT connect state */
991   client->conn_state = MQTT_CONNECTING;
992
993   /* Start cyclic timer */
994   sys_timeout(MQTT_CYCLIC_TIMER_INTERVAL*1000, mqtt_cyclic_timer, client);
995   client->cyclic_tick = 0;
996
997   /* Start transmission from output queue, connect message is the first one out*/
998   mqtt_output_send(&client->output, client->conn);
999
1000   return ERR_OK;
1001 }
1002
1003
1004
1005 /*---------------------------------------------------------------------------------------------------- */
1006 /* Public API */
1007
1008
1009 /**
1010  * @ingroup mqtt
1011  * MQTT publish function.
1012  * @param client MQTT client
1013  * @param topic Publish topic string
1014  * @param payload Data to publish (NULL is allowed)
1015  * @param payload_length: Length of payload (0 is allowed)
1016  * @param qos Quality of service, 0 1 or 2
1017  * @param retain MQTT retain flag
1018  * @param cb Callback to call when publish is complete or has timed out
1019  * @param arg User supplied argument to publish callback
1020  * @return ERR_OK if successful
1021  *         ERR_CONN if client is disconnected
1022  *         ERR_MEM if short on memory
1023  */
1024 err_t
1025 mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain,
1026              mqtt_request_cb_t cb, void *arg)
1027 {
1028   struct mqtt_request_t *r;
1029   u16_t pkt_id;
1030   size_t topic_strlen;
1031   size_t total_len;
1032   u16_t topic_len;
1033   u16_t remaining_length;
1034
1035   LWIP_ASSERT("mqtt_publish: client != NULL", client);
1036   LWIP_ASSERT("mqtt_publish: topic != NULL", topic);
1037   LWIP_ERROR("mqtt_publish: TCP disconnected", (client->conn_state != TCP_DISCONNECTED), return ERR_CONN);
1038
1039   topic_strlen = strlen(topic);
1040   LWIP_ERROR("mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1041   topic_len = (u16_t)topic_strlen;
1042   total_len = 2 + topic_len + payload_length;
1043   LWIP_ERROR("mqtt_publish: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1044   remaining_length = (u16_t)total_len;
1045
1046   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length, topic));
1047
1048   if (qos > 0) {
1049     remaining_length += 2;
1050     /* Generate pkt_id id for QoS1 and 2 */
1051     pkt_id = msg_generate_packet_id(client);
1052   } else {
1053     /* Use reserved value pkt_id 0 for QoS 0 in request handle */
1054     pkt_id = 0;
1055   }
1056
1057   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1058   if (r == NULL) {
1059     return ERR_MEM;
1060   }
1061
1062   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1063     mqtt_delete_request(r);
1064     return ERR_MEM;
1065   }
1066   /* Append fixed header */
1067   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1068
1069   /* Append Topic */
1070   mqtt_output_append_string(&client->output, topic, topic_len);
1071
1072   /* Append packet if for QoS 1 and 2*/
1073   if (qos > 0) {
1074     mqtt_output_append_u16(&client->output, pkt_id);
1075   }
1076
1077   /* Append optional publish payload */
1078   if ((payload != NULL) && (payload_length > 0)) {
1079     mqtt_output_append_buf(&client->output, payload, payload_length);
1080   }
1081
1082   mqtt_append_request(&client->pend_req_queue, r);
1083   mqtt_output_send(&client->output, client->conn);
1084   return ERR_OK;
1085 }
1086
1087
1088 /**
1089  * @ingroup mqtt
1090  * MQTT subscribe/unsubscribe function.
1091  * @param client MQTT client
1092  * @param topic topic to subscribe to
1093  * @param qos Quality of service, 0 1 or 2 (only used for subscribe)
1094  * @param cb Callback to call when subscribe/unsubscribe reponse is received
1095  * @param arg User supplied argument to publish callback
1096  * @param sub 1 for subscribe, 0 for unsubscribe
1097  * @return ERR_OK if successful, @see err_t enum for other results
1098  */
1099 err_t
1100 mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
1101 {
1102   size_t topic_strlen;
1103   size_t total_len;
1104   u16_t topic_len;
1105   u16_t remaining_length;
1106   u16_t pkt_id;
1107   struct mqtt_request_t *r;
1108
1109   LWIP_ASSERT("mqtt_sub_unsub: client != NULL", client);
1110   LWIP_ASSERT("mqtt_sub_unsub: topic != NULL", topic);
1111
1112   topic_strlen = strlen(topic);
1113   LWIP_ERROR("mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)), return ERR_ARG);
1114   topic_len = (u16_t)topic_strlen;
1115   /* Topic string, pkt_id, qos for subscribe */
1116   total_len =  topic_len + 2 + 2 + (sub != 0);
1117   LWIP_ERROR("mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF), return ERR_ARG);
1118   remaining_length = (u16_t)total_len;
1119
1120   LWIP_ASSERT("mqtt_sub_unsub: qos < 3", qos < 3);
1121   if (client->conn_state == TCP_DISCONNECTED) {
1122     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1123     return ERR_CONN;
1124   }
1125
1126   pkt_id = msg_generate_packet_id(client);
1127   r = mqtt_create_request(client->req_list, pkt_id, cb, arg);
1128   if (r == NULL) {
1129     return ERR_MEM;
1130   }
1131
1132   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1133     mqtt_delete_request(r);
1134     return ERR_MEM;
1135   }
1136
1137   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n", topic, pkt_id));
1138
1139   mqtt_output_append_fixed_header(&client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1140   /* Packet id */
1141   mqtt_output_append_u16(&client->output, pkt_id);
1142   /* Topic */
1143   mqtt_output_append_string(&client->output, topic, topic_len);
1144   /* QoS */
1145   if (sub != 0) {
1146     mqtt_output_append_u8(&client->output, LWIP_MIN(qos, 2));
1147   }
1148
1149   mqtt_append_request(&client->pend_req_queue, r);
1150   mqtt_output_send(&client->output, client->conn);
1151   return ERR_OK;
1152 }
1153
1154
1155 /**
1156  * @ingroup mqtt
1157  * Set callback to handle incoming publish requests from server
1158  * @param client MQTT client
1159  * @param pub_cb Callback invoked when publish starts, contain topic and total length of payload
1160  * @param data_cb Callback for each fragment of payload that arrives
1161  * @param arg User supplied argument to both callbacks
1162  */
1163 void
1164 mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb,
1165                              mqtt_incoming_data_cb_t data_cb, void *arg)
1166 {
1167   LWIP_ASSERT("mqtt_set_inpub_callback: client != NULL", client != NULL);
1168   client->data_cb = data_cb;
1169   client->pub_cb = pub_cb;
1170   client->inpub_arg = arg;
1171 }
1172
1173 /**
1174  * @ingroup mqtt
1175  * Create a new MQTT client instance
1176  * @return Pointer to instance on success, NULL otherwise
1177  */
1178 mqtt_client_t *
1179 mqtt_client_new(void)
1180 {
1181   mqtt_client_t *client = (mqtt_client_t *)mem_malloc(sizeof(mqtt_client_t));
1182   if (client != NULL) {
1183     memset(client, 0, sizeof(mqtt_client_t));
1184   }
1185   return client;
1186 }
1187
1188
1189 /**
1190  * @ingroup mqtt
1191  * Connect to MQTT server
1192  * @param client MQTT client
1193  * @param ip_addr Server IP
1194  * @param port Server port
1195  * @param cb Connection state change callback
1196  * @param arg User supplied argument to connection callback
1197  * @param client_info Client identification and connection options
1198  * @return ERR_OK if successful, @see err_t enum for other results
1199  */
1200 err_t
1201 mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ip_addr, u16_t port, mqtt_connection_cb_t cb, void *arg,
1202                     const struct mqtt_connect_client_info_t *client_info)
1203 {
1204   err_t err;
1205   size_t len;
1206   u16_t client_id_length;
1207   /* Length is the sum of 2+"MQTT", protocol level, flags and keep alive */
1208   u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1209   u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1210
1211   LWIP_ASSERT("mqtt_client_connect: client != NULL", client != NULL);
1212   LWIP_ASSERT("mqtt_client_connect: ip_addr != NULL", ip_addr != NULL);
1213   LWIP_ASSERT("mqtt_client_connect: client_info != NULL", client_info != NULL);
1214   LWIP_ASSERT("mqtt_client_connect: client_info->client_id != NULL", client_info->client_id != NULL);
1215
1216   if (client->conn_state != TCP_DISCONNECTED) {
1217     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Already connected\n"));
1218     return ERR_ISCONN;
1219   }
1220
1221   /* Wipe clean */
1222   memset(client, 0, sizeof(mqtt_client_t));
1223   client->connect_arg = arg;
1224   client->connect_cb = cb;
1225   client->keep_alive = client_info->keep_alive;
1226   mqtt_init_requests(client->req_list);
1227
1228   /* Build connect message */
1229   if (client_info->will_topic != NULL && client_info->will_msg != NULL) {
1230     flags |= MQTT_CONNECT_FLAG_WILL;
1231     flags |= (client_info->will_qos & 3) << 3;
1232     if (client_info->will_retain) {
1233       flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1234     }
1235     len = strlen(client_info->will_topic);
1236     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length overflow", len <= 0xFF, return ERR_VAL);
1237     LWIP_ERROR("mqtt_client_connect: client_info->will_topic length must be > 0", len > 0, return ERR_VAL);
1238     will_topic_len = (u8_t)len;
1239     len = strlen(client_info->will_msg);
1240     LWIP_ERROR("mqtt_client_connect: client_info->will_msg length overflow", len <= 0xFF, return ERR_VAL);
1241     will_msg_len = (u8_t)len;
1242     len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1243     LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1244     remaining_length = (u16_t)len;
1245   }
1246
1247   /* Don't complicate things, always connect using clean session */
1248   flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1249
1250   len = strlen(client_info->client_id);
1251   LWIP_ERROR("mqtt_client_connect: client_info->client_id length overflow", len <= 0xFFFF, return ERR_VAL);
1252   client_id_length = (u16_t)len;
1253   len = remaining_length + 2 + client_id_length;
1254   LWIP_ERROR("mqtt_client_connect: remaining_length overflow", len <= 0xFFFF, return ERR_VAL);
1255   remaining_length = (u16_t)len;
1256
1257   if (mqtt_output_check_space(&client->output, remaining_length) == 0) {
1258     return ERR_MEM;
1259   }
1260
1261   client->conn = tcp_new();
1262   if (client->conn == NULL) {
1263     return ERR_MEM;
1264   }
1265
1266   /* Set arg pointer for callbacks */
1267   tcp_arg(client->conn, client);
1268   /* Any local address, pick random local port number */
1269   err = tcp_bind(client->conn, IP_ADDR_ANY, 0);
1270   if (err != ERR_OK) {
1271     LWIP_DEBUGF(MQTT_DEBUG_WARN,("mqtt_client_connect: Error binding to local ip/port, %d\n", err));
1272     goto tcp_fail;
1273   }
1274   LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Connecting to host: %s at port:%"U16_F"\n", ipaddr_ntoa(ip_addr), port));
1275
1276   /* Connect to server */
1277   err = tcp_connect(client->conn, ip_addr, port, mqtt_tcp_connect_cb);
1278   if (err != ERR_OK) {
1279     LWIP_DEBUGF(MQTT_DEBUG_TRACE,("mqtt_client_connect: Error connecting to remote ip/port, %d\n", err));
1280     goto tcp_fail;
1281   }
1282   /* Set error callback */
1283   tcp_err(client->conn, mqtt_tcp_err_cb);
1284   client->conn_state = TCP_CONNECTING;
1285
1286   /* Append fixed header */
1287   mqtt_output_append_fixed_header(&client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1288   /* Append Protocol string */
1289   mqtt_output_append_string(&client->output, "MQTT", 4);
1290   /* Append Protocol level */
1291   mqtt_output_append_u8(&client->output, 4);
1292   /* Append connect flags */
1293   mqtt_output_append_u8(&client->output, flags);
1294   /* Append keep-alive */
1295   mqtt_output_append_u16(&client->output, client_info->keep_alive);
1296   /* Append client id */
1297   mqtt_output_append_string(&client->output, client_info->client_id, client_id_length);
1298   /* Append will message if used */
1299   if ((flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1300     mqtt_output_append_string(&client->output, client_info->will_topic, will_topic_len);
1301     mqtt_output_append_string(&client->output, client_info->will_msg, will_msg_len);
1302   }
1303   return ERR_OK;
1304
1305 tcp_fail:
1306   tcp_abort(client->conn);
1307   client->conn = NULL;
1308   return err;
1309 }
1310
1311
1312 /**
1313  * @ingroup mqtt
1314  * Disconnect from MQTT server
1315  * @param client MQTT client
1316  */
1317 void
1318 mqtt_disconnect(mqtt_client_t *client)
1319 {
1320   LWIP_ASSERT("mqtt_disconnect: client != NULL", client);
1321   /* If connection in not already closed */
1322   if (client->conn_state != TCP_DISCONNECTED) {
1323     /* Set conn_state before calling mqtt_close to prevent callback from being called */
1324     client->conn_state = TCP_DISCONNECTED;
1325     mqtt_close(client, (mqtt_connection_status_t)0);
1326   }
1327 }
1328
1329 /**
1330  * @ingroup mqtt
1331  * Check connection with server
1332  * @param client MQTT client
1333  * @return 1 if connected to server, 0 otherwise
1334  */
1335 u8_t
1336 mqtt_client_is_connected(mqtt_client_t *client)
1337 {
1338   LWIP_ASSERT("mqtt_client_is_connected: client != NULL", client);
1339   return client->conn_state == MQTT_CONNECTED;
1340 }
1341
1342 #endif /* LWIP_TCP && LWIP_CALLBACK_API */