9bcbaa1950a00b0203ffc9ddb969cab0ff0c4fda
[platform/upstream/cmake.git] / Utilities / cmcurl / lib / mqtt.c
1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) 2020 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
9  * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10  *
11  * This software is licensed as described in the file COPYING, which
12  * you should have received as part of this distribution. The terms
13  * are also available at https://curl.se/docs/copyright.html.
14  *
15  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16  * copies of the Software, and permit persons to whom the Software is
17  * furnished to do so, under the terms of the COPYING file.
18  *
19  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20  * KIND, either express or implied.
21  *
22  ***************************************************************************/
23
24 #include "curl_setup.h"
25
26 #ifndef CURL_DISABLE_MQTT
27
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43
44 /* The last #include file should be: */
45 #include "memdebug.h"
46
47 #define MQTT_MSG_CONNECT   0x10
48 #define MQTT_MSG_CONNACK   0x20
49 #define MQTT_MSG_PUBLISH   0x30
50 #define MQTT_MSG_SUBSCRIBE 0x82
51 #define MQTT_MSG_SUBACK    0x90
52 #define MQTT_MSG_DISCONNECT 0xe0
53
54 #define MQTT_CONNACK_LEN 2
55 #define MQTT_SUBACK_LEN 3
56 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57
58 /*
59  * Forward declarations.
60  */
61
62 static CURLcode mqtt_do(struct Curl_easy *data, bool *done);
63 static CURLcode mqtt_done(struct Curl_easy *data,
64                           CURLcode status, bool premature);
65 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done);
66 static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn,
67                         curl_socket_t *sock);
68 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
69                                 struct connectdata *conn);
70
71 /*
72  * MQTT protocol handler.
73  */
74
75 const struct Curl_handler Curl_handler_mqtt = {
76   "MQTT",                             /* scheme */
77   mqtt_setup_conn,                    /* setup_connection */
78   mqtt_do,                            /* do_it */
79   mqtt_done,                          /* done */
80   ZERO_NULL,                          /* do_more */
81   ZERO_NULL,                          /* connect_it */
82   ZERO_NULL,                          /* connecting */
83   mqtt_doing,                         /* doing */
84   ZERO_NULL,                          /* proto_getsock */
85   mqtt_getsock,                       /* doing_getsock */
86   ZERO_NULL,                          /* domore_getsock */
87   ZERO_NULL,                          /* perform_getsock */
88   ZERO_NULL,                          /* disconnect */
89   ZERO_NULL,                          /* readwrite */
90   ZERO_NULL,                          /* connection_check */
91   ZERO_NULL,                          /* attach connection */
92   PORT_MQTT,                          /* defport */
93   CURLPROTO_MQTT,                     /* protocol */
94   CURLPROTO_MQTT,                     /* family */
95   PROTOPT_NONE                        /* flags */
96 };
97
98 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
99                                 struct connectdata *conn)
100 {
101   /* allocate the HTTP-specific struct for the Curl_easy, only to survive
102      during this request */
103   struct MQTT *mq;
104   (void)conn;
105   DEBUGASSERT(data->req.p.mqtt == NULL);
106
107   mq = calloc(1, sizeof(struct MQTT));
108   if(!mq)
109     return CURLE_OUT_OF_MEMORY;
110   data->req.p.mqtt = mq;
111   return CURLE_OK;
112 }
113
114 static CURLcode mqtt_send(struct Curl_easy *data,
115                           char *buf, size_t len)
116 {
117   CURLcode result = CURLE_OK;
118   struct connectdata *conn = data->conn;
119   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
120   struct MQTT *mq = data->req.p.mqtt;
121   ssize_t n;
122   result = Curl_write(data, sockfd, buf, len, &n);
123   if(!result)
124     Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
125   if(len != (size_t)n) {
126     size_t nsend = len - n;
127     char *sendleftovers = Curl_memdup(&buf[n], nsend);
128     if(!sendleftovers)
129       return CURLE_OUT_OF_MEMORY;
130     mq->sendleftovers = sendleftovers;
131     mq->nsend = nsend;
132   }
133   else {
134     mq->sendleftovers = NULL;
135     mq->nsend = 0;
136   }
137   return result;
138 }
139
140 /* Generic function called by the multi interface to figure out what socket(s)
141    to wait for and for what actions during the DOING and PROTOCONNECT
142    states */
143 static int mqtt_getsock(struct Curl_easy *data,
144                         struct connectdata *conn,
145                         curl_socket_t *sock)
146 {
147   (void)data;
148   sock[0] = conn->sock[FIRSTSOCKET];
149   return GETSOCK_READSOCK(FIRSTSOCKET);
150 }
151
152 static int mqtt_encode_len(char *buf, size_t len)
153 {
154   unsigned char encoded;
155   int i;
156
157   for(i = 0; (len > 0) && (i<4); i++) {
158     encoded = len % 0x80;
159     len /= 0x80;
160     if(len)
161       encoded |= 0x80;
162     buf[i] = encoded;
163   }
164
165   return i;
166 }
167
168 /* add the passwd to the CONNECT packet */
169 static int add_passwd(const char *passwd, const size_t plen,
170                        char *pkt, const size_t start, int remain_pos)
171 {
172   /* magic number that need to be set properly */
173   const size_t conn_flags_pos = remain_pos + 8;
174   if(plen > 0xffff)
175     return 1;
176
177   /* set password flag */
178   pkt[conn_flags_pos] |= 0x40;
179
180   /* length of password provided */
181   pkt[start] = (char)((plen >> 8) & 0xFF);
182   pkt[start + 1] = (char)(plen & 0xFF);
183   memcpy(&pkt[start + 2], passwd, plen);
184   return 0;
185 }
186
187 /* add user to the CONN packet */
188 static int add_user(const char *username, const size_t ulen,
189                     unsigned char *pkt, const size_t start, int remain_pos)
190 {
191   /* magic number that need to be set properly */
192   const size_t conn_flags_pos = remain_pos + 8;
193   if(ulen > 0xffff)
194     return 1;
195
196   /* set username flag */
197   pkt[conn_flags_pos] |= 0x80;
198   /* length of username provided */
199   pkt[start] = (unsigned char)((ulen >> 8) & 0xFF);
200   pkt[start + 1] = (unsigned char)(ulen & 0xFF);
201   memcpy(&pkt[start + 2], username, ulen);
202   return 0;
203 }
204
205 /* add client ID to the CONN packet */
206 static int add_client_id(const char *client_id, const size_t client_id_len,
207                          char *pkt, const size_t start)
208 {
209   if(client_id_len != MQTT_CLIENTID_LEN)
210     return 1;
211   pkt[start] = 0x00;
212   pkt[start + 1] = MQTT_CLIENTID_LEN;
213   memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN);
214   return 0;
215 }
216
217 /* Set initial values of CONN packet */
218 static int init_connpack(char *packet, char *remain, int remain_pos)
219 {
220   /* Fixed header starts */
221   /* packet type */
222   packet[0] = MQTT_MSG_CONNECT;
223   /* remaining length field */
224   memcpy(&packet[1], remain, remain_pos);
225   /* Fixed header ends */
226
227   /* Variable header starts */
228   /* protocol length */
229   packet[remain_pos + 1] = 0x00;
230   packet[remain_pos + 2] = 0x04;
231   /* protocol name */
232   packet[remain_pos + 3] = 'M';
233   packet[remain_pos + 4] = 'Q';
234   packet[remain_pos + 5] = 'T';
235   packet[remain_pos + 6] = 'T';
236   /* protocol level */
237   packet[remain_pos + 7] = 0x04;
238   /* CONNECT flag: CleanSession */
239   packet[remain_pos + 8] = 0x02;
240   /* keep-alive 0 = disabled */
241   packet[remain_pos + 9] = 0x00;
242   packet[remain_pos + 10] = 0x3c;
243   /*end of variable header*/
244   return remain_pos + 10;
245 }
246
247 static CURLcode mqtt_connect(struct Curl_easy *data)
248 {
249   CURLcode result = CURLE_OK;
250   int pos = 0;
251   int rc = 0;
252   /*remain length*/
253   int remain_pos = 0;
254   char remain[4] = {0};
255   size_t packetlen = 0;
256   size_t payloadlen = 0;
257   size_t start_user = 0;
258   size_t start_pwd = 0;
259   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
260   const size_t clen = strlen("curl");
261   char *packet = NULL;
262
263   /* extracting username from request */
264   const char *username = data->state.aptr.user ?
265     data->state.aptr.user : "";
266   const size_t ulen = strlen(username);
267   /* extracting password from request */
268   const char *passwd = data->state.aptr.passwd ?
269     data->state.aptr.passwd : "";
270   const size_t plen = strlen(passwd);
271
272   payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2;
273   /* The plus 2 are for the MSB and LSB describing the length of the string to
274    * be added on the payload. Refer to spec 1.5.2 and 1.5.4 */
275   if(ulen)
276     payloadlen += 2;
277   if(plen)
278     payloadlen += 2;
279
280   /* getting how much occupy the remain length */
281   remain_pos = mqtt_encode_len(remain, payloadlen + 10);
282
283   /* 10 length of variable header and 1 the first byte of the fixed header */
284   packetlen = payloadlen + 10 + remain_pos + 1;
285
286   /* allocating packet */
287   if(packetlen > 268435455)
288     return CURLE_WEIRD_SERVER_REPLY;
289   packet = malloc(packetlen);
290   if(!packet)
291     return CURLE_OUT_OF_MEMORY;
292   memset(packet, 0, packetlen);
293
294   /* set initial values for CONN pack */
295   pos = init_connpack(packet, remain, remain_pos);
296
297   result = Curl_rand_hex(data, (unsigned char *)&client_id[clen],
298                          MQTT_CLIENTID_LEN - clen + 1);
299   /* add client id */
300   rc = add_client_id(client_id, strlen(client_id), packet, pos + 1);
301   if(rc) {
302     failf(data, "Client ID length mismatched: [%lu]", strlen(client_id));
303     result = CURLE_WEIRD_SERVER_REPLY;
304     goto end;
305   }
306   infof(data, "Using client id '%s'", client_id);
307
308   /* position where starts the user payload */
309   start_user = pos + 3 + MQTT_CLIENTID_LEN;
310   /* position where starts the password payload */
311   start_pwd = start_user + ulen;
312   /* if user name was provided, add it to the packet */
313   if(ulen) {
314     start_pwd += 2;
315
316     rc = add_user(username, ulen,
317                   (unsigned char *)packet, start_user, remain_pos);
318     if(rc) {
319       failf(data, "Username is too large: [%lu]", ulen);
320       result = CURLE_WEIRD_SERVER_REPLY;
321       goto end;
322     }
323   }
324
325   /* if passwd was provided, add it to the packet */
326   if(plen) {
327     rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos);
328     if(rc) {
329       failf(data, "Password is too large: [%lu]", plen);
330       result = CURLE_WEIRD_SERVER_REPLY;
331       goto end;
332     }
333   }
334
335   if(!result)
336     result = mqtt_send(data, packet, packetlen);
337
338 end:
339   if(packet)
340     free(packet);
341   Curl_safefree(data->state.aptr.user);
342   Curl_safefree(data->state.aptr.passwd);
343   return result;
344 }
345
346 static CURLcode mqtt_disconnect(struct Curl_easy *data)
347 {
348   CURLcode result = CURLE_OK;
349   struct MQTT *mq = data->req.p.mqtt;
350   result = mqtt_send(data, (char *)"\xe0\x00", 2);
351   Curl_safefree(mq->sendleftovers);
352   return result;
353 }
354
355 static CURLcode mqtt_verify_connack(struct Curl_easy *data)
356 {
357   CURLcode result;
358   struct connectdata *conn = data->conn;
359   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
360   unsigned char readbuf[MQTT_CONNACK_LEN];
361   ssize_t nread;
362
363   result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
364   if(result)
365     goto fail;
366
367   Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
368
369   /* fixme */
370   if(nread < MQTT_CONNACK_LEN) {
371     result = CURLE_WEIRD_SERVER_REPLY;
372     goto fail;
373   }
374
375   /* verify CONNACK */
376   if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
377     failf(data, "Expected %02x%02x but got %02x%02x",
378           0x00, 0x00, readbuf[0], readbuf[1]);
379     result = CURLE_WEIRD_SERVER_REPLY;
380   }
381
382 fail:
383   return result;
384 }
385
386 static CURLcode mqtt_get_topic(struct Curl_easy *data,
387                                char **topic, size_t *topiclen)
388 {
389   char *path = data->state.up.path;
390   if(strlen(path) > 1)
391     return Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA);
392   failf(data, "No MQTT topic found. Forgot to URL encode it?");
393   return CURLE_URL_MALFORMAT;
394 }
395
396 static CURLcode mqtt_subscribe(struct Curl_easy *data)
397 {
398   CURLcode result = CURLE_OK;
399   char *topic = NULL;
400   size_t topiclen;
401   unsigned char *packet = NULL;
402   size_t packetlen;
403   char encodedsize[4];
404   size_t n;
405   struct connectdata *conn = data->conn;
406
407   result = mqtt_get_topic(data, &topic, &topiclen);
408   if(result)
409     goto fail;
410
411   conn->proto.mqtt.packetid++;
412
413   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
414                                + 2 bytes topic length + QoS byte */
415   n = mqtt_encode_len((char *)encodedsize, packetlen);
416   packetlen += n + 1; /* add one for the control packet type byte */
417
418   packet = malloc(packetlen);
419   if(!packet) {
420     result = CURLE_OUT_OF_MEMORY;
421     goto fail;
422   }
423
424   packet[0] = MQTT_MSG_SUBSCRIBE;
425   memcpy(&packet[1], encodedsize, n);
426   packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
427   packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
428   packet[3 + n] = (topiclen >> 8) & 0xff;
429   packet[4 + n ] = topiclen & 0xff;
430   memcpy(&packet[5 + n], topic, topiclen);
431   packet[5 + n + topiclen] = 0; /* QoS zero */
432
433   result = mqtt_send(data, (char *)packet, packetlen);
434
435 fail:
436   free(topic);
437   free(packet);
438   return result;
439 }
440
441 /*
442  * Called when the first byte was already read.
443  */
444 static CURLcode mqtt_verify_suback(struct Curl_easy *data)
445 {
446   CURLcode result;
447   struct connectdata *conn = data->conn;
448   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
449   unsigned char readbuf[MQTT_SUBACK_LEN];
450   ssize_t nread;
451   struct mqtt_conn *mqtt = &conn->proto.mqtt;
452
453   result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
454   if(result)
455     goto fail;
456
457   Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
458
459   /* fixme */
460   if(nread < MQTT_SUBACK_LEN) {
461     result = CURLE_WEIRD_SERVER_REPLY;
462     goto fail;
463   }
464
465   /* verify SUBACK */
466   if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
467      readbuf[1] != (mqtt->packetid & 0xff) ||
468      readbuf[2] != 0x00)
469     result = CURLE_WEIRD_SERVER_REPLY;
470
471 fail:
472   return result;
473 }
474
475 static CURLcode mqtt_publish(struct Curl_easy *data)
476 {
477   CURLcode result;
478   char *payload = data->set.postfields;
479   size_t payloadlen;
480   char *topic = NULL;
481   size_t topiclen;
482   unsigned char *pkt = NULL;
483   size_t i = 0;
484   size_t remaininglength;
485   size_t encodelen;
486   char encodedbytes[4];
487   curl_off_t postfieldsize = data->set.postfieldsize;
488
489   if(!payload)
490     return CURLE_BAD_FUNCTION_ARGUMENT;
491   if(postfieldsize < 0)
492     payloadlen = strlen(payload);
493   else
494     payloadlen = (size_t)postfieldsize;
495
496   result = mqtt_get_topic(data, &topic, &topiclen);
497   if(result)
498     goto fail;
499
500   remaininglength = payloadlen + 2 + topiclen;
501   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
502
503   /* add the control byte and the encoded remaining length */
504   pkt = malloc(remaininglength + 1 + encodelen);
505   if(!pkt) {
506     result = CURLE_OUT_OF_MEMORY;
507     goto fail;
508   }
509
510   /* assemble packet */
511   pkt[i++] = MQTT_MSG_PUBLISH;
512   memcpy(&pkt[i], encodedbytes, encodelen);
513   i += encodelen;
514   pkt[i++] = (topiclen >> 8) & 0xff;
515   pkt[i++] = (topiclen & 0xff);
516   memcpy(&pkt[i], topic, topiclen);
517   i += topiclen;
518   memcpy(&pkt[i], payload, payloadlen);
519   i += payloadlen;
520   result = mqtt_send(data, (char *)pkt, i);
521
522 fail:
523   free(pkt);
524   free(topic);
525   return result;
526 }
527
528 static size_t mqtt_decode_len(unsigned char *buf,
529                               size_t buflen, size_t *lenbytes)
530 {
531   size_t len = 0;
532   size_t mult = 1;
533   size_t i;
534   unsigned char encoded = 128;
535
536   for(i = 0; (i < buflen) && (encoded & 128); i++) {
537     encoded = buf[i];
538     len += (encoded & 127) * mult;
539     mult *= 128;
540   }
541
542   if(lenbytes)
543     *lenbytes = i;
544
545   return len;
546 }
547
548 #ifdef CURLDEBUG
549 static const char *statenames[]={
550   "MQTT_FIRST",
551   "MQTT_REMAINING_LENGTH",
552   "MQTT_CONNACK",
553   "MQTT_SUBACK",
554   "MQTT_SUBACK_COMING",
555   "MQTT_PUBWAIT",
556   "MQTT_PUB_REMAIN",
557
558   "NOT A STATE"
559 };
560 #endif
561
562 /* The only way to change state */
563 static void mqstate(struct Curl_easy *data,
564                     enum mqttstate state,
565                     enum mqttstate nextstate) /* used if state == FIRST */
566 {
567   struct connectdata *conn = data->conn;
568   struct mqtt_conn *mqtt = &conn->proto.mqtt;
569 #ifdef CURLDEBUG
570   infof(data, "%s (from %s) (next is %s)",
571         statenames[state],
572         statenames[mqtt->state],
573         (state == MQTT_FIRST)? statenames[nextstate] : "");
574 #endif
575   mqtt->state = state;
576   if(state == MQTT_FIRST)
577     mqtt->nextstate = nextstate;
578 }
579
580
581 /* for the publish packet */
582 #define MQTT_HEADER_LEN 5    /* max 5 bytes */
583
584 static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
585 {
586   CURLcode result = CURLE_OK;
587   struct connectdata *conn = data->conn;
588   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
589   ssize_t nread;
590   unsigned char *pkt = (unsigned char *)data->state.buffer;
591   size_t remlen;
592   struct mqtt_conn *mqtt = &conn->proto.mqtt;
593   struct MQTT *mq = data->req.p.mqtt;
594   unsigned char packet;
595
596   switch(mqtt->state) {
597   MQTT_SUBACK_COMING:
598   case MQTT_SUBACK_COMING:
599     result = mqtt_verify_suback(data);
600     if(result)
601       break;
602
603     mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
604     break;
605
606   case MQTT_SUBACK:
607   case MQTT_PUBWAIT:
608     /* we are expecting PUBLISH or SUBACK */
609     packet = mq->firstbyte & 0xf0;
610     if(packet == MQTT_MSG_PUBLISH)
611       mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
612     else if(packet == MQTT_MSG_SUBACK) {
613       mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
614       goto MQTT_SUBACK_COMING;
615     }
616     else if(packet == MQTT_MSG_DISCONNECT) {
617       infof(data, "Got DISCONNECT");
618       *done = TRUE;
619       goto end;
620     }
621     else {
622       result = CURLE_WEIRD_SERVER_REPLY;
623       goto end;
624     }
625
626     /* -- switched state -- */
627     remlen = mq->remaining_length;
628     infof(data, "Remaining length: %zd bytes", remlen);
629     if(data->set.max_filesize &&
630        (curl_off_t)remlen > data->set.max_filesize) {
631       failf(data, "Maximum file size exceeded");
632       result = CURLE_FILESIZE_EXCEEDED;
633       goto end;
634     }
635     Curl_pgrsSetDownloadSize(data, remlen);
636     data->req.bytecount = 0;
637     data->req.size = remlen;
638     mq->npacket = remlen; /* get this many bytes */
639     /* FALLTHROUGH */
640   case MQTT_PUB_REMAIN: {
641     /* read rest of packet, but no more. Cap to buffer size */
642     struct SingleRequest *k = &data->req;
643     size_t rest = mq->npacket;
644     if(rest > (size_t)data->set.buffer_size)
645       rest = (size_t)data->set.buffer_size;
646     result = Curl_read(data, sockfd, (char *)pkt, rest, &nread);
647     if(result) {
648       if(CURLE_AGAIN == result) {
649         infof(data, "EEEE AAAAGAIN");
650       }
651       goto end;
652     }
653     if(!nread) {
654       infof(data, "server disconnected");
655       result = CURLE_PARTIAL_FILE;
656       goto end;
657     }
658     Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
659
660     mq->npacket -= nread;
661     k->bytecount += nread;
662     Curl_pgrsSetDownloadCounter(data, k->bytecount);
663
664     /* if QoS is set, message contains packet id */
665
666     result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread);
667     if(result)
668       goto end;
669
670     if(!mq->npacket)
671       /* no more PUBLISH payload, back to subscribe wait state */
672       mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
673     break;
674   }
675   default:
676     DEBUGASSERT(NULL); /* illegal state */
677     result = CURLE_WEIRD_SERVER_REPLY;
678     goto end;
679   }
680   end:
681   return result;
682 }
683
684 static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
685 {
686   CURLcode result = CURLE_OK;
687   *done = FALSE; /* unconditionally */
688
689   result = mqtt_connect(data);
690   if(result) {
691     failf(data, "Error %d sending MQTT CONN request", result);
692     return result;
693   }
694   mqstate(data, MQTT_FIRST, MQTT_CONNACK);
695   return CURLE_OK;
696 }
697
698 static CURLcode mqtt_done(struct Curl_easy *data,
699                           CURLcode status, bool premature)
700 {
701   struct MQTT *mq = data->req.p.mqtt;
702   (void)status;
703   (void)premature;
704   Curl_safefree(mq->sendleftovers);
705   return CURLE_OK;
706 }
707
708 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
709 {
710   CURLcode result = CURLE_OK;
711   struct connectdata *conn = data->conn;
712   struct mqtt_conn *mqtt = &conn->proto.mqtt;
713   struct MQTT *mq = data->req.p.mqtt;
714   ssize_t nread;
715   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
716   unsigned char *pkt = (unsigned char *)data->state.buffer;
717   unsigned char byte;
718
719   *done = FALSE;
720
721   if(mq->nsend) {
722     /* send the remainder of an outgoing packet */
723     char *ptr = mq->sendleftovers;
724     result = mqtt_send(data, mq->sendleftovers, mq->nsend);
725     free(ptr);
726     if(result)
727       return result;
728   }
729
730   infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
731   switch(mqtt->state) {
732   case MQTT_FIRST:
733     /* Read the initial byte only */
734     result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread);
735     if(result)
736       break;
737     else if(!nread) {
738       failf(data, "Connection disconnected");
739       *done = TRUE;
740       result = CURLE_RECV_ERROR;
741       break;
742     }
743     Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
744     /* remember the first byte */
745     mq->npacket = 0;
746     mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
747     /* FALLTHROUGH */
748   case MQTT_REMAINING_LENGTH:
749     do {
750       result = Curl_read(data, sockfd, (char *)&byte, 1, &nread);
751       if(!nread)
752         break;
753       Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
754       pkt[mq->npacket++] = byte;
755     } while((byte & 0x80) && (mq->npacket < 4));
756     if(nread && (byte & 0x80))
757       /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 +
758          127 * 128^3 bytes. server tried to send more */
759       result = CURLE_WEIRD_SERVER_REPLY;
760     if(result)
761       break;
762     mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
763     mq->npacket = 0;
764     if(mq->remaining_length) {
765       mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
766       break;
767     }
768     mqstate(data, MQTT_FIRST, MQTT_FIRST);
769
770     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
771       infof(data, "Got DISCONNECT");
772       *done = TRUE;
773     }
774     break;
775   case MQTT_CONNACK:
776     result = mqtt_verify_connack(data);
777     if(result)
778       break;
779
780     if(data->state.httpreq == HTTPREQ_POST) {
781       result = mqtt_publish(data);
782       if(!result) {
783         result = mqtt_disconnect(data);
784         *done = TRUE;
785       }
786       mqtt->nextstate = MQTT_FIRST;
787     }
788     else {
789       result = mqtt_subscribe(data);
790       if(!result) {
791         mqstate(data, MQTT_FIRST, MQTT_SUBACK);
792       }
793     }
794     break;
795
796   case MQTT_SUBACK:
797   case MQTT_PUBWAIT:
798   case MQTT_PUB_REMAIN:
799     result = mqtt_read_publish(data, done);
800     break;
801
802   default:
803     failf(data, "State not handled yet");
804     *done = TRUE;
805     break;
806   }
807
808   if(result == CURLE_AGAIN)
809     result = CURLE_OK;
810   return result;
811 }
812
813 #endif /* CURL_DISABLE_MQTT */