/*
 * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package com.samsung.android.aitt.stream;
18 import android.util.Log;
20 import com.google.flatbuffers.FlexBuffers;
21 import com.google.flatbuffers.FlexBuffersBuilder;
22 import com.samsung.android.aitt.internal.Definitions;
23 import com.samsung.android.aittnative.JniInterface;
24 import com.samsung.android.modules.rtsp.RTSPClient;
26 import java.nio.ByteBuffer;
27 import java.util.HashMap;
29 import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Class to implement RTSPStream functionalities
 */
34 public class RTSPStream implements AittStream {
36 private static final String TAG = "RTSPStream";
37 private static final String SERVER_STATE = "server_state";
38 private static final String URL = "url";
39 private static final String ID = "id";
40 private static final String PASSWORD = "password";
41 private static final String URL_PREFIX = "rtsp://";
42 private static final String HEIGHT = "height";
43 private static final String WIDTH = "width";
45 private final StreamRole streamRole;
46 private final String topic;
47 private static int streamHeight;
48 private static int streamWidth;
49 private final DiscoveryInfo discoveryInfo = new DiscoveryInfo();
50 private RTSPClient rtspClient;
51 private StreamDataCallback streamCallback;
52 private StreamStateChangeCallback stateChangeCallback = null;
53 private JniInterface jniInterface = null;
54 private StreamState serverState = StreamState.INIT;
55 private StreamState clientState = StreamState.INIT;
/**
 * RTSPStream constructor.
 * <p>
 * An RTSP client is created only for the subscriber role; each frame it delivers is
 * forwarded to the receive callback if one is set at that moment.
 *
 * @param topic      Topic to which streaming is invoked
 * @param streamRole Role of the RTSPStream object
 */
private RTSPStream(String topic, StreamRole streamRole) {
    this.topic = topic;
    this.streamRole = streamRole;

    if (streamRole != StreamRole.SUBSCRIBER) {
        return;
    }

    // The callback field is read per frame, so a callback registered later is still used.
    RTSPClient.ReceiveDataCallback onFrame = frame -> {
        if (streamCallback == null) {
            return;
        }
        streamCallback.pushStreamData(frame);
    };
    rtspClient = new RTSPClient(new AtomicBoolean(false), onFrame);
}
77 * Class to store discovery parameters in an object
79 private static class DiscoveryInfo {
82 private String password;
88 * Create and return RTSPStream object for subscriber role
89 * @param topic Topic to which Subscribe role is set
90 * @param streamRole Role of the RTSPStream object
91 * @return RTSPStream object
93 public static RTSPStream createSubscriberStream(String topic, StreamRole streamRole) {
94 if (streamRole != StreamRole.SUBSCRIBER)
95 throw new IllegalArgumentException("The role of this stream is not subscriber.");
97 return new RTSPStream(topic, streamRole);
101 * Create and return RTSPStream object for publisher role
102 * @param topic Topic to which Publisher role is set
103 * @param streamRole Role of the RTSPStream object
104 * @return RTSPStream object
106 public static RTSPStream createPublisherStream(String topic, StreamRole streamRole) {
107 if (streamRole != StreamRole.PUBLISHER)
108 throw new IllegalArgumentException("The role of this stream is not publisher.");
110 return new RTSPStream(topic, streamRole);
114 * Method to set configuration
115 * @param config AittStreamConfig object
118 public void setConfig(AittStreamConfig config) {
120 throw new IllegalArgumentException("Invalid configuration");
122 if (config.getUrl() != null) {
123 String url = config.getUrl();
124 if (!url.startsWith(URL_PREFIX))
125 throw new IllegalArgumentException("Invalid RTSP URL");
127 discoveryInfo.url = config.getUrl();
129 if (config.getId() != null) {
130 discoveryInfo.id = config.getId();
132 if (config.getPassword() != null) {
133 discoveryInfo.password = config.getPassword();
136 discoveryInfo.height = config.getHeight();
138 discoveryInfo.width = config.getWidth();
142 * Method to start stream
145 public void start() {
146 if (streamRole == StreamRole.SUBSCRIBER) {
147 if (serverState == StreamState.READY) {
150 Log.d(TAG, "RTSP server not yet ready");
151 updateState(streamRole, StreamState.READY);
154 updateState(streamRole, StreamState.READY);
155 updateDiscoveryMessage();
160 * Method to publish to a topic
161 * @param topic String topic to which data is published
162 * @param ip Ip of the receiver
163 * @param port Port of the receiver
164 * @param message Data to be published
165 * @return returns status
168 public boolean publish(String topic, String ip, int port, byte[] message) {
169 // TODO: implement this function.
174 * Method to disconnect from the broker
177 public void disconnect() {
178 //ToDo : disconnect and stop can be merged
179 if (jniInterface != null)
180 jniInterface.removeDiscoveryCallback(topic);
184 * Method to stop the stream
188 if(streamRole == StreamRole.SUBSCRIBER) {
189 if (clientState == StreamState.PLAYING)
191 updateState(streamRole, StreamState.INIT);
193 updateState(streamRole, StreamState.INIT);
194 updateDiscoveryMessage();
199 * Method to pause the stream
201 public void pause() {
202 // TODO: implement this function.
206 * Method to record the stream
208 public void record() {
209 // TODO: implement this function.
213 * Method to set state callback
216 public void setStateCallback(StreamStateChangeCallback callback) {
217 this.stateChangeCallback = callback;
221 * Method to set subscribe callback
222 * @param streamDataCallback subscribe callback object
225 public void setReceiveCallback(StreamDataCallback streamDataCallback) {
226 if(streamRole == StreamRole.SUBSCRIBER)
227 streamCallback = streamDataCallback;
229 throw new IllegalArgumentException("The role of this stream is not subscriber");
233 * Method to receive stream height
234 * @return returns height of the stream
237 public int getStreamHeight() {
242 * Method to receive stream width
243 * @return returns width of the stream
246 public int getStreamWidth() {
251 * Method to set subscribe callback
252 * @param jniInterface JniInterface object
254 public void setJNIInterface(JniInterface jniInterface) {
255 this.jniInterface = jniInterface;
257 jniInterface.setDiscoveryCallback(topic, (status, data) -> {
258 Log.d(TAG, "Received discovery callback");
259 if (streamRole == StreamRole.PUBLISHER)
262 if (status.compareTo(Definitions.WILL_LEAVE_NETWORK) == 0) {
263 if (clientState == StreamState.PLAYING) {
265 updateState(streamRole, StreamState.READY);
267 updateState(streamRole, StreamState.INIT);
272 ByteBuffer buffer = ByteBuffer.wrap(data);
273 FlexBuffers.Map map = FlexBuffers.getRoot(buffer).asMap();
274 if (map.size() != 6) {
275 Log.e(TAG, "Invalid RTSP discovery message");
279 StreamState state = StreamState.values()[map.get(SERVER_STATE).asInt()];
280 updateState(StreamRole.PUBLISHER, state);
281 discoveryInfo.url = map.get(URL).asString();
282 discoveryInfo.id = map.get(ID).asString();
283 discoveryInfo.password = map.get(PASSWORD).asString();
284 discoveryInfo.height = map.get(HEIGHT).asInt();
285 discoveryInfo.width = map.get(WIDTH).asInt();
287 streamHeight = discoveryInfo.height;
288 streamWidth = discoveryInfo.width;
290 String url = createCompleteUrl();
291 Log.d(TAG, "RTSP URL : " + url);
295 rtspClient.setRtspUrl(url);
296 rtspClient.setResolution(discoveryInfo.height, discoveryInfo.width);
298 if (serverState == StreamState.READY) {
299 if (clientState == StreamState.READY) {
302 } else if (serverState == StreamState.INIT) {
303 if (clientState == StreamState.PLAYING) {
305 updateState(streamRole, StreamState.READY);
311 private String createCompleteUrl() {
312 String completeUrl = discoveryInfo.url;
313 if (completeUrl == null || completeUrl.isEmpty())
316 String id = discoveryInfo.id;
317 if (id == null || id.isEmpty())
320 String password = discoveryInfo.password;
321 if (password == null || password.isEmpty())
324 completeUrl = new StringBuilder(completeUrl).insert(URL_PREFIX.length(), id + ":" + password + "@").toString();
328 private void startRtspClient() {
329 RTSPClient.SocketConnectCallback cb = socketSuccess -> {
331 updateState(streamRole, StreamState.PLAYING);
332 rtspClient.initRtspClient();
335 Log.e(TAG, "Error creating socket");
339 rtspClient.createClientSocket(cb);
342 private void updateDiscoveryMessage() {
343 FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
345 int smap = fbb.startMap();
346 fbb.putInt(SERVER_STATE, serverState.ordinal());
347 fbb.putString(URL, discoveryInfo.url);
348 fbb.putString(ID, discoveryInfo.id);
349 fbb.putString(PASSWORD, discoveryInfo.password);
350 fbb.putInt(HEIGHT, discoveryInfo.height);
351 fbb.putInt(WIDTH, discoveryInfo.width);
352 fbb.endMap(null, smap);
354 ByteBuffer buffer = fbb.finish();
355 byte[] data = new byte[buffer.remaining()];
356 buffer.get(data, 0, data.length);
358 if (jniInterface != null)
359 jniInterface.updateDiscoveryMessage(topic, data);
362 private void updateState(StreamRole role, StreamState state) {
363 if (role == StreamRole.PUBLISHER)
368 if (role == streamRole && stateChangeCallback != null)
369 stateChangeCallback.pushStataChange(state);