MQ unit test updated
[platform/upstream/iotivity.git] / cloud / messagequeue / src / main / java / org / iotivity / cloud / mqserver / topic / Topic.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.mqserver.topic;
23
24 import java.util.ArrayList;
25 import java.util.HashMap;
26
27 import org.iotivity.cloud.base.device.Device;
28 import org.iotivity.cloud.base.exception.ServerException.ForbiddenException;
29 import org.iotivity.cloud.base.exception.ServerException.InternalServerErrorException;
30 import org.iotivity.cloud.base.exception.ServerException.NotFoundException;
31 import org.iotivity.cloud.base.exception.ServerException.PreconditionFailedException;
32 import org.iotivity.cloud.base.protocols.IRequest;
33 import org.iotivity.cloud.base.protocols.IResponse;
34 import org.iotivity.cloud.base.protocols.MessageBuilder;
35 import org.iotivity.cloud.base.protocols.enums.ContentFormat;
36 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
37 import org.iotivity.cloud.mqserver.Constants;
38 import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper;
39 import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper;
40 import org.iotivity.cloud.util.Cbor;
41
42 public class Topic {
43
44     private TopicManager           mTopicManager = null;
45
46     private String                 mName         = null;
47     private String                 mType         = null;
48     private HashMap<String, Topic> mSubtopics    = null;
49
50     private byte[]                 mLatestData   = null;
51
52     private class TopicSubscriber {
53         TopicSubscriber(Device subscriber, IRequest request) {
54             mSubscriber = subscriber;
55             mRequest = request;
56         }
57
58         public Device   mSubscriber;
59         public IRequest mRequest;
60     }
61
62     private HashMap<String, TopicSubscriber> mSubscribers           = null;
63
64     private KafkaProducerWrapper             mKafkaProducerOperator = null;
65     private KafkaConsumerWrapper             mKafkaConsumerOperator = null;
66
67     Cbor<HashMap<String, Object>>            mCbor                  = new Cbor<>();
68
69     public Topic(String name, String type, TopicManager topicManager) {
70
71         mTopicManager = topicManager;
72         mName = name;
73         mType = type;
74
75         mSubtopics = new HashMap<>();
76         mSubscribers = new HashMap<>();
77
78         String kafka_zookeeper = topicManager.getKafkaZookeeper();
79         String kafka_broker = topicManager.getKafkaBroker();
80
81         mKafkaProducerOperator = new KafkaProducerWrapper(kafka_broker, name);
82         mKafkaConsumerOperator = new KafkaConsumerWrapper(kafka_zookeeper,
83                 kafka_broker, this);
84
85         HashMap<String, Object> data = new HashMap<>();
86         data.put(Constants.MQ_MESSAGE, null);
87
88         mLatestData = mCbor.encodingPayloadToCbor(data);
89     }
90
91     public String getName() {
92         return mName;
93     }
94
95     public String getType() {
96         return mType;
97     }
98
99     public IResponse handleCreateSubtopic(IRequest request) {
100
101         String newTopicName = request.getUriPathSegments()
102                 .get(request.getUriPathSegments().size() - 1);
103
104         String newTopicType = new String();
105
106         if (request.getUriQueryMap() != null) {
107             newTopicType = request.getUriQueryMap().get("rt").get(0);
108         }
109
110         if (getSubtopic(newTopicName) != null) {
111             throw new ForbiddenException("topic already exist");
112         }
113
114         Topic newTopic = new Topic(mName + "/" + newTopicName, newTopicType,
115                 mTopicManager);
116
117         if (mTopicManager.createTopic(newTopic) == false) {
118             throw new InternalServerErrorException("create topic falied");
119         }
120
121         synchronized (mSubtopics) {
122             mSubtopics.put(newTopicName, newTopic);
123         }
124
125         IResponse response = MessageBuilder.createResponse(request,
126                 ResponseStatus.CREATED);
127         response.setLocationPath(request.getUriPath());
128         return response;
129     }
130
131     public IResponse handleRemoveSubtopic(IRequest request, String topicName) {
132
133         Topic targetTopic = getSubtopic(topicName);
134
135         if (targetTopic == null) {
136             throw new NotFoundException("topic doesn't exist");
137         }
138
139         targetTopic.cleanup();
140
141         if (mTopicManager.removeTopic(targetTopic) == false) {
142             throw new InternalServerErrorException("remove topic failed");
143         }
144
145         synchronized (mSubtopics) {
146             mSubtopics.remove(topicName);
147         }
148
149         return MessageBuilder.createResponse(request, ResponseStatus.DELETED);
150     }
151
152     public IResponse handleSubscribeTopic(Device srcDevice, IRequest request) {
153
154         // get latest data from kafka if consumer started for the first time
155         if (mKafkaConsumerOperator.consumerStarted() == false) {
156
157             ArrayList<byte[]> data = mKafkaConsumerOperator.getMessages();
158
159             if (data != null && !data.isEmpty()) {
160                 mLatestData = data.get(data.size() - 1);
161             }
162         }
163
164         if (mKafkaConsumerOperator.subscribeTopic() == false) {
165             throw new InternalServerErrorException("subscribe topic failed");
166         }
167
168         synchronized (mSubscribers) {
169             mSubscribers.put(request.getRequestId(),
170                     new TopicSubscriber(srcDevice, request));
171         }
172
173         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
174                 ContentFormat.APPLICATION_CBOR, mLatestData);
175     }
176
177     public IResponse handleUnsubscribeTopic(IRequest request) {
178
179         synchronized (mSubscribers) {
180
181             TopicSubscriber subscriber = mSubscribers
182                     .get(request.getRequestId());
183
184             mSubscribers.remove(subscriber.mRequest.getRequestId());
185
186             // if there's no more subscriber, stop subscribing topic
187             // with kafka consumer
188             if (mSubscribers.isEmpty()) {
189                 mKafkaConsumerOperator.unsubscribeTopic();
190             }
191         }
192
193         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
194                 ContentFormat.APPLICATION_CBOR, mLatestData);
195     }
196
197     public IResponse handlePublishMessage(IRequest request) {
198         byte[] payload = request.getPayload();
199
200         if (payload == null) {
201             throw new PreconditionFailedException("payload is null");
202         }
203
204         HashMap<String, Object> message = mCbor.parsePayloadFromCbor(payload,
205                 HashMap.class);
206
207         if (message == null
208                 || message.containsKey(Constants.MQ_MESSAGE) == false) {
209             throw new PreconditionFailedException(
210                     "message field is not included");
211         }
212
213         if (mKafkaProducerOperator.publishMessage(payload) == false) {
214             throw new InternalServerErrorException("publish message failed");
215         }
216
217         return MessageBuilder.createResponse(request, ResponseStatus.CHANGED);
218     }
219
220     public IResponse handleReadMessage(IRequest request) {
221         // if consumer is not started, get data from kafka broker
222         if (mKafkaConsumerOperator.consumerStarted() == false) {
223
224             ArrayList<byte[]> data = mKafkaConsumerOperator.getMessages();
225
226             if (data != null && !data.isEmpty()) {
227                 mLatestData = data.get(data.size() - 1);
228             }
229         }
230
231         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
232                 ContentFormat.APPLICATION_CBOR, mLatestData);
233     }
234
235     public void cleanup() {
236
237         mKafkaProducerOperator.closeConnection();
238         mKafkaConsumerOperator.closeConnection();
239     }
240
241     // callback from Kafka Consumer
242     public void onMessagePublished(byte[] message) {
243
244         mLatestData = message;
245
246         notifyPublishedMessage();
247     }
248
249     private Topic getSubtopic(String topicName) {
250
251         Topic topic = null;
252
253         synchronized (mSubtopics) {
254             topic = mSubtopics.get(topicName);
255         }
256
257         return topic;
258     }
259
260     private void notifyPublishedMessage() {
261         synchronized (mSubscribers) {
262             for (TopicSubscriber subscriber : mSubscribers.values()) {
263
264                 subscriber.mSubscriber.sendResponse(
265                         MessageBuilder.createResponse(subscriber.mRequest,
266                                 ResponseStatus.CONTENT,
267                                 ContentFormat.APPLICATION_CBOR, mLatestData));
268             }
269         }
270     }
271 }