import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import kafka.admin.TopicCommand;
+import kafka.admin.TopicCommand.TopicCommandOptions;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
import org.iotivity.cloud.base.device.CoapDevice;
import org.iotivity.cloud.base.exception.ServerException.ForbiddenException;
import org.iotivity.cloud.base.exception.ServerException.NotFoundException;
import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
import org.iotivity.cloud.mqserver.Constants;
import org.iotivity.cloud.util.Cbor;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
IResponse mResponse = null;
CountDownLatch mLatch = null;
+ // insert user's zookeper and broker addresses
+ String mZookeeper = "127.0.0.1:2181";
+ String mBroker = "127.0.0.1:9092";
+
@Before
// setup for each test
public void setUp() throws Exception {
mMqBrokerResource = new MQBrokerResource();
- // insert user's zookeper and broker addresses
- String zookeeper = "127.0.0.1:2181";
- String broker = "127.0.0.1:9092";
-
mTopicPrefix = "mqtestTopic";
- mMqBrokerResource.setKafkaInformation(zookeeper, broker);
+ mMqBrokerResource.setKafkaInformation(mZookeeper, mBroker);
mLatch = new CountDownLatch(1);
mResponse = null; // initialize response packet
mMockDevice = mock(CoapDevice.class);
}).when(mMockDevice).sendResponse(Mockito.anyObject());
}
+ @After
+ public void tearDown() throws Exception {
+ // delete topics in Kafka broker
+ ZkClient zkClient = new ZkClient(mZookeeper, 10000, 10000,
+ ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(mZookeeper),
+ false);
+
+ String topic = MQ_BROKER_URI + "/*";
+ topic = topic.replace('/', '.');
+
+ String[] arr = { "--topic", topic };
+ TopicCommandOptions opts = new TopicCommandOptions(arr);
+ TopicCommand.deleteTopic(zkUtils, opts);
+
+ zkClient.close();
+ zkUtils.close();
+ }
+
@Test
// test topic creation
public void testTopicCreationOnDefaultRequestReceived() throws Exception {
if (latchSubscriber.getCount() == 0) {
assertTrue(methodCheck(resp, ResponseStatus.CONTENT));
assertTrue(hashmapCheck(resp, "message"));
+
+ DeleteTopic(mMockDevice, topic);
}
return resp;
}