2 * //******************************************************************
4 * // Copyright 2016 Samsung Electronics All Rights Reserved.
6 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
8 * // Licensed under the Apache License, Version 2.0 (the "License");
9 * // you may not use this file except in compliance with the License.
10 * // You may obtain a copy of the License at
12 * // http://www.apache.org/licenses/LICENSE-2.0
14 * // Unless required by applicable law or agreed to in writing, software
15 * // distributed under the License is distributed on an "AS IS" BASIS,
16 * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * // See the License for the specific language governing permissions and
18 * // limitations under the License.
20 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
22 package org.iotivity.cloud.mqserver.kafka;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.admin.AdminUtils;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndMetadata;
import kafka.message.MessageAndOffset;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.iotivity.cloud.mqserver.Constants;
import org.iotivity.cloud.mqserver.topic.Topic;
import org.iotivity.cloud.util.Log;
/**
 * This class provides a set of APIs that wrap the Kafka consumer APIs for
 * receiving messages published to a topic.
 */
63 public class KafkaConsumerWrapper {
65 private String mTopicName = null;
67 private String mZookeeper = null;
68 private String mBroker = null;
70 private ZkClient mZkClient = null;
71 private ZkUtils mZkUtils = null;
73 private ConsumerConnector mConsumerConnector = null;
74 private ExecutorService mConsumerExecutor = null;
76 private Topic mInternalConsumer = null;
78 private boolean mConsumerStarted = false;
/**
 * Creates a consumer wrapper for the given topic and opens the ZooKeeper
 * handles used for consumer group management.
 *
 * @param zookeeperAddress
 *            address of the ZooKeeper instance to connect to
 * @param brokerAddress
 *            address of the Kafka broker (stored for later use)
 * @param consumer
 *            topic that receives the consumed messages; its name, with
 *            every "/" replaced by ".", is used as the Kafka topic name
 */
public KafkaConsumerWrapper(String zookeeperAddress, String brokerAddress,
        Topic consumer) {

    mTopicName = consumer.getName().replace("/", ".");

    mZookeeper = zookeeperAddress;
    mBroker = brokerAddress;
    mInternalConsumer = consumer;

    mZkClient = new ZkClient(zookeeperAddress,
            Constants.KAFKA_SESSION_TIMEOUT,
            Constants.KAFKA_CONNECT_TIMEOUT, ZKStringSerializer$.MODULE$);

    // NOTE(review): the last argument of this call was cut off in the
    // reviewed text; false (ZooKeeper security disabled) is assumed —
    // confirm against the original.
    mZkUtils = new ZkUtils(mZkClient, new ZkConnection(zookeeperAddress),
            false);
}
98 * API to check if Kafka consumer is started
100 * @return returns true if Kafka consumer started, otherwise false
102 public boolean consumerStarted() {
103 return mConsumerStarted;
107 * API to subscribe Kafka topic to receive messages
109 * @return returns true if the topic is successfully subscribed, otherwise
112 public boolean subscribeTopic() {
114 Log.d("kafka subscribeTopic - " + mTopicName);
116 if (mConsumerStarted == true) {
120 // remove consumer group info if already exist
121 List<String> subscribers = mZkClient.getChildren(ZkUtils
124 if (subscribers.contains(mTopicName)) {
125 AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
128 ConsumerConfig consumerConfig = new ConsumerConfig(
129 buildPropertiesForSubscribe());
131 mConsumerConnector = Consumer
132 .createJavaConsumerConnector(consumerConfig);
134 Map<String, Integer> topicCountMap = new HashMap<>();
135 topicCountMap.put(mTopicName, Constants.KAFKA_CONSUMMER_THREADS);
137 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = mConsumerConnector
138 .createMessageStreams(topicCountMap);
140 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(mTopicName);
142 mConsumerExecutor = Executors
143 .newFixedThreadPool(Constants.KAFKA_CONSUMMER_THREADS);
145 for (final KafkaStream<byte[], byte[]> stream : streams) {
147 Log.d("kafka subscribe complete");
149 mConsumerExecutor.execute(new Runnable() {
153 for (final MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
155 mInternalConsumer.onMessagePublished(messageAndMetadata
162 mConsumerStarted = true;
168 * API to unsubscribe Kafka topic to stop receiving messages
170 * @return returns true if the topic is successfully unsubscribed, otherwise
173 public boolean unsubscribeTopic() {
175 Log.d("kafka unsubscribeTopic - " + mTopicName);
177 // remove consumer group info in zookeeper
178 List<String> subscribers = mZkClient.getChildren(ZkUtils
181 if (subscribers.contains(mTopicName)) {
182 AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
185 mConsumerExecutor.shutdown();
186 mConsumerConnector.shutdown();
188 mConsumerStarted = false;
194 * API to close Kafka consumer connection
196 public void closeConnection() {
198 if (mConsumerStarted == true) {
207 * API to get all messages from Kafka topic
209 * @return returns the list of messages published to the topic
211 public ArrayList<byte[]> getMessages() {
213 Log.d("kafka get all messages - " + mTopicName);
215 String brokerHost = mBroker.substring(0, mBroker.indexOf(':'));
216 int brokerPort = Integer.parseInt(mBroker.substring(mBroker
219 Log.d("host " + brokerHost + ", port " + brokerPort);
221 // TODO check options - Timeout: Int, bufferSize: Int
222 SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost,
223 brokerPort, 100000, 64 * 1024, mTopicName);
225 long lastOffset = getLastOffset(simpleConsumer, mTopicName, 0,
226 kafka.api.OffsetRequest.EarliestTime(), mTopicName);
228 // TODO check option - fetch size
229 FetchRequest req = new FetchRequestBuilder().clientId(mTopicName)
230 .addFetch(mTopicName, 0, lastOffset, 100000).build();
232 FetchResponse fetchResponse = simpleConsumer.fetch(req);
234 if (fetchResponse == null || fetchResponse.hasError()) {
236 Log.e("Error fetching data from the Broker");
240 ArrayList<byte[]> initialData = new ArrayList<>();
242 ByteBufferMessageSet messageSet = fetchResponse.messageSet(mTopicName,
245 if (messageSet != null) {
246 for (MessageAndOffset messageAndOffset : messageSet) {
248 long currentOffset = messageAndOffset.offset();
249 if (currentOffset < lastOffset) {
250 Log.e("Found an old offset: " + currentOffset
251 + " Expecting: " + lastOffset);
255 lastOffset = messageAndOffset.nextOffset();
256 ByteBuffer payload = messageAndOffset.message().payload();
258 byte[] bytes = new byte[payload.limit()];
261 initialData.add(bytes);
265 simpleConsumer.close();
267 Log.d("kafka get all messages complete");
272 private Properties buildPropertiesForSubscribe() {
274 // TODO check property settings
275 Properties props = new Properties();
277 props.put("group.id", mTopicName);
278 props.put("zookeeper.connect", mZookeeper);
279 props.put("enable.auto.commit", "true");
280 props.put("auto.commit.interval.ms", Constants.KAFKA_COMMIT_INTERVAL);
285 private long getLastOffset(SimpleConsumer consumer, String topic,
286 int partition, long whichTime, String clientName) {
288 TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
291 Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
292 requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
295 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
296 requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
299 OffsetResponse response = consumer.getOffsetsBefore(request);
301 if (response == null || response.hasError()) {
302 Log.e("Error fetching data Offset Data the Broker");
306 long[] offsets = response.offsets(topic, partition);