Imported Upstream version 0.9.2
[platform/upstream/iotivity.git] / service / protocol-plugin / plugins / mqtt-fan / lib / read_handle_shared.c
1 /*
2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11    notice, this list of conditions and the following disclaimer in the
12    documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14    contributors may be used to endorse or promote products derived from
15    this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <assert.h>
31 #include <stdio.h>
32 #include <string.h>
33
34 #include "mosquitto.h"
35 #include "logging_mosq.h"
36 #include "memory_mosq.h"
37 #include "messages_mosq.h"
38 #include "mqtt3_protocol.h"
39 #include "net_mosq.h"
40 #include "read_handle.h"
41 #include "send_mosq.h"
42 #include "util_mosq.h"
43 #ifdef WITH_BROKER
44 #include "mosquitto_broker.h"
45 #endif
46
47 int _mosquitto_handle_pingreq(struct mosquitto *mosq)
48 {
49         assert(mosq);
50 #ifdef WITH_STRICT_PROTOCOL
51         if(mosq->in_packet.remaining_length != 0){
52                 return MOSQ_ERR_PROTOCOL;
53         }
54 #endif
55 #ifdef WITH_BROKER
56         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PINGREQ from %s", mosq->id);
57 #else
58         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PINGREQ", mosq->id);
59 #endif
60         return _mosquitto_send_pingresp(mosq);
61 }
62
63 int _mosquitto_handle_pingresp(struct mosquitto *mosq)
64 {
65         assert(mosq);
66 #ifdef WITH_STRICT_PROTOCOL
67         if(mosq->in_packet.remaining_length != 0){
68                 return MOSQ_ERR_PROTOCOL;
69         }
70 #endif
71         mosq->ping_t = 0; /* No longer waiting for a PINGRESP. */
72 #ifdef WITH_BROKER
73         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PINGRESP from %s", mosq->id);
74 #else
75         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PINGRESP", mosq->id);
76 #endif
77         return MOSQ_ERR_SUCCESS;
78 }
79
80 int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type)
81 {
82         uint16_t mid;
83         int rc;
84
85         assert(mosq);
86 #ifdef WITH_STRICT_PROTOCOL
87         if(mosq->in_packet.remaining_length != 2){
88                 return MOSQ_ERR_PROTOCOL;
89         }
90 #endif
91         rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
92         if(rc) return rc;
93 #ifdef WITH_BROKER
94         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid);
95
96         if(mid){
97                 rc = mqtt3_db_message_delete(mosq, mid, mosq_md_out);
98                 if(rc) return rc;
99         }
100 #else
101         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d)", mosq->id, type, mid);
102
103         if(!_mosquitto_message_delete(mosq, mid, mosq_md_out)){
104                 /* Only inform the client the message has been sent once. */
105                 pthread_mutex_lock(&mosq->callback_mutex);
106                 if(mosq->on_publish){
107                         mosq->in_callback = true;
108                         mosq->on_publish(mosq, mosq->userdata, mid);
109                         mosq->in_callback = false;
110                 }
111                 pthread_mutex_unlock(&mosq->callback_mutex);
112         }
113 #endif
114
115         return MOSQ_ERR_SUCCESS;
116 }
117
118 int _mosquitto_handle_pubrec(struct mosquitto *mosq)
119 {
120         uint16_t mid;
121         int rc;
122
123         assert(mosq);
124 #ifdef WITH_STRICT_PROTOCOL
125         if(mosq->in_packet.remaining_length != 2){
126                 return MOSQ_ERR_PROTOCOL;
127         }
128 #endif
129         rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
130         if(rc) return rc;
131 #ifdef WITH_BROKER
132         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREC from %s (Mid: %d)", mosq->id, mid);
133
134         rc = mqtt3_db_message_update(mosq, mid, mosq_md_out, mosq_ms_wait_for_pubcomp);
135 #else
136         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid);
137
138         rc = _mosquitto_message_out_update(mosq, mid, mosq_ms_wait_for_pubcomp);
139 #endif
140         if(rc) return rc;
141         rc = _mosquitto_send_pubrel(mosq, mid, false);
142         if(rc) return rc;
143
144         return MOSQ_ERR_SUCCESS;
145 }
146
147 int _mosquitto_handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq)
148 {
149         uint16_t mid;
150 #ifndef WITH_BROKER
151         struct mosquitto_message_all *message = NULL;
152 #endif
153         int rc;
154
155         assert(mosq);
156 #ifdef WITH_STRICT_PROTOCOL
157         if(mosq->in_packet.remaining_length != 2){
158                 return MOSQ_ERR_PROTOCOL;
159         }
160 #endif
161         if(mosq->protocol == mosq_p_mqtt311){
162                 if((mosq->in_packet.command&0x0F) != 0x02){
163                         return MOSQ_ERR_PROTOCOL;
164                 }
165         }
166         rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
167         if(rc) return rc;
168 #ifdef WITH_BROKER
169         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received PUBREL from %s (Mid: %d)", mosq->id, mid);
170
171         if(mqtt3_db_message_release(db, mosq, mid, mosq_md_in)){
172                 /* Message not found. Still send a PUBCOMP anyway because this could be
173                  * due to a repeated PUBREL after a client has reconnected. */
174         }
175 #else
176         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREL (Mid: %d)", mosq->id, mid);
177
178         if(!_mosquitto_message_remove(mosq, mid, mosq_md_in, &message)){
179                 /* Only pass the message on if we have removed it from the queue - this
180                  * prevents multiple callbacks for the same message. */
181                 pthread_mutex_lock(&mosq->callback_mutex);
182                 if(mosq->on_message){
183                         mosq->in_callback = true;
184                         mosq->on_message(mosq, mosq->userdata, &message->msg);
185                         mosq->in_callback = false;
186                 }
187                 pthread_mutex_unlock(&mosq->callback_mutex);
188                 _mosquitto_message_cleanup(&message);
189         }
190 #endif
191         rc = _mosquitto_send_pubcomp(mosq, mid);
192         if(rc) return rc;
193
194         return MOSQ_ERR_SUCCESS;
195 }
196
197 int _mosquitto_handle_suback(struct mosquitto *mosq)
198 {
199         uint16_t mid;
200         uint8_t qos;
201         int *granted_qos;
202         int qos_count;
203         int i = 0;
204         int rc;
205
206         assert(mosq);
207 #ifdef WITH_BROKER
208         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received SUBACK from %s", mosq->id);
209 #else
210         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received SUBACK", mosq->id);
211 #endif
212         rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
213         if(rc) return rc;
214
215         qos_count = mosq->in_packet.remaining_length - mosq->in_packet.pos;
216         granted_qos = _mosquitto_malloc(qos_count*sizeof(int));
217         if(!granted_qos) return MOSQ_ERR_NOMEM;
218         while(mosq->in_packet.pos < mosq->in_packet.remaining_length){
219                 rc = _mosquitto_read_byte(&mosq->in_packet, &qos);
220                 if(rc){
221                         _mosquitto_free(granted_qos);
222                         return rc;
223                 }
224                 granted_qos[i] = (int)qos;
225                 i++;
226         }
227 #ifndef WITH_BROKER
228         pthread_mutex_lock(&mosq->callback_mutex);
229         if(mosq->on_subscribe){
230                 mosq->in_callback = true;
231                 mosq->on_subscribe(mosq, mosq->userdata, mid, qos_count, granted_qos);
232                 mosq->in_callback = false;
233         }
234         pthread_mutex_unlock(&mosq->callback_mutex);
235 #endif
236         _mosquitto_free(granted_qos);
237
238         return MOSQ_ERR_SUCCESS;
239 }
240
241 int _mosquitto_handle_unsuback(struct mosquitto *mosq)
242 {
243         uint16_t mid;
244         int rc;
245
246         assert(mosq);
247 #ifdef WITH_STRICT_PROTOCOL
248         if(mosq->in_packet.remaining_length != 2){
249                 return MOSQ_ERR_PROTOCOL;
250         }
251 #endif
252 #ifdef WITH_BROKER
253         _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received UNSUBACK from %s", mosq->id);
254 #else
255         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG, "Client %s received UNSUBACK", mosq->id);
256 #endif
257         rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
258         if(rc) return rc;
259 #ifndef WITH_BROKER
260         pthread_mutex_lock(&mosq->callback_mutex);
261         if(mosq->on_unsubscribe){
262                 mosq->in_callback = true;
263                 mosq->on_unsubscribe(mosq, mosq->userdata, mid);
264                 mosq->in_callback = false;
265         }
266         pthread_mutex_unlock(&mosq->callback_mutex);
267 #endif
268
269         return MOSQ_ERR_SUCCESS;
270 }
271