2 * //******************************************************************
4 * // Copyright 2016 Samsung Electronics All Rights Reserved.
6 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
8 * // Licensed under the Apache License, Version 2.0 (the "License");
9 * // you may not use this file except in compliance with the License.
10 * // You may obtain a copy of the License at
12 * // http://www.apache.org/licenses/LICENSE-2.0
14 * // Unless required by applicable law or agreed to in writing, software
15 * // distributed under the License is distributed on an "AS IS" BASIS,
16 * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * // See the License for the specific language governing permissions and
18 * // limitations under the License.
20 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
22 package org.iotivity.cloud.mqserver.kafka;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
33 import kafka.admin.AdminUtils;
34 import kafka.api.FetchRequest;
35 import kafka.api.FetchRequestBuilder;
36 import kafka.api.PartitionOffsetRequestInfo;
37 import kafka.common.TopicAndPartition;
38 import kafka.consumer.Consumer;
39 import kafka.consumer.ConsumerConfig;
40 import kafka.consumer.KafkaStream;
41 import kafka.javaapi.FetchResponse;
42 import kafka.javaapi.OffsetResponse;
43 import kafka.javaapi.consumer.ConsumerConnector;
44 import kafka.javaapi.consumer.SimpleConsumer;
45 import kafka.javaapi.message.ByteBufferMessageSet;
46 import kafka.message.MessageAndMetadata;
47 import kafka.message.MessageAndOffset;
48 import kafka.utils.ZKStringSerializer$;
49 import kafka.utils.ZkUtils;
51 import org.I0Itec.zkclient.ZkClient;
52 import org.I0Itec.zkclient.ZkConnection;
53 import org.iotivity.cloud.mqserver.Constants;
54 import org.iotivity.cloud.mqserver.topic.Topic;
55 import org.iotivity.cloud.util.Logger;
57 public class KafkaConsumerWrapper {
59 private String mTopicName = null;
61 private String mZookeeper = null;
62 private String mBroker = null;
64 private ZkClient mZkClient = null;
65 private ZkUtils mZkUtils = null;
67 private ConsumerConnector mConsumerConnector = null;
68 private ExecutorService mConsumerExecutor = null;
70 private Topic mInternalConsumer = null;
72 private boolean mConsumerStarted = false;
74 public KafkaConsumerWrapper(String zookeeperAddress, String brokerAddress,
77 mTopicName = consumer.getName().replace("/", ".");
79 mZookeeper = zookeeperAddress;
80 mBroker = brokerAddress;
81 mInternalConsumer = consumer;
83 mZkClient = new ZkClient(zookeeperAddress,
84 Constants.KAFKA_SESSION_TIMEOUT,
85 Constants.KAFKA_CONNECT_TIMEOUT, ZKStringSerializer$.MODULE$);
87 mZkUtils = new ZkUtils(mZkClient, new ZkConnection(zookeeperAddress),
91 public boolean consumerStarted() {
92 return mConsumerStarted;
95 // TODO exception handling
96 public boolean subscribeTopic() {
98 Logger.d("kafka subscribeTopic - " + mTopicName);
100 if (mConsumerStarted == true) {
104 // remove consumer group info if already exist
105 List<String> subscribers = mZkClient.getChildren(ZkUtils
108 if (subscribers.contains(mTopicName)) {
109 AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
112 ConsumerConfig consumerConfig = new ConsumerConfig(
113 buildPropertiesForSubscribe());
115 mConsumerConnector = Consumer
116 .createJavaConsumerConnector(consumerConfig);
118 Map<String, Integer> topicCountMap = new HashMap<>();
119 topicCountMap.put(mTopicName, Constants.KAFKA_CONSUMMER_THREADS);
121 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = mConsumerConnector
122 .createMessageStreams(topicCountMap);
124 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(mTopicName);
126 mConsumerExecutor = Executors
127 .newFixedThreadPool(Constants.KAFKA_CONSUMMER_THREADS);
129 for (final KafkaStream<byte[], byte[]> stream : streams) {
131 Logger.d("kafka subscribe complete");
133 mConsumerExecutor.execute(new Runnable() {
137 for (final MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
139 mInternalConsumer.onMessagePublished(messageAndMetadata
146 mConsumerStarted = true;
151 public boolean unsubscribeTopic() {
153 Logger.d("kafka unsubscribeTopic - " + mTopicName);
155 // remove consumer group info in zookeeper
156 List<String> subscribers = mZkClient.getChildren(ZkUtils
159 if (subscribers.contains(mTopicName)) {
160 AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
163 mConsumerExecutor.shutdown();
164 mConsumerConnector.shutdown();
166 mConsumerStarted = false;
171 public ArrayList<byte[]> getMessages() {
173 Logger.d("kafka get all messages - " + mTopicName);
175 String brokerHost = mBroker.substring(0, mBroker.indexOf(':'));
176 int brokerPort = Integer.parseInt(mBroker.substring(mBroker
179 Logger.d("host " + brokerHost + ", port " + brokerPort);
181 // TODO check options - Timeout: Int, bufferSize: Int
182 SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost,
183 brokerPort, 100000, 64 * 1024, mTopicName);
185 long lastOffset = getLastOffset(simpleConsumer, mTopicName, 0,
186 kafka.api.OffsetRequest.EarliestTime(), mTopicName);
188 // TODO check option - fetch size
189 FetchRequest req = new FetchRequestBuilder().clientId(mTopicName)
190 .addFetch(mTopicName, 0, lastOffset, 100000).build();
192 FetchResponse fetchResponse = simpleConsumer.fetch(req);
194 if (fetchResponse == null || fetchResponse.hasError()) {
196 Logger.e("Error fetching data from the Broker");
200 ArrayList<byte[]> initialData = new ArrayList<>();
202 ByteBufferMessageSet messageSet = fetchResponse.messageSet(mTopicName,
205 if (messageSet != null) {
206 for (MessageAndOffset messageAndOffset : messageSet) {
208 long currentOffset = messageAndOffset.offset();
209 if (currentOffset < lastOffset) {
210 Logger.e("Found an old offset: " + currentOffset
211 + " Expecting: " + lastOffset);
215 lastOffset = messageAndOffset.nextOffset();
216 ByteBuffer payload = messageAndOffset.message().payload();
218 byte[] bytes = new byte[payload.limit()];
221 initialData.add(bytes);
225 simpleConsumer.close();
227 Logger.d("kafka get all messages complete");
232 private Properties buildPropertiesForSubscribe() {
234 // TODO check property settings
235 Properties props = new Properties();
237 props.put("group.id", mTopicName);
238 props.put("zookeeper.connect", mZookeeper);
239 props.put("enable.auto.commit", "true");
240 props.put("auto.commit.interval.ms", Constants.KAFKA_COMMIT_INTERVAL);
245 private long getLastOffset(SimpleConsumer consumer, String topic,
246 int partition, long whichTime, String clientName) {
248 TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
251 Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
252 requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
255 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
256 requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
259 OffsetResponse response = consumer.getOffsetsBefore(request);
261 if (response == null || response.hasError()) {
262 Logger.e("Error fetching data Offset Data the Broker");
266 long[] offsets = response.offsets(topic, partition);