2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
30 #ifndef _POSIX_C_SOURCE
31 // Defining _POSIX_C_SOURCE macro with 200112L (or greater) as value
32 // causes header files to expose definitions
33 // corresponding to the POSIX.1-2001 base
34 // specification (excluding the XSI extension).
35 // For POSIX.1-2001 base specification,
36 // Refer http://pubs.opengroup.org/onlinepubs/009695399/
38 // For this specific file, see use of getaddrinfo & structs,
39 // Refer http://man7.org/linux/man-pages/man3/getaddrinfo.3.html
40 #define _POSIX_C_SOURCE 200112L // needed for getaddrinfo & structs
50 #include <sys/socket.h>
59 #include <linux/in6.h>
60 #include <sys/endian.h>
64 # include <netinet/in.h>
68 #include <netinet/in.h>
73 #define AI_ADDRCONFIG 0
75 #include <net/netbyte.h>
76 #include <netinet/in.h>
80 #include <openssl/err.h>
85 # include <mosquitto_broker.h>
87 extern uint64_t g_bytes_received;
88 extern uint64_t g_bytes_sent;
89 extern unsigned long g_msgs_received;
90 extern unsigned long g_msgs_sent;
91 extern unsigned long g_pub_msgs_received;
92 extern unsigned long g_pub_msgs_sent;
95 # include <read_handle.h>
98 #include "logging_mosq.h"
99 #include "memory_mosq.h"
100 #include "mqtt3_protocol.h"
101 #include "net_mosq.h"
102 #include "time_mosq.h"
103 #include "util_mosq.h"
// OpenSSL ex-data slot index shared by this file. It starts at -1;
// _mosquitto_net_init() replaces it with the value returned by
// SSL_get_ex_new_index(), and it is then passed to SSL_set_ex_data() and
// SSL_get_ex_data() to attach/recover the owning struct mosquitto.
106 int tls_ex_index_mosq = -1;
// Library-level network initialisation.
// The visible calls start Winsock 2.2 (WSAStartup), initialise c-ares
// (ares_library_init), load the OpenSSL error strings and algorithms, and
// allocate the tls_ex_index_mosq ex-data index.
// NOTE(review): the preprocessor guards that select which of these calls are
// compiled are not visible in this excerpt - confirm against the full file.
109 void _mosquitto_net_init(void)
113 WSAStartup(MAKEWORD(2,2), &wsaData);
117 ares_library_init(ARES_LIB_INIT_ALL);
121 SSL_load_error_strings();
123 OpenSSL_add_all_algorithms();
// Request a new ex-data index only while the global still holds its
// initial value of -1, so repeated calls reuse the same index.
124 if(tls_ex_index_mosq == -1){
125 tls_ex_index_mosq = SSL_get_ex_new_index(0, "client context", NULL, NULL, NULL);
// Library-level network teardown.
// The visible calls release OpenSSL ex-data (CRYPTO_cleanup_all_ex_data) and
// shut down c-ares (ares_library_cleanup).
// NOTE(review): other cleanup calls and the preprocessor guards around them
// are not visible in this excerpt.
130 void _mosquitto_net_cleanup(void)
135 CRYPTO_cleanup_all_ex_data();
139 ares_library_cleanup();
// Reset the bookkeeping fields of packet and release its payload buffer.
// Only packet->payload is freed in the visible code; callers such as
// _mosquitto_packet_write() free the packet struct itself afterwards.
147 void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet)
151 /* Free data and reset values */
153 packet->have_remaining = 0;
154 packet->remaining_count = 0;
// The multiplier restarts at 1 because _mosquitto_packet_read() scales each
// remaining-length byte by it and then multiplies it by 128.
155 packet->remaining_mult = 1;
156 packet->remaining_length = 0;
157 if(packet->payload) _mosquitto_free(packet->payload);
// Clear the pointer so a second cleanup cannot free the buffer again.
158 packet->payload = NULL;
159 packet->to_process = 0;
// Append packet to the tail of the outgoing queue of mosq, then either write
// it out immediately or wake whoever is waiting on the socket pair.
// Returns the result of _mosquitto_packet_write() when a write is attempted
// here, otherwise MOSQ_ERR_SUCCESS.
163 int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet)
// Dummy byte; its value is irrelevant, it is sent purely as a wake-up signal.
166 char sockpair_data = 0;
// Nothing has been sent yet, so the whole packet is still to be processed.
172 packet->to_process = packet->packet_length;
// out_packet_mutex guards the out_packet / out_packet_last linked list.
175 pthread_mutex_lock(&mosq->out_packet_mutex);
176 if(mosq->out_packet){
177 mosq->out_packet_last->next = packet;
179 mosq->out_packet = packet;
181 mosq->out_packet_last = packet;
182 pthread_mutex_unlock(&mosq->out_packet_mutex);
// NOTE(review): this early return and the socket-pair path below look like
// alternative build configurations; the preprocessor lines selecting between
// them are not visible in this excerpt.
184 return _mosquitto_packet_write(mosq);
187 /* Write a single byte to sockpairW (connected to sockpairR) to break out
188 * of select() if in threaded mode. */
189 if(mosq->sockpairW != INVALID_SOCKET){
// write() and send() are alternatives for the same wake-up byte.
// NOTE(review): no handling of the write() result is visible here.
191 if(write(mosq->sockpairW, &sockpair_data, 1)){
194 send(mosq->sockpairW, &sockpair_data, 1, 0);
// Flush directly only when not inside a user callback and not running in
// threaded mode; otherwise the packet stays queued.
198 if(mosq->in_callback == false && mosq->threaded == false){
199 return _mosquitto_packet_write(mosq);
201 return MOSQ_ERR_SUCCESS;
206 /* Close a socket associated with a context and set it to -1.
207 * Returns 1 on failure (context is NULL)
208 * Returns 0 on success.
// NOTE(review): the return values listed above do not match the visible
// code, which stores the COMPAT_CLOSE() result in rc and resets mosq->sock
// to INVALID_SOCKET. Confirm against the full function and update the
// description.
210 int _mosquitto_socket_close(struct mosquitto *mosq)
// The TLS session is shut down before the underlying socket is closed.
217 SSL_shutdown(mosq->ssl);
// Release the TLS context and clear the pointer so it is not reused.
222 SSL_CTX_free(mosq->ssl_ctx);
223 mosq->ssl_ctx = NULL;
// Close only a socket that is actually open, then mark it as closed.
227 if(mosq->sock != INVALID_SOCKET){
228 rc = COMPAT_CLOSE(mosq->sock);
229 mosq->sock = INVALID_SOCKET;
235 #ifdef REAL_WITH_TLS_PSK
// OpenSSL PSK client callback; _mosquitto_socket_connect() installs it with
// SSL_CTX_set_psk_client_callback().
// Writes the configured identity into identity (bounded by max_identity_len)
// and fills psk (bounded by max_psk_len) via _mosquitto_hex2bin().
// Returns 0 when the conversion reports a negative length.
// NOTE(review): the success return is not visible in this excerpt;
// presumably it is the converted key length.
236 static unsigned int psk_client_callback(SSL *ssl, const char *hint,
237 char *identity, unsigned int max_identity_len,
238 unsigned char *psk, unsigned int max_psk_len)
240 struct mosquitto *mosq;
// Recover the owning client from the ex-data slot of the SSL object.
243 mosq = SSL_get_ex_data(ssl, tls_ex_index_mosq);
// snprintf bounds the copy and always terminates the identity string.
246 snprintf(identity, max_identity_len, "%s", mosq->tls_psk_identity);
// Presumably decodes the hex string tls_psk into raw bytes - verify against
// the definition of _mosquitto_hex2bin().
248 len = _mosquitto_hex2bin(mosq->tls_psk, psk, max_psk_len);
249 if (len < 0) return 0;
// Resolve host and try each returned address until a TCP connect succeeds or
// is in progress. The descriptor is returned through *sock, which is set to
// INVALID_SOCKET up front.
// bind_address is resolved as well and the socket is bound to one of its
// addresses before connecting.
// Visible return values: MOSQ_ERR_SUCCESS and MOSQ_ERR_ERRNO.
// NOTE(review): many short lines (braces, conditions, error handling, the
// blocking checks) are missing from this excerpt; the notes below describe
// only what is visible.
254 int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking)
256 struct addrinfo hints;
257 struct addrinfo *ainfo, *rp;
258 struct addrinfo *ainfo_bind, *rp_bind;
265 *sock = INVALID_SOCKET;
266 memset(&hints, 0, sizeof(struct addrinfo));
// Accept IPv4 and IPv6 results, stream sockets only.
267 hints.ai_family = PF_UNSPEC;
268 hints.ai_flags = AI_ADDRCONFIG;
269 hints.ai_socktype = SOCK_STREAM;
// No service name is passed, so the port is filled in by hand further down.
271 s = getaddrinfo(host, NULL, &hints, &ainfo);
278 s = getaddrinfo(bind_address, NULL, &hints, &ainfo_bind);
286 for(rp = ainfo; rp != NULL; rp = rp->ai_next){
287 *sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
// Skip address families for which a socket cannot be created.
288 if(*sock == INVALID_SOCKET) continue;
// Store the port in network byte order in the resolved address.
290 if(rp->ai_family == PF_INET){
291 ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
292 }else if(rp->ai_family == PF_INET6){
293 ((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port);
// Walk the candidate local addresses and attempt to bind to each.
299 for(rp_bind = ainfo_bind; rp_bind != NULL; rp_bind = rp_bind->ai_next){
300 if(bind(*sock, rp_bind->ai_addr, rp_bind->ai_addrlen) == 0){
// NOTE(review): non-blocking mode is set both before and after connect();
// presumably the blocking argument selects which one runs - the conditions
// are not visible here.
311 /* Set non-blocking */
312 if(_mosquitto_socket_nonblock(*sock)){
318 rc = connect(*sock, rp->ai_addr, rp->ai_addrlen);
// Winsock reports the error through WSAGetLastError(); copy it into errno so
// the test below works for both platforms.
320 errno = WSAGetLastError();
// A connect that is still in progress is accepted as well as an immediate
// success.
322 if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
324 /* Set non-blocking */
325 if(_mosquitto_socket_nonblock(*sock)){
334 *sock = INVALID_SOCKET;
338 freeaddrinfo(ainfo_bind);
341 return MOSQ_ERR_ERRNO;
343 return MOSQ_ERR_SUCCESS;
346 /* Create a socket and connect it to 'ip' on port 'port'.
347 * Returns -1 on failure (ip is NULL, socket creation/connection error)
348 * Returns sock number on success.
// NOTE(review): the description above looks stale. The visible code returns
// MOSQ_ERR_INVAL for a NULL mosq, NULL host or zero port, propagates any
// failure from _mosquitto_try_connect(), returns MOSQ_ERR_INVAL for an
// unsupported tls_version, and ends with MOSQ_ERR_SUCCESS.
// When a CA file, CA path or PSK is configured, a TLS context and SSL object
// are created and the handshake is started on the new socket.
// NOTE(review): error returns after each failed TLS step and the final
// assignment of the socket to mosq are not visible in this excerpt.
350 int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
352 int sock = INVALID_SOCKET;
359 if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
// Same TLS-enabled test as further down; the statement it guards is not
// visible in this excerpt.
362 if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
367 rc = _mosquitto_try_connect(host, port, &sock, bind_address, blocking);
368 if(rc != MOSQ_ERR_SUCCESS) return rc;
// TLS is set up only when a CA file, CA path or PSK has been configured.
371 if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
// Newer OpenSSL: tlsv1.2 is the default when no version is set, and tlsv1.1
// and tlsv1 can be requested explicitly.
372 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
373 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1.2")){
374 mosq->ssl_ctx = SSL_CTX_new(TLSv1_2_client_method());
375 }else if(!strcmp(mosq->tls_version, "tlsv1.1")){
376 mosq->ssl_ctx = SSL_CTX_new(TLSv1_1_client_method());
377 }else if(!strcmp(mosq->tls_version, "tlsv1")){
378 mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
380 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
382 return MOSQ_ERR_INVAL;
// Older OpenSSL: only tlsv1 is accepted, and it is also the default.
385 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1")){
386 mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
388 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
390 return MOSQ_ERR_INVAL;
394 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context.");
399 #if OPENSSL_VERSION_NUMBER >= 0x10000000
400 /* Disable compression */
401 SSL_CTX_set_options(mosq->ssl_ctx, SSL_OP_NO_COMPRESSION);
403 #ifdef SSL_MODE_RELEASE_BUFFERS
404 /* Use even less memory per SSL connection. */
405 SSL_CTX_set_mode(mosq->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
// Apply the user-supplied cipher list, if any.
408 if(mosq->tls_ciphers){
409 ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
411 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers);
// Certificate-based TLS: load trusted CAs from the file and/or directory.
416 if(mosq->tls_cafile || mosq->tls_capath){
417 ret = SSL_CTX_load_verify_locations(mosq->ssl_ctx, mosq->tls_cafile, mosq->tls_capath);
// Two sets of messages follow: one naming the bridge_* options, one naming
// the plain options. NOTE(review): presumably broker versus client builds;
// the selecting preprocessor lines are not visible here.
420 if(mosq->tls_cafile && mosq->tls_capath){
421 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\" and bridge_capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
422 }else if(mosq->tls_cafile){
423 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\".", mosq->tls_cafile);
425 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_capath \"%s\".", mosq->tls_capath);
428 if(mosq->tls_cafile && mosq->tls_capath){
429 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\" and capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
430 }else if(mosq->tls_cafile){
431 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\".", mosq->tls_cafile);
433 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath);
// tls_cert_reqs == 0 turns peer verification off; any other value verifies
// the server certificate through _mosquitto_server_certificate_verify.
439 if(mosq->tls_cert_reqs == 0){
440 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_NONE, NULL);
442 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_PEER, _mosquitto_server_certificate_verify);
// Optional private-key password callback; mosq is handed over as userdata.
445 if(mosq->tls_pw_callback){
446 SSL_CTX_set_default_passwd_cb(mosq->ssl_ctx, mosq->tls_pw_callback);
447 SSL_CTX_set_default_passwd_cb_userdata(mosq->ssl_ctx, mosq);
// Optional client certificate chain.
450 if(mosq->tls_certfile){
451 ret = SSL_CTX_use_certificate_chain_file(mosq->ssl_ctx, mosq->tls_certfile);
454 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate, check bridge_certfile \"%s\".", mosq->tls_certfile);
456 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile);
// Optional client private key, read as PEM.
462 if(mosq->tls_keyfile){
463 ret = SSL_CTX_use_PrivateKey_file(mosq->ssl_ctx, mosq->tls_keyfile, SSL_FILETYPE_PEM);
466 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file, check bridge_keyfile \"%s\".", mosq->tls_keyfile);
468 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile);
// Check that the loaded private key matches the loaded certificate.
473 ret = SSL_CTX_check_private_key(mosq->ssl_ctx);
475 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent.");
// PSK-based TLS as the alternative to the certificate branch above.
480 #ifdef REAL_WITH_TLS_PSK
481 }else if(mosq->tls_psk){
482 SSL_CTX_set_psk_client_callback(mosq->ssl_ctx, psk_client_callback);
// Create the SSL object and record the owning mosq in its ex-data slot so
// callbacks such as psk_client_callback can find it.
486 mosq->ssl = SSL_new(mosq->ssl_ctx);
491 SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq);
// BIO_NOCLOSE: freeing the BIO does not close sock.
492 bio = BIO_new_socket(sock, BIO_NOCLOSE);
// The same BIO serves as both read and write channel.
497 SSL_set_bio(mosq->ssl, bio, bio);
// Start the handshake. WANT_READ and WANT_WRITE are not treated as failures
// here; WANT_WRITE is recorded in mosq->want_write.
499 ret = SSL_connect(mosq->ssl);
501 ret = SSL_get_error(mosq->ssl, ret);
502 if(ret == SSL_ERROR_WANT_READ){
503 /* We always try to read anyway */
504 }else if(ret == SSL_ERROR_WANT_WRITE){
505 mosq->want_write = true;
516 return MOSQ_ERR_SUCCESS;
// Read one byte from the packet payload at the current position into *byte.
// Returns MOSQ_ERR_PROTOCOL when no byte is left within remaining_length,
// otherwise MOSQ_ERR_SUCCESS.
519 int _mosquitto_read_byte(struct _mosquitto_packet *packet, uint8_t *byte)
// Bounds check against the declared length of the packet.
522 if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
524 *byte = packet->payload[packet->pos];
// NOTE(review): the advance of packet->pos is not visible in this excerpt.
527 return MOSQ_ERR_SUCCESS;
// Store one byte into the packet payload at the current position.
// Overrunning packet_length is treated as a programming error (assert), not
// as a runtime error code.
// NOTE(review): the advance of packet->pos is not visible in this excerpt.
530 void _mosquitto_write_byte(struct _mosquitto_packet *packet, uint8_t byte)
533 assert(packet->pos+1 <= packet->packet_length);
535 packet->payload[packet->pos] = byte;
// Copy count bytes from the packet payload at the current position into
// bytes and advance the position by count.
// Returns MOSQ_ERR_PROTOCOL when fewer than count bytes remain within
// remaining_length, otherwise MOSQ_ERR_SUCCESS.
// The caller must supply a destination of at least count bytes.
539 int _mosquitto_read_bytes(struct _mosquitto_packet *packet, void *bytes, uint32_t count)
542 if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
544 memcpy(bytes, &(packet->payload[packet->pos]), count);
545 packet->pos += count;
547 return MOSQ_ERR_SUCCESS;
// Copy count bytes from bytes into the packet payload at the current
// position and advance the position by count.
// Overrunning packet_length is treated as a programming error (assert).
550 void _mosquitto_write_bytes(struct _mosquitto_packet *packet, const void *bytes, uint32_t count)
553 assert(packet->pos+count <= packet->packet_length);
555 memcpy(&(packet->payload[packet->pos]), bytes, count);
556 packet->pos += count;
// Read a length-prefixed string: a 16 bit length followed by that many
// bytes. On success *str points to a newly allocated copy that the caller
// owns.
// Visible return values: MOSQ_ERR_PROTOCOL when the declared length runs past
// remaining_length, MOSQ_ERR_NOMEM when allocation fails, MOSQ_ERR_SUCCESS
// otherwise.
// NOTE(review): handling of a failed length read and the advance of
// packet->pos are not visible in this excerpt.
559 int _mosquitto_read_string(struct _mosquitto_packet *packet, char **str)
565 rc = _mosquitto_read_uint16(packet, &len);
568 if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
// len+1 zeroed bytes: the extra byte stays 0 and terminates the string.
570 *str = _mosquitto_calloc(len+1, sizeof(char));
572 memcpy(*str, &(packet->payload[packet->pos]), len);
575 return MOSQ_ERR_NOMEM;
578 return MOSQ_ERR_SUCCESS;
// Write a length-prefixed string: the 16 bit length first, then length bytes
// taken from str. No terminator is written; length is supplied by the caller.
581 void _mosquitto_write_string(struct _mosquitto_packet *packet, const char *str, uint16_t length)
584 _mosquitto_write_uint16(packet, length);
585 _mosquitto_write_bytes(packet, str, length);
// Read a 16 bit unsigned value stored most significant byte first.
// Returns MOSQ_ERR_PROTOCOL when fewer than two bytes remain within
// remaining_length, otherwise MOSQ_ERR_SUCCESS.
// NOTE(review): the increments of packet->pos between and after the two
// byte reads are not visible in this excerpt.
588 int _mosquitto_read_uint16(struct _mosquitto_packet *packet, uint16_t *word)
593 if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
595 msb = packet->payload[packet->pos];
597 lsb = packet->payload[packet->pos];
// Combine the two bytes, high byte first.
600 *word = (msb<<8) + lsb;
602 return MOSQ_ERR_SUCCESS;
// Write a 16 bit unsigned value as two bytes, most significant byte first,
// matching the order _mosquitto_read_uint16() expects.
605 void _mosquitto_write_uint16(struct _mosquitto_packet *packet, uint16_t word)
607 _mosquitto_write_byte(packet, MOSQ_MSB(word));
608 _mosquitto_write_byte(packet, MOSQ_LSB(word));
// Read up to count bytes from the connection of mosq into buf.
// The TLS path uses SSL_read() on mosq->ssl; the plain path uses read() or
// recv() on mosq->sock.
// Returns the value produced by the underlying call.
// NOTE(review): the condition selecting the TLS path, the adjustments made
// to ret and errno on SSL errors, and the platform guards are not visible in
// this excerpt.
611 ssize_t _mosquitto_net_read(struct mosquitto *mosq, void *buf, size_t count)
623 ret = SSL_read(mosq->ssl, buf, count);
// Ask OpenSSL why the read did not complete.
625 err = SSL_get_error(mosq->ssl, ret);
626 if(err == SSL_ERROR_WANT_READ){
629 }else if(err == SSL_ERROR_WANT_WRITE){
// OpenSSL needs to send data before the read can make progress; flag it so
// the caller can arrange a write.
631 mosq->want_write = true;
// Any other SSL error is logged with its OpenSSL description.
636 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
642 return (ssize_t )ret;
644 /* Call normal read/recv */
649 return read(mosq->sock, buf, count);
651 return recv(mosq->sock, buf, count, 0);
// Write up to count bytes from buf to the connection of mosq.
// The TLS path uses SSL_write() on mosq->ssl; the plain path uses write() or
// send() on mosq->sock.
// Returns the value produced by the underlying call.
// NOTE(review): the condition selecting the TLS path, the adjustments made
// to ret and errno on SSL errors, and the platform guards are not visible in
// this excerpt.
659 ssize_t _mosquitto_net_write(struct mosquitto *mosq, void *buf, size_t count)
672 ret = SSL_write(mosq->ssl, buf, count);
// Ask OpenSSL why the write did not complete.
674 err = SSL_get_error(mosq->ssl, ret);
675 if(err == SSL_ERROR_WANT_READ){
678 }else if(err == SSL_ERROR_WANT_WRITE){
// Record that the write must be retried once the socket is writable.
680 mosq->want_write = true;
// Any other SSL error is logged with its OpenSSL description.
685 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
691 return (ssize_t )ret;
693 /* Call normal write/send */
697 return write(mosq->sock, buf, count);
699 return send(mosq->sock, buf, count, 0);
707 int _mosquitto_packet_write(struct mosquitto *mosq)
709 ssize_t write_length;
710 struct _mosquitto_packet *packet;
712 if(!mosq) return MOSQ_ERR_INVAL;
713 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
715 pthread_mutex_lock(&mosq->current_out_packet_mutex);
716 pthread_mutex_lock(&mosq->out_packet_mutex);
717 if(mosq->out_packet && !mosq->current_out_packet){
718 mosq->current_out_packet = mosq->out_packet;
719 mosq->out_packet = mosq->out_packet->next;
720 if(!mosq->out_packet){
721 mosq->out_packet_last = NULL;
724 pthread_mutex_unlock(&mosq->out_packet_mutex);
726 while(mosq->current_out_packet){
727 packet = mosq->current_out_packet;
729 while(packet->to_process > 0){
730 write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
731 if(write_length > 0){
732 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
733 g_bytes_sent += write_length;
735 packet->to_process -= write_length;
736 packet->pos += write_length;
739 errno = WSAGetLastError();
741 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
742 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
743 return MOSQ_ERR_SUCCESS;
745 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
747 case COMPAT_ECONNRESET:
748 return MOSQ_ERR_CONN_LOST;
750 return MOSQ_ERR_ERRNO;
757 # ifdef WITH_SYS_TREE
759 if(((packet->command)&0xF6) == PUBLISH){
764 if(((packet->command)&0xF6) == PUBLISH){
765 pthread_mutex_lock(&mosq->callback_mutex);
766 if(mosq->on_publish){
767 /* This is a QoS=0 message */
768 mosq->in_callback = true;
769 mosq->on_publish(mosq, mosq->userdata, packet->mid);
770 mosq->in_callback = false;
772 pthread_mutex_unlock(&mosq->callback_mutex);
773 }else if(((packet->command)&0xF0) == DISCONNECT){
774 /* FIXME what cleanup needs doing here?
775 * incoming/outgoing messages? */
776 _mosquitto_socket_close(mosq);
778 /* Start of duplicate, possibly unnecessary code.
779 * This does leave things in a consistent state at least. */
780 /* Free data and reset values */
781 pthread_mutex_lock(&mosq->out_packet_mutex);
782 mosq->current_out_packet = mosq->out_packet;
783 if(mosq->out_packet){
784 mosq->out_packet = mosq->out_packet->next;
785 if(!mosq->out_packet){
786 mosq->out_packet_last = NULL;
789 pthread_mutex_unlock(&mosq->out_packet_mutex);
791 _mosquitto_packet_cleanup(packet);
792 _mosquitto_free(packet);
794 pthread_mutex_lock(&mosq->msgtime_mutex);
795 mosq->last_msg_out = mosquitto_time();
796 pthread_mutex_unlock(&mosq->msgtime_mutex);
797 /* End of duplicate, possibly unnecessary code */
799 pthread_mutex_lock(&mosq->callback_mutex);
800 if(mosq->on_disconnect){
801 mosq->in_callback = true;
802 mosq->on_disconnect(mosq, mosq->userdata, 0);
803 mosq->in_callback = false;
805 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
806 return MOSQ_ERR_SUCCESS;
810 /* Free data and reset values */
811 pthread_mutex_lock(&mosq->out_packet_mutex);
812 mosq->current_out_packet = mosq->out_packet;
813 if(mosq->out_packet){
814 mosq->out_packet = mosq->out_packet->next;
815 if(!mosq->out_packet){
816 mosq->out_packet_last = NULL;
819 pthread_mutex_unlock(&mosq->out_packet_mutex);
821 _mosquitto_packet_cleanup(packet);
822 _mosquitto_free(packet);
824 pthread_mutex_lock(&mosq->msgtime_mutex);
825 mosq->last_msg_out = mosquitto_time();
826 pthread_mutex_unlock(&mosq->msgtime_mutex);
828 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
829 return MOSQ_ERR_SUCCESS;
// Read one incoming packet for mosq and hand it to the packet handler,
// resuming a partially received packet across calls. Progress is kept in
// mosq->in_packet (command, remaining_length, payload, pos, to_process).
// Two signatures are listed: one taking the broker database db and one
// without. NOTE(review): the preprocessor lines selecting between them, and
// the final return of rc, are not visible in this excerpt.
// Visible return values: MOSQ_ERR_INVAL, MOSQ_ERR_NO_CONN, MOSQ_ERR_PROTOCOL,
// MOSQ_ERR_CONN_LOST, MOSQ_ERR_ERRNO, MOSQ_ERR_NOMEM, and MOSQ_ERR_SUCCESS
// when the socket would block.
833 int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
835 int _mosquitto_packet_read(struct mosquitto *mosq)
842 if(!mosq) return MOSQ_ERR_INVAL;
843 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
844 /* This gets called if pselect() indicates that there is network data
845 * available - ie. at least one byte. What we do depends on what data we
847 * If we've not got a command, attempt to read one and save it. This should
848 * always work because it's only a single byte.
849 * Then try to read the remaining length. This may fail because it is may
850 * be more than one byte - will need to save data pending next read if it
852 * Then try to read the remaining payload, where 'payload' here means the
853 * combined variable header and actual payload. This is the most likely to
854 * fail due to longer length, so save current data and current position.
855 * After all data is read, send to _mosquitto_handle_packet() to deal with.
856 * Finally, free the memory and reset everything to starting conditions.
// Stage 1: the command byte. A stored command of 0 means none was read yet.
858 if(!mosq->in_packet.command){
859 read_length = _mosquitto_net_read(mosq, &byte, 1);
860 if(read_length == 1){
861 mosq->in_packet.command = byte;
863 # ifdef WITH_SYS_TREE
866 /* Clients must send CONNECT as their first command. */
867 if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
// A zero-length read means the peer closed the connection.
870 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
872 errno = WSAGetLastError();
// No data yet on a non-blocking socket: not an error, try again later.
874 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
875 return MOSQ_ERR_SUCCESS;
878 case COMPAT_ECONNRESET:
879 return MOSQ_ERR_CONN_LOST;
881 return MOSQ_ERR_ERRNO;
// Stage 2: the variable-length remaining-length field, one byte at a time.
886 if(!mosq->in_packet.have_remaining){
888 read_length = _mosquitto_net_read(mosq, &byte, 1);
889 if(read_length == 1){
890 mosq->in_packet.remaining_count++;
891 /* Max 4 bytes length for remaining length as defined by protocol.
892 * Anything more likely means a broken/malicious client.
894 if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL;
896 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
// Each byte carries 7 bits of length, least significant group first; the
// multiplier grows by 128 per byte and persists across calls.
899 mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
900 mosq->in_packet.remaining_mult *= 128;
902 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
904 errno = WSAGetLastError();
906 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
907 return MOSQ_ERR_SUCCESS;
910 case COMPAT_ECONNRESET:
911 return MOSQ_ERR_CONN_LOST;
913 return MOSQ_ERR_ERRNO;
// Bit 7 set means another length byte follows.
917 }while((byte & 128) != 0);
// Allocate the payload buffer only for packets that carry a body.
919 if(mosq->in_packet.remaining_length > 0){
920 mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
921 if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
922 mosq->in_packet.to_process = mosq->in_packet.remaining_length;
924 mosq->in_packet.have_remaining = 1;
// Stage 3: the body, read in as many chunks as the socket delivers.
926 while(mosq->in_packet.to_process>0){
927 read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
929 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
930 g_bytes_received += read_length;
932 mosq->in_packet.to_process -= read_length;
933 mosq->in_packet.pos += read_length;
936 errno = WSAGetLastError();
938 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
939 if(mosq->in_packet.to_process > 1000){
940 /* Update last_msg_in time if more than 1000 bytes left to
941 * receive. Helps when receiving large messages.
942 * This is an arbitrary limit, but with some consideration.
943 * If a client can't send 1000 bytes in a second it
944 * probably shouldn't be using a 1 second keep alive. */
945 pthread_mutex_lock(&mosq->msgtime_mutex);
946 mosq->last_msg_in = mosquitto_time();
947 pthread_mutex_unlock(&mosq->msgtime_mutex);
949 return MOSQ_ERR_SUCCESS;
952 case COMPAT_ECONNRESET:
953 return MOSQ_ERR_CONN_LOST;
955 return MOSQ_ERR_ERRNO;
961 /* All data for this packet is read. */
// Rewind so the handler parses the payload from its first byte.
962 mosq->in_packet.pos = 0;
964 # ifdef WITH_SYS_TREE
// NOTE(review): mask 0xF5 keeps the command nibble plus bits 0 and 2, so only
// PUBLISH packets with both of those flag bits clear are counted. Confirm
// this is intended; the send-side counter uses 0xF6.
966 if(((mosq->in_packet.command)&0xF5) == PUBLISH){
967 g_pub_msgs_received++;
// Broker and client builds dispatch to different handlers.
970 rc = mqtt3_packet_handle(db, mosq);
972 rc = _mosquitto_packet_handle(mosq);
975 /* Free data and reset values */
976 _mosquitto_packet_cleanup(&mosq->in_packet);
// Record the receive time of the completed packet.
978 pthread_mutex_lock(&mosq->msgtime_mutex);
979 mosq->last_msg_in = mosquitto_time();
980 pthread_mutex_unlock(&mosq->msgtime_mutex);
// Put sock into non-blocking mode.
// Two mechanisms are visible: fcntl() reading the current flags and setting
// them again with O_NONBLOCK added, and ioctlsocket() with FIONBIO.
// Callers in this file treat a non-zero return as failure.
// NOTE(review): the platform guards, the failure handling after each call and
// the return statements are not visible in this excerpt.
984 int _mosquitto_socket_nonblock(int sock)
988 /* Set non-blocking */
// Fetch the existing flags first so only O_NONBLOCK is added to them.
989 opt = fcntl(sock, F_GETFL, 0);
994 if(fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
995 /* If either fcntl fails, don't want to allow this client to connect. */
1001 if(ioctlsocket(sock, FIONBIO, &opt)){
// Create a connected pair of non-blocking sockets.
// Two implementations are visible:
//  - a TCP loopback emulation that, for AF_INET and then AF_INET6, opens a
//    listening socket on the loopback address, connects a second socket to
//    it and accepts the connection;
//  - a direct socketpair(AF_UNIX, SOCK_STREAM) call.
// Visible return values: MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL, MOSQ_ERR_UNKNOWN
// and MOSQ_ERR_ERRNO.
// NOTE(review): the preprocessor lines selecting the implementation, the loop
// over family[], the continue statements and the assignments to *pairR and
// *pairW are not visible in this excerpt.
1011 int _mosquitto_socketpair(int *pairR, int *pairW)
// Address families to try for the loopback emulation.
1014 int family[2] = {AF_INET, AF_INET6};
// One storage buffer viewed through IPv4 and IPv6 typed pointers.
1016 struct sockaddr_storage ss;
1017 struct sockaddr_in *sa = (struct sockaddr_in *)&ss;
1018 struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&ss;
// Build the loopback address for the listener in the current family.
1028 memset(&ss, 0, sizeof(ss));
1029 if(family[i] == AF_INET){
1030 sa->sin_family = family[i];
1031 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1033 ss_len = sizeof(struct sockaddr_in);
1034 }else if(family[i] == AF_INET6){
1035 sa6->sin6_family = family[i];
1036 sa6->sin6_addr = in6addr_loopback;
1038 ss_len = sizeof(struct sockaddr_in6);
1040 return MOSQ_ERR_INVAL;
1043 listensock = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1044 if(listensock == -1){
// Every failure from here on closes the listener before moving on.
1048 if(bind(listensock, (struct sockaddr *)&ss, ss_len) == -1){
1049 COMPAT_CLOSE(listensock);
// Backlog of 1: only the single connect below is expected.
1053 if(listen(listensock, 1) == -1){
1054 COMPAT_CLOSE(listensock);
// Query the address the listener is actually bound to, for use by connect().
1057 memset(&ss, 0, sizeof(ss));
1058 ss_len = sizeof(ss);
1059 if(getsockname(listensock, (struct sockaddr *)&ss, &ss_len) < 0){
1060 COMPAT_CLOSE(listensock);
1064 if(_mosquitto_socket_nonblock(listensock)){
// Force the destination back to the loopback address; the fields not
// assigned here keep what getsockname() returned.
1068 if(family[i] == AF_INET){
1069 sa->sin_family = family[i];
1070 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1071 ss_len = sizeof(struct sockaddr_in);
1072 }else if(family[i] == AF_INET6){
1073 sa6->sin6_family = family[i];
1074 sa6->sin6_addr = in6addr_loopback;
1075 ss_len = sizeof(struct sockaddr_in6);
// First end of the pair: connects to the listener.
1078 spR = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1080 COMPAT_CLOSE(listensock);
1083 if(_mosquitto_socket_nonblock(spR)){
1084 COMPAT_CLOSE(listensock);
// The socket is non-blocking, so an in-progress connect is not a failure.
1087 if(connect(spR, (struct sockaddr *)&ss, ss_len) < 0){
1089 errno = WSAGetLastError();
1091 if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1093 COMPAT_CLOSE(listensock);
// Second end of the pair: the accepted side of the same connection.
1097 spW = accept(listensock, NULL, 0);
1100 errno = WSAGetLastError();
1102 if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1104 COMPAT_CLOSE(listensock);
1109 if(_mosquitto_socket_nonblock(spW)){
1111 COMPAT_CLOSE(listensock);
// The listener is no longer needed once both ends exist.
1114 COMPAT_CLOSE(listensock);
1118 return MOSQ_ERR_SUCCESS;
1120 return MOSQ_ERR_UNKNOWN;
// Native implementation: both descriptors come from one socketpair() call
// and each is switched to non-blocking mode; if either switch fails, both
// are closed.
1124 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
1125 return MOSQ_ERR_ERRNO;
1127 if(_mosquitto_socket_nonblock(sv[0])){
1128 COMPAT_CLOSE(sv[0]);
1129 COMPAT_CLOSE(sv[1]);
1130 return MOSQ_ERR_ERRNO;
1132 if(_mosquitto_socket_nonblock(sv[1])){
1133 COMPAT_CLOSE(sv[0]);
1134 COMPAT_CLOSE(sv[1]);
1135 return MOSQ_ERR_ERRNO;
1139 return MOSQ_ERR_SUCCESS;