iotivity 0.9.0
[platform/upstream/iotivity.git] / service / protocol-plugin / plugins / mqtt-light / lib / read_handle.c
1 /*
2 Copyright (c) 2009-2013 Roger Light <roger@atchoo.org>
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7
8 1. Redistributions of source code must retain the above copyright notice,
9    this list of conditions and the following disclaimer.
10 2. Redistributions in binary form must reproduce the above copyright
11    notice, this list of conditions and the following disclaimer in the
12    documentation and/or other materials provided with the distribution.
13 3. Neither the name of mosquitto nor the names of its
14    contributors may be used to endorse or promote products derived from
15    this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include <assert.h>
31 #include <stdio.h>
32 #include <string.h>
33
34 #include "mosquitto.h"
35 #include "logging_mosq.h"
36 #include "memory_mosq.h"
37 #include "messages_mosq.h"
38 #include "mqtt3_protocol.h"
39 #include "net_mosq.h"
40 #include "read_handle.h"
41 #include "send_mosq.h"
42 #include "time_mosq.h"
43 #include "util_mosq.h"
44
45 int _mosquitto_packet_handle(struct mosquitto *mosq)
46 {
47         assert(mosq);
48
49         switch((mosq->in_packet.command)&0xF0){
50                 case PINGREQ:
51                         return _mosquitto_handle_pingreq(mosq);
52                 case PINGRESP:
53                         return _mosquitto_handle_pingresp(mosq);
54                 case PUBACK:
55                         return _mosquitto_handle_pubackcomp(mosq, "PUBACK");
56                 case PUBCOMP:
57                         return _mosquitto_handle_pubackcomp(mosq, "PUBCOMP");
58                 case PUBLISH:
59                         return _mosquitto_handle_publish(mosq);
60                 case PUBREC:
61                         return _mosquitto_handle_pubrec(mosq);
62                 case PUBREL:
63                         return _mosquitto_handle_pubrel(NULL, mosq);
64                 case CONNACK:
65                         return _mosquitto_handle_connack(mosq);
66                 case SUBACK:
67                         return _mosquitto_handle_suback(mosq);
68                 case UNSUBACK:
69                         return _mosquitto_handle_unsuback(mosq);
70                 default:
71                         /* If we don't recognise the command, return an error straight away. */
72                         _mosquitto_log_printf(mosq, MOSQ_LOG_ERR, "Error: Unrecognised command %d\n", (mosq->in_packet.command)&0xF0);
73                         return MOSQ_ERR_PROTOCOL;
74         }
75 }
76
77 int _mosquitto_handle_publish(struct mosquitto *mosq)
78 {
79         uint8_t header;
80         struct mosquitto_message_all *message;
81         int rc = 0;
82         uint16_t mid;
83
84         assert(mosq);
85
86         message = _mosquitto_calloc(1, sizeof(struct mosquitto_message_all));
87         if(!message) return MOSQ_ERR_NOMEM;
88
89         header = mosq->in_packet.command;
90
91         message->dup = (header & 0x08)>>3;
92         message->msg.qos = (header & 0x06)>>1;
93         message->msg.retain = (header & 0x01);
94
95         rc = _mosquitto_read_string(&mosq->in_packet, &message->msg.topic);
96         if(rc){
97                 _mosquitto_message_cleanup(&message);
98                 return rc;
99         }
100         if(!strlen(message->msg.topic)){
101                 _mosquitto_message_cleanup(&message);
102                 return MOSQ_ERR_PROTOCOL;
103         }
104
105         if(message->msg.qos > 0){
106                 rc = _mosquitto_read_uint16(&mosq->in_packet, &mid);
107                 if(rc){
108                         _mosquitto_message_cleanup(&message);
109                         return rc;
110                 }
111                 message->msg.mid = (int)mid;
112         }
113
114         message->msg.payloadlen = mosq->in_packet.remaining_length - mosq->in_packet.pos;
115         if(message->msg.payloadlen){
116                 message->msg.payload = _mosquitto_calloc(message->msg.payloadlen+1, sizeof(uint8_t));
117                 if(!message->msg.payload){
118                         _mosquitto_message_cleanup(&message);
119                         return MOSQ_ERR_NOMEM;
120                 }
121                 rc = _mosquitto_read_bytes(&mosq->in_packet, message->msg.payload, message->msg.payloadlen);
122                 if(rc){
123                         _mosquitto_message_cleanup(&message);
124                         return rc;
125                 }
126         }
127         _mosquitto_log_printf(mosq, MOSQ_LOG_DEBUG,
128                         "Client %s received PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
129                         mosq->id, message->dup, message->msg.qos, message->msg.retain,
130                         message->msg.mid, message->msg.topic,
131                         (long)message->msg.payloadlen);
132
133         message->timestamp = mosquitto_time();
134         switch(message->msg.qos){
135                 case 0:
136                         pthread_mutex_lock(&mosq->callback_mutex);
137                         if(mosq->on_message){
138                                 mosq->in_callback = true;
139                                 mosq->on_message(mosq, mosq->userdata, &message->msg);
140                                 mosq->in_callback = false;
141                         }
142                         pthread_mutex_unlock(&mosq->callback_mutex);
143                         _mosquitto_message_cleanup(&message);
144                         return MOSQ_ERR_SUCCESS;
145                 case 1:
146                         rc = _mosquitto_send_puback(mosq, message->msg.mid);
147                         pthread_mutex_lock(&mosq->callback_mutex);
148                         if(mosq->on_message){
149                                 mosq->in_callback = true;
150                                 mosq->on_message(mosq, mosq->userdata, &message->msg);
151                                 mosq->in_callback = false;
152                         }
153                         pthread_mutex_unlock(&mosq->callback_mutex);
154                         _mosquitto_message_cleanup(&message);
155                         return rc;
156                 case 2:
157                         rc = _mosquitto_send_pubrec(mosq, message->msg.mid);
158                         pthread_mutex_lock(&mosq->in_message_mutex);
159                         message->state = mosq_ms_wait_for_pubrel;
160                         _mosquitto_message_queue(mosq, message, mosq_md_in);
161                         pthread_mutex_unlock(&mosq->in_message_mutex);
162                         return rc;
163                 default:
164                         _mosquitto_message_cleanup(&message);
165                         return MOSQ_ERR_PROTOCOL;
166         }
167 }
168