f46ae829bcef706217ab28e19a210d1569236912
[platform/core/ml/aitt.git] / android / aitt / src / main / java / com / samsung / android / aitt / stream / RTSPStream.java
1 /*
2  * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.samsung.android.aitt.stream;
17
18 import android.util.Log;
19
20 import com.google.flatbuffers.FlexBuffers;
21 import com.google.flatbuffers.FlexBuffersBuilder;
22 import com.samsung.android.aitt.internal.Definitions;
23 import com.samsung.android.aittnative.JniInterface;
24 import com.samsung.android.modules.rtsp.RTSPClient;
25
26 import java.nio.ByteBuffer;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 /**
32  * Class to implement RTSPStream functionalities
33  */
34 public class RTSPStream implements AittStream {
35
36     private static final String TAG = "RTSPStream";
37     private static final String SERVER_STATE = "server_state";
38     private static final String URL = "url";
39     private static final String ID = "id";
40     private static final String PASSWORD = "password";
41     private static final String URL_PREFIX = "rtsp://";
42     private static final String HEIGHT = "height";
43     private static final String WIDTH = "width";
44
45     private final StreamRole streamRole;
46     private final String topic;
47     private static int streamHeight;
48     private static int streamWidth;
49     private final DiscoveryInfo discoveryInfo = new DiscoveryInfo();
50     private RTSPClient rtspClient;
51     private StreamDataCallback streamCallback;
52     private StreamStateChangeCallback stateChangeCallback = null;
53     private JniInterface jniInterface = null;
54     private StreamState serverState = StreamState.INIT;
55     private StreamState clientState = StreamState.INIT;
56
57     /**
58      * RTSPStream constructor
59      * @param topic Topic to which streaming is invoked
60      * @param streamRole Role of the RTSPStream object
61      */
62     private RTSPStream(String topic, StreamRole streamRole) {
63         this.topic = topic;
64         this.streamRole = streamRole;
65
66         if (streamRole == StreamRole.SUBSCRIBER) {
67             RTSPClient.ReceiveDataCallback dataCallback = frame -> {
68                 if (streamCallback != null)
69                     streamCallback.pushStreamData(frame);
70             };
71
72             rtspClient = new RTSPClient(new AtomicBoolean(false), dataCallback);
73         }
74     }
75
76     /**
77      * Class to store discovery parameters in an object
78      */
79     private static class DiscoveryInfo {
80         private String url;
81         private String id;
82         private String password;
83         private int height;
84         private int width;
85     }
86
87     /**
88      * Create and return RTSPStream object for subscriber role
89      * @param topic  Topic to which Subscribe role is set
90      * @param streamRole Role of the RTSPStream object
91      * @return RTSPStream object
92      */
93     public static RTSPStream createSubscriberStream(String topic, StreamRole streamRole) {
94         if (streamRole != StreamRole.SUBSCRIBER)
95             throw new IllegalArgumentException("The role of this stream is not subscriber.");
96
97         return new RTSPStream(topic, streamRole);
98     }
99
100     /**
101      * Create and return RTSPStream object for publisher role
102      * @param topic  Topic to which Publisher role is set
103      * @param streamRole Role of the RTSPStream object
104      * @return RTSPStream object
105      */
106     public static RTSPStream createPublisherStream(String topic, StreamRole streamRole) {
107         if (streamRole != StreamRole.PUBLISHER)
108             throw new IllegalArgumentException("The role of this stream is not publisher.");
109
110         return new RTSPStream(topic, streamRole);
111     }
112
113     /**
114      * Method to set configuration
115      * @param config AittStreamConfig object
116      */
117     @Override
118     public void setConfig(AittStreamConfig config) {
119         if (config == null)
120             throw new IllegalArgumentException("Invalid configuration");
121
122         if (config.getUrl() != null) {
123             String url = config.getUrl();
124             if (!url.startsWith(URL_PREFIX))
125                 throw new IllegalArgumentException("Invalid RTSP URL");
126
127             discoveryInfo.url = config.getUrl();
128         }
129         if (config.getId() != null) {
130             discoveryInfo.id = config.getId();
131         }
132         if (config.getPassword() != null) {
133             discoveryInfo.password = config.getPassword();
134         }
135
136         discoveryInfo.height = config.getHeight();
137
138         discoveryInfo.width = config.getWidth();
139     }
140
141     /**
142      * Method to start stream
143      */
144     @Override
145     public void start() {
146         if (streamRole == StreamRole.SUBSCRIBER) {
147             if (serverState == StreamState.READY) {
148                 startRtspClient();
149             } else {
150                 Log.d(TAG, "RTSP server not yet ready");
151                 updateState(streamRole, StreamState.READY);
152             }
153         } else {
154             updateState(streamRole, StreamState.READY);
155             updateDiscoveryMessage();
156         }
157     }
158
159     /**
160      * Method to publish to a topic
161      * @param topic String topic to which data is published
162      * @param ip Ip of the receiver
163      * @param port Port of the receiver
164      * @param message Data to be published
165      * @return returns status
166      */
167     @Override
168     public boolean publish(String topic, String ip, int port, byte[] message) {
169         // TODO: implement this function.
170         return true;
171     }
172
173     /**
174      * Method to disconnect from the broker
175      */
176     @Override
177     public void disconnect() {
178         //ToDo : disconnect and stop can be merged
179         if (jniInterface != null)
180             jniInterface.removeDiscoveryCallback(topic);
181     }
182
183     /**
184      * Method to stop the stream
185      */
186     @Override
187     public void stop() {
188         if(streamRole == StreamRole.SUBSCRIBER) {
189             if (clientState == StreamState.PLAYING)
190                 rtspClient.stop();
191             updateState(streamRole, StreamState.INIT);
192         } else {
193             updateState(streamRole, StreamState.INIT);
194             updateDiscoveryMessage();
195         }
196     }
197
198     /**
199      * Method to pause the stream
200      */
201     public void pause() {
202         // TODO: implement this function.
203     }
204
205     /**
206      * Method to record the stream
207      */
208     public void record() {
209         // TODO: implement this function.
210     }
211
212     /**
213      * Method to set state callback
214      */
215     @Override
216     public void setStateCallback(StreamStateChangeCallback callback) {
217         this.stateChangeCallback = callback;
218     }
219
220     /**
221      * Method to set subscribe callback
222      * @param streamDataCallback subscribe callback object
223      */
224     @Override
225     public void setReceiveCallback(StreamDataCallback streamDataCallback) {
226         if(streamRole == StreamRole.SUBSCRIBER)
227             streamCallback = streamDataCallback;
228         else
229             throw new IllegalArgumentException("The role of this stream is not subscriber");
230     }
231
232     /**
233      * Method to receive stream height
234      * @return returns height of the stream
235      */
236     @Override
237     public int getStreamHeight() {
238         return streamHeight;
239     }
240
241     /**
242      * Method to receive stream width
243      * @return returns width of the stream
244      */
245     @Override
246     public int getStreamWidth() {
247         return streamWidth;
248     }
249
250     /**
251      * Method to set subscribe callback
252      * @param jniInterface JniInterface object
253      */
254     public void setJNIInterface(JniInterface jniInterface) {
255         this.jniInterface = jniInterface;
256
257         jniInterface.setDiscoveryCallback(topic, (status, data) -> {
258             Log.d(TAG, "Received discovery callback");
259             if (streamRole == StreamRole.PUBLISHER)
260                 return;
261
262             if (status.compareTo(Definitions.WILL_LEAVE_NETWORK) == 0) {
263                 if (clientState == StreamState.PLAYING) {
264                     rtspClient.stop();
265                     updateState(streamRole, StreamState.READY);
266                 } else {
267                     updateState(streamRole, StreamState.INIT);
268                 }
269                 return;
270             }
271
272             ByteBuffer buffer = ByteBuffer.wrap(data);
273             FlexBuffers.Map map = FlexBuffers.getRoot(buffer).asMap();
274             if (map.size() != 6) {
275                 Log.e(TAG, "Invalid RTSP discovery message");
276                 return;
277             }
278
279             StreamState state = StreamState.values()[map.get(SERVER_STATE).asInt()];
280             updateState(StreamRole.PUBLISHER, state);
281             discoveryInfo.url = map.get(URL).asString();
282             discoveryInfo.id = map.get(ID).asString();
283             discoveryInfo.password = map.get(PASSWORD).asString();
284             discoveryInfo.height = map.get(HEIGHT).asInt();
285             discoveryInfo.width = map.get(WIDTH).asInt();
286
287             streamHeight = discoveryInfo.height;
288             streamWidth = discoveryInfo.width;
289
290             String url = createCompleteUrl();
291             Log.d(TAG, "RTSP URL : " + url);
292             if (url.isEmpty()) {
293                 return;
294             }
295             rtspClient.setRtspUrl(url);
296             rtspClient.setResolution(discoveryInfo.height, discoveryInfo.width);
297
298             if (serverState == StreamState.READY) {
299                 if (clientState == StreamState.READY) {
300                     startRtspClient();
301                 }
302             } else if (serverState == StreamState.INIT) {
303                 if (clientState == StreamState.PLAYING) {
304                     rtspClient.stop();
305                     updateState(streamRole, StreamState.READY);
306                 }
307             }
308         });
309     }
310
311     private String createCompleteUrl() {
312         String completeUrl = discoveryInfo.url;
313         if (completeUrl == null || completeUrl.isEmpty())
314             return "";
315
316         String id = discoveryInfo.id;
317         if (id == null || id.isEmpty())
318             return completeUrl;
319
320         String password = discoveryInfo.password;
321         if (password == null || password.isEmpty())
322             return completeUrl;
323
324         completeUrl = new StringBuilder(completeUrl).insert(URL_PREFIX.length(), id + ":" + password + "@").toString();
325         return completeUrl;
326     }
327
328     private void startRtspClient() {
329         RTSPClient.SocketConnectCallback cb = socketSuccess -> {
330             if (socketSuccess) {
331                 updateState(streamRole, StreamState.PLAYING);
332                 rtspClient.initRtspClient();
333                 rtspClient.start();
334             } else {
335                 Log.e(TAG, "Error creating socket");
336             }
337         };
338
339         rtspClient.createClientSocket(cb);
340     }
341
342     private void updateDiscoveryMessage() {
343         FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
344         {
345             int smap = fbb.startMap();
346             fbb.putInt(SERVER_STATE, serverState.ordinal());
347             fbb.putString(URL, discoveryInfo.url);
348             fbb.putString(ID, discoveryInfo.id);
349             fbb.putString(PASSWORD, discoveryInfo.password);
350             fbb.putInt(HEIGHT, discoveryInfo.height);
351             fbb.putInt(WIDTH, discoveryInfo.width);
352             fbb.endMap(null, smap);
353         }
354         ByteBuffer buffer = fbb.finish();
355         byte[] data = new byte[buffer.remaining()];
356         buffer.get(data, 0, data.length);
357
358         if (jniInterface != null)
359             jniInterface.updateDiscoveryMessage(topic, data);
360     }
361
362     private void updateState(StreamRole role, StreamState state) {
363         if (role == StreamRole.PUBLISHER)
364             serverState = state;
365         else
366             clientState = state;
367
368         if (role == streamRole && stateChangeCallback != null)
369             stateChangeCallback.pushStataChange(state);
370     }
371 }