2 * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package com.samsung.android.aitt.stream;
18 import android.util.Log;
20 import com.google.flatbuffers.FlexBuffers;
21 import com.google.flatbuffers.FlexBuffersBuilder;
22 import com.samsung.android.aitt.internal.Definitions;
23 import com.samsung.android.aittnative.JniInterface;
24 import com.samsung.android.modules.rtsp.RTSPClient;
26 import java.nio.ByteBuffer;
27 import java.util.HashMap;
29 import java.util.concurrent.atomic.AtomicBoolean;
32 * Class to implement RTSPStream functionalities
34 public class RTSPStream implements AittStream {
36 private static final String TAG = "RTSPStream";
37 private static final String SERVER_STATE = "server_state";
38 private static final String URL = "url";
39 private static final String ID = "id";
40 private static final String PASSWORD = "password";
41 private static final String URL_PREFIX = "rtsp://";
42 private static final String HEIGHT = "height";
43 private static final String WIDTH = "width";
45 private final StreamRole streamRole;
46 private final String topic;
47 private static int streamHeight;
48 private static int streamWidth;
49 private final DiscoveryInfo discoveryInfo = new DiscoveryInfo();
50 private RTSPClient rtspClient;
51 private StreamDataCallback streamCallback;
52 private StreamStateChangeCallback stateChangeCallback = null;
53 private JniInterface jniInterface = null;
54 private StreamState serverState = StreamState.INIT;
55 private StreamState clientState = StreamState.INIT;
/**
 * RTSPStream constructor
 *
 * @param topic      Topic to which streaming is invoked
 * @param streamRole Role of the RTSPStream object
 */
private RTSPStream(String topic, StreamRole streamRole) {
    // Both fields are final, so each must be assigned exactly once here.
    this.topic = topic;
    this.streamRole = streamRole;

    // Only a subscriber receives frames, so only a subscriber gets an RTSP client.
    if (streamRole == StreamRole.SUBSCRIBER) {
        RTSPClient.ReceiveDataCallback dataCallback = frame -> {
            // The receive callback is optional; frames are dropped while it is unset.
            if (streamCallback != null) {
                streamCallback.pushStreamData(frame);
            }
        };

        rtspClient = new RTSPClient(new AtomicBoolean(false), dataCallback);
    }
}
78 * Class to store discovery parameters in an object
80 private static class DiscoveryInfo {
83 private String password;
89 * Create and return RTSPStream object for subscriber role
91 * @param topic Topic to which Subscribe role is set
92 * @param streamRole Role of the RTSPStream object
93 * @return RTSPStream object
95 public static RTSPStream createSubscriberStream(String topic, StreamRole streamRole) {
96 if (streamRole != StreamRole.SUBSCRIBER)
97 throw new IllegalArgumentException("The role of this stream is not subscriber.");
99 return new RTSPStream(topic, streamRole);
103 * Create and return RTSPStream object for publisher role
105 * @param topic Topic to which Publisher role is set
106 * @param streamRole Role of the RTSPStream object
107 * @return RTSPStream object
109 public static RTSPStream createPublisherStream(String topic, StreamRole streamRole) {
110 if (streamRole != StreamRole.PUBLISHER)
111 throw new IllegalArgumentException("The role of this stream is not publisher.");
113 return new RTSPStream(topic, streamRole);
117 * Method to set configuration
119 * @param config AittStreamConfig object
// Copies URL, id, password, height and width from the given configuration into discoveryInfo.
// A non-null URL must start with URL_PREFIX ("rtsp://"); otherwise IllegalArgumentException
// ("Invalid RTSP URL") is thrown. URL, id and password are only overwritten when non-null.
// NOTE(review): this copy of the method has lost lines (closing braces and at least one
// condition). Restore the method from version control before editing it.
122 public void setConfig(AittStreamConfig config) {
// NOTE(review): the condition guarding this throw is not visible; presumably a null check
// on config — confirm.
124 throw new IllegalArgumentException("Invalid configuration");
126 if (config.getUrl() != null) {
127 String url = config.getUrl();
128 if (!url.startsWith(URL_PREFIX))
129 throw new IllegalArgumentException("Invalid RTSP URL");
131 discoveryInfo.url = config.getUrl();
133 if (config.getId() != null) {
134 discoveryInfo.id = config.getId();
136 if (config.getPassword() != null) {
137 discoveryInfo.password = config.getPassword();
// NOTE(review): one line is missing before each of the two assignments below; it may have
// been a blank line or a guard on the value — confirm whether height/width are copied
// unconditionally.
140 discoveryInfo.height = config.getHeight();
142 discoveryInfo.width = config.getWidth();
146 * Method to start stream
// Starts the stream; behaviour depends on the role of this object.
// Visible subscriber handling: when the server is not READY, this is logged and the client
// state is moved to READY.
// The last two statements move this stream to READY and publish a new discovery message;
// presumably they form the non-subscriber (publisher) branch — confirm.
// NOTE(review): lines are missing from this copy (closing braces, the else lines).
149 public void start() {
150 if (streamRole == StreamRole.SUBSCRIBER) {
// NOTE(review): the statement executed when the server is already READY is not visible;
// presumably the RTSP client is started here — confirm against the original file.
151 if (serverState == StreamState.READY) {
154 Log.d(TAG, "RTSP server not yet ready");
155 updateState(streamRole, StreamState.READY);
158 updateState(streamRole, StreamState.READY);
159 updateDiscoveryMessage();
164 * Method to publish to a topic
166 * @param topic String topic to which data is published
167 * @param ip Ip of the receiver
168 * @param port Port of the receiver
169 * @param message Data to be published
170 * @return returns status
// Placeholder for publishing data to a topic; the visible text contains no logic and does
// not use any of its parameters.
// NOTE(review): the return statement is missing from this copy, so the value reported to
// callers cannot be determined here.
173 public boolean publish(String topic, String ip, int port, byte[] message) {
174 // TODO: implement this function.
179 * Method to disconnect from the broker
182 public void disconnect() {
183 //ToDo : disconnect and stop can be merged
184 if (jniInterface != null)
185 jniInterface.removeDiscoveryCallback(topic);
189 * Method to stop the stream
// Body of the method that stops the stream.
// NOTE(review): the method signature line is missing from this copy, as are the closing
// braces and the else line.
// Visible behaviour: a subscriber is moved to INIT. The last two statements move this stream
// to INIT and publish a new discovery message; presumably they form the publisher branch —
// confirm.
193 if (streamRole == StreamRole.SUBSCRIBER) {
// NOTE(review): the statement guarded by this PLAYING check is not visible; presumably the
// RTSP client is stopped here — confirm.
194 if (clientState == StreamState.PLAYING)
196 updateState(streamRole, StreamState.INIT);
198 updateState(streamRole, StreamState.INIT);
199 updateDiscoveryMessage();
204 * Method to pause the stream
206 public void pause() {
207 // TODO: implement this function.
211 * Method to record the stream
213 public void record() {
214 // TODO: implement this function.
218 * Method to set state callback
221 public void setStateCallback(StreamStateChangeCallback callback) {
222 this.stateChangeCallback = callback;
226 * Method to set subscribe callback
228 * @param streamDataCallback subscribe callback object
231 public void setReceiveCallback(StreamDataCallback streamDataCallback) {
232 if (streamRole == StreamRole.SUBSCRIBER)
233 streamCallback = streamDataCallback;
235 throw new IllegalArgumentException("The role of this stream is not subscriber");
239 * Method to receive stream height
241 * @return returns height of the stream
244 public int getStreamHeight() {
249 * Method to receive stream width
251 * @return returns width of the stream
254 public int getStreamWidth() {
259 * Method to set the JNI interface and register the discovery callback for this topic
261 * @param jniInterface JniInterface object
// Stores the JNI interface and registers a discovery callback for this stream's topic.
// The callback decodes the FlexBuffers discovery message, records the announced server
// state, URL, credentials and resolution, configures the RTSP client with them, and then
// reacts to the combination of server and client state.
// NOTE(review): several lines are missing from this copy (closing braces and the statements
// that follow some conditions), so only the visible statements are described. Restore the
// method from version control before changing it.
263 public void setJNIInterface(JniInterface jniInterface) {
264 this.jniInterface = jniInterface;
// NOTE(review): the parameter is dereferenced without a null check, although other methods
// null-check the field — confirm callers never pass null.
266 jniInterface.setDiscoveryCallback(topic, (status, data) -> {
267 Log.d(TAG, "Received discovery callback");
// NOTE(review): the statement guarded by this check is not visible; presumably a publisher
// ignores discovery messages and returns — confirm.
268 if (streamRole == StreamRole.PUBLISHER)
// The peer announced that it is leaving the network.
271 if (status.compareTo(Definitions.WILL_LEAVE_NETWORK) == 0) {
// NOTE(review): a statement is missing before the state update below; presumably the RTSP
// client is stopped first — confirm.
272 if (clientState == StreamState.PLAYING) {
274 updateState(streamRole, StreamState.READY);
// NOTE(review): this updates the state of this stream's own role to INIT, right after it may
// have been set to READY, and serverState is not reset in this branch — confirm whether
// StreamRole.PUBLISHER was intended.
276 updateState(streamRole, StreamState.INIT);
// Decode the message. Exactly six entries are expected, matching the six values written by
// updateDiscoveryMessage().
281 ByteBuffer buffer = ByteBuffer.wrap(data);
282 FlexBuffers.Map map = FlexBuffers.getRoot(buffer).asMap();
283 if (map.size() != 6) {
284 Log.e(TAG, "Invalid RTSP discovery message");
// NOTE(review): the index comes from the received message and is used without a range check;
// a value outside 0..values().length-1 throws ArrayIndexOutOfBoundsException.
288 StreamState state = StreamState.values()[map.get(SERVER_STATE).asInt()];
289 updateState(StreamRole.PUBLISHER, state);
290 discoveryInfo.url = map.get(URL).asString();
291 discoveryInfo.id = map.get(ID).asString();
292 discoveryInfo.password = map.get(PASSWORD).asString();
293 discoveryInfo.height = map.get(HEIGHT).asInt();
294 discoveryInfo.width = map.get(WIDTH).asInt();
// Publish the announced resolution through the stream-level fields.
296 streamHeight = discoveryInfo.height;
297 streamWidth = discoveryInfo.width;
299 String url = createCompleteUrl();
// NOTE(review): createCompleteUrl() inserts id and password into the URL, so this debug log
// can write credentials to the log.
300 Log.d(TAG, "RTSP URL : " + url);
// NOTE(review): three lines are missing between the log statement and the client setup;
// they may have rejected an unusable URL — confirm.
304 rtspClient.setRtspUrl(url);
305 rtspClient.setResolution(discoveryInfo.height, discoveryInfo.width);
// Server READY and client READY: start the RTSP client with the announced credentials.
307 if (serverState == StreamState.READY) {
308 if (clientState == StreamState.READY) {
309 startRtspClient(discoveryInfo.id, discoveryInfo.password);
// Server back in INIT while the client is PLAYING: the client falls back to READY.
311 } else if (serverState == StreamState.INIT) {
// NOTE(review): a statement is missing before the state update below; presumably the RTSP
// client is stopped first — confirm.
312 if (clientState == StreamState.PLAYING) {
314 updateState(streamRole, StreamState.READY);
// Builds the URL handed to the RTSP client from discoveryInfo. When url, id and password are
// all non-empty, "id:password@" is inserted directly after the "rtsp://" prefix.
// NOTE(review): every return statement is missing from this copy, so the value returned
// when url, id or password is null/empty cannot be determined here (the bare URL, an empty
// string, or something else) — restore from version control.
320 private String createCompleteUrl() {
321 String completeUrl = discoveryInfo.url;
322 if (completeUrl == null || completeUrl.isEmpty())
325 String id = discoveryInfo.id;
326 if (id == null || id.isEmpty())
329 String password = discoveryInfo.password;
330 if (password == null || password.isEmpty())
// NOTE(review): the insert position assumes the URL starts with URL_PREFIX; a URL shorter
// than the prefix makes StringBuilder.insert throw StringIndexOutOfBoundsException.
// id and password are inserted verbatim, without percent-encoding — confirm whether they
// can contain ':' or '@'.
333 completeUrl = new StringBuilder(completeUrl).insert(URL_PREFIX.length(), id + ":" + password + "@").toString();
337 private void startRtspClient() {
338 startRtspClient("", "");
// Asks the RTSP client to create its socket and passes a callback that reacts to the result.
// Visible callback behaviour: the client state is moved to PLAYING and the RTSP client is
// initialised with the given id and password; an error is logged when socket creation fails.
// NOTE(review): lines are missing inside the callback (the condition on socketSuccess, the
// else line, and possibly one further statement after initRtspClient) — confirm against the
// original file.
341 private void startRtspClient(String id, String password) {
342 RTSPClient.SocketConnectCallback cb = socketSuccess -> {
344 updateState(streamRole, StreamState.PLAYING);
345 rtspClient.initRtspClient(id, password);
348 Log.e(TAG, "Error creating socket");
352 rtspClient.createClientSocket(cb);
355 private void updateDiscoveryMessage() {
356 FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
358 int smap = fbb.startMap();
359 fbb.putInt(SERVER_STATE, serverState.ordinal());
360 fbb.putString(URL, discoveryInfo.url);
361 fbb.putString(ID, discoveryInfo.id);
362 fbb.putString(PASSWORD, discoveryInfo.password);
363 fbb.putInt(HEIGHT, discoveryInfo.height);
364 fbb.putInt(WIDTH, discoveryInfo.width);
365 fbb.endMap(null, smap);
367 ByteBuffer buffer = fbb.finish();
368 byte[] data = new byte[buffer.remaining()];
369 buffer.get(data, 0, data.length);
371 if (jniInterface != null)
372 jniInterface.updateDiscoveryMessage(topic, data);
375 private void updateState(StreamRole role, StreamState state) {
376 if (role == StreamRole.PUBLISHER)
381 if (role == streamRole && stateChangeCallback != null)
382 stateChangeCallback.pushStataChange(state);