2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
34 #include "mosquitto.h"
35 #include "mosquitto_internal.h"
36 #include "logging_mosq.h"
37 #include "mqtt3_protocol.h"
38 #include "memory_mosq.h"
40 #include "send_mosq.h"
41 #include "time_mosq.h"
42 #include "util_mosq.h"
45 #include "mosquitto_broker.h"
47 extern uint64_t g_pub_bytes_sent;
/*
 * Send a PINGREQ for `mosq` and remember when it was sent.
 *
 * Emits a debug log line, delegates the packet itself to
 * _mosquitto_send_simple_command(mosq, PINGREQ) and, when that call reports
 * MOSQ_ERR_SUCCESS, stores the current mosquitto_time() in mosq->ping_t.
 *
 * mosq->id is read by the log calls with no NULL check visible here, so
 * callers are expected to pass a non-NULL `mosq`.
 *
 * NOTE(review): two log calls are visible (one with a NULL log context, one
 * with `mosq`). Presumably conditional compilation selects exactly one of
 * them, but the directives, the declaration of `rc` and the final return are
 * not visible in this listing; confirm against the complete file.
 */
51 int _mosquitto_send_pingreq(struct mosquitto *mosq)
56 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGREQ to %s", mosq->id);
58 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGREQ", mosq->id);
60 rc = _mosquitto_send_simple_command(mosq, PINGREQ);
/* Only update the ping timestamp when the PINGREQ was accepted. */
61 if(rc == MOSQ_ERR_SUCCESS){
62 mosq->ping_t = mosquitto_time();
/*
 * Send a PINGRESP for `mosq`.
 *
 * The debug log line is only emitted when `mosq` is non-NULL; the packet is
 * then delegated to _mosquitto_send_simple_command(mosq, PINGRESP) regardless,
 * and that function's result is returned unchanged.
 *
 * NOTE(review): two log variants are visible (NULL log context vs. `mosq`);
 * presumably only one is compiled in, but the selecting directives are not
 * visible in this listing.
 */
67 int _mosquitto_send_pingresp(struct mosquitto *mosq)
70 if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PINGRESP to %s", mosq->id);
72 if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PINGRESP", mosq->id);
74 return _mosquitto_send_simple_command(mosq, PINGRESP);
/*
 * Send a PUBACK carrying message id `mid` for `mosq`.
 *
 * Logs the message id at debug level when `mosq` is non-NULL, then returns
 * the result of _mosquitto_send_command_with_mid(mosq, PUBACK, mid, false);
 * the `dup` argument is always false for this packet type.
 *
 * NOTE(review): two log variants are visible (NULL log context vs. `mosq`);
 * presumably only one is compiled in, but the selecting directives are not
 * visible in this listing.
 */
77 int _mosquitto_send_puback(struct mosquitto *mosq, uint16_t mid)
80 if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBACK to %s (Mid: %d)", mosq->id, mid);
82 if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBACK (Mid: %d)", mosq->id, mid);
84 return _mosquitto_send_command_with_mid(mosq, PUBACK, mid, false);
/*
 * Send a PUBCOMP carrying message id `mid` for `mosq`.
 *
 * Logs the message id at debug level when `mosq` is non-NULL, then returns
 * the result of _mosquitto_send_command_with_mid(mosq, PUBCOMP, mid, false);
 * the `dup` argument is always false for this packet type.
 *
 * NOTE(review): two log variants are visible (NULL log context vs. `mosq`);
 * presumably only one is compiled in, but the selecting directives are not
 * visible in this listing.
 */
87 int _mosquitto_send_pubcomp(struct mosquitto *mosq, uint16_t mid)
90 if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBCOMP to %s (Mid: %d)", mosq->id, mid);
92 if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBCOMP (Mid: %d)", mosq->id, mid);
94 return _mosquitto_send_command_with_mid(mosq, PUBCOMP, mid, false);
/*
 * Send a PUBLISH for `mosq`, applying listener and bridge topic rewriting
 * before handing the message to _mosquitto_send_real_publish().
 *
 * Parameters:
 *   mosq       - target context; mosq->sock, mosq->listener, mosq->bridge and
 *                mosq->id are read.
 *   mid        - message id, forwarded unchanged.
 *   topic      - NUL-terminated topic string (strlen() is applied to it).
 *   payloadlen - payload size in bytes, forwarded unchanged and added to
 *                g_pub_bytes_sent.
 *   payload    - payload bytes, forwarded unchanged.
 *   qos/retain/dup - forwarded unchanged and included in the debug log line.
 *
 * Returns (as far as visible here):
 *   MOSQ_ERR_NO_CONN when mosq->sock is INVALID_SOCKET,
 *   MOSQ_ERR_SUCCESS when the topic is judged invalid relative to the
 *     listener mount point (the message is deliberately dropped),
 *   MOSQ_ERR_NOMEM when a topic copy cannot be allocated,
 *   otherwise the result of _mosquitto_send_real_publish().
 *
 * NOTE(review): this listing omits a number of lines (local declarations,
 * braces, preprocessor conditionals, and several if/else/return lines).
 * Comments below that depend on an unseen line are marked as assumptions.
 */
97 int _mosquitto_send_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
103 struct _mqtt3_bridge_topic *cur_topic;
/* Scratch pointers for the rewritten topic; both refer to heap copies. */
106 char *mapped_topic = NULL;
107 char *topic_temp = NULL;
/* Nothing can be sent without a socket. */
113 if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
/* Listener mount point handling: the topic must be longer than the mount
 * point. NOTE(review): the statement executed when the check passes is not
 * visible here; presumably it advances `topic` past the mount point. */
115 if(mosq->listener && mosq->listener->mount_point){
116 len = strlen(mosq->listener->mount_point);
117 if(len < strlen(topic)){
120 /* Invalid topic string. Should never happen, but silently swallow the message anyway. */
121 return MOSQ_ERR_SUCCESS;
/* Bridge topic remapping: scan the configured bridge topics for one that
 * applies to outgoing messages and carries a local or remote prefix. */
125 if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
126 for(i=0; i<mosq->bridge->topic_count; i++){
127 cur_topic = &mosq->bridge->topics[i];
128 if((cur_topic->direction == bd_both || cur_topic->direction == bd_out)
129 && (cur_topic->remote_prefix || cur_topic->local_prefix)){
130 /* Topic mapping required on this topic if the message matches */
/* NOTE(review): the handling of `rc` and the test of `match` are not visible
 * in this listing; presumably the block below only runs when `match` is true. */
132 rc = mosquitto_topic_matches_sub(cur_topic->local_topic, topic, &match);
/* Work on a private copy so the caller's topic string is never modified. */
137 mapped_topic = _mosquitto_strdup(topic);
138 if(!mapped_topic) return MOSQ_ERR_NOMEM;
139 if(cur_topic->local_prefix){
140 /* This prefix needs removing. */
/* strncmp() == 0 means mapped_topic begins with local_prefix. */
141 if(!strncmp(cur_topic->local_prefix, mapped_topic, strlen(cur_topic->local_prefix))){
/* Copy the part after the prefix, then release the full-length copy.
 * NOTE(review): the NULL check on topic_temp guarding the NOMEM return
 * is not visible here. */
142 topic_temp = _mosquitto_strdup(mapped_topic+strlen(cur_topic->local_prefix));
143 _mosquitto_free(mapped_topic);
145 return MOSQ_ERR_NOMEM;
147 mapped_topic = topic_temp;
151 if(cur_topic->remote_prefix){
152 /* This prefix needs adding. */
/* len counts both strings plus the terminating NUL; the buffer is
 * allocated one byte larger than snprintf() is allowed to write. */
153 len = strlen(mapped_topic) + strlen(cur_topic->remote_prefix)+1;
154 topic_temp = _mosquitto_calloc(len+1, sizeof(char));
/* NOTE(review): presumably guarded by a NULL check on topic_temp that is
 * not visible here. */
156 _mosquitto_free(mapped_topic);
157 return MOSQ_ERR_NOMEM;
159 snprintf(topic_temp, len, "%s%s", cur_topic->remote_prefix, mapped_topic);
160 _mosquitto_free(mapped_topic);
161 mapped_topic = topic_temp;
163 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
165 g_pub_bytes_sent += payloadlen;
/* Send using the rewritten topic, then release it: this function owns
 * mapped_topic for its whole lifetime. */
167 rc = _mosquitto_send_real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup);
168 _mosquitto_free(mapped_topic);
/* No remapping applied: log and send with the (possibly mount-point
 * adjusted) topic as-is. */
175 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
176 # ifdef WITH_SYS_TREE
177 g_pub_bytes_sent += payloadlen;
180 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
183 return _mosquitto_send_real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup);
/*
 * Send a PUBREC carrying message id `mid` for `mosq`.
 *
 * Logs the message id at debug level when `mosq` is non-NULL, then returns
 * the result of _mosquitto_send_command_with_mid(mosq, PUBREC, mid, false);
 * the `dup` argument is always false for this packet type.
 *
 * NOTE(review): two log variants are visible (NULL log context vs. `mosq`);
 * presumably only one is compiled in, but the selecting directives are not
 * visible in this listing.
 */
186 int _mosquitto_send_pubrec(struct mosquitto *mosq, uint16_t mid)
189 if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREC to %s (Mid: %d)", mosq->id, mid);
191 if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (Mid: %d)", mosq->id, mid);
193 return _mosquitto_send_command_with_mid(mosq, PUBREC, mid, false);
/*
 * Send a PUBREL carrying message id `mid` for `mosq`.
 *
 * Logs the message id at debug level when `mosq` is non-NULL (the `dup` flag
 * is not part of the log line), then returns the result of
 * _mosquitto_send_command_with_mid() with the caller's `dup` value.
 *
 * NOTE(review): two log variants are visible (NULL log context vs. `mosq`);
 * presumably only one is compiled in, but the selecting directives are not
 * visible in this listing.
 */
196 int _mosquitto_send_pubrel(struct mosquitto *mosq, uint16_t mid, bool dup)
199 if(mosq) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBREL to %s (Mid: %d)", mosq->id, mid);
201 if(mosq) _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREL (Mid: %d)", mosq->id, mid);
/* The command byte is PUBREL with bit 1 (value 2) set.
 * NOTE(review): presumably this is the QoS 1 flag that MQTT requires in the
 * PUBREL fixed header; confirm against the protocol specification. */
203 return _mosquitto_send_command_with_mid(mosq, PUBREL|2, mid, dup);
/*
 * Build and queue a packet whose only content is a 16-bit message id.
 *
 * Parameters:
 *   mosq    - context the packet is queued on.
 *   command - value stored as the packet's command byte.
 *   mid     - message id, written most significant byte first.
 *   dup     - duplicate-delivery flag (see the note on the `|= 8` line).
 *
 * Returns MOSQ_ERR_NOMEM when the packet structure cannot be allocated,
 * otherwise the result of _mosquitto_packet_queue().
 *
 * NOTE(review): this listing omits the declaration of `rc` and the lines
 * around the _mosquitto_packet_alloc() call; presumably the
 * _mosquitto_free(packet) below only runs when that call fails and is
 * followed by returning `rc`.
 */
206 /* For PUBACK, PUBCOMP, PUBREC, and PUBREL */
207 int _mosquitto_send_command_with_mid(struct mosquitto *mosq, uint8_t command, uint16_t mid, bool dup)
209 struct _mosquitto_packet *packet = NULL;
213 packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
214 if(!packet) return MOSQ_ERR_NOMEM;
216 packet->command = command;
/* NOTE(review): sets bit 3 of the command byte. The guard is not visible in
 * this listing; presumably this only happens when `dup` is true. */
218 packet->command |= 8;
/* The body is exactly the two bytes of the message id. */
220 packet->remaining_length = 2;
221 rc = _mosquitto_packet_alloc(packet);
223 _mosquitto_free(packet);
/* Message id is stored as MSB followed by LSB. */
227 packet->payload[packet->pos+0] = MOSQ_MSB(mid);
228 packet->payload[packet->pos+1] = MOSQ_LSB(mid);
230 return _mosquitto_packet_queue(mosq, packet);
/*
 * Build and queue a packet that has a command byte and no body.
 *
 * Parameters:
 *   mosq    - context the packet is queued on.
 *   command - value stored as the packet's command byte.
 *
 * Returns MOSQ_ERR_NOMEM when the packet structure cannot be allocated,
 * otherwise the result of _mosquitto_packet_queue().
 *
 * NOTE(review): this listing omits the declaration of `rc` and the lines
 * around the _mosquitto_packet_alloc() call; presumably the
 * _mosquitto_free(packet) below only runs when that call fails and is
 * followed by returning `rc`.
 */
233 /* For DISCONNECT, PINGREQ and PINGRESP */
234 int _mosquitto_send_simple_command(struct mosquitto *mosq, uint8_t command)
236 struct _mosquitto_packet *packet = NULL;
240 packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
241 if(!packet) return MOSQ_ERR_NOMEM;
243 packet->command = command;
/* No variable header and no payload. */
244 packet->remaining_length = 0;
246 rc = _mosquitto_packet_alloc(packet);
248 _mosquitto_free(packet);
252 return _mosquitto_packet_queue(mosq, packet);
/*
 * Build a PUBLISH packet from the given fields and queue it on `mosq`.
 *
 * Parameters:
 *   mosq       - context the packet is queued on.
 *   mid        - message id.
 *   topic      - NUL-terminated topic string (strlen() is applied to it).
 *   payloadlen - number of payload bytes.
 *   payload    - payload bytes.
 *   qos, retain, dup - combined into the command byte.
 *
 * Returns MOSQ_ERR_NOMEM when the packet structure cannot be allocated,
 * otherwise the result of _mosquitto_packet_queue().
 *
 * NOTE(review): this listing omits local declarations and several guard
 * lines. Presumably the _mosquitto_free(packet) only runs when
 * _mosquitto_packet_alloc() fails, and the message id is only written when
 * qos > 0 (matching the length computation); confirm against the full file.
 */
255 int _mosquitto_send_real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup)
257 struct _mosquitto_packet *packet = NULL;
/* Body size: 2 bytes added to the topic length, plus the topic and payload. */
264 packetlen = 2+strlen(topic) + payloadlen;
265 if(qos > 0) packetlen += 2; /* For message id */
266 packet = _mosquitto_calloc(1, sizeof(struct _mosquitto_packet));
267 if(!packet) return MOSQ_ERR_NOMEM;
/* Command byte layout: dup in bit 3, qos starting at bit 1, retain in bit 0. */
270 packet->command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain;
271 packet->remaining_length = packetlen;
272 rc = _mosquitto_packet_alloc(packet);
274 _mosquitto_free(packet);
277 /* Variable header (topic string) */
278 _mosquitto_write_string(packet, topic, strlen(topic));
280 _mosquitto_write_uint16(packet, mid);
285 _mosquitto_write_bytes(packet, payload, payloadlen);
288 return _mosquitto_packet_queue(mosq, packet);