Update notifications and tests
authorYevhen Zozulia <y.zozulia@surc.local>
Fri, 1 Sep 2017 17:25:51 +0000 (20:25 +0300)
committerYevhen Zozulia <y.zozulia@surc.local>
Fri, 1 Sep 2017 17:25:51 +0000 (20:25 +0300)
16 files changed:
servers/dsm/src/main/java/com/samsung/dsm/model/restapi/RestDevice.java
servers/dsm/src/main/java/com/samsung/dsm/report/analyzer/impl/MACReportAnalyzer.java
servers/dsm/src/main/java/com/samsung/dsm/rest/report/ReportApi.java
servers/dsm/src/test/java/com/samsung/dsm/rest/DeviceApiTest.java
servers/dsm/src/test/java/com/samsung/dsm/rest/PolicyApiTest.java
servers/mq/src/main/java/com/samsung/servermq/iotivity/KafkaConsumerWrapper.java
servers/mq/src/main/java/com/samsung/servermq/iotivity/MessageQueueListener.java
servers/mq/src/main/java/com/samsung/servermq/rest/DSMRestController.java
servers/mq/src/main/java/com/samsung/servermq/utils/rest/ServerSenderImpl.java
servers/mq/src/test/java/com/samsung/servermq/iotivity/KafkaConsumerWrapperTest.java [new file with mode: 0644]
servers/mq/src/test/java/com/samsung/servermq/iotivity/KafkaErrorMessagesTest.java [new file with mode: 0644]
servers/mq/src/test/java/com/samsung/servermq/iotivity/MessageQueueListenerTest.java [new file with mode: 0644]
servers/mq/src/test/java/com/samsung/servermq/iotivity/MessageQueuePublisherTest.java [new file with mode: 0644]
servers/mq/src/test/java/com/samsung/servermq/utils/CborTest.java [new file with mode: 0644]
servers/mq/src/test/java/com/samsung/servermq/utils/MessageQueueUtilsTest.java [new file with mode: 0644]
servers/mq/src/test/java/com/samsung/servermq/utils/rest/ServerSenderImplTest.java [new file with mode: 0644]

index af62303..3489f80 100644 (file)
@@ -42,6 +42,14 @@ public class RestDevice implements RestEntity {
         return status;
     }
 
