openssl: guard against OOM on context creation
[platform/upstream/curl.git] / lib / mqtt.c
1 /***************************************************************************
2  *                                  _   _ ____  _
3  *  Project                     ___| | | |  _ \| |
4  *                             / __| | | | |_) | |
5  *                            | (__| |_| |  _ <| |___
6  *                             \___|\___/|_| \_\_____|
7  *
8  * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9  * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
10  *
11  * This software is licensed as described in the file COPYING, which
12  * you should have received as part of this distribution. The terms
13  * are also available at https://curl.haxx.se/docs/copyright.html.
14  *
15  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16  * copies of the Software, and permit persons to whom the Software is
17  * furnished to do so, under the terms of the COPYING file.
18  *
19  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20  * KIND, either express or implied.
21  *
22  ***************************************************************************/
23
24 #include "curl_setup.h"
25
26 #ifndef CURL_DISABLE_MQTT
27
28 #include "urldata.h"
29 #include <curl/curl.h>
30 #include "transfer.h"
31 #include "sendf.h"
32 #include "progress.h"
33 #include "mqtt.h"
34 #include "select.h"
35 #include "strdup.h"
36 #include "url.h"
37 #include "escape.h"
38 #include "warnless.h"
39 #include "curl_printf.h"
40 #include "curl_memory.h"
41 #include "multiif.h"
42 #include "rand.h"
43
44 /* The last #include file should be: */
45 #include "memdebug.h"
46
47 #define MQTT_MSG_CONNECT   0x10
48 #define MQTT_MSG_CONNACK   0x20
49 #define MQTT_MSG_PUBLISH   0x30
50 #define MQTT_MSG_SUBSCRIBE 0x82
51 #define MQTT_MSG_SUBACK    0x90
52 #define MQTT_MSG_DISCONNECT 0xe0
53
54 #define MQTT_CONNACK_LEN 2
55 #define MQTT_SUBACK_LEN 3
56 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
57
58 /*
59  * Forward declarations.
60  */
61
62 static CURLcode mqtt_do(struct connectdata *conn, bool *done);
63 static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
64 static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
65 static CURLcode mqtt_setup_conn(struct connectdata *conn);
66
67 /*
68  * MQTT protocol handler.
69  */
70
71 const struct Curl_handler Curl_handler_mqtt = {
72   "MQTT",                             /* scheme */
73   mqtt_setup_conn,                    /* setup_connection */
74   mqtt_do,                            /* do_it */
75   ZERO_NULL,                          /* done */
76   ZERO_NULL,                          /* do_more */
77   ZERO_NULL,                          /* connect_it */
78   ZERO_NULL,                          /* connecting */
79   mqtt_doing,                         /* doing */
80   ZERO_NULL,                          /* proto_getsock */
81   mqtt_getsock,                       /* doing_getsock */
82   ZERO_NULL,                          /* domore_getsock */
83   ZERO_NULL,                          /* perform_getsock */
84   ZERO_NULL,                          /* disconnect */
85   ZERO_NULL,                          /* readwrite */
86   ZERO_NULL,                          /* connection_check */
87   PORT_MQTT,                          /* defport */
88   CURLPROTO_MQTT,                     /* protocol */
89   CURLPROTO_MQTT,                     /* family */
90   PROTOPT_NONE                        /* flags */
91 };
92
93 static CURLcode mqtt_setup_conn(struct connectdata *conn)
94 {
95   /* allocate the HTTP-specific struct for the Curl_easy, only to survive
96      during this request */
97   struct MQTT *mq;
98   struct Curl_easy *data = conn->data;
99   DEBUGASSERT(data->req.protop == NULL);
100
101   mq = calloc(1, sizeof(struct MQTT));
102   if(!mq)
103     return CURLE_OUT_OF_MEMORY;
104   data->req.protop = mq;
105   return CURLE_OK;
106 }
107
108 static CURLcode mqtt_send(struct connectdata *conn,
109                           char *buf, size_t len)
110 {
111   CURLcode result = CURLE_OK;
112   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
113   struct Curl_easy *data = conn->data;
114   struct MQTT *mq = data->req.protop;
115   ssize_t n;
116   result = Curl_write(conn, sockfd, buf, len, &n);
117   if(!result && data->set.verbose)
118     Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
119   if(len != (size_t)n) {
120     size_t nsend = len - n;
121     char *sendleftovers = Curl_memdup(&buf[n], nsend);
122     if(!sendleftovers)
123       return CURLE_OUT_OF_MEMORY;
124     mq->sendleftovers = sendleftovers;
125     mq->nsend = nsend;
126   }
127   return result;
128 }
129
130 /* Generic function called by the multi interface to figure out what socket(s)
131    to wait for and for what actions during the DOING and PROTOCONNECT
132    states */
133 static int mqtt_getsock(struct connectdata *conn,
134                         curl_socket_t *sock)
135 {
136   sock[0] = conn->sock[FIRSTSOCKET];
137   return GETSOCK_READSOCK(FIRSTSOCKET);
138 }
139
140 static CURLcode mqtt_connect(struct connectdata *conn)
141 {
142   CURLcode result = CURLE_OK;
143   const size_t client_id_offset = 14;
144   const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
145   char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
146   const size_t clen = strlen("curl");
147   char packet[32] = {
148     MQTT_MSG_CONNECT,  /* packet type */
149     0x00,              /* remaining length */
150     0x00, 0x04,        /* protocol length */
151     'M','Q','T','T',   /* protocol name */
152     0x04,              /* protocol level */
153     0x02,              /* CONNECT flag: CleanSession */
154     0x00, 0x3c,        /* keep-alive 0 = disabled */
155     0x00, 0x00         /* payload1 length */
156   };
157   packet[1] = (packetlen - 2) & 0x7f;
158   packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
159
160   result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[clen],
161                          MQTT_CLIENTID_LEN - clen + 1);
162   memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
163   infof(conn->data, "Using client id '%s'\n", client_id);
164   if(!result)
165     result = mqtt_send(conn, packet, packetlen);
166   return result;
167 }
168
169 static CURLcode mqtt_disconnect(struct connectdata *conn)
170 {
171   CURLcode result = CURLE_OK;
172   result = mqtt_send(conn, (char *)"\xe0\x00", 2);
173   return result;
174 }
175
176 static CURLcode mqtt_verify_connack(struct connectdata *conn)
177 {
178   CURLcode result;
179   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
180   unsigned char readbuf[MQTT_CONNACK_LEN];
181   ssize_t nread;
182   struct Curl_easy *data = conn->data;
183
184   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
185   if(result)
186     goto fail;
187
188   if(data->set.verbose)
189     Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
190
191   /* fixme */
192   if(nread < MQTT_CONNACK_LEN) {
193     result = CURLE_WEIRD_SERVER_REPLY;
194     goto fail;
195   }
196
197   /* verify CONNACK */
198   if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
199     failf(data, "Expected %02x%02x but got %02x%02x",
200           0x00, 0x00, readbuf[0], readbuf[1]);
201     result = CURLE_WEIRD_SERVER_REPLY;
202   }
203
204 fail:
205   return result;
206 }
207
208 static CURLcode mqtt_get_topic(struct connectdata *conn,
209                                char **topic, size_t *topiclen)
210 {
211   CURLcode result = CURLE_OK;
212   char *path = conn->data->state.up.path;
213
214   if(strlen(path) > 1) {
215     result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
216                             REJECT_NADA);
217   }
218   else {
219     failf(conn->data, "Error: No topic specified.");
220     result = CURLE_URL_MALFORMAT;
221   }
222   return result;
223 }
224
225
226 static int mqtt_encode_len(char *buf, size_t len)
227 {
228   unsigned char encoded;
229   int i;
230
231   for(i = 0; (len > 0) && (i<4); i++) {
232     encoded = len % 0x80;
233     len /= 0x80;
234     if(len)
235       encoded |= 0x80;
236     buf[i] = encoded;
237   }
238
239   return i;
240 }
241
242 static CURLcode mqtt_subscribe(struct connectdata *conn)
243 {
244   CURLcode result = CURLE_OK;
245   char *topic = NULL;
246   size_t topiclen;
247   unsigned char *packet = NULL;
248   size_t packetlen;
249   char encodedsize[4];
250   size_t n;
251
252   result = mqtt_get_topic(conn, &topic, &topiclen);
253   if(result)
254     goto fail;
255
256   conn->proto.mqtt.packetid++;
257
258   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
259                                + 2 bytes topic length + QoS byte */
260   n = mqtt_encode_len((char *)encodedsize, packetlen);
261   packetlen += n + 1; /* add one for the control packet type byte */
262
263   packet = malloc(packetlen);
264   if(!packet) {
265     result = CURLE_OUT_OF_MEMORY;
266     goto fail;
267   }
268
269   packet[0] = MQTT_MSG_SUBSCRIBE;
270   memcpy(&packet[1], encodedsize, n);
271   packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
272   packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
273   packet[3 + n] = (topiclen >> 8) & 0xff;
274   packet[4 + n ] = topiclen & 0xff;
275   memcpy(&packet[5 + n], topic, topiclen);
276   packet[5 + n + topiclen] = 0; /* QoS zero */
277
278   result = mqtt_send(conn, (char *)packet, packetlen);
279
280 fail:
281   free(topic);
282   free(packet);
283   return result;
284 }
285
286 /*
287  * Called when the first byte was already read.
288  */
289 static CURLcode mqtt_verify_suback(struct connectdata *conn)
290 {
291   CURLcode result;
292   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
293   unsigned char readbuf[MQTT_SUBACK_LEN];
294   ssize_t nread;
295   struct mqtt_conn *mqtt = &conn->proto.mqtt;
296
297   result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
298   if(result)
299     goto fail;
300
301   if(conn->data->set.verbose)
302     Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
303
304   /* fixme */
305   if(nread < MQTT_SUBACK_LEN) {
306     result = CURLE_WEIRD_SERVER_REPLY;
307     goto fail;
308   }
309
310   /* verify SUBACK */
311   if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
312      readbuf[1] != (mqtt->packetid & 0xff) ||
313      readbuf[2] != 0x00)
314     result = CURLE_WEIRD_SERVER_REPLY;
315
316 fail:
317   return result;
318 }
319
320 static CURLcode mqtt_publish(struct connectdata *conn)
321 {
322   CURLcode result;
323   char *payload = conn->data->set.postfields;
324   size_t payloadlen = (size_t)conn->data->set.postfieldsize;
325   char *topic = NULL;
326   size_t topiclen;
327   unsigned char *pkt = NULL;
328   size_t i = 0;
329   size_t remaininglength;
330   size_t encodelen;
331   char encodedbytes[4];
332
333   result = mqtt_get_topic(conn, &topic, &topiclen);
334   if(result)
335     goto fail;
336
337   remaininglength = payloadlen + 2 + topiclen;
338   encodelen = mqtt_encode_len(encodedbytes, remaininglength);
339
340   /* add the control byte and the encoded remaining length */
341   pkt = malloc(remaininglength + 1 + encodelen);
342   if(!pkt) {
343     result = CURLE_OUT_OF_MEMORY;
344     goto fail;
345   }
346
347   /* assemble packet */
348   pkt[i++] = MQTT_MSG_PUBLISH;
349   memcpy(&pkt[i], encodedbytes, encodelen);
350   i += encodelen;
351   pkt[i++] = (topiclen >> 8) & 0xff;
352   pkt[i++] = (topiclen & 0xff);
353   memcpy(&pkt[i], topic, topiclen);
354   i += topiclen;
355   memcpy(&pkt[i], payload, payloadlen);
356   i += payloadlen;
357   result = mqtt_send(conn, (char *)pkt, i);
358
359 fail:
360   free(pkt);
361   free(topic);
362   return result;
363 }
364
365 static size_t mqtt_decode_len(unsigned char *buf,
366                               size_t buflen, size_t *lenbytes)
367 {
368   size_t len = 0;
369   size_t mult = 1;
370   size_t i;
371   unsigned char encoded = 128;
372
373   for(i = 0; (i < buflen) && (encoded & 128); i++) {
374     encoded = buf[i];
375     len += (encoded & 127) * mult;
376     mult *= 128;
377   }
378
379   if(lenbytes)
380     *lenbytes = i;
381
382   return len;
383 }
384
385 #ifdef CURLDEBUG
386 static const char *statenames[]={
387   "MQTT_FIRST",
388   "MQTT_REMAINING_LENGTH",
389   "MQTT_CONNACK",
390   "MQTT_SUBACK",
391   "MQTT_SUBACK_COMING",
392   "MQTT_PUBWAIT",
393   "MQTT_PUB_REMAIN",
394
395   "NOT A STATE"
396 };
397 #endif
398
399 /* The only way to change state */
400 static void mqstate(struct connectdata *conn,
401                     enum mqttstate state,
402                     enum mqttstate nextstate) /* used if state == FIRST */
403 {
404   struct mqtt_conn *mqtt = &conn->proto.mqtt;
405 #ifdef CURLDEBUG
406   infof(conn->data, "%s (from %s) (next is %s)\n",
407         statenames[state],
408         statenames[mqtt->state],
409         (state == MQTT_FIRST)? statenames[nextstate] : "");
410 #endif
411   mqtt->state = state;
412   if(state == MQTT_FIRST)
413     mqtt->nextstate = nextstate;
414 }
415
416
417 /* for the publish packet */
418 #define MQTT_HEADER_LEN 5    /* max 5 bytes */
419
420 static CURLcode mqtt_read_publish(struct connectdata *conn,
421                                   bool *done)
422 {
423   CURLcode result = CURLE_OK;
424   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
425   ssize_t nread;
426   struct Curl_easy *data = conn->data;
427   unsigned char *pkt = (unsigned char *)data->state.buffer;
428   size_t remlen;
429   struct mqtt_conn *mqtt = &conn->proto.mqtt;
430   struct MQTT *mq = data->req.protop;
431   unsigned char packet;
432
433   switch(mqtt->state) {
434   MQTT_SUBACK_COMING:
435   case MQTT_SUBACK_COMING:
436     result = mqtt_verify_suback(conn);
437     if(result)
438       break;
439
440     mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
441     break;
442
443   case MQTT_SUBACK:
444   case MQTT_PUBWAIT:
445     /* we are expecting PUBLISH or SUBACK */
446     packet = mq->firstbyte & 0xf0;
447     if(packet == MQTT_MSG_PUBLISH)
448       mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
449     else if(packet == MQTT_MSG_SUBACK) {
450       mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
451       goto MQTT_SUBACK_COMING;
452     }
453     else if(packet == MQTT_MSG_DISCONNECT) {
454       infof(data, "Got DISCONNECT\n");
455       *done = TRUE;
456       goto end;
457     }
458     else {
459       result = CURLE_WEIRD_SERVER_REPLY;
460       goto end;
461     }
462
463     /* -- switched state -- */
464     remlen = mq->remaining_length;
465     infof(data, "Remaining length: %zd bytes\n", remlen);
466     Curl_pgrsSetDownloadSize(data, remlen);
467     data->req.bytecount = 0;
468     data->req.size = remlen;
469     mq->npacket = remlen; /* get this many bytes */
470     /* FALLTHROUGH */
471   case MQTT_PUB_REMAIN: {
472     /* read rest of packet, but no more. Cap to buffer size */
473     struct SingleRequest *k = &data->req;
474     size_t rest = mq->npacket;
475     if(rest > (size_t)data->set.buffer_size)
476       rest = (size_t)data->set.buffer_size;
477     result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
478     if(result) {
479       if(CURLE_AGAIN == result) {
480         infof(data, "EEEE AAAAGAIN\n");
481       }
482       goto end;
483     }
484     if(!nread) {
485       infof(data, "server disconnected\n");
486       result = CURLE_PARTIAL_FILE;
487       goto end;
488     }
489     if(data->set.verbose)
490       Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
491
492     mq->npacket -= nread;
493     k->bytecount += nread;
494     Curl_pgrsSetDownloadCounter(data, k->bytecount);
495
496     /* if QoS is set, message contains packet id */
497
498     result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
499     if(result)
500       goto end;
501
502     if(!mq->npacket)
503       /* no more PUBLISH payload, back to subscribe wait state */
504       mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
505     break;
506   }
507   default:
508     DEBUGASSERT(NULL); /* illegal state */
509     result = CURLE_WEIRD_SERVER_REPLY;
510     goto end;
511   }
512   end:
513   return result;
514 }
515
516 static CURLcode mqtt_do(struct connectdata *conn, bool *done)
517 {
518   CURLcode result = CURLE_OK;
519   struct Curl_easy *data = conn->data;
520
521   *done = FALSE; /* unconditionally */
522
523   result = mqtt_connect(conn);
524   if(result) {
525     failf(data, "Error %d sending MQTT CONN request", result);
526     return result;
527   }
528   mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
529   return CURLE_OK;
530 }
531
532 static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
533 {
534   CURLcode result = CURLE_OK;
535   struct mqtt_conn *mqtt = &conn->proto.mqtt;
536   struct Curl_easy *data = conn->data;
537   struct MQTT *mq = data->req.protop;
538   ssize_t nread;
539   curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
540   unsigned char *pkt = (unsigned char *)data->state.buffer;
541   unsigned char byte;
542
543   *done = FALSE;
544
545   if(mq->nsend) {
546     /* send the remainder of an outgoing packet */
547     char *ptr = mq->sendleftovers;
548     result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
549     free(ptr);
550     if(result)
551       return result;
552   }
553
554   infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
555   switch(mqtt->state) {
556   case MQTT_FIRST:
557     /* Read the initial byte only */
558     result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
559     if(result)
560       break;
561     if(data->set.verbose)
562       Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
563     /* remember the first byte */
564     mq->npacket = 0;
565     mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
566     /* FALLTHROUGH */
567   case MQTT_REMAINING_LENGTH:
568     do {
569       result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
570       if(result)
571         break;
572       if(data->set.verbose)
573         Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
574       pkt[mq->npacket++] = byte;
575     } while((byte & 0x80) && (mq->npacket < 4));
576     if(result)
577       break;
578     mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
579     mq->npacket = 0;
580     if(mq->remaining_length) {
581       mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
582       break;
583     }
584     mqstate(conn, MQTT_FIRST, MQTT_FIRST);
585
586     if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
587       infof(data, "Got DISCONNECT\n");
588       *done = TRUE;
589     }
590     break;
591   case MQTT_CONNACK:
592     result = mqtt_verify_connack(conn);
593     if(result)
594       break;
595
596     if(conn->data->state.httpreq == HTTPREQ_POST) {
597       result = mqtt_publish(conn);
598       if(!result) {
599         result = mqtt_disconnect(conn);
600         *done = TRUE;
601       }
602       mqtt->nextstate = MQTT_FIRST;
603     }
604     else {
605       result = mqtt_subscribe(conn);
606       if(!result) {
607         mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
608       }
609     }
610     break;
611
612   case MQTT_SUBACK:
613   case MQTT_PUBWAIT:
614   case MQTT_PUB_REMAIN:
615     result = mqtt_read_publish(conn, done);
616     break;
617
618   default:
619     failf(conn->data, "State not handled yet");
620     *done = TRUE;
621     break;
622   }
623
624   if(result == CURLE_AGAIN)
625     result = CURLE_OK;
626   return result;
627 }
628
629 #endif /* CURL_DISABLE_MQTT */