import org.iotivity.cloud.base.OICConstants;
public class Constants extends OICConstants {
+ public static final int DEFAULT_COAP_PORT = 5686;
public static final String MQ_TOPICLIST = "topiclist";
- public static final String MQ_MESSAGE = "message";
// For Kafka
public static final int KAFKA_SESSION_TIMEOUT = 10000;
import java.util.Scanner;
import org.iotivity.cloud.base.ServerSystem;
+import org.iotivity.cloud.base.resource.CloudPingResource;
import org.iotivity.cloud.base.server.CoapServer;
import org.iotivity.cloud.mqserver.resources.MQBrokerResource;
import org.iotivity.cloud.util.Log;
public class MessageQueueServer {
- public static void main(String[] args) throws Exception {
- Log.Init();
+ private static int coapServerPort;
+ private static boolean tlsMode;
+ private static String zookeeperHost;
+ private static String kafkaHost;
+ private static String webLogHost;
+ public static void main(String[] args) throws Exception {
System.out.println("-----MQ SERVER-----");
+ Log.Init();
- if (args.length != 6) {
- Log.e("coap server port, Kafka_zookeeper_Address port"
- + " and Kafka_broker_Address Port and TLS mode required\n"
- + "ex) 5686 127.0.0.1 2181 127.0.0.1 9092 0\n");
-
+ if (!parseConfiguration(args)) {
+ Log.e("\nCoAP-server <Port> Zookeeper <Address> <Port> Kafka <Address> <Port> TLS-mode <0|1> are required. "
+ + "WebSocketLog-Server <Addres> <Port> is optional.\n"
+ + "ex) " + Constants.DEFAULT_COAP_PORT
+ + " 127.0.0.1 2181 127.0.0.1 9092 0\n");
return;
}
+ if (webLogHost != null)
+ Log.InitWebLog(webLogHost,
+ MessageQueueServer.class.getSimpleName().toString());
ServerSystem serverSystem = new ServerSystem();
MQBrokerResource MQBroker = new MQBrokerResource();
-
- String kafka_zookeeper = args[1] + ":" + args[2];
- String kafka_broker = args[3] + ":" + args[4];
- MQBroker.setKafkaInformation(kafka_zookeeper, kafka_broker);
+ MQBroker.setKafkaInformation(zookeeperHost, kafkaHost);
serverSystem.addResource(MQBroker);
+ serverSystem.addResource(new CloudPingResource());
- serverSystem.addServer(new CoapServer(
- new InetSocketAddress(Integer.parseInt(args[0]))));
-
- boolean tlsMode = Integer.parseInt(args[5]) == 1;
+ serverSystem.addServer(
+ new CoapServer(new InetSocketAddress(coapServerPort)));
serverSystem.startSystem(tlsMode);
System.out.println("Terminated");
}
+
+ private static boolean parseConfiguration(String[] args) {
+ // configuration provided by arguments
+ if (args.length == 6 || args.length == 8) {
+ coapServerPort = Integer.parseInt(args[0]);
+ zookeeperHost = args[1] + ":" + args[2];
+ kafkaHost = args[3] + ":" + args[4];
+ tlsMode = Integer.parseInt(args[5]) == 1;
+ if (args.length == 8)
+ webLogHost = args[6] + ":" + args[7];
+ return true;
+ }
+ // configuration provided by docker env
+ String tlsModeEnv = System.getenv("TLS_MODE");
+ if (tlsModeEnv != null) {
+ coapServerPort = Constants.DEFAULT_COAP_PORT;
+ tlsMode = Integer.parseInt(tlsModeEnv) == 1;
+ zookeeperHost = System.getenv("ZOOKEEPER_ADDRESS") + ":"
+ + System.getenv("ZOOKEEPER_PORT");
+ kafkaHost = System.getenv("KAFKA_ADDRESS") + ":"
+ + System.getenv("KAFKA_PORT");
+ return true;
+ }
+ return false;
+ }
}
\ No newline at end of file
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.iotivity.cloud.mqserver.Constants;
+import org.iotivity.cloud.mqserver.topic.Topic;
+import org.iotivity.cloud.util.Log;
+
import kafka.admin.AdminUtils;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.iotivity.cloud.mqserver.Constants;
-import org.iotivity.cloud.mqserver.topic.Topic;
-import org.iotivity.cloud.util.Log;
-
/**
*
* This class provides a set of APIs to use Kafka consumer APIs for receiving
}
// remove consumer group info if already exist
- List<String> subscribers = mZkClient.getChildren(ZkUtils
- .ConsumersPath());
+ List<String> subscribers = mZkClient
+ .getChildren(ZkUtils.ConsumersPath());
if (subscribers.contains(mTopicName)) {
AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
for (final MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
- mInternalConsumer.onMessagePublished(messageAndMetadata
- .message());
+ mInternalConsumer.onMessagePublished(
+ messageAndMetadata.message());
}
}
});
Log.d("kafka unsubscribeTopic - " + mTopicName);
// remove consumer group info in zookeeper
- List<String> subscribers = mZkClient.getChildren(ZkUtils
- .ConsumersPath());
+ List<String> subscribers = mZkClient
+ .getChildren(ZkUtils.ConsumersPath());
if (subscribers.contains(mTopicName)) {
AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
Log.d("kafka get all messages - " + mTopicName);
String brokerHost = mBroker.substring(0, mBroker.indexOf(':'));
- int brokerPort = Integer.parseInt(mBroker.substring(mBroker
- .indexOf(':') + 1));
+ int brokerPort = Integer
+ .parseInt(mBroker.substring(mBroker.indexOf(':') + 1));
Log.d("host " + brokerHost + ", port " + brokerPort);
lastOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
+ if (payload != null) {
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
- initialData.add(bytes);
+ initialData.add(bytes);
+ }
}
}
partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
- whichTime, 1));
+ requestInfo.put(topicAndPartition,
+ new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
private IResponse handleGetRequest(Device srcDevice, IRequest request) {
// DISCOVER
- if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
+ if (request.getUriPathSegments().size() == getUriPathSegments()
+ .size()) {
return discoverTopic(request);
}
// CREATE topic
private IResponse handlePutRequest(IRequest request) {
- if (request.getUriPathSegments().size() == getUriPathSegments().size()) {
+ if (request.getUriPathSegments().size() == getUriPathSegments()
+ .size()) {
throw new BadRequestException(
"topic name is not included in request uri");
private IResponse createTopic(IRequest request) {
// main topic creation request
- if (request.getUriPathSegments().size() == getUriPathSegments().size() + 1) {
+ if (request.getUriPathSegments().size() == getUriPathSegments().size()
+ + 1) {
return createMainTopic(request);
}
// subtopic creation request
String uriPath = request.getUriPath();
- uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
+ if (uriPath == null) {
+ throw new BadRequestException("uriPath is not invalid");
+ }
+
+ uriPath = uriPath.substring(0, uriPath.lastIndexOf('/'));
Topic targetTopic = mTopicManager.getTopic(uriPath);
if (targetTopic == null) {
// subtopic removal request
String uriPath = request.getUriPath();
+ if (uriPath == null) {
+ throw new BadRequestException("uriPath is not invalid");
+ }
+
String parentName = uriPath.substring(0, uriPath.lastIndexOf('/'));
- String targetName = request.getUriPathSegments().get(
- request.getUriPathSegments().size() - 1);
+ String targetName = request.getUriPathSegments()
+ .get(request.getUriPathSegments().size() - 1);
Topic parentTopic = mTopicManager.getTopic(parentName);
}
return MessageBuilder.createResponse(request, ResponseStatus.CONTENT,
- ContentFormat.APPLICATION_CBOR, MessageQueueUtils.buildPayload(
- Constants.MQ_TOPICLIST, topicList));
+ ContentFormat.APPLICATION_CBOR, MessageQueueUtils
+ .buildPayload(Constants.MQ_TOPICLIST, topicList));
}
private IResponse createMainTopic(IRequest request) {
- String topicName = request.getUriPathSegments().get(
- request.getUriPathSegments().size() - 1);
+ String topicName = request.getUriPathSegments()
+ .get(request.getUriPathSegments().size() - 1);
String type = new String();
IResponse response = MessageBuilder.createResponse(request,
ResponseStatus.CREATED);
- response.setLocationPath(request.getUriPath());
+ String uriPath = request.getUriPath();
+ if (uriPath == null) {
+ throw new InternalServerErrorException(
+ "uriPath is null in handleCreateSubtopic");
+ }
+ response.setLocationPath(uriPath);
return response;
}
import org.iotivity.cloud.base.protocols.MessageBuilder;
import org.iotivity.cloud.base.protocols.enums.ContentFormat;
import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
-import org.iotivity.cloud.mqserver.Constants;
import org.iotivity.cloud.mqserver.kafka.KafkaConsumerWrapper;
import org.iotivity.cloud.mqserver.kafka.KafkaProducerWrapper;
import org.iotivity.cloud.util.Cbor;
private byte[] mLatestData = null;
- private class TopicSubscriber {
+ private static class TopicSubscriber {
TopicSubscriber(Device subscriber, IRequest request) {
mSubscriber = subscriber;
mRequest = request;
kafka_broker, this);
HashMap<String, Object> data = new HashMap<>();
- data.put(Constants.MQ_MESSAGE, null);
-
mLatestData = mCbor.encodingPayloadToCbor(data);
}
IResponse response = MessageBuilder.createResponse(request,
ResponseStatus.CREATED);
- response.setLocationPath(request.getUriPath());
+ String uriPath = request.getUriPath();
+ if (uriPath == null) {
+ throw new InternalServerErrorException(
+ "uriPath is null in handleCreateSubtopic");
+ }
+ response.setLocationPath(uriPath);
return response;
}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-import kafka.admin.TopicCommand;
-import kafka.admin.TopicCommand.TopicCommandOptions;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.iotivity.cloud.base.device.CoapDevice;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import kafka.admin.TopicCommand;
+import kafka.admin.TopicCommand.TopicCommandOptions;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
public class MQBrokerResourceTest {
private final String MQ_BROKER_URI = Constants.MQ_BROKER_FULL_URI;
Object[] args = invocation.getArguments();
CoapResponse resp = (CoapResponse) args[0];
// assertion: if the response status is "CONTENT"
- // assertion: if the response payload has the "message" property
+ // assertion: if the response payload has data
assertTrue(methodCheck(resp, ResponseStatus.CONTENT));
- assertTrue(hashmapCheck(resp, "message"));
+ assertTrue(payloadCheck(resp));
return resp;
}
}).when(mockSubscriber).sendResponse(Mockito.anyObject());
CoapResponse resp = (CoapResponse) args[0];
latchSubscriber.countDown();
if (latchSubscriber.getCount() == 0) {
- // assertion: if the response payload has the "message"
- // property
+ // assertion: if the response payload has data
assertTrue(methodCheck(resp, ResponseStatus.CONTENT));
- assertTrue(hashmapCheck(resp, "message"));
+ assertTrue(payloadCheck(resp));
}
return resp;
}
// assertion for subscriber
if (latchSubscriber.getCount() == 0) {
assertTrue(methodCheck(resp, ResponseStatus.CONTENT));
- assertTrue(hashmapCheck(resp, "message"));
+ assertTrue(payloadCheck(resp));
DeleteTopic(mMockDevice, topic);
}
// read topic
ReadTopic(topic);
// assertion1 : if the response status is "CONTENT"
- // assertion2 : if the response payload has the "message" property
+ // assertion2 : if the response payload has data
assertTrue(methodCheck(mResponse, ResponseStatus.CONTENT));
- assertTrue(hashmapCheck(mResponse, "message"));
+ assertTrue(payloadCheck(mResponse));
}
@Test(expected = NotFoundException.class)
private IRequest PublishTopicRequest(String topicName) {
IRequest request = null;
- HashMap<String, Object> tags = new HashMap<String, Object>();
HashMap<String, Object> message = new HashMap<String, Object>();
message.put("status", "on");
message.put("brightness", 20);
- tags.put("message", message);
Cbor<HashMap<String, Object>> cbor = new Cbor<HashMap<String, Object>>();
String uri = MQ_BROKER_URI + "/" + topicName;
request = MessageBuilder.createRequest(RequestMethod.POST, uri, null,
ContentFormat.APPLICATION_CBOR,
- cbor.encodingPayloadToCbor(tags));
+ cbor.encodingPayloadToCbor(message));
return request;
}
mMqBrokerResource.onDefaultRequestReceived(mMockDevice, readRequest);
}
- private boolean hashmapCheck(IResponse response, String propertyName) {
+ private boolean payloadCheck(IResponse response) {
Cbor<HashMap<String, Object>> mCbor = new Cbor<>();
HashMap<String, Object> payloadData = mCbor
.parsePayloadFromCbor(response.getPayload(), HashMap.class);
- if (payloadData.get(propertyName) != null)
+ if (payloadData != null && payloadData.containsKey("status")
+ && payloadData.containsKey("brightness")) {
return true;
- else
+ } else
return false;
}