+    public String getParentUuid() {
+        return parentUuid;
+    }
+
+    public void setParentUuid(String parentUuid) {
+        this.parentUuid = parentUuid;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -63,6 +71,7 @@ public class RestDevice implements RestEntity {
         result = 31 * result + (name != null ? name.hashCode() : 0);
         result = 31 * result + (type != null ? type.hashCode() : 0);
         result = 31 * result + (model != null ? model.hashCode() : 0);
+        result = 31 * result + (parentUuid != null ? parentUuid.hashCode() : 0);
         result = 31 * result + status;
         return result;
     }
index ca2dc59..0b77745 100644 (file)
@@ -201,7 +201,9 @@ public class MACReportAnalyzer implements ReportAnalyzer {
         Map<String, String> mapResult = new HashMap<>(); 
         String parentCommand = map.get(ReportAnalyzerKeys.MAC_REPORT_PPID_COMM);
         String command = map.get(ReportAnalyzerKeys.MAC_REPORT_COMM);
+        String parentUuid = map.get(ReportAnalyzerKeys.DEVICE_PARENT_UUID);
         mapResult.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, command);
+        mapResult.put(ReportAnalyzerKeys.DEVICE_PARENT_UUID, parentUuid);
         ///////////////////////////////////////////////////// FIXME Stub for demos/////////////////////////////////////////////////////
         boolean find = false;
         if(parentCommand.equalsIgnoreCase("smack_test") || command.equalsIgnoreCase("smack_test")){
@@ -209,6 +211,7 @@ public class MACReportAnalyzer implements ReportAnalyzer {
             notification.setMessage(getNotTrustedMessage(parentCommand, command));
             notification.setCode(CODE_NOT_GRANTED);
             notification.setCurrentTime(System.currentTimeMillis());
+            notification.setAdditionalMap(mapResult);
             log.debug(notification.getMessage());
             return notification;
         }
index 74efb9c..3eb229c 100644 (file)
@@ -7,6 +7,7 @@ package com.samsung.dsm.rest.report;
 
 import static java.lang.String.format;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
@@ -93,9 +94,6 @@ public class ReportApi {
         Device device = report.getDevice();
         IotCloudUser iotCloudUser = iotCloudService.findByUser(device.getUser());
         Notification notification = new Notification();
-//        notification.setCloudUserId(iotCloudUser.getUuid());
-//        notification.setDeviceId(device.getUuid());
-//        notification.setPolicy(device.getPolicy().getPolicy());
         if (getReportType(report).equalsIgnoreCase(MACReportAnalyzer.MAC_REPORT)) {
             MACReportAnalyzer analyzer = new MACReportAnalyzer();
             Map<String, String> map = analyzer.extractReportData(report.getReport());
@@ -104,57 +102,14 @@ public class ReportApi {
             map.put(ReportAnalyzerKeys.POLICY, device.getPolicy().getPolicy());
             map.put(ReportAnalyzerKeys.DEVICE_PARENT_UUID, device.getParentUuid());
             analyzer.createNotification(map, grantedApplicationService);
-//            analyzer.analyze(map, grantedApplicationService);
             notification = analyzer.createNotification(map, grantedApplicationService); 
-//            notification.setCode(analyzer.getCode());
-//            notification.setTitle(analyzer.getTitle());
-//            notification.setMessage(analyzer.getMessage());
-//            notification.setCurrentTime(System.currentTimeMillis());
-//            
-//            ////////////////////////////////////////////////////////////////FIXME Stub for demo
-//            String parentCommand = map.get(ReportAnalyzerKeys.MAC_REPORT_PPID_COMM);
-//            String command = map.get(ReportAnalyzerKeys.MAC_REPORT_COMM);
-//            String ppid = map.get(ReportAnalyzerKeys.MAC_REPORT_PPID);
-//            String pid = map.get(ReportAnalyzerKeys.MAC_REPORT_PID);
-//            if(parentCommand.equalsIgnoreCase("smack_test") || command.equalsIgnoreCase("smack_test")){
-//                Map<String, String> stubMap = new HashMap<>();
-//                if(parentCommand.equalsIgnoreCase("smack_test")){
-//                    stubMap.put(ReportAnalyzerKeys.MAC_REPORT_PID, ppid);
-//                    stubMap.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, parentCommand);
-//                } else {
-//                    stubMap.put(ReportAnalyzerKeys.MAC_REPORT_PID, pid);
-//                    stubMap.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, command);
-//                }
-//                notification.setAdditionalMap(stubMap);
-//            ///////////////////////////////////////////////////////////////////////////
-//            } 
-//            boolean find = false;
-//            Set<String> keys = map.keySet();
-//            for(String key: keys){
-//                String value = map.get(key);
-//                log.info("(key = " + key + ",  value = " + value+ ")");
-//                if(value.contains("smack_test")){
-//                    log.info("Find in key = " + key + ",  value = " + value);
-//                    find = true;
-//                }
-//            }
-//            if(find){
-//                Map<String, String> stubMap = new HashMap<>();
-//                stubMap.put(ReportAnalyzerKeys.MAC_REPORT_PID, pid);
-//                String subject = map.get(ReportAnalyzerKeys.MAC_REPORT_SUBJECT);
-//                String[] arr = subject.split(":");
-//                log.info("=============================    subject          ==========================================");
-//                log.info("=============================  Array:  "  + Arrays.toString(arr)+ "          ==========================================");
-//                stubMap.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, arr[arr.length - 1]);
-//                notification.setCode("2");//Not Granted 
-//                notification.setAdditionalMap(stubMap);
-//            } else {
-//                notification.setAdditionalMap(map);
-//            }
         } else {
+            Map<String, String> mapResult = new HashMap<>(); 
             notification.setMessage("Not implemented");
             notification.setTitle("Not implemented");
             notification.setCode("0");
+            mapResult.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, "Not implemented");
+            mapResult.put(ReportAnalyzerKeys.DEVICE_PARENT_UUID, "Not implemented");
         }
         log.info("=======================================================================");
         mqSender.sendNotification(notification);
index 914bca2..f17570d 100644 (file)
@@ -13,7 +13,6 @@ import java.util.Set;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -104,8 +103,7 @@ public class DeviceApiTest {
     public void deviceByUUIDOk() throws Exception {
         ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234")).andExpect(status().isOk());
         String content = result.andReturn().getResponse().getContentAsString();
-
-        assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1}"));
+        assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}"));
     }
 
     @Test
@@ -118,9 +116,9 @@ public class DeviceApiTest {
         ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234/byCloudUserUuid")).andExpect(status().isOk());
         String content = result.andReturn().getResponse().getContentAsString();
         assertTrue(JsonUtils.equalJsonArrays(content,
-                "[{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1}," +
-                        "{\"uuid\":\"1235\",\"name\":\"s4\",\"type\":\"phone\",\"model\":\"samsung s7 edge\",\"status\":1}," +
-                        "{\"uuid\":\"1236\",\"name\":\"airconditioner samsung\",\"type\":\"airconditioner\",\"model\":\"AR12JSFSRWK\",\"status\":1}]"));
+                "[{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}," +
+                        "{\"uuid\":\"1235\",\"name\":\"s4\",\"type\":\"phone\",\"model\":\"samsung s7 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}," +
+                        "{\"uuid\":\"1236\",\"name\":\"airconditioner samsung\",\"type\":\"airconditioner\",\"model\":\"AR12JSFSRWK\",\"status\":1,\"parentUuid\":\"parentUuid\"}]"));
     }
 
     @Test
