private TopicManager mTopicManager = new TopicManager();
public MQBrokerResource() {
- super(Arrays.asList(Constants.PREFIX_OIC, Constants.MQ_BROKER_URI));
-
- // addQueryHandler(Arrays.asList("if=" + Constants.INTERFACE_DEFAULT),
- // this::onDefaultInterfaceReceived);
+ super(Arrays.asList(Constants.PREFIX_WELL_KNOWN, Constants.PREFIX_OCF,
+ Constants.MQ_BROKER_URI));
}
public void setKafkaInformation(String zookeeper, String broker) {
private IResponse handleGetRequest(Device srcDevice, IRequest request) {
// DISCOVER
- if (request.getUriPathSegments().size() == getUriPathSegments()
- .size()) {
+ if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
return discoverTopic(request);
}
ResponseStatus.BAD_REQUEST);
}
- // PUBLISH
+ // CREATE topic
private IResponse handlePutRequest(IRequest request) {
- return publishMessage(request);
+ if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
+
+ return MessageBuilder.createResponse(request,
+ ResponseStatus.BAD_REQUEST);
+ }
+
+ return createTopic(request);
}
- // CREATE topic
+ // PUBLISH
private IResponse handlePostRequest(IRequest request) {
- return createTopic(request);
+ return publishMessage(request);
}
// REMOVE topic
private IResponse createTopic(IRequest request) {
- String uriPath = request.getUriPath();
-
// main topic creation request
- if (request.getUriPathSegments().size() == getUriPathSegments()
- .size()) {
+ if (request.getUriPathSegments().size() == getUriPathSegments().size() + 1) {
return createMainTopic(request);
}
// subtopic creation request
+ String uriPath = request.getUriPath();
+ uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
+
Topic targetTopic = mTopicManager.getTopic(uriPath);
if (targetTopic == null) {
private IResponse removeTopic(IRequest request) {
- String uriPath = request.getUriPath();
-
- String parentName = uriPath.substring(0, uriPath.lastIndexOf('/'));
- String targetName = request.getUriPathSegments()
- .get(request.getUriPathSegments().size() - 1);
-
// main topic removal request
if (request.getUriPathSegments().size() - 1 == getUriPathSegments()
.size()) {
}
// subtopic removal request
+ String uriPath = request.getUriPath();
+
+ String parentName = uriPath.substring(0, uriPath.lastIndexOf('/'));
+ String targetName = request.getUriPathSegments().get(
+ request.getUriPathSegments().size() - 1);
+
Topic parentTopic = mTopicManager.getTopic(parentName);
if (parentTopic == null) {
}
return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
- ContentFormat.APPLICATION_CBOR, MessageQueueUtils
- .buildPayload(Constants.MQ_TOPICLIST, topicList));
+ ContentFormat.APPLICATION_CBOR, MessageQueueUtils.buildPayload(
+ Constants.MQ_TOPICLIST, topicList));
}
private IResponse createMainTopic(IRequest request) {
- String topicName = MessageQueueUtils.extractDataFromPayload(
- request.getPayload(), Constants.MQ_TOPIC);
+ String topicName = request.getUriPathSegments().get(
+ request.getUriPathSegments().size() - 1);
String type = new String();
ResponseStatus.BAD_REQUEST);
}
- topicName = "/" + Constants.PREFIX_OIC + "/" + Constants.MQ_BROKER_URI
- + "/" + topicName;
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("/");
+
+ for (String uri : getUriPathSegments()) {
+ stringBuilder.append(uri);
+ stringBuilder.append("/");
+ }
+
+ stringBuilder.append(topicName);
+
+ topicName = stringBuilder.toString();
if (mTopicManager.getTopic(topicName) != null) {
// Topic already exists
ResponseStatus.INTERNAL_SERVER_ERROR);
}
- return MessageBuilder.createResponse(request, ResponseStatus.CREATED,
- ContentFormat.APPLICATION_CBOR, MessageQueueUtils.buildPayload(
- Constants.MQ_LOCATION, newTopic.getName()));
+ return MessageBuilder.createResponse(request, ResponseStatus.CREATED);
}
private IResponse removeMainTopic(IRequest request) {
Topic targetTopic = mTopicManager.getTopic(topicName);
- // TODO check error
if (targetTopic == null) {
// Topic doesn't exist
return MessageBuilder.createResponse(request,
ResponseStatus.BAD_REQUEST);
}
+ targetTopic.cleanup();
+
if (mTopicManager.removeTopic(targetTopic) == false) {
return MessageBuilder.createResponse(request,
ResponseStatus.INTERNAL_SERVER_ERROR);
import org.iotivity.cloud.base.protocols.MessageBuilder;
import org.iotivity.cloud.base.protocols.enums.ContentFormat;
import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
-import org.iotivity.cloud.mqserver.Constants;
-import org.iotivity.cloud.mqserver.MessageQueueUtils;
import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper;
import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper;
public IResponse handleCreateSubtopic(IRequest request) {
- String newTopicName = MessageQueueUtils.extractDataFromPayload(
- request.getPayload(), Constants.MQ_TOPIC);
+ String newTopicName = request.getUriPathSegments().get(
+ request.getUriPathSegments().size() - 1);
String newTopicType = new String();
mSubtopics.put(newTopicName, newTopic);
- return MessageBuilder.createResponse(
- request,
- ResponseStatus.CREATED,
- ContentFormat.APPLICATION_CBOR,
- MessageQueueUtils.buildPayload(Constants.MQ_LOCATION,
- newTopic.getName()));
+ return MessageBuilder.createResponse(request, ResponseStatus.CREATED);
}
public IResponse handleRemoveSubtopic(IRequest request, String topicName) {
Topic targetTopic = getSubtopic(topicName);
- // TODO check error
if (targetTopic == null) {
// topic doesn't exist
return MessageBuilder.createResponse(request,
ResponseStatus.BAD_REQUEST);
}
+ targetTopic.cleanup();
+
if (mTopicManager.removeTopic(targetTopic) == false) {
+
return MessageBuilder.createResponse(request,
ResponseStatus.INTERNAL_SERVER_ERROR);
}
ContentFormat.APPLICATION_CBOR, mLatestData);
}
+ public void cleanup() {
+
+ mKafkaProducerOperator.closeConnection();
+ mKafkaConsumerOperator.closeConnection();
+ }
+
// callback from Kafka Consumer
public void onMessagePublished(byte[] message) {
notifyPublishedMessage();
}
- // TODO check
private Topic getSubtopic(String topicName) {
if (mSubtopics.containsKey(topicName) == false) {
private void notifyPublishedMessage() {
synchronized (mSubscribers) {
for (TopicSubscriber subscriber : mSubscribers.values()) {
+
subscriber.mSubscriber.sendResponse(MessageBuilder
.createResponse(subscriber.mRequest,
ResponseStatus.CONTENT,