Imported Upstream version 1.2.0
[platform/upstream/iotivity.git] / cloud / messagequeue / src / main / java / org / iotivity / cloud / mqserver / topic / TopicManager.java
1 /*
2  * //******************************************************************
3  * //
4  * // Copyright 2016 Samsung Electronics All Rights Reserved.
5  * //
6  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7  * //
8  * // Licensed under the Apache License, Version 2.0 (the "License");
9  * // you may not use this file except in compliance with the License.
10  * // You may obtain a copy of the License at
11  * //
12  * //      http://www.apache.org/licenses/LICENSE-2.0
13  * //
14  * // Unless required by applicable law or agreed to in writing, software
15  * // distributed under the License is distributed on an "AS IS" BASIS,
16  * // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * // See the License for the specific language governing permissions and
18  * // limitations under the License.
19  * //
20  * //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21  */
22 package org.iotivity.cloud.mqserver.topic;
23
24 import java.util.ArrayList;
25
26 import org.iotivity.cloud.mqserver.kafka.KafkaCommonWrapper;
27
28 /**
29  *
30  * This class provides a set of APIs to manage topics in MessageQueue Broker
31  *
32  */
33 public class TopicManager {
34
35     private ArrayList<Topic>   mTopics              = new ArrayList<>();
36
37     // for Kafka
38     String                     mKafkaZookeeper      = null;
39     String                     mKafkaBroker         = null;
40
41     private KafkaCommonWrapper mKafkaCommonOperator = null;
42
43     /**
44      * API to create topic
45      * 
46      * @param topic
47      *            topic to create
48      * 
49      * @return returns true if the topic successfully created, otherwise false
50      */
51     public boolean createTopic(Topic topic) {
52
53         if (mKafkaCommonOperator.createTopic(topic.getName()) == false) {
54             return false;
55         }
56
57         synchronized (mTopics) {
58             mTopics.add(topic);
59         }
60
61         return true;
62     }
63
64     /**
65      * API to remove topic
66      * 
67      * @param topic
68      *            topic to remove
69      * 
70      * @return returns true if the topic successfully removed, otherwise false
71      */
72     public boolean removeTopic(Topic topic) {
73
74         return removeTopics(topic.getName());
75     }
76
77     /**
78      * API to get list of topics
79      * 
80      * @return returns list of topic uris
81      */
82     public ArrayList<String> getTopicList() {
83
84         ArrayList<String> topicList = new ArrayList<>();
85
86         synchronized (mTopics) {
87             for (Topic topic : mTopics) {
88                 topicList.add(topic.getName());
89             }
90         }
91
92         return topicList;
93     }
94
95     /**
96      * API to get list of topics with specific topic type
97      * 
98      * @param type
99      *            topic type
100      * 
101      * @return returns list of topic uris searched with the topic type
102      */
103     public ArrayList<String> getTopicListByType(String type) {
104
105         ArrayList<String> topicList = new ArrayList<>();
106
107         synchronized (mTopics) {
108
109             for (Topic topic : mTopics) {
110                 if (type.equals(topic.getType())) {
111                     topicList.add(topic.getName());
112                 }
113             }
114         }
115
116         return topicList;
117     }
118
119     /**
120      * API to get topic with topic name
121      * 
122      * @param topicName
123      *            topic name to search
124      * 
125      * @return topic searched with the topic name
126      */
127     public Topic getTopic(String topicName) {
128
129         Topic foundTopic = null;
130
131         synchronized (mTopics) {
132
133             for (Topic topic : mTopics) {
134                 if (topicName.equals(topic.getName())) {
135                     foundTopic = topic;
136                     break;
137                 }
138             }
139         }
140
141         return foundTopic;
142     }
143
144     /**
145      * API to set Kafka zookeeper and broker information
146      * 
147      * @param zookeeper
148      *            address and port number of the zookeeper
149      * @param broker
150      *            address and port number of the Kafka broker
151      */
152     public void setKafkaInformation(String zookeeper, String broker) {
153         mKafkaZookeeper = zookeeper;
154         mKafkaBroker = broker;
155
156         mKafkaCommonOperator = new KafkaCommonWrapper(zookeeper, broker);
157     }
158
159     /**
160      * API to get zookeeper information
161      * 
162      * @return address and port number of the zookeeper
163      */
164     public String getKafkaZookeeper() {
165         return mKafkaZookeeper;
166     }
167
168     /**
169      * API to get Kafka broker information
170      * 
171      * @return address and port number of the Kafka broker
172      */
173     public String getKafkaBroker() {
174         return mKafkaBroker;
175     }
176
177     private boolean removeTopics(String topicName) {
178
179         synchronized (mTopics) {
180
181             for (Topic topic : new ArrayList<>(mTopics)) {
182
183                 if (topic.getName().startsWith(topicName)) {
184
185                     if (mKafkaCommonOperator.deleteTopic(topic.getName()) == false) {
186                         return false;
187                     }
188
189                     mTopics.remove(topic);
190                 }
191             }
192         }
193
194         return true;
195     }
196
197 }