From: Andriy Gudz Date: Wed, 7 Jun 2017 13:47:08 +0000 (+0300) Subject: Updated MQ server X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=5329c87c341430fb135621f0e7d34c418209a185;p=platform%2Fcore%2Fsecurity%2Fsuspicious-activity-monitor.git Updated MQ server --- diff --git a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/Constants.java b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/Constants.java index 2961ac2..ee86ed3 100644 --- a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/Constants.java +++ b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/Constants.java @@ -24,9 +24,9 @@ package org.iotivity.cloud.mqserver; import org.iotivity.cloud.base.OICConstants; public class Constants extends OICConstants { + public static final int DEFAULT_COAP_PORT = 5686; public static final String MQ_TOPICLIST = "topiclist"; - public static final String MQ_MESSAGE = "message"; // For Kafka public static final int KAFKA_SESSION_TIMEOUT = 10000; diff --git a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/MessageQueueServer.java b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/MessageQueueServer.java index 79e32b9..b3ce4a9 100644 --- a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/MessageQueueServer.java +++ b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/MessageQueueServer.java @@ -25,39 +25,44 @@ import java.net.InetSocketAddress; import java.util.Scanner; import org.iotivity.cloud.base.ServerSystem; +import org.iotivity.cloud.base.resource.CloudPingResource; import org.iotivity.cloud.base.server.CoapServer; import org.iotivity.cloud.mqserver.resources.MQBrokerResource; import org.iotivity.cloud.util.Log; public class MessageQueueServer { - public static void main(String[] args) throws Exception { - Log.Init(); + private static int coapServerPort; + private static boolean tlsMode; + private static String zookeeperHost; + private static String kafkaHost; + private static String webLogHost; + public static void main(String[] args) throws Exception { System.out.println("-----MQ SERVER-----"); + Log.Init(); - if (args.length != 6) { - Log.e("coap server port, Kafka_zookeeper_Address port" - + " and Kafka_broker_Address Port and TLS mode required\n" - + "ex) 5686 127.0.0.1 2181 127.0.0.1 9092 0\n"); - + if (!parseConfiguration(args)) { + Log.e("\nCoAP-server Zookeeper
Kafka
TLS-mode <0|1> are required. " + + "WebSocketLog-Server is optional.\n" + + "ex) " + Constants.DEFAULT_COAP_PORT + + " 127.0.0.1 2181 127.0.0.1 9092 0\n"); return; } + if (webLogHost != null) + Log.InitWebLog(webLogHost, + MessageQueueServer.class.getSimpleName().toString()); ServerSystem serverSystem = new ServerSystem(); MQBrokerResource MQBroker = new MQBrokerResource(); - - String kafka_zookeeper = args[1] + ":" + args[2]; - String kafka_broker = args[3] + ":" + args[4]; - MQBroker.setKafkaInformation(kafka_zookeeper, kafka_broker); + MQBroker.setKafkaInformation(zookeeperHost, kafkaHost); serverSystem.addResource(MQBroker); + serverSystem.addResource(new CloudPingResource()); - serverSystem.addServer(new CoapServer( - new InetSocketAddress(Integer.parseInt(args[0])))); - - boolean tlsMode = Integer.parseInt(args[5]) == 1; + serverSystem.addServer( + new CoapServer(new InetSocketAddress(coapServerPort))); serverSystem.startSystem(tlsMode); @@ -75,4 +80,29 @@ public class MessageQueueServer { System.out.println("Terminated"); } + + private static boolean parseConfiguration(String[] args) { + // configuration provided by arguments + if (args.length == 6 || args.length == 8) { + coapServerPort = Integer.parseInt(args[0]); + zookeeperHost = args[1] + ":" + args[2]; + kafkaHost = args[3] + ":" + args[4]; + tlsMode = Integer.parseInt(args[5]) == 1; + if (args.length == 8) + webLogHost = args[6] + ":" + args[7]; + return true; + } + // configuration provided by docker env + String tlsModeEnv = System.getenv("TLS_MODE"); + if (tlsModeEnv != null) { + coapServerPort = Constants.DEFAULT_COAP_PORT; + tlsMode = Integer.parseInt(tlsModeEnv) == 1; + zookeeperHost = System.getenv("ZOOKEEPER_ADDRESS") + ":" + + System.getenv("ZOOKEEPER_PORT"); + kafkaHost = System.getenv("KAFKA_ADDRESS") + ":" + + System.getenv("KAFKA_PORT"); + return true; + } + return false; + } } \ No newline at end of file diff --git a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/kafka/KafkaConsumerWrapper.java b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/kafka/KafkaConsumerWrapper.java index 73e1e50..1ff5224 100644 --- a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/kafka/KafkaConsumerWrapper.java +++ b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/kafka/KafkaConsumerWrapper.java @@ -30,6 +30,12 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.iotivity.cloud.mqserver.Constants; +import org.iotivity.cloud.mqserver.topic.Topic; +import org.iotivity.cloud.util.Log; + import kafka.admin.AdminUtils; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; @@ -48,12 +54,6 @@ import kafka.message.MessageAndOffset; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; -import org.iotivity.cloud.mqserver.Constants; -import org.iotivity.cloud.mqserver.topic.Topic; -import org.iotivity.cloud.util.Log; - /** * * This class provides a set of APIs to use Kafka consumer APIs for receiving @@ -118,8 +118,8 @@ public class KafkaConsumerWrapper { } // remove consumer group info if already exist - List subscribers = mZkClient.getChildren(ZkUtils - .ConsumersPath()); + List subscribers = mZkClient + .getChildren(ZkUtils.ConsumersPath()); if (subscribers.contains(mTopicName)) { AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName); @@ -152,8 +152,8 @@ public class KafkaConsumerWrapper { for (final MessageAndMetadata messageAndMetadata : stream) { - mInternalConsumer.onMessagePublished(messageAndMetadata - .message()); + mInternalConsumer.onMessagePublished( + messageAndMetadata.message()); } } }); @@ -175,8 +175,8 @@ public class KafkaConsumerWrapper { Log.d("kafka unsubscribeTopic - " + mTopicName); // remove consumer group info in zookeeper - List subscribers = mZkClient.getChildren(ZkUtils - .ConsumersPath()); + List subscribers = mZkClient + .getChildren(ZkUtils.ConsumersPath()); if (subscribers.contains(mTopicName)) { AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName); @@ -213,8 +213,8 @@ public class KafkaConsumerWrapper { Log.d("kafka get all messages - " + mTopicName); String brokerHost = mBroker.substring(0, mBroker.indexOf(':')); - int brokerPort = Integer.parseInt(mBroker.substring(mBroker - .indexOf(':') + 1)); + int brokerPort = Integer + .parseInt(mBroker.substring(mBroker.indexOf(':') + 1)); Log.d("host " + brokerHost + ", port " + brokerPort); @@ -255,10 +255,12 @@ public class KafkaConsumerWrapper { lastOffset = messageAndOffset.nextOffset(); ByteBuffer payload = messageAndOffset.message().payload(); - byte[] bytes = new byte[payload.limit()]; - payload.get(bytes); + if (payload != null) { + byte[] bytes = new byte[payload.limit()]; + payload.get(bytes); - initialData.add(bytes); + initialData.add(bytes); + } } } @@ -289,8 +291,8 @@ public class KafkaConsumerWrapper { partition); Map requestInfo = new HashMap<>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( - whichTime, 1)); + requestInfo.put(topicAndPartition, + new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), diff --git a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/resources/MQBrokerResource.java b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/resources/MQBrokerResource.java index 4d07cdc..3710c5f 100644 --- a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/resources/MQBrokerResource.java +++ b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/resources/MQBrokerResource.java @@ -96,7 +96,8 @@ public class MQBrokerResource extends Resource { private IResponse handleGetRequest(Device srcDevice, IRequest request) { // DISCOVER - if (request.getUriPathSegments().size() == getUriPathSegments().size()) { + if (request.getUriPathSegments().size() == getUriPathSegments() + .size()) { return discoverTopic(request); } @@ -118,7 +119,8 @@ public class MQBrokerResource extends Resource { // CREATE topic private IResponse handlePutRequest(IRequest request) { - if (request.getUriPathSegments().size() == getUriPathSegments().size()) { + if (request.getUriPathSegments().size() == getUriPathSegments() + .size()) { throw new BadRequestException( "topic name is not included in request uri"); @@ -142,14 +144,19 @@ public class MQBrokerResource extends Resource { private IResponse createTopic(IRequest request) { // main topic creation request - if (request.getUriPathSegments().size() == getUriPathSegments().size() + 1) { + if (request.getUriPathSegments().size() == getUriPathSegments().size() + + 1) { return createMainTopic(request); } // subtopic creation request String uriPath = request.getUriPath(); - uriPath = uriPath.substring(0, uriPath.lastIndexOf('/')); + if (uriPath == null) { + throw new BadRequestException("uriPath is not invalid"); + } + + uriPath = uriPath.substring(0, uriPath.lastIndexOf('/')); Topic targetTopic = mTopicManager.getTopic(uriPath); if (targetTopic == null) { @@ -170,9 +177,13 @@ public class MQBrokerResource extends Resource { // subtopic removal request String uriPath = request.getUriPath(); + if (uriPath == null) { + throw new BadRequestException("uriPath is not invalid"); + } + String parentName = uriPath.substring(0, uriPath.lastIndexOf('/')); - String targetName = request.getUriPathSegments().get( - request.getUriPathSegments().size() - 1); + String targetName = request.getUriPathSegments() + .get(request.getUriPathSegments().size() - 1); Topic parentTopic = mTopicManager.getTopic(parentName); @@ -243,14 +254,14 @@ public class MQBrokerResource extends Resource { } return MessageBuilder.createResponse(request, ResponseStatus.CONTENT, - ContentFormat.APPLICATION_CBOR, MessageQueueUtils.buildPayload( - Constants.MQ_TOPICLIST, topicList)); + ContentFormat.APPLICATION_CBOR, MessageQueueUtils + .buildPayload(Constants.MQ_TOPICLIST, topicList)); } private IResponse createMainTopic(IRequest request) { - String topicName = request.getUriPathSegments().get( - request.getUriPathSegments().size() - 1); + String topicName = request.getUriPathSegments() + .get(request.getUriPathSegments().size() - 1); String type = new String(); @@ -282,7 +293,12 @@ public class MQBrokerResource extends Resource { IResponse response = MessageBuilder.createResponse(request, ResponseStatus.CREATED); - response.setLocationPath(request.getUriPath()); + String uriPath = request.getUriPath(); + if (uriPath == null) { + throw new InternalServerErrorException( + "uriPath is null in handleCreateSubtopic"); + } + response.setLocationPath(uriPath); return response; } diff --git a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/topic/Topic.java b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/topic/Topic.java index 44b3e22..63ac153 100644 --- a/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/topic/Topic.java +++ b/cloud/messagequeue/src/main/java/org/iotivity/cloud/mqserver/topic/Topic.java @@ -34,7 +34,6 @@ import org.iotivity.cloud.base.protocols.IResponse; import org.iotivity.cloud.base.protocols.MessageBuilder; import org.iotivity.cloud.base.protocols.enums.ContentFormat; import org.iotivity.cloud.base.protocols.enums.ResponseStatus; -import org.iotivity.cloud.mqserver.Constants; import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper; import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper; import org.iotivity.cloud.util.Cbor; @@ -54,7 +53,7 @@ public class Topic { private byte[] mLatestData = null; - private class TopicSubscriber { + private static class TopicSubscriber { TopicSubscriber(Device subscriber, IRequest request) { mSubscriber = subscriber; mRequest = request; @@ -88,8 +87,6 @@ public class Topic { kafka_broker, this); HashMap data = new HashMap<>(); - data.put(Constants.MQ_MESSAGE, null); - mLatestData = mCbor.encodingPayloadToCbor(data); } @@ -147,7 +144,12 @@ public class Topic { IResponse response = MessageBuilder.createResponse(request, ResponseStatus.CREATED); - response.setLocationPath(request.getUriPath()); + String uriPath = request.getUriPath(); + if (uriPath == null) { + throw new InternalServerErrorException( + "uriPath is null in handleCreateSubtopic"); + } + response.setLocationPath(uriPath); return response; } diff --git a/cloud/messagequeue/src/test/java/org/iotivity/cloud/mqserver/resources/MQBrokerResourceTest.java b/cloud/messagequeue/src/test/java/org/iotivity/cloud/mqserver/resources/MQBrokerResourceTest.java index ef8d1f4..24f4392 100644 --- a/cloud/messagequeue/src/test/java/org/iotivity/cloud/mqserver/resources/MQBrokerResourceTest.java +++ b/cloud/messagequeue/src/test/java/org/iotivity/cloud/mqserver/resources/MQBrokerResourceTest.java @@ -35,11 +35,6 @@ import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import kafka.admin.TopicCommand; -import kafka.admin.TopicCommand.TopicCommandOptions; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; - import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.iotivity.cloud.base.device.CoapDevice; @@ -64,6 +59,11 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import kafka.admin.TopicCommand; +import kafka.admin.TopicCommand.TopicCommandOptions; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + public class MQBrokerResourceTest { private final String MQ_BROKER_URI = Constants.MQ_BROKER_FULL_URI; @@ -180,9 +180,9 @@ public class MQBrokerResourceTest { Object[] args = invocation.getArguments(); CoapResponse resp = (CoapResponse) args[0]; // assertion: if the response status is "CONTENT" - // assertion: if the response payload has the "message" property + // assertion: if the response payload has data assertTrue(methodCheck(resp, ResponseStatus.CONTENT)); - assertTrue(hashmapCheck(resp, "message")); + assertTrue(payloadCheck(resp)); return resp; } }).when(mockSubscriber).sendResponse(Mockito.anyObject()); @@ -211,10 +211,9 @@ public class MQBrokerResourceTest { CoapResponse resp = (CoapResponse) args[0]; latchSubscriber.countDown(); if (latchSubscriber.getCount() == 0) { - // assertion: if the response payload has the "message" - // property + // assertion: if the response payload has data assertTrue(methodCheck(resp, ResponseStatus.CONTENT)); - assertTrue(hashmapCheck(resp, "message")); + assertTrue(payloadCheck(resp)); } return resp; } @@ -277,7 +276,7 @@ public class MQBrokerResourceTest { // assertion for subscriber if (latchSubscriber.getCount() == 0) { assertTrue(methodCheck(resp, ResponseStatus.CONTENT)); - assertTrue(hashmapCheck(resp, "message")); + assertTrue(payloadCheck(resp)); DeleteTopic(mMockDevice, topic); } @@ -337,9 +336,9 @@ public class MQBrokerResourceTest { // read topic ReadTopic(topic); // assertion1 : if the response status is "CONTENT" - // assertion2 : if the response payload has the "message" property + // assertion2 : if the response payload has data assertTrue(methodCheck(mResponse, ResponseStatus.CONTENT)); - assertTrue(hashmapCheck(mResponse, "message")); + assertTrue(payloadCheck(mResponse)); } @Test(expected = NotFoundException.class) @@ -523,16 +522,14 @@ public class MQBrokerResourceTest { private IRequest PublishTopicRequest(String topicName) { IRequest request = null; - HashMap tags = new HashMap(); HashMap message = new HashMap(); message.put("status", "on"); message.put("brightness", 20); - tags.put("message", message); Cbor> cbor = new Cbor>(); String uri = MQ_BROKER_URI + "/" + topicName; request = MessageBuilder.createRequest(RequestMethod.POST, uri, null, ContentFormat.APPLICATION_CBOR, - cbor.encodingPayloadToCbor(tags)); + cbor.encodingPayloadToCbor(message)); return request; } @@ -677,13 +674,14 @@ public class MQBrokerResourceTest { mMqBrokerResource.onDefaultRequestReceived(mMockDevice, readRequest); } - private boolean hashmapCheck(IResponse response, String propertyName) { + private boolean payloadCheck(IResponse response) { Cbor> mCbor = new Cbor<>(); HashMap payloadData = mCbor .parsePayloadFromCbor(response.getPayload(), HashMap.class); - if (payloadData.get(propertyName) != null) + if (payloadData != null && payloadData.containsKey("status") + && payloadData.containsKey("brightness")) { return true; - else + } else return false; }