Imported Upstream version 0.9.2
[platform/upstream/iotivity.git] / service / protocol-plugin / plugins / mqtt-fan / lib / net_mosq.c
1 /*
2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11    notice, this list of conditions and the following disclaimer in the
12    documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14    contributors may be used to endorse or promote products derived from
15    this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #ifndef _POSIX_C_SOURCE
31 // Defining _POSIX_C_SOURCE macro with 200112L (or greater) as value
32 // causes header files to expose definitions
33 // corresponding to the POSIX.1-2001 base
34 // specification (excluding the XSI extension).
35 // For POSIX.1-2001 base specification,
36 // Refer http://pubs.opengroup.org/onlinepubs/009695399/
37 //
38 // For this specific file, see use of getaddrinfo & structs,
39 // Refer http://man7.org/linux/man-pages/man3/getaddrinfo.3.html
40 #define _POSIX_C_SOURCE 200112L // needed for getaddrinfo & structs
41 #endif
42
43 #include <assert.h>
44 #include <errno.h>
45 #include <fcntl.h>
46 #include <stdio.h>
47 #include <string.h>
48 #ifndef WIN32
49 #include <netdb.h>
50 #include <sys/socket.h>
51 #include <unistd.h>
52 #else
53 #include <winsock2.h>
54 #include <ws2tcpip.h>
55 #endif
56
57 #ifdef __ANDROID__
58 #include <linux/in.h>
59 #include <linux/in6.h>
60 #include <sys/endian.h>
61 #endif
62
63 #ifdef __FreeBSD__
64 #  include <netinet/in.h>
65 #endif
66
67 #ifdef __SYMBIAN32__
68 #include <netinet/in.h>
69 #endif
70
71 #ifdef __QNX__
72 #ifndef AI_ADDRCONFIG
73 #define AI_ADDRCONFIG 0
74 #endif
75 #include <net/netbyte.h>
76 #include <netinet/in.h>
77 #endif
78
79 #ifdef WITH_TLS
80 #include <openssl/err.h>
81 #include <tls_mosq.h>
82 #endif
83
84 #ifdef WITH_BROKER
85 #  include <mosquitto_broker.h>
86 #  ifdef WITH_SYS_TREE
87    extern uint64_t g_bytes_received;
88    extern uint64_t g_bytes_sent;
89    extern unsigned long g_msgs_received;
90    extern unsigned long g_msgs_sent;
91    extern unsigned long g_pub_msgs_received;
92    extern unsigned long g_pub_msgs_sent;
93 #  endif
94 #else
95 #  include <read_handle.h>
96 #endif
97
98 #include "logging_mosq.h"
99 #include "memory_mosq.h"
100 #include "mqtt3_protocol.h"
101 #include "net_mosq.h"
102 #include "time_mosq.h"
103 #include "util_mosq.h"
104
105 #ifdef WITH_TLS
106 int tls_ex_index_mosq = -1;
107 #endif
108
/* Process-wide initialisation of the networking layer.
 *
 * Each section is only compiled in when its feature macro is defined:
 *  - WIN32:     start Winsock 2.2.
 *  - WITH_SRV:  initialise the c-ares library.
 *  - WITH_TLS:  load the OpenSSL error strings and algorithms, and allocate
 *               the SSL ex-data index stored in tls_ex_index_mosq (only if it
 *               has not been allocated yet).
 */
void _mosquitto_net_init(void)
{
#ifdef WIN32
	WSADATA winsock_info;

	WSAStartup(MAKEWORD(2,2), &winsock_info);
#endif

#ifdef WITH_SRV
	ares_library_init(ARES_LIB_INIT_ALL);
#endif

#ifdef WITH_TLS
	SSL_load_error_strings();
	SSL_library_init();
	OpenSSL_add_all_algorithms();

	/* -1 marks "not allocated yet"; repeated calls keep the first index. */
	if(-1 == tls_ex_index_mosq){
		tls_ex_index_mosq = SSL_get_ex_new_index(0, "client context", NULL, NULL, NULL);
	}
#endif
}
129
/* Process-wide teardown of the networking layer.
 *
 * Releases, in this order and only when the matching feature macro is
 * defined: the OpenSSL error strings, algorithms and ex-data (WITH_TLS), the
 * c-ares library (WITH_SRV) and Winsock (WIN32).
 */
