[SECIOTSRK-697] Refactoring MQ listener
authorYevhen Zozulia <y.zozulia@surc.local>
Tue, 21 Nov 2017 11:18:35 +0000 (13:18 +0200)
committerYevhen Zozulia <y.zozulia@surc.local>
Tue, 21 Nov 2017 11:18:35 +0000 (13:18 +0200)
servers/api-integration-tests/src/main/java/com/samsung/ci/basic/api/MQListenerAPI.java
servers/api-integration-tests/src/test/java/com/samsung/ci/test/mq/PolicyWitoutCloudTest.java
servers/api-integration-tests/src/test/java/com/samsung/ci/test/mq/ReportsWithOutCloudTest.java
servers/api-integration-tests/src/test/java/com/samsung/ci/test/reports/RestRuleReportsTest.java

index bc5240a..df4271c 100644 (file)
@@ -10,7 +10,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -26,24 +25,27 @@ import kafka.admin.AdminUtils;
 import kafka.utils.ZkUtils;
 
 /**
- * The type MQ listener api.
- *
+ * 
+ * The type Mq listener api.
+ * 
  * @author Mail to: <A HREF="mailto:y.zozulia@samsung.com">Yevhen Zozulia</A>
  * @version 1.0
- * @file MQListenerAPI.java
- * @brief The type MQ listener api.
+ * @file UserDSMActionTest
+ * @brief User registration scenario: create new user, login, logout, login as
+ *        user, delete user if not admin, login as admin, delete user if admin
  * @date Created : 9/12/2017
  * @date Modified : 9/12/2017
  * @copyright In Samsung Ukraine R&D Center (SRK under a contract between)
  * @par LLC "Samsung Electronics Co", Ltd (Seoul, Republic of Korea)
  * @par Copyright : (c) Samsung Electronics Co, Ltd 2017. All rights reserved.
+ * 
  */
 public final class MQListenerAPI {
 
     private static final Logger LOG = Logger.getLogger(MQListenerAPI.class);
     private static final int WAITING_TIMEOUT = 5000;
-    private static final String KAFKA_HOST = AppConfigInit.getEnv().getProperty("kafkaHost");
-    private static final String ZOOKEEPER_HOST = AppConfigInit.getEnv().getProperty("zookeperHost");
+    private static final String KAFKA_HOST = "106.125.46.139:9092";
+    private static final String ZOOKEEPER_HOST = "106.125.46.139:2181";
 
     /**
      * Create consumer and subscribe to topic
@@ -51,29 +53,24 @@ public final class MQListenerAPI {
      * @param topicName the topic name
      * @return consumer instance
      */
-    public static KafkaConsumer<byte[], byte[]> getConsumerInstance(String topicName) {
-        try {
-            ZkClient zkClient = new ZkClient(ZOOKEEPER_HOST);
-            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(ZOOKEEPER_HOST), false);
-            if (!AdminUtils.topicExists(zkUtils, topicName)) {
-                AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties());
-            }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
+    public static Properties getConsumerProperties(String topicName) {
         Properties props = new Properties();
         props.put("bootstrap.servers", KAFKA_HOST);
-        Random random = new Random();
-        props.put("group.id", topicName + random.nextInt());
+        props.put("group.id", topicName);
         props.put("auto.offset.reset", "earliest");
-        final int interval = 1000;
+        final int interval = 5000;
         props.put("auto.commit.interval.ms", interval);
         props.put("key.deserializer", ByteArrayDeserializer.class.getName());
         props.put("value.deserializer", ByteArrayDeserializer.class.getName());
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(topicName));
-        LOG.info("Subscribe to " + topicName);
-        return consumer;
+        return props;
+    }
+
+    public static void createTopic(String topicName) {
+        ZkClient zkClient = new ZkClient(ZOOKEEPER_HOST);
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(ZOOKEEPER_HOST), false);
+        if (!AdminUtils.topicExists(zkUtils, topicName)) {
+            AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties());
+        }
     }
 
     /**
@@ -82,29 +79,24 @@ public final class MQListenerAPI {
      * @param consumer the consumer
      * @return messages messages
      */
