2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
8 1. Redistributions of source code must retain the above copyright notice,
9 this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14 contributors may be used to endorse or promote products derived from
15 this software without specific prior written permission.
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
34 #include "mosquitto.h"
35 #include "logging_mosq.h"
36 #include "memory_mosq.h"
37 #include "messages_mosq.h"
38 #include "mqtt3_protocol.h"
40 #include "read_handle.h"
41 #include "send_mosq.h"
42 #include "util_mosq.h"
44 #include "mosquitto_broker.h"
/*
 * Handle an incoming PINGREQ packet.
 *
 * mosq: connection context; the received packet is read from mosq->in_packet
 *       and mosq->id is used in the log output.
 *
 * Returns MOSQ_ERR_PROTOCOL when WITH_STRICT_PROTOCOL is defined and the
 * packet's remaining length is not zero; otherwise returns whatever
 * _mosquitto_send_pingresp() returns.
 *
 * NOTE(review): the embedded numbering in this listing has gaps (braces and
 * some preprocessor lines are not visible), so the exact extent of the
 * conditional sections below should be confirmed against the complete file.
 */
47 int _mosquitto_handle_pingreq(struct mosquitto *mosq)
50 #ifdef WITH_STRICT_PROTOCOL
/* Strict builds only: a non-zero remaining length is rejected. */
51 if(mosq->in_packet.remaining_length != 0){
52 return MOSQ_ERR_PROTOCOL;
/*
 * Two debug log calls follow: the first passes NULL as the logging context,
 * the second passes mosq. NOTE(review): presumably these are build-time
 * alternatives (broker vs. client library) - confirm.
 */
56 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PINGREQ from %s", mosq->id);
58 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PINGREQ", mosq->id);
/* Answer the ping; the send result is propagated to the caller. */
60 return _mosquitto_send_pingresp(mosq);
/*
 * Handle an incoming PINGRESP packet.
 *
 * mosq: connection context; the received packet is read from mosq->in_packet.
 *
 * Clears mosq->ping_t to record that no PINGRESP is outstanding, logs the
 * event at debug level and returns MOSQ_ERR_SUCCESS. When
 * WITH_STRICT_PROTOCOL is defined, a packet with a non-zero remaining length
 * is rejected with MOSQ_ERR_PROTOCOL instead.
 *
 * NOTE(review): the embedded numbering in this listing has gaps (braces and
 * some preprocessor lines are not visible); confirm the extent of the
 * conditional sections against the complete file.
 */
63 int _mosquitto_handle_pingresp(struct mosquitto *mosq)
66 #ifdef WITH_STRICT_PROTOCOL
/* Strict builds only: a non-zero remaining length is rejected. */
67 if(mosq->in_packet.remaining_length != 0){
68 return MOSQ_ERR_PROTOCOL;
71 mosq->ping_t = 0; /* No longer waiting for a PINGRESP. */
/*
 * Two debug log calls follow: one with a NULL logging context, one with mosq.
 * NOTE(review): presumably build-time alternatives (broker vs. client
 * library) - confirm.
 */
73 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PINGRESP from %s", mosq->id);
75 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PINGRESP", mosq->id);
77 return MOSQ_ERR_SUCCESS;
/*
 * Handle an incoming acknowledgement packet that carries a message id.
 *
 * mosq: connection context; the received packet is read from mosq->in_packet.
 * type: packet name used only in the debug log output (presumably "PUBACK"
 *       or "PUBCOMP", judging by the function name - confirm with callers).
 *
 * Reads a 16-bit message id from the packet, logs it, and removes the
 * matching outgoing message. In the path that uses
 * _mosquitto_message_delete(), the on_publish callback is invoked (under
 * callback_mutex) only when that call returns zero.
 *
 * Returns MOSQ_ERR_PROTOCOL when WITH_STRICT_PROTOCOL is defined and the
 * remaining length is not 2; the final visible return is MOSQ_ERR_SUCCESS.
 *
 * NOTE(review): the embedded numbering in this listing has gaps. No check of
 * rc is visible after the read or after mqtt3_db_message_delete(); confirm in
 * the complete file that failures are handled before mid is used.
 */
80 int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type)
86 #ifdef WITH_STRICT_PROTOCOL
/* Strict builds only: the packet must contain exactly the 2-byte message id. */
87 if(mosq->in_packet.remaining_length != 2){
88 return MOSQ_ERR_PROTOCOL;
/* Extract the message id being acknowledged. */
91 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
/*
 * First variant: logs with a NULL context and deletes via the mqtt3_db_*
 * interface. NOTE(review): presumably the broker build - confirm.
 */
94 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid);
97 rc = mqtt3_db_message_delete(mosq, mid, mosq_md_out);
/*
 * Second variant: logs with mosq as context and deletes via
 * _mosquitto_message_delete(). NOTE(review): presumably the client library
 * build - confirm.
 */
101 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid);
103 if(!_mosquitto_message_delete(mosq, mid, mosq_md_out)){
104 /* Only inform the client the message has been sent once. */
105 pthread_mutex_lock(&mosq->callback_mutex);
106 if(mosq->on_publish){
/* in_callback is set for the duration of the user callback. */
107 mosq->in_callback = true;
108 mosq->on_publish(mosq, mosq->userdata, mid);
109 mosq->in_callback = false;
111 pthread_mutex_unlock(&mosq->callback_mutex);
115 return MOSQ_ERR_SUCCESS;
/*
 * Handle an incoming PUBREC packet.
 *
 * mosq: connection context; the received packet is read from mosq->in_packet.
 *
 * Reads the 16-bit message id, moves the matching outgoing message to the
 * mosq_ms_wait_for_pubcomp state, and sends a PUBREL for that id via
 * _mosquitto_send_pubrel() (third argument false).
 *
 * Returns MOSQ_ERR_PROTOCOL when WITH_STRICT_PROTOCOL is defined and the
 * remaining length is not 2; the final visible return is MOSQ_ERR_SUCCESS.
 *
 * NOTE(review): the embedded numbering in this listing has gaps. The checks
 * of rc after the read, the state update and the PUBREL send are not visible
 * here; confirm in the complete file how those failures are propagated.
 */
118 int _mosquitto_handle_pubrec(struct mosquitto *mosq)
124 #ifdef WITH_STRICT_PROTOCOL
/* Strict builds only: the packet must contain exactly the 2-byte message id. */
125 if(mosq->in_packet.remaining_length != 2){
126 return MOSQ_ERR_PROTOCOL;
129 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
/*
 * First variant: NULL log context and mqtt3_db_message_update().
 * NOTE(review): presumably the broker build - confirm.
 */
132 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid);
134 rc = mqtt3_db_message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp);
/*
 * Second variant: mosq as log context and _mosquitto_message_out_update().
 * NOTE(review): presumably the client library build - confirm.
 */
136 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid);
138 rc = _mosquitto_message_out_update(mosq, mid, mosq_ms_wait_for_pubcomp);
/* Continue the handshake by sending PUBREL for the same message id. */
141 rc = _mosquitto_send_pubrel(mosq, mid, false);
144 return MOSQ_ERR_SUCCESS;
/*
 * Handle an incoming PUBREL packet.
 *
 * db:   message database handle, passed to mqtt3_db_message_release().
 * mosq: connection context; the received packet is read from mosq->in_packet.
 *
 * Reads the 16-bit message id, releases/removes the matching incoming
 * message, and sends a PUBCOMP for that id. In the path that uses
 * _mosquitto_message_remove(), the on_message callback is invoked (under
 * callback_mutex) only when the message was actually removed, after which
 * the message is handed to _mosquitto_message_cleanup().
 *
 * Returns MOSQ_ERR_PROTOCOL when WITH_STRICT_PROTOCOL is defined and the
 * remaining length is not 2, or when the connection protocol is
 * mosq_p_mqtt311 and the low four bits of the command byte are not 0x02.
 * The final visible return is MOSQ_ERR_SUCCESS.
 *
 * NOTE(review): the embedded numbering in this listing has gaps. The result
 * of _mosquitto_send_pubcomp() is assigned to rc but no check is visible;
 * confirm in the complete file whether it is propagated.
 */