@@ -153,21 +151,19 @@ public class DeviceApiTest {
     }
 
     @Test
-    @Ignore
     public void findPolicyByUuidOk() throws Exception {
         ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234/policy")).andExpect(status().isOk());
         String content = result.andReturn().getResponse().getContentAsString();
 
-        assertEquals("[\t{\t\t\"group\": \"tv-extension\",\t\t\"policies\": [\t\t\t{\t\t\t\t\"name\": \"usb\",\t\t\t\t\"state\": 0,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"screen-capture\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"bluetooth\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"iptables\",\t\t\t\t\"state\": -1,\t\t\t\t\"items\": [\t\t\t\t\t\"127.0.0.0/24|UDP|10-1024\",\t\t\t\t\t\"1.1.1.1|TCP|80,443\",\t\t\t\t\t\"8.8.8.8\"\t\t\t\t]\t\t\t}\t\t]\t}]",content);
+        assertEquals("[{\"group\": \"tv-extension\",\"policies\": [{\"name\": \"sound\",\"state\": 1,\"items\": []},{\"name\": \"bluetooth\",\"state\": 1,\"items\": []},{\"name\": \"wifi\",\"state\": 1,\"items\": []},{\"name\": \"usb\",\"state\": 1,\"items\": []},{\"name\": \"dtv-tunner\",\"state\": 1,\"items\": []},{\"name\": \"iptables\",\"state\": -1,\"items\": []}]}]",content);
     }
 
     @Test
-    @Ignore
     public void findPolicyByUuidForRemovedDeviceOk() throws Exception {
         ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1237/policy")).andExpect(status().isOk());
         String content = result.andReturn().getResponse().getContentAsString();
 
-        assertEquals("[\t{\t\t\"group\": \"tv-extension\",\t\t\"policies\": [\t\t\t{\t\t\t\t\"name\": \"usb\",\t\t\t\t\"state\": 0,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"screen-capture\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"bluetooth\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"iptables\",\t\t\t\t\"state\": -1,\t\t\t\t\"items\": [\t\t\t\t\t\"127.0.0.0/24|UDP|10-1024\",\t\t\t\t\t\"1.1.1.1|TCP|80,443\",\t\t\t\t\t\"8.8.8.8\"\t\t\t\t]\t\t\t}\t\t]\t}]",content);
+        assertEquals("[{\"group\": \"tv-extension\",\"policies\": [{\"name\": \"sound\",\"state\": 1,\"items\": []},{\"name\": \"bluetooth\",\"state\": 1,\"items\": []},{\"name\": \"wifi\",\"state\": 1,\"items\": []},{\"name\": \"usb\",\"state\": 1,\"items\": []},{\"name\": \"dtv-tunner\",\"state\": 1,\"items\": []},{\"name\": \"iptables\",\"state\": -1,\"items\": []}]}]",content);
     }
 
     @Test
index 6012da1..546d825 100644 (file)
@@ -9,6 +9,7 @@ import com.samsung.commons.service.UserService;
 import com.samsung.dsm.utils.JsonUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -81,7 +82,7 @@ public class PolicyApiTest {
         ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234")).andExpect(status().isOk());
         String content = result.andReturn().getResponse().getContentAsString();
 
-        assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1}"));
+        assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}"));
     }
 
     @Test