-    public static List<byte[]> getMessages(KafkaConsumer<byte[], byte[]> consumer) {
+    public static List<byte[]> getMessages(String topicName) {
         LOG.info("Get messages");
+        createTopic(topicName);
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getConsumerProperties(topicName));
+        consumer.subscribe(Arrays.asList(topicName));
         List<byte[]> list = new ArrayList<>();
         ConsumerRecords<byte[], byte[]> records = consumer.poll(WAITING_TIMEOUT);
         for (ConsumerRecord<byte[], byte[]> record : records) {
             LOG.info(record.offset() + ": " + new String(record.value()));
             list.add(record.value());
         }
-        return list;
-    }
-
-    /**
-     * Close consumer connection
-     *
-     * @param consumer the consumer
-     */
-    public static void closeConnection(KafkaConsumer<byte[], byte[]> consumer) {
         LOG.info("Close connection");
-        consumer.unsubscribe();
         consumer.close();
+        return list;
     }
 
     /**
+     * 
      * Parse message
      *
      * @param data the data
@@ -118,4 +110,5 @@ public final class MQListenerAPI {
 
     private MQListenerAPI() {
     }
+
 }
index f40d5d7..9d3bbee 100644 (file)
@@ -6,14 +6,12 @@
 package com.samsung.ci.test.mq;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import com.samsung.ci.basic.api.MQListenerAPI;
@@ -136,11 +134,11 @@ public class PolicyWitoutCloudTest extends BasicUserAction {
     }
 
     // CHECKSTYLE_OFF:Magic Number
-    private void checkPolicy(KafkaConsumer<byte[], byte[]> consumer, String policy) {
+    private void checkPolicy(String policy) {
         List<byte[]> policyList = new ArrayList<>();
         int attemptCounts = 5;
         while (policyList.isEmpty() && attemptCounts >= 0) {
-            policyList = MQListenerAPI.getMessages(consumer);
+            policyList = MQListenerAPI.getMessages(topic);
             attemptCounts--;
         }
         if (!policyList.isEmpty()) {
@@ -195,10 +193,8 @@ public class PolicyWitoutCloudTest extends BasicUserAction {
         Thread.sleep(TIMEOUT);
 
         log("Check default policy");
-        KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
-        assertNotNull(consumer);
         Thread.sleep(TIMEOUT);
-        checkPolicy(consumer, policy);
+        checkPolicy(policy);
 
         log("Change default policy");
         String changedPolicy = readChangedPolicy();
@@ -206,11 +202,10 @@ public class PolicyWitoutCloudTest extends BasicUserAction {
         MQPublisherAPI.sendPolisyToDSM(tvDevice.getUuid(), null, tvDevice.getParentUuid(), changedPolicy);
 
         Thread.sleep(TIMEOUT);
-        checkPolicy(consumer, changedPolicy);
+        checkPolicy(changedPolicy);
 
         assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(tvDevice.getUuid(), DEVICE_UNREGISTER_ACTION));
         Thread.sleep(TIMEOUT);
-        MQListenerAPI.closeConnection(consumer);
         deleteIoTUser(user.getCloudId());
     }
 
@@ -253,10 +248,8 @@ public class PolicyWitoutCloudTest extends BasicUserAction {
         Thread.sleep(TIMEOUT);
 
         log("Check default policy");
-        KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
-        assertNotNull(consumer);
         Thread.sleep(TIMEOUT);
-        checkPolicy(consumer, policy);
+        checkPolicy(policy);
 
         log("Change default policy");
         String changedPolicy = readPhoneChangedPolicy();
@@ -264,11 +257,10 @@ public class PolicyWitoutCloudTest extends BasicUserAction {
         MQPublisherAPI.sendPolisyToDSM(phoneDevice.getUuid(), null, phoneDevice.getParentUuid(), changedPolicy);
 
         Thread.sleep(TIMEOUT);
-        checkPolicy(consumer, changedPolicy);
+        checkPolicy(changedPolicy);
 
         assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(phoneDevice.getUuid(), DEVICE_UNREGISTER_ACTION));
         Thread.sleep(TIMEOUT);
-        MQListenerAPI.closeConnection(consumer);
         deleteIoTUser(user.getCloudId());
     }
 
index c394ff0..c0e6a5c 100644 (file)
@@ -14,7 +14,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -142,11 +141,11 @@ public class ReportsWithOutCloudTest extends BasicUserAction {
     }
 
     // CHECKSTYLE_OFF:Magic Number
-    private void checkNotification(KafkaConsumer<byte[], byte[]> consumer) {
+    private void checkNotification() {
         List<byte[]> notificationList = new ArrayList<>();
         int attemptCounts = 5;
         while (notificationList.isEmpty() && attemptCounts >= 0) {
-            notificationList = MQListenerAPI.getMessages(consumer);
+            notificationList = MQListenerAPI.getMessages(topic);
             attemptCounts--;
         }
         if (!notificationList.isEmpty()) {
@@ -259,8 +258,6 @@ public class ReportsWithOutCloudTest extends BasicUserAction {
         Thread.sleep(TIMEOUT);
         Thread.sleep(TIMEOUT);
         // Test reports
-        KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
-        assertNotNull(consumer);
         // SMACK
         String report = readReport(SMACK_REPORT_VS_SMACK_TEST);
         assertTrue(MQPublisherAPI.sendReportToDSM(report, device.getUuid()));
@@ -277,10 +274,9 @@ public class ReportsWithOutCloudTest extends BasicUserAction {
         Thread.sleep(TIMEOUT);
         Thread.sleep(TIMEOUT);
         checkReports();
-        checkNotification(consumer);
+        checkNotification();
         assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(device.getUuid(), DEVICE_UNREGISTER_ACTION));
         Thread.sleep(TIMEOUT);
-        MQListenerAPI.closeConnection(consumer);
         deleteIoTUser(user.getCloudId());
     }
 
index 6da0996..1b7612a 100644 (file)
@@ -12,7 +12,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import com.google.gson.Gson;
@@ -130,11 +129,11 @@ public class RestRuleReportsTest extends BasicUserAction {
     }
 
     // CHECKSTYLE_OFF:Magic Number
-    private void checkNotification(KafkaConsumer<byte[], byte[]> consumer) {
+    private void checkNotification() {
         List<byte[]> notificationList = new ArrayList<>();
         int attemptCounts = 5;
         while (notificationList.isEmpty() && attemptCounts >= 0) {
-            notificationList = MQListenerAPI.getMessages(consumer);
+            notificationList = MQListenerAPI.getMessages(topic);
             attemptCounts--;
         }
         if (!notificationList.isEmpty()) {
@@ -176,8 +175,6 @@ public class RestRuleReportsTest extends BasicUserAction {
         registerDevice();
         Thread.sleep(TIMEOUT);
         // Test reports
-        KafkaConsumer<byte[], byte[]> consumer = MQListenerAPI.getConsumerInstance(topic);
-        assertNotNull(consumer);
         // SYSCALL
         String report = readReport(SYSCALL_REPORT);
         assertTrue(MQPublisherAPI.sendReportToDSM(report, device.getUuid()));
@@ -185,10 +182,9 @@ public class RestRuleReportsTest extends BasicUserAction {
         Thread.sleep(TIMEOUT);
         Thread.sleep(TIMEOUT);
         checkReports();
-        checkNotification(consumer);
+        checkNotification();
         assertTrue(MQPublisherAPI.deviceUnRegistrationInDSM(device.getUuid(), DEVICE_UNREGISTER_ACTION));
         Thread.sleep(TIMEOUT);
-        MQListenerAPI.closeConnection(consumer);
         deleteIoTUser(user.getCloudId());
         Rest.removeRules(rules);
     }