import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import kafka.utils.ZkUtils;
/**
- * The type MQ listener api.
- *
+ *
+ * The type Mq listener api.
+ *
* @author Mail to: <A HREF="mailto:y.zozulia@samsung.com">Yevhen Zozulia</A>
* @version 1.0
- * @file MQListenerAPI.java
- * @brief The type MQ listener api.
+ * @file UserDSMActionTest
+ * @brief User registration scenario: create new user, login, logout, login as
+ * user, delete user if not admin, login as admin, delete user if admin
* @date Created : 9/12/2017
* @date Modified : 9/12/2017
* @copyright In Samsung Ukraine R&D Center (SRK under a contract between)
* @par LLC "Samsung Electronics Co", Ltd (Seoul, Republic of Korea)
* @par Copyright : (c) Samsung Electronics Co, Ltd 2017. All rights reserved.
+ *
*/
public final class MQListenerAPI {
private static final Logger LOG = Logger.getLogger(MQListenerAPI.class);
private static final int WAITING_TIMEOUT = 5000;
- private static final String KAFKA_HOST = AppConfigInit.getEnv().getProperty("kafkaHost");
- private static final String ZOOKEEPER_HOST = AppConfigInit.getEnv().getProperty("zookeperHost");
+ private static final String KAFKA_HOST = "106.125.46.139:9092";
+ private static final String ZOOKEEPER_HOST = "106.125.46.139:2181";
/**
* Create consumer and subscribe to topic
* @param topicName the topic name
* @return consumer instance
*/
- public static KafkaConsumer<byte[], byte[]> getConsumerInstance(String topicName) {
- try {
- ZkClient zkClient = new ZkClient(ZOOKEEPER_HOST);
- ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(ZOOKEEPER_HOST), false);
- if (!AdminUtils.topicExists(zkUtils, topicName)) {
- AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties());
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
+ public static Properties getConsumerProperties(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_HOST);
- Random random = new Random();
- props.put("group.id", topicName + random.nextInt());
+ props.put("group.id", topicName);
props.put("auto.offset.reset", "earliest");
- final int interval = 1000;
+ final int interval = 5000;
props.put("auto.commit.interval.ms", interval);
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
- KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(topicName));
- LOG.info("Subscribe to " + topicName);
- return consumer;
+ return props;
+ }
+
+ public static void createTopic(String topicName) {
+ ZkClient zkClient = new ZkClient(ZOOKEEPER_HOST);
+ ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(ZOOKEEPER_HOST), false);
+ if (!AdminUtils.topicExists(zkUtils, topicName)) {
+ AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties());
+ }
}
/**
* @param consumer the consumer
* @return messages messages
*/
- public static List<byte[]> getMessages(KafkaConsumer<byte[], byte[]> consumer) {
+ public static List<byte[]> getMessages(String topicName) {
LOG.info("Get messages");
+ createTopic(topicName);
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerProperties(topicName));
+ consumer.subscribe(Arrays.asList(topicName));
List<byte[]> list = new ArrayList<>();
ConsumerRecords<byte[], byte[]> records = consumer.poll(WAITING_TIMEOUT);
for (ConsumerRecord<byte[], byte[]> record : records) {
LOG.info(record.offset() + ": " + new String(record.value()));
list.add(record.value());
}
- return list;
- }
-
- /**
- * Close consumer connection
- *
- * @param consumer the consumer
- */
- public static void closeConnection(KafkaConsumer<byte[], byte[]> consumer) {
LOG.info("Close connection");
- consumer.unsubscribe();
consumer.close();
+ return list;
}
/**
+ *
* Parse message
*
* @param data the data
private MQListenerAPI() {
}
+
}
package com.samsung.ci.test.mq;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.samsung.ci.basic.api.MQListenerAPI;
}
// CHECKSTYLE_OFF:Magic Number
- private void checkPolicy(KafkaConsumer<byte[], byte[]> consumer, String policy) {
+ private void checkPolicy(String policy) {
List<byte[]> policyList = new ArrayList<>();
int attemptCounts = 5;
while (policyList.isEmpty() && attemptCounts >= 0) {
- policyList = MQListenerAPI.getMessages(consumer);
+ policyList = MQListenerAPI.getMessages(topic);
attemptCounts--;
}
if (!policyList.isEmpty()) {
Thread.sleep(TIMEOUT);
log("Check default policy");
- KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
- assertNotNull(consumer);
Thread.sleep(TIMEOUT);
- checkPolicy(consumer, policy);
+ checkPolicy(policy);
log("Change default policy");
String changedPolicy = readChangedPolicy();
MQPublisherAPI.sendPolisyToDSM(tvDevice.getUuid(), null, tvDevice.getParentUuid(), changedPolicy);
Thread.sleep(TIMEOUT);
- checkPolicy(consumer, changedPolicy);
+ checkPolicy(changedPolicy);
assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(tvDevice.getUuid(), DEVICE_UNREGISTER_ACTION));
Thread.sleep(TIMEOUT);
- MQListenerAPI.closeConnection(consumer);
deleteIoTUser(user.getCloudId());
}
Thread.sleep(TIMEOUT);
log("Check default policy");
- KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
- assertNotNull(consumer);
Thread.sleep(TIMEOUT);
- checkPolicy(consumer, policy);
+ checkPolicy(policy);
log("Change default policy");
String changedPolicy = readPhoneChangedPolicy();
MQPublisherAPI.sendPolisyToDSM(phoneDevice.getUuid(), null, phoneDevice.getParentUuid(), changedPolicy);
Thread.sleep(TIMEOUT);
- checkPolicy(consumer, changedPolicy);
+ checkPolicy(changedPolicy);
assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(phoneDevice.getUuid(), DEVICE_UNREGISTER_ACTION));
Thread.sleep(TIMEOUT);
- MQListenerAPI.closeConnection(consumer);
deleteIoTUser(user.getCloudId());
}
import java.util.List;
import java.util.Map;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
}
// CHECKSTYLE_OFF:Magic Number
- private void checkNotification(KafkaConsumer<byte[], byte[]> consumer) {
+ private void checkNotification() {
List<byte[]> notificationList = new ArrayList<>();
int attemptCounts = 5;
while (notificationList.isEmpty() && attemptCounts >= 0) {
- notificationList = MQListenerAPI.getMessages(consumer);
+ notificationList = MQListenerAPI.getMessages(topic);
attemptCounts--;
}
if (!notificationList.isEmpty()) {
Thread.sleep(TIMEOUT);
Thread.sleep(TIMEOUT);
// Test reports
- KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
- assertNotNull(consumer);
// SMACK
String report = readReport(SMACK_REPORT_VS_SMACK_TEST);
assertTrue(MQPublisherAPI.sendReportToDSM(report, device.getUuid()));
Thread.sleep(TIMEOUT);
Thread.sleep(TIMEOUT);
checkReports();
- checkNotification(consumer);
+ checkNotification();
assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(device.getUuid(), DEVICE_UNREGISTER_ACTION));
Thread.sleep(TIMEOUT);
- MQListenerAPI.closeConnection(consumer);
deleteIoTUser(user.getCloudId());
}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.google.gson.Gson;
}
// CHECKSTYLE_OFF:Magic Number
- private void checkNotification(KafkaConsumer<byte[], byte[]> consumer) {
+ private void checkNotification() {
List<byte[]> notificationList = new ArrayList<>();
int attemptCounts = 5;
while (notificationList.isEmpty() && attemptCounts >= 0) {
- notificationList = MQListenerAPI.getMessages(consumer);
+ notificationList = MQListenerAPI.getMessages(topic);
attemptCounts--;
}
if (!notificationList.isEmpty()) {
registerDevice();
Thread.sleep(TIMEOUT);
// Test reports
- KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
- assertNotNull(consumer);
// SYSCALL
String report = readReport(SYSCALL_REPORT);
assertTrue(MQPublisherAPI.sendReportToDSM(report, device.getUuid()));
Thread.sleep(TIMEOUT);
Thread.sleep(TIMEOUT);
checkReports();
- checkNotification(consumer);
+ checkNotification();
assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(device.getUuid(), DEVICE_UNREGISTER_ACTION));
Thread.sleep(TIMEOUT);
- MQListenerAPI.closeConnection(consumer);
deleteIoTUser(user.getCloudId());
Rest.removeRules(rules);
}