void _mosquitto_net_cleanup(void)
{
#ifdef WITH_TLS
	/* OpenSSL global state. */
	ERR_free_strings();
	EVP_cleanup();
	CRYPTO_cleanup_all_ex_data();
#endif

#ifdef WITH_SRV
	/* c-ares global state. */
	ares_library_cleanup();
#endif

#ifdef WIN32
	/* Winsock. */
	WSACleanup();
#endif
}
146
147 void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet)
148 {
149         if(!packet) return;
150
151         /* Free data and reset values */
152         packet->command = 0;
153         packet->have_remaining = 0;
154         packet->remaining_count = 0;
155         packet->remaining_mult = 1;
156         packet->remaining_length = 0;
157         if(packet->payload) _mosquitto_free(packet->payload);
158         packet->payload = NULL;
159         packet->to_process = 0;
160         packet->pos = 0;
161 }
162
163 int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet)
164 {
165 #ifndef WITH_BROKER
166         char sockpair_data = 0;
167 #endif
168         assert(mosq);
169         assert(packet);
170
171         packet->pos = 0;
172         packet->to_process = packet->packet_length;
173
174         packet->next = NULL;
175         pthread_mutex_lock(&mosq->out_packet_mutex);
176         if(mosq->out_packet){
177                 mosq->out_packet_last->next = packet;
178         }else{
179                 mosq->out_packet = packet;
180         }
181         mosq->out_packet_last = packet;
182         pthread_mutex_unlock(&mosq->out_packet_mutex);
183 #ifdef WITH_BROKER
184         return _mosquitto_packet_write(mosq);
185 #else
186
187         /* Write a single byte to sockpairW (connected to sockpairR) to break out
188          * of select() if in threaded mode. */
189         if(mosq->sockpairW != INVALID_SOCKET){
190 #ifndef WIN32
191                 if(write(mosq->sockpairW, &sockpair_data, 1)){
192                 }
193 #else
194                 send(mosq->sockpairW, &sockpair_data, 1, 0);
195 #endif
196         }
197
198         if(mosq->in_callback == false && mosq->threaded == false){
199                 return _mosquitto_packet_write(mosq);
200         }else{
201                 return MOSQ_ERR_SUCCESS;
202         }
203 #endif
204 }
205
206 /* Close a socket associated with a context and set it to -1.
207  * Returns 1 on failure (context is NULL)
208  * Returns 0 on success.
209  */
210 int _mosquitto_socket_close(struct mosquitto *mosq)
211 {
212         int rc = 0;
213
214         assert(mosq);
215 #ifdef WITH_TLS
216         if(mosq->ssl){
217                 SSL_shutdown(mosq->ssl);
218                 SSL_free(mosq->ssl);
219                 mosq->ssl = NULL;
220         }
221         if(mosq->ssl_ctx){
222                 SSL_CTX_free(mosq->ssl_ctx);
223                 mosq->ssl_ctx = NULL;
224         }
225 #endif
226
227         if(mosq->sock != INVALID_SOCKET){
228                 rc = COMPAT_CLOSE(mosq->sock);
229                 mosq->sock = INVALID_SOCKET;
230         }
231
232         return rc;
233 }
234
#ifdef REAL_WITH_TLS_PSK
/* OpenSSL PSK client callback (installed with
 * SSL_CTX_set_psk_client_callback()).
 *
 * Looks up the mosquitto context attached to 'ssl' via tls_ex_index_mosq,
 * copies its PSK identity into 'identity' (truncated to max_identity_len by
 * snprintf) and decodes its hex encoded key into 'psk' with
 * _mosquitto_hex2bin().
 *
 * Returns the value reported by _mosquitto_hex2bin(), or 0 when no context
 * is attached or that value is negative. 'hint' is not used.
 */
static unsigned int psk_client_callback(SSL *ssl, const char *hint,
		char *identity, unsigned int max_identity_len,
		unsigned char *psk, unsigned int max_psk_len)
{
	struct mosquitto *ctx;
	int key_len;

	ctx = SSL_get_ex_data(ssl, tls_ex_index_mosq);
	if(ctx == NULL){
		return 0;
	}

	snprintf(identity, max_identity_len, "%s", ctx->tls_psk_identity);

	key_len = _mosquitto_hex2bin(ctx->tls_psk, psk, max_psk_len);
	return (key_len < 0) ? 0 : key_len;
}
#endif
253
/* Resolve 'host' and try to connect to each returned address in turn.
 *
 * host          Host name or address to resolve with getaddrinfo().
 * port          TCP port, written into each candidate address.
 * sock          Out: the connected (or connecting) socket on success,
 *               INVALID_SOCKET on failure.
 * bind_address  Optional local address to bind to before connecting; may be
 *               NULL.
 * blocking      If true the connect() is performed on a blocking socket and
 *               the socket is switched to non-blocking afterwards; if false
 *               the socket is made non-blocking before connect().
 *
 * Returns MOSQ_ERR_SUCCESS when a connection succeeded or is in progress,
 * MOSQ_ERR_EAI when name resolution failed (errno holds the getaddrinfo()
 * result code), or MOSQ_ERR_ERRNO when no candidate address could be used.
 */
