2 Copyright (c) 2010-2013 Roger Light <roger@atchoo.org>
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
36 #include <strings.h> /* for strcasecmp() */
37 #include <sys/select.h>
46 #include "mosquitto.h"
47 #include "mosquitto_internal.h"
48 #include "logging_mosq.h"
49 #include "messages_mosq.h"
50 #include "memory_mosq.h"
51 #include "mqtt3_protocol.h"
53 #include "read_handle.h"
54 #include "send_mosq.h"
55 #include "time_mosq.h"
57 #include "util_mosq.h"
58 #include "will_mosq.h"
60 #if !defined(WIN32) && !defined(__SYMBIAN32__)
64 void _mosquitto_destroy(struct mosquitto *mosq);
65 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking);
66 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address);
68 int mosquitto_lib_version(int *major, int *minor, int *revision)
70 if(major) *major = LIBMOSQUITTO_MAJOR;
71 if(minor) *minor = LIBMOSQUITTO_MINOR;
72 if(revision) *revision = LIBMOSQUITTO_REVISION;
73 return LIBMOSQUITTO_VERSION_NUMBER;
/*
 * Library start-up: seed rand() from a clock source, then initialise the
 * network layer via _mosquitto_net_init(). Returns MOSQ_ERR_SUCCESS.
 *
 * NOTE(review): two alternative seeding statements are visible below
 * (GetTickCount() and gettimeofday()); the preprocessor lines choosing between
 * them and the declaration of `tv` are not visible in this listing --
 * presumably a WIN32 / non-WIN32 split, confirm against the full file.
 */
76 int mosquitto_lib_init(void)
79 srand(GetTickCount());
83 gettimeofday(&tv, NULL);
/* Seed with the current time of day expressed in milliseconds. */
84 srand(tv.tv_sec*1000 + tv.tv_usec/1000);
87 _mosquitto_net_init();
89 return MOSQ_ERR_SUCCESS;
92 int mosquitto_lib_cleanup(void)
94 _mosquitto_net_cleanup();
96 return MOSQ_ERR_SUCCESS;
/*
 * Allocate and set up a new client instance.
 *
 * Visible steps: reject the combination clean_session == false with a NULL id,
 * ignore SIGPIPE, calloc() the struct, mark the three socket members invalid,
 * then delegate to mosquitto_reinitialise(). A failure path calls
 * mosquitto_destroy() and distinguishes MOSQ_ERR_INVAL from MOSQ_ERR_NOMEM.
 *
 * NOTE(review): the statements that report the error (presumably via errno)
 * and every return statement are missing from this listing, as are the
 * conditions guarding the calloc result and the reinitialise result -- confirm
 * against the full file.
 */
