Updated MQ server
authorAndriy Gudz <a.gudz@samsung.com>
Wed, 7 Jun 2017 13:47:08 +0000 (16:47 +0300)
committerAndriy Gudz <a.gudz@samsung.com>
Wed, 7 Jun 2017 13:47:08 +0000 (16:47 +0300)
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/Constants.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/MessageQueueServer.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/kafka/KafkaConsumerWrapper.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/resources/MQBrokerResource.java
cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/topic/Topic.java
cloud/messagequeue/src/test/java/org/iotivity/cloud/mqserver/resources/MQBrokerResourceTest.java

index 2961ac2..ee86ed3 100644 (file)
@@ -24,9 +24,9 @@ package org.iotivity.cloud.mqserver;
 import org.iotivity.cloud.base.OICConstants;
 
 public class Constants extends OICConstants {
+       public static final int    DEFAULT_COAP_PORT       = 5686;
 
     public static final String MQ_TOPICLIST            = "topiclist";
-    public static final String MQ_MESSAGE              = "message";
 
     // For Kafka
     public static final int    KAFKA_SESSION_TIMEOUT   = 10000;
index 79e32b9..b3ce4a9 100644 (file)
@@ -25,39 +25,44 @@ import java.net.InetSocketAddress;
 import java.util.Scanner;
 
 import org.iotivity.cloud.base.ServerSystem;
+import org.iotivity.cloud.base.resource.CloudPingResource;
 import org.iotivity.cloud.base.server.CoapServer;
 import org.iotivity.cloud.mqserver.resources.MQBrokerResource;
 import org.iotivity.cloud.util.Log;
 
 public class MessageQueueServer {
 
-    public static void main(String[] args) throws Exception {
-        Log.Init();
+    private static int     coapServerPort;
+    private static boolean tlsMode;
+    private static String  zookeeperHost;
+    private static String  kafkaHost;
+    private static String  webLogHost;
 
+    public static void main(String[] args) throws Exception {
         System.out.println("-----MQ SERVER-----");
+        Log.Init();
 
-        if (args.length != 6) {
-            Log.e("coap server port, Kafka_zookeeper_Address port"
-                    + " and Kafka_broker_Address Port and TLS mode required\n"
-                    + "ex) 5686 127.0.0.1 2181 127.0.0.1 9092 0\n");
-
+        if (!parseConfiguration(args)) {
+            Log.e("\nCoAP-server <Port> Zookeeper <Address> <Port> Kafka <Address> <Port> TLS-mode <0|1> are required. "
+                    + "WebSocketLog-Server <Addres> <Port> is optional.\n"
+                    + "ex) " + Constants.DEFAULT_COAP_PORT
+                    + " 127.0.0.1 2181 127.0.0.1 9092 0\n");
             return;
         }
+        if (webLogHost != null)
+            Log.InitWebLog(webLogHost,
+                    MessageQueueServer.class.getSimpleName().toString());
 
         ServerSystem serverSystem = new ServerSystem();
 
         MQBrokerResource MQBroker = new MQBrokerResource();
-
-        String kafka_zookeeper = args[1] + ":" + args[2];
-        String kafka_broker = args[3] + ":" + args[4];
-        MQBroker.setKafkaInformation(kafka_zookeeper, kafka_broker);
+        MQBroker.setKafkaInformation(zookeeperHost, kafkaHost);
 
         serverSystem.addResource(MQBroker);
+        serverSystem.addResource(new CloudPingResource());
 
-        serverSystem.addServer(new CoapServer(
-                new InetSocketAddress(Integer.parseInt(args[0]))));
-
-        boolean tlsMode = Integer.parseInt(args[5]) == 1;
+        serverSystem.addServer(
+                new CoapServer(new InetSocketAddress(coapServerPort)));
 
         serverSystem.startSystem(tlsMode);
 
@@ -75,4 +80,29 @@ public class MessageQueueServer {
 
         System.out.println("Terminated");
     }
+
+    private static boolean parseConfiguration(String[] args) {
+        // configuration provided by arguments
+        if (args.length == 6 || args.length == 8) {
+            coapServerPort = Integer.parseInt(args[0]);
+            zookeeperHost = args[1] + ":" + args[2];
+            kafkaHost = args[3] + ":" + args[4];
+            tlsMode = Integer.parseInt(args[5]) == 1;
+            if (args.length == 8)
+                webLogHost = args[6] + ":" + args[7];
+            return true;
+        }
+        // configuration provided by docker env
+        String tlsModeEnv = System.getenv("TLS_MODE");
+        if (tlsModeEnv != null) {
+            coapServerPort = Constants.DEFAULT_COAP_PORT;
+            tlsMode = Integer.parseInt(tlsModeEnv) == 1;
+            zookeeperHost = System.getenv("ZOOKEEPER_ADDRESS") + ":"
+                    + System.getenv("ZOOKEEPER_PORT");
+            kafkaHost = System.getenv("KAFKA_ADDRESS") + ":"
+                    + System.getenv("KAFKA_PORT");
+            return true;
+        }
+        return false;
+    }
 }
\ No newline at end of file
index 73e1e50..1ff5224 100644 (file)
@@ -30,6 +30,12 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.iotivity.cloud.mqserver.Constants;
+import org.iotivity.cloud.mqserver.topic.Topic;
+import org.iotivity.cloud.util.Log;
+
 import kafka.admin.AdminUtils;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
@@ -48,12 +54,6 @@ import kafka.message.MessageAndOffset;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.iotivity.cloud.mqserver.Constants;
-import org.iotivity.cloud.mqserver.topic.Topic;
-import org.iotivity.cloud.util.Log;
-
 /**
  *
  * This class provides a set of APIs to use Kafka consumer APIs for receiving
@@ -118,8 +118,8 @@ public class KafkaConsumerWrapper {
         }
 
         // remove consumer group info if already exist
-        List<String> subscribers = mZkClient.getChildren(ZkUtils
-                .ConsumersPath());
+        List<String> subscribers = mZkClient
+                .getChildren(ZkUtils.ConsumersPath());
 
         if (subscribers.contains(mTopicName)) {
             AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
@@ -152,8 +152,8 @@ public class KafkaConsumerWrapper {
 
                     for (final MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
 
-                        mInternalConsumer.onMessagePublished(messageAndMetadata
-                                .message());
+                        mInternalConsumer.onMessagePublished(
+                                messageAndMetadata.message());
                     }
                 }
             });
@@ -175,8 +175,8 @@ public class KafkaConsumerWrapper {
         Log.d("kafka unsubscribeTopic - " + mTopicName);
 
         // remove consumer group info in zookeeper
-        List<String> subscribers = mZkClient.getChildren(ZkUtils
-                .ConsumersPath());
+        List<String> subscribers = mZkClient
+                .getChildren(ZkUtils.ConsumersPath());
 
         if (subscribers.contains(mTopicName)) {
             AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
@@ -213,8 +213,8 @@ public class KafkaConsumerWrapper {
         Log.d("kafka get all messages - " + mTopicName);
 
         String brokerHost = mBroker.substring(0, mBroker.indexOf(':'));
-        int brokerPort = Integer.parseInt(mBroker.substring(mBroker
-                .indexOf(':') + 1));
+        int brokerPort = Integer
+                .parseInt(mBroker.substring(mBroker.indexOf(':') + 1));
 
         Log.d("host " + brokerHost + ", port " + brokerPort);
 
@@ -255,10 +255,12 @@ public class KafkaConsumerWrapper {
                 lastOffset = messageAndOffset.nextOffset();
                 ByteBuffer payload = messageAndOffset.message().payload();
 
-                byte[] bytes = new byte[payload.limit()];
-                payload.get(bytes);
+                if (payload != null) {
+                    byte[] bytes = new byte[payload.limit()];
+                    payload.get(bytes);
 
-                initialData.add(bytes);
+                    initialData.add(bytes);
+                }
             }
         }
 
@@ -289,8 +291,8 @@ public class KafkaConsumerWrapper {
                 partition);
 
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
-                whichTime, 1));
+        requestInfo.put(topicAndPartition,
+                new PartitionOffsetRequestInfo(whichTime, 1));
 
         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                 requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
index 4d07cdc..3710c5f 100644 (file)
@@ -96,7 +96,8 @@ public class MQBrokerResource extends Resource {
     private IResponse handleGetRequest(Device srcDevice, IRequest request) {
 
         // DISCOVER
-        if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
+        if (request.getUriPathSegments().size() == getUriPathSegments()
+                .size()) {
             return discoverTopic(request);
         }
 
@@ -118,7 +119,8 @@ public class MQBrokerResource extends Resource {
     // CREATE topic
     private IResponse handlePutRequest(IRequest request) {
 
-        if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
+        if (request.getUriPathSegments().size() == getUriPathSegments()
+                .size()) {
 
             throw new BadRequestException(
                     "topic name is not included in request uri");
@@ -142,14 +144,19 @@ public class MQBrokerResource extends Resource {
     private IResponse createTopic(IRequest request) {
 
         // main topic creation request
-        if (request.getUriPathSegments().size() == getUriPathSegments().size() + 1) {
+        if (request.getUriPathSegments().size() == getUriPathSegments().size()
+                + 1) {
             return createMainTopic(request);
         }
 
         // subtopic creation request
         String uriPath = request.getUriPath();
-        uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
 
+        if (uriPath == null) {
+            throw new BadRequestException("uriPath is not invalid");
+        }
+
+        uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
         Topic targetTopic = mTopicManager.getTopic(uriPath);
 
         if (targetTopic == null) {
@@ -170,9 +177,13 @@ public class MQBrokerResource extends Resource {
         // subtopic removal request
         String uriPath = request.getUriPath();
 
+        if (uriPath == null) {
+            throw new BadRequestException("uriPath is not invalid");
+        }
+
         String parentName = uriPath.substring(0, uriPath.lastIndexOf('/'));
-        String targetName = request.getUriPathSegments().get(
-                request.getUriPathSegments().size() - 1);
+        String targetName = request.getUriPathSegments()
+                .get(request.getUriPathSegments().size() - 1);
 
         Topic parentTopic = mTopicManager.getTopic(parentName);
 
@@ -243,14 +254,14 @@ public class MQBrokerResource extends Resource {
         }
 
         return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
-                ContentFormat.APPLICATION_CBOR, MessageQueueUtils.buildPayload(
-                        Constants.MQ_TOPICLIST, topicList));
+                ContentFormat.APPLICATION_CBOR, MessageQueueUtils
+                        .buildPayload(Constants.MQ_TOPICLIST, topicList));
     }
 
     private IResponse createMainTopic(IRequest request) {
 
-        String topicName = request.getUriPathSegments().get(
-                request.getUriPathSegments().size() - 1);
+        String topicName = request.getUriPathSegments()
+                .get(request.getUriPathSegments().size() - 1);
 
         String type = new String();
 
@@ -282,7 +293,12 @@ public class MQBrokerResource extends Resource {
 
         IResponse response = MessageBuilder.createResponse(request,
                 ResponseStatus.CREATED);
-        response.setLocationPath(request.getUriPath());
+        String uriPath = request.getUriPath();
+        if (uriPath == null) {
+            throw new InternalServerErrorException(
+                    "uriPath is null in handleCreateSubtopic");
+        }
+        response.setLocationPath(uriPath);
         return response;
     }
 
index 44b3e22..63ac153 100644 (file)
@@ -34,7 +34,6 @@ import org.iotivity.cloud.base.protocols.IResponse;
 import org.iotivity.cloud.base.protocols.MessageBuilder;
 import org.iotivity.cloud.base.protocols.enums.ContentFormat;
 import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
-import org.iotivity.cloud.mqserver.Constants;
 import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper;
 import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper;
 import org.iotivity.cloud.util.Cbor;
@@ -54,7 +53,7 @@ public class Topic {
 
     private byte[]                 mLatestData   = null;
 
-    private class TopicSubscriber {
+    private static class TopicSubscriber {
         TopicSubscriber(Device subscriber, IRequest request) {
             mSubscriber = subscriber;
             mRequest = request;
@@ -88,8 +87,6 @@ public class Topic {
                 kafka_broker, this);
 
         HashMap<String, Object> data = new HashMap<>();
-        data.put(Constants.MQ_MESSAGE, null);
-
         mLatestData = mCbor.encodingPayloadToCbor(data);
     }
 
@@ -147,7 +144,12 @@ public class Topic {
 
         IResponse response = MessageBuilder.createResponse(request,
                 ResponseStatus.CREATED);
-        response.setLocationPath(request.getUriPath());
+        String uriPath = request.getUriPath();
+        if (uriPath == null) {
+            throw new InternalServerErrorException(
+                    "uriPath is null in handleCreateSubtopic");
+        }
+        response.setLocationPath(uriPath);
         return response;
     }
 
index ef8d1f4..24f4392 100644 (file)
@@ -35,11 +35,6 @@ import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import kafka.admin.TopicCommand;
-import kafka.admin.TopicCommand.TopicCommandOptions;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.iotivity.cloud.base.device.CoapDevice;
@@ -64,6 +59,11 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import kafka.admin.TopicCommand;
+import kafka.admin.TopicCommand.TopicCommandOptions;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
 public class MQBrokerResourceTest {
     private final String     MQ_BROKER_URI     = Constants.MQ_BROKER_FULL_URI;
 
@@ -180,9 +180,9 @@ public class MQBrokerResourceTest {
                 Object[] args = invocation.getArguments();
                 CoapResponse resp = (CoapResponse) args[0];
                 // assertion: if the response status is "CONTENT"
-                // assertion: if the response payload has the "message" property
+                // assertion: if the response payload has data
                 assertTrue(methodCheck(resp, ResponseStatus.CONTENT));
-                assertTrue(hashmapCheck(resp, "message"));
+                assertTrue(payloadCheck(resp));
                 return resp;
             }
         }).when(mockSubscriber).sendResponse(Mockito.anyObject());
@@ -211,10 +211,9 @@ public class MQBrokerResourceTest {
                 CoapResponse resp = (CoapResponse) args[0];
                 latchSubscriber.countDown();
                 if (latchSubscriber.getCount() == 0) {
-                    // assertion: if the response payload has the "message"
-                    // property
+                    // assertion: if the response payload has data
                     assertTrue(methodCheck(resp, ResponseStatus.CONTENT));
-                    assertTrue(hashmapCheck(resp, "message"));
+                    assertTrue(payloadCheck(resp));
                 }
                 return resp;
             }
@@ -277,7 +276,7 @@ public class MQBrokerResourceTest {
                 // assertion for subscriber
                 if (latchSubscriber.getCount() == 0) {
                     assertTrue(methodCheck(resp, ResponseStatus.CONTENT));
-                    assertTrue(hashmapCheck(resp, "message"));
+                    assertTrue(payloadCheck(resp));
 
                     DeleteTopic(mMockDevice, topic);
                 }
@@ -337,9 +336,9 @@ public class MQBrokerResourceTest {
         // read topic
         ReadTopic(topic);
         // assertion1 : if the response status is "CONTENT"
-        // assertion2 : if the response payload has the "message" property
+        // assertion2 : if the response payload has data
         assertTrue(methodCheck(mResponse, ResponseStatus.CONTENT));
-        assertTrue(hashmapCheck(mResponse, "message"));
+        assertTrue(payloadCheck(mResponse));
     }
 
     @Test(expected = NotFoundException.class)
@@ -523,16 +522,14 @@ public class MQBrokerResourceTest {
 
     private IRequest PublishTopicRequest(String topicName) {
         IRequest request = null;
-        HashMap<String, Object> tags = new HashMap<String, Object>();
         HashMap<String, Object> message = new HashMap<String, Object>();
         message.put("status", "on");
         message.put("brightness", 20);
-        tags.put("message", message);
         Cbor<HashMap<String, Object>> cbor = new Cbor<HashMap<String, Object>>();
         String uri = MQ_BROKER_URI + "/" + topicName;
         request = MessageBuilder.createRequest(RequestMethod.POST, uri, null,
                 ContentFormat.APPLICATION_CBOR,
-                cbor.encodingPayloadToCbor(tags));
+                cbor.encodingPayloadToCbor(message));
         return request;
     }
 
@@ -677,13 +674,14 @@ public class MQBrokerResourceTest {
         mMqBrokerResource.onDefaultRequestReceived(mMockDevice, readRequest);
     }
 
-    private boolean hashmapCheck(IResponse response, String propertyName) {
+    private boolean payloadCheck(IResponse response) {
         Cbor<HashMap<String, Object>> mCbor = new Cbor<>();
         HashMap<String, Object> payloadData = mCbor
                 .parsePayloadFromCbor(response.getPayload(), HashMap.class);
-        if (payloadData.get(propertyName) != null)
+        if (payloadData != null && payloadData.containsKey("status")
+                && payloadData.containsKey("brightness")) {
             return true;
-        else
+        else
             return false;
     }