Automatically detect C99 flags for supported compilers.
[platform/upstream/iotivity.git] / service / protocol-plugin / plugins / mqtt-fan / lib / mosquitto.c
1 /*
2 Copyright (c) 2010-2013 Roger Light <roger@atchoo.org>
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11    notice, this list of conditions and the following disclaimer in the
12    documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14    contributors may be used to endorse or promote products derived from
15    this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <signal.h>
33 #include <stdio.h>
34 #include <string.h>
35 #ifndef WIN32
36 #include <strings.h>  /* for strcasecmp() */
37 #include <sys/select.h>
38 #include <sys/time.h>
39 #include <unistd.h>
40 #else
41 #include <winsock2.h>
42 #include <windows.h>
43 typedef int ssize_t;
44 #endif
45
46 #include "mosquitto.h"
47 #include "mosquitto_internal.h"
48 #include "logging_mosq.h"
49 #include "messages_mosq.h"
50 #include "memory_mosq.h"
51 #include "mqtt3_protocol.h"
52 #include "net_mosq.h"
53 #include "read_handle.h"
54 #include "send_mosq.h"
55 #include "time_mosq.h"
56 #include "tls_mosq.h"
57 #include "util_mosq.h"
58 #include "will_mosq.h"
59
60 #if !defined(WIN32) && !defined(__SYMBIAN32__)
61 #define HAVE_PSELECT
62 #endif
63
64 void _mosquitto_destroy(struct mosquitto *mosq);
65 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking);
66 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address);
67
68 int mosquitto_lib_version(int *major, int *minor, int *revision)
69 {
70         if(major) *major = LIBMOSQUITTO_MAJOR;
71         if(minor) *minor = LIBMOSQUITTO_MINOR;
72         if(revision) *revision = LIBMOSQUITTO_REVISION;
73         return LIBMOSQUITTO_VERSION_NUMBER;
74 }
75
76 int mosquitto_lib_init(void)
77 {
78 #ifdef WIN32
79         srand(GetTickCount());
80 #else
81         struct timeval tv;
82
83         gettimeofday(&tv, NULL);
84         srand(tv.tv_sec*1000 + tv.tv_usec/1000);
85 #endif
86
87         _mosquitto_net_init();
88
89         return MOSQ_ERR_SUCCESS;
90 }
91
92 int mosquitto_lib_cleanup(void)
93 {
94         _mosquitto_net_cleanup();
95
96         return MOSQ_ERR_SUCCESS;
97 }
98
99 struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *userdata)
100 {
101         struct mosquitto *mosq = NULL;
102         int rc;
103
104         if(clean_session == false && id == NULL){
105                 errno = EINVAL;
106                 return NULL;
107         }
108
109 #ifndef WIN32
110         signal(SIGPIPE, SIG_IGN);
111 #endif
112
113         mosq = (struct mosquitto *)_mosquitto_calloc(1, sizeof(struct mosquitto));
114         if(mosq){
115                 mosq->sock = INVALID_SOCKET;
116                 mosq->sockpairR = INVALID_SOCKET;
117                 mosq->sockpairW = INVALID_SOCKET;
118 #ifdef WITH_THREADING
119                 mosq->thread_id = pthread_self();
120 #endif
121                 rc = mosquitto_reinitialise(mosq, id, clean_session, userdata);
122                 if(rc){
123                         mosquitto_destroy(mosq);
124                         if(rc == MOSQ_ERR_INVAL){
125                                 errno = EINVAL;
126                         }else if(rc == MOSQ_ERR_NOMEM){
127                                 errno = ENOMEM;
128                         }
129                         return NULL;
130                 }
131         }else{
132                 errno = ENOMEM;
133         }
134         return mosq;
135 }
136
137 int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_session, void *userdata)
138 {
139         int i;
140
141         if(!mosq) return MOSQ_ERR_INVAL;
142
143         if(clean_session == false && id == NULL){
144                 return MOSQ_ERR_INVAL;
145         }
146
147         _mosquitto_destroy(mosq);
148         memset(mosq, 0, sizeof(struct mosquitto));
149
150         if(userdata){
151                 mosq->userdata = userdata;
152         }else{
153                 mosq->userdata = mosq;
154         }
155         mosq->sock = INVALID_SOCKET;
156         mosq->sockpairR = INVALID_SOCKET;
157         mosq->sockpairW = INVALID_SOCKET;
158         mosq->keepalive = 60;
159         mosq->message_retry = 20;
160         mosq->last_retry_check = 0;
161         mosq->clean_session = clean_session;
162         if(id){
163                 if(strlen(id) == 0){
164                         return MOSQ_ERR_INVAL;
165                 }
166                 mosq->id = _mosquitto_strdup(id);
167         }else{
168                 mosq->id = (char *)_mosquitto_calloc(24, sizeof(char));
169                 if(!mosq->id){
170                         return MOSQ_ERR_NOMEM;
171                 }
172                 mosq->id[0] = 'm';
173                 mosq->id[1] = 'o';
174                 mosq->id[2] = 's';
175                 mosq->id[3] = 'q';
176                 mosq->id[4] = '/';
177
178                 for(i=5; i<23; i++){
179                         mosq->id[i] = (rand()%73)+48;
180                 }
181         }
182         mosq->in_packet.payload = NULL;
183         _mosquitto_packet_cleanup(&mosq->in_packet);
184         mosq->out_packet = NULL;
185         mosq->current_out_packet = NULL;
186         mosq->last_msg_in = mosquitto_time();
187         mosq->last_msg_out = mosquitto_time();
188         mosq->ping_t = 0;
189         mosq->last_mid = 0;
190         mosq->state = mosq_cs_new;
191         mosq->in_messages = NULL;
192         mosq->in_messages_last = NULL;
193         mosq->out_messages = NULL;
194         mosq->out_messages_last = NULL;
195         mosq->max_inflight_messages = 20;
196         mosq->will = NULL;
197         mosq->on_connect = NULL;
198         mosq->on_publish = NULL;
199         mosq->on_message = NULL;
200         mosq->on_subscribe = NULL;
201         mosq->on_unsubscribe = NULL;
202         mosq->host = NULL;
203         mosq->port = 1883;
204         mosq->in_callback = false;
205         mosq->in_queue_len = 0;
206         mosq->out_queue_len = 0;
207         mosq->reconnect_delay = 1;
208         mosq->reconnect_delay_max = 1;
209         mosq->reconnect_exponential_backoff = false;
210         mosq->threaded = false;
211 #ifdef WITH_TLS
212         mosq->ssl = NULL;
213         mosq->tls_cert_reqs = SSL_VERIFY_PEER;
214         mosq->tls_insecure = false;
215 #endif
216 #ifdef WITH_THREADING
217         pthread_mutex_init(&mosq->callback_mutex, NULL);
218         pthread_mutex_init(&mosq->log_callback_mutex, NULL);
219         pthread_mutex_init(&mosq->state_mutex, NULL);
220         pthread_mutex_init(&mosq->out_packet_mutex, NULL);
221         pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
222         pthread_mutex_init(&mosq->msgtime_mutex, NULL);
223         pthread_mutex_init(&mosq->in_message_mutex, NULL);
224         pthread_mutex_init(&mosq->out_message_mutex, NULL);
225         mosq->thread_id = pthread_self();
226 #endif
227
228         return MOSQ_ERR_SUCCESS;
229 }
230
231 int mosquitto_will_set(struct mosquitto *mosq, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
232 {
233         if(!mosq) return MOSQ_ERR_INVAL;
234         return _mosquitto_will_set(mosq, topic, payloadlen, payload, qos, retain);
235 }
236
237 int mosquitto_will_clear(struct mosquitto *mosq)
238 {
239         if(!mosq) return MOSQ_ERR_INVAL;
240         return _mosquitto_will_clear(mosq);
241 }
242
243 int mosquitto_username_pw_set(struct mosquitto *mosq, const char *username, const char *password)
244 {
245         if(!mosq) return MOSQ_ERR_INVAL;
246
247         if(mosq->username){
248                 _mosquitto_free(mosq->username);
249                 mosq->username = NULL;
250         }
251         if(mosq->password){
252                 _mosquitto_free(mosq->password);
253                 mosq->password = NULL;
254         }
255
256         if(username){
257                 mosq->username = _mosquitto_strdup(username);
258                 if(!mosq->username) return MOSQ_ERR_NOMEM;
259                 if(password){
260                         mosq->password = _mosquitto_strdup(password);
261                         if(!mosq->password){
262                                 _mosquitto_free(mosq->username);
263                                 mosq->username = NULL;
264                                 return MOSQ_ERR_NOMEM;
265                         }
266                 }
267         }
268         return MOSQ_ERR_SUCCESS;
269 }
270
271 int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigned int reconnect_delay, unsigned int reconnect_delay_max, bool reconnect_exponential_backoff)
272 {
273         if(!mosq) return MOSQ_ERR_INVAL;
274         
275         mosq->reconnect_delay = reconnect_delay;
276         mosq->reconnect_delay_max = reconnect_delay_max;
277         mosq->reconnect_exponential_backoff = reconnect_exponential_backoff;
278         
279         return MOSQ_ERR_SUCCESS;
280         
281 }
282
283 void _mosquitto_destroy(struct mosquitto *mosq)
284 {
285         struct _mosquitto_packet *packet;
286         if(!mosq) return;
287
288 #ifdef WITH_THREADING
289         if(mosq->threaded && !pthread_equal(mosq->thread_id, pthread_self())){
290                 pthread_cancel(mosq->thread_id);
291                 pthread_join(mosq->thread_id, NULL);
292                 mosq->threaded = false;
293         }
294
295         if(mosq->id){
296                 /* If mosq->id is not NULL then the client has already been initialised
297                  * and so the mutexes need destroying. If mosq->id is NULL, the mutexes
298                  * haven't been initialised. */
299                 pthread_mutex_destroy(&mosq->callback_mutex);
300                 pthread_mutex_destroy(&mosq->log_callback_mutex);
301                 pthread_mutex_destroy(&mosq->state_mutex);
302                 pthread_mutex_destroy(&mosq->out_packet_mutex);
303                 pthread_mutex_destroy(&mosq->current_out_packet_mutex);
304                 pthread_mutex_destroy(&mosq->msgtime_mutex);
305                 pthread_mutex_destroy(&mosq->in_message_mutex);
306                 pthread_mutex_destroy(&mosq->out_message_mutex);
307         }
308 #endif
309         if(mosq->sock != INVALID_SOCKET){
310                 _mosquitto_socket_close(mosq);
311         }
312         _mosquitto_message_cleanup_all(mosq);
313         _mosquitto_will_clear(mosq);
314 #ifdef WITH_TLS
315         if(mosq->ssl){
316                 SSL_free(mosq->ssl);
317         }
318         if(mosq->ssl_ctx){
319                 SSL_CTX_free(mosq->ssl_ctx);
320         }
321         if(mosq->tls_cafile) _mosquitto_free(mosq->tls_cafile);
322         if(mosq->tls_capath) _mosquitto_free(mosq->tls_capath);
323         if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
324         if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
325         if(mosq->tls_pw_callback) mosq->tls_pw_callback = NULL;
326         if(mosq->tls_version) _mosquitto_free(mosq->tls_version);
327         if(mosq->tls_ciphers) _mosquitto_free(mosq->tls_ciphers);
328         if(mosq->tls_psk) _mosquitto_free(mosq->tls_psk);
329         if(mosq->tls_psk_identity) _mosquitto_free(mosq->tls_psk_identity);
330 #endif
331
332         if(mosq->address){
333                 _mosquitto_free(mosq->address);
334                 mosq->address = NULL;
335         }
336         if(mosq->id){
337                 _mosquitto_free(mosq->id);
338                 mosq->id = NULL;
339         }
340         if(mosq->username){
341                 _mosquitto_free(mosq->username);
342                 mosq->username = NULL;
343         }
344         if(mosq->password){
345                 _mosquitto_free(mosq->password);
346                 mosq->password = NULL;
347         }
348         if(mosq->host){
349                 _mosquitto_free(mosq->host);
350                 mosq->host = NULL;
351         }
352         if(mosq->bind_address){
353                 _mosquitto_free(mosq->bind_address);
354                 mosq->bind_address = NULL;
355         }
356
357         /* Out packet cleanup */
358         if(mosq->out_packet && !mosq->current_out_packet){
359                 mosq->current_out_packet = mosq->out_packet;
360                 mosq->out_packet = mosq->out_packet->next;
361         }
362         while(mosq->current_out_packet){
363                 packet = mosq->current_out_packet;
364                 /* Free data and reset values */
365                 mosq->current_out_packet = mosq->out_packet;
366                 if(mosq->out_packet){
367                         mosq->out_packet = mosq->out_packet->next;
368                 }
369
370                 _mosquitto_packet_cleanup(packet);
371                 _mosquitto_free(packet);
372         }
373
374         _mosquitto_packet_cleanup(&mosq->in_packet);
375         if(mosq->sockpairR != INVALID_SOCKET){
376                 COMPAT_CLOSE(mosq->sockpairR);
377                 mosq->sockpairR = INVALID_SOCKET;
378         }
379         if(mosq->sockpairW != INVALID_SOCKET){
380                 COMPAT_CLOSE(mosq->sockpairW);
381                 mosq->sockpairW = INVALID_SOCKET;
382         }
383 }
384
/*
 * Tear down a client created with mosquitto_new(): release its resources via
 * _mosquitto_destroy() and then free the struct. A NULL mosq is ignored.
 */
