iotivity 0.9.0
[platform/upstream/iotivity.git] / service / protocol-plugin / plugins / mqtt-light / lib / send_mosq.c
1 /*
2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11    notice, this list of conditions and the following disclaimer in the
12    documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14    contributors may be used to endorse or promote products derived from
15    this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <assert.h>
31 #include <stdio.h>
32 #include <string.h>
33
34 #include "mosquitto.h"
35 #include "mosquitto_internal.h"
36 #include "logging_mosq.h"
37 #include "mqtt3_protocol.h"
38 #include "memory_mosq.h"
39 #include "net_mosq.h"
40 #include "send_mosq.h"
41 #include "time_mosq.h"
42 #include "util_mosq.h"
43
44 #ifdef WITH_BROKER
45 #include "mosquitto_broker.h"
46 #  ifdef WITH_SYS_TREE
47 extern uint64_t g_pub_bytes_sent;
48 #  endif
49 #endif
50
51 int _mosquitto_send_pingreq(struct mosquitto *mosq)
52 {
53         int rc;
54         assert(mosq);
55 #ifdef WITH_BROKER
56         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGREQ to %s", mosq->id);
57 #else
58         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGREQ", mosq->id);
59 #endif
60         rc = _mosquitto_send_simple_command(mosq, PINGREQ);
61         if(rc == MOSQ_ERR_SUCCESS){
62                 mosq->ping_t = mosquitto_time();
63         }
64         return rc;
65 }
66
67 int _mosquitto_send_pingresp(struct mosquitto *mosq)
68 {
69 #ifdef WITH_BROKER
70         if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGRESP to %s", mosq->id);
71 #else
72         if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGRESP", mosq->id);
73 #endif
74         return _mosquitto_send_simple_command(mosq, PINGRESP);
75 }
76
77 int _mosquitto_send_puback(struct mosquitto *mosq, uint16_t mid)
78 {
79 #ifdef WITH_BROKER
80         if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (Mid: %d)", mosq->id, mid);
81 #else
82         if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (Mid: %d)", mosq->id, mid);
83 #endif
84         return _mosquitto_send_command_with_mid(mosq, PUBACK, mid, false);
85 }
86
87 int _mosquitto_send_pubcomp(struct mosquitto *mosq, uint16_t mid)
88 {
89 #ifdef WITH_BROKER
90         if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBCOMP to %s (Mid: %d)", mosq->id, mid);
91 #else
92         if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBCOMP (Mid: %d)", mosq->id, mid);
93 #endif
94         return _mosquitto_send_command_with_mid(mosq, PUBCOMP, mid, false);
95 }
96
97 int _mosquitto_send_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
98 {
99 #ifdef WITH_BROKER
100         size_t len;
101 #ifdef WITH_BRIDGE
102         int i;
103         struct _mqtt3_bridge_topic *cur_topic;
104         bool match;
105         int rc;
106         char *mapped_topic = NULL;
107         char *topic_temp = NULL;
108 #endif
109 #endif
110         assert(mosq);
111         assert(topic);
112
113         if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
114 #ifdef WITH_BROKER
115         if(mosq->listener && mosq->listener->mount_point){
116                 len = strlen(mosq->listener->mount_point);
117                 if(len < strlen(topic)){
118                         topic += len;
119                 }else{
120                         /* Invalid topic string. Should never happen, but silently swallow the message anyway. */
121                         return MOSQ_ERR_SUCCESS;
122                 }
123         }
124 #ifdef WITH_BRIDGE
125         if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
126                 for(i=0; i<mosq->bridge->topic_count; i++){
127                         cur_topic = &mosq->bridge->topics[i];
128                         if((cur_topic->direction == bd_both || cur_topic->direction == bd_out) 
129                                         && (cur_topic->remote_prefix || cur_topic->local_prefix)){
130                                 /* Topic mapping required on this topic if the message matches */
131
132                                 rc = mosquitto_topic_matches_sub(cur_topic->local_topic, topic, &match);
133                                 if(rc){
134                                         return rc;
135                                 }
136                                 if(match){
137                                         mapped_topic = _mosquitto_strdup(topic);
138                                         if(!mapped_topic) return MOSQ_ERR_NOMEM;
139                                         if(cur_topic->local_prefix){
140                                                 /* This prefix needs removing. */
141                                                 if(!strncmp(cur_topic->local_prefix, mapped_topic, strlen(cur_topic->local_prefix))){
142                                                         topic_temp = _mosquitto_strdup(mapped_topic+strlen(cur_topic->local_prefix));
143                                                         _mosquitto_free(mapped_topic);
144                                                         if(!topic_temp){
145                                                                 return MOSQ_ERR_NOMEM;
146                                                         }
147                                                         mapped_topic = topic_temp;
148                                                 }
149                                         }
150
151                                         if(cur_topic->remote_prefix){
152                                                 /* This prefix needs adding. */
153                                                 len = strlen(mapped_topic) + strlen(cur_topic->remote_prefix)+1;
154                                                 topic_temp = _mosquitto_calloc(len+1, sizeof(char));
155                                                 if(!topic_temp){
156                                                         _mosquitto_free(mapped_topic);
157                                                         return MOSQ_ERR_NOMEM;
158                                                 }
159                                                 snprintf(topic_temp, len, "%s%s", cur_topic->remote_prefix, mapped_topic);
160                                                 _mosquitto_free(mapped_topic);
161                                                 mapped_topic = topic_temp;
162                                         }
163                                         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
164 #ifdef WITH_SYS_TREE
165                                         g_pub_bytes_sent += payloadlen;
166 #endif
167                                         rc =  _mosquitto_send_real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup);
168                                         _mosquitto_free(mapped_topic);
169                                         return rc;
170                                 }
171                         }
172                 }
173         }
174 #endif
175         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
176 #  ifdef WITH_SYS_TREE
177         g_pub_bytes_sent += payloadlen;
178 #  endif
179 #else
180         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
181 #endif
182
183         return _mosquitto_send_real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup);
184 }
185
186 int _mosquitto_send_pubrec(struct mosquitto *mosq, uint16_t mid)
187 {
188 #ifdef WITH_BROKER
189         if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (Mid: %d)", mosq->id, mid);
190 #else
191         if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (Mid: %d)", mosq->id, mid);
192 #endif
193         return _mosquitto_send_command_with_mid(mosq, PUBREC, mid, false);
194 }
195
196 int _mosquitto_send_pubrel(struct mosquitto *mosq, uint16_t mid, bool dup)
197 {
198 #ifdef WITH_BROKER
199         if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREL to %s (Mid: %d)", mosq->id, mid);
200 #else
201         if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREL (Mid: %d)", mosq->id, mid);
202 #endif
203         return _mosquitto_send_command_with_mid(mosq, PUBREL|2, mid, dup);
204 }
205
206 /* For PUBACK, PUBCOMP, PUBREC, and PUBREL */
207 int _mosquitto_send_command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup)
208 {
209         struct _mosquitto_packet *packet = NULL;
210         int rc;
211
212         assert(mosq);
213         packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
214         if(!packet) return MOSQ_ERR_NOMEM;
215
216         packet->command = command;
217         if(dup){
218                 packet->command |= 8;
219         }
220         packet->remaining_length = 2;
221         rc = _mosquitto_packet_alloc(packet);
222         if(rc){
223                 _mosquitto_free(packet);
224                 return rc;
225         }
226
227         packet->payload[packet->pos+0] = MOSQ_MSB(mid);
228         packet->payload[packet->pos+1] = MOSQ_LSB(mid);
229
230         return _mosquitto_packet_queue(mosq, packet);
231 }
232
233 /* For DISCONNECT, PINGREQ and PINGRESP */
234 int _mosquitto_send_simple_command(struct mosquitto *mosq, uint8_t command)
235 {
236         struct _mosquitto_packet *packet = NULL;
237         int rc;
238
239         assert(mosq);
240         packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
241         if(!packet) return MOSQ_ERR_NOMEM;
242
243         packet->command = command;
244         packet->remaining_length = 0;
245
246         rc = _mosquitto_packet_alloc(packet);
247         if(rc){
248                 _mosquitto_free(packet);
249                 return rc;
250         }
251
252         return _mosquitto_packet_queue(mosq, packet);
253 }
254
255 int _mosquitto_send_real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
256 {
257         struct _mosquitto_packet *packet = NULL;
258         int packetlen;
259         int rc;
260
261         assert(mosq);
262         assert(topic);
263
264         packetlen = 2+strlen(topic) + payloadlen;
265         if(qos > 0) packetlen += 2; /* For message id */
266         packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
267         if(!packet) return MOSQ_ERR_NOMEM;
268
269         packet->mid = mid;
270         packet->command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain;
271         packet->remaining_length = packetlen;
272         rc = _mosquitto_packet_alloc(packet);
273         if(rc){
274                 _mosquitto_free(packet);
275                 return rc;
276         }
277         /* Variable header (topic string) */
278         _mosquitto_write_string(packet, topic, strlen(topic));
279         if(qos > 0){
280                 _mosquitto_write_uint16(packet, mid);
281         }
282
283         /* Payload */
284         if(payloadlen){
285                 _mosquitto_write_bytes(packet, payload, payloadlen);
286         }
287
288         return _mosquitto_packet_queue(mosq, packet);
289 }