int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking)
{
	struct addrinfo hints;
	struct addrinfo *ainfo, *rp;
	struct addrinfo *ainfo_bind = NULL, *rp_bind;
	int s;
	int rc;

	*sock = INVALID_SOCKET;
	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = PF_UNSPEC;
	hints.ai_flags = AI_ADDRCONFIG;
	hints.ai_socktype = SOCK_STREAM;

	s = getaddrinfo(host, NULL, &hints, &ainfo);
	if(s){
		errno = s;
		return MOSQ_ERR_EAI;
	}

	if(bind_address){
		s = getaddrinfo(bind_address, NULL, &hints, &ainfo_bind);
		if(s){
			freeaddrinfo(ainfo);
			errno = s;
			return MOSQ_ERR_EAI;
		}
	}

	for(rp = ainfo; rp != NULL; rp = rp->ai_next){
		*sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if(*sock == INVALID_SOCKET) continue;

		if(rp->ai_family == PF_INET){
			((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
		}else if(rp->ai_family == PF_INET6){
			((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port);
		}else{
			/* Unsupported address family: release the socket we just
			 * created instead of leaking it. */
			COMPAT_CLOSE(*sock);
			*sock = INVALID_SOCKET;
			continue;
		}

		if(bind_address){
			/* Use the first local address that bind() accepts. */
			for(rp_bind = ainfo_bind; rp_bind != NULL; rp_bind = rp_bind->ai_next){
				if(bind(*sock, rp_bind->ai_addr, rp_bind->ai_addrlen) == 0){
					break;
				}
			}
			if(!rp_bind){
				COMPAT_CLOSE(*sock);
				*sock = INVALID_SOCKET;
				continue;
			}
		}

		if(!blocking){
			/* Set non-blocking */
			if(_mosquitto_socket_nonblock(*sock)){
				COMPAT_CLOSE(*sock);
				*sock = INVALID_SOCKET;
				continue;
			}
		}

		rc = connect(*sock, rp->ai_addr, rp->ai_addrlen);
#ifdef WIN32
		errno = WSAGetLastError();
#endif
		if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
			if(blocking){
				/* Set non-blocking */
				if(_mosquitto_socket_nonblock(*sock)){
					COMPAT_CLOSE(*sock);
					*sock = INVALID_SOCKET;
					continue;
				}
			}
			break;
		}

		COMPAT_CLOSE(*sock);
		*sock = INVALID_SOCKET;
	}
	freeaddrinfo(ainfo);
	if(bind_address){
		freeaddrinfo(ainfo_bind);
	}
	if(!rp){
		/* Every candidate failed; *sock is INVALID_SOCKET here. */
		return MOSQ_ERR_ERRNO;
	}
	return MOSQ_ERR_SUCCESS;
}
345
346 /* Create a socket and connect it to 'ip' on port 'port'.
347  * Returns -1 on failure (ip is NULL, socket creation/connection error)
348  * Returns sock number on success.
349  */
350 int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
351 {
352         int sock = INVALID_SOCKET;
353         int rc;
354 #ifdef WITH_TLS
355         int ret;
356         BIO *bio;
357 #endif
358
359         if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
360
361 #ifdef WITH_TLS
362         if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
363                 blocking = true;
364         }
365 #endif
366
367         rc = _mosquitto_try_connect(host, port, &sock, bind_address, blocking);
368         if(rc != MOSQ_ERR_SUCCESS) return rc;
369
370 #ifdef WITH_TLS
371         if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
372 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
373                 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1.2")){
374                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_2_client_method());
375                 }else if(!strcmp(mosq->tls_version, "tlsv1.1")){
376                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_1_client_method());
377                 }else if(!strcmp(mosq->tls_version, "tlsv1")){
378                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
379                 }else{
380                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
381                         COMPAT_CLOSE(sock);
382                         return MOSQ_ERR_INVAL;
383                 }
384 #else
385                 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1")){
386                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
387                 }else{
388                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
389                         COMPAT_CLOSE(sock);
390                         return MOSQ_ERR_INVAL;
391                 }
392 #endif
393                 if(!mosq->ssl_ctx){
394                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context.");
395                         COMPAT_CLOSE(sock);
396                         return MOSQ_ERR_TLS;
397                 }
398
399 #if OPENSSL_VERSION_NUMBER >= 0x10000000
400                 /* Disable compression */
401                 SSL_CTX_set_options(mosq->ssl_ctx, SSL_OP_NO_COMPRESSION);
402 #endif
403 #ifdef SSL_MODE_RELEASE_BUFFERS
404                         /* Use even less memory per SSL connection. */
405                         SSL_CTX_set_mode(mosq->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
406 #endif
407
408                 if(mosq->tls_ciphers){
409                         ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
410                         if(ret == 0){
411                                 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers);
412                                 COMPAT_CLOSE(sock);
413                                 return MOSQ_ERR_TLS;
414                         }
415                 }
416                 if(mosq->tls_cafile || mosq->tls_capath){
417                         ret = SSL_CTX_load_verify_locations(mosq->ssl_ctx, mosq->tls_cafile, mosq->tls_capath);
418                         if(ret == 0){
419 #ifdef WITH_BROKER
420                                 if(mosq->tls_cafile && mosq->tls_capath){
421                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\" and bridge_capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
422                                 }else if(mosq->tls_cafile){
423                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\".", mosq->tls_cafile);
424                                 }else{
425                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_capath \"%s\".", mosq->tls_capath);
426                                 }
427 #else
428                                 if(mosq->tls_cafile && mosq->tls_capath){
429                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\" and capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
430                                 }else if(mosq->tls_cafile){
431                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\".", mosq->tls_cafile);
432                                 }else{
433                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath);
434                                 }
435 #endif
436                                 COMPAT_CLOSE(sock);
437                                 return MOSQ_ERR_TLS;
438                         }
439                         if(mosq->tls_cert_reqs == 0){
440                                 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_NONE, NULL);
441                         }else{
442                                 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_PEER, _mosquitto_server_certificate_verify);
443                         }
444
445                         if(mosq->tls_pw_callback){
446                                 SSL_CTX_set_default_passwd_cb(mosq->ssl_ctx, mosq->tls_pw_callback);
447                                 SSL_CTX_set_default_passwd_cb_userdata(mosq->ssl_ctx, mosq);
448                         }
449
450                         if(mosq->tls_certfile){
451                                 ret = SSL_CTX_use_certificate_chain_file(mosq->ssl_ctx, mosq->tls_certfile);
452                                 if(ret != 1){
453 #ifdef WITH_BROKER
454                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate, check bridge_certfile \"%s\".", mosq->tls_certfile);
455 #else
456                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile);
457 #endif
458                                         COMPAT_CLOSE(sock);
459                                         return MOSQ_ERR_TLS;
460                                 }
461                         }
462                         if(mosq->tls_keyfile){
463                                 ret = SSL_CTX_use_PrivateKey_file(mosq->ssl_ctx, mosq->tls_keyfile, SSL_FILETYPE_PEM);
464                                 if(ret != 1){
465 #ifdef WITH_BROKER
466                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file, check bridge_keyfile \"%s\".", mosq->tls_keyfile);
467 #else
468                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile);
469 #endif
470                                         COMPAT_CLOSE(sock);
471                                         return MOSQ_ERR_TLS;
472                                 }
473                                 ret = SSL_CTX_check_private_key(mosq->ssl_ctx);
474                                 if(ret != 1){
475                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent.");
476                                         COMPAT_CLOSE(sock);
477                                         return MOSQ_ERR_TLS;
478                                 }
479                         }
480 #ifdef REAL_WITH_TLS_PSK
481                 }else if(mosq->tls_psk){
482                         SSL_CTX_set_psk_client_callback(mosq->ssl_ctx, psk_client_callback);
483 #endif
484                 }
485
486                 mosq->ssl = SSL_new(mosq->ssl_ctx);
487                 if(!mosq->ssl){
488                         COMPAT_CLOSE(sock);
489                         return MOSQ_ERR_TLS;
490                 }
491                 SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq);
492                 bio = BIO_new_socket(sock, BIO_NOCLOSE);
493                 if(!bio){
494                         COMPAT_CLOSE(sock);
495                         return MOSQ_ERR_TLS;
496                 }
497                 SSL_set_bio(mosq->ssl, bio, bio);
498
499                 ret = SSL_connect(mosq->ssl);
500                 if(ret != 1){
501                         ret = SSL_get_error(mosq->ssl, ret);
502                         if(ret == SSL_ERROR_WANT_READ){
503                                 /* We always try to read anyway */
504                         }else if(ret == SSL_ERROR_WANT_WRITE){
505                                 mosq->want_write = true;
506                         }else{
507                                 COMPAT_CLOSE(sock);
508                                 return MOSQ_ERR_TLS;
509                         }
510                 }
511         }
512 #endif
513
514         mosq->sock = sock;
515
516         return MOSQ_ERR_SUCCESS;
517 }
518
519 int _mosquitto_read_byte(struct _mosquitto_packet *packet, uint8_t *byte)
520 {
521         assert(packet);
522         if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
523
524         *byte = packet->payload[packet->pos];
525         packet->pos++;
526
527         return MOSQ_ERR_SUCCESS;
528 }
529
530 void _mosquitto_write_byte(struct _mosquitto_packet *packet, uint8_t byte)
531 {
532         assert(packet);
533         assert(packet->pos+1 <= packet->packet_length);
534
535         packet->payload[packet->pos] = byte;
536         packet->pos++;
537 }
538
539 int _mosquitto_read_bytes(struct _mosquitto_packet *packet, void *bytes, uint32_t count)
540 {
541         assert(packet);
542         if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
543
544         memcpy(bytes, &(packet->payload[packet->pos]), count);
545         packet->pos += count;
546
547         return MOSQ_ERR_SUCCESS;
548 }
549
550 void _mosquitto_write_bytes(struct _mosquitto_packet *packet, const void *bytes, uint32_t count)
551 {
552         assert(packet);
553         assert(packet->pos+count <= packet->packet_length);
554
555         memcpy(&(packet->payload[packet->pos]), bytes, count);
556         packet->pos += count;
557 }
558
559 int _mosquitto_read_string(struct _mosquitto_packet *packet, char **str)
560 {
561         uint16_t len;
562         int rc;
563
564         assert(packet);
565         rc = _mosquitto_read_uint16(packet, &len);
566         if(rc) return rc;
567
568         if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
569
570         *str = _mosquitto_calloc(len+1, sizeof(char));
571         if(*str){
572                 memcpy(*str, &(packet->payload[packet->pos]), len);
573                 packet->pos += len;
574         }else{
575                 return MOSQ_ERR_NOMEM;
576         }
577
578         return MOSQ_ERR_SUCCESS;
579 }
580
/* Write a length-prefixed string to the packet: the 16 bit 'length' first,
 * then 'length' bytes taken from 'str'. No terminator is written. */
