2 Copyright (c) 2010-2013 Roger Light <roger@atchoo.org>
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
36 #include <sys/select.h>
45 #include "mosquitto.h"
46 #include "mosquitto_internal.h"
47 #include "logging_mosq.h"
48 #include "messages_mosq.h"
49 #include "memory_mosq.h"
50 #include "mqtt3_protocol.h"
52 #include "read_handle.h"
53 #include "send_mosq.h"
54 #include "time_mosq.h"
56 #include "util_mosq.h"
57 #include "will_mosq.h"
59 #if !defined(WIN32) && !defined(__SYMBIAN32__)
63 void _mosquitto_destroy(struct mosquitto *mosq);
64 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking);
65 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address);
67 int mosquitto_lib_version(int *major, int *minor, int *revision)
69 if(major) *major = LIBMOSQUITTO_MAJOR;
70 if(minor) *minor = LIBMOSQUITTO_MINOR;
71 if(revision) *revision = LIBMOSQUITTO_REVISION;
72 return LIBMOSQUITTO_VERSION_NUMBER;
75 int mosquitto_lib_init(void)
78 srand(GetTickCount());
82 gettimeofday(&tv, NULL);
83 srand(tv.tv_sec*1000 + tv.tv_usec/1000);
86 _mosquitto_net_init();
88 return MOSQ_ERR_SUCCESS;
91 int mosquitto_lib_cleanup(void)
93 _mosquitto_net_cleanup();
95 return MOSQ_ERR_SUCCESS;
98 struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *userdata)
100 struct mosquitto *mosq = NULL;
103 if(clean_session == false && id == NULL){
109 signal(SIGPIPE, SIG_IGN);
112 mosq = (struct mosquitto *)_mosquitto_calloc(1, sizeof(struct mosquitto));
114 mosq->sock = INVALID_SOCKET;
115 mosq->sockpairR = INVALID_SOCKET;
116 mosq->sockpairW = INVALID_SOCKET;
117 #ifdef WITH_THREADING
118 mosq->thread_id = pthread_self();
120 rc = mosquitto_reinitialise(mosq, id, clean_session, userdata);
122 mosquitto_destroy(mosq);
123 if(rc == MOSQ_ERR_INVAL){
125 }else if(rc == MOSQ_ERR_NOMEM){
/* Reset an existing client structure to its default state.
 *
 * The current contents are torn down with _mosquitto_destroy(), the whole
 * struct is zeroed, and the defaults below are filled in again.
 *
 * Returns MOSQ_ERR_INVAL if mosq is NULL, or if clean_session is false and
 * no id is supplied. MOSQ_ERR_NOMEM follows the allocation of the generated
 * id buffer. MOSQ_ERR_SUCCESS otherwise. */
136 int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_session, void *userdata)
140 if(!mosq) return MOSQ_ERR_INVAL;
/* A persistent session (clean_session == false) needs a caller-chosen id. */
142 if(clean_session == false && id == NULL){
143 return MOSQ_ERR_INVAL;
146 _mosquitto_destroy(mosq);
147 memset(mosq, 0, sizeof(struct mosquitto));
/* userdata is either the caller's pointer or the client itself. */
150 mosq->userdata = userdata;
152 mosq->userdata = mosq;
154 mosq->sock = INVALID_SOCKET;
155 mosq->sockpairR = INVALID_SOCKET;
156 mosq->sockpairW = INVALID_SOCKET;
157 mosq->keepalive = 60;
158 mosq->message_retry = 20;
159 mosq->last_retry_check = 0;
160 mosq->clean_session = clean_session;
/* NOTE(review): the condition guarding this return is not visible here;
 * presumably an empty id string - confirm. */
163 return MOSQ_ERR_INVAL;
/* NOTE(review): no check of the _mosquitto_strdup() result is visible. */
165 mosq->id = _mosquitto_strdup(id);
/* No id supplied: allocate a 24 byte zeroed buffer for a generated one. */
167 mosq->id = (char *)_mosquitto_calloc(24, sizeof(char));
169 return MOSQ_ERR_NOMEM;
/* Generated characters are byte values 48..120 taken from rand(). */
178 mosq->id[i] = (rand()%73)+48;
181 mosq->in_packet.payload = NULL;
182 _mosquitto_packet_cleanup(&mosq->in_packet);
183 mosq->out_packet = NULL;
184 mosq->current_out_packet = NULL;
185 mosq->last_msg_in = mosquitto_time();
186 mosq->last_msg_out = mosquitto_time();
189 mosq->state = mosq_cs_new;
190 mosq->in_messages = NULL;
191 mosq->in_messages_last = NULL;
192 mosq->out_messages = NULL;
193 mosq->out_messages_last = NULL;
194 mosq->max_inflight_messages = 20;
196 mosq->on_connect = NULL;
197 mosq->on_publish = NULL;
198 mosq->on_message = NULL;
199 mosq->on_subscribe = NULL;
200 mosq->on_unsubscribe = NULL;
203 mosq->in_callback = false;
204 mosq->in_queue_len = 0;
205 mosq->out_queue_len = 0;
/* Reconnect defaults: one second delay, capped at one second, no backoff. */
206 mosq->reconnect_delay = 1;
207 mosq->reconnect_delay_max = 1;
208 mosq->reconnect_exponential_backoff = false;
209 mosq->threaded = false;
212 mosq->tls_cert_reqs = SSL_VERIFY_PEER;
213 mosq->tls_insecure = false;
215 #ifdef WITH_THREADING
/* Initialise the mutexes; _mosquitto_destroy() destroys the same set. */
216 pthread_mutex_init(&mosq->callback_mutex, NULL);
217 pthread_mutex_init(&mosq->log_callback_mutex, NULL);
218 pthread_mutex_init(&mosq->state_mutex, NULL);
219 pthread_mutex_init(&mosq->out_packet_mutex, NULL);
220 pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
221 pthread_mutex_init(&mosq->msgtime_mutex, NULL);
222 pthread_mutex_init(&mosq->in_message_mutex, NULL);
223 pthread_mutex_init(&mosq->out_message_mutex, NULL);
224 mosq->thread_id = pthread_self();
227 return MOSQ_ERR_SUCCESS;
230 int mosquitto_will_set(struct mosquitto *mosq, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
232 if(!mosq) return MOSQ_ERR_INVAL;
233 return _mosquitto_will_set(mosq, topic, payloadlen, payload, qos, retain);
236 int mosquitto_will_clear(struct mosquitto *mosq)
238 if(!mosq) return MOSQ_ERR_INVAL;
239 return _mosquitto_will_clear(mosq);
242 int mosquitto_username_pw_set(struct mosquitto *mosq, const char *username, const char *password)
244 if(!mosq) return MOSQ_ERR_INVAL;
247 _mosquitto_free(mosq->username);
248 mosq->username = NULL;
251 _mosquitto_free(mosq->password);
252 mosq->password = NULL;
256 mosq->username = _mosquitto_strdup(username);
257 if(!mosq->username) return MOSQ_ERR_NOMEM;
259 mosq->password = _mosquitto_strdup(password);
261 _mosquitto_free(mosq->username);
262 mosq->username = NULL;
263 return MOSQ_ERR_NOMEM;
267 return MOSQ_ERR_SUCCESS;
270 int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigned int reconnect_delay, unsigned int reconnect_delay_max, bool reconnect_exponential_backoff)
272 if(!mosq) return MOSQ_ERR_INVAL;
274 mosq->reconnect_delay = reconnect_delay;
275 mosq->reconnect_delay_max = reconnect_delay_max;
276 mosq->reconnect_exponential_backoff = reconnect_exponential_backoff;
278 return MOSQ_ERR_SUCCESS;
/* Release the resources held by a client without freeing the struct itself
 * (mosquitto_destroy() frees the struct after calling this).
 *
 * Covers the worker thread, mutexes, sockets, queued messages, will, TLS
 * settings, identity strings and pending packets. */
282 void _mosquitto_destroy(struct mosquitto *mosq)
284 struct _mosquitto_packet *packet;
287 #ifdef WITH_THREADING
/* Cancel and join the worker thread, but only when we are not that thread. */
288 if(mosq->threaded && !pthread_equal(mosq->thread_id, pthread_self())){
289 pthread_cancel(mosq->thread_id);
290 pthread_join(mosq->thread_id, NULL);
291 mosq->threaded = false;
295 /* If mosq->id is not NULL then the client has already been initialised
296 * and so the mutexes need destroying. If mosq->id is NULL, the mutexes
297 * haven't been initialised. */
298 pthread_mutex_destroy(&mosq->callback_mutex);
299 pthread_mutex_destroy(&mosq->log_callback_mutex);
300 pthread_mutex_destroy(&mosq->state_mutex);
301 pthread_mutex_destroy(&mosq->out_packet_mutex);
302 pthread_mutex_destroy(&mosq->current_out_packet_mutex);
303 pthread_mutex_destroy(&mosq->msgtime_mutex);
304 pthread_mutex_destroy(&mosq->in_message_mutex);
305 pthread_mutex_destroy(&mosq->out_message_mutex);
/* Close the broker connection if one is open. */
308 if(mosq->sock != INVALID_SOCKET){
309 _mosquitto_socket_close(mosq);
311 _mosquitto_message_cleanup_all(mosq);
312 _mosquitto_will_clear(mosq);
/* TLS context and the configuration strings copied by the tls_* setters. */
318 SSL_CTX_free(mosq->ssl_ctx);
320 if(mosq->tls_cafile) _mosquitto_free(mosq->tls_cafile);
321 if(mosq->tls_capath) _mosquitto_free(mosq->tls_capath);
322 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
323 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
324 if(mosq->tls_pw_callback) mosq->tls_pw_callback = NULL;
325 if(mosq->tls_version) _mosquitto_free(mosq->tls_version);
326 if(mosq->tls_ciphers) _mosquitto_free(mosq->tls_ciphers);
327 if(mosq->tls_psk) _mosquitto_free(mosq->tls_psk);
328 if(mosq->tls_psk_identity) _mosquitto_free(mosq->tls_psk_identity);
/* Owned strings: address, id, credentials, host and bind address. */
332 _mosquitto_free(mosq->address);
333 mosq->address = NULL;
336 _mosquitto_free(mosq->id);
340 _mosquitto_free(mosq->username);
341 mosq->username = NULL;
344 _mosquitto_free(mosq->password);
345 mosq->password = NULL;
348 _mosquitto_free(mosq->host);
351 if(mosq->bind_address){
352 _mosquitto_free(mosq->bind_address);
353 mosq->bind_address = NULL;
356 /* Out packet cleanup */
/* If nothing is in flight, promote the head of the queue so the loop below
 * walks and frees the whole out_packet list. */
357 if(mosq->out_packet && !mosq->current_out_packet){
358 mosq->current_out_packet = mosq->out_packet;
359 mosq->out_packet = mosq->out_packet->next;
361 while(mosq->current_out_packet){
362 packet = mosq->current_out_packet;
363 /* Free data and reset values */
364 mosq->current_out_packet = mosq->out_packet;
365 if(mosq->out_packet){
366 mosq->out_packet = mosq->out_packet->next;
369 _mosquitto_packet_cleanup(packet);
370 _mosquitto_free(packet);
373 _mosquitto_packet_cleanup(&mosq->in_packet);
/* Close both ends of the wake-up socket pair. */
374 if(mosq->sockpairR != INVALID_SOCKET){
375 COMPAT_CLOSE(mosq->sockpairR);
376 mosq->sockpairR = INVALID_SOCKET;
378 if(mosq->sockpairW != INVALID_SOCKET){
379 COMPAT_CLOSE(mosq->sockpairW);
380 mosq->sockpairW = INVALID_SOCKET;
/* Destroy a client: release everything it owns with _mosquitto_destroy()
 * and then free the structure itself. A NULL client is ignored. */
void mosquitto_destroy(struct mosquitto *mosq)
{
	if(mosq != NULL){
		_mosquitto_destroy(mosq);
		_mosquitto_free(mosq);
	}
}
392 int mosquitto_socket(struct mosquitto *mosq)
394 if(!mosq) return MOSQ_ERR_INVAL;
398 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
400 if(!mosq) return MOSQ_ERR_INVAL;
401 if(!host || port <= 0) return MOSQ_ERR_INVAL;
403 if(mosq->host) _mosquitto_free(mosq->host);
404 mosq->host = _mosquitto_strdup(host);
405 if(!mosq->host) return MOSQ_ERR_NOMEM;
408 if(mosq->bind_address) _mosquitto_free(mosq->bind_address);
410 mosq->bind_address = _mosquitto_strdup(bind_address);
411 if(!mosq->bind_address) return MOSQ_ERR_NOMEM;
414 mosq->keepalive = keepalive;
416 if(_mosquitto_socketpair(&mosq->sockpairR, &mosq->sockpairW)){
417 _mosquitto_log_printf(mosq, MOSQ_LOG_WARNING,
418 "Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
421 return MOSQ_ERR_SUCCESS;
/* Connect to a broker without binding to a particular local address.
 * Same as mosquitto_connect_bind() with a NULL bind_address. */
int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
	const char *no_bind_address = NULL;

	return mosquitto_connect_bind(mosq, host, port, keepalive, no_bind_address);
}
429 int mosquitto_connect_bind(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
432 rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
435 pthread_mutex_lock(&mosq->state_mutex);
436 mosq->state = mosq_cs_new;
437 pthread_mutex_unlock(&mosq->state_mutex);
439 return _mosquitto_reconnect(mosq, true);
/* Non-blocking connect without a local bind address.
 * Same as mosquitto_connect_bind_async() with a NULL bind_address. */
int mosquitto_connect_async(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
	const char *no_bind_address = NULL;

	return mosquitto_connect_bind_async(mosq, host, port, keepalive, no_bind_address);
}
447 int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
449 int rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
452 pthread_mutex_lock(&mosq->state_mutex);
453 mosq->state = mosq_cs_connect_async;
454 pthread_mutex_unlock(&mosq->state_mutex);
456 return _mosquitto_reconnect(mosq, false);
/* Reconnect using the stored connection parameters, without blocking. */
int mosquitto_reconnect_async(struct mosquitto *mosq)
{
	const bool blocking = false;

	return _mosquitto_reconnect(mosq, blocking);
}
/* Reconnect using the stored connection parameters, blocking while the
 * connection is made. */
int mosquitto_reconnect(struct mosquitto *mosq)
{
	const bool blocking = true;

	return _mosquitto_reconnect(mosq, blocking);
}
/* Shared implementation of connect/reconnect.
 *
 * Resets per-connection state (client state, message timestamps, partial
 * incoming packet, all queued outgoing packets), opens the socket to the
 * stored host/port and sends CONNECT.
 *
 * blocking is passed straight to _mosquitto_socket_connect().
 *
 * Returns MOSQ_ERR_INVAL if mosq is NULL, no host is stored or the stored
 * port is <= 0; on the path shown, the result of _mosquitto_send_connect(). */
469 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking)
472 struct _mosquitto_packet *packet;
473 if(!mosq) return MOSQ_ERR_INVAL;
474 if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
476 pthread_mutex_lock(&mosq->state_mutex);
477 mosq->state = mosq_cs_new;
478 pthread_mutex_unlock(&mosq->state_mutex);
/* Restart the message timers from now. */
480 pthread_mutex_lock(&mosq->msgtime_mutex);
481 mosq->last_msg_in = mosquitto_time();
482 mosq->last_msg_out = mosquitto_time();
483 pthread_mutex_unlock(&mosq->msgtime_mutex);
487 _mosquitto_packet_cleanup(&mosq->in_packet);
/* Discard every outgoing packet; both packet mutexes are held meanwhile. */
489 pthread_mutex_lock(&mosq->current_out_packet_mutex);
490 pthread_mutex_lock(&mosq->out_packet_mutex);
492 if(mosq->out_packet && !mosq->current_out_packet){
493 mosq->current_out_packet = mosq->out_packet;
494 mosq->out_packet = mosq->out_packet->next;
497 while(mosq->current_out_packet){
498 packet = mosq->current_out_packet;
499 /* Free data and reset values */
500 mosq->current_out_packet = mosq->out_packet;
501 if(mosq->out_packet){
502 mosq->out_packet = mosq->out_packet->next;
505 _mosquitto_packet_cleanup(packet);
506 _mosquitto_free(packet);
508 pthread_mutex_unlock(&mosq->out_packet_mutex);
509 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
511 _mosquitto_messages_reconnect_reset(mosq);
513 rc = _mosquitto_socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
/* NOTE(review): handling of a failed socket connect is not visible here. */
518 return _mosquitto_send_connect(mosq, mosq->keepalive, mosq->clean_session);
521 int mosquitto_disconnect(struct mosquitto *mosq)
523 if(!mosq) return MOSQ_ERR_INVAL;
525 pthread_mutex_lock(&mosq->state_mutex);
526 mosq->state = mosq_cs_disconnecting;
527 pthread_mutex_unlock(&mosq->state_mutex);
529 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
530 return _mosquitto_send_disconnect(mosq);
/* Publish a message on a topic.
 *
 * Returns MOSQ_ERR_INVAL if mosq or topic is NULL, qos is outside 0..2, the
 * topic is empty, or the topic fails _mosquitto_topic_wildcard_len_check().
 * Returns MOSQ_ERR_PAYLOAD_SIZE if payloadlen is negative or larger than
 * MQTT_MAX_PAYLOAD. Returns MOSQ_ERR_NOMEM if a copy of the message cannot
 * be allocated. Otherwise returns the result of _mosquitto_send_publish(),
 * or MOSQ_ERR_SUCCESS when the message is only queued. */
533 int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
535 struct mosquitto_message_all *message;
538 if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL;
539 if(strlen(topic) == 0) return MOSQ_ERR_INVAL;
540 if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
542 if(_mosquitto_topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS){
543 return MOSQ_ERR_INVAL;
546 local_mid = _mosquitto_mid_generate(mosq);
/* Direct send with no queued copy. NOTE(review): the guarding condition is
 * not visible here; presumably the QoS 0 path - confirm. */
552 return _mosquitto_send_publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false);
/* Otherwise keep a private copy of the message so it can be tracked. */
554 message = _mosquitto_calloc(1, sizeof(struct mosquitto_message_all));
555 if(!message) return MOSQ_ERR_NOMEM;
557 message->next = NULL;
558 message->timestamp = mosquitto_time();
559 message->msg.mid = local_mid;
560 message->msg.topic = _mosquitto_strdup(topic);
561 if(!message->msg.topic){
562 _mosquitto_message_cleanup(&message);
563 return MOSQ_ERR_NOMEM;
566 message->msg.payloadlen = payloadlen;
567 message->msg.payload = _mosquitto_malloc(payloadlen*sizeof(uint8_t));
568 if(!message->msg.payload){
569 _mosquitto_message_cleanup(&message);
570 return MOSQ_ERR_NOMEM;
572 memcpy(message->msg.payload, payload, payloadlen*sizeof(uint8_t));
574 message->msg.payloadlen = 0;
575 message->msg.payload = NULL;
577 message->msg.qos = qos;
578 message->msg.retain = retain;
579 message->dup = false;
/* Queue the copy; out_message_mutex guards the queue and inflight count. */
581 pthread_mutex_lock(&mosq->out_message_mutex);
582 _mosquitto_message_queue(mosq, message, mosq_md_out);
/* A max_inflight_messages of 0 always takes the send branch. */
583 if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
584 mosq->inflight_messages++;
586 message->state = mosq_ms_wait_for_puback;
588 message->state = mosq_ms_wait_for_pubrec;
590 pthread_mutex_unlock(&mosq->out_message_mutex);
591 return _mosquitto_send_publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup);
/* Inflight limit reached: leave the message queued and report success. */
593 message->state = mosq_ms_invalid;
594 pthread_mutex_unlock(&mosq->out_message_mutex);
595 return MOSQ_ERR_SUCCESS;
600 int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
602 if(!mosq) return MOSQ_ERR_INVAL;
603 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
605 if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
607 return _mosquitto_send_subscribe(mosq, mid, false, sub, qos);
610 int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub)
612 if(!mosq) return MOSQ_ERR_INVAL;
613 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
615 if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
617 return _mosquitto_send_unsubscribe(mosq, mid, false, sub);
/* Configure certificate based TLS for the client.
 *
 * Returns MOSQ_ERR_INVAL if mosq is NULL, if neither cafile nor capath is
 * given, or if exactly one of certfile/keyfile is given. cafile, certfile
 * and keyfile are each opened with _mosquitto_fopen() before being copied.
 * Returns MOSQ_ERR_NOMEM if a path copy fails, MOSQ_ERR_SUCCESS on success.
 * NOTE(review): the final MOSQ_ERR_NOT_SUPPORTED return is presumably the
 * build without TLS - confirm against the preprocessor guards. */
620 int mosquitto_tls_set(struct mosquitto *mosq, const char *cafile, const char *capath, const char *certfile, const char *keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata))
625 if(!mosq || (!cafile && !capath) || (certfile && !keyfile) || (!certfile && keyfile)) return MOSQ_ERR_INVAL;
/* CA file: open it, then keep a copy of the path. */
628 fptr = _mosquitto_fopen(cafile, "rt");
632 return MOSQ_ERR_INVAL;
634 mosq->tls_cafile = _mosquitto_strdup(cafile);
636 if(!mosq->tls_cafile){
637 return MOSQ_ERR_NOMEM;
639 }else if(mosq->tls_cafile){
640 _mosquitto_free(mosq->tls_cafile);
641 mosq->tls_cafile = NULL;
/* CA directory: copied without an open test. */
645 mosq->tls_capath = _mosquitto_strdup(capath);
646 if(!mosq->tls_capath){
647 return MOSQ_ERR_NOMEM;
649 }else if(mosq->tls_capath){
650 _mosquitto_free(mosq->tls_capath);
651 mosq->tls_capath = NULL;
/* Client certificate: the CA settings stored above are dropped again
 * before the MOSQ_ERR_INVAL return. */
655 fptr = _mosquitto_fopen(certfile, "rt");
659 if(mosq->tls_cafile){
660 _mosquitto_free(mosq->tls_cafile);
661 mosq->tls_cafile = NULL;
663 if(mosq->tls_capath){
664 _mosquitto_free(mosq->tls_capath);
665 mosq->tls_capath = NULL;
667 return MOSQ_ERR_INVAL;
669 mosq->tls_certfile = _mosquitto_strdup(certfile);
670 if(!mosq->tls_certfile){
671 return MOSQ_ERR_NOMEM;
674 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
675 mosq->tls_certfile = NULL;
/* Client key: the CA and certificate settings are dropped before the
 * MOSQ_ERR_INVAL return. */
679 fptr = _mosquitto_fopen(keyfile, "rt");
683 if(mosq->tls_cafile){
684 _mosquitto_free(mosq->tls_cafile);
685 mosq->tls_cafile = NULL;
687 if(mosq->tls_capath){
688 _mosquitto_free(mosq->tls_capath);
689 mosq->tls_capath = NULL;
691 if(mosq->tls_certfile){
692 _mosquitto_free(mosq->tls_certfile);
693 mosq->tls_certfile = NULL;
695 return MOSQ_ERR_INVAL;
697 mosq->tls_keyfile = _mosquitto_strdup(keyfile);
698 if(!mosq->tls_keyfile){
699 return MOSQ_ERR_NOMEM;
702 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
703 mosq->tls_keyfile = NULL;
706 mosq->tls_pw_callback = pw_callback;
709 return MOSQ_ERR_SUCCESS;
711 return MOSQ_ERR_NOT_SUPPORTED;
/* Set advanced TLS options: certificate requirement mode, protocol version
 * and cipher list.
 *
 * Accepted tls_version strings (compared case-insensitively) are "tlsv1.2",
 * "tlsv1.1" and "tlsv1" when OPENSSL_VERSION_NUMBER >= 0x10001000L, and
 * only "tlsv1" otherwise. The default version string stored is "tlsv1.2"
 * or "tlsv1" under the same split.
 *
 * Returns MOSQ_ERR_INVAL if mosq is NULL or the version string is not
 * accepted, MOSQ_ERR_NOMEM if a string copy fails, MOSQ_ERR_SUCCESS on
 * success.
 * NOTE(review): the final MOSQ_ERR_NOT_SUPPORTED return is presumably the
 * build without TLS - confirm against the preprocessor guards. */
716 int mosquitto_tls_opts_set(struct mosquitto *mosq, int cert_reqs, const char *tls_version, const char *ciphers)
719 if(!mosq) return MOSQ_ERR_INVAL;
721 mosq->tls_cert_reqs = cert_reqs;
723 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
724 if(!strcasecmp(tls_version, "tlsv1.2")
725 || !strcasecmp(tls_version, "tlsv1.1")
726 || !strcasecmp(tls_version, "tlsv1")){
728 mosq->tls_version = _mosquitto_strdup(tls_version);
729 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
731 return MOSQ_ERR_INVAL;
734 if(!strcasecmp(tls_version, "tlsv1")){
735 mosq->tls_version = _mosquitto_strdup(tls_version);
736 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
738 return MOSQ_ERR_INVAL;
/* Default version string. NOTE(review): presumably used when no
 * tls_version is supplied - the guard is not visible here. */
742 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
743 mosq->tls_version = _mosquitto_strdup("tlsv1.2");
745 mosq->tls_version = _mosquitto_strdup("tlsv1");
747 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
750 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
751 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
753 mosq->tls_ciphers = NULL;
757 return MOSQ_ERR_SUCCESS;
759 return MOSQ_ERR_NOT_SUPPORTED;
765 int mosquitto_tls_insecure_set(struct mosquitto *mosq, bool value)
768 if(!mosq) return MOSQ_ERR_INVAL;
769 mosq->tls_insecure = value;
770 return MOSQ_ERR_SUCCESS;
772 return MOSQ_ERR_NOT_SUPPORTED;
777 int mosquitto_tls_psk_set(struct mosquitto *mosq, const char *psk, const char *identity, const char *ciphers)
779 #ifdef REAL_WITH_TLS_PSK
780 if(!mosq || !psk || !identity) return MOSQ_ERR_INVAL;
782 /* Check for hex only digits */
783 if(strspn(psk, "0123456789abcdefABCDEF") < strlen(psk)){
784 return MOSQ_ERR_INVAL;
786 mosq->tls_psk = _mosquitto_strdup(psk);
787 if(!mosq->tls_psk) return MOSQ_ERR_NOMEM;
789 mosq->tls_psk_identity = _mosquitto_strdup(identity);
790 if(!mosq->tls_psk_identity){
791 _mosquitto_free(mosq->tls_psk);
792 return MOSQ_ERR_NOMEM;
795 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
796 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
798 mosq->tls_ciphers = NULL;
801 return MOSQ_ERR_SUCCESS;
803 return MOSQ_ERR_NOT_SUPPORTED;
/* Run one iteration of the network loop.
 *
 * Builds the read/write descriptor sets, waits in pselect() or select() for
 * up to timeout milliseconds, services whatever became ready and finishes
 * with mosquitto_loop_misc().
 *
 * Returns MOSQ_ERR_INVAL if mosq is NULL or max_packets < 1 and
 * MOSQ_ERR_NO_CONN when there is nothing to wait on; other return paths are
 * annotated below. */
808 int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
811 struct timespec local_timeout;
813 struct timeval local_timeout;
815 fd_set readfds, writefds;
821 if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
825 if(mosq->sock != INVALID_SOCKET){
827 FD_SET(mosq->sock, &readfds);
/* Only ask for writability when there is something to send. */
828 pthread_mutex_lock(&mosq->current_out_packet_mutex);
829 pthread_mutex_lock(&mosq->out_packet_mutex);
830 if(mosq->out_packet || mosq->current_out_packet){
831 FD_SET(mosq->sock, &writefds);
833 }else if(mosq->ssl && mosq->want_write){
834 FD_SET(mosq->sock, &writefds);
837 pthread_mutex_unlock(&mosq->out_packet_mutex);
838 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
/* No socket yet: while in mosq_cs_connect_srv the c-ares descriptors are
 * collected instead. */
842 pthread_mutex_lock(&mosq->state_mutex);
843 if(mosq->state == mosq_cs_connect_srv){
844 rc = ares_fds(mosq->achan, &readfds, &writefds);
849 return MOSQ_ERR_NO_CONN;
851 pthread_mutex_unlock(&mosq->state_mutex);
854 return MOSQ_ERR_NO_CONN;
857 if(mosq->sockpairR != INVALID_SOCKET){
858 /* sockpairR is used to break out of select() before the timeout, on a
859 * call to publish() etc. */
860 FD_SET(mosq->sockpairR, &readfds);
861 if(mosq->sockpairR > maxfd){
862 maxfd = mosq->sockpairR;
/* Convert the millisecond timeout into seconds plus nanoseconds (timespec)
 * or microseconds (timeval). */
867 local_timeout.tv_sec = timeout/1000;
869 local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
871 local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
/* Fallback timeout of one second. NOTE(review): presumably for a negative
 * timeout - the guard is not visible here. */
874 local_timeout.tv_sec = 1;
876 local_timeout.tv_nsec = 0;
878 local_timeout.tv_usec = 0;
883 fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
885 fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
889 errno = WSAGetLastError();
892 return MOSQ_ERR_SUCCESS;
894 return MOSQ_ERR_ERRNO;
897 if(mosq->sock != INVALID_SOCKET){
898 if(FD_ISSET(mosq->sock, &readfds)){
899 rc = mosquitto_loop_read(mosq, max_packets);
900 if(rc || mosq->sock == INVALID_SOCKET){
/* Read the wake-up byte written to the socket pair. */
904 if(mosq->sockpairR >= 0 && FD_ISSET(mosq->sockpairR, &readfds)){
906 if(read(mosq->sockpairR, &pairbuf, 1) == 0){
909 recv(mosq->sockpairR, &pairbuf, 1, 0);
911 /* Fake write possible, to stimulate output write even though
912 * we didn't ask for it, because at that point the publish or
913 * other command wasn't present. */
914 FD_SET(mosq->sock, &writefds);
916 if(FD_ISSET(mosq->sock, &writefds)){
917 rc = mosquitto_loop_write(mosq, max_packets);
918 if(rc || mosq->sock == INVALID_SOCKET){
925 ares_process(mosq->achan, &readfds, &writefds);
929 return mosquitto_loop_misc(mosq);
/* Run mosquitto_loop() repeatedly, reconnecting when it fails.
 *
 * After a failure the function sleeps for the configured reconnect delay
 * and calls mosquitto_reconnect(). With exponential backoff enabled the
 * delay is reconnect_delay * reconnects^2; in both cases it is capped at
 * reconnect_delay_max. The state is compared with mosq_cs_disconnecting
 * both before and after the sleep.
 *
 * Returns MOSQ_ERR_INVAL if mosq is NULL. */
932 int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
936 unsigned int reconnects = 0;
937 unsigned long reconnect_delay;
939 if(!mosq) return MOSQ_ERR_INVAL;
/* A connection requested with the async API is established here. */
941 if(mosq->state == mosq_cs_connect_async){
942 mosquitto_reconnect(mosq);
947 rc = mosquitto_loop(mosq, timeout, max_packets);
948 if (reconnects !=0 && rc == MOSQ_ERR_SUCCESS){
951 }while(rc == MOSQ_ERR_SUCCESS);
955 pthread_mutex_lock(&mosq->state_mutex);
956 if(mosq->state == mosq_cs_disconnecting){
958 pthread_mutex_unlock(&mosq->state_mutex);
960 pthread_mutex_unlock(&mosq->state_mutex);
/* Choose the delay for this attempt. */
962 if(mosq->reconnect_delay > 0 && mosq->reconnect_exponential_backoff){
963 reconnect_delay = mosq->reconnect_delay*reconnects*reconnects;
965 reconnect_delay = mosq->reconnect_delay;
968 if(reconnect_delay > mosq->reconnect_delay_max){
969 reconnect_delay = mosq->reconnect_delay_max;
/* Sleep() takes milliseconds, sleep() takes seconds. */
975 Sleep(reconnect_delay*1000);
977 sleep(reconnect_delay);
980 pthread_mutex_lock(&mosq->state_mutex);
981 if(mosq->state == mosq_cs_disconnecting){
983 pthread_mutex_unlock(&mosq->state_mutex);
985 pthread_mutex_unlock(&mosq->state_mutex);
986 mosquitto_reconnect(mosq);
/* Housekeeping part of the network loop: keepalive handling, message
 * retries and detection of a missing PINGRESP.
 *
 * Returns MOSQ_ERR_INVAL if mosq is NULL, MOSQ_ERR_NO_CONN if the socket is
 * not open, MOSQ_ERR_CONN_LOST after closing the socket because a ping
 * response did not arrive within the keepalive interval (on_disconnect is
 * invoked first if set), MOSQ_ERR_SUCCESS otherwise. */
993 int mosquitto_loop_misc(struct mosquitto *mosq)
998 if(!mosq) return MOSQ_ERR_INVAL;
999 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
1001 now = mosquitto_time();
1003 _mosquitto_check_keepalive(mosq);
/* Run the retry check only when last_retry_check+1 is earlier than now. */
1004 if(mosq->last_retry_check+1 < now){
1005 _mosquitto_message_retry_check(mosq);
1006 mosq->last_retry_check = now;
1008 if(mosq->ping_t && now - mosq->ping_t >= mosq->keepalive){
1009 /* mosq->ping_t != 0 means we are waiting for a pingresp.
1010 * This hasn't happened in the keepalive time so we should disconnect.
1012 _mosquitto_socket_close(mosq);
1013 pthread_mutex_lock(&mosq->state_mutex);
1014 if(mosq->state == mosq_cs_disconnecting){
1015 rc = MOSQ_ERR_SUCCESS;
1019 pthread_mutex_unlock(&mosq->state_mutex);
1020 pthread_mutex_lock(&mosq->callback_mutex);
1021 if(mosq->on_disconnect){
1022 mosq->in_callback = true;
1023 mosq->on_disconnect(mosq, mosq->userdata, rc);
1024 mosq->in_callback = false;
1026 pthread_mutex_unlock(&mosq->callback_mutex);
1027 return MOSQ_ERR_CONN_LOST;
1029 return MOSQ_ERR_SUCCESS;
1032 static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
1035 _mosquitto_socket_close(mosq);
1036 pthread_mutex_lock(&mosq->state_mutex);
1037 if(mosq->state == mosq_cs_disconnecting){
1038 rc = MOSQ_ERR_SUCCESS;
1040 pthread_mutex_unlock(&mosq->state_mutex);
1041 pthread_mutex_lock(&mosq->callback_mutex);
1042 if(mosq->on_disconnect){
1043 mosq->in_callback = true;
1044 mosq->on_disconnect(mosq, mosq->userdata, rc);
1045 mosq->in_callback = false;
1047 pthread_mutex_unlock(&mosq->callback_mutex);
1053 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
1057 if(max_packets < 1) return MOSQ_ERR_INVAL;
1059 pthread_mutex_lock(&mosq->out_message_mutex);
1060 max_packets = mosq->out_queue_len;
1061 pthread_mutex_unlock(&mosq->out_message_mutex);
1063 pthread_mutex_lock(&mosq->in_message_mutex);
1064 max_packets += mosq->in_queue_len;
1065 pthread_mutex_unlock(&mosq->in_message_mutex);
1067 if(max_packets < 1) max_packets = 1;
1068 /* Queue len here tells us how many messages are awaiting processing and
1069 * have QoS > 0. We should try to deal with that many in this loop in order
1071 for(i=0; i<max_packets; i++){
1072 rc = _mosquitto_packet_read(mosq);
1073 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1074 return _mosquitto_loop_rc_handle(mosq, rc);
1080 int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
1084 if(max_packets < 1) return MOSQ_ERR_INVAL;
1086 pthread_mutex_lock(&mosq->out_message_mutex);
1087 max_packets = mosq->out_queue_len;
1088 pthread_mutex_unlock(&mosq->out_message_mutex);
1090 pthread_mutex_lock(&mosq->in_message_mutex);
1091 max_packets += mosq->in_queue_len;
1092 pthread_mutex_unlock(&mosq->in_message_mutex);
1094 if(max_packets < 1) max_packets = 1;
1095 /* Queue len here tells us how many messages are awaiting processing and
1096 * have QoS > 0. We should try to deal with that many in this loop in order
1098 for(i=0; i<max_packets; i++){
1099 rc = _mosquitto_packet_write(mosq);
1100 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1101 return _mosquitto_loop_rc_handle(mosq, rc);
1107 bool mosquitto_want_write(struct mosquitto *mosq)
1109 if(mosq->out_packet || mosq->current_out_packet){
1112 }else if(mosq->ssl && mosq->want_write){
1120 void mosquitto_connect_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int))
1122 pthread_mutex_lock(&mosq->callback_mutex);
1123 mosq->on_connect = on_connect;
1124 pthread_mutex_unlock(&mosq->callback_mutex);
1127 void mosquitto_disconnect_callback_set(struct mosquitto *mosq, void (*on_disconnect)(struct mosquitto *, void *, int))
1129 pthread_mutex_lock(&mosq->callback_mutex);
1130 mosq->on_disconnect = on_disconnect;
1131 pthread_mutex_unlock(&mosq->callback_mutex);
1134 void mosquitto_publish_callback_set(struct mosquitto *mosq, void (*on_publish)(struct mosquitto *, void *, int))
1136 pthread_mutex_lock(&mosq->callback_mutex);
1137 mosq->on_publish = on_publish;
1138 pthread_mutex_unlock(&mosq->callback_mutex);
1141 void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *))
1143 pthread_mutex_lock(&mosq->callback_mutex);
1144 mosq->on_message = on_message;
1145 pthread_mutex_unlock(&mosq->callback_mutex);
1148 void mosquitto_subscribe_callback_set(struct mosquitto *mosq, void (*on_subscribe)(struct mosquitto *, void *, int, int, const int *))
1150 pthread_mutex_lock(&mosq->callback_mutex);
1151 mosq->on_subscribe = on_subscribe;
1152 pthread_mutex_unlock(&mosq->callback_mutex);
1155 void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int))
1157 pthread_mutex_lock(&mosq->callback_mutex);
1158 mosq->on_unsubscribe = on_unsubscribe;
1159 pthread_mutex_unlock(&mosq->callback_mutex);
1162 void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *))
1164 pthread_mutex_lock(&mosq->log_callback_mutex);
1165 mosq->on_log = on_log;
1166 pthread_mutex_unlock(&mosq->log_callback_mutex);
1169 void mosquitto_user_data_set(struct mosquitto *mosq, void *userdata)
1172 mosq->userdata = userdata;
/* Map a MOSQ_ERR_* code to a human readable, statically allocated message.
 *
 * MOSQ_ERR_ERRNO is the exception: it returns strerror(errno), so the text
 * reflects the value of errno at the time of the call. */
1176 const char *mosquitto_strerror(int mosq_errno)
1179 case MOSQ_ERR_SUCCESS:
1181 case MOSQ_ERR_NOMEM:
1182 return "Out of memory.";
1183 case MOSQ_ERR_PROTOCOL:
1184 return "A network protocol error occurred when communicating with the broker.";
1185 case MOSQ_ERR_INVAL:
1186 return "Invalid function arguments provided.";
1187 case MOSQ_ERR_NO_CONN:
1188 return "The client is not currently connected.";
1189 case MOSQ_ERR_CONN_REFUSED:
1190 return "The connection was refused.";
1191 case MOSQ_ERR_NOT_FOUND:
1192 return "Message not found (internal error).";
1193 case MOSQ_ERR_CONN_LOST:
1194 return "The connection was lost.";
1196 return "A TLS error occurred.";
1197 case MOSQ_ERR_PAYLOAD_SIZE:
1198 return "Payload too large.";
1199 case MOSQ_ERR_NOT_SUPPORTED:
1200 return "This feature is not supported.";
1202 return "Authorisation failed.";
1203 case MOSQ_ERR_ACL_DENIED:
1204 return "Access denied by ACL.";
1205 case MOSQ_ERR_UNKNOWN:
1206 return "Unknown error.";
1207 case MOSQ_ERR_ERRNO:
/* The caller must not have changed errno since the failing call. */
1208 return strerror(errno);
1210 return "Unknown error.";
/* Map an MQTT CONNACK return code to a human readable, statically
 * allocated message.
 *
 * Codes 0..5 have dedicated messages; every other value yields
 * "Connection Refused: unknown reason.". */
const char *mosquitto_connack_string(int connack_code)
{
	/* Indexed by CONNACK return code. */
	static const char *const messages[] = {
		"Connection Accepted.",
		"Connection Refused: unacceptable protocol version.",
		"Connection Refused: identifier rejected.",
		"Connection Refused: broker unavailable.",
		"Connection Refused: bad user name or password.",
		"Connection Refused: not authorised."
	};
	const int known = (int)(sizeof(messages)/sizeof(messages[0]));

	if(connack_code >= 0 && connack_code < known){
		return messages[connack_code];
	}
	return "Connection Refused: unknown reason.";
}
1234 int mosquitto_sub_topic_tokenise(const char *subtopic, char ***topics, int *count)
1243 if(!subtopic || !topics || !count) return MOSQ_ERR_INVAL;
1245 len = strlen(subtopic);
1247 for(i=0; i<len; i++){
1248 if(subtopic[i] == '/'){
1250 /* Separator at end of line */
1257 (*topics) = _mosquitto_calloc(hier_count, sizeof(char *));
1258 if(!(*topics)) return MOSQ_ERR_NOMEM;
1264 for(i=0; i<len+1; i++){
1265 if(subtopic[i] == '/' || subtopic[i] == '\0'){
1268 tlen = stop-start + 1;
1269 (*topics)[hier] = _mosquitto_calloc(tlen, sizeof(char));
1270 if(!(*topics)[hier]){
1271 for(i=0; i<hier_count; i++){
1272 if((*topics)[hier]){
1273 _mosquitto_free((*topics)[hier]);
1276 _mosquitto_free((*topics));
1277 return MOSQ_ERR_NOMEM;
1279 for(j=start; j<stop; j++){
1280 (*topics)[hier][j-start] = subtopic[j];
1288 *count = hier_count;
1290 return MOSQ_ERR_SUCCESS;
1293 int mosquitto_sub_topic_tokens_free(char ***topics, int count)
1297 if(!topics || !(*topics) || count<1) return MOSQ_ERR_INVAL;
1299 for(i=0; i<count; i++){
1300 if((*topics)[i]) _mosquitto_free((*topics)[i]);
1302 _mosquitto_free(*topics);
1304 return MOSQ_ERR_SUCCESS;