void mosquitto_destroy(struct mosquitto *mosq)
{
	if(mosq){
		_mosquitto_destroy(mosq);
		_mosquitto_free(mosq);
	}
}
392
393 int mosquitto_socket(struct mosquitto *mosq)
394 {
395         if(!mosq) return MOSQ_ERR_INVAL;
396         return mosq->sock;
397 }
398
399 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
400 {
401         if(!mosq) return MOSQ_ERR_INVAL;
402         if(!host || port <= 0) return MOSQ_ERR_INVAL;
403
404         if(mosq->host) _mosquitto_free(mosq->host);
405         mosq->host = _mosquitto_strdup(host);
406         if(!mosq->host) return MOSQ_ERR_NOMEM;
407         mosq->port = port;
408
409         if(mosq->bind_address) _mosquitto_free(mosq->bind_address);
410         if(bind_address){
411                 mosq->bind_address = _mosquitto_strdup(bind_address);
412                 if(!mosq->bind_address) return MOSQ_ERR_NOMEM;
413         }
414
415         mosq->keepalive = keepalive;
416
417         if(_mosquitto_socketpair(&mosq->sockpairR, &mosq->sockpairW)){
418                 _mosquitto_log_printf(mosq, MOSQ_LOG_WARNING,
419                                 "Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
420         }
421
422         return MOSQ_ERR_SUCCESS;
423 }
424
/*
 * Connect to a broker without binding to a specific local address.
 *
 * Equivalent to mosquitto_connect_bind() with bind_address == NULL; returns
 * that function's result.
 */
int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
	const char *no_bind_address = NULL;

	return mosquitto_connect_bind(mosq, host, port, keepalive, no_bind_address);
}
429
430 int mosquitto_connect_bind(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
431 {
432         int rc;
433         rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
434         if(rc) return rc;
435
436         pthread_mutex_lock(&mosq->state_mutex);
437         mosq->state = mosq_cs_new;
438         pthread_mutex_unlock(&mosq->state_mutex);
439
440         return _mosquitto_reconnect(mosq, true);
441 }
442
/*
 * Non-blocking variant of mosquitto_connect().
 *
 * Equivalent to mosquitto_connect_bind_async() with bind_address == NULL;
 * returns that function's result.
 */
int mosquitto_connect_async(struct mosquitto *mosq, const char *host, int port, int keepalive)
{
	const char *no_bind_address = NULL;

	return mosquitto_connect_bind_async(mosq, host, port, keepalive, no_bind_address);
}
447
448 int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
449 {
450         int rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
451         if(rc) return rc;
452
453         pthread_mutex_lock(&mosq->state_mutex);
454         mosq->state = mosq_cs_connect_async;
455         pthread_mutex_unlock(&mosq->state_mutex);
456
457         return _mosquitto_reconnect(mosq, false);
458 }
459
/*
 * Reconnect using the previously stored host/port, without blocking.
 *
 * Returns the result of _mosquitto_reconnect() called with blocking == false.
 */
int mosquitto_reconnect_async(struct mosquitto *mosq)
{
	const bool blocking = false;

	return _mosquitto_reconnect(mosq, blocking);
}
464
/*
 * Reconnect using the previously stored host/port, blocking until the socket
 * connect completes.
 *
 * Returns the result of _mosquitto_reconnect() called with blocking == true.
 */
int mosquitto_reconnect(struct mosquitto *mosq)
{
	const bool blocking = true;

	return _mosquitto_reconnect(mosq, blocking);
}
469
470 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking)
471 {
472         int rc;
473         struct _mosquitto_packet *packet;
474         if(!mosq) return MOSQ_ERR_INVAL;
475         if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
476
477         pthread_mutex_lock(&mosq->state_mutex);
478         mosq->state = mosq_cs_new;
479         pthread_mutex_unlock(&mosq->state_mutex);
480
481         pthread_mutex_lock(&mosq->msgtime_mutex);
482         mosq->last_msg_in = mosquitto_time();
483         mosq->last_msg_out = mosquitto_time();
484         pthread_mutex_unlock(&mosq->msgtime_mutex);
485
486         mosq->ping_t = 0;
487
488         _mosquitto_packet_cleanup(&mosq->in_packet);
489                 
490         pthread_mutex_lock(&mosq->current_out_packet_mutex);
491         pthread_mutex_lock(&mosq->out_packet_mutex);
492
493         if(mosq->out_packet && !mosq->current_out_packet){
494                 mosq->current_out_packet = mosq->out_packet;
495                 mosq->out_packet = mosq->out_packet->next;
496         }
497
498         while(mosq->current_out_packet){
499                 packet = mosq->current_out_packet;
500                 /* Free data and reset values */
501                 mosq->current_out_packet = mosq->out_packet;
502                 if(mosq->out_packet){
503                         mosq->out_packet = mosq->out_packet->next;
504                 }
505
506                 _mosquitto_packet_cleanup(packet);
507                 _mosquitto_free(packet);
508         }
509         pthread_mutex_unlock(&mosq->out_packet_mutex);
510         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
511
512         _mosquitto_messages_reconnect_reset(mosq);
513
514         rc = _mosquitto_socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
515         if(rc){
516                 return rc;
517         }
518
519         return _mosquitto_send_connect(mosq, mosq->keepalive, mosq->clean_session);
520 }
521
522 int mosquitto_disconnect(struct mosquitto *mosq)
523 {
524         if(!mosq) return MOSQ_ERR_INVAL;
525
526         pthread_mutex_lock(&mosq->state_mutex);
527         mosq->state = mosq_cs_disconnecting;
528         pthread_mutex_unlock(&mosq->state_mutex);
529
530         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
531         return _mosquitto_send_disconnect(mosq);
532 }
533
534 int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
535 {
536         struct mosquitto_message_all *message;
537         uint16_t local_mid;
538
539         if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL;
540         if(strlen(topic) == 0) return MOSQ_ERR_INVAL;
541         if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
542
543         if(_mosquitto_topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS){
544                 return MOSQ_ERR_INVAL;
545         }
546
547         local_mid = _mosquitto_mid_generate(mosq);
548         if(mid){
549                 *mid = local_mid;
550         }
551
552         if(qos == 0){
553                 return _mosquitto_send_publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false);
554         }else{
555                 message = _mosquitto_calloc(1, sizeof(struct mosquitto_message_all));
556                 if(!message) return MOSQ_ERR_NOMEM;
557
558                 message->next = NULL;
559                 message->timestamp = mosquitto_time();
560                 message->msg.mid = local_mid;
561                 message->msg.topic = _mosquitto_strdup(topic);
562                 if(!message->msg.topic){
563                         _mosquitto_message_cleanup(&message);
564                         return MOSQ_ERR_NOMEM;
565                 }
566                 if(payloadlen){
567                         message->msg.payloadlen = payloadlen;
568                         message->msg.payload = _mosquitto_malloc(payloadlen*sizeof(uint8_t));
569                         if(!message->msg.payload){
570                                 _mosquitto_message_cleanup(&message);
571                                 return MOSQ_ERR_NOMEM;
572                         }
573                         memcpy(message->msg.payload, payload, payloadlen*sizeof(uint8_t));
574                 }else{
575                         message->msg.payloadlen = 0;
576                         message->msg.payload = NULL;
577                 }
578                 message->msg.qos = qos;
579                 message->msg.retain = retain;
580                 message->dup = false;
581
582                 pthread_mutex_lock(&mosq->out_message_mutex);
583                 _mosquitto_message_queue(mosq, message, mosq_md_out);
584                 if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
585                         mosq->inflight_messages++;
586                         if(qos == 1){
587                                 message->state = mosq_ms_wait_for_puback;
588                         }else if(qos == 2){
589                                 message->state = mosq_ms_wait_for_pubrec;
590                         }
591                         pthread_mutex_unlock(&mosq->out_message_mutex);
592                         return _mosquitto_send_publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup);
593                 }else{
594                         message->state = mosq_ms_invalid;
595                         pthread_mutex_unlock(&mosq->out_message_mutex);
596                         return MOSQ_ERR_SUCCESS;
597                 }
598         }
599 }
600
601 int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
602 {
603         if(!mosq) return MOSQ_ERR_INVAL;
604         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
605
606         if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
607
608         return _mosquitto_send_subscribe(mosq, mid, false, sub, qos);
609 }
610
611 int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub)
612 {
613         if(!mosq) return MOSQ_ERR_INVAL;
614         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
615
616         if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
617
618         return _mosquitto_send_unsubscribe(mosq, mid, false, sub);
619 }
620
621 int mosquitto_tls_set(struct mosquitto *mosq, const char *cafile, const char *capath, const char *certfile, const char *keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata))
622 {
623 #ifdef WITH_TLS
624         FILE *fptr;
625
626         if(!mosq || (!cafile && !capath) || (certfile && !keyfile) || (!certfile && keyfile)) return MOSQ_ERR_INVAL;
627
628         if(cafile){
629                 fptr = _mosquitto_fopen(cafile, "rt");
630                 if(fptr){
631                         fclose(fptr);
632                 }else{
633                         return MOSQ_ERR_INVAL;
634                 }
635                 mosq->tls_cafile = _mosquitto_strdup(cafile);
636
637                 if(!mosq->tls_cafile){
638                         return MOSQ_ERR_NOMEM;
639                 }
640         }else if(mosq->tls_cafile){
641                 _mosquitto_free(mosq->tls_cafile);
642                 mosq->tls_cafile = NULL;
643         }
644
645         if(capath){
646                 mosq->tls_capath = _mosquitto_strdup(capath);
647                 if(!mosq->tls_capath){
648                         return MOSQ_ERR_NOMEM;
649                 }
650         }else if(mosq->tls_capath){
651                 _mosquitto_free(mosq->tls_capath);
652                 mosq->tls_capath = NULL;
653         }
654
655         if(certfile){
656                 fptr = _mosquitto_fopen(certfile, "rt");
657                 if(fptr){
658                         fclose(fptr);
659                 }else{
660                         if(mosq->tls_cafile){
661                                 _mosquitto_free(mosq->tls_cafile);
662                                 mosq->tls_cafile = NULL;
663                         }
664                         if(mosq->tls_capath){
665                                 _mosquitto_free(mosq->tls_capath);
666                                 mosq->tls_capath = NULL;
667                         }
668                         return MOSQ_ERR_INVAL;
669                 }
670                 mosq->tls_certfile = _mosquitto_strdup(certfile);
671                 if(!mosq->tls_certfile){
672                         return MOSQ_ERR_NOMEM;
673                 }
674         }else{
675                 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
676                 mosq->tls_certfile = NULL;
677         }
678
679         if(keyfile){
680                 fptr = _mosquitto_fopen(keyfile, "rt");
681                 if(fptr){
682                         fclose(fptr);
683                 }else{
684                         if(mosq->tls_cafile){
685                                 _mosquitto_free(mosq->tls_cafile);
686                                 mosq->tls_cafile = NULL;
687                         }
688                         if(mosq->tls_capath){
689                                 _mosquitto_free(mosq->tls_capath);
690                                 mosq->tls_capath = NULL;
691                         }
692                         if(mosq->tls_certfile){
693                                 _mosquitto_free(mosq->tls_certfile);
694                                 mosq->tls_certfile = NULL;
695                         }
696                         return MOSQ_ERR_INVAL;
697                 }
698                 mosq->tls_keyfile = _mosquitto_strdup(keyfile);
699                 if(!mosq->tls_keyfile){
700                         return MOSQ_ERR_NOMEM;
701                 }
702         }else{
703                 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
704                 mosq->tls_keyfile = NULL;
705         }
706
707         mosq->tls_pw_callback = pw_callback;
708
709
710         return MOSQ_ERR_SUCCESS;
711 #else
712         return MOSQ_ERR_NOT_SUPPORTED;
713
714 #endif
715 }
716
717 int mosquitto_tls_opts_set(struct mosquitto *mosq, int cert_reqs, const char *tls_version, const char *ciphers)
718 {
719 #ifdef WITH_TLS
720         if(!mosq) return MOSQ_ERR_INVAL;
721
722         mosq->tls_cert_reqs = cert_reqs;
723         if(tls_version){
724 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
725                 if(!strcasecmp(tls_version, "tlsv1.2")
726                                 || !strcasecmp(tls_version, "tlsv1.1")
727                                 || !strcasecmp(tls_version, "tlsv1")){
728
729                         mosq->tls_version = _mosquitto_strdup(tls_version);
730                         if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
731                 }else{
732                         return MOSQ_ERR_INVAL;
733                 }
734 #else
735                 if(!strcasecmp(tls_version, "tlsv1")){
736                         mosq->tls_version = _mosquitto_strdup(tls_version);
737                         if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
738                 }else{
739                         return MOSQ_ERR_INVAL;
740                 }
741 #endif
742         }else{
743 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
744                 mosq->tls_version = _mosquitto_strdup("tlsv1.2");
745 #else
746                 mosq->tls_version = _mosquitto_strdup("tlsv1");
747 #endif
748                 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
749         }
750         if(ciphers){
751                 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
752                 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
753         }else{
754                 mosq->tls_ciphers = NULL;
755         }
756
757
758         return MOSQ_ERR_SUCCESS;
759 #else
760         return MOSQ_ERR_NOT_SUPPORTED;
761
762 #endif
763 }
764
765
766 int mosquitto_tls_insecure_set(struct mosquitto *mosq, bool value)
767 {
768 #ifdef WITH_TLS
769         if(!mosq) return MOSQ_ERR_INVAL;
770         mosq->tls_insecure = value;
771         return MOSQ_ERR_SUCCESS;
772 #else
773         return MOSQ_ERR_NOT_SUPPORTED;
774 #endif
775 }
776
777
778 int mosquitto_tls_psk_set(struct mosquitto *mosq, const char *psk, const char *identity, const char *ciphers)
779 {
780 #ifdef REAL_WITH_TLS_PSK
781         if(!mosq || !psk || !identity) return MOSQ_ERR_INVAL;
782
783         /* Check for hex only digits */
784         if(strspn(psk, "0123456789abcdefABCDEF") < strlen(psk)){
785                 return MOSQ_ERR_INVAL;
786         }
787         mosq->tls_psk = _mosquitto_strdup(psk);
788         if(!mosq->tls_psk) return MOSQ_ERR_NOMEM;
789
790         mosq->tls_psk_identity = _mosquitto_strdup(identity);
791         if(!mosq->tls_psk_identity){
792                 _mosquitto_free(mosq->tls_psk);
793                 return MOSQ_ERR_NOMEM;
794         }
795         if(ciphers){
796                 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
797                 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
798         }else{
799                 mosq->tls_ciphers = NULL;
800         }
801
802         return MOSQ_ERR_SUCCESS;
803 #else
804         return MOSQ_ERR_NOT_SUPPORTED;
805 #endif
806 }
807
808
809 int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
810 {
811 #ifdef HAVE_PSELECT
812         struct timespec local_timeout;
813 #else
814         struct timeval local_timeout;
815 #endif
816         fd_set readfds, writefds;
817         int fdcount;
818         int rc;
819         char pairbuf;
820         int maxfd = 0;
821
822         if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
823
824         FD_ZERO(&readfds);
825         FD_ZERO(&writefds);
826         if(mosq->sock != INVALID_SOCKET){
827                 maxfd = mosq->sock;
828                 FD_SET(mosq->sock, &readfds);
829                 pthread_mutex_lock(&mosq->current_out_packet_mutex);
830                 pthread_mutex_lock(&mosq->out_packet_mutex);
831                 if(mosq->out_packet || mosq->current_out_packet){
832                         FD_SET(mosq->sock, &writefds);
833 #ifdef WITH_TLS
834                 }else if(mosq->ssl && mosq->want_write){
835                         FD_SET(mosq->sock, &writefds);
836 #endif
837                 }
838                 pthread_mutex_unlock(&mosq->out_packet_mutex);
839                 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
840         }else{
841 #ifdef WITH_SRV
842                 if(mosq->achan){
843                         pthread_mutex_lock(&mosq->state_mutex);
844                         if(mosq->state == mosq_cs_connect_srv){
845                                 rc = ares_fds(mosq->achan, &readfds, &writefds);
846                                 if(rc > maxfd){
847                                         maxfd = rc;
848                                 }
849                         }else{
850                                 return MOSQ_ERR_NO_CONN;
851                         }
852                         pthread_mutex_unlock(&mosq->state_mutex);
853                 }
854 #else
855                 return MOSQ_ERR_NO_CONN;
856 #endif
857         }
858         if(mosq->sockpairR != INVALID_SOCKET){
859                 /* sockpairR is used to break out of select() before the timeout, on a
860                  * call to publish() etc. */
861                 FD_SET(mosq->sockpairR, &readfds);
862                 if(mosq->sockpairR > maxfd){
863                         maxfd = mosq->sockpairR;
864                 }
865         }
866
867         if(timeout >= 0){
868                 local_timeout.tv_sec = timeout/1000;
869 #ifdef HAVE_PSELECT
870                 local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
871 #else
872                 local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
873 #endif
874         }else{
875                 local_timeout.tv_sec = 1;
876 #ifdef HAVE_PSELECT
877                 local_timeout.tv_nsec = 0;
878 #else
879                 local_timeout.tv_usec = 0;
880 #endif
881         }
882
883 #ifdef HAVE_PSELECT
884         fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
885 #else
886         fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
887 #endif
888         if(fdcount == -1){
889 #ifdef WIN32
890                 errno = WSAGetLastError();
891 #endif
892                 if(errno == EINTR){
893                         return MOSQ_ERR_SUCCESS;
894                 }else{
895                         return MOSQ_ERR_ERRNO;
896                 }
897         }else{
898                 if(mosq->sock != INVALID_SOCKET){
899                         if(FD_ISSET(mosq->sock, &readfds)){
900                                 rc = mosquitto_loop_read(mosq, max_packets);
901                                 if(rc || mosq->sock == INVALID_SOCKET){
902                                         return rc;
903                                 }
904                         }
905                         if(mosq->sockpairR >= 0 && FD_ISSET(mosq->sockpairR, &readfds)){
906 #ifndef WIN32
907                                 if(read(mosq->sockpairR, &pairbuf, 1) == 0){
908                                 }
909 #else
910                                 recv(mosq->sockpairR, &pairbuf, 1, 0);
911 #endif
912                                 /* Fake write possible, to stimulate output write even though
913                                  * we didn't ask for it, because at that point the publish or
914                                  * other command wasn't present. */
915                                 FD_SET(mosq->sock, &writefds);
916                         }
917                         if(FD_ISSET(mosq->sock, &writefds)){
918                                 rc = mosquitto_loop_write(mosq, max_packets);
919                                 if(rc || mosq->sock == INVALID_SOCKET){
920                                         return rc;
921                                 }
922                         }
923                 }
924 #ifdef WITH_SRV
925                 if(mosq->achan){
926                         ares_process(mosq->achan, &readfds, &writefds);
927                 }
928 #endif
929         }
930         return mosquitto_loop_misc(mosq);
931 }
932
933 int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
934 {
935         int run = 1;
936         int rc;
937         unsigned int reconnects = 0;
938         unsigned long reconnect_delay;
939
940         if(!mosq) return MOSQ_ERR_INVAL;
941
942         if(mosq->state == mosq_cs_connect_async){
943                 mosquitto_reconnect(mosq);
944         }
945
946         while(run){
947                 do{
948                         rc = mosquitto_loop(mosq, timeout, max_packets);
949                         if (reconnects !=0 && rc == MOSQ_ERR_SUCCESS){
950                                 reconnects = 0;
951                         }
952                 }while(rc == MOSQ_ERR_SUCCESS);
953                 if(errno == EPROTO){
954                         return rc;
955                 }
956                 pthread_mutex_lock(&mosq->state_mutex);
957                 if(mosq->state == mosq_cs_disconnecting){
958                         run = 0;
959                         pthread_mutex_unlock(&mosq->state_mutex);
960                 }else{
961                         pthread_mutex_unlock(&mosq->state_mutex);
962
963                         if(mosq->reconnect_delay > 0 && mosq->reconnect_exponential_backoff){
964                                 reconnect_delay = mosq->reconnect_delay*reconnects*reconnects;
965                         }else{
966                                 reconnect_delay = mosq->reconnect_delay;
967                         }
968
969                         if(reconnect_delay > mosq->reconnect_delay_max){
970                                 reconnect_delay = mosq->reconnect_delay_max;
971                         }else{
972                                 reconnects++;
973                         }
974                                 
975 #ifdef WIN32
976                         Sleep(reconnect_delay*1000);
977 #else
978                         sleep(reconnect_delay);
979 #endif
980
981                         pthread_mutex_lock(&mosq->state_mutex);
982                         if(mosq->state == mosq_cs_disconnecting){
983                                 run = 0;
984                                 pthread_mutex_unlock(&mosq->state_mutex);
985                         }else{
986                                 pthread_mutex_unlock(&mosq->state_mutex);
987                                 mosquitto_reconnect(mosq);
988                         }
989                 }
990         }
991         return rc;
992 }
993
994 int mosquitto_loop_misc(struct mosquitto *mosq)
995 {
996         time_t now;
997         int rc;
998
999         if(!mosq) return MOSQ_ERR_INVAL;
1000         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
1001
1002         now = mosquitto_time();
1003
1004         _mosquitto_check_keepalive(mosq);
1005         if(mosq->last_retry_check+1 < now){
1006                 _mosquitto_message_retry_check(mosq);
1007                 mosq->last_retry_check = now;
1008         }
1009         if(mosq->ping_t && now - mosq->ping_t >= mosq->keepalive){
1010                 /* mosq->ping_t != 0 means we are waiting for a pingresp.
1011                  * This hasn't happened in the keepalive time so we should disconnect.
1012                  */
1013                 _mosquitto_socket_close(mosq);
1014                 pthread_mutex_lock(&mosq->state_mutex);
1015                 if(mosq->state == mosq_cs_disconnecting){
1016                         rc = MOSQ_ERR_SUCCESS;
1017                 }else{
1018                         rc = 1;
1019                 }
1020                 pthread_mutex_unlock(&mosq->state_mutex);
1021                 pthread_mutex_lock(&mosq->callback_mutex);
1022                 if(mosq->on_disconnect){
1023                         mosq->in_callback = true;
1024                         mosq->on_disconnect(mosq, mosq->userdata, rc);
1025                         mosq->in_callback = false;
1026                 }
1027                 pthread_mutex_unlock(&mosq->callback_mutex);
1028                 return MOSQ_ERR_CONN_LOST;
1029         }
1030         return MOSQ_ERR_SUCCESS;
1031 }
1032
1033 static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
1034 {
1035         if(rc){
1036                 _mosquitto_socket_close(mosq);
1037                 pthread_mutex_lock(&mosq->state_mutex);
1038                 if(mosq->state == mosq_cs_disconnecting){
1039                         rc = MOSQ_ERR_SUCCESS;
1040                 }
1041                 pthread_mutex_unlock(&mosq->state_mutex);
1042                 pthread_mutex_lock(&mosq->callback_mutex);
1043                 if(mosq->on_disconnect){
1044                         mosq->in_callback = true;
1045                         mosq->on_disconnect(mosq, mosq->userdata, rc);
1046                         mosq->in_callback = false;
1047                 }
1048                 pthread_mutex_unlock(&mosq->callback_mutex);
1049                 return rc;
1050         }
1051         return rc;
1052 }
1053
1054 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
1055 {
1056         int rc;
1057         int i;
1058         if(max_packets < 1) return MOSQ_ERR_INVAL;
1059
1060         pthread_mutex_lock(&mosq->out_message_mutex);
1061         max_packets = mosq->out_queue_len;
1062         pthread_mutex_unlock(&mosq->out_message_mutex);
1063
1064         pthread_mutex_lock(&mosq->in_message_mutex);
1065         max_packets += mosq->in_queue_len;
1066         pthread_mutex_unlock(&mosq->in_message_mutex);
1067
1068         if(max_packets < 1) max_packets = 1;
1069         /* Queue len here tells us how many messages are awaiting processing and
1070          * have QoS > 0. We should try to deal with that many in this loop in order
1071          * to keep up. */
1072         for(i=0; i<max_packets; i++){
1073                 rc = _mosquitto_packet_read(mosq);
1074                 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1075                         return _mosquitto_loop_rc_handle(mosq, rc);
1076                 }
1077         }
1078         return rc;
1079 }
1080
1081 int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
1082 {
1083         int rc;
1084         int i;
1085         if(max_packets < 1) return MOSQ_ERR_INVAL;
1086
1087         pthread_mutex_lock(&mosq->out_message_mutex);
1088         max_packets = mosq->out_queue_len;
1089         pthread_mutex_unlock(&mosq->out_message_mutex);
1090
1091         pthread_mutex_lock(&mosq->in_message_mutex);
1092         max_packets += mosq->in_queue_len;
1093         pthread_mutex_unlock(&mosq->in_message_mutex);
1094
1095         if(max_packets < 1) max_packets = 1;
1096         /* Queue len here tells us how many messages are awaiting processing and
1097          * have QoS > 0. We should try to deal with that many in this loop in order
1098          * to keep up. */
1099         for(i=0; i<max_packets; i++){
1100                 rc = _mosquitto_packet_write(mosq);
1101                 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1102                         return _mosquitto_loop_rc_handle(mosq, rc);
1103                 }
1104         }
1105         return rc;
1106 }
1107
1108 bool mosquitto_want_write(struct mosquitto *mosq)
1109 {
1110         if(mosq->out_packet || mosq->current_out_packet){
1111                 return true;
1112 #ifdef WITH_TLS
1113         }else if(mosq->ssl && mosq->want_write){
1114                 return true;
1115 #endif
1116         }else{
1117                 return false;
1118         }
1119 }
1120
1121 void mosquitto_connect_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int))
1122 {
1123         pthread_mutex_lock(&mosq->callback_mutex);
1124         mosq->on_connect = on_connect;
1125         pthread_mutex_unlock(&mosq->callback_mutex);
1126 }
1127
1128 void mosquitto_disconnect_callback_set(struct mosquitto *mosq, void (*on_disconnect)(struct mosquitto *, void *, int))
1129 {
1130         pthread_mutex_lock(&mosq->callback_mutex);
1131         mosq->on_disconnect = on_disconnect;
1132         pthread_mutex_unlock(&mosq->callback_mutex);
1133 }
1134
1135 void mosquitto_publish_callback_set(struct mosquitto *mosq, void (*on_publish)(struct mosquitto *, void *, int))
1136 {
1137         pthread_mutex_lock(&mosq->callback_mutex);
1138         mosq->on_publish = on_publish;
1139         pthread_mutex_unlock(&mosq->callback_mutex);
1140 }
1141
1142 void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *))
1143 {
1144         pthread_mutex_lock(&mosq->callback_mutex);
1145         mosq->on_message = on_message;
1146         pthread_mutex_unlock(&mosq->callback_mutex);
1147 }
1148
1149 void mosquitto_subscribe_callback_set(struct mosquitto *mosq, void (*on_subscribe)(struct mosquitto *, void *, int, int, const int *))
1150 {
1151         pthread_mutex_lock(&mosq->callback_mutex);
1152         mosq->on_subscribe = on_subscribe;
1153         pthread_mutex_unlock(&mosq->callback_mutex);
1154 }
1155
1156 void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int))
1157 {
1158         pthread_mutex_lock(&mosq->callback_mutex);
1159         mosq->on_unsubscribe = on_unsubscribe;
1160         pthread_mutex_unlock(&mosq->callback_mutex);
1161 }
1162
1163 void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *))
1164 {
1165         pthread_mutex_lock(&mosq->log_callback_mutex);
1166         mosq->on_log = on_log;
1167         pthread_mutex_unlock(&mosq->log_callback_mutex);
1168 }
1169
1170 void mosquitto_user_data_set(struct mosquitto *mosq, void *userdata)
1171 {
1172         if(mosq){
1173                 mosq->userdata = userdata;
1174         }
1175 }
1176
1177 const char *mosquitto_strerror(int mosq_errno)
1178 {
1179         switch(mosq_errno){
1180                 case MOSQ_ERR_SUCCESS:
1181                         return "No error.";
1182                 case MOSQ_ERR_NOMEM:
1183                         return "Out of memory.";
1184                 case MOSQ_ERR_PROTOCOL:
1185                         return "A network protocol error occurred when communicating with the broker.";
1186                 case MOSQ_ERR_INVAL:
1187                         return "Invalid function arguments provided.";
1188                 case MOSQ_ERR_NO_CONN:
1189                         return "The client is not currently connected.";
1190                 case MOSQ_ERR_CONN_REFUSED:
1191                         return "The connection was refused.";
1192                 case MOSQ_ERR_NOT_FOUND:
1193                         return "Message not found (internal error).";
1194                 case MOSQ_ERR_CONN_LOST:
1195                         return "The connection was lost.";
1196                 case MOSQ_ERR_TLS:
1197                         return "A TLS error occurred.";
1198                 case MOSQ_ERR_PAYLOAD_SIZE:
1199                         return "Payload too large.";
1200                 case MOSQ_ERR_NOT_SUPPORTED:
1201                         return "This feature is not supported.";
1202                 case MOSQ_ERR_AUTH:
1203                         return "Authorisation failed.";
1204                 case MOSQ_ERR_ACL_DENIED:
1205                         return "Access denied by ACL.";
1206                 case MOSQ_ERR_UNKNOWN:
1207                         return "Unknown error.";
1208                 case MOSQ_ERR_ERRNO:
1209                         return strerror(errno);
1210                 default:
1211                         return "Unknown error.";
1212         }
1213 }
1214
/*
 * Translate a CONNACK result code into a human readable message.
 *
 * Codes 0 to 5 map to their specific messages; every other value (including
 * negative ones) yields "Connection Refused: unknown reason.". The returned
 * string is static and must not be freed.
 */