void _mosquitto_write_string(struct _mosquitto_packet *packet, const char *str, uint16_t length)
{
	assert(packet);

	/* Prefix ... */
	_mosquitto_write_uint16(packet, length);
	/* ... then the character data. */
	_mosquitto_write_bytes(packet, str, length);
}
587
588 int _mosquitto_read_uint16(struct _mosquitto_packet *packet, uint16_t *word)
589 {
590         uint8_t msb, lsb;
591
592         assert(packet);
593         if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
594
595         msb = packet->payload[packet->pos];
596         packet->pos++;
597         lsb = packet->payload[packet->pos];
598         packet->pos++;
599
600         *word = (msb<<8) + lsb;
601
602         return MOSQ_ERR_SUCCESS;
603 }
604
/* Write a 16 bit value to the packet as two bytes: MOSQ_MSB(word) first,
 * then MOSQ_LSB(word). */
void _mosquitto_write_uint16(struct _mosquitto_packet *packet, uint16_t word)
{
	uint8_t first = MOSQ_MSB(word);
	uint8_t second = MOSQ_LSB(word);

	_mosquitto_write_byte(packet, first);
	_mosquitto_write_byte(packet, second);
}
610
611 ssize_t _mosquitto_net_read(struct mosquitto *mosq, void *buf, size_t count)
612 {
613 #ifdef WITH_TLS
614         int ret;
615         int err;
616         char ebuf[256];
617         unsigned long e;
618 #endif
619         assert(mosq);
620         errno = 0;
621 #ifdef WITH_TLS
622         if(mosq->ssl){
623                 ret = SSL_read(mosq->ssl, buf, count);
624                 if(ret <= 0){
625                         err = SSL_get_error(mosq->ssl, ret);
626                         if(err == SSL_ERROR_WANT_READ){
627                                 ret = -1;
628                                 errno = EAGAIN;
629                         }else if(err == SSL_ERROR_WANT_WRITE){
630                                 ret = -1;
631                                 mosq->want_write = true;
632                                 errno = EAGAIN;
633                         }else{
634                                 e = ERR_get_error();
635                                 while(e){
636                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
637                                         e = ERR_get_error();
638                                 }
639                                 errno = EPROTO;
640                         }
641                 }
642                 return (ssize_t )ret;
643         }else{
644                 /* Call normal read/recv */
645
646 #endif
647
648 #ifndef WIN32
649         return read(mosq->sock, buf, count);
650 #else
651         return recv(mosq->sock, buf, count, 0);
652 #endif
653
654 #ifdef WITH_TLS
655         }
656 #endif
657 }
658
659 ssize_t _mosquitto_net_write(struct mosquitto *mosq, void *buf, size_t count)
660 {
661 #ifdef WITH_TLS
662         int ret;
663         int err;
664         char ebuf[256];
665         unsigned long e;
666 #endif
667         assert(mosq);
668
669         errno = 0;
670 #ifdef WITH_TLS
671         if(mosq->ssl){
672                 ret = SSL_write(mosq->ssl, buf, count);
673                 if(ret < 0){
674                         err = SSL_get_error(mosq->ssl, ret);
675                         if(err == SSL_ERROR_WANT_READ){
676                                 ret = -1;
677                                 errno = EAGAIN;
678                         }else if(err == SSL_ERROR_WANT_WRITE){
679                                 ret = -1;
680                                 mosq->want_write = true;
681                                 errno = EAGAIN;
682                         }else{
683                                 e = ERR_get_error();
684                                 while(e){
685                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
686                                         e = ERR_get_error();
687                                 }
688                                 errno = EPROTO;
689                         }
690                 }
691                 return (ssize_t )ret;
692         }else{
693                 /* Call normal write/send */
694 #endif
695
696 #ifndef WIN32
697         return write(mosq->sock, buf, count);
698 #else
699         return send(mosq->sock, buf, count, 0);
700 #endif
701
702 #ifdef WITH_TLS
703         }
704 #endif
705 }
706
707 int _mosquitto_packet_write(struct mosquitto *mosq)
708 {
709         ssize_t write_length;
710         struct _mosquitto_packet *packet;
711
712         if(!mosq) return MOSQ_ERR_INVAL;
713         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
714
715         pthread_mutex_lock(&mosq->current_out_packet_mutex);
716         pthread_mutex_lock(&mosq->out_packet_mutex);
717         if(mosq->out_packet && !mosq->current_out_packet){
718                 mosq->current_out_packet = mosq->out_packet;
719                 mosq->out_packet = mosq->out_packet->next;
720                 if(!mosq->out_packet){
721                         mosq->out_packet_last = NULL;
722                 }
723         }
724         pthread_mutex_unlock(&mosq->out_packet_mutex);
725
726         while(mosq->current_out_packet){
727                 packet = mosq->current_out_packet;
728
729                 while(packet->to_process > 0){
730                         write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
731                         if(write_length > 0){
732 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
733                                 g_bytes_sent += write_length;
734 #endif
735                                 packet->to_process -= write_length;
736                                 packet->pos += write_length;
737                         }else{
738 #ifdef WIN32
739                                 errno = WSAGetLastError();
740 #endif
741                                 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
742                                         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
743                                         return MOSQ_ERR_SUCCESS;
744                                 }else{
745                                         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
746                                         switch(errno){
747                                                 case COMPAT_ECONNRESET:
748                                                         return MOSQ_ERR_CONN_LOST;
749                                                 default:
750                                                         return MOSQ_ERR_ERRNO;
751                                         }
752                                 }
753                         }
754                 }
755
756 #ifdef WITH_BROKER
757 #  ifdef WITH_SYS_TREE
758                 g_msgs_sent++;
759                 if(((packet->command)&0xF6) == PUBLISH){
760                         g_pub_msgs_sent++;
761                 }
762 #  endif
763 #else
764                 if(((packet->command)&0xF6) == PUBLISH){
765                         pthread_mutex_lock(&mosq->callback_mutex);
766                         if(mosq->on_publish){
767                                 /* This is a QoS=0 message */
768                                 mosq->in_callback = true;
769                                 mosq->on_publish(mosq, mosq->userdata, packet->mid);
770                                 mosq->in_callback = false;
771                         }
772                         pthread_mutex_unlock(&mosq->callback_mutex);
773                 }else if(((packet->command)&0xF0) == DISCONNECT){
774                         /* FIXME what cleanup needs doing here? 
775                          * incoming/outgoing messages? */
776                         _mosquitto_socket_close(mosq);
777
778                         /* Start of duplicate, possibly unnecessary code.
779                          * This does leave things in a consistent state at least. */
780                         /* Free data and reset values */
781                         pthread_mutex_lock(&mosq->out_packet_mutex);
782                         mosq->current_out_packet = mosq->out_packet;
783                         if(mosq->out_packet){
784                                 mosq->out_packet = mosq->out_packet->next;
785                                 if(!mosq->out_packet){
786                                         mosq->out_packet_last = NULL;
787                                 }
788                         }
789                         pthread_mutex_unlock(&mosq->out_packet_mutex);
790
791                         _mosquitto_packet_cleanup(packet);
792                         _mosquitto_free(packet);
793
794                         pthread_mutex_lock(&mosq->msgtime_mutex);
795                         mosq->last_msg_out = mosquitto_time();
796                         pthread_mutex_unlock(&mosq->msgtime_mutex);
797                         /* End of duplicate, possibly unnecessary code */
798
799                         pthread_mutex_lock(&mosq->callback_mutex);
800                         if(mosq->on_disconnect){
801                                 mosq->in_callback = true;
802                                 mosq->on_disconnect(mosq, mosq->userdata, 0);
803                                 mosq->in_callback = false;
804                         }
805                         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
806                         return MOSQ_ERR_SUCCESS;
807                 }
808 #endif
809
810                 /* Free data and reset values */
811                 pthread_mutex_lock(&mosq->out_packet_mutex);
812                 mosq->current_out_packet = mosq->out_packet;
813                 if(mosq->out_packet){
814                         mosq->out_packet = mosq->out_packet->next;
815                         if(!mosq->out_packet){
816                                 mosq->out_packet_last = NULL;
817                         }
818                 }
819                 pthread_mutex_unlock(&mosq->out_packet_mutex);
820
821                 _mosquitto_packet_cleanup(packet);
822                 _mosquitto_free(packet);
823
824                 pthread_mutex_lock(&mosq->msgtime_mutex);
825                 mosq->last_msg_out = mosquitto_time();
826                 pthread_mutex_unlock(&mosq->msgtime_mutex);
827         }
828         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
829         return MOSQ_ERR_SUCCESS;
830 }
831
832 #ifdef WITH_BROKER
833 int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
834 #else
835 int _mosquitto_packet_read(struct mosquitto *mosq)
836 #endif
837 {
838         uint8_t byte;
839         ssize_t read_length;
840         int rc = 0;
841
842         if(!mosq) return MOSQ_ERR_INVAL;
843         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
844         /* This gets called if pselect() indicates that there is network data
845          * available - ie. at least one byte.  What we do depends on what data we
846          * already have.
847          * If we've not got a command, attempt to read one and save it. This should
848          * always work because it's only a single byte.
849          * Then try to read the remaining length. This may fail because it is may
850          * be more than one byte - will need to save data pending next read if it
851          * does fail.
852          * Then try to read the remaining payload, where 'payload' here means the
853          * combined variable header and actual payload. This is the most likely to
854          * fail due to longer length, so save current data and current position.
855          * After all data is read, send to _mosquitto_handle_packet() to deal with.
856          * Finally, free the memory and reset everything to starting conditions.
857          */
858         if(!mosq->in_packet.command){
859                 read_length = _mosquitto_net_read(mosq, &byte, 1);
860                 if(read_length == 1){
861                         mosq->in_packet.command = byte;
862 #ifdef WITH_BROKER
863 #  ifdef WITH_SYS_TREE
864                         g_bytes_received++;
865 #  endif
866                         /* Clients must send CONNECT as their first command. */
867                         if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
868 #endif
869                 }else{
870                         if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
871 #ifdef WIN32
872                         errno = WSAGetLastError();
873 #endif
874                         if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
875                                 return MOSQ_ERR_SUCCESS;
876                         }else{
877                                 switch(errno){
878                                         case COMPAT_ECONNRESET:
879                                                 return MOSQ_ERR_CONN_LOST;
880                                         default:
881                                                 return MOSQ_ERR_ERRNO;
882                                 }
883                         }
884                 }
885         }
886         if(!mosq->in_packet.have_remaining){
887                 do{
888                         read_length = _mosquitto_net_read(mosq, &byte, 1);
889                         if(read_length == 1){
890                                 mosq->in_packet.remaining_count++;
891                                 /* Max 4 bytes length for remaining length as defined by protocol.
892                                  * Anything more likely means a broken/malicious client.
893                                  */
894                                 if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL;
895
896 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
897                                 g_bytes_received++;
898 #endif
899                                 mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
900                                 mosq->in_packet.remaining_mult *= 128;
901                         }else{
902                                 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
903 #ifdef WIN32
904                                 errno = WSAGetLastError();
905 #endif
906                                 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
907                                         return MOSQ_ERR_SUCCESS;
908                                 }else{
909                                         switch(errno){
910                                                 case COMPAT_ECONNRESET:
911                                                         return MOSQ_ERR_CONN_LOST;
912                                                 default:
913                                                         return MOSQ_ERR_ERRNO;
914                                         }
915                                 }
916                         }
917                 }while((byte & 128) != 0);
918
919                 if(mosq->in_packet.remaining_length > 0){
920                         mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
921                         if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
922                         mosq->in_packet.to_process = mosq->in_packet.remaining_length;
923                 }
924                 mosq->in_packet.have_remaining = 1;
925         }
926         while(mosq->in_packet.to_process>0){
927                 read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
928                 if(read_length > 0){
929 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
930                         g_bytes_received += read_length;
931 #endif
932                         mosq->in_packet.to_process -= read_length;
933                         mosq->in_packet.pos += read_length;
934                 }else{
935 #ifdef WIN32
936                         errno = WSAGetLastError();
937 #endif
938                         if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
939                                 if(mosq->in_packet.to_process > 1000){
940                                         /* Update last_msg_in time if more than 1000 bytes left to
941                                          * receive. Helps when receiving large messages.
942                                          * This is an arbitrary limit, but with some consideration.
943                                          * If a client can't send 1000 bytes in a second it
944                                          * probably shouldn't be using a 1 second keep alive. */
945                                         pthread_mutex_lock(&mosq->msgtime_mutex);
946                                         mosq->last_msg_in = mosquitto_time();
947                                         pthread_mutex_unlock(&mosq->msgtime_mutex);
948                                 }
949                                 return MOSQ_ERR_SUCCESS;
950                         }else{
951                                 switch(errno){
952                                         case COMPAT_ECONNRESET:
953                                                 return MOSQ_ERR_CONN_LOST;
954                                         default:
955                                                 return MOSQ_ERR_ERRNO;
956                                 }
957                         }
958                 }
959         }
960
961         /* All data for this packet is read. */
962         mosq->in_packet.pos = 0;
963 #ifdef WITH_BROKER
964 #  ifdef WITH_SYS_TREE
965         g_msgs_received++;
966         if(((mosq->in_packet.command)&0xF5) == PUBLISH){
967                 g_pub_msgs_received++;
968         }
969 #  endif
970         rc = mqtt3_packet_handle(db, mosq);
971 #else
972         rc = _mosquitto_packet_handle(mosq);
973 #endif
974
975         /* Free data and reset values */
976         _mosquitto_packet_cleanup(&mosq->in_packet);
977
978         pthread_mutex_lock(&mosq->msgtime_mutex);
979         mosq->last_msg_in = mosquitto_time();
980         pthread_mutex_unlock(&mosq->msgtime_mutex);
981         return rc;
982 }
983
/*
 * Switch a socket to non-blocking mode.
 *
 * sock: the descriptor to modify.
 *
 * Returns 0 on success and 1 on failure. On failure the socket has already
 * been closed with COMPAT_CLOSE() before returning, so the caller must not
 * close it again.
 */