99 struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *userdata)
101 struct mosquitto *mosq = NULL;
/* A non-clean session cannot be combined with a missing client id. */
104 if(clean_session == false && id == NULL){
/* Changes the SIGPIPE disposition for the whole process, not just this client. */
110 signal(SIGPIPE, SIG_IGN);
113 mosq = (struct mosquitto *)_mosquitto_calloc(1, sizeof(struct mosquitto));
/* Sockets must read as "not open" before reinitialise/destroy look at them. */
115 mosq->sock = INVALID_SOCKET;
116 mosq->sockpairR = INVALID_SOCKET;
117 mosq->sockpairW = INVALID_SOCKET;
118 #ifdef WITH_THREADING
119 mosq->thread_id = pthread_self();
121 rc = mosquitto_reinitialise(mosq, id, clean_session, userdata);
/* Failure path: release the half-built client before reporting the error. */
123 mosquitto_destroy(mosq);
124 if(rc == MOSQ_ERR_INVAL){
126 }else if(rc == MOSQ_ERR_NOMEM){
/*
 * Reset an existing client to its freshly-created state.
 *
 * Tears down the current contents with _mosquitto_destroy(), zeroes the whole
 * struct, then re-applies defaults (keepalive 60, message_retry 20,
 * max_inflight_messages 20, reconnect delay 1/1 without backoff), installs the
 * client id and initialises the mutexes.
 *
 * Returns MOSQ_ERR_INVAL for a NULL mosq or for clean_session == false with a
 * NULL id; MOSQ_ERR_NOMEM when the generated-id buffer cannot be allocated;
 * MOSQ_ERR_SUCCESS otherwise.
 *
 * NOTE(review): several conditions are missing from this listing (the choice
 * between the two userdata assignments, the id validation that leads to the
 * second MOSQ_ERR_INVAL, the id / no-id branch, the loop bounds of the random
 * id generator and the guard around the TLS defaults) -- confirm against the
 * full file.
 */
137 int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_session, void *userdata)
141 if(!mosq) return MOSQ_ERR_INVAL;
143 if(clean_session == false && id == NULL){
144 return MOSQ_ERR_INVAL;
/* Free everything the old instance owned, then start from an all-zero struct. */
147 _mosquitto_destroy(mosq);
148 memset(mosq, 0, sizeof(struct mosquitto));
/* Two alternatives: the caller's userdata, or the client itself as userdata
 * (presumably when userdata is NULL -- TODO confirm). */
151 mosq->userdata = userdata;
153 mosq->userdata = mosq;
/* memset() left the sockets at 0, which is not INVALID_SOCKET as used by the
 * "is it open" tests elsewhere in this file, so set them explicitly. */
155 mosq->sock = INVALID_SOCKET;
156 mosq->sockpairR = INVALID_SOCKET;
157 mosq->sockpairW = INVALID_SOCKET;
158 mosq->keepalive = 60;
159 mosq->message_retry = 20;
160 mosq->last_retry_check = 0;
161 mosq->clean_session = clean_session;
164 return MOSQ_ERR_INVAL;
/* Caller-supplied id is copied. NOTE(review): no NULL check on the strdup
 * result is visible in this listing. */
166 mosq->id = _mosquitto_strdup(id);
/* No id given: allocate a zeroed 24-byte buffer for a generated id. */
168 mosq->id = (char *)_mosquitto_calloc(24, sizeof(char));
170 return MOSQ_ERR_NOMEM;
/* Random character with code in the range 48..120 inclusive. */
179 mosq->id[i] = (rand()%73)+48;
182 mosq->in_packet.payload = NULL;
183 _mosquitto_packet_cleanup(&mosq->in_packet);
184 mosq->out_packet = NULL;
185 mosq->current_out_packet = NULL;
186 mosq->last_msg_in = mosquitto_time();
187 mosq->last_msg_out = mosquitto_time();
190 mosq->state = mosq_cs_new;
191 mosq->in_messages = NULL;
192 mosq->in_messages_last = NULL;
193 mosq->out_messages = NULL;
194 mosq->out_messages_last = NULL;
195 mosq->max_inflight_messages = 20;
197 mosq->on_connect = NULL;
198 mosq->on_publish = NULL;
199 mosq->on_message = NULL;
200 mosq->on_subscribe = NULL;
201 mosq->on_unsubscribe = NULL;
204 mosq->in_callback = false;
205 mosq->in_queue_len = 0;
206 mosq->out_queue_len = 0;
207 mosq->reconnect_delay = 1;
208 mosq->reconnect_delay_max = 1;
209 mosq->reconnect_exponential_backoff = false;
210 mosq->threaded = false;
/* TLS defaults: peer verification on, insecure mode off. */
213 mosq->tls_cert_reqs = SSL_VERIFY_PEER;
214 mosq->tls_insecure = false;
216 #ifdef WITH_THREADING
/* Mutexes are (re)created here; _mosquitto_destroy() is their counterpart. */
217 pthread_mutex_init(&mosq->callback_mutex, NULL);
218 pthread_mutex_init(&mosq->log_callback_mutex, NULL);
219 pthread_mutex_init(&mosq->state_mutex, NULL);
220 pthread_mutex_init(&mosq->out_packet_mutex, NULL);
221 pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
222 pthread_mutex_init(&mosq->msgtime_mutex, NULL);
223 pthread_mutex_init(&mosq->in_message_mutex, NULL);
224 pthread_mutex_init(&mosq->out_message_mutex, NULL);
225 mosq->thread_id = pthread_self();
228 return MOSQ_ERR_SUCCESS;
231 int mosquitto_will_set(struct mosquitto *mosq, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
233 if(!mosq) return MOSQ_ERR_INVAL;
234 return _mosquitto_will_set(mosq, topic, payloadlen, payload, qos, retain);
237 int mosquitto_will_clear(struct mosquitto *mosq)
239 if(!mosq) return MOSQ_ERR_INVAL;
240 return _mosquitto_will_clear(mosq);
243 int mosquitto_username_pw_set(struct mosquitto *mosq, const char *username, const char *password)
245 if(!mosq) return MOSQ_ERR_INVAL;
248 _mosquitto_free(mosq->username);
249 mosq->username = NULL;
252 _mosquitto_free(mosq->password);
253 mosq->password = NULL;
257 mosq->username = _mosquitto_strdup(username);
258 if(!mosq->username) return MOSQ_ERR_NOMEM;
260 mosq->password = _mosquitto_strdup(password);
262 _mosquitto_free(mosq->username);
263 mosq->username = NULL;
264 return MOSQ_ERR_NOMEM;
268 return MOSQ_ERR_SUCCESS;
271 int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigned int reconnect_delay, unsigned int reconnect_delay_max, bool reconnect_exponential_backoff)
273 if(!mosq) return MOSQ_ERR_INVAL;
275 mosq->reconnect_delay = reconnect_delay;
276 mosq->reconnect_delay_max = reconnect_delay_max;
277 mosq->reconnect_exponential_backoff = reconnect_exponential_backoff;
279 return MOSQ_ERR_SUCCESS;
/*
 * Release everything a client owns without freeing the struct itself.
 *
 * In order: stop the library thread (when one is running and is not the
 * caller), destroy the mutexes, close the broker socket, drop queued messages
 * and the will, free TLS material, free address/id/username/password/host/
 * bind_address, free all queued outgoing packets, clean the partially read
 * incoming packet, and close both ends of the wake-up socket pair.
 *
 * mosq is dereferenced unconditionally in the visible code, so callers must
 * pass a non-NULL pointer.
 *
 * NOTE(review): many guards are missing from this listing (e.g. the
 * `mosq->id` test described in the comment below, the TLS build guard, and the
 * NULL checks before several frees) -- confirm against the full file.
 */
283 void _mosquitto_destroy(struct mosquitto *mosq)
285 struct _mosquitto_packet *packet;
288 #ifdef WITH_THREADING
/* Only cancel/join when a library thread exists and we are not that thread;
 * joining oneself would never return. */
289 if(mosq->threaded && !pthread_equal(mosq->thread_id, pthread_self())){
290 pthread_cancel(mosq->thread_id);
291 pthread_join(mosq->thread_id, NULL);
292 mosq->threaded = false;
296 /* If mosq->id is not NULL then the client has already been initialised
297 * and so the mutexes need destroying. If mosq->id is NULL, the mutexes
298 * haven't been initialised. */
299 pthread_mutex_destroy(&mosq->callback_mutex);
300 pthread_mutex_destroy(&mosq->log_callback_mutex);
301 pthread_mutex_destroy(&mosq->state_mutex);
302 pthread_mutex_destroy(&mosq->out_packet_mutex);
303 pthread_mutex_destroy(&mosq->current_out_packet_mutex);
304 pthread_mutex_destroy(&mosq->msgtime_mutex);
305 pthread_mutex_destroy(&mosq->in_message_mutex);
306 pthread_mutex_destroy(&mosq->out_message_mutex);
309 if(mosq->sock != INVALID_SOCKET){
310 _mosquitto_socket_close(mosq);
312 _mosquitto_message_cleanup_all(mosq);
313 _mosquitto_will_clear(mosq);
/* TLS context and every TLS-related string owned by the client. */
319 SSL_CTX_free(mosq->ssl_ctx);
321 if(mosq->tls_cafile) _mosquitto_free(mosq->tls_cafile);
322 if(mosq->tls_capath) _mosquitto_free(mosq->tls_capath);
323 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
324 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
/* The password callback is not owned, so it is only forgotten, not freed. */
325 if(mosq->tls_pw_callback) mosq->tls_pw_callback = NULL;
326 if(mosq->tls_version) _mosquitto_free(mosq->tls_version);
327 if(mosq->tls_ciphers) _mosquitto_free(mosq->tls_ciphers);
328 if(mosq->tls_psk) _mosquitto_free(mosq->tls_psk);
329 if(mosq->tls_psk_identity) _mosquitto_free(mosq->tls_psk_identity);
333 _mosquitto_free(mosq->address);
334 mosq->address = NULL;
337 _mosquitto_free(mosq->id);
341 _mosquitto_free(mosq->username);
342 mosq->username = NULL;
345 _mosquitto_free(mosq->password);
346 mosq->password = NULL;
349 _mosquitto_free(mosq->host);
352 if(mosq->bind_address){
353 _mosquitto_free(mosq->bind_address);
354 mosq->bind_address = NULL;
357 /* Out packet cleanup */
/* If nothing is mid-send, promote the head of the queue so the loop below
 * walks the whole list starting from current_out_packet. */
358 if(mosq->out_packet && !mosq->current_out_packet){
359 mosq->current_out_packet = mosq->out_packet;
360 mosq->out_packet = mosq->out_packet->next;
362 while(mosq->current_out_packet){
363 packet = mosq->current_out_packet;
364 /* Free data and reset values */
365 mosq->current_out_packet = mosq->out_packet;
366 if(mosq->out_packet){
367 mosq->out_packet = mosq->out_packet->next;
370 _mosquitto_packet_cleanup(packet);
371 _mosquitto_free(packet);
374 _mosquitto_packet_cleanup(&mosq->in_packet);
/* Close both ends of the socket pair and mark them as closed. */
375 if(mosq->sockpairR != INVALID_SOCKET){
376 COMPAT_CLOSE(mosq->sockpairR);
377 mosq->sockpairR = INVALID_SOCKET;
379 if(mosq->sockpairW != INVALID_SOCKET){
380 COMPAT_CLOSE(mosq->sockpairW);
381 mosq->sockpairW = INVALID_SOCKET;
/*
 * Free a client created with mosquitto_new(): releases everything the client
 * owns via _mosquitto_destroy() and then frees the struct itself.
 *
 * Passing NULL is a no-op. The guard is required because
 * _mosquitto_destroy() dereferences its argument unconditionally.
 * The pointer must not be used after this call.
 */
void mosquitto_destroy(struct mosquitto *mosq)
{
	if(!mosq) return;

	_mosquitto_destroy(mosq);
	_mosquitto_free(mosq);
}
/*
 * Accessor for the client's socket.
 * A NULL client yields MOSQ_ERR_INVAL.
 *
 * NOTE(review): the statement producing the normal return value is missing
 * from this listing -- presumably it returns mosq->sock; confirm against the
 * full file. Callers should also check how the MOSQ_ERR_INVAL result can be
 * told apart from a valid descriptor value.
 */
393 int mosquitto_socket(struct mosquitto *mosq)
395 if(!mosq) return MOSQ_ERR_INVAL;
399 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
401 if(!mosq) return MOSQ_ERR_INVAL;
402 if(!host || port <= 0) return MOSQ_ERR_INVAL;
404 if(mosq->host) _mosquitto_free(mosq->host);
405 mosq->host = _mosquitto_strdup(host);
406 if(!mosq->host) return MOSQ_ERR_NOMEM;
409 if(mosq->bind_address) _mosquitto_free(mosq->bind_address);
411 mosq->bind_address = _mosquitto_strdup(bind_address);
412 if(!mosq->bind_address) return MOSQ_ERR_NOMEM;
415 mosq->keepalive = keepalive;
417 if(_mosquitto_socketpair(&mosq->sockpairR, &mosq->sockpairW)){
418 _mosquitto_log_printf(mosq, MOSQ_LOG_WARNING,
419 "Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
422 return MOSQ_ERR_SUCCESS;
/*
 * Blocking connect to a broker without binding to a specific local address.
 * Equivalent to mosquitto_connect_bind() with a NULL bind_address; its return
 * value is passed through unchanged.
 */
int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
	const char *no_bind_address = NULL;

	return mosquitto_connect_bind(mosq, host, port, keepalive, no_bind_address);
}
430 int mosquitto_connect_bind(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
433 rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
436 pthread_mutex_lock(&mosq->state_mutex);
437 mosq->state = mosq_cs_new;
438 pthread_mutex_unlock(&mosq->state_mutex);
440 return _mosquitto_reconnect(mosq, true);
/*
 * Non-blocking connect to a broker without binding to a specific local
 * address. Equivalent to mosquitto_connect_bind_async() with a NULL
 * bind_address; its return value is passed through unchanged.
 */
int mosquitto_connect_async(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
	const char *no_bind_address = NULL;

	return mosquitto_connect_bind_async(mosq, host, port, keepalive, no_bind_address);
}
448 int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
450 int rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
453 pthread_mutex_lock(&mosq->state_mutex);
454 mosq->state = mosq_cs_connect_async;
455 pthread_mutex_unlock(&mosq->state_mutex);
457 return _mosquitto_reconnect(mosq, false);
/*
 * Re-establish the connection using the parameters stored by an earlier
 * connect call, without blocking on the socket connect.
 * Thin wrapper over _mosquitto_reconnect() with blocking == false.
 */
int mosquitto_reconnect_async(struct mosquitto *mosq)
{
	const bool blocking = false;

	return _mosquitto_reconnect(mosq, blocking);
}
/*
 * Re-establish the connection using the parameters stored by an earlier
 * connect call, blocking on the socket connect.
 * Thin wrapper over _mosquitto_reconnect() with blocking == true.
 */
int mosquitto_reconnect(struct mosquitto *mosq)
{
	const bool blocking = true;

	return _mosquitto_reconnect(mosq, blocking);
}
/*
 * Common implementation behind the connect/reconnect entry points.
 *
 * Requires that a host and a positive port have already been stored on the
 * client (otherwise MOSQ_ERR_INVAL). Resets per-connection state -- client
 * state, message timestamps, the partially read incoming packet and every
 * queued outgoing packet -- then opens the socket and sends CONNECT.
 *
 * blocking is forwarded to _mosquitto_socket_connect().
 * Returns the result of _mosquitto_send_connect() on the visible success path.
 *
 * NOTE(review): the state is unconditionally set to mosq_cs_new here, which
 * overwrites the mosq_cs_connect_async value that
 * mosquitto_connect_bind_async() stores immediately before calling this
 * function -- confirm that is intended.
 * NOTE(review): the declaration of rc, the handling of a failed
 * _mosquitto_socket_connect() and possibly other short statements are missing
 * from this listing -- confirm against the full file.
 */
470 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking)
473 struct _mosquitto_packet *packet;
474 if(!mosq) return MOSQ_ERR_INVAL;
475 if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
477 pthread_mutex_lock(&mosq->state_mutex);
478 mosq->state = mosq_cs_new;
479 pthread_mutex_unlock(&mosq->state_mutex);
/* Restart the message timers from "now". */
481 pthread_mutex_lock(&mosq->msgtime_mutex);
482 mosq->last_msg_in = mosquitto_time();
483 mosq->last_msg_out = mosquitto_time();
484 pthread_mutex_unlock(&mosq->msgtime_mutex);
/* Discard any half-received packet from the previous connection. */
488 _mosquitto_packet_cleanup(&mosq->in_packet);
/* Lock order: current_out_packet_mutex first, then out_packet_mutex; released
 * in the reverse order below. */
490 pthread_mutex_lock(&mosq->current_out_packet_mutex);
491 pthread_mutex_lock(&mosq->out_packet_mutex);
/* If nothing is mid-send, promote the queue head so the loop below walks the
 * whole list starting from current_out_packet. */
493 if(mosq->out_packet && !mosq->current_out_packet){
494 mosq->current_out_packet = mosq->out_packet;
495 mosq->out_packet = mosq->out_packet->next;
498 while(mosq->current_out_packet){
499 packet = mosq->current_out_packet;
500 /* Free data and reset values */
501 mosq->current_out_packet = mosq->out_packet;
502 if(mosq->out_packet){
503 mosq->out_packet = mosq->out_packet->next;
506 _mosquitto_packet_cleanup(packet);
507 _mosquitto_free(packet);
509 pthread_mutex_unlock(&mosq->out_packet_mutex);
510 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
512 _mosquitto_messages_reconnect_reset(mosq);
514 rc = _mosquitto_socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
519 return _mosquitto_send_connect(mosq, mosq->keepalive, mosq->clean_session);
522 int mosquitto_disconnect(struct mosquitto *mosq)
524 if(!mosq) return MOSQ_ERR_INVAL;
526 pthread_mutex_lock(&mosq->state_mutex);
527 mosq->state = mosq_cs_disconnecting;
528 pthread_mutex_unlock(&mosq->state_mutex);
530 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
531 return _mosquitto_send_disconnect(mosq);
/*
 * Publish a message on a topic.
 *
 * Validation visible here: mosq and topic must be non-NULL, qos must be 0..2,
 * the topic must be non-empty and must pass
 * _mosquitto_topic_wildcard_len_check() (all MOSQ_ERR_INVAL otherwise);
 * payloadlen must be within 0..MQTT_MAX_PAYLOAD (MOSQ_ERR_PAYLOAD_SIZE
 * otherwise).
 *
 * Two paths follow. One sends immediately through _mosquitto_send_publish()
 * without storing anything. The other builds a mosquitto_message_all holding
 * copies of the topic and payload, queues it on the outgoing list and either
 * sends it now (when the in-flight limit allows) or parks it in state
 * mosq_ms_invalid for later.
 *
 * NOTE(review): the conditions selecting these paths (presumably qos == 0 vs
 * qos > 0), the payload-present test, the qos test choosing between
 * wait_for_puback and wait_for_pubrec, the declaration of local_mid and the
 * store to *mid are missing from this listing -- confirm against the full
 * file.
 */
534 int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
536 struct mosquitto_message_all *message;
539 if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL;
540 if(strlen(topic) == 0) return MOSQ_ERR_INVAL;
541 if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
543 if(_mosquitto_topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS){
544 return MOSQ_ERR_INVAL;
547 local_mid = _mosquitto_mid_generate(mosq);
/* Direct send: nothing is queued on this path. */
553 return _mosquitto_send_publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false);
/* Queued path: the message record owns copies of topic and payload. */
555 message = _mosquitto_calloc(1, sizeof(struct mosquitto_message_all));
556 if(!message) return MOSQ_ERR_NOMEM;
558 message->next = NULL;
559 message->timestamp = mosquitto_time();
560 message->msg.mid = local_mid;
561 message->msg.topic = _mosquitto_strdup(topic);
562 if(!message->msg.topic){
563 _mosquitto_message_cleanup(&message);
564 return MOSQ_ERR_NOMEM;
567 message->msg.payloadlen = payloadlen;
568 message->msg.payload = _mosquitto_malloc(payloadlen*sizeof(uint8_t));
569 if(!message->msg.payload){
570 _mosquitto_message_cleanup(&message);
571 return MOSQ_ERR_NOMEM;
573 memcpy(message->msg.payload, payload, payloadlen*sizeof(uint8_t));
/* Alternative branch: message without payload. */
575 message->msg.payloadlen = 0;
576 message->msg.payload = NULL;
578 message->msg.qos = qos;
579 message->msg.retain = retain;
580 message->dup = false;
582 pthread_mutex_lock(&mosq->out_message_mutex);
583 _mosquitto_message_queue(mosq, message, mosq_md_out);
/* max_inflight_messages == 0 means "no limit". */
584 if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
585 mosq->inflight_messages++;
587 message->state = mosq_ms_wait_for_puback;
589 message->state = mosq_ms_wait_for_pubrec;
/* The mutex is released before the network send. */
591 pthread_mutex_unlock(&mosq->out_message_mutex);
592 return _mosquitto_send_publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup);
/* In-flight limit reached: leave the message queued but not yet sent. */
594 message->state = mosq_ms_invalid;
595 pthread_mutex_unlock(&mosq->out_message_mutex);
596 return MOSQ_ERR_SUCCESS;
601 int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
603 if(!mosq) return MOSQ_ERR_INVAL;
604 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
606 if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
608 return _mosquitto_send_subscribe(mosq, mid, false, sub, qos);
611 int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub)
613 if(!mosq) return MOSQ_ERR_INVAL;
614 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
616 if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
618 return _mosquitto_send_unsubscribe(mosq, mid, false, sub);
/*
 * Configure certificate based TLS for the client.
 *
 * Argument rules enforced by the first check: mosq must be non-NULL, at least
 * one of cafile/capath must be given, and certfile/keyfile must be given
 * together or not at all (MOSQ_ERR_INVAL otherwise).
 *
 * cafile, certfile and keyfile are each opened with _mosquitto_fopen() before
 * being stored (presumably to verify they are readable -- the test on the
 * result is not visible). When the certfile or keyfile stage fails, the values
 * stored by the earlier stages are freed and reset to NULL before returning
 * MOSQ_ERR_INVAL. All stored strings are copies. pw_callback is stored as
 * given.
 *
 * Returns MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL, MOSQ_ERR_NOMEM, or
 * MOSQ_ERR_NOT_SUPPORTED on the alternative final return.
 *
 * NOTE(review): the build guard selecting between the two final returns, the
 * declaration of fptr, the tests on fptr and the matching close calls, and the
 * "argument present" conditions are missing from this listing -- confirm
 * against the full file.
 * NOTE(review): the MOSQ_ERR_NOMEM returns visible here do not undo values
 * stored by earlier stages, unlike the MOSQ_ERR_INVAL paths.
 */
621 int mosquitto_tls_set(struct mosquitto *mosq, const char *cafile, const char *capath, const char *certfile, const char *keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata))
626 if(!mosq || (!cafile && !capath) || (certfile && !keyfile) || (!certfile && keyfile)) return MOSQ_ERR_INVAL;
/* --- CA file --- */
629 fptr = _mosquitto_fopen(cafile, "rt");
633 return MOSQ_ERR_INVAL;
635 mosq->tls_cafile = _mosquitto_strdup(cafile);
637 if(!mosq->tls_cafile){
638 return MOSQ_ERR_NOMEM;
/* No cafile given: forget any previously stored one. */
640 }else if(mosq->tls_cafile){
641 _mosquitto_free(mosq->tls_cafile);
642 mosq->tls_cafile = NULL;
/* --- CA path --- */
646 mosq->tls_capath = _mosquitto_strdup(capath);
647 if(!mosq->tls_capath){
648 return MOSQ_ERR_NOMEM;
/* No capath given: forget any previously stored one. */
650 }else if(mosq->tls_capath){
651 _mosquitto_free(mosq->tls_capath);
652 mosq->tls_capath = NULL;
/* --- Client certificate --- */
656 fptr = _mosquitto_fopen(certfile, "rt");
/* Failure: roll back the CA settings stored above. */
660 if(mosq->tls_cafile){
661 _mosquitto_free(mosq->tls_cafile);
662 mosq->tls_cafile = NULL;
664 if(mosq->tls_capath){
665 _mosquitto_free(mosq->tls_capath);
666 mosq->tls_capath = NULL;
668 return MOSQ_ERR_INVAL;
670 mosq->tls_certfile = _mosquitto_strdup(certfile);
671 if(!mosq->tls_certfile){
672 return MOSQ_ERR_NOMEM;
675 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
676 mosq->tls_certfile = NULL;
/* --- Client private key --- */
680 fptr = _mosquitto_fopen(keyfile, "rt");
/* Failure: roll back CA settings and the certificate stored above. */
684 if(mosq->tls_cafile){
685 _mosquitto_free(mosq->tls_cafile);
686 mosq->tls_cafile = NULL;
688 if(mosq->tls_capath){
689 _mosquitto_free(mosq->tls_capath);
690 mosq->tls_capath = NULL;
692 if(mosq->tls_certfile){
693 _mosquitto_free(mosq->tls_certfile);
694 mosq->tls_certfile = NULL;
696 return MOSQ_ERR_INVAL;
698 mosq->tls_keyfile = _mosquitto_strdup(keyfile);
699 if(!mosq->tls_keyfile){
700 return MOSQ_ERR_NOMEM;
703 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
704 mosq->tls_keyfile = NULL;
707 mosq->tls_pw_callback = pw_callback;
710 return MOSQ_ERR_SUCCESS;
712 return MOSQ_ERR_NOT_SUPPORTED;
/*
 * Set advanced TLS options: certificate verification requirement, protocol
 * version string and cipher list.
 *
 * cert_reqs is stored as given. tls_version is compared case-insensitively:
 * with OpenSSL >= 0x10001000L "tlsv1.2", "tlsv1.1" and "tlsv1" are accepted,
 * otherwise only "tlsv1"; any other string yields MOSQ_ERR_INVAL. A separate
 * branch stores a default of "tlsv1.2" (newer OpenSSL) or "tlsv1" (older).
 * ciphers is copied when given, otherwise tls_ciphers is set to NULL.
 *
 * Returns MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL, MOSQ_ERR_NOMEM, or
 * MOSQ_ERR_NOT_SUPPORTED on the alternative final return.
 *
 * NOTE(review): the conditions selecting the "version given" / "default"
 * branches and the ciphers branches (presumably NULL tests on the arguments),
 * plus the build guard around the body, are missing from this listing --
 * confirm against the full file.
 * NOTE(review): no free of a previously stored tls_version / tls_ciphers is
 * visible before they are overwritten.
 */
717 int mosquitto_tls_opts_set(struct mosquitto *mosq, int cert_reqs, const char *tls_version, const char *ciphers)
720 if(!mosq) return MOSQ_ERR_INVAL;
722 mosq->tls_cert_reqs = cert_reqs;
724 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
725 if(!strcasecmp(tls_version, "tlsv1.2")
726 || !strcasecmp(tls_version, "tlsv1.1")
727 || !strcasecmp(tls_version, "tlsv1")){
729 mosq->tls_version = _mosquitto_strdup(tls_version);
730 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
732 return MOSQ_ERR_INVAL;
/* Older OpenSSL: only TLS v1 is accepted. */
735 if(!strcasecmp(tls_version, "tlsv1")){
736 mosq->tls_version = _mosquitto_strdup(tls_version);
737 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
739 return MOSQ_ERR_INVAL;
/* Default version when none was requested. */
743 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
744 mosq->tls_version = _mosquitto_strdup("tlsv1.2");
746 mosq->tls_version = _mosquitto_strdup("tlsv1");
748 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
751 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
752 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
754 mosq->tls_ciphers = NULL;
758 return MOSQ_ERR_SUCCESS;
760 return MOSQ_ERR_NOT_SUPPORTED;
/*
 * Store the tls_insecure flag on the client.
 * Returns MOSQ_ERR_INVAL for a NULL client and MOSQ_ERR_SUCCESS after storing
 * the flag; an alternative path returns MOSQ_ERR_NOT_SUPPORTED.
 *
 * NOTE(review): the preprocessor guard selecting between the two paths is
 * missing from this listing (presumably a TLS build option) -- confirm against
 * the full file. How the flag is consumed is not visible here either.
 */
766 int mosquitto_tls_insecure_set(struct mosquitto *mosq, bool value)
769 if(!mosq) return MOSQ_ERR_INVAL;
770 mosq->tls_insecure = value;
771 return MOSQ_ERR_SUCCESS;
773 return MOSQ_ERR_NOT_SUPPORTED;
778 int mosquitto_tls_psk_set(struct mosquitto *mosq, const char *psk, const char *identity, const char *ciphers)
780 #ifdef REAL_WITH_TLS_PSK
781 if(!mosq || !psk || !identity) return MOSQ_ERR_INVAL;
783 /* Check for hex only digits */
784 if(strspn(psk, "0123456789abcdefABCDEF") < strlen(psk)){
785 return MOSQ_ERR_INVAL;
787 mosq->tls_psk = _mosquitto_strdup(psk);
788 if(!mosq->tls_psk) return MOSQ_ERR_NOMEM;
790 mosq->tls_psk_identity = _mosquitto_strdup(identity);
791 if(!mosq->tls_psk_identity){
792 _mosquitto_free(mosq->tls_psk);
793 return MOSQ_ERR_NOMEM;
796 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
797 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
799 mosq->tls_ciphers = NULL;
802 return MOSQ_ERR_SUCCESS;
804 return MOSQ_ERR_NOT_SUPPORTED;
/*
 * One iteration of the client network loop.
 *
 * Builds read/write descriptor sets (broker socket, the wake-up socket pair,
 * and the ares channel while in state mosq_cs_connect_srv), waits in
 * pselect()/select() for up to `timeout` milliseconds (a fixed 1 second on the
 * alternative timeout branch), then services reads and writes through
 * mosquitto_loop_read()/mosquitto_loop_write() and finishes with
 * mosquitto_loop_misc(), whose result is returned.
 *
 * Returns MOSQ_ERR_INVAL for a NULL client or max_packets < 1,
 * MOSQ_ERR_NO_CONN on the visible no-connection paths, MOSQ_ERR_ERRNO on a
 * visible select-failure path.
 *
 * NOTE(review): numerous lines are missing from this listing: the
 * preprocessor guards choosing timespec/pselect vs timeval/select and the
 * TLS / SRV sections, FD_ZERO and maxfd handling, the condition choosing
 * between the computed and the 1-second timeout, the tests on fdcount, and the
 * returns after the loop_read/loop_write error checks -- confirm against the
 * full file.
 */
809 int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
812 struct timespec local_timeout;
814 struct timeval local_timeout;
816 fd_set readfds, writefds;
822 if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
826 if(mosq->sock != INVALID_SOCKET){
828 FD_SET(mosq->sock, &readfds);
/* Same lock order as elsewhere in this file: current_out_packet_mutex, then
 * out_packet_mutex. */
829 pthread_mutex_lock(&mosq->current_out_packet_mutex);
830 pthread_mutex_lock(&mosq->out_packet_mutex);
/* Ask for writability only when there is something to send... */
831 if(mosq->out_packet || mosq->current_out_packet){
832 FD_SET(mosq->sock, &writefds);
/* ...or when the TLS layer has flagged that it wants to write. */
834 }else if(mosq->ssl && mosq->want_write){
835 FD_SET(mosq->sock, &writefds);
838 pthread_mutex_unlock(&mosq->out_packet_mutex);
839 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
/* No broker socket: only an in-progress SRV lookup gives select() anything to
 * wait on. */
843 pthread_mutex_lock(&mosq->state_mutex);
844 if(mosq->state == mosq_cs_connect_srv){
845 rc = ares_fds(mosq->achan, &readfds, &writefds);
850 return MOSQ_ERR_NO_CONN;
852 pthread_mutex_unlock(&mosq->state_mutex);
855 return MOSQ_ERR_NO_CONN;
858 if(mosq->sockpairR != INVALID_SOCKET){
859 /* sockpairR is used to break out of select() before the timeout, on a
860 * call to publish() etc. */
861 FD_SET(mosq->sockpairR, &readfds);
862 if(mosq->sockpairR > maxfd){
863 maxfd = mosq->sockpairR;
/* timeout is in milliseconds: whole seconds here, remainder converted to
 * nanoseconds (timespec) or microseconds (timeval) below. */
868 local_timeout.tv_sec = timeout/1000;
870 local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
872 local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
/* Alternative branch: fixed timeout of one second. */
875 local_timeout.tv_sec = 1;
877 local_timeout.tv_nsec = 0;
879 local_timeout.tv_usec = 0;
884 fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
886 fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
890 errno = WSAGetLastError();
893 return MOSQ_ERR_SUCCESS;
895 return MOSQ_ERR_ERRNO;
898 if(mosq->sock != INVALID_SOCKET){
899 if(FD_ISSET(mosq->sock, &readfds)){
900 rc = mosquitto_loop_read(mosq, max_packets);
/* The read may have closed the socket; re-check before using it again. */
901 if(rc || mosq->sock == INVALID_SOCKET){
905 if(mosq->sockpairR >= 0 && FD_ISSET(mosq->sockpairR, &readfds)){
/* Drain the single wake-up byte (read() or recv() variant). */
907 if(read(mosq->sockpairR, &pairbuf, 1) == 0){
910 recv(mosq->sockpairR, &pairbuf, 1, 0);
912 /* Fake write possible, to stimulate output write even though
913 * we didn't ask for it, because at that point the publish or
914 * other command wasn't present. */
915 FD_SET(mosq->sock, &writefds);
917 if(FD_ISSET(mosq->sock, &writefds)){
918 rc = mosquitto_loop_write(mosq, max_packets);
919 if(rc || mosq->sock == INVALID_SOCKET){
926 ares_process(mosq->achan, &readfds, &writefds);
930 return mosquitto_loop_misc(mosq);
/*
 * Run mosquitto_loop() repeatedly, reconnecting when the loop reports an
 * error, until the client has been put into state mosq_cs_disconnecting.
 *
 * If the client is in state mosq_cs_connect_async on entry, a blocking
 * mosquitto_reconnect() is issued first. After each failed inner loop the
 * function sleeps for a delay derived from the reconnect settings and then
 * calls mosquitto_reconnect() again, unless a disconnect was requested in the
 * meantime.
 *
 * Delay: reconnect_delay, or reconnect_delay * reconnects^2 when
 * reconnect_exponential_backoff is set (note: quadratic in the attempt count,
 * despite the option's name), capped at reconnect_delay_max. Units are
 * seconds.
 *
 * NOTE(review): the outer loop construct, the updates of `reconnects`, the
 * declaration of rc/run and every return statement are missing from this
 * listing -- confirm against the full file.
 * NOTE(review): mosq->state is read at the top without state_mutex held,
 * unlike the later reads.
 */
933 int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
937 unsigned int reconnects = 0;
938 unsigned long reconnect_delay;
940 if(!mosq) return MOSQ_ERR_INVAL;
942 if(mosq->state == mosq_cs_connect_async){
943 mosquitto_reconnect(mosq);
/* Inner loop: keep servicing the connection while it is healthy. */
948 rc = mosquitto_loop(mosq, timeout, max_packets);
949 if (reconnects !=0 && rc == MOSQ_ERR_SUCCESS){
952 }while(rc == MOSQ_ERR_SUCCESS);
/* The loop failed: stop if the user asked to disconnect. */
956 pthread_mutex_lock(&mosq->state_mutex);
957 if(mosq->state == mosq_cs_disconnecting){
959 pthread_mutex_unlock(&mosq->state_mutex);
961 pthread_mutex_unlock(&mosq->state_mutex);
963 if(mosq->reconnect_delay > 0 && mosq->reconnect_exponential_backoff){
964 reconnect_delay = mosq->reconnect_delay*reconnects*reconnects;
966 reconnect_delay = mosq->reconnect_delay;
969 if(reconnect_delay > mosq->reconnect_delay_max){
970 reconnect_delay = mosq->reconnect_delay_max;
/* Platform variants of the same wait: Sleep() takes milliseconds, sleep()
 * takes seconds. */
976 Sleep(reconnect_delay*1000);
978 sleep(reconnect_delay);
/* Re-check after sleeping: a disconnect may have been requested meanwhile. */
981 pthread_mutex_lock(&mosq->state_mutex);
982 if(mosq->state == mosq_cs_disconnecting){
984 pthread_mutex_unlock(&mosq->state_mutex);
986 pthread_mutex_unlock(&mosq->state_mutex);
987 mosquitto_reconnect(mosq);
994 int mosquitto_loop_misc(struct mosquitto *mosq)
999 if(!mosq) return MOSQ_ERR_INVAL;
1000 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
1002 now = mosquitto_time();
1004 _mosquitto_check_keepalive(mosq);
1005 if(mosq->last_retry_check+1 < now){
1006 _mosquitto_message_retry_check(mosq);
1007 mosq->last_retry_check = now;
1009 if(mosq->ping_t && now - mosq->ping_t >= mosq->keepalive){
1010 /* mosq->ping_t != 0 means we are waiting for a pingresp.
1011 * This hasn't happened in the keepalive time so we should disconnect.
1013 _mosquitto_socket_close(mosq);
1014 pthread_mutex_lock(&mosq->state_mutex);
1015 if(mosq->state == mosq_cs_disconnecting){
1016 rc = MOSQ_ERR_SUCCESS;
1020 pthread_mutex_unlock(&mosq->state_mutex);
1021 pthread_mutex_lock(&mosq->callback_mutex);
1022 if(mosq->on_disconnect){
1023 mosq->in_callback = true;
1024 mosq->on_disconnect(mosq, mosq->userdata, rc);
1025 mosq->in_callback = false;
1027 pthread_mutex_unlock(&mosq->callback_mutex);
1028 return MOSQ_ERR_CONN_LOST;
1030 return MOSQ_ERR_SUCCESS;
/*
 * Shared error handling for the read/write loops: closes the socket, maps the
 * result code to MOSQ_ERR_SUCCESS when the client was already in state
 * mosq_cs_disconnecting (i.e. the loss was requested), and invokes the
 * on_disconnect callback, if set, with the resulting code.
 *
 * state_mutex and callback_mutex are taken one after the other, never nested.
 *
 * NOTE(review): the condition guarding this handling (presumably `if(rc)`)
 * and the return statement(s) are missing from this listing -- confirm against
 * the full file. Callers in this file return this function's result directly.
 */
1033 static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
1036 _mosquitto_socket_close(mosq);
1037 pthread_mutex_lock(&mosq->state_mutex);
1038 if(mosq->state == mosq_cs_disconnecting){
1039 rc = MOSQ_ERR_SUCCESS;
1041 pthread_mutex_unlock(&mosq->state_mutex);
1042 pthread_mutex_lock(&mosq->callback_mutex);
1043 if(mosq->on_disconnect){
/* in_callback is set for the duration of the user callback. */
1044 mosq->in_callback = true;
1045 mosq->on_disconnect(mosq, mosq->userdata, rc);
1046 mosq->in_callback = false;
1048 pthread_mutex_unlock(&mosq->callback_mutex);
1054 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
1058 if(max_packets < 1) return MOSQ_ERR_INVAL;
1060 pthread_mutex_lock(&mosq->out_message_mutex);
1061 max_packets = mosq->out_queue_len;
1062 pthread_mutex_unlock(&mosq->out_message_mutex);
1064 pthread_mutex_lock(&mosq->in_message_mutex);
1065 max_packets += mosq->in_queue_len;
1066 pthread_mutex_unlock(&mosq->in_message_mutex);
1068 if(max_packets < 1) max_packets = 1;
1069 /* Queue len here tells us how many messages are awaiting processing and
1070 * have QoS > 0. We should try to deal with that many in this loop in order
1072 for(i=0; i<max_packets; i++){
1073 rc = _mosquitto_packet_read(mosq);
1074 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1075 return _mosquitto_loop_rc_handle(mosq, rc);
1081 int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
1085 if(max_packets < 1) return MOSQ_ERR_INVAL;
1087 pthread_mutex_lock(&mosq->out_message_mutex);
1088 max_packets = mosq->out_queue_len;
1089 pthread_mutex_unlock(&mosq->out_message_mutex);
1091 pthread_mutex_lock(&mosq->in_message_mutex);
1092 max_packets += mosq->in_queue_len;
1093 pthread_mutex_unlock(&mosq->in_message_mutex);
1095 if(max_packets < 1) max_packets = 1;
1096 /* Queue len here tells us how many messages are awaiting processing and
1097 * have QoS > 0. We should try to deal with that many in this loop in order
1099 for(i=0; i<max_packets; i++){
1100 rc = _mosquitto_packet_write(mosq);
1101 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1102 return _mosquitto_loop_rc_handle(mosq, rc);
/*
 * Report whether the client has data waiting to be written: either a queued
 * or in-progress outgoing packet, or a TLS session that has flagged
 * want_write. These are the same two conditions mosquitto_loop() uses when
 * deciding to add the socket to its write set.
 *
 * mosq is dereferenced without a NULL check, and the packet pointers are read
 * without taking the packet mutexes.
 *
 * NOTE(review): the return statements and the guard around the TLS branch are
 * missing from this listing -- presumably true/true/false; confirm against the
 * full file.
 */
1108 bool mosquitto_want_write(struct mosquitto *mosq)
1110 if(mosq->out_packet || mosq->current_out_packet){
1113 }else if(mosq->ssl && mosq->want_write){
1121 void mosquitto_connect_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int))
1123 pthread_mutex_lock(&mosq->callback_mutex);
1124 mosq->on_connect = on_connect;
1125 pthread_mutex_unlock(&mosq->callback_mutex);
1128 void mosquitto_disconnect_callback_set(struct mosquitto *mosq, void (*on_disconnect)(struct mosquitto *, void *, int))
1130 pthread_mutex_lock(&mosq->callback_mutex);
1131 mosq->on_disconnect = on_disconnect;
1132 pthread_mutex_unlock(&mosq->callback_mutex);
1135 void mosquitto_publish_callback_set(struct mosquitto *mosq, void (*on_publish)(struct mosquitto *, void *, int))
1137 pthread_mutex_lock(&mosq->callback_mutex);
1138 mosq->on_publish = on_publish;
1139 pthread_mutex_unlock(&mosq->callback_mutex);
1142 void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *))
1144 pthread_mutex_lock(&mosq->callback_mutex);
1145 mosq->on_message = on_message;
1146 pthread_mutex_unlock(&mosq->callback_mutex);
1149 void mosquitto_subscribe_callback_set(struct mosquitto *mosq, void (*on_subscribe)(struct mosquitto *, void *, int, int, const int *))
1151 pthread_mutex_lock(&mosq->callback_mutex);
1152 mosq->on_subscribe = on_subscribe;
1153 pthread_mutex_unlock(&mosq->callback_mutex);
1156 void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int))
1158 pthread_mutex_lock(&mosq->callback_mutex);
1159 mosq->on_unsubscribe = on_unsubscribe;
1160 pthread_mutex_unlock(&mosq->callback_mutex);
1163 void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *))
1165 pthread_mutex_lock(&mosq->log_callback_mutex);
1166 mosq->on_log = on_log;
1167 pthread_mutex_unlock(&mosq->log_callback_mutex);
1170 void mosquitto_user_data_set(struct mosquitto *mosq, void *userdata)
1173 mosq->userdata = userdata;
/*
 * Map a MOSQ_ERR_* code to a static English description.
 *
 * The returned strings are literals and must not be freed or modified. For
 * MOSQ_ERR_ERRNO the text comes from strerror(errno), so it reflects whatever
 * errno holds at the time of this call.
 *
 * NOTE(review): the switch header, the string returned for MOSQ_ERR_SUCCESS,
 * and the case labels in front of "A TLS error occurred.",
 * "Authorisation failed." and the final "Unknown error." (presumably the
 * default case) are missing from this listing -- confirm against the full
 * file.
 */
1177 const char *mosquitto_strerror(int mosq_errno)
1180 case MOSQ_ERR_SUCCESS:
1182 case MOSQ_ERR_NOMEM:
1183 return "Out of memory.";
1184 case MOSQ_ERR_PROTOCOL:
1185 return "A network protocol error occurred when communicating with the broker.";
1186 case MOSQ_ERR_INVAL:
1187 return "Invalid function arguments provided.";
1188 case MOSQ_ERR_NO_CONN:
1189 return "The client is not currently connected.";
1190 case MOSQ_ERR_CONN_REFUSED:
1191 return "The connection was refused.";
1192 case MOSQ_ERR_NOT_FOUND:
1193 return "Message not found (internal error).";
1194 case MOSQ_ERR_CONN_LOST:
1195 return "The connection was lost.";
1197 return "A TLS error occurred.";
1198 case MOSQ_ERR_PAYLOAD_SIZE:
1199 return "Payload too large.";
1200 case MOSQ_ERR_NOT_SUPPORTED:
1201 return "This feature is not supported.";
1203 return "Authorisation failed.";
1204 case MOSQ_ERR_ACL_DENIED:
1205 return "Access denied by ACL.";
1206 case MOSQ_ERR_UNKNOWN:
1207 return "Unknown error.";
1208 case MOSQ_ERR_ERRNO:
/* Not a static string owned by this library: comes from the C runtime. */
1209 return strerror(errno);
1211 return "Unknown error.";
/*
 * Map an MQTT CONNACK return code to a static English description.
 *
 * Codes 0..5 are the return codes defined by the MQTT CONNACK packet; any
 * other value yields the generic "unknown reason" text. The returned strings
 * are literals and must not be freed or modified.
 */
const char *mosquitto_connack_string(int connack_code)
{
	switch(connack_code){
		case 0:
			return "Connection Accepted.";
		case 1:
			return "Connection Refused: unacceptable protocol version.";
		case 2:
			return "Connection Refused: identifier rejected.";
		case 3:
			return "Connection Refused: broker unavailable.";
		case 4:
			return "Connection Refused: bad user name or password.";
		case 5:
			return "Connection Refused: not authorised.";
		default:
			return "Connection Refused: unknown reason.";
	}
}
1235 int mosquitto_sub_topic_tokenise(const char *subtopic, char ***topics, int *count)
1244 if(!subtopic || !topics || !count) return MOSQ_ERR_INVAL;
1246 len = strlen(subtopic);
1248 for(i=0; i<len; i++){
1249 if(subtopic[i] == '/'){
1251 /* Separator at end of line */
1258 (*topics) = _mosquitto_calloc(hier_count, sizeof(char *));
1259 if(!(*topics)) return MOSQ_ERR_NOMEM;
1265 for(i=0; i<len+1; i++){
1266 if(subtopic[i] == '/' || subtopic[i] == '\0'){
1269 tlen = stop-start + 1;
1270 (*topics)[hier] = _mosquitto_calloc(tlen, sizeof(char));
1271 if(!(*topics)[hier]){
1272 for(i=0; i<hier_count; i++){
1273 if((*topics)[hier]){
1274 _mosquitto_free((*topics)[hier]);
1277 _mosquitto_free((*topics));
1278 return MOSQ_ERR_NOMEM;
1280 for(j=start; j<stop; j++){
1281 (*topics)[hier][j-start] = subtopic[j];
1289 *count = hier_count;
1291 return MOSQ_ERR_SUCCESS;
1294 int mosquitto_sub_topic_tokens_free(char ***topics, int count)
1298 if(!topics || !(*topics) || count<1) return MOSQ_ERR_INVAL;
1300 for(i=0; i<count; i++){
1301 if((*topics)[i]) _mosquitto_free((*topics)[i]);
1303 _mosquitto_free(*topics);
1305 return MOSQ_ERR_SUCCESS;