517a485c46a68ddd10d9a2c3ec6031e1ecb7cc11
[platform/upstream/iotivity.git] / service / protocol-plugin / plugins / mqtt-fan / lib / net_mosq.c
1 /*
2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11    notice, this list of conditions and the following disclaimer in the
12    documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14    contributors may be used to endorse or promote products derived from
15    this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <stdio.h>
34 #include <string.h>
35 #ifndef WIN32
36 #include <netdb.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39 #else
40 #include <winsock2.h>
41 #include <ws2tcpip.h>
42 #endif
43
44 #ifdef __ANDROID__
45 #include <linux/in.h>
46 #include <linux/in6.h>
47 #include <sys/endian.h>
48 #endif
49
50 #ifdef __FreeBSD__
51 #  include <netinet/in.h>
52 #endif
53
54 #ifdef __SYMBIAN32__
55 #include <netinet/in.h>
56 #endif
57
58 #ifdef __QNX__
59 #ifndef AI_ADDRCONFIG
60 #define AI_ADDRCONFIG 0
61 #endif
62 #include <net/netbyte.h>
63 #include <netinet/in.h>
64 #endif
65
66 #ifdef WITH_TLS
67 #include <openssl/err.h>
68 #include <tls_mosq.h>
69 #endif
70
71 #ifdef WITH_BROKER
72 #  include <mosquitto_broker.h>
73 #  ifdef WITH_SYS_TREE
74    extern uint64_t g_bytes_received;
75    extern uint64_t g_bytes_sent;
76    extern unsigned long g_msgs_received;
77    extern unsigned long g_msgs_sent;
78    extern unsigned long g_pub_msgs_received;
79    extern unsigned long g_pub_msgs_sent;
80 #  endif
81 #else
82 #  include <read_handle.h>
83 #endif
84
85 #include "logging_mosq.h"
86 #include "memory_mosq.h"
87 #include "mqtt3_protocol.h"
88 #include "net_mosq.h"
89 #include "time_mosq.h"
90 #include "util_mosq.h"
91
92 #ifdef WITH_TLS
93 int tls_ex_index_mosq = -1;
94 #endif
95
96 void _mosquitto_net_init(void)
97 {
98 #ifdef WIN32
99         WSADATA wsaData;
100         WSAStartup(MAKEWORD(2,2), &wsaData);
101 #endif
102
103 #ifdef WITH_SRV
104         ares_library_init(ARES_LIB_INIT_ALL);
105 #endif
106
107 #ifdef WITH_TLS
108         SSL_load_error_strings();
109         SSL_library_init();
110         OpenSSL_add_all_algorithms();
111         if(tls_ex_index_mosq == -1){
112                 tls_ex_index_mosq = SSL_get_ex_new_index(0, "client context", NULL, NULL, NULL);
113         }
114 #endif
115 }
116
117 void _mosquitto_net_cleanup(void)
118 {
119 #ifdef WITH_TLS
120         ERR_free_strings();
121         EVP_cleanup();
122         CRYPTO_cleanup_all_ex_data();
123 #endif
124
125 #ifdef WITH_SRV
126         ares_library_cleanup();
127 #endif
128
129 #ifdef WIN32
130         WSACleanup();
131 #endif
132 }
133
134 void _mosquitto_packet_cleanup(struct _mosquitto_packet *packet)
135 {
136         if(!packet) return;
137
138         /* Free data and reset values */
139         packet->command = 0;
140         packet->have_remaining = 0;
141         packet->remaining_count = 0;
142         packet->remaining_mult = 1;
143         packet->remaining_length = 0;
144         if(packet->payload) _mosquitto_free(packet->payload);
145         packet->payload = NULL;
146         packet->to_process = 0;
147         packet->pos = 0;
148 }
149
150 int _mosquitto_packet_queue(struct mosquitto *mosq, struct _mosquitto_packet *packet)
151 {
152 #ifndef WITH_BROKER
153         char sockpair_data = 0;
154 #endif
155         assert(mosq);
156         assert(packet);
157
158         packet->pos = 0;
159         packet->to_process = packet->packet_length;
160
161         packet->next = NULL;
162         pthread_mutex_lock(&mosq->out_packet_mutex);
163         if(mosq->out_packet){
164                 mosq->out_packet_last->next = packet;
165         }else{
166                 mosq->out_packet = packet;
167         }
168         mosq->out_packet_last = packet;
169         pthread_mutex_unlock(&mosq->out_packet_mutex);
170 #ifdef WITH_BROKER
171         return _mosquitto_packet_write(mosq);
172 #else
173
174         /* Write a single byte to sockpairW (connected to sockpairR) to break out
175          * of select() if in threaded mode. */
176         if(mosq->sockpairW != INVALID_SOCKET){
177 #ifndef WIN32
178                 if(write(mosq->sockpairW, &sockpair_data, 1)){
179                 }
180 #else
181                 send(mosq->sockpairW, &sockpair_data, 1, 0);
182 #endif
183         }
184
185         if(mosq->in_callback == false && mosq->threaded == false){
186                 return _mosquitto_packet_write(mosq);
187         }else{
188                 return MOSQ_ERR_SUCCESS;
189         }
190 #endif
191 }
192
193 /* Close a socket associated with a context and set it to -1.
194  * Returns 1 on failure (context is NULL)
195  * Returns 0 on success.
196  */
197 int _mosquitto_socket_close(struct mosquitto *mosq)
198 {
199         int rc = 0;
200
201         assert(mosq);
202 #ifdef WITH_TLS
203         if(mosq->ssl){
204                 SSL_shutdown(mosq->ssl);
205                 SSL_free(mosq->ssl);
206                 mosq->ssl = NULL;
207         }
208         if(mosq->ssl_ctx){
209                 SSL_CTX_free(mosq->ssl_ctx);
210                 mosq->ssl_ctx = NULL;
211         }
212 #endif
213
214         if(mosq->sock != INVALID_SOCKET){
215                 rc = COMPAT_CLOSE(mosq->sock);
216                 mosq->sock = INVALID_SOCKET;
217         }
218
219         return rc;
220 }
221
222 #ifdef REAL_WITH_TLS_PSK
223 static unsigned int psk_client_callback(SSL *ssl, const char *hint,
224                 char *identity, unsigned int max_identity_len,
225                 unsigned char *psk, unsigned int max_psk_len)
226 {
227         struct mosquitto *mosq;
228         int len;
229
230         mosq = SSL_get_ex_data(ssl, tls_ex_index_mosq);
231         if(!mosq) return 0;
232
233         snprintf(identity, max_identity_len, "%s", mosq->tls_psk_identity);
234
235         len = _mosquitto_hex2bin(mosq->tls_psk, psk, max_psk_len);
236         if (len < 0) return 0;
237         return len;
238 }
239 #endif
240
241 int _mosquitto_try_connect(const char *host, uint16_t port, int *sock, const char *bind_address, bool blocking)
242 {
243         struct addrinfo hints;
244         struct addrinfo *ainfo, *rp;
245         struct addrinfo *ainfo_bind, *rp_bind;
246         int s;
247         int rc;
248 #ifdef WIN32
249         uint32_t val = 1;
250 #endif
251
252         *sock = INVALID_SOCKET;
253         memset(&hints, 0, sizeof(struct addrinfo));
254         hints.ai_family = PF_UNSPEC;
255         hints.ai_flags = AI_ADDRCONFIG;
256         hints.ai_socktype = SOCK_STREAM;
257
258         s = getaddrinfo(host, NULL, &hints, &ainfo);
259         if(s){
260                 errno = s;
261                 return MOSQ_ERR_EAI;
262         }
263
264         if(bind_address){
265                 s = getaddrinfo(bind_address, NULL, &hints, &ainfo_bind);
266                 if(s){
267                         freeaddrinfo(ainfo);
268                         errno = s;
269                         return MOSQ_ERR_EAI;
270                 }
271         }
272
273         for(rp = ainfo; rp != NULL; rp = rp->ai_next){
274                 *sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
275                 if(*sock == INVALID_SOCKET) continue;
276                 
277                 if(rp->ai_family == PF_INET){
278                         ((struct sockaddr_in *)rp->ai_addr)->sin_port = htons(port);
279                 }else if(rp->ai_family == PF_INET6){
280                         ((struct sockaddr_in6 *)rp->ai_addr)->sin6_port = htons(port);
281                 }else{
282                         continue;
283                 }
284
285                 if(bind_address){
286                         for(rp_bind = ainfo_bind; rp_bind != NULL; rp_bind = rp_bind->ai_next){
287                                 if(bind(*sock, rp_bind->ai_addr, rp_bind->ai_addrlen) == 0){
288                                         break;
289                                 }
290                         }
291                         if(!rp_bind){
292                                 COMPAT_CLOSE(*sock);
293                                 continue;
294                         }
295                 }
296
297                 if(!blocking){
298                         /* Set non-blocking */
299                         if(_mosquitto_socket_nonblock(*sock)){
300                                 COMPAT_CLOSE(*sock);
301                                 continue;
302                         }
303                 }
304
305                 rc = connect(*sock, rp->ai_addr, rp->ai_addrlen);
306 #ifdef WIN32
307                 errno = WSAGetLastError();
308 #endif
309                 if(rc == 0 || errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
310                         if(blocking){
311                                 /* Set non-blocking */
312                                 if(_mosquitto_socket_nonblock(*sock)){
313                                         COMPAT_CLOSE(*sock);
314                                         continue;
315                                 }
316                         }
317                         break;
318                 }
319
320                 COMPAT_CLOSE(*sock);
321                 *sock = INVALID_SOCKET;
322         }
323         freeaddrinfo(ainfo);
324         if(bind_address){
325                 freeaddrinfo(ainfo_bind);
326         }
327         if(!rp){
328                 return MOSQ_ERR_ERRNO;
329         }
330         return MOSQ_ERR_SUCCESS;
331 }
332
333 /* Create a socket and connect it to 'ip' on port 'port'.
334  * Returns -1 on failure (ip is NULL, socket creation/connection error)
335  * Returns sock number on success.
336  */
337 int _mosquitto_socket_connect(struct mosquitto *mosq, const char *host, uint16_t port, const char *bind_address, bool blocking)
338 {
339         int sock = INVALID_SOCKET;
340         int rc;
341 #ifdef WITH_TLS
342         int ret;
343         BIO *bio;
344 #endif
345
346         if(!mosq || !host || !port) return MOSQ_ERR_INVAL;
347
348 #ifdef WITH_TLS
349         if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
350                 blocking = true;
351         }
352 #endif
353
354         rc = _mosquitto_try_connect(host, port, &sock, bind_address, blocking);
355         if(rc != MOSQ_ERR_SUCCESS) return rc;
356
357 #ifdef WITH_TLS
358         if(mosq->tls_cafile || mosq->tls_capath || mosq->tls_psk){
359 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
360                 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1.2")){
361                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_2_client_method());
362                 }else if(!strcmp(mosq->tls_version, "tlsv1.1")){
363                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_1_client_method());
364                 }else if(!strcmp(mosq->tls_version, "tlsv1")){
365                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
366                 }else{
367                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
368                         COMPAT_CLOSE(sock);
369                         return MOSQ_ERR_INVAL;
370                 }
371 #else
372                 if(!mosq->tls_version || !strcmp(mosq->tls_version, "tlsv1")){
373                         mosq->ssl_ctx = SSL_CTX_new(TLSv1_client_method());
374                 }else{
375                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Protocol %s not supported.", mosq->tls_version);
376                         COMPAT_CLOSE(sock);
377                         return MOSQ_ERR_INVAL;
378                 }
379 #endif
380                 if(!mosq->ssl_ctx){
381                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to create TLS context.");
382                         COMPAT_CLOSE(sock);
383                         return MOSQ_ERR_TLS;
384                 }
385
386 #if OPENSSL_VERSION_NUMBER >= 0x10000000
387                 /* Disable compression */
388                 SSL_CTX_set_options(mosq->ssl_ctx, SSL_OP_NO_COMPRESSION);
389 #endif
390 #ifdef SSL_MODE_RELEASE_BUFFERS
391                         /* Use even less memory per SSL connection. */
392                         SSL_CTX_set_mode(mosq->ssl_ctx, SSL_MODE_RELEASE_BUFFERS);
393 #endif
394
395                 if(mosq->tls_ciphers){
396                         ret = SSL_CTX_set_cipher_list(mosq->ssl_ctx, mosq->tls_ciphers);
397                         if(ret == 0){
398                                 _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to set TLS ciphers. Check cipher list \"%s\".", mosq->tls_ciphers);
399                                 COMPAT_CLOSE(sock);
400                                 return MOSQ_ERR_TLS;
401                         }
402                 }
403                 if(mosq->tls_cafile || mosq->tls_capath){
404                         ret = SSL_CTX_load_verify_locations(mosq->ssl_ctx, mosq->tls_cafile, mosq->tls_capath);
405                         if(ret == 0){
406 #ifdef WITH_BROKER
407                                 if(mosq->tls_cafile && mosq->tls_capath){
408                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\" and bridge_capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
409                                 }else if(mosq->tls_cafile){
410                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_cafile \"%s\".", mosq->tls_cafile);
411                                 }else{
412                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check bridge_capath \"%s\".", mosq->tls_capath);
413                                 }
414 #else
415                                 if(mosq->tls_cafile && mosq->tls_capath){
416                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\" and capath \"%s\".", mosq->tls_cafile, mosq->tls_capath);
417                                 }else if(mosq->tls_cafile){
418                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check cafile \"%s\".", mosq->tls_cafile);
419                                 }else{
420                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load CA certificates, check capath \"%s\".", mosq->tls_capath);
421                                 }
422 #endif
423                                 COMPAT_CLOSE(sock);
424                                 return MOSQ_ERR_TLS;
425                         }
426                         if(mosq->tls_cert_reqs == 0){
427                                 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_NONE, NULL);
428                         }else{
429                                 SSL_CTX_set_verify(mosq->ssl_ctx, SSL_VERIFY_PEER, _mosquitto_server_certificate_verify);
430                         }
431
432                         if(mosq->tls_pw_callback){
433                                 SSL_CTX_set_default_passwd_cb(mosq->ssl_ctx, mosq->tls_pw_callback);
434                                 SSL_CTX_set_default_passwd_cb_userdata(mosq->ssl_ctx, mosq);
435                         }
436
437                         if(mosq->tls_certfile){
438                                 ret = SSL_CTX_use_certificate_chain_file(mosq->ssl_ctx, mosq->tls_certfile);
439                                 if(ret != 1){
440 #ifdef WITH_BROKER
441                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate, check bridge_certfile \"%s\".", mosq->tls_certfile);
442 #else
443                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client certificate \"%s\".", mosq->tls_certfile);
444 #endif
445                                         COMPAT_CLOSE(sock);
446                                         return MOSQ_ERR_TLS;
447                                 }
448                         }
449                         if(mosq->tls_keyfile){
450                                 ret = SSL_CTX_use_PrivateKey_file(mosq->ssl_ctx, mosq->tls_keyfile, SSL_FILETYPE_PEM);
451                                 if(ret != 1){
452 #ifdef WITH_BROKER
453                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file, check bridge_keyfile \"%s\".", mosq->tls_keyfile);
454 #else
455                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unable to load client key file \"%s\".", mosq->tls_keyfile);
456 #endif
457                                         COMPAT_CLOSE(sock);
458                                         return MOSQ_ERR_TLS;
459                                 }
460                                 ret = SSL_CTX_check_private_key(mosq->ssl_ctx);
461                                 if(ret != 1){
462                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Client certificate/key are inconsistent.");
463                                         COMPAT_CLOSE(sock);
464                                         return MOSQ_ERR_TLS;
465                                 }
466                         }
467 #ifdef REAL_WITH_TLS_PSK
468                 }else if(mosq->tls_psk){
469                         SSL_CTX_set_psk_client_callback(mosq->ssl_ctx, psk_client_callback);
470 #endif
471                 }
472
473                 mosq->ssl = SSL_new(mosq->ssl_ctx);
474                 if(!mosq->ssl){
475                         COMPAT_CLOSE(sock);
476                         return MOSQ_ERR_TLS;
477                 }
478                 SSL_set_ex_data(mosq->ssl, tls_ex_index_mosq, mosq);
479                 bio = BIO_new_socket(sock, BIO_NOCLOSE);
480                 if(!bio){
481                         COMPAT_CLOSE(sock);
482                         return MOSQ_ERR_TLS;
483                 }
484                 SSL_set_bio(mosq->ssl, bio, bio);
485
486                 ret = SSL_connect(mosq->ssl);
487                 if(ret != 1){
488                         ret = SSL_get_error(mosq->ssl, ret);
489                         if(ret == SSL_ERROR_WANT_READ){
490                                 /* We always try to read anyway */
491                         }else if(ret == SSL_ERROR_WANT_WRITE){
492                                 mosq->want_write = true;
493                         }else{
494                                 COMPAT_CLOSE(sock);
495                                 return MOSQ_ERR_TLS;
496                         }
497                 }
498         }
499 #endif
500
501         mosq->sock = sock;
502
503         return MOSQ_ERR_SUCCESS;
504 }
505
506 int _mosquitto_read_byte(struct _mosquitto_packet *packet, uint8_t *byte)
507 {
508         assert(packet);
509         if(packet->pos+1 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
510
511         *byte = packet->payload[packet->pos];
512         packet->pos++;
513
514         return MOSQ_ERR_SUCCESS;
515 }
516
517 void _mosquitto_write_byte(struct _mosquitto_packet *packet, uint8_t byte)
518 {
519         assert(packet);
520         assert(packet->pos+1 <= packet->packet_length);
521
522         packet->payload[packet->pos] = byte;
523         packet->pos++;
524 }
525
526 int _mosquitto_read_bytes(struct _mosquitto_packet *packet, void *bytes, uint32_t count)
527 {
528         assert(packet);
529         if(packet->pos+count > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
530
531         memcpy(bytes, &(packet->payload[packet->pos]), count);
532         packet->pos += count;
533
534         return MOSQ_ERR_SUCCESS;
535 }
536
537 void _mosquitto_write_bytes(struct _mosquitto_packet *packet, const void *bytes, uint32_t count)
538 {
539         assert(packet);
540         assert(packet->pos+count <= packet->packet_length);
541
542         memcpy(&(packet->payload[packet->pos]), bytes, count);
543         packet->pos += count;
544 }
545
546 int _mosquitto_read_string(struct _mosquitto_packet *packet, char **str)
547 {
548         uint16_t len;
549         int rc;
550
551         assert(packet);
552         rc = _mosquitto_read_uint16(packet, &len);
553         if(rc) return rc;
554
555         if(packet->pos+len > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
556
557         *str = _mosquitto_calloc(len+1, sizeof(char));
558         if(*str){
559                 memcpy(*str, &(packet->payload[packet->pos]), len);
560                 packet->pos += len;
561         }else{
562                 return MOSQ_ERR_NOMEM;
563         }
564
565         return MOSQ_ERR_SUCCESS;
566 }
567
568 void _mosquitto_write_string(struct _mosquitto_packet *packet, const char *str, uint16_t length)
569 {
570         assert(packet);
571         _mosquitto_write_uint16(packet, length);
572         _mosquitto_write_bytes(packet, str, length);
573 }
574
575 int _mosquitto_read_uint16(struct _mosquitto_packet *packet, uint16_t *word)
576 {
577         uint8_t msb, lsb;
578
579         assert(packet);
580         if(packet->pos+2 > packet->remaining_length) return MOSQ_ERR_PROTOCOL;
581
582         msb = packet->payload[packet->pos];
583         packet->pos++;
584         lsb = packet->payload[packet->pos];
585         packet->pos++;
586
587         *word = (msb<<8) + lsb;
588
589         return MOSQ_ERR_SUCCESS;
590 }
591
592 void _mosquitto_write_uint16(struct _mosquitto_packet *packet, uint16_t word)
593 {
594         _mosquitto_write_byte(packet, MOSQ_MSB(word));
595         _mosquitto_write_byte(packet, MOSQ_LSB(word));
596 }
597
598 ssize_t _mosquitto_net_read(struct mosquitto *mosq, void *buf, size_t count)
599 {
600 #ifdef WITH_TLS
601         int ret;
602         int err;
603         char ebuf[256];
604         unsigned long e;
605 #endif
606         assert(mosq);
607         errno = 0;
608 #ifdef WITH_TLS
609         if(mosq->ssl){
610                 ret = SSL_read(mosq->ssl, buf, count);
611                 if(ret <= 0){
612                         err = SSL_get_error(mosq->ssl, ret);
613                         if(err == SSL_ERROR_WANT_READ){
614                                 ret = -1;
615                                 errno = EAGAIN;
616                         }else if(err == SSL_ERROR_WANT_WRITE){
617                                 ret = -1;
618                                 mosq->want_write = true;
619                                 errno = EAGAIN;
620                         }else{
621                                 e = ERR_get_error();
622                                 while(e){
623                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
624                                         e = ERR_get_error();
625                                 }
626                                 errno = EPROTO;
627                         }
628                 }
629                 return (ssize_t )ret;
630         }else{
631                 /* Call normal read/recv */
632
633 #endif
634
635 #ifndef WIN32
636         return read(mosq->sock, buf, count);
637 #else
638         return recv(mosq->sock, buf, count, 0);
639 #endif
640
641 #ifdef WITH_TLS
642         }
643 #endif
644 }
645
646 ssize_t _mosquitto_net_write(struct mosquitto *mosq, void *buf, size_t count)
647 {
648 #ifdef WITH_TLS
649         int ret;
650         int err;
651         char ebuf[256];
652         unsigned long e;
653 #endif
654         assert(mosq);
655
656         errno = 0;
657 #ifdef WITH_TLS
658         if(mosq->ssl){
659                 ret = SSL_write(mosq->ssl, buf, count);
660                 if(ret < 0){
661                         err = SSL_get_error(mosq->ssl, ret);
662                         if(err == SSL_ERROR_WANT_READ){
663                                 ret = -1;
664                                 errno = EAGAIN;
665                         }else if(err == SSL_ERROR_WANT_WRITE){
666                                 ret = -1;
667                                 mosq->want_write = true;
668                                 errno = EAGAIN;
669                         }else{
670                                 e = ERR_get_error();
671                                 while(e){
672                                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "OpenSSL Error: %s", ERR_error_string(e, ebuf));
673                                         e = ERR_get_error();
674                                 }
675                                 errno = EPROTO;
676                         }
677                 }
678                 return (ssize_t )ret;
679         }else{
680                 /* Call normal write/send */
681 #endif
682
683 #ifndef WIN32
684         return write(mosq->sock, buf, count);
685 #else
686         return send(mosq->sock, buf, count, 0);
687 #endif
688
689 #ifdef WITH_TLS
690         }
691 #endif
692 }
693
694 int _mosquitto_packet_write(struct mosquitto *mosq)
695 {
696         ssize_t write_length;
697         struct _mosquitto_packet *packet;
698
699         if(!mosq) return MOSQ_ERR_INVAL;
700         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
701
702         pthread_mutex_lock(&mosq->current_out_packet_mutex);
703         pthread_mutex_lock(&mosq->out_packet_mutex);
704         if(mosq->out_packet && !mosq->current_out_packet){
705                 mosq->current_out_packet = mosq->out_packet;
706                 mosq->out_packet = mosq->out_packet->next;
707                 if(!mosq->out_packet){
708                         mosq->out_packet_last = NULL;
709                 }
710         }
711         pthread_mutex_unlock(&mosq->out_packet_mutex);
712
713         while(mosq->current_out_packet){
714                 packet = mosq->current_out_packet;
715
716                 while(packet->to_process > 0){
717                         write_length = _mosquitto_net_write(mosq, &(packet->payload[packet->pos]), packet->to_process);
718                         if(write_length > 0){
719 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
720                                 g_bytes_sent += write_length;
721 #endif
722                                 packet->to_process -= write_length;
723                                 packet->pos += write_length;
724                         }else{
725 #ifdef WIN32
726                                 errno = WSAGetLastError();
727 #endif
728                                 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
729                                         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
730                                         return MOSQ_ERR_SUCCESS;
731                                 }else{
732                                         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
733                                         switch(errno){
734                                                 case COMPAT_ECONNRESET:
735                                                         return MOSQ_ERR_CONN_LOST;
736                                                 default:
737                                                         return MOSQ_ERR_ERRNO;
738                                         }
739                                 }
740                         }
741                 }
742
743 #ifdef WITH_BROKER
744 #  ifdef WITH_SYS_TREE
745                 g_msgs_sent++;
746                 if(((packet->command)&0xF6) == PUBLISH){
747                         g_pub_msgs_sent++;
748                 }
749 #  endif
750 #else
751                 if(((packet->command)&0xF6) == PUBLISH){
752                         pthread_mutex_lock(&mosq->callback_mutex);
753                         if(mosq->on_publish){
754                                 /* This is a QoS=0 message */
755                                 mosq->in_callback = true;
756                                 mosq->on_publish(mosq, mosq->userdata, packet->mid);
757                                 mosq->in_callback = false;
758                         }
759                         pthread_mutex_unlock(&mosq->callback_mutex);
760                 }else if(((packet->command)&0xF0) == DISCONNECT){
761                         /* FIXME what cleanup needs doing here? 
762                          * incoming/outgoing messages? */
763                         _mosquitto_socket_close(mosq);
764
765                         /* Start of duplicate, possibly unnecessary code.
766                          * This does leave things in a consistent state at least. */
767                         /* Free data and reset values */
768                         pthread_mutex_lock(&mosq->out_packet_mutex);
769                         mosq->current_out_packet = mosq->out_packet;
770                         if(mosq->out_packet){
771                                 mosq->out_packet = mosq->out_packet->next;
772                                 if(!mosq->out_packet){
773                                         mosq->out_packet_last = NULL;
774                                 }
775                         }
776                         pthread_mutex_unlock(&mosq->out_packet_mutex);
777
778                         _mosquitto_packet_cleanup(packet);
779                         _mosquitto_free(packet);
780
781                         pthread_mutex_lock(&mosq->msgtime_mutex);
782                         mosq->last_msg_out = mosquitto_time();
783                         pthread_mutex_unlock(&mosq->msgtime_mutex);
784                         /* End of duplicate, possibly unnecessary code */
785
786                         pthread_mutex_lock(&mosq->callback_mutex);
787                         if(mosq->on_disconnect){
788                                 mosq->in_callback = true;
789                                 mosq->on_disconnect(mosq, mosq->userdata, 0);
790                                 mosq->in_callback = false;
791                         }
792                         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
793                         return MOSQ_ERR_SUCCESS;
794                 }
795 #endif
796
797                 /* Free data and reset values */
798                 pthread_mutex_lock(&mosq->out_packet_mutex);
799                 mosq->current_out_packet = mosq->out_packet;
800                 if(mosq->out_packet){
801                         mosq->out_packet = mosq->out_packet->next;
802                         if(!mosq->out_packet){
803                                 mosq->out_packet_last = NULL;
804                         }
805                 }
806                 pthread_mutex_unlock(&mosq->out_packet_mutex);
807
808                 _mosquitto_packet_cleanup(packet);
809                 _mosquitto_free(packet);
810
811                 pthread_mutex_lock(&mosq->msgtime_mutex);
812                 mosq->last_msg_out = mosquitto_time();
813                 pthread_mutex_unlock(&mosq->msgtime_mutex);
814         }
815         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
816         return MOSQ_ERR_SUCCESS;
817 }
818
819 #ifdef WITH_BROKER
820 int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
821 #else
822 int _mosquitto_packet_read(struct mosquitto *mosq)
823 #endif
824 {
825         uint8_t byte;
826         ssize_t read_length;
827         int rc = 0;
828
829         if(!mosq) return MOSQ_ERR_INVAL;
830         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
831         /* This gets called if pselect() indicates that there is network data
832          * available - ie. at least one byte.  What we do depends on what data we
833          * already have.
834          * If we've not got a command, attempt to read one and save it. This should
835          * always work because it's only a single byte.
836          * Then try to read the remaining length. This may fail because it is may
837          * be more than one byte - will need to save data pending next read if it
838          * does fail.
839          * Then try to read the remaining payload, where 'payload' here means the
840          * combined variable header and actual payload. This is the most likely to
841          * fail due to longer length, so save current data and current position.
842          * After all data is read, send to _mosquitto_handle_packet() to deal with.
843          * Finally, free the memory and reset everything to starting conditions.
844          */
845         if(!mosq->in_packet.command){
846                 read_length = _mosquitto_net_read(mosq, &byte, 1);
847                 if(read_length == 1){
848                         mosq->in_packet.command = byte;
849 #ifdef WITH_BROKER
850 #  ifdef WITH_SYS_TREE
851                         g_bytes_received++;
852 #  endif
853                         /* Clients must send CONNECT as their first command. */
854                         if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
855 #endif
856                 }else{
857                         if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
858 #ifdef WIN32
859                         errno = WSAGetLastError();
860 #endif
861                         if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
862                                 return MOSQ_ERR_SUCCESS;
863                         }else{
864                                 switch(errno){
865                                         case COMPAT_ECONNRESET:
866                                                 return MOSQ_ERR_CONN_LOST;
867                                         default:
868                                                 return MOSQ_ERR_ERRNO;
869                                 }
870                         }
871                 }
872         }
873         if(!mosq->in_packet.have_remaining){
874                 do{
875                         read_length = _mosquitto_net_read(mosq, &byte, 1);
876                         if(read_length == 1){
877                                 mosq->in_packet.remaining_count++;
878                                 /* Max 4 bytes length for remaining length as defined by protocol.
879                                  * Anything more likely means a broken/malicious client.
880                                  */
881                                 if(mosq->in_packet.remaining_count > 4) return MOSQ_ERR_PROTOCOL;
882
883 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
884                                 g_bytes_received++;
885 #endif
886                                 mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
887                                 mosq->in_packet.remaining_mult *= 128;
888                         }else{
889                                 if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
890 #ifdef WIN32
891                                 errno = WSAGetLastError();
892 #endif
893                                 if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
894                                         return MOSQ_ERR_SUCCESS;
895                                 }else{
896                                         switch(errno){
897                                                 case COMPAT_ECONNRESET:
898                                                         return MOSQ_ERR_CONN_LOST;
899                                                 default:
900                                                         return MOSQ_ERR_ERRNO;
901                                         }
902                                 }
903                         }
904                 }while((byte & 128) != 0);
905
906                 if(mosq->in_packet.remaining_length > 0){
907                         mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
908                         if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
909                         mosq->in_packet.to_process = mosq->in_packet.remaining_length;
910                 }
911                 mosq->in_packet.have_remaining = 1;
912         }
913         while(mosq->in_packet.to_process>0){
914                 read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
915                 if(read_length > 0){
916 #if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
917                         g_bytes_received += read_length;
918 #endif
919                         mosq->in_packet.to_process -= read_length;
920                         mosq->in_packet.pos += read_length;
921                 }else{
922 #ifdef WIN32
923                         errno = WSAGetLastError();
924 #endif
925                         if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
926                                 if(mosq->in_packet.to_process > 1000){
927                                         /* Update last_msg_in time if more than 1000 bytes left to
928                                          * receive. Helps when receiving large messages.
929                                          * This is an arbitrary limit, but with some consideration.
930                                          * If a client can't send 1000 bytes in a second it
931                                          * probably shouldn't be using a 1 second keep alive. */
932                                         pthread_mutex_lock(&mosq->msgtime_mutex);
933                                         mosq->last_msg_in = mosquitto_time();
934                                         pthread_mutex_unlock(&mosq->msgtime_mutex);
935                                 }
936                                 return MOSQ_ERR_SUCCESS;
937                         }else{
938                                 switch(errno){
939                                         case COMPAT_ECONNRESET:
940                                                 return MOSQ_ERR_CONN_LOST;
941                                         default:
942                                                 return MOSQ_ERR_ERRNO;
943                                 }
944                         }
945                 }
946         }
947
948         /* All data for this packet is read. */
949         mosq->in_packet.pos = 0;
950 #ifdef WITH_BROKER
951 #  ifdef WITH_SYS_TREE
952         g_msgs_received++;
953         if(((mosq->in_packet.command)&0xF5) == PUBLISH){
954                 g_pub_msgs_received++;
955         }
956 #  endif
957         rc = mqtt3_packet_handle(db, mosq);
958 #else
959         rc = _mosquitto_packet_handle(mosq);
960 #endif
961
962         /* Free data and reset values */
963         _mosquitto_packet_cleanup(&mosq->in_packet);
964
965         pthread_mutex_lock(&mosq->msgtime_mutex);
966         mosq->last_msg_in = mosquitto_time();
967         pthread_mutex_unlock(&mosq->msgtime_mutex);
968         return rc;
969 }
970
971 int _mosquitto_socket_nonblock(int sock)
972 {
973 #ifndef WIN32
974         int opt;
975         /* Set non-blocking */
976         opt = fcntl(sock, F_GETFL, 0);
977         if(opt == -1){
978                 COMPAT_CLOSE(sock);
979                 return 1;
980         }
981         if(fcntl(sock, F_SETFL, opt | O_NONBLOCK) == -1){
982                 /* If either fcntl fails, don't want to allow this client to connect. */
983                 COMPAT_CLOSE(sock);
984                 return 1;
985         }
986 #else
987         opt = 1;
988         if(ioctlsocket(sock, FIONBIO, &opt)){
989                 COMPAT_CLOSE(sock);
990                 return 1;
991         }
992 #endif
993         return 0;
994 }
995
996
997 #ifndef WITH_BROKER
998 int _mosquitto_socketpair(int *pairR, int *pairW)
999 {
1000 #ifdef WIN32
1001         int family[2] = {AF_INET, AF_INET6};
1002         int i;
1003         struct sockaddr_storage ss;
1004         struct sockaddr_in *sa = (struct sockaddr_in *)&ss;
1005         struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&ss;
1006         socklen_t ss_len;
1007         int spR, spW;
1008
1009         int listensock;
1010
1011         *pairR = -1;
1012         *pairW = -1;
1013
1014         for(i=0; i<2; i++){
1015                 memset(&ss, 0, sizeof(ss));
1016                 if(family[i] == AF_INET){
1017                         sa->sin_family = family[i];
1018                         sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1019                         sa->sin_port = 0;
1020                         ss_len = sizeof(struct sockaddr_in);
1021                 }else if(family[i] == AF_INET6){
1022                         sa6->sin6_family = family[i];
1023                         sa6->sin6_addr = in6addr_loopback;
1024                         sa6->sin6_port = 0;
1025                         ss_len = sizeof(struct sockaddr_in6);
1026                 }else{
1027                         return MOSQ_ERR_INVAL;
1028                 }
1029
1030                 listensock = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1031                 if(listensock == -1){
1032                         continue;
1033                 }
1034
1035                 if(bind(listensock, (struct sockaddr *)&ss, ss_len) == -1){
1036                         COMPAT_CLOSE(listensock);
1037                         continue;
1038                 }
1039
1040                 if(listen(listensock, 1) == -1){
1041                         COMPAT_CLOSE(listensock);
1042                         continue;
1043                 }
1044                 memset(&ss, 0, sizeof(ss));
1045                 ss_len = sizeof(ss);
1046                 if(getsockname(listensock, (struct sockaddr *)&ss, &ss_len) < 0){
1047                         COMPAT_CLOSE(listensock);
1048                         continue;
1049                 }
1050
1051                 if(_mosquitto_socket_nonblock(listensock)){
1052                         continue;
1053                 }
1054
1055                 if(family[i] == AF_INET){
1056                         sa->sin_family = family[i];
1057                         sa->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1058                         ss_len = sizeof(struct sockaddr_in);
1059                 }else if(family[i] == AF_INET6){
1060                         sa6->sin6_family = family[i];
1061                         sa6->sin6_addr = in6addr_loopback;
1062                         ss_len = sizeof(struct sockaddr_in6);
1063                 }
1064
1065                 spR = socket(family[i], SOCK_STREAM, IPPROTO_TCP);
1066                 if(spR == -1){
1067                         COMPAT_CLOSE(listensock);
1068                         continue;
1069                 }
1070                 if(_mosquitto_socket_nonblock(spR)){
1071                         COMPAT_CLOSE(listensock);
1072                         continue;
1073                 }
1074                 if(connect(spR, (struct sockaddr *)&ss, ss_len) < 0){
1075 #ifdef WIN32
1076                         errno = WSAGetLastError();
1077 #endif
1078                         if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1079                                 COMPAT_CLOSE(spR);
1080                                 COMPAT_CLOSE(listensock);
1081                                 continue;
1082                         }
1083                 }
1084                 spW = accept(listensock, NULL, 0);
1085                 if(spW == -1){
1086 #ifdef WIN32
1087                         errno = WSAGetLastError();
1088 #endif
1089                         if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){
1090                                 COMPAT_CLOSE(spR);
1091                                 COMPAT_CLOSE(listensock);
1092                                 continue;
1093                         }
1094                 }
1095
1096                 if(_mosquitto_socket_nonblock(spW)){
1097                         COMPAT_CLOSE(spR);
1098                         COMPAT_CLOSE(listensock);
1099                         continue;
1100                 }
1101                 COMPAT_CLOSE(listensock);
1102
1103                 *pairR = spR;
1104                 *pairW = spW;
1105                 return MOSQ_ERR_SUCCESS;
1106         }
1107         return MOSQ_ERR_UNKNOWN;
1108 #else
1109         int sv[2];
1110
1111         if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1){
1112                 return MOSQ_ERR_ERRNO;
1113         }
1114         if(_mosquitto_socket_nonblock(sv[0])){
1115                 COMPAT_CLOSE(sv[0]);
1116                 COMPAT_CLOSE(sv[1]);
1117                 return MOSQ_ERR_ERRNO;
1118         }
1119         if(_mosquitto_socket_nonblock(sv[1])){
1120                 COMPAT_CLOSE(sv[0]);
1121                 COMPAT_CLOSE(sv[1]);
1122                 return MOSQ_ERR_ERRNO;
1123         }
1124         *pairR = sv[0];
1125         *pairW = sv[1];
1126         return MOSQ_ERR_SUCCESS;
1127 #endif
1128 }
1129 #endif