2 * //******************************************************************
4 * // Copyright 2016 Samsung Electronics All Rights Reserved.
6 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
8 * // Licensed under the Apache License, Version 2.0 (the "License");
9 * // you may not use this file except in compliance with the License.
10 * // You may obtain a copy of the License at
12 * // http://www.apache.org/licenses/LICENSE-2.0
14 * // Unless required by applicable law or agreed to in writing, software
15 * // distributed under the License is distributed on an "AS IS" BASIS,
16 * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * // See the License for the specific language governing permissions and
18 * // limitations under the License.
20 * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
22 package org.iotivity.cloud.mqserver.topic;
24 import java.util.ArrayList;
26 import org.iotivity.cloud.mqserver.kafka.KafkaCommonWrapper;
30 * This class provides a set of APIs to manage topics in MessageQueue Broker
33 public class TopicManager {
// Topics currently tracked by this manager; the visible methods synchronize on
// this list object itself before iterating over or modifying it.
35 private ArrayList<Topic> mTopics = new ArrayList<>();
// Zookeeper and broker strings as last passed to setKafkaInformation; null until
// that method is called.
38 String mKafkaZookeeper = null;
39 String mKafkaBroker = null;
// Kafka wrapper used for topic creation/deletion; stays null until
// setKafkaInformation constructs it.
41 private KafkaCommonWrapper mKafkaCommonOperator = null;
49 * @return true if the topic was successfully created, otherwise false
// Creates a topic: first asks the Kafka wrapper to create a topic named
// topic.getName(), then enters a section synchronized on mTopics.
// NOTE(review): mKafkaCommonOperator is null until setKafkaInformation has been
// called, so calling this earlier would dereference null — confirm call order.
51 public boolean createTopic(Topic topic) {
// Branch taken when the Kafka wrapper reports that creation failed.
// NOTE(review): the branch body is not visible in this excerpt — presumably it
// returns false; confirm.
53 if (mKafkaCommonOperator.createTopic(topic.getName()) == false) {
// Guarded section on the shared topic list.
// NOTE(review): its body is not visible in this excerpt — presumably it records
// the new topic in mTopics; confirm.
57 synchronized (mTopics) {
70 * @return true if the topic was successfully removed, otherwise false
72 public boolean removeTopic(Topic topic) {
74 return removeTopics(topic.getName());
78 * API to get list of topics
80 * @return returns list of topic uris
82 public ArrayList<String> getTopicList() {
84 ArrayList<String> topicList = new ArrayList<>();
86 synchronized (mTopics) {
87 for (Topic topic : mTopics) {
88 topicList.add(topic.getName());
96 * API to get list of topics with specific topic type
101 * @return returns list of topic uris searched with the topic type
103 public ArrayList<String> getTopicListByType(String type) {
105 ArrayList<String> topicList = new ArrayList<>();
107 synchronized (mTopics) {
109 for (Topic topic : mTopics) {
110 if (type.equals(topic.getType())) {
111 topicList.add(topic.getName());
120 * API to get topic with topic name
123 * topic name to search
125 * @return topic searched with the topic name
// Looks through mTopics for a topic whose name equals topicName.
// equals is invoked on topicName, so the argument must not be null.
127 public Topic getTopic(String topicName) {
// Result holder; starts as null.
129 Topic foundTopic = null;
// Iterate while holding the monitor of the shared topic list.
131 synchronized (mTopics) {
133 for (Topic topic : mTopics) {
// NOTE(review): the body of this match branch is not visible in this excerpt —
// presumably it stores the topic in foundTopic; confirm whether the loop stops
// at the first match.
134 if (topicName.equals(topic.getName())) {
145 * API to set Kafka zookeeper and broker information
148 * address and port number of the zookeeper
150 * address and port number of the Kafka broker
152 public void setKafkaInformation(String zookeeper, String broker) {
153 mKafkaZookeeper = zookeeper;
154 mKafkaBroker = broker;
156 mKafkaCommonOperator = new KafkaCommonWrapper(zookeeper, broker);
160 * API to get zookeeper information
162 * @return address and port number of the zookeeper
164 public String getKafkaZookeeper() {
165 return mKafkaZookeeper;
169 * API to get Kafka broker information
171 * @return address and port number of the Kafka broker
173 public String getKafkaBroker() {
177 private boolean removeTopics(String topicName) {
179 synchronized (mTopics) {
181 for (Topic topic : new ArrayList<>(mTopics)) {
183 if (topic.getName().startsWith(topicName)) {
185 if (mKafkaCommonOperator.deleteTopic(topic.getName()) == false) {
189 mTopics.remove(topic);