147 int _mosquitto_handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
/* Receives the removed message from _mosquitto_message_remove(). */
151 struct mosquitto_message_all *message = NULL;
156 #ifdef WITH_STRICT_PROTOCOL
/* Strict builds only: the packet must contain exactly the 2-byte message id. */
157 if(mosq->in_packet.remaining_length != 2){
158 return MOSQ_ERR_PROTOCOL;
/* For MQTT 3.1.1 connections the flag nibble of the command must be 0x02. */
161 if(mosq->protocol == mosq_p_mqtt311){
162 if((mosq->in_packet.command&0x0F) != 0x02){
163 return MOSQ_ERR_PROTOCOL;
166 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
/*
 * First variant: NULL log context and mqtt3_db_message_release().
 * NOTE(review): presumably the broker build - confirm.
 */
169 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREL from %s (Mid: %d)", mosq->id, mid);
171 if(mqtt3_db_message_release(db, mosq, mid, mosq_md_in)){
172 /* Message not found. Still send a PUBCOMP anyway because this could be
173 * due to a repeated PUBREL after a client has reconnected. */
/*
 * Second variant: mosq as log context and _mosquitto_message_remove().
 * NOTE(review): presumably the client library build - confirm.
 */
176 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREL (Mid: %d)", mosq->id, mid);
178 if(!_mosquitto_message_remove(mosq, mid, mosq_md_in, &message)){
179 /* Only pass the message on if we have removed it from the queue - this
180 * prevents multiple callbacks for the same message. */
181 pthread_mutex_lock(&mosq->callback_mutex);
182 if(mosq->on_message){
/* in_callback is set for the duration of the user callback. */
183 mosq->in_callback = true;
184 mosq->on_message(mosq, mosq->userdata, &message->msg);
185 mosq->in_callback = false;
187 pthread_mutex_unlock(&mosq->callback_mutex);
/* The removed message is cleaned up after the callback has run. */
188 _mosquitto_message_cleanup(&message);
/* Complete the handshake by sending PUBCOMP for the same message id. */
191 rc = _mosquitto_send_pubcomp(mosq, mid);
194 return MOSQ_ERR_SUCCESS;
/*
 * Handle an incoming SUBACK packet.
 *
 * mosq: connection context; the received packet is read from mosq->in_packet.
 *
 * Reads the 16-bit message id, then copies every remaining payload byte into
 * a temporary int array of granted QoS values. The on_subscribe callback, if
 * set, is invoked under callback_mutex with the message id, the number of
 * entries and the array. The array is freed before returning, so the callback
 * must not keep the pointer.
 *
 * Returns MOSQ_ERR_NOMEM when the array allocation yields NULL; the final
 * visible return is MOSQ_ERR_SUCCESS.
 *
 * NOTE(review): the embedded numbering in this listing has gaps (local
 * declarations, braces, rc checks and the increment of i are not visible).
 * Confirm these points against the complete file:
 *  - qos_count is remaining_length minus pos; if the packet has no payload
 *    after the message id this is zero, and a zero-size allocation may
 *    return NULL depending on what _mosquitto_malloc() wraps, which would be
 *    reported as MOSQ_ERR_NOMEM.
 *  - qos_count is not visibly validated as positive before being multiplied
 *    into the allocation size.
 */
197 int _mosquitto_handle_suback(struct mosquitto *mosq)
/*
 * Two debug log calls follow: one with a NULL logging context, one with mosq.
 * NOTE(review): presumably build-time alternatives (broker vs. client
 * library) - confirm.
 */
208 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received SUBACK from %s", mosq->id);
210 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received SUBACK", mosq->id);
212 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
/* Number of bytes left in the packet after the message id: one per entry. */
215 qos_count = mosq->in_packet.remaining_length - mosq->in_packet.pos;
216 granted_qos = _mosquitto_malloc(qos_count*sizeof(int));
217 if(!granted_qos) return MOSQ_ERR_NOMEM;
/* Consume the rest of the packet one byte at a time. */
218 while(mosq->in_packet.pos < mosq->in_packet.remaining_length){
219 rc = _mosquitto_read_byte(&mosq->in_packet, &qos);
/* The array is released here; presumably this is the read-failure path. */
221 _mosquitto_free(granted_qos);
/* Widen the byte to int for the callback's array. */
224 granted_qos[i] = (int)qos;
228 pthread_mutex_lock(&mosq->callback_mutex);
229 if(mosq->on_subscribe){
/* in_callback is set for the duration of the user callback. */
230 mosq->in_callback = true;
231 mosq->on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
232 mosq->in_callback = false;
234 pthread_mutex_unlock(&mosq->callback_mutex);
236 _mosquitto_free(granted_qos);
238 return MOSQ_ERR_SUCCESS;
/*
 * Handle an incoming UNSUBACK packet.
 *
 * mosq: connection context; the received packet is read from mosq->in_packet.
 *
 * Logs the event, reads the 16-bit message id, and invokes the on_unsubscribe
 * callback (if set) with that id while holding callback_mutex.
 *
 * Returns MOSQ_ERR_PROTOCOL when WITH_STRICT_PROTOCOL is defined and the
 * remaining length is not 2; the final visible return is MOSQ_ERR_SUCCESS.
 *
 * NOTE(review): the embedded numbering in this listing has gaps. No check of
 * rc after the read is visible; confirm in the complete file that a failed
 * read is handled before mid is passed to the callback.
 */
241 int _mosquitto_handle_unsuback(struct mosquitto *mosq)
247 #ifdef WITH_STRICT_PROTOCOL
/* Strict builds only: the packet must contain exactly the 2-byte message id. */
248 if(mosq->in_packet.remaining_length != 2){
249 return MOSQ_ERR_PROTOCOL;
/*
 * Two debug log calls follow: one with a NULL logging context, one with mosq.
 * NOTE(review): presumably build-time alternatives (broker vs. client
 * library) - confirm.
 */
253 _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received UNSUBACK from %s", mosq->id);
255 _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received UNSUBACK", mosq->id);
257 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
260 pthread_mutex_lock(&mosq->callback_mutex);
261 if(mosq->on_unsubscribe){
/* in_callback is set for the duration of the user callback. */
262 mosq->in_callback = true;
263 mosq->on_unsubscribe(mosq, mosq->userdata, mid);
264 mosq->in_callback = false;
266 pthread_mutex_unlock(&mosq->callback_mutex);
269 return MOSQ_ERR_SUCCESS;