1 /***************************************************************************
3 * Project ___| | | | _ \| |
5 * | (__| |_| | _ <| |___
6 * \___|\___/|_| \_\_____|
8 * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
9 * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
11 * This software is licensed as described in the file COPYING, which
12 * you should have received as part of this distribution. The terms
13 * are also available at https://curl.haxx.se/docs/copyright.html.
15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 * copies of the Software, and permit persons to whom the Software is
17 * furnished to do so, under the terms of the COPYING file.
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
22 ***************************************************************************/
24 #include "curl_setup.h"
26 #ifndef CURL_DISABLE_MQTT
29 #include <curl/curl.h>
39 #include "curl_printf.h"
40 #include "curl_memory.h"
44 /* The last #include file should be: */
/* First-byte values of the MQTT control packets used in this file. The packet
   type sits in the high nibble: mqtt_read_publish() compares an incoming
   first byte against these after masking it with 0xf0. MQTT_MSG_SUBSCRIBE
   additionally carries 0x02 in the low nibble, so it can never equal a
   value masked that way. */
47 #define MQTT_MSG_CONNECT 0x10
48 #define MQTT_MSG_CONNACK 0x20
49 #define MQTT_MSG_PUBLISH 0x30
50 #define MQTT_MSG_SUBSCRIBE 0x82
51 #define MQTT_MSG_SUBACK 0x90
52 #define MQTT_MSG_DISCONNECT 0xe0
/* Number of bytes read from the socket by mqtt_verify_connack() and
   mqtt_verify_suback() respectively. */
54 #define MQTT_CONNACK_LEN 2
55 #define MQTT_SUBACK_LEN 3
/* Length of the client id placed in the CONNECT packet, without a
   terminating zero. */
56 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
59 * Forward declarations.
/* Prototypes for functions defined further down in this file.
   mqtt_setup_conn, mqtt_doing and mqtt_getsock are referenced by the
   Curl_handler_mqtt table before their definitions appear. */
62 static CURLcode mqtt_do(struct connectdata *conn, bool *done);
63 static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
64 static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
65 static CURLcode mqtt_setup_conn(struct connectdata *conn);
68 * MQTT protocol handler.
/* Protocol handler table for MQTT. Of the callback slots visible in this
   excerpt, setup_connection, doing and doing_getsock are filled in with
   functions from this file; every other visible callback slot is ZERO_NULL.
   The table also fixes the default port (PORT_MQTT), the protocol and family
   bits (CURLPROTO_MQTT) and the flags (PROTOPT_NONE). */