const char *mosquitto_connack_string(int connack_code)
{
	/* Indexed directly by the CONNACK code. */
	static const char *const messages[] = {
		"Connection Accepted.",
		"Connection Refused: unacceptable protocol version.",
		"Connection Refused: identifier rejected.",
		"Connection Refused: broker unavailable.",
		"Connection Refused: bad user name or password.",
		"Connection Refused: not authorised."
	};
	const int known = (int)(sizeof(messages)/sizeof(messages[0]));

	if(connack_code >= 0 && connack_code < known){
		return messages[connack_code];
	}
	return "Connection Refused: unknown reason.";
}
1234
1235 int mosquitto_sub_topic_tokenise(const char *subtopic, char ***topics, int *count)
1236 {
1237         int len;
1238         int hier_count = 1;
1239         int start, stop;
1240         int hier;
1241         int tlen;
1242         int i, j;
1243
1244         if(!subtopic || !topics || !count) return MOSQ_ERR_INVAL;
1245
1246         len = strlen(subtopic);
1247
1248         for(i=0; i<len; i++){
1249                 if(subtopic[i] == '/'){
1250                         if(i > len-1){
1251                                 /* Separator at end of line */
1252                         }else{
1253                                 hier_count++;
1254                         }
1255                 }
1256         }
1257
1258         (*topics) = _mosquitto_calloc(hier_count, sizeof(char *));
1259         if(!(*topics)) return MOSQ_ERR_NOMEM;
1260
1261         start = 0;
1262         stop = 0;
1263         hier = 0;
1264
1265         for(i=0; i<len+1; i++){
1266                 if(subtopic[i] == '/' || subtopic[i] == '\0'){
1267                         stop = i;
1268                         if(start != stop){
1269                                 tlen = stop-start + 1;
1270                                 (*topics)[hier] = _mosquitto_calloc(tlen, sizeof(char));
1271                                 if(!(*topics)[hier]){
1272                                         for(i=0; i<hier_count; i++){
1273                                                 if((*topics)[hier]){
1274                                                         _mosquitto_free((*topics)[hier]);
1275                                                 }
1276                                         }
1277                                         _mosquitto_free((*topics));
1278                                         return MOSQ_ERR_NOMEM;
1279                                 }
1280                                 for(j=start; j<stop; j++){
1281                                         (*topics)[hier][j-start] = subtopic[j];
1282                                 }
1283                         }
1284                         start = i+1;
1285                         hier++;
1286                 }
1287         }
1288
1289         *count = hier_count;
1290
1291         return MOSQ_ERR_SUCCESS;
1292 }
1293
1294 int mosquitto_sub_topic_tokens_free(char ***topics, int count)
1295 {
1296         int i;
1297
1298         if(!topics || !(*topics) || count<1) return MOSQ_ERR_INVAL;
1299
1300         for(i=0; i<count; i++){
1301                 if((*topics)[i]) _mosquitto_free((*topics)[i]);
1302         }
1303         _mosquitto_free(*topics);
1304
1305         return MOSQ_ERR_SUCCESS;
1306 }
1307