Modify Message queue to apply latest OCF resource model proposal
authorMinji Park <minjii.park@samsung.com>
Thu, 21 Jul 2016 00:37:00 +0000 (09:37 +0900)
committerJee Hyeok Kim <jihyeok13.kim@samsung.com>
Thu, 21 Jul 2016 05:48:04 +0000 (05:48 +0000)
- uri of mq broker changed
- create and publish topic api changed

Change-Id: I81ded0bb574461f0fa07e2920085e26307749ea0
Signed-off-by: Minji Park <minjii.park@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/9523
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Jee Hyeok Kim <jihyeok13.kim@samsung.com>
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/resources/proxy/MessageQueue.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/Constants.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/kafka/KafkaConsumerWrapper.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/kafka/KafkaProducerWrapper.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/resources/MQBrokerResource.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/topic/Topic.java

index a4153f9..0d0ca3c 100644 (file)
@@ -35,7 +35,8 @@ public class MessageQueue extends Resource {
     IRequestChannel mPSServer = null;
 
     public MessageQueue() {
-        super(Arrays.asList(Constants.PREFIX_OIC, Constants.MQ_BROKER_URI));
+        super(Arrays.asList(Constants.PREFIX_WELL_KNOWN, Constants.PREFIX_OCF,
+                Constants.MQ_BROKER_URI));
 
         mPSServer = ConnectorPool.getConnection("mq");
     }
index 23d6318..821a537 100644 (file)
@@ -25,13 +25,8 @@ import org.iotivity.cloud.base.OCFConstants;
 
 public class Constants extends OCFConstants {
 
-    public static final String MQ_TOPIC                = "topic";
-    public static final String MQ_LOCATION             = "location";
     public static final String MQ_TOPICLIST            = "topiclist";
 
-    public static final long   MIN_SEQ_NUM             = 5;
-    public static final long   MAX_SEQ_NUM             = 16777215;
-
     // For Kafka
     public static final int    KAFKA_SESSION_TIMEOUT   = 10000;
     public static final int    KAFKA_CONNECT_TIMEOUT   = 10000;
index 8c151c3..0449125 100644 (file)
@@ -168,6 +168,16 @@ public class KafkaConsumerWrapper {
         return true;
     }
 
+    public void closeConnection() {
+
+        if (mConsumerStarted == true) {
+            unsubscribeTopic();
+        }
+
+        mZkUtils.close();
+        mZkClient.close();
+    }
+
     public ArrayList<byte[]> getMessages() {
 
         Logger.d("kafka get all messages - " + mTopicName);
index 903f046..9feefeb 100644 (file)
@@ -58,6 +58,11 @@ public class KafkaProducerWrapper {
         return true;
     }
 
+    public void closeConnection() {
+
+        mProducer.close();
+    }
+
     private Properties buildPropertiesForPublish() {
 
         // TODO check property settings
index a137f75..4109b3a 100644 (file)
@@ -44,10 +44,8 @@ public class MQBrokerResource extends Resource {
     private TopicManager mTopicManager = new TopicManager();
 
     public MQBrokerResource() {
-        super(Arrays.asList(Constants.PREFIX_OIC, Constants.MQ_BROKER_URI));
-
-        // addQueryHandler(Arrays.asList("if=" + Constants.INTERFACE_DEFAULT),
-        // this::onDefaultInterfaceReceived);
+        super(Arrays.asList(Constants.PREFIX_WELL_KNOWN, Constants.PREFIX_OCF,
+                Constants.MQ_BROKER_URI));
     }
 
     public void setKafkaInformation(String zookeeper, String broker) {
@@ -82,8 +80,7 @@ public class MQBrokerResource extends Resource {
     private IResponse handleGetRequest(Device srcDevice, IRequest request) {
 
         // DISCOVER
-        if (request.getUriPathSegments().size() == getUriPathSegments()
-                .size()) {
+        if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
             return discoverTopic(request);
         }
 
@@ -105,16 +102,22 @@ public class MQBrokerResource extends Resource {
                 ResponseStatus.BAD_REQUEST);
     }
 
-    // PUBLISH
+    // CREATE topic
     private IResponse handlePutRequest(IRequest request) {
 
-        return publishMessage(request);
+        if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
+
+            return MessageBuilder.createResponse(request,
+                    ResponseStatus.BAD_REQUEST);
+        }
+
+        return createTopic(request);
     }
 
-    // CREATE topic
+    // PUBLISH
     private IResponse handlePostRequest(IRequest request) {
 
-        return createTopic(request);
+        return publishMessage(request);
     }
 
     // REMOVE topic
@@ -125,15 +128,15 @@ public class MQBrokerResource extends Resource {
 
     private IResponse createTopic(IRequest request) {
 
-        String uriPath = request.getUriPath();
-
         // main topic creation request
-        if (request.getUriPathSegments().size() == getUriPathSegments()
-                .size()) {
+        if (request.getUriPathSegments().size() == getUriPathSegments().size() + 1) {
             return createMainTopic(request);
         }
 
         // subtopic creation request
+        String uriPath = request.getUriPath();
+        uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
+
         Topic targetTopic = mTopicManager.getTopic(uriPath);
 
         if (targetTopic == null) {
@@ -146,12 +149,6 @@ public class MQBrokerResource extends Resource {
 
     private IResponse removeTopic(IRequest request) {
 
-        String uriPath = request.getUriPath();
-
-        String parentName = uriPath.substring(0, uriPath.lastIndexOf('/'));
-        String targetName = request.getUriPathSegments()
-                .get(request.getUriPathSegments().size() - 1);
-
         // main topic removal request
         if (request.getUriPathSegments().size() - 1 == getUriPathSegments()
                 .size()) {
@@ -159,6 +156,12 @@ public class MQBrokerResource extends Resource {
         }
 
         // subtopic removal request
+        String uriPath = request.getUriPath();
+
+        String parentName = uriPath.substring(0, uriPath.lastIndexOf('/'));
+        String targetName = request.getUriPathSegments().get(
+                request.getUriPathSegments().size() - 1);
+
         Topic parentTopic = mTopicManager.getTopic(parentName);
 
         if (parentTopic == null) {
@@ -233,14 +236,14 @@ public class MQBrokerResource extends Resource {
         }
 
         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
-                ContentFormat.APPLICATION_CBOR, MessageQueueUtils
-                        .buildPayload(Constants.MQ_TOPICLIST, topicList));
+                ContentFormat.APPLICATION_CBOR, MessageQueueUtils.buildPayload(
+                        Constants.MQ_TOPICLIST, topicList));
     }
 
     private IResponse createMainTopic(IRequest request) {
 
-        String topicName = MessageQueueUtils.extractDataFromPayload(
-                request.getPayload(), Constants.MQ_TOPIC);
+        String topicName = request.getUriPathSegments().get(
+                request.getUriPathSegments().size() - 1);
 
         String type = new String();
 
@@ -253,8 +256,17 @@ public class MQBrokerResource extends Resource {
                     ResponseStatus.BAD_REQUEST);
         }
 
-        topicName = "/" + Constants.PREFIX_OIC + "/" + Constants.MQ_BROKER_URI
-                + "/" + topicName;
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("/");
+
+        for (String uri : getUriPathSegments()) {
+            stringBuilder.append(uri);
+            stringBuilder.append("/");
+        }
+
+        stringBuilder.append(topicName);
+
+        topicName = stringBuilder.toString();
 
         if (mTopicManager.getTopic(topicName) != null) {
             // Topic already exists
@@ -269,9 +281,7 @@ public class MQBrokerResource extends Resource {
                     ResponseStatus.INTERNAL_SERVER_ERROR);
         }
 
-        return MessageBuilder.createResponse(request, ResponseStatus.CREATED,
-                ContentFormat.APPLICATION_CBOR, MessageQueueUtils.buildPayload(
-                        Constants.MQ_LOCATION, newTopic.getName()));
+        return MessageBuilder.createResponse(request, ResponseStatus.CREATED);
     }
 
     private IResponse removeMainTopic(IRequest request) {
@@ -280,13 +290,14 @@ public class MQBrokerResource extends Resource {
 
         Topic targetTopic = mTopicManager.getTopic(topicName);
 
-        // TODO check error
         if (targetTopic == null) {
             // Topic doesn't exist
             return MessageBuilder.createResponse(request,
                     ResponseStatus.BAD_REQUEST);
         }
 
+        targetTopic.cleanup();
+
         if (mTopicManager.removeTopic(targetTopic) == false) {
             return MessageBuilder.createResponse(request,
                     ResponseStatus.INTERNAL_SERVER_ERROR);
index 3b004c1..2e5016e 100644 (file)
@@ -30,8 +30,6 @@ import org.iotivity.cloud.base.protocols.IResponse;
 import org.iotivity.cloud.base.protocols.MessageBuilder;
 import org.iotivity.cloud.base.protocols.enums.ContentFormat;
 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
-import org.iotivity.cloud.mqserver.Constants;
-import org.iotivity.cloud.mqserver.MessageQueueUtils;
 import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper;
 import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper;
 
@@ -87,8 +85,8 @@ public class Topic {
 
     public IResponse handleCreateSubtopic(IRequest request) {
 
-        String newTopicName = MessageQueueUtils.extractDataFromPayload(
-                request.getPayload(), Constants.MQ_TOPIC);
+        String newTopicName = request.getUriPathSegments().get(
+                request.getUriPathSegments().size() - 1);
 
         String newTopicType = new String();
 
@@ -117,26 +115,23 @@ public class Topic {
 
         mSubtopics.put(newTopicName, newTopic);
 
-        return MessageBuilder.createResponse(
-                request,
-                ResponseStatus.CREATED,
-                ContentFormat.APPLICATION_CBOR,
-                MessageQueueUtils.buildPayload(Constants.MQ_LOCATION,
-                        newTopic.getName()));
+        return MessageBuilder.createResponse(request, ResponseStatus.CREATED);
     }
 
     public IResponse handleRemoveSubtopic(IRequest request, String topicName) {
 
         Topic targetTopic = getSubtopic(topicName);
 
-        // TODO check error
         if (targetTopic == null) {
             // topic doesn't exist
             return MessageBuilder.createResponse(request,
                     ResponseStatus.BAD_REQUEST);
         }
 
+        targetTopic.cleanup();
+
         if (mTopicManager.removeTopic(targetTopic) == false) {
+
             return MessageBuilder.createResponse(request,
                     ResponseStatus.INTERNAL_SERVER_ERROR);
         }
@@ -224,6 +219,12 @@ public class Topic {
                 ContentFormat.APPLICATION_CBOR, mLatestData);
     }
 
+    public void cleanup() {
+
+        mKafkaProducerOperator.closeConnection();
+        mKafkaConsumerOperator.closeConnection();
+    }
+
     // callback from Kafka Consumer
     public void onMessagePublished(byte[] message) {
 
@@ -232,7 +233,6 @@ public class Topic {
         notifyPublishedMessage();
     }
 
-    // TODO check
     private Topic getSubtopic(String topicName) {
 
         if (mSubtopics.containsKey(topicName) == false) {
@@ -245,6 +245,7 @@ public class Topic {
     private void notifyPublishedMessage() {
         synchronized (mSubscribers) {
             for (TopicSubscriber subscriber : mSubscribers.values()) {
+
                 subscriber.mSubscriber.sendResponse(MessageBuilder
                         .createResponse(subscriber.mRequest,
                                 ResponseStatus.CONTENT,