71 const struct Curl_handler Curl_handler_mqtt = {
73 mqtt_setup_conn, /* setup_connection */
76 ZERO_NULL, /* do_more */
77 ZERO_NULL, /* connect_it */
78 ZERO_NULL, /* connecting */
79 mqtt_doing, /* doing */
80 ZERO_NULL, /* proto_getsock */
81 mqtt_getsock, /* doing_getsock */
82 ZERO_NULL, /* domore_getsock */
83 ZERO_NULL, /* perform_getsock */
84 ZERO_NULL, /* disconnect */
85 ZERO_NULL, /* readwrite */
86 ZERO_NULL, /* connection_check */
87 PORT_MQTT, /* defport */
88 CURLPROTO_MQTT, /* protocol */
89 CURLPROTO_MQTT, /* family */
90 PROTOPT_NONE /* flags */
/* setup_connection callback: allocates a zero-initialised struct MQTT with
   calloc() and stores it in data->req.protop, which is asserted to be NULL
   beforehand. The CURLE_OUT_OF_MEMORY return sits directly after the
   calloc() call; the condition guarding it is not visible in this excerpt. */
93 static CURLcode mqtt_setup_conn(struct connectdata *conn)
95 /* allocate the MQTT-specific struct for the Curl_easy, only to survive
96 during this request */
98 struct Curl_easy *data = conn->data;
99 DEBUGASSERT(data->req.protop == NULL);
101 mq = calloc(1, sizeof(struct MQTT));
103 return CURLE_OUT_OF_MEMORY;
104 data->req.protop = mq;
/* Write len bytes from buf to the connection's first socket with
   Curl_write(). When verbose is set and the write succeeded, the bytes
   reported as written are passed to Curl_debug() as CURLINFO_HEADER_OUT.
   If the written count differs from len, the unsent tail is duplicated with
   Curl_memdup() and kept in mq->sendleftovers; mqtt_doing() sends it later. */
108 static CURLcode mqtt_send(struct connectdata *conn,
109 char *buf, size_t len)
111 CURLcode result = CURLE_OK;
112 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
113 struct Curl_easy *data = conn->data;
114 struct MQTT *mq = data->req.protop;
116 result = Curl_write(conn, sockfd, buf, len, &n);
117 if(!result && data->set.verbose)
118 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
/* NOTE(review): n is used below without looking at result first. If
   Curl_write() can fail and leave n negative, (size_t)n is huge, nsend
   wraps and &buf[n] points outside buf - confirm and return early on
   error. */
119 if(len != (size_t)n) {
120 size_t nsend = len - n;
121 char *sendleftovers = Curl_memdup(&buf[n], nsend);
123 return CURLE_OUT_OF_MEMORY;
/* NOTE(review): any buffer previously held in mq->sendleftovers is replaced
   here; the caller in mqtt_doing() keeps its own copy of the old pointer. */
124 mq->sendleftovers = sendleftovers;
130 /* Generic function called by the multi interface to figure out what socket(s)
131 to wait for and for what actions during the DOING and PROTOCONNECT states */
/* doing_getsock callback: stores the connection's first socket in sock[0]
   and returns GETSOCK_READSOCK(FIRSTSOCKET).
   NOTE(review): only the read-side macro is used, although mqtt_send() can
   leave unsent bytes in sendleftovers - confirm that waiting for
   writability is not needed in that case. */
133 static int mqtt_getsock(struct connectdata *conn,
136 sock[0] = conn->sock[FIRSTSOCKET];
137 return GETSOCK_READSOCK(FIRSTSOCKET);
/* Build the CONNECT packet and hand it to mqtt_send(). The packet is a fixed
   14-byte prefix (client_id_offset) followed by a client id of
   MQTT_CLIENTID_LEN bytes. The client id is the literal "curl" followed by
   characters produced by Curl_rand_hex(). */
140 static CURLcode mqtt_connect(struct connectdata *conn)
142 CURLcode result = CURLE_OK;
143 const size_t client_id_offset = 14;
144 const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
145 char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
146 const size_t clen = strlen("curl");
148 MQTT_MSG_CONNECT, /* packet type */
149 0x00, /* remaining length */
150 0x00, 0x04, /* protocol length */
151 'M','Q','T','T', /* protocol name */
152 0x04, /* protocol level */
153 0x02, /* CONNECT flag: CleanSession */
154 0x00, 0x3c, /* keep-alive: 0x003c = 60 seconds (0 would disable it) */
155 0x00, 0x00 /* payload1 length */
/* remaining length = everything after the two-byte fixed header; it is
   masked to 7 bits, so it is always encoded as a single length byte */
157 packet[1] = (packetlen - 2) & 0x7f;
/* low byte of the two-byte client id length that ends the prefix */
158 packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
/* fill the part of client_id after "curl"; the size passed also covers the
   final array slot */
160 result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[clen],
161 MQTT_CLIENTID_LEN - clen + 1);
/* NOTE(review): result from Curl_rand_hex() is not inspected before
   client_id is copied into the packet and logged. */
162 memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
163 infof(conn->data, "Using client id '%s'\n", client_id);
165 result = mqtt_send(conn, packet, packetlen);
/* Send a two-byte DISCONNECT packet through mqtt_send(): 0xe0 (the same
   value as MQTT_MSG_DISCONNECT) followed by a zero remaining length. */
169 static CURLcode mqtt_disconnect(struct connectdata *conn)
171 CURLcode result = CURLE_OK;
172 result = mqtt_send(conn, (char *)"\xe0\x00", 2);
/* Read MQTT_CONNACK_LEN bytes from the first socket and check them. Reading
   fewer bytes than that sets CURLE_WEIRD_SERVER_REPLY. Both bytes must be
   0x00; anything else is reported with failf() and also sets
   CURLE_WEIRD_SERVER_REPLY. With verbose set, the bytes read are passed to
   Curl_debug() as CURLINFO_HEADER_IN. */
176 static CURLcode mqtt_verify_connack(struct connectdata *conn)
179 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
180 unsigned char readbuf[MQTT_CONNACK_LEN];
182 struct Curl_easy *data = conn->data;
184 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
188 if(data->set.verbose)
189 Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
/* a short read is treated as a protocol error, not retried */
192 if(nread < MQTT_CONNACK_LEN) {
193 result = CURLE_WEIRD_SERVER_REPLY;
198 if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
199 failf(data, "Expected %02x%02x but got %02x%02x",
200 0x00, 0x00, readbuf[0], readbuf[1]);
201 result = CURLE_WEIRD_SERVER_REPLY;
/* Derive the MQTT topic from the URL path. When the path is longer than one
   character, everything after its first character is passed to
   Curl_urldecode(), which receives topic and topiclen as output arguments.
   The failf()/CURLE_URL_MALFORMAT lines appear to be the alternative branch
   for a path that holds no topic (the branch keyword is not visible in this
   excerpt). */
208 static CURLcode mqtt_get_topic(struct connectdata *conn,
209 char **topic, size_t *topiclen)
211 CURLcode result = CURLE_OK;
212 char *path = conn->data->state.up.path;
214 if(strlen(path) > 1) {
215 result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
219 failf(conn->data, "Error: No topic specified.");
220 result = CURLE_URL_MALFORMAT;
/* Encode len into buf in 7-bit groups (len % 0x80 per iteration), using at
   most four iterations. The loop body does not run at all when len is 0.
   Callers (mqtt_subscribe, mqtt_publish) use the int return value as the
   number of bytes to copy out of buf. */
226 static int mqtt_encode_len(char *buf, size_t len)
228 unsigned char encoded;
231 for(i = 0; (len > 0) && (i<4); i++) {
232 encoded = len % 0x80;
/* Build a SUBSCRIBE packet for the topic from mqtt_get_topic() and send it
   with mqtt_send(). The connection's packetid is incremented first.
   Packet layout as written below (n = number of encoded length bytes):
     [0]              MQTT_MSG_SUBSCRIBE
     [1 .. n]         encoded remaining length
     [1+n], [2+n]     packetid, high byte first
     [3+n], [4+n]     topic length, high byte first
     [5+n ..]         topic bytes
     last byte        QoS, always 0 */
242 static CURLcode mqtt_subscribe(struct connectdata *conn)
244 CURLcode result = CURLE_OK;
247 unsigned char *packet = NULL;
252 result = mqtt_get_topic(conn, &topic, &topiclen);
256 conn->proto.mqtt.packetid++;
258 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
259 + 2 bytes topic length + QoS byte */
260 n = mqtt_encode_len((char *)encodedsize, packetlen);
261 packetlen += n + 1; /* add one for the control packet type byte */
263 packet = malloc(packetlen);
265 result = CURLE_OUT_OF_MEMORY;
269 packet[0] = MQTT_MSG_SUBSCRIBE;
270 memcpy(&packet[1], encodedsize, n);
271 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
272 packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
/* NOTE(review): only the low 16 bits of topiclen are written, but the full
   topic is copied below. No upper bound check on topiclen is visible in
   this excerpt - confirm topics longer than 0xffff bytes are rejected. */
273 packet[3 + n] = (topiclen >> 8) & 0xff;
274 packet[4 + n ] = topiclen & 0xff;
275 memcpy(&packet[5 + n], topic, topiclen);
276 packet[5 + n + topiclen] = 0; /* QoS zero */
278 result = mqtt_send(conn, (char *)packet, packetlen);
287 * Called when the first byte was already read.
/* Read MQTT_SUBACK_LEN bytes from the first socket and check them. Reading
   fewer bytes than that sets CURLE_WEIRD_SERVER_REPLY. The first two bytes
   must equal the connection's packetid, high byte first; a third condition
   is part of the same test but is not visible in this excerpt. A mismatch
   also sets CURLE_WEIRD_SERVER_REPLY. */
289 static CURLcode mqtt_verify_suback(struct connectdata *conn)
292 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
293 unsigned char readbuf[MQTT_SUBACK_LEN];
295 struct mqtt_conn *mqtt = &conn->proto.mqtt;
297 result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
301 if(conn->data->set.verbose)
302 Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
/* a short read is treated as a protocol error, not retried */
305 if(nread < MQTT_SUBACK_LEN) {
306 result = CURLE_WEIRD_SERVER_REPLY;
311 if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
312 readbuf[1] != (mqtt->packetid & 0xff) ||
314 result = CURLE_WEIRD_SERVER_REPLY;
/* Build a PUBLISH packet and send it with mqtt_send(). The payload is taken
   from set.postfields / set.postfieldsize and the topic from
   mqtt_get_topic(). The remaining length covers the two topic-length bytes,
   the topic and the payload; the allocation adds one control byte plus the
   encoded length bytes on top of that. */
320 static CURLcode mqtt_publish(struct connectdata *conn)
323 char *payload = conn->data->set.postfields;
/* NOTE(review): postfieldsize is cast straight to size_t. If it can hold a
   negative "size unknown" value, payloadlen becomes huge and the malloc and
   memcpy sizes below are wrong - confirm, and confirm payload cannot be
   NULL here. */
324 size_t payloadlen = (size_t)conn->data->set.postfieldsize;
327 unsigned char *pkt = NULL;
329 size_t remaininglength;
331 char encodedbytes[4];
333 result = mqtt_get_topic(conn, &topic, &topiclen);
337 remaininglength = payloadlen + 2 + topiclen;
338 encodelen = mqtt_encode_len(encodedbytes, remaininglength);
340 /* add the control byte and the encoded remaining length */
341 pkt = malloc(remaininglength + 1 + encodelen);
343 result = CURLE_OUT_OF_MEMORY;
347 /* assemble packet */
348 pkt[i++] = MQTT_MSG_PUBLISH;
349 memcpy(&pkt[i], encodedbytes, encodelen);
/* topic length, high byte first; only the low 16 bits of topiclen are
   stored (NOTE(review): no bound check on topiclen is visible here) */
351 pkt[i++] = (topiclen >> 8) & 0xff;
352 pkt[i++] = (topiclen & 0xff);
353 memcpy(&pkt[i], topic, topiclen);
355 memcpy(&pkt[i], payload, payloadlen);
/* i is used as the total packet length for the send */
357 result = mqtt_send(conn, (char *)pkt, i);
/* Decode a variable-length value from at most buflen bytes of buf. Each
   iteration adds the low 7 bits of the current byte, scaled by mult, to the
   result. The loop continues while bytes remain and the previous byte had
   its high bit (128) set; encoded starts at 128 so that the first iteration
   always runs when buflen is non-zero. mqtt_doing() passes NULL for
   lenbytes. */
365 static size_t mqtt_decode_len(unsigned char *buf,
366 size_t buflen, size_t *lenbytes)
371 unsigned char encoded = 128;
373 for(i = 0; (i < buflen) && (encoded & 128); i++) {
375 len += (encoded & 127) * mult;
/* Printable names for the states, used only for logging in mqstate(). The
   array is indexed directly with enum mqttstate values, so the entries have
   to stay in the same order as that enum. */
386 static const char *statenames[]={
388 "MQTT_REMAINING_LENGTH",
391 "MQTT_SUBACK_COMING",
399 /* The only way to change state */
/* Logs the transition with infof(): the name of the state being left is
   taken from mqtt->state, and the name of nextstate is printed only when the
   new state is MQTT_FIRST (an empty string otherwise). In that same case
   nextstate is also stored in mqtt->nextstate; mqtt_doing() switches to it
   once a non-zero remaining length has been decoded. */
400 static void mqstate(struct connectdata *conn,
401 enum mqttstate state,
402 enum mqttstate nextstate) /* used if state == FIRST */
404 struct mqtt_conn *mqtt = &conn->proto.mqtt;
406 infof(conn->data, "%s (from %s) (next is %s)\n",
408 statenames[mqtt->state],
409 (state == MQTT_FIRST)? statenames[nextstate] : "");
412 if(state == MQTT_FIRST)
413 mqtt->nextstate = nextstate;
417 /* for the publish packet */
/* NOTE(review): MQTT_HEADER_LEN is not referenced by any line visible in
   this excerpt - confirm it is still needed. */
418 #define MQTT_HEADER_LEN 5 /* max 5 bytes */
/* Handle the states that follow a SUBSCRIBE: verify a SUBACK, classify the
   next incoming packet by its first byte, and stream the payload of a
   PUBLISH packet to the client with Curl_client_write(). Works on the
   transfer buffer data->state.buffer and on the per-request struct MQTT. */
420 static CURLcode mqtt_read_publish(struct connectdata *conn,
423 CURLcode result = CURLE_OK;
424 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
426 struct Curl_easy *data = conn->data;
427 unsigned char *pkt = (unsigned char *)data->state.buffer;
429 struct mqtt_conn *mqtt = &conn->proto.mqtt;
430 struct MQTT *mq = data->req.protop;
431 unsigned char packet;
433 switch(mqtt->state) {
435 case MQTT_SUBACK_COMING:
436 result = mqtt_verify_suback(conn);
/* go back to reading a first byte; a PUBLISH is awaited next */
440 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
445 /* we are expecting PUBLISH or SUBACK */
/* only the packet type in the high nibble is compared; flag bits in the low
   nibble are ignored */
446 packet = mq->firstbyte & 0xf0;
447 if(packet == MQTT_MSG_PUBLISH)
448 mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
449 else if(packet == MQTT_MSG_SUBACK) {
450 mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
451 goto MQTT_SUBACK_COMING;
453 else if(packet == MQTT_MSG_DISCONNECT) {
454 infof(data, "Got DISCONNECT\n");
459 result = CURLE_WEIRD_SERVER_REPLY;
463 /* -- switched state -- */
/* set up the progress meter and byte counters for a payload of
   remaining_length bytes */
464 remlen = mq->remaining_length;
/* NOTE(review): the declaration of remlen is not visible here; if it is a
   size_t, as the assignments around it suggest, the matching conversion is
   %zu, not %zd - confirm. */
465 infof(data, "Remaining length: %zd bytes\n", remlen);
466 Curl_pgrsSetDownloadSize(data, remlen);
467 data->req.bytecount = 0;
468 data->req.size = remlen;
469 mq->npacket = remlen; /* get this many bytes */
471 case MQTT_PUB_REMAIN: {
472 /* read rest of packet, but no more. Cap to buffer size */
473 struct SingleRequest *k = &data->req;
474 size_t rest = mq->npacket;
475 if(rest > (size_t)data->set.buffer_size)
476 rest = (size_t)data->set.buffer_size;
477 result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
479 if(CURLE_AGAIN == result) {
/* NOTE(review): this message looks like leftover debugging output. */
480 infof(data, "EEEE AAAAGAIN\n");
485 infof(data, "server disconnected\n");
486 result = CURLE_PARTIAL_FILE;
489 if(data->set.verbose)
490 Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
/* account for the bytes just read: fewer left in this packet, more
   delivered in total */
492 mq->npacket -= nread;
493 k->bytecount += nread;
494 Curl_pgrsSetDownloadCounter(data, k->bytecount);
496 /* if QoS is set, message contains packet id */
498 result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
503 /* no more PUBLISH payload, back to subscribe wait state */
504 mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
508 DEBUGASSERT(NULL); /* illegal state */
509 result = CURLE_WEIRD_SERVER_REPLY;
/* Start an MQTT transfer: *done is set to FALSE, the CONNECT packet is sent
   through mqtt_connect(), and the state machine is put in MQTT_FIRST with
   MQTT_CONNACK recorded as the state to continue with. The failf() line
   reports a CONNECT send failure (its guarding condition is not visible in
   this excerpt). */
516 static CURLcode mqtt_do(struct connectdata *conn, bool *done)
518 CURLcode result = CURLE_OK;
519 struct Curl_easy *data = conn->data;
521 *done = FALSE; /* unconditionally */
523 result = mqtt_connect(conn);
525 failf(data, "Error %d sending MQTT CONN request", result);
528 mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
/* doing callback: drives the MQTT state machine. It first retries any bytes
   left over from an earlier partial send, then acts on mqtt->state: read the
   first byte of a packet, read and decode the remaining length, and hand
   over to the per-packet handling (CONNACK verification, publish or
   subscribe, reading PUBLISH data). */
532 static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
534 CURLcode result = CURLE_OK;
535 struct mqtt_conn *mqtt = &conn->proto.mqtt;
536 struct Curl_easy *data = conn->data;
537 struct MQTT *mq = data->req.protop;
539 curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
540 unsigned char *pkt = (unsigned char *)data->state.buffer;
546 /* send the remainder of an outgoing packet */
/* keep the old pointer: mqtt_send() may store a new leftover buffer in
   mq->sendleftovers */
547 char *ptr = mq->sendleftovers;
548 result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
554 infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
555 switch(mqtt->state) {
557 /* Read the initial byte only */
558 result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
561 if(data->set.verbose)
562 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
563 /* remember the first byte */
565 mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
567 case MQTT_REMAINING_LENGTH:
/* read the length one byte at a time into pkt, stopping at the first byte
   without the 0x80 continuation bit or once four bytes are stored */
569 result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
572 if(data->set.verbose)
573 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
574 pkt[mq->npacket++] = byte;
575 } while((byte & 0x80) && (mq->npacket < 4));
578 mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
/* a non-zero length moves on to the state recorded by mqstate(); the
   lines after that appear to handle a zero remaining length */
580 if(mq->remaining_length) {
581 mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
584 mqstate(conn, MQTT_FIRST, MQTT_FIRST);
/* NOTE(review): firstbyte is compared unmasked here, whereas
   mqtt_read_publish() masks it with 0xf0 before comparing - confirm the
   difference is intended. */
586 if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
587 infof(data, "Got DISCONNECT\n");
592 result = mqtt_verify_connack(conn);
/* a POST request publishes and then disconnects; the subscribe call further
   down appears to be the path for other requests */
596 if(conn->data->state.httpreq == HTTPREQ_POST) {
597 result = mqtt_publish(conn);
599 result = mqtt_disconnect(conn);
602 mqtt->nextstate = MQTT_FIRST;
605 result = mqtt_subscribe(conn);
607 mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
614 case MQTT_PUB_REMAIN:
615 result = mqtt_read_publish(conn, done);
619 failf(conn->data, "State not handled yet");
624 if(result == CURLE_AGAIN)
629 #endif /* CURL_DISABLE_MQTT */