Imported Upstream version 0.9.2
[platform/upstream/iotivity.git] / service / protocol-plugin / plugins / mqtt-light / lib / mosquitto.c
1 /*
2 Copyright (c) 2010-2013 Roger Light <roger@atchoo.org>
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11    notice, this list of conditions and the following disclaimer in the
12    documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14    contributors may be used to endorse or promote products derived from
15    this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <signal.h>
33 #include <stdio.h>
34 #include <string.h>
35 #ifndef WIN32
36 #include <sys/select.h>
37 #include <sys/time.h>
38 #include <unistd.h>
39 #else
40 #include <winsock2.h>
41 #include <windows.h>
42 typedef int ssize_t;
43 #endif
44
45 #include "mosquitto.h"
46 #include "mosquitto_internal.h"
47 #include "logging_mosq.h"
48 #include "messages_mosq.h"
49 #include "memory_mosq.h"
50 #include "mqtt3_protocol.h"
51 #include "net_mosq.h"
52 #include "read_handle.h"
53 #include "send_mosq.h"
54 #include "time_mosq.h"
55 #include "tls_mosq.h"
56 #include "util_mosq.h"
57 #include "will_mosq.h"
58
59 #if !defined(WIN32) && !defined(__SYMBIAN32__)
60 #define HAVE_PSELECT
61 #endif
62
63 void _mosquitto_destroy(struct mosquitto *mosq);
64 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking);
65 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address);
66
67 int mosquitto_lib_version(int *major, int *minor, int *revision)
68 {
69         if(major) *major = LIBMOSQUITTO_MAJOR;
70         if(minor) *minor = LIBMOSQUITTO_MINOR;
71         if(revision) *revision = LIBMOSQUITTO_REVISION;
72         return LIBMOSQUITTO_VERSION_NUMBER;
73 }
74
75 int mosquitto_lib_init(void)
76 {
77 #ifdef WIN32
78         srand(GetTickCount());
79 #else
80         struct timeval tv;
81
82         gettimeofday(&tv, NULL);
83         srand(tv.tv_sec*1000 + tv.tv_usec/1000);
84 #endif
85
86         _mosquitto_net_init();
87
88         return MOSQ_ERR_SUCCESS;
89 }
90
91 int mosquitto_lib_cleanup(void)
92 {
93         _mosquitto_net_cleanup();
94
95         return MOSQ_ERR_SUCCESS;
96 }
97
98 struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *userdata)
99 {
100         struct mosquitto *mosq = NULL;
101         int rc;
102
103         if(clean_session == false && id == NULL){
104                 errno = EINVAL;
105                 return NULL;
106         }
107
108 #ifndef WIN32
109         signal(SIGPIPE, SIG_IGN);
110 #endif
111
112         mosq = (struct mosquitto *)_mosquitto_calloc(1, sizeof(struct mosquitto));
113         if(mosq){
114                 mosq->sock = INVALID_SOCKET;
115                 mosq->sockpairR = INVALID_SOCKET;
116                 mosq->sockpairW = INVALID_SOCKET;
117 #ifdef WITH_THREADING
118                 mosq->thread_id = pthread_self();
119 #endif
120                 rc = mosquitto_reinitialise(mosq, id, clean_session, userdata);
121                 if(rc){
122                         mosquitto_destroy(mosq);
123                         if(rc == MOSQ_ERR_INVAL){
124                                 errno = EINVAL;
125                         }else if(rc == MOSQ_ERR_NOMEM){
126                                 errno = ENOMEM;
127                         }
128                         return NULL;
129                 }
130         }else{
131                 errno = ENOMEM;
132         }
133         return mosq;
134 }
135
136 int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_session, void *userdata)
137 {
138         int i;
139
140         if(!mosq) return MOSQ_ERR_INVAL;
141
142         if(clean_session == false && id == NULL){
143                 return MOSQ_ERR_INVAL;
144         }
145
146         _mosquitto_destroy(mosq);
147         memset(mosq, 0, sizeof(struct mosquitto));
148
149         if(userdata){
150                 mosq->userdata = userdata;
151         }else{
152                 mosq->userdata = mosq;
153         }
154         mosq->sock = INVALID_SOCKET;
155         mosq->sockpairR = INVALID_SOCKET;
156         mosq->sockpairW = INVALID_SOCKET;
157         mosq->keepalive = 60;
158         mosq->message_retry = 20;
159         mosq->last_retry_check = 0;
160         mosq->clean_session = clean_session;
161         if(id){
162                 if(strlen(id) == 0){
163                         return MOSQ_ERR_INVAL;
164                 }
165                 mosq->id = _mosquitto_strdup(id);
166         }else{
167                 mosq->id = (char *)_mosquitto_calloc(24, sizeof(char));
168                 if(!mosq->id){
169                         return MOSQ_ERR_NOMEM;
170                 }
171                 mosq->id[0] = 'm';
172                 mosq->id[1] = 'o';
173                 mosq->id[2] = 's';
174                 mosq->id[3] = 'q';
175                 mosq->id[4] = '/';
176
177                 for(i=5; i<23; i++){
178                         mosq->id[i] = (rand()%73)+48;
179                 }
180         }
181         mosq->in_packet.payload = NULL;
182         _mosquitto_packet_cleanup(&mosq->in_packet);
183         mosq->out_packet = NULL;
184         mosq->current_out_packet = NULL;
185         mosq->last_msg_in = mosquitto_time();
186         mosq->last_msg_out = mosquitto_time();
187         mosq->ping_t = 0;
188         mosq->last_mid = 0;
189         mosq->state = mosq_cs_new;
190         mosq->in_messages = NULL;
191         mosq->in_messages_last = NULL;
192         mosq->out_messages = NULL;
193         mosq->out_messages_last = NULL;
194         mosq->max_inflight_messages = 20;
195         mosq->will = NULL;
196         mosq->on_connect = NULL;
197         mosq->on_publish = NULL;
198         mosq->on_message = NULL;
199         mosq->on_subscribe = NULL;
200         mosq->on_unsubscribe = NULL;
201         mosq->host = NULL;
202         mosq->port = 1883;
203         mosq->in_callback = false;
204         mosq->in_queue_len = 0;
205         mosq->out_queue_len = 0;
206         mosq->reconnect_delay = 1;
207         mosq->reconnect_delay_max = 1;
208         mosq->reconnect_exponential_backoff = false;
209         mosq->threaded = false;
210 #ifdef WITH_TLS
211         mosq->ssl = NULL;
212         mosq->tls_cert_reqs = SSL_VERIFY_PEER;
213         mosq->tls_insecure = false;
214 #endif
215 #ifdef WITH_THREADING
216         pthread_mutex_init(&mosq->callback_mutex, NULL);
217         pthread_mutex_init(&mosq->log_callback_mutex, NULL);
218         pthread_mutex_init(&mosq->state_mutex, NULL);
219         pthread_mutex_init(&mosq->out_packet_mutex, NULL);
220         pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
221         pthread_mutex_init(&mosq->msgtime_mutex, NULL);
222         pthread_mutex_init(&mosq->in_message_mutex, NULL);
223         pthread_mutex_init(&mosq->out_message_mutex, NULL);
224         mosq->thread_id = pthread_self();
225 #endif
226
227         return MOSQ_ERR_SUCCESS;
228 }
229
230 int mosquitto_will_set(struct mosquitto *mosq, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
231 {
232         if(!mosq) return MOSQ_ERR_INVAL;
233         return _mosquitto_will_set(mosq, topic, payloadlen, payload, qos, retain);
234 }
235
236 int mosquitto_will_clear(struct mosquitto *mosq)
237 {
238         if(!mosq) return MOSQ_ERR_INVAL;
239         return _mosquitto_will_clear(mosq);
240 }
241
242 int mosquitto_username_pw_set(struct mosquitto *mosq, const char *username, const char *password)
243 {
244         if(!mosq) return MOSQ_ERR_INVAL;
245
246         if(mosq->username){
247                 _mosquitto_free(mosq->username);
248                 mosq->username = NULL;
249         }
250         if(mosq->password){
251                 _mosquitto_free(mosq->password);
252                 mosq->password = NULL;
253         }
254
255         if(username){
256                 mosq->username = _mosquitto_strdup(username);
257                 if(!mosq->username) return MOSQ_ERR_NOMEM;
258                 if(password){
259                         mosq->password = _mosquitto_strdup(password);
260                         if(!mosq->password){
261                                 _mosquitto_free(mosq->username);
262                                 mosq->username = NULL;
263                                 return MOSQ_ERR_NOMEM;
264                         }
265                 }
266         }
267         return MOSQ_ERR_SUCCESS;
268 }
269
270 int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigned int reconnect_delay, unsigned int reconnect_delay_max, bool reconnect_exponential_backoff)
271 {
272         if(!mosq) return MOSQ_ERR_INVAL;
273         
274         mosq->reconnect_delay = reconnect_delay;
275         mosq->reconnect_delay_max = reconnect_delay_max;
276         mosq->reconnect_exponential_backoff = reconnect_exponential_backoff;
277         
278         return MOSQ_ERR_SUCCESS;
279         
280 }
281
282 void _mosquitto_destroy(struct mosquitto *mosq)
283 {
284         struct _mosquitto_packet *packet;
285         if(!mosq) return;
286
287 #ifdef WITH_THREADING
288         if(mosq->threaded && !pthread_equal(mosq->thread_id, pthread_self())){
289                 pthread_cancel(mosq->thread_id);
290                 pthread_join(mosq->thread_id, NULL);
291                 mosq->threaded = false;
292         }
293
294         if(mosq->id){
295                 /* If mosq->id is not NULL then the client has already been initialised
296                  * and so the mutexes need destroying. If mosq->id is NULL, the mutexes
297                  * haven't been initialised. */
298                 pthread_mutex_destroy(&mosq->callback_mutex);
299                 pthread_mutex_destroy(&mosq->log_callback_mutex);
300                 pthread_mutex_destroy(&mosq->state_mutex);
301                 pthread_mutex_destroy(&mosq->out_packet_mutex);
302                 pthread_mutex_destroy(&mosq->current_out_packet_mutex);
303                 pthread_mutex_destroy(&mosq->msgtime_mutex);
304                 pthread_mutex_destroy(&mosq->in_message_mutex);
305                 pthread_mutex_destroy(&mosq->out_message_mutex);
306         }
307 #endif
308         if(mosq->sock != INVALID_SOCKET){
309                 _mosquitto_socket_close(mosq);
310         }
311         _mosquitto_message_cleanup_all(mosq);
312         _mosquitto_will_clear(mosq);
313 #ifdef WITH_TLS
314         if(mosq->ssl){
315                 SSL_free(mosq->ssl);
316         }
317         if(mosq->ssl_ctx){
318                 SSL_CTX_free(mosq->ssl_ctx);
319         }
320         if(mosq->tls_cafile) _mosquitto_free(mosq->tls_cafile);
321         if(mosq->tls_capath) _mosquitto_free(mosq->tls_capath);
322         if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
323         if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
324         if(mosq->tls_pw_callback) mosq->tls_pw_callback = NULL;
325         if(mosq->tls_version) _mosquitto_free(mosq->tls_version);
326         if(mosq->tls_ciphers) _mosquitto_free(mosq->tls_ciphers);
327         if(mosq->tls_psk) _mosquitto_free(mosq->tls_psk);
328         if(mosq->tls_psk_identity) _mosquitto_free(mosq->tls_psk_identity);
329 #endif
330
331         if(mosq->address){
332                 _mosquitto_free(mosq->address);
333                 mosq->address = NULL;
334         }
335         if(mosq->id){
336                 _mosquitto_free(mosq->id);
337                 mosq->id = NULL;
338         }
339         if(mosq->username){
340                 _mosquitto_free(mosq->username);
341                 mosq->username = NULL;
342         }
343         if(mosq->password){
344                 _mosquitto_free(mosq->password);
345                 mosq->password = NULL;
346         }
347         if(mosq->host){
348                 _mosquitto_free(mosq->host);
349                 mosq->host = NULL;
350         }
351         if(mosq->bind_address){
352                 _mosquitto_free(mosq->bind_address);
353                 mosq->bind_address = NULL;
354         }
355
356         /* Out packet cleanup */
357         if(mosq->out_packet && !mosq->current_out_packet){
358                 mosq->current_out_packet = mosq->out_packet;
359                 mosq->out_packet = mosq->out_packet->next;
360         }
361         while(mosq->current_out_packet){
362                 packet = mosq->current_out_packet;
363                 /* Free data and reset values */
364                 mosq->current_out_packet = mosq->out_packet;
365                 if(mosq->out_packet){
366                         mosq->out_packet = mosq->out_packet->next;
367                 }
368
369                 _mosquitto_packet_cleanup(packet);
370                 _mosquitto_free(packet);
371         }
372
373         _mosquitto_packet_cleanup(&mosq->in_packet);
374         if(mosq->sockpairR != INVALID_SOCKET){
375                 COMPAT_CLOSE(mosq->sockpairR);
376                 mosq->sockpairR = INVALID_SOCKET;
377         }
378         if(mosq->sockpairW != INVALID_SOCKET){
379                 COMPAT_CLOSE(mosq->sockpairW);
380                 mosq->sockpairW = INVALID_SOCKET;
381         }
382 }
383
384 void mosquitto_destroy(struct mosquitto *mosq)
385 {
386         if(!mosq) return;
387
388         _mosquitto_destroy(mosq);
389         _mosquitto_free(mosq);
390 }
391
392 int mosquitto_socket(struct mosquitto *mosq)
393 {
394         if(!mosq) return MOSQ_ERR_INVAL;
395         return mosq->sock;
396 }
397
398 static int _mosquitto_connect_init(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
399 {
400         if(!mosq) return MOSQ_ERR_INVAL;
401         if(!host || port <= 0) return MOSQ_ERR_INVAL;
402
403         if(mosq->host) _mosquitto_free(mosq->host);
404         mosq->host = _mosquitto_strdup(host);
405         if(!mosq->host) return MOSQ_ERR_NOMEM;
406         mosq->port = port;
407
408         if(mosq->bind_address) _mosquitto_free(mosq->bind_address);
409         if(bind_address){
410                 mosq->bind_address = _mosquitto_strdup(bind_address);
411                 if(!mosq->bind_address) return MOSQ_ERR_NOMEM;
412         }
413
414         mosq->keepalive = keepalive;
415
416         if(_mosquitto_socketpair(&mosq->sockpairR, &mosq->sockpairW)){
417                 _mosquitto_log_printf(mosq, MOSQ_LOG_WARNING,
418                                 "Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
419         }
420
421         return MOSQ_ERR_SUCCESS;
422 }
423
424 int mosquitto_connect(struct mosquitto *mosq, const char *host, int port, int keepalive)
425 {
426         return mosquitto_connect_bind(mosq, host, port, keepalive, NULL);
427 }
428
429 int mosquitto_connect_bind(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
430 {
431         int rc;
432         rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
433         if(rc) return rc;
434
435         pthread_mutex_lock(&mosq->state_mutex);
436         mosq->state = mosq_cs_new;
437         pthread_mutex_unlock(&mosq->state_mutex);
438
439         return _mosquitto_reconnect(mosq, true);
440 }
441
442 int mosquitto_connect_async(struct mosquitto *mosq, const char *host, int port, int keepalive)
443 {
444         return mosquitto_connect_bind_async(mosq, host, port, keepalive, NULL);
445 }
446
447 int mosquitto_connect_bind_async(struct mosquitto *mosq, const char *host, int port, int keepalive, const char *bind_address)
448 {
449         int rc = _mosquitto_connect_init(mosq, host, port, keepalive, bind_address);
450         if(rc) return rc;
451
452         pthread_mutex_lock(&mosq->state_mutex);
453         mosq->state = mosq_cs_connect_async;
454         pthread_mutex_unlock(&mosq->state_mutex);
455
456         return _mosquitto_reconnect(mosq, false);
457 }
458
459 int mosquitto_reconnect_async(struct mosquitto *mosq)
460 {
461         return _mosquitto_reconnect(mosq, false);
462 }
463
464 int mosquitto_reconnect(struct mosquitto *mosq)
465 {
466         return _mosquitto_reconnect(mosq, true);
467 }
468
469 static int _mosquitto_reconnect(struct mosquitto *mosq, bool blocking)
470 {
471         int rc;
472         struct _mosquitto_packet *packet;
473         if(!mosq) return MOSQ_ERR_INVAL;
474         if(!mosq->host || mosq->port <= 0) return MOSQ_ERR_INVAL;
475
476         pthread_mutex_lock(&mosq->state_mutex);
477         mosq->state = mosq_cs_new;
478         pthread_mutex_unlock(&mosq->state_mutex);
479
480         pthread_mutex_lock(&mosq->msgtime_mutex);
481         mosq->last_msg_in = mosquitto_time();
482         mosq->last_msg_out = mosquitto_time();
483         pthread_mutex_unlock(&mosq->msgtime_mutex);
484
485         mosq->ping_t = 0;
486
487         _mosquitto_packet_cleanup(&mosq->in_packet);
488                 
489         pthread_mutex_lock(&mosq->current_out_packet_mutex);
490         pthread_mutex_lock(&mosq->out_packet_mutex);
491
492         if(mosq->out_packet && !mosq->current_out_packet){
493                 mosq->current_out_packet = mosq->out_packet;
494                 mosq->out_packet = mosq->out_packet->next;
495         }
496
497         while(mosq->current_out_packet){
498                 packet = mosq->current_out_packet;
499                 /* Free data and reset values */
500                 mosq->current_out_packet = mosq->out_packet;
501                 if(mosq->out_packet){
502                         mosq->out_packet = mosq->out_packet->next;
503                 }
504
505                 _mosquitto_packet_cleanup(packet);
506                 _mosquitto_free(packet);
507         }
508         pthread_mutex_unlock(&mosq->out_packet_mutex);
509         pthread_mutex_unlock(&mosq->current_out_packet_mutex);
510
511         _mosquitto_messages_reconnect_reset(mosq);
512
513         rc = _mosquitto_socket_connect(mosq, mosq->host, mosq->port, mosq->bind_address, blocking);
514         if(rc){
515                 return rc;
516         }
517
518         return _mosquitto_send_connect(mosq, mosq->keepalive, mosq->clean_session);
519 }
520
521 int mosquitto_disconnect(struct mosquitto *mosq)
522 {
523         if(!mosq) return MOSQ_ERR_INVAL;
524
525         pthread_mutex_lock(&mosq->state_mutex);
526         mosq->state = mosq_cs_disconnecting;
527         pthread_mutex_unlock(&mosq->state_mutex);
528
529         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
530         return _mosquitto_send_disconnect(mosq);
531 }
532
533 int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain)
534 {
535         struct mosquitto_message_all *message;
536         uint16_t local_mid;
537
538         if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL;
539         if(strlen(topic) == 0) return MOSQ_ERR_INVAL;
540         if(payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD) return MOSQ_ERR_PAYLOAD_SIZE;
541
542         if(_mosquitto_topic_wildcard_len_check(topic) != MOSQ_ERR_SUCCESS){
543                 return MOSQ_ERR_INVAL;
544         }
545
546         local_mid = _mosquitto_mid_generate(mosq);
547         if(mid){
548                 *mid = local_mid;
549         }
550
551         if(qos == 0){
552                 return _mosquitto_send_publish(mosq, local_mid, topic, payloadlen, payload, qos, retain, false);
553         }else{
554                 message = _mosquitto_calloc(1, sizeof(struct mosquitto_message_all));
555                 if(!message) return MOSQ_ERR_NOMEM;
556
557                 message->next = NULL;
558                 message->timestamp = mosquitto_time();
559                 message->msg.mid = local_mid;
560                 message->msg.topic = _mosquitto_strdup(topic);
561                 if(!message->msg.topic){
562                         _mosquitto_message_cleanup(&message);
563                         return MOSQ_ERR_NOMEM;
564                 }
565                 if(payloadlen){
566                         message->msg.payloadlen = payloadlen;
567                         message->msg.payload = _mosquitto_malloc(payloadlen*sizeof(uint8_t));
568                         if(!message->msg.payload){
569                                 _mosquitto_message_cleanup(&message);
570                                 return MOSQ_ERR_NOMEM;
571                         }
572                         memcpy(message->msg.payload, payload, payloadlen*sizeof(uint8_t));
573                 }else{
574                         message->msg.payloadlen = 0;
575                         message->msg.payload = NULL;
576                 }
577                 message->msg.qos = qos;
578                 message->msg.retain = retain;
579                 message->dup = false;
580
581                 pthread_mutex_lock(&mosq->out_message_mutex);
582                 _mosquitto_message_queue(mosq, message, mosq_md_out);
583                 if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
584                         mosq->inflight_messages++;
585                         if(qos == 1){
586                                 message->state = mosq_ms_wait_for_puback;
587                         }else if(qos == 2){
588                                 message->state = mosq_ms_wait_for_pubrec;
589                         }
590                         pthread_mutex_unlock(&mosq->out_message_mutex);
591                         return _mosquitto_send_publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup);
592                 }else{
593                         message->state = mosq_ms_invalid;
594                         pthread_mutex_unlock(&mosq->out_message_mutex);
595                         return MOSQ_ERR_SUCCESS;
596                 }
597         }
598 }
599
600 int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos)
601 {
602         if(!mosq) return MOSQ_ERR_INVAL;
603         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
604
605         if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
606
607         return _mosquitto_send_subscribe(mosq, mid, false, sub, qos);
608 }
609
610 int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub)
611 {
612         if(!mosq) return MOSQ_ERR_INVAL;
613         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
614
615         if(_mosquitto_topic_wildcard_pos_check(sub)) return MOSQ_ERR_INVAL;
616
617         return _mosquitto_send_unsubscribe(mosq, mid, false, sub);
618 }
619
620 int mosquitto_tls_set(struct mosquitto *mosq, const char *cafile, const char *capath, const char *certfile, const char *keyfile, int (*pw_callback)(char *buf, int size, int rwflag, void *userdata))
621 {
622 #ifdef WITH_TLS
623         FILE *fptr;
624
625         if(!mosq || (!cafile && !capath) || (certfile && !keyfile) || (!certfile && keyfile)) return MOSQ_ERR_INVAL;
626
627         if(cafile){
628                 fptr = _mosquitto_fopen(cafile, "rt");
629                 if(fptr){
630                         fclose(fptr);
631                 }else{
632                         return MOSQ_ERR_INVAL;
633                 }
634                 mosq->tls_cafile = _mosquitto_strdup(cafile);
635
636                 if(!mosq->tls_cafile){
637                         return MOSQ_ERR_NOMEM;
638                 }
639         }else if(mosq->tls_cafile){
640                 _mosquitto_free(mosq->tls_cafile);
641                 mosq->tls_cafile = NULL;
642         }
643
644         if(capath){
645                 mosq->tls_capath = _mosquitto_strdup(capath);
646                 if(!mosq->tls_capath){
647                         return MOSQ_ERR_NOMEM;
648                 }
649         }else if(mosq->tls_capath){
650                 _mosquitto_free(mosq->tls_capath);
651                 mosq->tls_capath = NULL;
652         }
653
654         if(certfile){
655                 fptr = _mosquitto_fopen(certfile, "rt");
656                 if(fptr){
657                         fclose(fptr);
658                 }else{
659                         if(mosq->tls_cafile){
660                                 _mosquitto_free(mosq->tls_cafile);
661                                 mosq->tls_cafile = NULL;
662                         }
663                         if(mosq->tls_capath){
664                                 _mosquitto_free(mosq->tls_capath);
665                                 mosq->tls_capath = NULL;
666                         }
667                         return MOSQ_ERR_INVAL;
668                 }
669                 mosq->tls_certfile = _mosquitto_strdup(certfile);
670                 if(!mosq->tls_certfile){
671                         return MOSQ_ERR_NOMEM;
672                 }
673         }else{
674                 if(mosq->tls_certfile) _mosquitto_free(mosq->tls_certfile);
675                 mosq->tls_certfile = NULL;
676         }
677
678         if(keyfile){
679                 fptr = _mosquitto_fopen(keyfile, "rt");
680                 if(fptr){
681                         fclose(fptr);
682                 }else{
683                         if(mosq->tls_cafile){
684                                 _mosquitto_free(mosq->tls_cafile);
685                                 mosq->tls_cafile = NULL;
686                         }
687                         if(mosq->tls_capath){
688                                 _mosquitto_free(mosq->tls_capath);
689                                 mosq->tls_capath = NULL;
690                         }
691                         if(mosq->tls_certfile){
692                                 _mosquitto_free(mosq->tls_certfile);
693                                 mosq->tls_certfile = NULL;
694                         }
695                         return MOSQ_ERR_INVAL;
696                 }
697                 mosq->tls_keyfile = _mosquitto_strdup(keyfile);
698                 if(!mosq->tls_keyfile){
699                         return MOSQ_ERR_NOMEM;
700                 }
701         }else{
702                 if(mosq->tls_keyfile) _mosquitto_free(mosq->tls_keyfile);
703                 mosq->tls_keyfile = NULL;
704         }
705
706         mosq->tls_pw_callback = pw_callback;
707
708
709         return MOSQ_ERR_SUCCESS;
710 #else
711         return MOSQ_ERR_NOT_SUPPORTED;
712
713 #endif
714 }
715
716 int mosquitto_tls_opts_set(struct mosquitto *mosq, int cert_reqs, const char *tls_version, const char *ciphers)
717 {
718 #ifdef WITH_TLS
719         if(!mosq) return MOSQ_ERR_INVAL;
720
721         mosq->tls_cert_reqs = cert_reqs;
722         if(tls_version){
723 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
724                 if(!strcasecmp(tls_version, "tlsv1.2")
725                                 || !strcasecmp(tls_version, "tlsv1.1")
726                                 || !strcasecmp(tls_version, "tlsv1")){
727
728                         mosq->tls_version = _mosquitto_strdup(tls_version);
729                         if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
730                 }else{
731                         return MOSQ_ERR_INVAL;
732                 }
733 #else
734                 if(!strcasecmp(tls_version, "tlsv1")){
735                         mosq->tls_version = _mosquitto_strdup(tls_version);
736                         if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
737                 }else{
738                         return MOSQ_ERR_INVAL;
739                 }
740 #endif
741         }else{
742 #if OPENSSL_VERSION_NUMBER >= 0x10001000L
743                 mosq->tls_version = _mosquitto_strdup("tlsv1.2");
744 #else
745                 mosq->tls_version = _mosquitto_strdup("tlsv1");
746 #endif
747                 if(!mosq->tls_version) return MOSQ_ERR_NOMEM;
748         }
749         if(ciphers){
750                 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
751                 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
752         }else{
753                 mosq->tls_ciphers = NULL;
754         }
755
756
757         return MOSQ_ERR_SUCCESS;
758 #else
759         return MOSQ_ERR_NOT_SUPPORTED;
760
761 #endif
762 }
763
764
765 int mosquitto_tls_insecure_set(struct mosquitto *mosq, bool value)
766 {
767 #ifdef WITH_TLS
768         if(!mosq) return MOSQ_ERR_INVAL;
769         mosq->tls_insecure = value;
770         return MOSQ_ERR_SUCCESS;
771 #else
772         return MOSQ_ERR_NOT_SUPPORTED;
773 #endif
774 }
775
776
777 int mosquitto_tls_psk_set(struct mosquitto *mosq, const char *psk, const char *identity, const char *ciphers)
778 {
779 #ifdef REAL_WITH_TLS_PSK
780         if(!mosq || !psk || !identity) return MOSQ_ERR_INVAL;
781
782         /* Check for hex only digits */
783         if(strspn(psk, "0123456789abcdefABCDEF") < strlen(psk)){
784                 return MOSQ_ERR_INVAL;
785         }
786         mosq->tls_psk = _mosquitto_strdup(psk);
787         if(!mosq->tls_psk) return MOSQ_ERR_NOMEM;
788
789         mosq->tls_psk_identity = _mosquitto_strdup(identity);
790         if(!mosq->tls_psk_identity){
791                 _mosquitto_free(mosq->tls_psk);
792                 return MOSQ_ERR_NOMEM;
793         }
794         if(ciphers){
795                 mosq->tls_ciphers = _mosquitto_strdup(ciphers);
796                 if(!mosq->tls_ciphers) return MOSQ_ERR_NOMEM;
797         }else{
798                 mosq->tls_ciphers = NULL;
799         }
800
801         return MOSQ_ERR_SUCCESS;
802 #else
803         return MOSQ_ERR_NOT_SUPPORTED;
804 #endif
805 }
806
807
808 int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
809 {
810 #ifdef HAVE_PSELECT
811         struct timespec local_timeout;
812 #else
813         struct timeval local_timeout;
814 #endif
815         fd_set readfds, writefds;
816         int fdcount;
817         int rc;
818         char pairbuf;
819         int maxfd = 0;
820
821         if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
822
823         FD_ZERO(&readfds);
824         FD_ZERO(&writefds);
825         if(mosq->sock != INVALID_SOCKET){
826                 maxfd = mosq->sock;
827                 FD_SET(mosq->sock, &readfds);
828                 pthread_mutex_lock(&mosq->current_out_packet_mutex);
829                 pthread_mutex_lock(&mosq->out_packet_mutex);
830                 if(mosq->out_packet || mosq->current_out_packet){
831                         FD_SET(mosq->sock, &writefds);
832 #ifdef WITH_TLS
833                 }else if(mosq->ssl && mosq->want_write){
834                         FD_SET(mosq->sock, &writefds);
835 #endif
836                 }
837                 pthread_mutex_unlock(&mosq->out_packet_mutex);
838                 pthread_mutex_unlock(&mosq->current_out_packet_mutex);
839         }else{
840 #ifdef WITH_SRV
841                 if(mosq->achan){
842                         pthread_mutex_lock(&mosq->state_mutex);
843                         if(mosq->state == mosq_cs_connect_srv){
844                                 rc = ares_fds(mosq->achan, &readfds, &writefds);
845                                 if(rc > maxfd){
846                                         maxfd = rc;
847                                 }
848                         }else{
849                                 return MOSQ_ERR_NO_CONN;
850                         }
851                         pthread_mutex_unlock(&mosq->state_mutex);
852                 }
853 #else
854                 return MOSQ_ERR_NO_CONN;
855 #endif
856         }
857         if(mosq->sockpairR != INVALID_SOCKET){
858                 /* sockpairR is used to break out of select() before the timeout, on a
859                  * call to publish() etc. */
860                 FD_SET(mosq->sockpairR, &readfds);
861                 if(mosq->sockpairR > maxfd){
862                         maxfd = mosq->sockpairR;
863                 }
864         }
865
866         if(timeout >= 0){
867                 local_timeout.tv_sec = timeout/1000;
868 #ifdef HAVE_PSELECT
869                 local_timeout.tv_nsec = (timeout-local_timeout.tv_sec*1000)*1e6;
870 #else
871                 local_timeout.tv_usec = (timeout-local_timeout.tv_sec*1000)*1000;
872 #endif
873         }else{
874                 local_timeout.tv_sec = 1;
875 #ifdef HAVE_PSELECT
876                 local_timeout.tv_nsec = 0;
877 #else
878                 local_timeout.tv_usec = 0;
879 #endif
880         }
881
882 #ifdef HAVE_PSELECT
883         fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
884 #else
885         fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
886 #endif
887         if(fdcount == -1){
888 #ifdef WIN32
889                 errno = WSAGetLastError();
890 #endif
891                 if(errno == EINTR){
892                         return MOSQ_ERR_SUCCESS;
893                 }else{
894                         return MOSQ_ERR_ERRNO;
895                 }
896         }else{
897                 if(mosq->sock != INVALID_SOCKET){
898                         if(FD_ISSET(mosq->sock, &readfds)){
899                                 rc = mosquitto_loop_read(mosq, max_packets);
900                                 if(rc || mosq->sock == INVALID_SOCKET){
901                                         return rc;
902                                 }
903                         }
904                         if(mosq->sockpairR >= 0 && FD_ISSET(mosq->sockpairR, &readfds)){
905 #ifndef WIN32
906                                 if(read(mosq->sockpairR, &pairbuf, 1) == 0){
907                                 }
908 #else
909                                 recv(mosq->sockpairR, &pairbuf, 1, 0);
910 #endif
911                                 /* Fake write possible, to stimulate output write even though
912                                  * we didn't ask for it, because at that point the publish or
913                                  * other command wasn't present. */
914                                 FD_SET(mosq->sock, &writefds);
915                         }
916                         if(FD_ISSET(mosq->sock, &writefds)){
917                                 rc = mosquitto_loop_write(mosq, max_packets);
918                                 if(rc || mosq->sock == INVALID_SOCKET){
919                                         return rc;
920                                 }
921                         }
922                 }
923 #ifdef WITH_SRV
924                 if(mosq->achan){
925                         ares_process(mosq->achan, &readfds, &writefds);
926                 }
927 #endif
928         }
929         return mosquitto_loop_misc(mosq);
930 }
931
932 int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
933 {
934         int run = 1;
935         int rc;
936         unsigned int reconnects = 0;
937         unsigned long reconnect_delay;
938
939         if(!mosq) return MOSQ_ERR_INVAL;
940
941         if(mosq->state == mosq_cs_connect_async){
942                 mosquitto_reconnect(mosq);
943         }
944
945         while(run){
946                 do{
947                         rc = mosquitto_loop(mosq, timeout, max_packets);
948                         if (reconnects !=0 && rc == MOSQ_ERR_SUCCESS){
949                                 reconnects = 0;
950                         }
951                 }while(rc == MOSQ_ERR_SUCCESS);
952                 if(errno == EPROTO){
953                         return rc;
954                 }
955                 pthread_mutex_lock(&mosq->state_mutex);
956                 if(mosq->state == mosq_cs_disconnecting){
957                         run = 0;
958                         pthread_mutex_unlock(&mosq->state_mutex);
959                 }else{
960                         pthread_mutex_unlock(&mosq->state_mutex);
961
962                         if(mosq->reconnect_delay > 0 && mosq->reconnect_exponential_backoff){
963                                 reconnect_delay = mosq->reconnect_delay*reconnects*reconnects;
964                         }else{
965                                 reconnect_delay = mosq->reconnect_delay;
966                         }
967
968                         if(reconnect_delay > mosq->reconnect_delay_max){
969                                 reconnect_delay = mosq->reconnect_delay_max;
970                         }else{
971                                 reconnects++;
972                         }
973                                 
974 #ifdef WIN32
975                         Sleep(reconnect_delay*1000);
976 #else
977                         sleep(reconnect_delay);
978 #endif
979
980                         pthread_mutex_lock(&mosq->state_mutex);
981                         if(mosq->state == mosq_cs_disconnecting){
982                                 run = 0;
983                                 pthread_mutex_unlock(&mosq->state_mutex);
984                         }else{
985                                 pthread_mutex_unlock(&mosq->state_mutex);
986                                 mosquitto_reconnect(mosq);
987                         }
988                 }
989         }
990         return rc;
991 }
992
993 int mosquitto_loop_misc(struct mosquitto *mosq)
994 {
995         time_t now;
996         int rc;
997
998         if(!mosq) return MOSQ_ERR_INVAL;
999         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
1000
1001         now = mosquitto_time();
1002
1003         _mosquitto_check_keepalive(mosq);
1004         if(mosq->last_retry_check+1 < now){
1005                 _mosquitto_message_retry_check(mosq);
1006                 mosq->last_retry_check = now;
1007         }
1008         if(mosq->ping_t && now - mosq->ping_t >= mosq->keepalive){
1009                 /* mosq->ping_t != 0 means we are waiting for a pingresp.
1010                  * This hasn't happened in the keepalive time so we should disconnect.
1011                  */
1012                 _mosquitto_socket_close(mosq);
1013                 pthread_mutex_lock(&mosq->state_mutex);
1014                 if(mosq->state == mosq_cs_disconnecting){
1015                         rc = MOSQ_ERR_SUCCESS;
1016                 }else{
1017                         rc = 1;
1018                 }
1019                 pthread_mutex_unlock(&mosq->state_mutex);
1020                 pthread_mutex_lock(&mosq->callback_mutex);
1021                 if(mosq->on_disconnect){
1022                         mosq->in_callback = true;
1023                         mosq->on_disconnect(mosq, mosq->userdata, rc);
1024                         mosq->in_callback = false;
1025                 }
1026                 pthread_mutex_unlock(&mosq->callback_mutex);
1027                 return MOSQ_ERR_CONN_LOST;
1028         }
1029         return MOSQ_ERR_SUCCESS;
1030 }
1031
1032 static int _mosquitto_loop_rc_handle(struct mosquitto *mosq, int rc)
1033 {
1034         if(rc){
1035                 _mosquitto_socket_close(mosq);
1036                 pthread_mutex_lock(&mosq->state_mutex);
1037                 if(mosq->state == mosq_cs_disconnecting){
1038                         rc = MOSQ_ERR_SUCCESS;
1039                 }
1040                 pthread_mutex_unlock(&mosq->state_mutex);
1041                 pthread_mutex_lock(&mosq->callback_mutex);
1042                 if(mosq->on_disconnect){
1043                         mosq->in_callback = true;
1044                         mosq->on_disconnect(mosq, mosq->userdata, rc);
1045                         mosq->in_callback = false;
1046                 }
1047                 pthread_mutex_unlock(&mosq->callback_mutex);
1048                 return rc;
1049         }
1050         return rc;
1051 }
1052
1053 int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
1054 {
1055         int rc;
1056         int i;
1057         if(max_packets < 1) return MOSQ_ERR_INVAL;
1058
1059         pthread_mutex_lock(&mosq->out_message_mutex);
1060         max_packets = mosq->out_queue_len;
1061         pthread_mutex_unlock(&mosq->out_message_mutex);
1062
1063         pthread_mutex_lock(&mosq->in_message_mutex);
1064         max_packets += mosq->in_queue_len;
1065         pthread_mutex_unlock(&mosq->in_message_mutex);
1066
1067         if(max_packets < 1) max_packets = 1;
1068         /* Queue len here tells us how many messages are awaiting processing and
1069          * have QoS > 0. We should try to deal with that many in this loop in order
1070          * to keep up. */
1071         for(i=0; i<max_packets; i++){
1072                 rc = _mosquitto_packet_read(mosq);
1073                 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1074                         return _mosquitto_loop_rc_handle(mosq, rc);
1075                 }
1076         }
1077         return rc;
1078 }
1079
1080 int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
1081 {
1082         int rc;
1083         int i;
1084         if(max_packets < 1) return MOSQ_ERR_INVAL;
1085
1086         pthread_mutex_lock(&mosq->out_message_mutex);
1087         max_packets = mosq->out_queue_len;
1088         pthread_mutex_unlock(&mosq->out_message_mutex);
1089
1090         pthread_mutex_lock(&mosq->in_message_mutex);
1091         max_packets += mosq->in_queue_len;
1092         pthread_mutex_unlock(&mosq->in_message_mutex);
1093
1094         if(max_packets < 1) max_packets = 1;
1095         /* Queue len here tells us how many messages are awaiting processing and
1096          * have QoS > 0. We should try to deal with that many in this loop in order
1097          * to keep up. */
1098         for(i=0; i<max_packets; i++){
1099                 rc = _mosquitto_packet_write(mosq);
1100                 if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
1101                         return _mosquitto_loop_rc_handle(mosq, rc);
1102                 }
1103         }
1104         return rc;
1105 }
1106
1107 bool mosquitto_want_write(struct mosquitto *mosq)
1108 {
1109         if(mosq->out_packet || mosq->current_out_packet){
1110                 return true;
1111 #ifdef WITH_TLS
1112         }else if(mosq->ssl && mosq->want_write){
1113                 return true;
1114 #endif
1115         }else{
1116                 return false;
1117         }
1118 }
1119
1120 void mosquitto_connect_callback_set(struct mosquitto *mosq, void (*on_connect)(struct mosquitto *, void *, int))
1121 {
1122         pthread_mutex_lock(&mosq->callback_mutex);
1123         mosq->on_connect = on_connect;
1124         pthread_mutex_unlock(&mosq->callback_mutex);
1125 }
1126
1127 void mosquitto_disconnect_callback_set(struct mosquitto *mosq, void (*on_disconnect)(struct mosquitto *, void *, int))
1128 {
1129         pthread_mutex_lock(&mosq->callback_mutex);
1130         mosq->on_disconnect = on_disconnect;
1131         pthread_mutex_unlock(&mosq->callback_mutex);
1132 }
1133
1134 void mosquitto_publish_callback_set(struct mosquitto *mosq, void (*on_publish)(struct mosquitto *, void *, int))
1135 {
1136         pthread_mutex_lock(&mosq->callback_mutex);
1137         mosq->on_publish = on_publish;
1138         pthread_mutex_unlock(&mosq->callback_mutex);
1139 }
1140
1141 void mosquitto_message_callback_set(struct mosquitto *mosq, void (*on_message)(struct mosquitto *, void *, const struct mosquitto_message *))
1142 {
1143         pthread_mutex_lock(&mosq->callback_mutex);
1144         mosq->on_message = on_message;
1145         pthread_mutex_unlock(&mosq->callback_mutex);
1146 }
1147
1148 void mosquitto_subscribe_callback_set(struct mosquitto *mosq, void (*on_subscribe)(struct mosquitto *, void *, int, int, const int *))
1149 {
1150         pthread_mutex_lock(&mosq->callback_mutex);
1151         mosq->on_subscribe = on_subscribe;
1152         pthread_mutex_unlock(&mosq->callback_mutex);
1153 }
1154
1155 void mosquitto_unsubscribe_callback_set(struct mosquitto *mosq, void (*on_unsubscribe)(struct mosquitto *, void *, int))
1156 {
1157         pthread_mutex_lock(&mosq->callback_mutex);
1158         mosq->on_unsubscribe = on_unsubscribe;
1159         pthread_mutex_unlock(&mosq->callback_mutex);
1160 }
1161
1162 void mosquitto_log_callback_set(struct mosquitto *mosq, void (*on_log)(struct mosquitto *, void *, int, const char *))
1163 {
1164         pthread_mutex_lock(&mosq->log_callback_mutex);
1165         mosq->on_log = on_log;
1166         pthread_mutex_unlock(&mosq->log_callback_mutex);
1167 }
1168
1169 void mosquitto_user_data_set(struct mosquitto *mosq, void *userdata)
1170 {
1171         if(mosq){
1172                 mosq->userdata = userdata;
1173         }
1174 }
1175
1176 const char *mosquitto_strerror(int mosq_errno)
1177 {
1178         switch(mosq_errno){
1179                 case MOSQ_ERR_SUCCESS:
1180                         return "No error.";
1181                 case MOSQ_ERR_NOMEM:
1182                         return "Out of memory.";
1183                 case MOSQ_ERR_PROTOCOL:
1184                         return "A network protocol error occurred when communicating with the broker.";
1185                 case MOSQ_ERR_INVAL:
1186                         return "Invalid function arguments provided.";
1187                 case MOSQ_ERR_NO_CONN:
1188                         return "The client is not currently connected.";
1189                 case MOSQ_ERR_CONN_REFUSED:
1190                         return "The connection was refused.";
1191                 case MOSQ_ERR_NOT_FOUND:
1192                         return "Message not found (internal error).";
1193                 case MOSQ_ERR_CONN_LOST:
1194                         return "The connection was lost.";
1195                 case MOSQ_ERR_TLS:
1196                         return "A TLS error occurred.";
1197                 case MOSQ_ERR_PAYLOAD_SIZE:
1198                         return "Payload too large.";
1199                 case MOSQ_ERR_NOT_SUPPORTED:
1200                         return "This feature is not supported.";
1201                 case MOSQ_ERR_AUTH:
1202                         return "Authorisation failed.";
1203                 case MOSQ_ERR_ACL_DENIED:
1204                         return "Access denied by ACL.";
1205                 case MOSQ_ERR_UNKNOWN:
1206                         return "Unknown error.";
1207                 case MOSQ_ERR_ERRNO:
1208                         return strerror(errno);
1209                 default:
1210                         return "Unknown error.";
1211         }
1212 }
1213
1214 const char *mosquitto_connack_string(int connack_code)
1215 {
1216         switch(connack_code){
1217                 case 0:
1218                         return "Connection Accepted.";
1219                 case 1:
1220                         return "Connection Refused: unacceptable protocol version.";
1221                 case 2:
1222                         return "Connection Refused: identifier rejected.";
1223                 case 3:
1224                         return "Connection Refused: broker unavailable.";
1225                 case 4:
1226                         return "Connection Refused: bad user name or password.";
1227                 case 5:
1228                         return "Connection Refused: not authorised.";
1229                 default:
1230                         return "Connection Refused: unknown reason.";
1231         }
1232 }
1233
1234 int mosquitto_sub_topic_tokenise(const char *subtopic, char ***topics, int *count)
1235 {
1236         int len;
1237         int hier_count = 1;
1238         int start, stop;
1239         int hier;
1240         int tlen;
1241         int i, j;
1242
1243         if(!subtopic || !topics || !count) return MOSQ_ERR_INVAL;
1244
1245         len = strlen(subtopic);
1246
1247         for(i=0; i<len; i++){
1248                 if(subtopic[i] == '/'){
1249                         if(i > len-1){
1250                                 /* Separator at end of line */
1251                         }else{
1252                                 hier_count++;
1253                         }
1254                 }
1255         }
1256
1257         (*topics) = _mosquitto_calloc(hier_count, sizeof(char *));
1258         if(!(*topics)) return MOSQ_ERR_NOMEM;
1259
1260         start = 0;
1261         stop = 0;
1262         hier = 0;
1263
1264         for(i=0; i<len+1; i++){
1265                 if(subtopic[i] == '/' || subtopic[i] == '\0'){
1266                         stop = i;
1267                         if(start != stop){
1268                                 tlen = stop-start + 1;
1269                                 (*topics)[hier] = _mosquitto_calloc(tlen, sizeof(char));
1270                                 if(!(*topics)[hier]){
1271                                         for(i=0; i<hier_count; i++){
1272                                                 if((*topics)[hier]){
1273                                                         _mosquitto_free((*topics)[hier]);
1274                                                 }
1275                                         }
1276                                         _mosquitto_free((*topics));
1277                                         return MOSQ_ERR_NOMEM;
1278                                 }
1279                                 for(j=start; j<stop; j++){
1280                                         (*topics)[hier][j-start] = subtopic[j];
1281                                 }
1282                         }
1283                         start = i+1;
1284                         hier++;
1285                 }
1286         }
1287
1288         *count = hier_count;
1289
1290         return MOSQ_ERR_SUCCESS;
1291 }
1292
1293 int mosquitto_sub_topic_tokens_free(char ***topics, int count)
1294 {
1295         int i;
1296
1297         if(!topics || !(*topics) || count<1) return MOSQ_ERR_INVAL;
1298
1299         for(i=0; i<count; i++){
1300                 if((*topics)[i]) _mosquitto_free((*topics)[i]);
1301         }
1302         _mosquitto_free(*topics);
1303
1304         return MOSQ_ERR_SUCCESS;
1305 }
1306