index 8c775a8..53cd66c 100644 (file)
@@ -65,6 +65,9 @@ public class KafkaConsumerWrapper {
 
     private boolean mConsumerStarted = false;
 
+    KafkaConsumerWrapper(){
+    }
+    
     public KafkaConsumerWrapper(String zookeeperAddress, String brokerAddress, Topic consumer) {
 
         mTopicName = consumer.getName().replace("/", ".");
@@ -104,8 +107,7 @@ public class KafkaConsumerWrapper {
                 AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
             }
 
-            ConsumerConfig consumerConfig = new ConsumerConfig(buildPropertiesForSubscribe());
-            mConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+            createConsumerConnect();
             Map<String, Integer> topicCountMap = new HashMap<>();
             topicCountMap.put(mTopicName, Constants.KAFKA_CONSUMMER_THREADS);
 
@@ -137,6 +139,11 @@ public class KafkaConsumerWrapper {
         return mConsumerStarted;
     }
 
+    protected void createConsumerConnect() {
+        ConsumerConfig consumerConfig = new ConsumerConfig(buildPropertiesForSubscribe());
+        mConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+    }
+
     /**
      * API to unsubscribe Kafka topic to stop receiving messages
      * 
@@ -190,7 +197,7 @@ public class KafkaConsumerWrapper {
         log.trace("host " + brokerHost + ", port " + brokerPort);
 
         // TODO check options - Timeout: Int, bufferSize: Int
-        SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost, brokerPort, TIME_OUT, BUFFER_SIZE, mTopicName);
+        SimpleConsumer simpleConsumer = createSimpleConsumer(brokerHost, brokerPort);
 
         if (lastOffset < 0) {
             lastOffset = getLastOffset(simpleConsumer, mTopicName, PARTITION, kafka.api.OffsetRequest.EarliestTime(), mTopicName);
@@ -210,6 +217,11 @@ public class KafkaConsumerWrapper {
         return extractMessages(simpleConsumer, fetchResponse);
     }
 
+    protected SimpleConsumer createSimpleConsumer(String brokerHost, int brokerPort) {
+        SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost, brokerPort, TIME_OUT, BUFFER_SIZE, mTopicName);
+        return simpleConsumer;
+    }
+
     private List<byte[]> extractMessages(SimpleConsumer simpleConsumer, FetchResponse fetchResponse) {
         ArrayList<byte[]> initialData = new ArrayList<>();
 
@@ -243,7 +255,7 @@ public class KafkaConsumerWrapper {
         return initialData;
     }
 
-    private void createTopic() {
+    protected void createTopic() {
         KafkaCommonWrapper kafkaCommonWrapper = new KafkaCommonWrapper(mZookeeper, mBroker);
         if (kafkaCommonWrapper.createTopic(mTopicName)) {
             getMessages();
@@ -275,4 +287,33 @@ public class KafkaConsumerWrapper {
         return offsets[0];
     }
 
+    //Need for tests
+    void setmZookeeper(String mZookeeper) {
+        this.mZookeeper = mZookeeper;
+    }
+
+    void setmBroker(String mBroker) {
+        this.mBroker = mBroker;
+    }
+
+    void setmZkClient(ZkClient mZkClient) {
+        this.mZkClient = mZkClient;
+    }
+
+    void setmZkUtils(ZkUtils mZkUtils) {
+        this.mZkUtils = mZkUtils;
+    }
+
+    void setmConsumerConnector(ConsumerConnector mConsumerConnector) {
+        this.mConsumerConnector = mConsumerConnector;
+    }
+
+    void setmConsumerExecutor(ExecutorService mConsumerExecutor) {
+        this.mConsumerExecutor = mConsumerExecutor;
+    }
+
+    void setmTopicName(String mTopicName) {
+        this.mTopicName = mTopicName;
+    }
+
 }
index 35de35c..f801f98 100644 (file)
@@ -20,6 +20,8 @@ public class MessageQueueListener {
     private PostDataExtractor extractor;
     private String listenerName;
 
+    MessageQueueListener(){}
+    
     public MessageQueueListener(String topicName, PostDataExtractor extractor, String listenerName, String zookeperHost,
             String kafkaHost) {
         log.debug("Created " + listenerName);
index 2bf931e..641edfe 100644 (file)
@@ -69,6 +69,7 @@ public class DSMRestController {
         map.put(NOTIFICATION_TIME, Long.toString(notification.getCurrentTime()));
         map.put(POLICY, notification.getPolicy());
         map.put(DUID, notification.getDeviceId());
+        map.put(DEVICE_PARENT_UUID, map.get(DEVICE_PARENT_UUID));
         String userUUID = notification.getCloudUserId();
         String topicName = MAIN_TOPIC + userUUID + NOTIFICATION_TOPIC;
         publisher.sendMessage(topicName, map);
index 4280332..221be7b 100644 (file)
@@ -20,9 +20,9 @@ public class ServerSenderImpl implements ServerSender {
     /**
      * Server host
      */
-    private final String host;
+    private String host;
 
-    private final RestTemplate restTemplate;
+    private RestTemplate restTemplate;
 
     /**
      * @param host server host
@@ -52,10 +52,10 @@ public class ServerSenderImpl implements ServerSender {
     }
 
     @Override
-    public boolean requestPolicy(Long policyId, String diveceId) {
+    public boolean requestPolicy(Long policyId, String deviceId) {
         try {
             log.debug(format("Send request for policy %s", policyId));
-            URI url = new URI(host + REST_FOR_REQUEST_POLICY + policyId + "/device/" + diveceId);
+            URI url = new URI(host + REST_FOR_REQUEST_POLICY + policyId + "/device/" + deviceId);
             log.debug("Send to " + url);
             @SuppressWarnings("rawtypes")
             ResponseEntity result = restTemplate.exchange(url, POST, null, ResponseEntity.class);
@@ -83,4 +83,8 @@ public class ServerSenderImpl implements ServerSender {
         return false;
     }
 
+    void setRestTemplate(RestTemplate restTemplate) {
+        this.restTemplate = restTemplate;
+    }
+
 }
\ No newline at end of file
diff --git a/servers/mq/src/test/java/com/samsung/servermq/iotivity/KafkaConsumerWrapperTest.java b/servers/mq/src/test/java/com/samsung/servermq/iotivity/KafkaConsumerWrapperTest.java
new file mode 100644 (file)
index 0000000..e7a53e9
--- /dev/null
@@ -0,0 +1,141 @@
+package com.samsung.servermq.iotivity;
+
+import org.iotivity.cloud.mqserver.Constants;
+import org.iotivity.cloud.mqserver.topic.Topic;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndMetadata;
+import kafka.utils.ZkUtils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.any;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+
+public class KafkaConsumerWrapperTest extends KafkaConsumerWrapper{
+
+    private static final String OIC_WC = ".oic.wc";
+    private static final String LOCALHOST = "localhost:8080";
+
+    @Test(expected=ZkTimeoutException.class)
+    @Ignore
+    public void testSubscribeTopicWithTimeoutException(){
+        Topic consumerTopic = mock(Topic.class);
+        when(consumerTopic.getName()).thenReturn("/oic/wc");
+        new KafkaConsumerWrapper(LOCALHOST, LOCALHOST, consumerTopic);
+    }
+
+    @Test
+    public void testSubscribeUnsubscribeTopic(){
+        Topic consumerTopic = mock(Topic.class);
+        ZkClient mZkClient = mock(ZkClient.class);
+        ZkUtils mZkUtils = mock(ZkUtils.class);
+        when(consumerTopic.getName()).thenReturn(OIC_WC);
+        KafkaConsumerWrapper consumer = new KafkaConsumerWrapperTest();
+        consumer.setmBroker(LOCALHOST);
+        consumer.setmZookeeper(LOCALHOST);
+        consumer.setmTopicName(OIC_WC);
+        consumer.setmZkClient(mZkClient);
+        consumer.setmZkUtils(mZkUtils);
+        consumer.subscribeTopic();
+        assertTrue(consumer.consumerStarted());
+        consumer.subscribeTopic();
+        assertTrue(consumer.consumerStarted());
+        consumer.unsubscribeTopic();
+        consumer.closeConnection();
+        assertFalse(consumer.consumerStarted());
+    }
+
+    @Test
+    public void testSubscribeUnsubscribeTopicCloseConnectionWithoutUnsubscribe(){
+        Topic consumerTopic = mock(Topic.class);
+        ZkClient mZkClient = mock(ZkClient.class);
+        ZkUtils mZkUtils = mock(ZkUtils.class);
+        when(consumerTopic.getName()).thenReturn(OIC_WC);
+        KafkaConsumerWrapper consumer = new KafkaConsumerWrapperTest();
+        consumer.setmBroker(LOCALHOST);
+        consumer.setmZookeeper(LOCALHOST);
+        consumer.setmTopicName(OIC_WC);
+        consumer.setmZkClient(mZkClient);
+        consumer.setmZkUtils(mZkUtils);
+        consumer.subscribeTopic();
+        assertTrue(consumer.consumerStarted());
+        consumer.closeConnection();
+        assertFalse(consumer.consumerStarted());
+    }
+
+    @Test
+    public void testGetEmptyMessages(){
+        Topic consumerTopic = mock(Topic.class);
+        ZkClient mZkClient = mock(ZkClient.class);
+        ZkUtils mZkUtils = mock(ZkUtils.class);
+        when(consumerTopic.getName()).thenReturn(OIC_WC);
+        KafkaConsumerWrapper consumer = new KafkaConsumerWrapperTest();
+        consumer.setmBroker(LOCALHOST);
+        consumer.setmZookeeper(LOCALHOST);
+        consumer.setmTopicName(OIC_WC);
+        consumer.setmZkClient(mZkClient);
+        consumer.setmZkUtils(mZkUtils);
+        consumer.subscribeTopic();
+        assertTrue(consumer.consumerStarted());
+        List<byte[]> messages = consumer.getMessages();
+        assertNotNull(messages);
+        assertTrue(messages.isEmpty());
+        consumer.unsubscribeTopic();
+        consumer.closeConnection();
+        assertFalse(consumer.consumerStarted());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void createConsumerConnect() {
+        ConsumerConnector mConsumerConnector = mock(ConsumerConnector.class);
+        Map<String, Integer> topicCountMap = new HashMap<>();
+        topicCountMap.put(OIC_WC, Constants.KAFKA_CONSUMMER_THREADS);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = mock(Map.class);
+        List<KafkaStream<byte[], byte[]>> streams = mock(List.class);
+        KafkaStream<byte[], byte[]> stream = mock(KafkaStream.class);
+        MessageAndMetadata<byte[], byte[]> messageAndMetadata = mock(MessageAndMetadata.class);
+
+        Iterator<KafkaStream<byte[], byte[]>> iterator = mock(Iterator.class);
+        when(iterator.next()).thenReturn(stream);
+        when(iterator.hasNext()).thenReturn(true,false);
+        when(streams.iterator()).thenReturn(iterator);
+
+        ConsumerIterator<byte[], byte[]> steamIterator = mock(ConsumerIterator.class);
+        when(steamIterator.next()).thenReturn(messageAndMetadata);
+        when(steamIterator.hasNext()).thenReturn(true,false);
+        when(stream.iterator()).thenReturn(steamIterator);
+
+        when(mConsumerConnector.createMessageStreams(topicCountMap)).thenReturn(consumerMap);
+        when(consumerMap.get(OIC_WC)).thenReturn(streams);
+        setmConsumerConnector(mConsumerConnector);
+    }
+    
+    @Override
+    protected SimpleConsumer createSimpleConsumer(String brokerHost, int brokerPort){
+        SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
+        return simpleConsumer;
+    }
+    
+    @Override
+    protected void createTopic() {
+    }
+
+}
diff --git a/servers/mq/src/test/java/com/samsung/servermq/iotivity/KafkaErrorMessagesTest.java b/servers/mq/src/test/java/com/samsung/servermq/iotivity/KafkaErrorMessagesTest.java
new file mode 100644 (file)
index 0000000..e83d3cc
--- /dev/null
@@ -0,0 +1,38 @@
+package com.samsung.servermq.iotivity;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class KafkaErrorMessagesTest {
+
+    @Test
+    public void testGetErrorMessageByCode(){
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(-1).equals("Unknown error"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(1).equals("Offset out of range"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(3).equals("Unknown topic or partition"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(5).equals("Leader not available"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(6).equals("Not leader for partition"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(9).equals("Replica not available"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(12).equals("Ofset metadata too large"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(14).equals("Group load in progress"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(15).equals("Group coordinator not available"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(16).equals("Not coordinator for group"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(17).equals("Invalid topic"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(22).equals("Illegal generation"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(23).equals("Inconsistent group protocol"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(25).equals("Unknown member id"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(26).equals("Invalid session timeout"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(27).equals("Rebalance in progress"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(28).equals("Invalid commit offset size"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(29).equals("Topic autorization failed"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(30).equals("Group autorization failed"));
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(43).equals("Unsupported for message format"));
+    }
+
+    @Test
+    public void testGetErrorMessageByUndefindCode(){
+        assertTrue(KafkaErrorMessages.getErrorMessageByCode(-1000).equals("Unknown error"));
+    }
+
+}
diff --git a/servers/mq/src/test/java/com/samsung/servermq/iotivity/MessageQueueListenerTest.java b/servers/mq/src/test/java/com/samsung/servermq/iotivity/MessageQueueListenerTest.java
new file mode 100644 (file)
index 0000000..fc33521
--- /dev/null
@@ -0,0 +1,83 @@
+package com.samsung.servermq.iotivity;
+
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.iotivity.cloud.mqserver.topic.TopicManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.samsung.servermq.iotivity.extractor.PostDataExtractor;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+public class MessageQueueListenerTest {
+
+    @Mock
+    private TopicManager topicManager;
+
+    @Mock
+    private KafkaConsumerWrapper kafkaConsumer;
+
+    @Mock
+    private PostDataExtractor extractor;
+
+    private MessageQueueListener mqlistener;
+
+    @Before
+    public void setUp() {
+        mqlistener = new MessageQueueListener();
+        mqlistener.setExtractor(extractor);
+        mqlistener.setKafkaConsumer(kafkaConsumer);
+        mqlistener.setKafkaHost("127.0.0.1:9092");
+        mqlistener.setZookeperHost("127.0.0.1:2181");
+        mqlistener.setListenerName("MockListener");
+        mqlistener.setTopicManager(topicManager);
+        MockitoAnnotations.initMocks(mqlistener);
+    }
+
+    @Test
+    public void testNotEmptyFields() {
+        assertNotNull(mqlistener.getExtractor());
+        assertNotNull(mqlistener.getKafkaConsumer());
+        assertNotNull(mqlistener.getKafkaHost());
+        assertNotNull(mqlistener.getListenerName());
+        assertNotNull(mqlistener.getTopicManager());
+        assertNotNull(mqlistener.getZookeperHost());
+    }
+    
+    @Test(expected = ZkTimeoutException.class)
+    public void testMessageQueueListener() {
+        PostDataExtractor extractor = new MockPostDataExtractor();
+        new MessageQueueListener("test", extractor, "mock", "127.0.0.1:2181", "127.0.0.1:9092");
+    }
+
+    class MockPostDataExtractor implements PostDataExtractor {
+
+        @Override
+        public void extract(byte[] data) {
+        }
+
+    }
+
+    @Test
+    public void testGetMessages() {
+        List<byte[]> messages = new ArrayList<>();
+        messages.add("test message".getBytes());
+        List<byte[]> result = mqlistener.getMessages();
+        assertTrue(result.size() == 0);
+        Mockito.when(kafkaConsumer.getMessages()).thenReturn(messages);
+        result = mqlistener.getMessages();
+        assertTrue(result.size() == 1);
+    }
+
+}
diff --git a/servers/mq/src/test/java/com/samsung/servermq/iotivity/MessageQueuePublisherTest.java b/servers/mq/src/test/java/com/samsung/servermq/iotivity/MessageQueuePublisherTest.java
new file mode 100644 (file)
index 0000000..eafbea7
--- /dev/null
@@ -0,0 +1,17 @@
+package com.samsung.servermq.iotivity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class MessageQueuePublisherTest {
+
+    @Test
+    public void testSendMessage(){
+        Map<String, String> messageMap = new HashMap<>();
+        Publisher publisher = new MessageQueuePublisher("127.0.0.1:9092");
+        publisher.sendMessage("MockYopic", messageMap);
+    }
+
+}
diff --git a/servers/mq/src/test/java/com/samsung/servermq/utils/CborTest.java b/servers/mq/src/test/java/com/samsung/servermq/utils/CborTest.java
new file mode 100644 (file)
index 0000000..8a6b689
--- /dev/null
@@ -0,0 +1,60 @@
+package com.samsung.servermq.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class CborTest {
+
+    @Test
+    public void testEncodeAndDecodePayloadFromCbor() {
+        Cbor<String> cbor = new Cbor<>();
+        String data = "test data";
+        byte[] payload = cbor.encodingPayloadToCbor(data);
+        String result = (String) cbor.parsePayloadFromCbor(payload, String.class);
+        assertNotNull(result);
+        assertTrue(result.equals(data));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testparsePayloadFromCborWithNullValue() {
+        Cbor<String> cbor = new Cbor<>();
+        cbor.parsePayloadFromCbor(null, String.class);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testparsePayloadFromCborWithNullClass() {
+        Cbor<String> cbor = new Cbor<>();
+        String data = "test data";
+        byte[] payload = cbor.encodingPayloadToCbor(data);
+        cbor.parsePayloadFromCbor(payload, null);
+    }
+
+    @Test
+    public void testEncodeAndDecodePayloadFromCborWrongPayload() {
+        Cbor<String> cbor = new Cbor<>();
+        Long data = 121213131l;
+        byte[] payload = cbor.encodingPayloadToCbor(data);
+        String result = (String) cbor.parsePayloadFromCbor(payload, String.class);
+        assertNotNull(result);
+        assertFalse(result.equals(data));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testEncodingPayloadToCborWithNull(){
+        Cbor<String> cbor = new Cbor<>();
+        cbor.encodingPayloadToCbor(null);
+    }
+
+    @Test
+    public void testEncodeAndDecodePayloadFromCborWrongData() {
+        Cbor<String> cbor = new Cbor<>();
+        String data = "test data";
+        String result = (String) cbor.parsePayloadFromCbor(data.getBytes(), String.class);
+        assertNull(result);
+    }
+
+}
diff --git a/servers/mq/src/test/java/com/samsung/servermq/utils/MessageQueueUtilsTest.java b/servers/mq/src/test/java/com/samsung/servermq/utils/MessageQueueUtilsTest.java
new file mode 100644 (file)
index 0000000..1e7190e
--- /dev/null
@@ -0,0 +1,66 @@
+package com.samsung.servermq.utils;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class MessageQueueUtilsTest {
+
+    @Test
+    public void testExtractDataFromPayload(){
+        String key = "key";
+        String value = "value";
+        byte[] payload = MessageQueueUtils.buildPayload(key, value); 
+        String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+        assertNotNull(result);
+        assertTrue(result.equals(value));
+    }
+
+    @Test
+    public void testExtractDataFromPayloadWithEmptyKey(){
+        String key = "";
+        String value = "value";
+        byte[] payload = MessageQueueUtils.buildPayload(key, value); 
+        String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+        assertNull(result);
+    }
+
+    @Test
+    public void testExtractDataFromPayloadWithNullKey(){
+        String key = null;
+        String value = "value";
+        byte[] payload = MessageQueueUtils.buildPayload(key, value); 
+        String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+        assertNull(result);
+    }
+
+    @Test
+    public void testExtractDataFromPayloadWithEmptyValue(){
+        String key = "key";
+        String value = "";
+        byte[] payload = MessageQueueUtils.buildPayload(key, value); 
+        String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+        assertNotNull(result);
+        assertTrue(result.equals(value));
+    }
+
+    @Test
+    public void testExtractDataFromPayloadWithNullValue(){
+        String key = "key";
+        String value = null;
+        byte[] payload = MessageQueueUtils.buildPayload(key, value); 
+        String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+        assertNull(result);
+    }
+
+    @Test
+    public void testExtractDataFromPayloadWithWrongPayload(){
+        String key = "key";
+        String value = "value";
+        String result = MessageQueueUtils.extractDataFromPayload(value.getBytes(), key);
+        assertNull(result);
+    }
+
+}
diff --git a/servers/mq/src/test/java/com/samsung/servermq/utils/rest/ServerSenderImplTest.java b/servers/mq/src/test/java/com/samsung/servermq/utils/rest/ServerSenderImplTest.java
new file mode 100644 (file)
index 0000000..c1ea65f
--- /dev/null
@@ -0,0 +1,121 @@
+package com.samsung.servermq.utils.rest;
+
+import static java.lang.String.format;
+import static org.springframework.http.HttpMethod.POST;
+import static org.springframework.http.HttpStatus.OK;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import org.junit.Test;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+public class ServerSenderImplTest {
+
+    @Test
+    public void testSendNotification() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/report/newreport/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.sendNotification(1l);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testSendNotificationBadRequest() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/report/newreport/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.sendNotification(1l);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testSendNotificationURISyntaxException() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080   ");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/report/newreport/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.sendNotification(1l);
+        assertFalse(result);
+    }
+
+    @Test
+    public void requestDivicePolicy() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/device/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.requestPolicy(1l, "1");
+        assertTrue(result);
+    }
+
+    @Test
+    public void requestDivicePolicyBadRequest() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/device/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.requestPolicy(1l, "1");
+        assertFalse(result);
+    }
+
+    @Test
+    public void requestDivicePolicyURISyntaxException() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080     ");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/device/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.requestPolicy(1l, "1");
+        assertFalse(result);
+    }
+
+    @Test
+    public void requestAgentPolicy() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/agent/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.requestPolicyForAgent(1l, "1");
+        assertTrue(result);
+    }
+
+    @Test
+    public void requestAgentPolicyBadRequest() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/agent/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.requestPolicyForAgent(1l, "1");
+        assertFalse(result);
+    }
+
+    @Test
+    public void requestAgentPolicyURISyntaxException() throws URISyntaxException{
+        ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080     ");
+        RestTemplate restTemplate = mock(RestTemplate.class);
+        URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/agent/1");
+        when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+        sender.setRestTemplate(restTemplate);
+        boolean result = sender.requestPolicyForAgent(1l, "1");
+        assertFalse(result);
+    }
+
+}