2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
37 #include <sys/socket.h>
46 #include <linux/in6.h>
47 #include <sys/endian.h>
51 # include <netinet/in.h>
55 #include <netinet/in.h>
60 #define AI_ADDRCONFIG 0
62 #include <net/netbyte.h>
63 #include <netinet/in.h>
67 #include <openssl/err.h>
72 # include <mosquitto_broker.h>
74 extern uint64_t g_bytes_received;
75 extern uint64_t g_bytes_sent;
76 extern unsigned long g_msgs_received;
77 extern unsigned long g_msgs_sent;
78 extern unsigned long g_pub_msgs_received;
79 extern unsigned long g_pub_msgs_sent;
82 # include <read_handle.h>
85 #include "logging_mosq.h"
86 #include "memory_mosq.h"
87 #include "mqtt3_protocol.h"
89 #include "time_mosq.h"
90 #include "util_mosq.h"
/* Index of the OpenSSL ex_data slot used to attach a struct mosquitto to an
 * SSL object. -1 means "not allocated yet"; _mosquitto_net_init() fills it in
 * via SSL_get_ex_new_index(), and it is later passed to SSL_set_ex_data() and
 * SSL_get_ex_data(). */
93 int tls_ex_index_mosq = -1;
/* One-time setup of the networking libraries used by this file.
 * The visible calls start Winsock 2.2, initialise c-ares, load the OpenSSL
 * error strings and algorithms, and allocate the ex_data index.
 * NOTE(review): the embedded line numbers skip (96 -> 100 -> 104 ...), so the
 * braces and, presumably, the preprocessor guards that choose between these
 * calls per platform/feature are not shown here - confirm against the full
 * file. */
96 void _mosquitto_net_init(void)
100 WSAStartup(MAKEWORD(2,2), &wsaData);
104 ares_library_init(ARES_LIB_INIT_ALL);
108 SSL_load_error_strings();
110 OpenSSL_add_all_algorithms();
/* Allocate the ex_data index only while it still holds its initial -1, so a
 * repeated call does not allocate a second slot. */
111 if(tls_ex_index_mosq == -1){
112 tls_ex_index_mosq = SSL_get_ex_new_index(0, "client context", NULL, NULL, NULL);
/* Counterpart of _mosquitto_net_init(): the visible calls release OpenSSL's
 * ex_data and shut down c-ares.
 * NOTE(review): lines are missing from this listing (117 -> 122 -> 126), so
 * further cleanup calls and their build guards may exist that are not shown. */
117 void _mosquitto_net_cleanup(void)
122 CRYPTO_cleanup_all_ex_data();
126 ares_library_cleanup();
/* Reset a packet to its idle state so the structure can be reused.
 * Frees the payload buffer and clears the remaining-length decoder fields.
 * The packet structure itself is not freed here; callers in this file either
 * free it afterwards or pass the address of an embedded packet.
 * NOTE(review): some short lines of this function (e.g. around 136, 139 and
 * 147) are missing from this listing. */
134 void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet)
138 /* Free data and reset values */
140 packet->have_remaining = 0;
141 packet->remaining_count = 0;
/* The multiplier restarts at 1: _mosquitto_packet_read() scales each
 * remaining-length byte by it and then multiplies it by 128. */
142 packet->remaining_mult = 1;
143 packet->remaining_length = 0;
144 if(packet->payload) _mosquitto_free(packet->payload);
/* Clear the pointer so a later cleanup cannot free the buffer twice. */
145 packet->payload = NULL;
146 packet->to_process = 0;
/* Append a fully built packet to the tail of mosq's outgoing queue.
 *
 * mosq   - connection whose out_packet list receives the packet.
 * packet - packet to send; its to_process counter is primed with the full
 *          packet_length.
 *
 * Returns the result of _mosquitto_packet_write() when the visible code
 * writes immediately, otherwise MOSQ_ERR_SUCCESS.
 * NOTE(review): two separate "return _mosquitto_packet_write(mosq)" paths are
 * visible (lines 171 and 186); presumably a build-time guard that is missing
 * from this listing selects between them - confirm against the full file. */
150 int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet)
153 char sockpair_data = 0;
/* Nothing has been written yet, so the whole packet is still to be sent. */
159 packet->to_process = packet->packet_length;
/* The queue is a singly linked list with head out_packet and tail
 * out_packet_last, both protected by out_packet_mutex. */
162 pthread_mutex_lock(&mosq->out_packet_mutex);
163 if(mosq->out_packet){
164 mosq->out_packet_last->next = packet;
166 mosq->out_packet = packet;
168 mosq->out_packet_last = packet;
169 pthread_mutex_unlock(&mosq->out_packet_mutex);
171 return _mosquitto_packet_write(mosq);
174 /* Write a single byte to sockpairW (connected to sockpairR) to break out
175 * of select() if in threaded mode. */
176 if(mosq->sockpairW != INVALID_SOCKET){
/* NOTE(review): write() and send() are presumably the POSIX and Windows
 * alternatives of the same wake-up; the guard lines are not shown. */
178 if(write(mosq->sockpairW, &sockpair_data, 1)){
181 send(mosq->sockpairW, &sockpair_data, 1, 0);
/* Only write directly when not inside a user callback and not in threaded
 * mode; otherwise the packet stays queued and success is reported. */
185 if(mosq->in_callback == false && mosq->threaded == false){
186 return _mosquitto_packet_write(mosq);
188 return MOSQ_ERR_SUCCESS;
193 /* Close a socket associated with a context and set it to INVALID_SOCKET.
194 * Returns 1 on failure (context is NULL)
195 * Returns 0 on success. */
/* Tear down the connection state visible in this listing: shut down the SSL
 * session, free the SSL context and clear its pointer, then close the socket
 * and mark it INVALID_SOCKET, which other functions in this file test for
 * before reporting MOSQ_ERR_NO_CONN.
 * NOTE(review): short lines (braces, guards, declarations and the return
 * statement) are missing from this listing; the return value is presumably
 * rc from COMPAT_CLOSE() - confirm against the full file. */
197 int _mosquitto_socket_close(struct mosquitto *mosq)
204 SSL_shutdown(mosq->ssl);
209 SSL_CTX_free(mosq->ssl_ctx);
/* Clear the pointer so the freed context cannot be reused or freed again. */
210 mosq->ssl_ctx = NULL;
/* Only close a socket that is actually open. */
214 if(mosq->sock != INVALID_SOCKET){
215 rc = COMPAT_CLOSE(mosq->sock);
216 mosq->sock = INVALID_SOCKET;
222 #ifdef REAL_WITH_TLS_PSK
/* PSK client callback installed with SSL_CTX_set_psk_client_callback() in
 * _mosquitto_socket_connect().
 *
 * ssl              - SSL object; the owning struct mosquitto is fetched from
 *                    its ex_data slot tls_ex_index_mosq.
 * hint             - not referenced by the visible lines.
 * identity         - output buffer, filled with mosq->tls_psk_identity.
 * max_identity_len - size limit passed to snprintf() for identity.
 * psk              - output buffer for the binary key.
 * max_psk_len      - size limit passed to _mosquitto_hex2bin() for psk.
 *
 * Returns 0 when the hex conversion reports a negative length.
 * NOTE(review): the success return is on a line missing from this listing;
 * presumably it is the converted key length - confirm. */
223 static unsigned int psk_client_callback(SSL *ssl, const char *hint,
224 char *identity, unsigned int max_identity_len,
225 unsigned char *psk, unsigned int max_psk_len)
227 struct mosquitto *mosq;
230 mosq = SSL_get_ex_data(ssl, tls_ex_index_mosq);
/* snprintf bounds the copy to max_identity_len and NUL-terminates. */
233 snprintf(identity, max_identity_len, "%s", mosq->tls_psk_identity);
/* tls_psk is stored as a hex string; convert it to raw bytes in psk. */
235 len = _mosquitto_hex2bin(mosq->tls_psk, psk, max_psk_len);
236 if (len < 0) return 0;
/* Resolve host and try to open a TCP connection to it.
 *
 * host         - name or address handed to getaddrinfo().
 * port         - port in host byte order; written into each candidate
 *                address with htons().
 * sock         - output; set to INVALID_SOCKET first and again on the visible
 *                failure path.
 * bind_address - local address resolved with getaddrinfo() and tried with
 *                bind() before connecting.
 * blocking     - NOTE(review): no use of this parameter is visible in this
 *                listing; presumably it decides which of the two
 *                _mosquitto_socket_nonblock() calls runs - confirm.
 *
 * Returns MOSQ_ERR_SUCCESS or MOSQ_ERR_ERRNO on the visible paths.
 * NOTE(review): many short lines (braces, error checks, guards, cleanup) are
 * missing from this listing, so the exact conditions for each return and for
 * the bind_address handling cannot be read from here. */
241 int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking)
243 struct addrinfo hints;
244 struct addrinfo *ainfo, *rp;
245 struct addrinfo *ainfo_bind, *rp_bind;
252 *sock = INVALID_SOCKET;
/* Ask for stream sockets of any address family. */
253 memset(&hints, 0, sizeof(struct addrinfo));
254 hints.ai_family = PF_UNSPEC;
255 hints.ai_flags = AI_ADDRCONFIG;
256 hints.ai_socktype = SOCK_STREAM;
/* The service argument is NULL, so the port is patched into each result
 * by hand further down. */
258 s = getaddrinfo(host, NULL, &hints, &ainfo);
265 s = getaddrinfo(bind_address, NULL, &hints, &ainfo_bind);
/* Walk the candidate addresses; skip any for which no socket can be made. */
273 for(rp = ainfo; rp != NULL; rp = rp->ai_next){
274 *sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
275 if(*sock == INVALID_SOCKET) continue;
/* Store the port in network byte order in the family-specific field. */
277 if(rp->ai_family == PF_INET){
278 ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
279 }else if(rp->ai_family == PF_INET6){
280 ((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port);
/* Try each resolved local address until one bind() succeeds. */
286 for(rp_bind = ainfo_bind; rp_bind != NULL; rp_bind = rp_bind->ai_next){
287 if(bind(*sock, rp_bind->ai_addr, rp_bind->ai_addrlen) == 0){
298 /* Set non-blocking */
299 if(_mosquitto_socket_nonblock(*sock)){
305 rc = connect(*sock, rp->ai_addr, rp->ai_addrlen);
/* Mirror the Winsock error into errno so the check below is shared. */
307 errno = WSAGetLastError();
/* An in-progress non-blocking connect is treated like an immediate one. */
309 if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
311 /* Set non-blocking */
312 if(_mosquitto_socket_nonblock(*sock)){
321 *sock = INVALID_SOCKET;
325 freeaddrinfo(ainfo_bind);
328 return MOSQ_ERR_ERRNO;
330 return MOSQ_ERR_SUCCESS;
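333 /* Create a socket and connect it to 'host' on port 'port'.
334 * Returns a MOSQ_ERR_* code on failure (e.g. MOSQ_ERR_INVAL if mosq, host or port is missing or the TLS version is unsupported, or the error from _mosquitto_try_connect()).
335 * Returns MOSQ_ERR_SUCCESS on success. */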
/* Connect mosq to host:port and, when any of tls_cafile, tls_capath or
 * tls_psk is set, build an OpenSSL context and start the TLS handshake.
 *
 * mosq         - connection to set up; its ssl_ctx, ssl and want_write
 *                fields are written by the visible lines.
 * host, port   - peer to connect to; a NULL host or zero port is rejected.
 * bind_address - passed through to _mosquitto_try_connect().
 * blocking     - passed through to _mosquitto_try_connect().
 *
 * Returns MOSQ_ERR_INVAL for missing arguments or an unsupported
 * tls_version, the error from _mosquitto_try_connect() if it fails, and
 * MOSQ_ERR_SUCCESS at the end of the visible code.
 * NOTE(review): braces, preprocessor guards, most error returns and the
 * assignment of the new socket to mosq are on lines missing from this
 * listing. Where two similar log messages appear back to back (the
 * "bridge_..." wording and the plain wording), presumably a build-time guard
 * selects one - confirm against the full file. */
337 int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
339 int sock = INVALID_SOCKET;
346 if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
/* NOTE(review): the body of this TLS check (lines 350-353) is not shown. */
349 if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
354 rc = _mosquitto_try_connect(host, port, &sock, bind_address, blocking);
355 if(rc != MOSQ_ERR_SUCCESS) return rc;
/* Everything from here to the handshake only applies when TLS is configured. */
358 if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
/* With OpenSSL 1.0.1 or newer, an unset tls_version selects TLS v1.2 and
 * "tlsv1.1" / "tlsv1" can be requested explicitly. */
359 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
360 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1.2")){
361 mosq->ssl_ctx = SSL_CTX_new(TLSv1_2_client_method());
362 }else if(!strcmp(mosq->tls_version, "tlsv1.1")){
363 mosq->ssl_ctx = SSL_CTX_new(TLSv1_1_client_method());
364 }else if(!strcmp(mosq->tls_version, "tlsv1")){
365 mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
367 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
369 return MOSQ_ERR_INVAL;
/* Second variant of the version selection: only TLS v1 is accepted.
 * Presumably this is the branch for older OpenSSL - guard lines not shown. */
372 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1")){
373 mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
375 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
377 return MOSQ_ERR_INVAL;
381 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context.");
386 #if OPENSSL_VERSION_NUMBER >= 0x10000000
387 /* Disable compression */
388 SSL_CTX_set_options(mosq->ssl_ctx, SSL_OP_NO_COMPRESSION);
390 #ifdef SSL_MODE_RELEASE_BUFFERS
391 /* Use even less memory per SSL connection. */
392 SSL_CTX_set_mode(mosq->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
/* Apply a user-supplied cipher list, if any. */
395 if(mosq->tls_ciphers){
396 ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
398 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers);
/* Certificate-based setup: load the CA file and/or directory used to verify
 * the server. The log message names whichever of the two was configured. */
403 if(mosq->tls_cafile || mosq->tls_capath){
404 ret = SSL_CTX_load_verify_locations(mosq->ssl_ctx, mosq->tls_cafile, mosq->tls_capath);
407 if(mosq->tls_cafile && mosq->tls_capath){
408 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\" and bridge_capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
409 }else if(mosq->tls_cafile){
410 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\".", mosq->tls_cafile);
412 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_capath \"%s\".", mosq->tls_capath);
415 if(mosq->tls_cafile && mosq->tls_capath){
416 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\" and capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
417 }else if(mosq->tls_cafile){
418 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\".", mosq->tls_cafile);
420 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath);
/* tls_cert_reqs == 0 turns peer verification off; any other value verifies
 * the server through _mosquitto_server_certificate_verify. */
426 if(mosq->tls_cert_reqs == 0){
427 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_NONE, NULL);
429 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_PEER, _mosquitto_server_certificate_verify);
/* Optional passphrase callback for an encrypted key; mosq is its userdata. */
432 if(mosq->tls_pw_callback){
433 SSL_CTX_set_default_passwd_cb(mosq->ssl_ctx, mosq->tls_pw_callback);
434 SSL_CTX_set_default_passwd_cb_userdata(mosq->ssl_ctx, mosq);
/* Optional client certificate chain. */
437 if(mosq->tls_certfile){
438 ret = SSL_CTX_use_certificate_chain_file(mosq->ssl_ctx, mosq->tls_certfile);
441 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate, check bridge_certfile \"%s\".", mosq->tls_certfile);
443 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile);
/* Optional client private key (PEM), then check it matches the certificate. */
449 if(mosq->tls_keyfile){
450 ret = SSL_CTX_use_PrivateKey_file(mosq->ssl_ctx, mosq->tls_keyfile, SSL_FILETYPE_PEM);
453 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file, check bridge_keyfile \"%s\".", mosq->tls_keyfile);
455 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile);
460 ret = SSL_CTX_check_private_key(mosq->ssl_ctx);
462 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent.");
/* PSK-based setup, compiled only when REAL_WITH_TLS_PSK is defined. */
467 #ifdef REAL_WITH_TLS_PSK
468 }else if(mosq->tls_psk){
469 SSL_CTX_set_psk_client_callback(mosq->ssl_ctx, psk_client_callback);
/* Create the per-connection SSL object, store mosq in its ex_data slot (the
 * PSK callback reads it back) and attach the connected socket through a BIO
 * that does not close the socket when freed. */
473 mosq->ssl = SSL_new(mosq->ssl_ctx);
478 SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq);
479 bio = BIO_new_socket(sock, BIO_NOCLOSE);
484 SSL_set_bio(mosq->ssl, bio, bio);
/* Start the handshake. WANT_READ and WANT_WRITE are handled explicitly;
 * WANT_WRITE is remembered in want_write. The check that guards this error
 * handling, and what happens for other errors, is on missing lines. */
486 ret = SSL_connect(mosq->ssl);
488 ret = SSL_get_error(mosq->ssl, ret);
489 if(ret == SSL_ERROR_WANT_READ){
490 /* We always try to read anyway */
491 }else if(ret == SSL_ERROR_WANT_WRITE){
492 mosq->want_write = true;
503 return MOSQ_ERR_SUCCESS;
/* Read one byte from the packet payload at the current position into *byte.
 * Returns MOSQ_ERR_PROTOCOL if that would read past remaining_length,
 * otherwise MOSQ_ERR_SUCCESS.
 * NOTE(review): no update of packet->pos is visible in this listing;
 * presumably it is on one of the omitted lines (507-508, 512-513) - confirm. */
506 int _mosquitto_read_byte(struct _mosquitto_packet *packet, uint8_t *byte)
509 if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
511 *byte = packet->payload[packet->pos];
514 return MOSQ_ERR_SUCCESS;
/* Store one byte in the packet payload at the current position.
 * Writing past packet_length is treated as a programming error and is only
 * caught by assert().
 * NOTE(review): no update of packet->pos is visible in this listing;
 * presumably it is on an omitted line - confirm. */
517 void _mosquitto_write_byte(struct _mosquitto_packet *packet, uint8_t byte)
520 assert(packet->pos+1 <= packet->packet_length);
522 packet->payload[packet->pos] = byte;
/* Copy count bytes from the packet payload, starting at the current
 * position, into bytes and advance the position by count.
 * Returns MOSQ_ERR_PROTOCOL if the range would pass remaining_length,
 * otherwise MOSQ_ERR_SUCCESS. The caller supplies a buffer of at least count
 * bytes; nothing here checks its size. */
526 int _mosquitto_read_bytes(struct _mosquitto_packet *packet, void *bytes, uint32_t count)
529 if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
531 memcpy(bytes, &(packet->payload[packet->pos]), count);
532 packet->pos += count;
534 return MOSQ_ERR_SUCCESS;
/* Copy count bytes from bytes into the packet payload at the current
 * position and advance the position by count.
 * Writing past packet_length is treated as a programming error and is only
 * caught by assert(). */
537 void _mosquitto_write_bytes(struct _mosquitto_packet *packet, const void *bytes, uint32_t count)
540 assert(packet->pos+count <= packet->packet_length);
542 memcpy(&(packet->payload[packet->pos]), bytes, count);
543 packet->pos += count;
/* Read a length-prefixed string from the packet into a newly allocated
 * buffer stored in *str.
 * The length is a 16-bit value read with _mosquitto_read_uint16().
 * Returns MOSQ_ERR_PROTOCOL if the string would pass remaining_length,
 * MOSQ_ERR_NOMEM on the visible failure path, otherwise MOSQ_ERR_SUCCESS.
 * NOTE(review): the branch structure, the handling of rc and the update of
 * packet->pos are on lines missing from this listing. The caller presumably
 * owns *str and releases it with _mosquitto_free() - confirm. */
546 int _mosquitto_read_string(struct _mosquitto_packet *packet, char **str)
552 rc = _mosquitto_read_uint16(packet, &len);
555 if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
/* One extra zeroed byte is allocated, so the len copied bytes are always
 * followed by a NUL terminator. */
557 *str = _mosquitto_calloc(len+1, sizeof(char));
559 memcpy(*str, &(packet->payload[packet->pos]), len);
562 return MOSQ_ERR_NOMEM;
565 return MOSQ_ERR_SUCCESS;
/* Write a length-prefixed string: the 16-bit length first, then length bytes
 * taken from str. No terminator is written; length is supplied by the
 * caller rather than measured here. */
568 void _mosquitto_write_string(struct _mosquitto_packet *packet, const char *str, uint16_t length)
571 _mosquitto_write_uint16(packet, length);
572 _mosquitto_write_bytes(packet, str, length);
/* Read a 16-bit unsigned value, most significant byte first, from the
 * packet payload into *word.
 * Returns MOSQ_ERR_PROTOCOL if two bytes are not available before
 * remaining_length, otherwise MOSQ_ERR_SUCCESS.
 * NOTE(review): both reads index payload[packet->pos]; the increments of
 * packet->pos between and after them are presumably on the omitted lines
 * (583, 585) - confirm. */
575 int _mosquitto_read_uint16(struct _mosquitto_packet *packet, uint16_t *word)
580 if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
582 msb = packet->payload[packet->pos];
584 lsb = packet->payload[packet->pos];
/* Recombine the two bytes in big-endian order. */
587 *word = (msb<<8) + lsb;
589 return MOSQ_ERR_SUCCESS;
/* Write a 16-bit unsigned value to the packet as two bytes, most
 * significant byte first - the same order _mosquitto_read_uint16() reads. */
592 void _mosquitto_write_uint16(struct _mosquitto_packet *packet, uint16_t word)
594 _mosquitto_write_byte(packet, MOSQ_MSB(word));
595 _mosquitto_write_byte(packet, MOSQ_LSB(word));
/* Read up to count bytes from the connection into buf, through OpenSSL when
 * an SSL session is in use and through read()/recv() otherwise.
 * Returns the number of bytes read or the underlying call's error result.
 * NOTE(review): the condition that chooses the SSL path, the guards between
 * read() and recv(), and what is done for SSL_ERROR_WANT_READ (presumably
 * mapping it to a would-block result) are on lines missing from this
 * listing - confirm against the full file. */
598 ssize_t _mosquitto_net_read(struct mosquitto *mosq, void *buf, size_t count)
610 ret = SSL_read(mosq->ssl, buf, count);
/* Classify the SSL result: WANT_READ and WANT_WRITE are handled explicitly,
 * with WANT_WRITE remembered in want_write; the remaining visible line logs
 * an OpenSSL error string. */
612 err = SSL_get_error(mosq->ssl, ret);
613 if(err == SSL_ERROR_WANT_READ){
616 }else if(err == SSL_ERROR_WANT_WRITE){
618 mosq->want_write = true;
623 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
629 return (ssize_t )ret;
631 /* Call normal read/recv */
636 return read(mosq->sock, buf, count);
638 return recv(mosq->sock, buf, count, 0);
/* Write up to count bytes from buf to the connection, through OpenSSL when
 * an SSL session is in use and through write()/send() otherwise.
 * Returns the number of bytes written or the underlying call's error result.
 * NOTE(review): the condition that chooses the SSL path, the guards between
 * write() and send(), and what is done for SSL_ERROR_WANT_READ are on lines
 * missing from this listing - confirm against the full file. */
646 ssize_t _mosquitto_net_write(struct mosquitto *mosq, void *buf, size_t count)
659 ret = SSL_write(mosq->ssl, buf, count);
/* Classify the SSL result: WANT_READ and WANT_WRITE are handled explicitly,
 * with WANT_WRITE remembered in want_write; the remaining visible line logs
 * an OpenSSL error string. */
661 err = SSL_get_error(mosq->ssl, ret);
662 if(err == SSL_ERROR_WANT_READ){
665 }else if(err == SSL_ERROR_WANT_WRITE){
667 mosq->want_write = true;
672 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
678 return (ssize_t )ret;
680 /* Call normal write/send */
684 return write(mosq->sock, buf, count);
686 return send(mosq->sock, buf, count, 0);
/* Send as much queued outgoing data as the socket accepts right now.
 *
 * The packet being sent is mosq->current_out_packet; when it is empty the
 * head of the out_packet queue is promoted to it. Each packet is written
 * until to_process reaches zero, then freed and replaced by the next queued
 * packet.
 *
 * Returns MOSQ_ERR_INVAL for a NULL mosq, MOSQ_ERR_NO_CONN when the socket
 * is INVALID_SOCKET, MOSQ_ERR_SUCCESS when the socket would block or the
 * work is done, and MOSQ_ERR_CONN_LOST / MOSQ_ERR_ERRNO on write errors.
 *
 * Locking visible here: current_out_packet_mutex is held for the whole call
 * and released before each visible return; out_packet_mutex protects the
 * queue; callback_mutex is taken around user callbacks; msgtime_mutex
 * protects last_msg_out.
 * NOTE(review): braces, guards, the switch header and other short lines are
 * missing from this listing, so the nesting has to be confirmed against the
 * full file. */
694 int _mosquitto_packet_write(struct mosquitto *mosq)
696 ssize_t write_length;
697 struct _mosquitto_packet *packet;
699 if(!mosq) return MOSQ_ERR_INVAL;
700 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
/* Promote the queue head to "current" if nothing is being sent. */
702 pthread_mutex_lock(&mosq->current_out_packet_mutex);
703 pthread_mutex_lock(&mosq->out_packet_mutex);
704 if(mosq->out_packet && !mosq->current_out_packet){
705 mosq->current_out_packet = mosq->out_packet;
706 mosq->out_packet = mosq->out_packet->next;
707 if(!mosq->out_packet){
708 mosq->out_packet_last = NULL;
711 pthread_mutex_unlock(&mosq->out_packet_mutex);
713 while(mosq->current_out_packet){
714 packet = mosq->current_out_packet;
/* Partial writes are expected: pos and to_process track the progress. */
716 while(packet->to_process > 0){
717 write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
718 if(write_length > 0){
719 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
720 g_bytes_sent += write_length;
722 packet->to_process -= write_length;
723 packet->pos += write_length;
/* Mirror the Winsock error into errno so the checks below are shared. */
726 errno = WSAGetLastError();
/* A would-block result is not an error: keep the packet as current and let
 * a later call resume from pos. */
728 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
729 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
730 return MOSQ_ERR_SUCCESS;
732 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
734 case COMPAT_ECONNRESET:
735 return MOSQ_ERR_CONN_LOST;
737 return MOSQ_ERR_ERRNO;
744 # ifdef WITH_SYS_TREE
/* Mask 0xF6 keeps the command nibble and bits 1-2 and drops bits 0 and 3,
 * so this matches PUBLISH only when bits 1-2 are zero (assuming PUBLISH is
 * the bare command value from mqtt3_protocol.h - confirm). The existing
 * comment below calls this the QoS=0 case. */
746 if(((packet->command)&0xF6) == PUBLISH){
751 if(((packet->command)&0xF6) == PUBLISH){
752 pthread_mutex_lock(&mosq->callback_mutex);
753 if(mosq->on_publish){
754 /* This is a QoS=0 message */
/* in_callback is set around the user callback; _mosquitto_packet_queue()
 * checks it before writing directly. */
755 mosq->in_callback = true;
756 mosq->on_publish(mosq, mosq->userdata, packet->mid);
757 mosq->in_callback = false;
759 pthread_mutex_unlock(&mosq->callback_mutex);
760 }else if(((packet->command)&0xF0) == DISCONNECT){
761 /* FIXME what cleanup needs doing here?
762 * incoming/outgoing messages? */
763 _mosquitto_socket_close(mosq);
765 /* Start of duplicate, possibly unnecessary code.
766 * This does leave things in a consistent state at least. */
767 /* Free data and reset values */
768 pthread_mutex_lock(&mosq->out_packet_mutex);
769 mosq->current_out_packet = mosq->out_packet;
770 if(mosq->out_packet){
771 mosq->out_packet = mosq->out_packet->next;
772 if(!mosq->out_packet){
773 mosq->out_packet_last = NULL;
776 pthread_mutex_unlock(&mosq->out_packet_mutex);
778 _mosquitto_packet_cleanup(packet);
779 _mosquitto_free(packet);
781 pthread_mutex_lock(&mosq->msgtime_mutex);
782 mosq->last_msg_out = mosquitto_time();
783 pthread_mutex_unlock(&mosq->msgtime_mutex);
784 /* End of duplicate, possibly unnecessary code */
/* NOTE(review): callback_mutex is locked here, but no matching unlock is
 * visible before the return below. Only one line (791) is missing between
 * 790 and 792 and short lines are the ones omitted, so the unlock looks
 * absent rather than hidden - confirm, and add
 * pthread_mutex_unlock(&mosq->callback_mutex) before returning if so. */
786 pthread_mutex_lock(&mosq->callback_mutex);
787 if(mosq->on_disconnect){
788 mosq->in_callback = true;
789 mosq->on_disconnect(mosq, mosq->userdata, 0);
790 mosq->in_callback = false;
792 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
793 return MOSQ_ERR_SUCCESS;
797 /* Free data and reset values */
/* Normal completion: take the next queued packet (or NULL) as current, free
 * the packet that was just sent and record the send time. */
798 pthread_mutex_lock(&mosq->out_packet_mutex);
799 mosq->current_out_packet = mosq->out_packet;
800 if(mosq->out_packet){
801 mosq->out_packet = mosq->out_packet->next;
802 if(!mosq->out_packet){
803 mosq->out_packet_last = NULL;
806 pthread_mutex_unlock(&mosq->out_packet_mutex);
808 _mosquitto_packet_cleanup(packet);
809 _mosquitto_free(packet);
811 pthread_mutex_lock(&mosq->msgtime_mutex);
812 mosq->last_msg_out = mosquitto_time();
813 pthread_mutex_unlock(&mosq->msgtime_mutex);
815 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
816 return MOSQ_ERR_SUCCESS;
/* Incrementally read one incoming packet into mosq->in_packet and hand it to
 * the packet handler once it is complete.
 *
 * The function is resumable: the command byte, the remaining-length bytes
 * and the payload are each read as far as the socket allows, and progress is
 * kept in in_packet (command, remaining_count, remaining_mult,
 * remaining_length, have_remaining, pos, to_process) for the next call.
 *
 * Returns MOSQ_ERR_INVAL for a NULL mosq, MOSQ_ERR_NO_CONN when the socket
 * is INVALID_SOCKET, MOSQ_ERR_SUCCESS when the socket would block,
 * MOSQ_ERR_CONN_LOST on EOF or connection reset, MOSQ_ERR_PROTOCOL for a bad
 * first command or an over-long remaining length, MOSQ_ERR_NOMEM if the
 * payload cannot be allocated and MOSQ_ERR_ERRNO for other read errors.
 *
 * NOTE(review): two signatures are listed (with and without the db
 * parameter); presumably a build-time guard that is missing from this
 * listing selects one, matching the two handler calls near the end. The
 * final return and the closing delimiters of two comments inside this
 * function (starting at embedded lines 831 and 878) are also on missing
 * lines - confirm against the full file. */
820 int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
822 int _mosquitto_packet_read(struct mosquitto *mosq)
829 if(!mosq) return MOSQ_ERR_INVAL;
830 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
831 /* This gets called if pselect() indicates that there is network data
832 * available - ie. at least one byte. What we do depends on what data we
834 * If we've not got a command, attempt to read one and save it. This should
835 * always work because it's only a single byte.
836 * Then try to read the remaining length. This may fail because it is may
837 * be more than one byte - will need to save data pending next read if it
839 * Then try to read the remaining payload, where 'payload' here means the
840 * combined variable header and actual payload. This is the most likely to
841 * fail due to longer length, so save current data and current position.
842 * After all data is read, send to _mosquitto_handle_packet() to deal with.
843 * Finally, free the memory and reset everything to starting conditions.
845 if(!mosq->in_packet.command){
846 read_length = _mosquitto_net_read(mosq, &byte, 1);
847 if(read_length == 1){
848 mosq->in_packet.command = byte;
850 # ifdef WITH_SYS_TREE
853 /* Clients must send CONNECT as their first command. */
854 if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
857 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
/* Mirror the Winsock error into errno so the checks below are shared. */
859 errno = WSAGetLastError();
/* No data yet: report success and resume on the next call. */
861 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
862 return MOSQ_ERR_SUCCESS;
865 case COMPAT_ECONNRESET:
866 return MOSQ_ERR_CONN_LOST;
868 return MOSQ_ERR_ERRNO;
/* Decode the variable-length "remaining length" field one byte at a time:
 * the low 7 bits of each byte are data, scaled by remaining_mult (which
 * grows by a factor of 128 per byte), and a set top bit means another byte
 * follows. */
873 if(!mosq->in_packet.have_remaining){
875 read_length = _mosquitto_net_read(mosq, &byte, 1);
876 if(read_length == 1){
877 mosq->in_packet.remaining_count++;
878 /* Max 4 bytes length for remaining length as defined by protocol.
879 * Anything more likely means a broken/malicious client.
881 if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL;
883 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
886 mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
887 mosq->in_packet.remaining_mult *= 128;
889 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
891 errno = WSAGetLastError();
893 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
894 return MOSQ_ERR_SUCCESS;
897 case COMPAT_ECONNRESET:
898 return MOSQ_ERR_CONN_LOST;
900 return MOSQ_ERR_ERRNO;
904 }while((byte & 128) != 0);
/* Length known: allocate the payload buffer (nothing to allocate for a
 * zero-length packet) and remember that this stage is complete. */
906 if(mosq->in_packet.remaining_length > 0){
907 mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
908 if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
909 mosq->in_packet.to_process = mosq->in_packet.remaining_length;
911 mosq->in_packet.have_remaining = 1;
/* Read the payload; partial reads advance pos and shrink to_process. */
913 while(mosq->in_packet.to_process>0){
914 read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
916 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
917 g_bytes_received += read_length;
919 mosq->in_packet.to_process -= read_length;
920 mosq->in_packet.pos += read_length;
923 errno = WSAGetLastError();
925 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
926 if(mosq->in_packet.to_process > 1000){
927 /* Update last_msg_in time if more than 1000 bytes left to
928 * receive. Helps when receiving large messages.
929 * This is an arbitrary limit, but with some consideration.
930 * If a client can't send 1000 bytes in a second it
931 * probably shouldn't be using a 1 second keep alive. */
932 pthread_mutex_lock(&mosq->msgtime_mutex);
933 mosq->last_msg_in = mosquitto_time();
934 pthread_mutex_unlock(&mosq->msgtime_mutex);
936 return MOSQ_ERR_SUCCESS;
939 case COMPAT_ECONNRESET:
940 return MOSQ_ERR_CONN_LOST;
942 return MOSQ_ERR_ERRNO;
948 /* All data for this packet is read. */
/* Rewind pos so the handler parses the payload from its start. */
949 mosq->in_packet.pos = 0;
951 # ifdef WITH_SYS_TREE
/* NOTE(review): the mask here is 0xF5, while _mosquitto_packet_write() uses
 * 0xF6 for its PUBLISH test. 0xF5 keeps bits 0 and 2 and drops bits 1 and 3,
 * so the two tests select different sets of PUBLISH packets - confirm which
 * packets this statistic is meant to count. */
953 if(((mosq->in_packet.command)&0xF5) == PUBLISH){
954 g_pub_msgs_received++;
/* Two handler calls are listed; presumably the db variant belongs to the
 * signature that takes db - guard lines are not shown. */
957 rc = mqtt3_packet_handle(db, mosq);
959 rc = _mosquitto_packet_handle(mosq);
962 /* Free data and reset values */
963 _mosquitto_packet_cleanup(&mosq->in_packet);
/* A complete packet counts as activity for last_msg_in. */
965 pthread_mutex_lock(&mosq->msgtime_mutex);
966 mosq->last_msg_in = mosquitto_time();
967 pthread_mutex_unlock(&mosq->msgtime_mutex);
/* Put sock into non-blocking mode.
 * Two mechanisms are visible: fcntl() with O_NONBLOCK added to the current
 * flags, and ioctlsocket() with FIONBIO.
 * Callers in this file treat a non-zero return as failure.
 * NOTE(review): the guards selecting between the two mechanisms (presumably
 * POSIX vs Windows), the value of opt for ioctlsocket() and all return
 * statements are on lines missing from this listing - confirm. */
971 int _mosquitto_socket_nonblock(int sock)
975 /* Set non-blocking */
/* Fetch the current flags first so only O_NONBLOCK is added to them. */
976 opt = fcntl(sock, F_GETFL, 0);
981 if(fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
982 /* If either fcntl fails, don't want to allow this client to connect. */
988 if(ioctlsocket(sock, FIONBIO, &opt)){
/* Create a connected pair of non-blocking sockets; elsewhere in this file a
 * byte written to the write end is used to break out of select().
 *
 * pairR, pairW - outputs for the two ends.
 *
 * Two implementations are visible:
 *  - a loopback TCP emulation that tries AF_INET and then AF_INET6: it
 *    listens on the loopback address, connects one socket to the listener
 *    and accepts the other end. It uses WSAGetLastError(), so it is
 *    presumably the Windows build.
 *  - a direct socketpair(AF_UNIX, SOCK_STREAM) call followed by making both
 *    ends non-blocking.
 * Visible returns are MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL, MOSQ_ERR_UNKNOWN and
 * MOSQ_ERR_ERRNO.
 * NOTE(review): the guards between the two implementations, the loop header
 * over family[], the port assignments, the "continue" statements after each
 * failure, the closing of spR/spW on error and the stores to *pairR and
 * *pairW are on lines missing from this listing - confirm against the full
 * file. */
998 int _mosquitto_socketpair(int *pairR, int *pairW)
1001 int family[2] = {AF_INET, AF_INET6};
/* One storage buffer viewed as either an IPv4 or an IPv6 address. */
1003 struct sockaddr_storage ss;
1004 struct sockaddr_in *sa = (struct sockaddr_in *)&ss;
1005 struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&ss;
/* Build the loopback address for the family being tried. */
1015 memset(&ss, 0, sizeof(ss));
1016 if(family[i] == AF_INET){
1017 sa->sin_family = family[i];
1018 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1020 ss_len = sizeof(struct sockaddr_in);
1021 }else if(family[i] == AF_INET6){
1022 sa6->sin6_family = family[i];
1023 sa6->sin6_addr = in6addr_loopback;
1025 ss_len = sizeof(struct sockaddr_in6);
1027 return MOSQ_ERR_INVAL;
/* Temporary listener; it is closed on every visible path once used. */
1030 listensock = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1031 if(listensock == -1){
1035 if(bind(listensock, (struct sockaddr *)&ss, ss_len) == -1){
1036 COMPAT_CLOSE(listensock);
1040 if(listen(listensock, 1) == -1){
1041 COMPAT_CLOSE(listensock);
/* Ask which address the listener actually got so the other socket can
 * connect to it. */
1044 memset(&ss, 0, sizeof(ss));
1045 ss_len = sizeof(ss);
1046 if(getsockname(listensock, (struct sockaddr *)&ss, &ss_len) < 0){
1047 COMPAT_CLOSE(listensock);
1051 if(_mosquitto_socket_nonblock(listensock)){
/* Re-assert the family and loopback address in ss after getsockname(). */
1055 if(family[i] == AF_INET){
1056 sa->sin_family = family[i];
1057 sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1058 ss_len = sizeof(struct sockaddr_in);
1059 }else if(family[i] == AF_INET6){
1060 sa6->sin6_family = family[i];
1061 sa6->sin6_addr = in6addr_loopback;
1062 ss_len = sizeof(struct sockaddr_in6);
/* First end: a non-blocking socket connected to the listener. */
1065 spR = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1067 COMPAT_CLOSE(listensock);
1070 if(_mosquitto_socket_nonblock(spR)){
1071 COMPAT_CLOSE(listensock);
1074 if(connect(spR, (struct sockaddr *)&ss, ss_len) < 0){
1076 errno = WSAGetLastError();
/* A non-blocking connect that is still in progress is not a failure. */
1078 if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1080 COMPAT_CLOSE(listensock);
/* Second end: the connection accepted by the listener. */
1084 spW = accept(listensock, NULL, 0);
1087 errno = WSAGetLastError();
1089 if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1091 COMPAT_CLOSE(listensock);
1096 if(_mosquitto_socket_nonblock(spW)){
1098 COMPAT_CLOSE(listensock);
/* The listener is no longer needed once both ends exist. */
1101 COMPAT_CLOSE(listensock);
1105 return MOSQ_ERR_SUCCESS;
1107 return MOSQ_ERR_UNKNOWN;
/* Native variant: let the OS create the pair, then make both ends
 * non-blocking, closing both if either step fails. */
1111 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
1112 return MOSQ_ERR_ERRNO;
1114 if(_mosquitto_socket_nonblock(sv[0])){
1115 COMPAT_CLOSE(sv[0]);
1116 COMPAT_CLOSE(sv[1]);
1117 return MOSQ_ERR_ERRNO;
1119 if(_mosquitto_socket_nonblock(sv[1])){
1120 COMPAT_CLOSE(sv[0]);
1121 COMPAT_CLOSE(sv[1]);
1122 return MOSQ_ERR_ERRNO;
1126 return MOSQ_ERR_SUCCESS;