return status;
}
+ public String getParentUuid() {
+ return parentUuid;
+ }
+
+ public void setParentUuid(String parentUuid) {
+ this.parentUuid = parentUuid;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + (type != null ? type.hashCode() : 0);
result = 31 * result + (model != null ? model.hashCode() : 0);
+ result = 31 * result + (parentUuid != null ? parentUuid.hashCode() : 0);
result = 31 * result + status;
return result;
}
Map<String, String> mapResult = new HashMap<>();
String parentCommand = map.get(ReportAnalyzerKeys.MAC_REPORT_PPID_COMM);
String command = map.get(ReportAnalyzerKeys.MAC_REPORT_COMM);
+ String parentUuid = map.get(ReportAnalyzerKeys.DEVICE_PARENT_UUID);
mapResult.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, command);
+ mapResult.put(ReportAnalyzerKeys.DEVICE_PARENT_UUID, parentUuid);
///////////////////////////////////////////////////// FIXME Stub for demos/////////////////////////////////////////////////////
boolean find = false;
if(parentCommand.equalsIgnoreCase("smack_test") || command.equalsIgnoreCase("smack_test")){
notification.setMessage(getNotTrustedMessage(parentCommand, command));
notification.setCode(CODE_NOT_GRANTED);
notification.setCurrentTime(System.currentTimeMillis());
+ notification.setAdditionalMap(mapResult);
log.debug(notification.getMessage());
return notification;
}
import static java.lang.String.format;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
Device device = report.getDevice();
IotCloudUser iotCloudUser = iotCloudService.findByUser(device.getUser());
Notification notification = new Notification();
-// notification.setCloudUserId(iotCloudUser.getUuid());
-// notification.setDeviceId(device.getUuid());
-// notification.setPolicy(device.getPolicy().getPolicy());
if (getReportType(report).equalsIgnoreCase(MACReportAnalyzer.MAC_REPORT)) {
MACReportAnalyzer analyzer = new MACReportAnalyzer();
Map<String, String> map = analyzer.extractReportData(report.getReport());
map.put(ReportAnalyzerKeys.POLICY, device.getPolicy().getPolicy());
map.put(ReportAnalyzerKeys.DEVICE_PARENT_UUID, device.getParentUuid());
analyzer.createNotification(map, grantedApplicationService);
-// analyzer.analyze(map, grantedApplicationService);
notification = analyzer.createNotification(map, grantedApplicationService);
-// notification.setCode(analyzer.getCode());
-// notification.setTitle(analyzer.getTitle());
-// notification.setMessage(analyzer.getMessage());
-// notification.setCurrentTime(System.currentTimeMillis());
-//
-// ////////////////////////////////////////////////////////////////FIXME Stub for demo
-// String parentCommand = map.get(ReportAnalyzerKeys.MAC_REPORT_PPID_COMM);
-// String command = map.get(ReportAnalyzerKeys.MAC_REPORT_COMM);
-// String ppid = map.get(ReportAnalyzerKeys.MAC_REPORT_PPID);
-// String pid = map.get(ReportAnalyzerKeys.MAC_REPORT_PID);
-// if(parentCommand.equalsIgnoreCase("smack_test") || command.equalsIgnoreCase("smack_test")){
-// Map<String, String> stubMap = new HashMap<>();
-// if(parentCommand.equalsIgnoreCase("smack_test")){
-// stubMap.put(ReportAnalyzerKeys.MAC_REPORT_PID, ppid);
-// stubMap.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, parentCommand);
-// } else {
-// stubMap.put(ReportAnalyzerKeys.MAC_REPORT_PID, pid);
-// stubMap.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, command);
-// }
-// notification.setAdditionalMap(stubMap);
-// ///////////////////////////////////////////////////////////////////////////
-// }
-// boolean find = false;
-// Set<String> keys = map.keySet();
-// for(String key: keys){
-// String value = map.get(key);
-// log.info("(key = " + key + ", value = " + value+ ")");
-// if(value.contains("smack_test")){
-// log.info("Find in key = " + key + ", value = " + value);
-// find = true;
-// }
-// }
-// if(find){
-// Map<String, String> stubMap = new HashMap<>();
-// stubMap.put(ReportAnalyzerKeys.MAC_REPORT_PID, pid);
-// String subject = map.get(ReportAnalyzerKeys.MAC_REPORT_SUBJECT);
-// String[] arr = subject.split(":");
-// log.info("============================= subject ==========================================");
-// log.info("============================= Array: " + Arrays.toString(arr)+ " ==========================================");
-// stubMap.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, arr[arr.length - 1]);
-// notification.setCode("2");//Not Granted
-// notification.setAdditionalMap(stubMap);
-// } else {
-// notification.setAdditionalMap(map);
-// }
} else {
+ Map<String, String> mapResult = new HashMap<>();
notification.setMessage("Not implemented");
notification.setTitle("Not implemented");
notification.setCode("0");
+ mapResult.put(ReportAnalyzerKeys.MAC_REPORT_APPNAME, "Not implemented");
+ mapResult.put(ReportAnalyzerKeys.DEVICE_PARENT_UUID, "Not implemented");
}
log.info("=======================================================================");
mqSender.sendNotification(notification);
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
public void deviceByUUIDOk() throws Exception {
ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234")).andExpect(status().isOk());
String content = result.andReturn().getResponse().getContentAsString();
-
- assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1}"));
+ assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}"));
}
@Test
ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234/byCloudUserUuid")).andExpect(status().isOk());
String content = result.andReturn().getResponse().getContentAsString();
assertTrue(JsonUtils.equalJsonArrays(content,
- "[{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1}," +
- "{\"uuid\":\"1235\",\"name\":\"s4\",\"type\":\"phone\",\"model\":\"samsung s7 edge\",\"status\":1}," +
- "{\"uuid\":\"1236\",\"name\":\"airconditioner samsung\",\"type\":\"airconditioner\",\"model\":\"AR12JSFSRWK\",\"status\":1}]"));
+ "[{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}," +
+ "{\"uuid\":\"1235\",\"name\":\"s4\",\"type\":\"phone\",\"model\":\"samsung s7 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}," +
+ "{\"uuid\":\"1236\",\"name\":\"airconditioner samsung\",\"type\":\"airconditioner\",\"model\":\"AR12JSFSRWK\",\"status\":1,\"parentUuid\":\"parentUuid\"}]"));
}
@Test
}
@Test
- @Ignore
public void findPolicyByUuidOk() throws Exception {
ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234/policy")).andExpect(status().isOk());
String content = result.andReturn().getResponse().getContentAsString();
- assertEquals("[\t{\t\t\"group\": \"tv-extension\",\t\t\"policies\": [\t\t\t{\t\t\t\t\"name\": \"usb\",\t\t\t\t\"state\": 0,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"screen-capture\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"bluetooth\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"iptables\",\t\t\t\t\"state\": -1,\t\t\t\t\"items\": [\t\t\t\t\t\"127.0.0.0/24|UDP|10-1024\",\t\t\t\t\t\"1.1.1.1|TCP|80,443\",\t\t\t\t\t\"8.8.8.8\"\t\t\t\t]\t\t\t}\t\t]\t}]",content);
+ assertEquals("[{\"group\": \"tv-extension\",\"policies\": [{\"name\": \"sound\",\"state\": 1,\"items\": []},{\"name\": \"bluetooth\",\"state\": 1,\"items\": []},{\"name\": \"wifi\",\"state\": 1,\"items\": []},{\"name\": \"usb\",\"state\": 1,\"items\": []},{\"name\": \"dtv-tunner\",\"state\": 1,\"items\": []},{\"name\": \"iptables\",\"state\": -1,\"items\": []}]}]",content);
}
@Test
- @Ignore
public void findPolicyByUuidForRemovedDeviceOk() throws Exception {
ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1237/policy")).andExpect(status().isOk());
String content = result.andReturn().getResponse().getContentAsString();
- assertEquals("[\t{\t\t\"group\": \"tv-extension\",\t\t\"policies\": [\t\t\t{\t\t\t\t\"name\": \"usb\",\t\t\t\t\"state\": 0,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"screen-capture\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"bluetooth\",\t\t\t\t\"state\": 1,\t\t\t\t\"items\": []\t\t\t},\t\t\t{\t\t\t\t\"name\": \"iptables\",\t\t\t\t\"state\": -1,\t\t\t\t\"items\": [\t\t\t\t\t\"127.0.0.0/24|UDP|10-1024\",\t\t\t\t\t\"1.1.1.1|TCP|80,443\",\t\t\t\t\t\"8.8.8.8\"\t\t\t\t]\t\t\t}\t\t]\t}]",content);
+ assertEquals("[{\"group\": \"tv-extension\",\"policies\": [{\"name\": \"sound\",\"state\": 1,\"items\": []},{\"name\": \"bluetooth\",\"state\": 1,\"items\": []},{\"name\": \"wifi\",\"state\": 1,\"items\": []},{\"name\": \"usb\",\"state\": 1,\"items\": []},{\"name\": \"dtv-tunner\",\"state\": 1,\"items\": []},{\"name\": \"iptables\",\"state\": -1,\"items\": []}]}]",content);
}
@Test
import com.samsung.dsm.utils.JsonUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
ResultActions result = mockMvc.perform(get(ROOT_URL + "/device/1234")).andExpect(status().isOk());
String content = result.andReturn().getResponse().getContentAsString();
- assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1}"));
+ assertTrue(JsonUtils.equalJson(content, "{\"uuid\":\"1234\",\"name\":\"s8\",\"type\":\"phone\",\"model\":\"samsung s8 edge\",\"status\":1,\"parentUuid\":\"parentUuid\"}"));
}
@Test
private boolean mConsumerStarted = false;
+ KafkaConsumerWrapper(){
+ }
+
public KafkaConsumerWrapper(String zookeeperAddress, String brokerAddress, Topic consumer) {
mTopicName = consumer.getName().replace("/", ".");
AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
}
- ConsumerConfig consumerConfig = new ConsumerConfig(buildPropertiesForSubscribe());
- mConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+ createConsumerConnect();
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(mTopicName, Constants.KAFKA_CONSUMMER_THREADS);
return mConsumerStarted;
}
+ protected void createConsumerConnect() {
+ ConsumerConfig consumerConfig = new ConsumerConfig(buildPropertiesForSubscribe());
+ mConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+ }
+
/**
* API to unsubscribe Kafka topic to stop receiving messages
*
log.trace("host " + brokerHost + ", port " + brokerPort);
// TODO check options - Timeout: Int, bufferSize: Int
- SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost, brokerPort, TIME_OUT, BUFFER_SIZE, mTopicName);
+ SimpleConsumer simpleConsumer = createSimpleConsumer(brokerHost, brokerPort);
if (lastOffset < 0) {
lastOffset = getLastOffset(simpleConsumer, mTopicName, PARTITION, kafka.api.OffsetRequest.EarliestTime(), mTopicName);
return extractMessages(simpleConsumer, fetchResponse);
}
+ protected SimpleConsumer createSimpleConsumer(String brokerHost, int brokerPort) {
+ SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost, brokerPort, TIME_OUT, BUFFER_SIZE, mTopicName);
+ return simpleConsumer;
+ }
+
private List<byte[]> extractMessages(SimpleConsumer simpleConsumer, FetchResponse fetchResponse) {
ArrayList<byte[]> initialData = new ArrayList<>();
return initialData;
}
- private void createTopic() {
+ protected void createTopic() {
KafkaCommonWrapper kafkaCommonWrapper = new KafkaCommonWrapper(mZookeeper, mBroker);
if (kafkaCommonWrapper.createTopic(mTopicName)) {
getMessages();
return offsets[0];
}
+ //Need for tests
+ void setmZookeeper(String mZookeeper) {
+ this.mZookeeper = mZookeeper;
+ }
+
+ void setmBroker(String mBroker) {
+ this.mBroker = mBroker;
+ }
+
+ void setmZkClient(ZkClient mZkClient) {
+ this.mZkClient = mZkClient;
+ }
+
+ void setmZkUtils(ZkUtils mZkUtils) {
+ this.mZkUtils = mZkUtils;
+ }
+
+ void setmConsumerConnector(ConsumerConnector mConsumerConnector) {
+ this.mConsumerConnector = mConsumerConnector;
+ }
+
+ void setmConsumerExecutor(ExecutorService mConsumerExecutor) {
+ this.mConsumerExecutor = mConsumerExecutor;
+ }
+
+ void setmTopicName(String mTopicName) {
+ this.mTopicName = mTopicName;
+ }
+
}
private PostDataExtractor extractor;
private String listenerName;
+ MessageQueueListener(){}
+
public MessageQueueListener(String topicName, PostDataExtractor extractor, String listenerName, String zookeperHost,
String kafkaHost) {
log.debug("Created " + listenerName);
map.put(NOTIFICATION_TIME, Long.toString(notification.getCurrentTime()));
map.put(POLICY, notification.getPolicy());
map.put(DUID, notification.getDeviceId());
+ map.put(DEVICE_PARENT_UUID, map.get(DEVICE_PARENT_UUID));
String userUUID = notification.getCloudUserId();
String topicName = MAIN_TOPIC + userUUID + NOTIFICATION_TOPIC;
publisher.sendMessage(topicName, map);
/**
* Server host
*/
- private final String host;
+ private String host;
- private final RestTemplate restTemplate;
+ private RestTemplate restTemplate;
/**
* @param host server host
}
@Override
- public boolean requestPolicy(Long policyId, String diveceId) {
+ public boolean requestPolicy(Long policyId, String deviceId) {
try {
log.debug(format("Send request for policy %s", policyId));
- URI url = new URI(host + REST_FOR_REQUEST_POLICY + policyId + "/device/" + diveceId);
+ URI url = new URI(host + REST_FOR_REQUEST_POLICY + policyId + "/device/" + deviceId);
log.debug("Send to " + url);
@SuppressWarnings("rawtypes")
ResponseEntity result = restTemplate.exchange(url, POST, null, ResponseEntity.class);
return false;
}
+ void setRestTemplate(RestTemplate restTemplate) {
+ this.restTemplate = restTemplate;
+ }
+
}
\ No newline at end of file
--- /dev/null
+package com.samsung.servermq.iotivity;
+
+import org.iotivity.cloud.mqserver.Constants;
+import org.iotivity.cloud.mqserver.topic.Topic;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndMetadata;
+import kafka.utils.ZkUtils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.any;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+
+public class KafkaConsumerWrapperTest extends KafkaConsumerWrapper{
+
+ private static final String OIC_WC = ".oic.wc";
+ private static final String LOCALHOST = "localhost:8080";
+
+ @Test(expected=ZkTimeoutException.class)
+ @Ignore
+ public void testSubscribeTopicWithTimeoutException(){
+ Topic consumerTopic = mock(Topic.class);
+ when(consumerTopic.getName()).thenReturn("/oic/wc");
+ new KafkaConsumerWrapper(LOCALHOST, LOCALHOST, consumerTopic);
+ }
+
+ @Test
+ public void testSubscribeUnsubscribeTopic(){
+ Topic consumerTopic = mock(Topic.class);
+ ZkClient mZkClient = mock(ZkClient.class);
+ ZkUtils mZkUtils = mock(ZkUtils.class);
+ when(consumerTopic.getName()).thenReturn(OIC_WC);
+ KafkaConsumerWrapper consumer = new KafkaConsumerWrapperTest();
+ consumer.setmBroker(LOCALHOST);
+ consumer.setmZookeeper(LOCALHOST);
+ consumer.setmTopicName(OIC_WC);
+ consumer.setmZkClient(mZkClient);
+ consumer.setmZkUtils(mZkUtils);
+ consumer.subscribeTopic();
+ assertTrue(consumer.consumerStarted());
+ consumer.subscribeTopic();
+ assertTrue(consumer.consumerStarted());
+ consumer.unsubscribeTopic();
+ consumer.closeConnection();
+ assertFalse(consumer.consumerStarted());
+ }
+
+ @Test
+ public void testSubscribeUnsubscribeTopicCloseConnectionWithoutUnsubscribe(){
+ Topic consumerTopic = mock(Topic.class);
+ ZkClient mZkClient = mock(ZkClient.class);
+ ZkUtils mZkUtils = mock(ZkUtils.class);
+ when(consumerTopic.getName()).thenReturn(OIC_WC);
+ KafkaConsumerWrapper consumer = new KafkaConsumerWrapperTest();
+ consumer.setmBroker(LOCALHOST);
+ consumer.setmZookeeper(LOCALHOST);
+ consumer.setmTopicName(OIC_WC);
+ consumer.setmZkClient(mZkClient);
+ consumer.setmZkUtils(mZkUtils);
+ consumer.subscribeTopic();
+ assertTrue(consumer.consumerStarted());
+ consumer.closeConnection();
+ assertFalse(consumer.consumerStarted());
+ }
+
+ @Test
+ public void testGetEmptyMessages(){
+ Topic consumerTopic = mock(Topic.class);
+ ZkClient mZkClient = mock(ZkClient.class);
+ ZkUtils mZkUtils = mock(ZkUtils.class);
+ when(consumerTopic.getName()).thenReturn(OIC_WC);
+ KafkaConsumerWrapper consumer = new KafkaConsumerWrapperTest();
+ consumer.setmBroker(LOCALHOST);
+ consumer.setmZookeeper(LOCALHOST);
+ consumer.setmTopicName(OIC_WC);
+ consumer.setmZkClient(mZkClient);
+ consumer.setmZkUtils(mZkUtils);
+ consumer.subscribeTopic();
+ assertTrue(consumer.consumerStarted());
+ List<byte[]> messages = consumer.getMessages();
+ assertNotNull(messages);
+ assertTrue(messages.isEmpty());
+ consumer.unsubscribeTopic();
+ consumer.closeConnection();
+ assertFalse(consumer.consumerStarted());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void createConsumerConnect() {
+ ConsumerConnector mConsumerConnector = mock(ConsumerConnector.class);
+ Map<String, Integer> topicCountMap = new HashMap<>();
+ topicCountMap.put(OIC_WC, Constants.KAFKA_CONSUMMER_THREADS);
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = mock(Map.class);
+ List<KafkaStream<byte[], byte[]>> streams = mock(List.class);
+ KafkaStream<byte[], byte[]> stream = mock(KafkaStream.class);
+ MessageAndMetadata<byte[], byte[]> messageAndMetadata = mock(MessageAndMetadata.class);
+
+ Iterator<KafkaStream<byte[], byte[]>> iterator = mock(Iterator.class);
+ when(iterator.next()).thenReturn(stream);
+ when(iterator.hasNext()).thenReturn(true,false);
+ when(streams.iterator()).thenReturn(iterator);
+
+ ConsumerIterator<byte[], byte[]> steamIterator = mock(ConsumerIterator.class);
+ when(steamIterator.next()).thenReturn(messageAndMetadata);
+ when(steamIterator.hasNext()).thenReturn(true,false);
+ when(stream.iterator()).thenReturn(steamIterator);
+
+ when(mConsumerConnector.createMessageStreams(topicCountMap)).thenReturn(consumerMap);
+ when(consumerMap.get(OIC_WC)).thenReturn(streams);
+ setmConsumerConnector(mConsumerConnector);
+ }
+
+ @Override
+ protected SimpleConsumer createSimpleConsumer(String brokerHost, int brokerPort){
+ SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
+ return simpleConsumer;
+ }
+
+ @Override
+ protected void createTopic() {
+ }
+
+}
--- /dev/null
+package com.samsung.servermq.iotivity;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class KafkaErrorMessagesTest {
+
+ @Test
+ public void testGetErrorMessageByCode(){
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(-1).equals("Unknown error"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(1).equals("Offset out of range"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(3).equals("Unknown topic or partition"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(5).equals("Leader not available"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(6).equals("Not leader for partition"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(9).equals("Replica not available"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(12).equals("Ofset metadata too large"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(14).equals("Group load in progress"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(15).equals("Group coordinator not available"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(16).equals("Not coordinator for group"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(17).equals("Invalid topic"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(22).equals("Illegal generation"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(23).equals("Inconsistent group protocol"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(25).equals("Unknown member id"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(26).equals("Invalid session timeout"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(27).equals("Rebalance in progress"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(28).equals("Invalid commit offset size"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(29).equals("Topic autorization failed"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(30).equals("Group autorization failed"));
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(43).equals("Unsupported for message format"));
+ }
+
+ @Test
+ public void testGetErrorMessageByUndefindCode(){
+ assertTrue(KafkaErrorMessages.getErrorMessageByCode(-1000).equals("Unknown error"));
+ }
+
+}
--- /dev/null
+package com.samsung.servermq.iotivity;
+
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.iotivity.cloud.mqserver.topic.TopicManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.samsung.servermq.iotivity.extractor.PostDataExtractor;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+public class MessageQueueListenerTest {
+
+ @Mock
+ private TopicManager topicManager;
+
+ @Mock
+ private KafkaConsumerWrapper kafkaConsumer;
+
+ @Mock
+ private PostDataExtractor extractor;
+
+ private MessageQueueListener mqlistener;
+
+ @Before
+ public void setUp() {
+ mqlistener = new MessageQueueListener();
+ mqlistener.setExtractor(extractor);
+ mqlistener.setKafkaConsumer(kafkaConsumer);
+ mqlistener.setKafkaHost("127.0.0.1:9092");
+ mqlistener.setZookeperHost("127.0.0.1:2181");
+ mqlistener.setListenerName("MockListener");
+ mqlistener.setTopicManager(topicManager);
+ MockitoAnnotations.initMocks(mqlistener);
+ }
+
+ @Test
+ public void testNotEmptyFields() {
+ assertNotNull(mqlistener.getExtractor());
+ assertNotNull(mqlistener.getKafkaConsumer());
+ assertNotNull(mqlistener.getKafkaHost());
+ assertNotNull(mqlistener.getListenerName());
+ assertNotNull(mqlistener.getTopicManager());
+ assertNotNull(mqlistener.getZookeperHost());
+ }
+
+ @Test(expected = ZkTimeoutException.class)
+ public void testMessageQueueListener() {
+ PostDataExtractor extractor = new MockPostDataExtractor();
+ new MessageQueueListener("test", extractor, "mock", "127.0.0.1:2181", "127.0.0.1:9092");
+ }
+
+ class MockPostDataExtractor implements PostDataExtractor {
+
+ @Override
+ public void extract(byte[] data) {
+ }
+
+ }
+
+ @Test
+ public void testGetMessages() {
+ List<byte[]> messages = new ArrayList<>();
+ messages.add("test message".getBytes());
+ List<byte[]> result = mqlistener.getMessages();
+ assertTrue(result.size() == 0);
+ Mockito.when(kafkaConsumer.getMessages()).thenReturn(messages);
+ result = mqlistener.getMessages();
+ assertTrue(result.size() == 1);
+ }
+
+}
--- /dev/null
+package com.samsung.servermq.iotivity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class MessageQueuePublisherTest {
+
+ @Test
+ public void testSendMessage(){
+ Map<String, String> messageMap = new HashMap<>();
+ Publisher publisher = new MessageQueuePublisher("127.0.0.1:9092");
+ publisher.sendMessage("MockYopic", messageMap);
+ }
+
+}
--- /dev/null
+package com.samsung.servermq.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class CborTest {
+
+ @Test
+ public void testEncodeAndDecodePayloadFromCbor() {
+ Cbor<String> cbor = new Cbor<>();
+ String data = "test data";
+ byte[] payload = cbor.encodingPayloadToCbor(data);
+ String result = (String) cbor.parsePayloadFromCbor(payload, String.class);
+ assertNotNull(result);
+ assertTrue(result.equals(data));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testparsePayloadFromCborWithNullValue() {
+ Cbor<String> cbor = new Cbor<>();
+ cbor.parsePayloadFromCbor(null, String.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testparsePayloadFromCborWithNullClass() {
+ Cbor<String> cbor = new Cbor<>();
+ String data = "test data";
+ byte[] payload = cbor.encodingPayloadToCbor(data);
+ cbor.parsePayloadFromCbor(payload, null);
+ }
+
+ @Test
+ public void testEncodeAndDecodePayloadFromCborWrongPayload() {
+ Cbor<String> cbor = new Cbor<>();
+ Long data = 121213131l;
+ byte[] payload = cbor.encodingPayloadToCbor(data);
+ String result = (String) cbor.parsePayloadFromCbor(payload, String.class);
+ assertNotNull(result);
+ assertFalse(result.equals(data));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testEncodingPayloadToCborWithNull(){
+ Cbor<String> cbor = new Cbor<>();
+ cbor.encodingPayloadToCbor(null);
+ }
+
+ @Test
+ public void testEncodeAndDecodePayloadFromCborWrongData() {
+ Cbor<String> cbor = new Cbor<>();
+ String data = "test data";
+ String result = (String) cbor.parsePayloadFromCbor(data.getBytes(), String.class);
+ assertNull(result);
+ }
+
+}
--- /dev/null
+package com.samsung.servermq.utils;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+public class MessageQueueUtilsTest {
+
+ @Test
+ public void testExtractDataFromPayload(){
+ String key = "key";
+ String value = "value";
+ byte[] payload = MessageQueueUtils.buildPayload(key, value);
+ String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+ assertNotNull(result);
+ assertTrue(result.equals(value));
+ }
+
+ @Test
+ public void testExtractDataFromPayloadWithEmptyKey(){
+ String key = "";
+ String value = "value";
+ byte[] payload = MessageQueueUtils.buildPayload(key, value);
+ String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+ assertNull(result);
+ }
+
+ @Test
+ public void testExtractDataFromPayloadWithNullKey(){
+ String key = null;
+ String value = "value";
+ byte[] payload = MessageQueueUtils.buildPayload(key, value);
+ String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+ assertNull(result);
+ }
+
+ @Test
+ public void testExtractDataFromPayloadWithEmptyValue(){
+ String key = "key";
+ String value = "";
+ byte[] payload = MessageQueueUtils.buildPayload(key, value);
+ String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+ assertNotNull(result);
+ assertTrue(result.equals(value));
+ }
+
+ @Test
+ public void testExtractDataFromPayloadWithNullValue(){
+ String key = "key";
+ String value = null;
+ byte[] payload = MessageQueueUtils.buildPayload(key, value);
+ String result = MessageQueueUtils.extractDataFromPayload(payload, key);
+ assertNull(result);
+ }
+
+ @Test
+ public void testExtractDataFromPayloadWithWrongPayload(){
+ String key = "key";
+ String value = "value";
+ String result = MessageQueueUtils.extractDataFromPayload(value.getBytes(), key);
+ assertNull(result);
+ }
+
+}
--- /dev/null
+package com.samsung.servermq.utils.rest;
+
+import static java.lang.String.format;
+import static org.springframework.http.HttpMethod.POST;
+import static org.springframework.http.HttpStatus.OK;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import org.junit.Test;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+public class ServerSenderImplTest {
+
+ @Test
+ public void testSendNotification() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/report/newreport/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.sendNotification(1l);
+ assertTrue(result);
+ }
+
+ @Test
+ public void testSendNotificationBadRequest() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/report/newreport/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.sendNotification(1l);
+ assertFalse(result);
+ }
+
+ @Test
+ public void testSendNotificationURISyntaxException() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080 ");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/report/newreport/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.sendNotification(1l);
+ assertFalse(result);
+ }
+
+ @Test
+ public void requestDivicePolicy() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/device/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.requestPolicy(1l, "1");
+ assertTrue(result);
+ }
+
+ @Test
+ public void requestDivicePolicyBadRequest() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/device/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.requestPolicy(1l, "1");
+ assertFalse(result);
+ }
+
+ @Test
+ public void requestDivicePolicyURISyntaxException() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080 ");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/device/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.requestPolicy(1l, "1");
+ assertFalse(result);
+ }
+
+ @Test
+ public void requestAgentPolicy() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/agent/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.requestPolicyForAgent(1l, "1");
+ assertTrue(result);
+ }
+
+ @Test
+ public void requestAgentPolicyBadRequest() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/agent/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.requestPolicyForAgent(1l, "1");
+ assertFalse(result);
+ }
+
+ @Test
+ public void requestAgentPolicyURISyntaxException() throws URISyntaxException{
+ ServerSenderImpl sender = new ServerSenderImpl("http://localhost:8080 ");
+ RestTemplate restTemplate = mock(RestTemplate.class);
+ URI url = new URI("http://localhost:8080/dsm/restapi/policy/request/1/agent/1");
+ when(restTemplate.exchange(url, POST, null, ResponseEntity.class)).thenReturn(new ResponseEntity<>(OK));
+ sender.setRestTemplate(restTemplate);
+ boolean result = sender.requestPolicyForAgent(1l, "1");
+ assertFalse(result);
+ }
+
+}