delete unused EntityHandler code on the provider side.
[platform/upstream/iotivity.git] / cloud / messagequeue / src / main / java / org / iotivity / cloud / mqserver / kafka / KafkaConsumerWrapper.java
/*
 * //******************************************************************
 * //
 * // Copyright 2016 Samsung Electronics All Rights Reserved.
 * //
 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
 * //
 * // Licensed under the Apache License, Version 2.0 (the "License");
 * // you may not use this file except in compliance with the License.
 * // You may obtain a copy of the License at
 * //
 * //      http://www.apache.org/licenses/LICENSE-2.0
 * //
 * // Unless required by applicable law or agreed to in writing, software
 * // distributed under the License is distributed on an "AS IS" BASIS,
 * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * // See the License for the specific language governing permissions and
 * // limitations under the License.
 * //
 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
 */
package org.iotivity.cloud.mqserver.kafka;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.admin.AdminUtils;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndMetadata;
import kafka.message.MessageAndOffset;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.iotivity.cloud.mqserver.Constants;
import org.iotivity.cloud.mqserver.topic.Topic;
import org.iotivity.cloud.util.Logger;

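/**
 * Wraps the Kafka consumer APIs used by the message-queue server: a
 * high-level consumer (ConsumerConnector) delivers newly published messages
 * to the owning Topic, while a SimpleConsumer replays messages already
 * stored in the broker.
 */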
public class KafkaConsumerWrapper {

    private String            mTopicName         = null;

    private String            mZookeeper         = null;
    private String            mBroker            = null;

    private ZkClient          mZkClient          = null;
    private ZkUtils           mZkUtils           = null;

    private ConsumerConnector mConsumerConnector = null;
    private ExecutorService   mConsumerExecutor  = null;

    private Topic             mInternalConsumer  = null;

    private boolean           mConsumerStarted   = false;

    public KafkaConsumerWrapper(String zookeeperAddress, String brokerAddress,
            Topic consumer) {

        // Kafka topic names cannot contain '/', so map the resource path
        // separators to '.'
        mTopicName = consumer.getName().replace("/", ".");

        mZookeeper = zookeeperAddress;
        mBroker = brokerAddress;
        mInternalConsumer = consumer;

        mZkClient = new ZkClient(zookeeperAddress,
                Constants.KAFKA_SESSION_TIMEOUT,
                Constants.KAFKA_CONNECT_TIMEOUT, ZKStringSerializer$.MODULE$);

        mZkUtils = new ZkUtils(mZkClient, new ZkConnection(zookeeperAddress),
                false);
    }

    public boolean consumerStarted() {
        return mConsumerStarted;
    }

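    /**
     * Starts a high-level consumer group (named after the topic) whose
     * threads forward every newly published message to the owning Topic
     * instance. Any stale consumer-group entry left in ZooKeeper from a
     * previous run is deleted first.
     */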
    public boolean subscribeTopic() {

        // TODO exception handling
        Logger.d("kafka subscribeTopic - " + mTopicName);

        if (mConsumerStarted) {
            return true;
        }

        // remove consumer group info if it already exists
        List<String> subscribers = mZkClient.getChildren(ZkUtils
                .ConsumersPath());

        if (subscribers.contains(mTopicName)) {
            AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(
                buildPropertiesForSubscribe());

        mConsumerConnector = Consumer
                .createJavaConsumerConnector(consumerConfig);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(mTopicName, Constants.KAFKA_CONSUMMER_THREADS);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = mConsumerConnector
                .createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(mTopicName);

        mConsumerExecutor = Executors
                .newFixedThreadPool(Constants.KAFKA_CONSUMMER_THREADS);

        // drain each stream on its own thread, forwarding every message to
        // the internal consumer
        for (final KafkaStream<byte[], byte[]> stream : streams) {

            mConsumerExecutor.execute(new Runnable() {

                public void run() {

                    for (final MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {

                        mInternalConsumer.onMessagePublished(messageAndMetadata
                                .message());
                    }
                }
            });
        }

        Logger.d("kafka subscribe complete");

        mConsumerStarted = true;

        return true;
    }

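    /**
     * Stops the consumer threads and removes this topic's consumer-group
     * information from ZooKeeper.
     */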
    public boolean unsubscribeTopic() {

        Logger.d("kafka unsubscribeTopic - " + mTopicName);

        // nothing to shut down if the consumer was never started
        if (!mConsumerStarted) {
            return true;
        }

        // remove consumer group info in zookeeper
        List<String> subscribers = mZkClient.getChildren(ZkUtils
                .ConsumersPath());

        if (subscribers.contains(mTopicName)) {
            AdminUtils.deleteConsumerGroupInZK(mZkUtils, mTopicName);
        }

        mConsumerExecutor.shutdown();
        mConsumerConnector.shutdown();

        mConsumerStarted = false;

        return true;
    }

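    /**
     * Fetches all messages currently stored in the broker for this topic
     * with a SimpleConsumer, starting from the earliest available offset.
     * Only partition 0 is read, i.e. the topic is assumed to have a single
     * partition. Returns null if the fetch fails.
     */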
    public ArrayList<byte[]> getMessages() {

        Logger.d("kafka get all messages - " + mTopicName);

        String brokerHost = mBroker.substring(0, mBroker.indexOf(':'));
        int brokerPort = Integer.parseInt(mBroker.substring(mBroker
                .indexOf(':') + 1));

        Logger.d("host " + brokerHost + ", port " + brokerPort);

        // TODO check options - Timeout: Int, bufferSize: Int
        SimpleConsumer simpleConsumer = new SimpleConsumer(brokerHost,
                brokerPort, 100000, 64 * 1024, mTopicName);

        // starting point: the earliest offset still stored in the broker
        long readOffset = getLastOffset(simpleConsumer, mTopicName, 0,
                kafka.api.OffsetRequest.EarliestTime(), mTopicName);

        // TODO check option - fetch size
        FetchRequest req = new FetchRequestBuilder().clientId(mTopicName)
                .addFetch(mTopicName, 0, readOffset, 100000).build();

        FetchResponse fetchResponse = simpleConsumer.fetch(req);

        if (fetchResponse == null || fetchResponse.hasError()) {

            Logger.e("Error fetching data from the Broker");
            simpleConsumer.close();
            return null;
        }

        ArrayList<byte[]> initialData = new ArrayList<>();

        ByteBufferMessageSet messageSet = fetchResponse.messageSet(mTopicName,
                0);

        if (messageSet != null) {
            for (MessageAndOffset messageAndOffset : messageSet) {

                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    // can happen when compressed message sets are in play
                    Logger.e("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }

                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);

                initialData.add(bytes);
            }
        }

        simpleConsumer.close();

        Logger.d("kafka get all messages complete");

        return initialData;
    }

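    /**
     * Builds the configuration for the high-level consumer: the group id is
     * the topic name itself (one consumer group per topic) and offsets are
     * auto-committed to ZooKeeper.
     */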
    private Properties buildPropertiesForSubscribe() {

        // TODO check property settings
        Properties props = new Properties();

        props.put("group.id", mTopicName);
        props.put("zookeeper.connect", mZookeeper);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", Constants.KAFKA_COMMIT_INTERVAL);

        return props;
    }

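    /**
     * Asks the broker for the last offset before the given time for a single
     * topic partition. With kafka.api.OffsetRequest.EarliestTime() this is
     * the earliest offset still stored in the broker. Returns 0 on error.
     */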
    private long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {

        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                partition);

        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                whichTime, 1));

        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                clientName);

        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response == null || response.hasError()) {
            Logger.e("Error fetching offset data from the Broker");
            return 0;
        }

        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

}