2 * //******************************************************************
4 * // Copyright 2016 Samsung Electronics All Rights Reserved.
6 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
8 * // Licensed under the Apache License, Version 2.0 (the "License");
9 * // you may not use this file except in compliance with the License.
10 * // You may obtain a copy of the License at
12 * // http://www.apache.org/licenses/LICENSE-2.0
14 * // Unless required by applicable law or agreed to in writing, software
15 * // distributed under the License is distributed on an "AS IS" BASIS,
16 * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * // See the License for the specific language governing permissions and
18 * // limitations under the License.
20 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
22 package org.iotivity.cloud.mqserver.topic;
24 import java.util.ArrayList;
25 import java.util.HashMap;
27 import org.iotivity.cloud.base.device.Device;
28 import org.iotivity.cloud.base.exception.ServerException.ForbiddenException;
29 import org.iotivity.cloud.base.exception.ServerException.InternalServerErrorException;
30 import org.iotivity.cloud.base.exception.ServerException.NotFoundException;
31 import org.iotivity.cloud.base.exception.ServerException.PreconditionFailedException;
32 import org.iotivity.cloud.base.protocols.IRequest;
33 import org.iotivity.cloud.base.protocols.IResponse;
34 import org.iotivity.cloud.base.protocols.MessageBuilder;
35 import org.iotivity.cloud.base.protocols.enums.ContentFormat;
36 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
37 import org.iotivity.cloud.mqserver.Constants;
38 import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper;
39 import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper;
40 import org.iotivity.cloud.util.Cbor;
44 private TopicManager mTopicManager = null;
46 private String mName = null;
47 private String mType = null;
48 private HashMap<String, Topic> mSubtopics = null;
50 private byte[] mLatestData = null;
52 private class TopicSubscriber {
53 TopicSubscriber(Device subscriber, IRequest request) {
54 mSubscriber = subscriber;
58 public Device mSubscriber;
59 public IRequest mRequest;
62 private HashMap<String, TopicSubscriber> mSubscribers = null;
64 private KafkaProducerWrapper mKafkaProducerOperator = null;
65 private KafkaConsumerWrapper mKafkaConsumerOperator = null;
67 Cbor<HashMap<String, Object>> mCbor = new Cbor<>();
69 public Topic(String name, String type, TopicManager topicManager) {
71 mTopicManager = topicManager;
75 mSubtopics = new HashMap<>();
76 mSubscribers = new HashMap<>();
78 String kafka_zookeeper = topicManager.getKafkaZookeeper();
79 String kafka_broker = topicManager.getKafkaBroker();
81 mKafkaProducerOperator = new KafkaProducerWrapper(kafka_broker, name);
82 mKafkaConsumerOperator = new KafkaConsumerWrapper(kafka_zookeeper,
85 HashMap<String, Object> data = new HashMap<>();
86 data.put(Constants.MQ_MESSAGE, null);
88 mLatestData = mCbor.encodingPayloadToCbor(data);
91 public String getName() {
95 public String getType() {
99 public IResponse handleCreateSubtopic(IRequest request) {
101 String newTopicName = request.getUriPathSegments()
102 .get(request.getUriPathSegments().size() - 1);
104 String newTopicType = new String();
106 if (request.getUriQueryMap() != null) {
107 newTopicType = request.getUriQueryMap().get("rt").get(0);
110 if (getSubtopic(newTopicName) != null) {
111 throw new ForbiddenException("topic already exist");
114 Topic newTopic = new Topic(mName + "/" + newTopicName, newTopicType,
117 if (mTopicManager.createTopic(newTopic) == false) {
118 throw new InternalServerErrorException("create topic falied");
121 synchronized (mSubtopics) {
122 mSubtopics.put(newTopicName, newTopic);
125 IResponse response = MessageBuilder.createResponse(request,
126 ResponseStatus.CREATED);
127 response.setLocationPath(request.getUriPath());
131 public IResponse handleRemoveSubtopic(IRequest request, String topicName) {
133 Topic targetTopic = getSubtopic(topicName);
135 if (targetTopic == null) {
136 throw new NotFoundException("topic doesn't exist");
139 targetTopic.cleanup();
141 if (mTopicManager.removeTopic(targetTopic) == false) {
142 throw new InternalServerErrorException("remove topic failed");
145 synchronized (mSubtopics) {
146 mSubtopics.remove(topicName);
149 return MessageBuilder.createResponse(request, ResponseStatus.DELETED);
152 public IResponse handleSubscribeTopic(Device srcDevice, IRequest request) {
154 // get latest data from kafka if consumer started for the first time
155 if (mKafkaConsumerOperator.consumerStarted() == false) {
157 ArrayList<byte[]> data = mKafkaConsumerOperator.getMessages();
159 if (data != null && !data.isEmpty()) {
160 mLatestData = data.get(data.size() - 1);
164 if (mKafkaConsumerOperator.subscribeTopic() == false) {
165 throw new InternalServerErrorException("subscribe topic failed");
168 synchronized (mSubscribers) {
169 mSubscribers.put(request.getRequestId(),
170 new TopicSubscriber(srcDevice, request));
173 return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
174 ContentFormat.APPLICATION_CBOR, mLatestData);
177 public IResponse handleUnsubscribeTopic(IRequest request) {
179 synchronized (mSubscribers) {
181 TopicSubscriber subscriber = mSubscribers
182 .get(request.getRequestId());
184 mSubscribers.remove(subscriber.mRequest.getRequestId());
186 // if there's no more subscriber, stop subscribing topic
187 // with kafka consumer
188 if (mSubscribers.isEmpty()) {
189 mKafkaConsumerOperator.unsubscribeTopic();
193 return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
194 ContentFormat.APPLICATION_CBOR, mLatestData);
197 public IResponse handlePublishMessage(IRequest request) {
198 byte[] payload = request.getPayload();
200 if (payload == null) {
201 throw new PreconditionFailedException("payload is null");
204 HashMap<String, Object> message = mCbor.parsePayloadFromCbor(payload,
208 || message.containsKey(Constants.MQ_MESSAGE) == false) {
209 throw new PreconditionFailedException(
210 "message field is not included");
213 if (mKafkaProducerOperator.publishMessage(payload) == false) {
214 throw new InternalServerErrorException("publish message failed");
217 return MessageBuilder.createResponse(request, ResponseStatus.CHANGED);
220 public IResponse handleReadMessage(IRequest request) {
221 // if consumer is not started, get data from kafka broker
222 if (mKafkaConsumerOperator.consumerStarted() == false) {
224 ArrayList<byte[]> data = mKafkaConsumerOperator.getMessages();
226 if (data != null && !data.isEmpty()) {
227 mLatestData = data.get(data.size() - 1);
231 return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
232 ContentFormat.APPLICATION_CBOR, mLatestData);
235 public void cleanup() {
237 mKafkaProducerOperator.closeConnection();
238 mKafkaConsumerOperator.closeConnection();
241 // callback from Kafka Consumer
242 public void onMessagePublished(byte[] message) {
244 mLatestData = message;
246 notifyPublishedMessage();
249 private Topic getSubtopic(String topicName) {
253 synchronized (mSubtopics) {
254 topic = mSubtopics.get(topicName);
260 private void notifyPublishedMessage() {
261 synchronized (mSubscribers) {
262 for (TopicSubscriber subscriber : mSubscribers.values()) {
264 subscriber.mSubscriber.sendResponse(
265 MessageBuilder.createResponse(subscriber.mRequest,
266 ResponseStatus.CONTENT,
267 ContentFormat.APPLICATION_CBOR, mLatestData));