int _mosquitto_socket_nonblock(int sock)
{
#ifndef WIN32
        int opt;
        /* Set non-blocking */
        opt = fcntl(sock, F_GETFL, 0);
        if(opt == -1){
                COMPAT_CLOSE(sock);
                return 1;
        }
        if(fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
                /* If either fcntl fails, don't want to allow this client to connect. */
                COMPAT_CLOSE(sock);
                return 1;
        }
#else
        /* ioctlsocket() takes a pointer to an unsigned long; a non-zero value
         * with FIONBIO enables non-blocking mode. The variable has to be
         * declared in this branch because the declaration above is compiled
         * out on WIN32. */
        unsigned long opt = 1;
        if(ioctlsocket(sock, FIONBIO, &opt)){
                COMPAT_CLOSE(sock);
                return 1;
        }
#endif
        return 0;
}
1008
1009
1010 #ifndef WITH_BROKER
1011 int _mosquitto_socketpair(int *pairR, int *pairW)
1012 {
1013 #ifdef WIN32
1014         int family[2] = {AF_INET, AF_INET6};
1015         int i;
1016         struct sockaddr_storage ss;
1017         struct sockaddr_in *sa = (struct sockaddr_in *)&ss;
1018         struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&ss;
1019         socklen_t ss_len;
1020         int spR, spW;
1021
1022         int listensock;
1023
1024         *pairR = -1;
1025         *pairW = -1;
1026
1027         for(i=0; i<2; i++){
1028                 memset(&ss, 0, sizeof(ss));
1029                 if(family[i] == AF_INET){
1030                         sa->sin_family = family[i];
1031                         sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1032                         sa->sin_port = 0;
1033                         ss_len = sizeof(struct sockaddr_in);
1034                 }else if(family[i] == AF_INET6){
1035                         sa6->sin6_family = family[i];
1036                         sa6->sin6_addr = in6addr_loopback;
1037                         sa6->sin6_port = 0;
1038                         ss_len = sizeof(struct sockaddr_in6);
1039                 }else{
1040                         return MOSQ_ERR_INVAL;
1041                 }
1042
1043                 listensock = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1044                 if(listensock == -1){
1045                         continue;
1046                 }
1047
1048                 if(bind(listensock, (struct sockaddr *)&ss, ss_len) == -1){
1049                         COMPAT_CLOSE(listensock);
1050                         continue;
1051                 }
1052
1053                 if(listen(listensock, 1) == -1){
1054                         COMPAT_CLOSE(listensock);
1055                         continue;
1056                 }
1057                 memset(&ss, 0, sizeof(ss));
1058                 ss_len = sizeof(ss);
1059                 if(getsockname(listensock, (struct sockaddr *)&ss, &ss_len) < 0){
1060                         COMPAT_CLOSE(listensock);
1061                         continue;
1062                 }
1063
1064                 if(_mosquitto_socket_nonblock(listensock)){
1065                         continue;
1066                 }
1067
1068                 if(family[i] == AF_INET){
1069                         sa->sin_family = family[i];
1070                         sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1071                         ss_len = sizeof(struct sockaddr_in);
1072                 }else if(family[i] == AF_INET6){
1073                         sa6->sin6_family = family[i];
1074                         sa6->sin6_addr = in6addr_loopback;
1075                         ss_len = sizeof(struct sockaddr_in6);
1076                 }
1077
1078                 spR = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1079                 if(spR == -1){
1080                         COMPAT_CLOSE(listensock);
1081                         continue;
1082                 }
1083                 if(_mosquitto_socket_nonblock(spR)){
1084                         COMPAT_CLOSE(listensock);
1085                         continue;
1086                 }
1087                 if(connect(spR, (struct sockaddr *)&ss, ss_len) < 0){
1088 #ifdef WIN32
1089                         errno = WSAGetLastError();
1090 #endif
1091                         if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1092                                 COMPAT_CLOSE(spR);
1093                                 COMPAT_CLOSE(listensock);
1094                                 continue;
1095                         }
1096                 }
1097                 spW = accept(listensock, NULL, 0);
1098                 if(spW == -1){
1099 #ifdef WIN32
1100                         errno = WSAGetLastError();
1101 #endif
1102                         if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1103                                 COMPAT_CLOSE(spR);
1104                                 COMPAT_CLOSE(listensock);
1105                                 continue;
1106                         }
1107                 }
1108
1109                 if(_mosquitto_socket_nonblock(spW)){
1110                         COMPAT_CLOSE(spR);
1111                         COMPAT_CLOSE(listensock);
1112                         continue;
1113                 }
1114                 COMPAT_CLOSE(listensock);
1115
1116                 *pairR = spR;
1117                 *pairW = spW;
1118                 return MOSQ_ERR_SUCCESS;
1119         }
1120         return MOSQ_ERR_UNKNOWN;
1121 #else
1122         int sv[2];
1123
1124         if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
1125                 return MOSQ_ERR_ERRNO;
1126         }
1127         if(_mosquitto_socket_nonblock(sv[0])){
1128                 COMPAT_CLOSE(sv[0]);
1129                 COMPAT_CLOSE(sv[1]);
1130                 return MOSQ_ERR_ERRNO;
1131         }
1132         if(_mosquitto_socket_nonblock(sv[1])){
1133                 COMPAT_CLOSE(sv[0]);
1134                 COMPAT_CLOSE(sv[1]);
1135                 return MOSQ_ERR_ERRNO;
1136         }
1137         *pairR = sv[0];
1138         *pairW = sv[1];
1139         return MOSQ_ERR_SUCCESS;
1140 #endif
1141 }
1142 #endif