implementation project(path: ':android:modules:tcp')
implementation project(path: ':android:modules:webrtc')
implementation project(path: ':android:modules:ipc')
+ implementation project(path: ':android:modules:rtsp')
implementation project(path: ':android:aitt-native')
testImplementation 'junit:junit:4.13.2'
import android.util.Log;
import android.util.Pair;
-import androidx.annotation.Nullable;
-
import com.google.flatbuffers.FlexBuffers;
import com.samsung.android.aitt.stream.AittStream;
import com.samsung.android.aitt.stream.WebRTCStream;
import com.samsung.android.aittnative.JniInterface;
+import androidx.annotation.Nullable;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
* @return true if the protocol is specific to android implementation
*/
private boolean classifyProtocol(Protocol protocols) {
- return protocols.equals(Protocol.WEBRTC) || protocols.equals(Protocol.IPC);
+ return protocols.equals(Protocol.WEBRTC) || protocols.equals(Protocol.IPC) || protocols.equals(Protocol.RTSP);
}
/**
import android.content.Context;
+import static android.content.ContentValues.TAG;
+import static com.samsung.android.aitt.stream.RTSPStream.createPublisherStream;
+import static com.samsung.android.aitt.stream.RTSPStream.createSubscriberStream;
+
+import android.util.Log;
+
import com.samsung.android.aitt.Aitt;
import com.samsung.android.aitt.stream.AittStream;
-import com.samsung.android.aitt.stream.RTSPStream;
+
+import java.security.InvalidParameterException;
public class RTSPHandler extends StreamHandler {
@Override
+ public void setAppContext(Context context) {
+ }
+
+ @Override
public AittStream newStreamModule(Aitt.Protocol protocol, String topic, AittStream.StreamRole role, Context context) {
- // TODO: implement this function.
- return new RTSPStream();
+ if (protocol != Aitt.Protocol.RTSP)
+ throw new InvalidParameterException("Invalid protocol");
+
+ if (topic == null || topic.isEmpty())
+ throw new InvalidParameterException("Invalid topic");
+
+ try {
+ if (role == AittStream.StreamRole.SUBSCRIBER) {
+ return createSubscriberStream(topic, role);
+ } else {
+ return createPublisherStream(topic, role);
+ }
+ } catch (Exception e) {
+ Log.e(TAG, "Fail to create an AittStream instance.");
+ }
+ return null;
}
}
public class StreamHandler implements ModuleHandler {
+ /**
+ * Method to create and return stream object
+ * @param protocol Streaming protocol
+ * @param topic String topic to which subscribe/publish is called
+ * @param role Role of the stream object
+ * @param context context of the application
+ * @return returns stream object
+ */
AittStream newStreamModule(Aitt.Protocol protocol, String topic, AittStream.StreamRole role, Context context) {
// TODO: Change this function properly after refactoring WebRTC modules.
return null;
}
+ /**
+ * Method to set application context to stream object
+ * @param appContext application context
+ */
@Override
public void setAppContext(Context appContext) {
// TODO: implement this function.
import android.util.Log;
import com.samsung.android.aitt.Aitt;
+
import com.samsung.android.aitt.stream.AittStream;
import java.security.InvalidParameterException;
*/
package com.samsung.android.aitt.stream;
-import com.samsung.android.aitt.handler.ModuleHandler;
-
public interface AittStream {
+ /**
+ * Role of the stream object
+ */
enum StreamRole {
PUBLISHER,
SUBSCRIBER
}
+ /**
+ * State of the stream object
+ */
enum StreamState {
INIT,
READY,
PLAYING
}
+ /**
+ * Interface to implement handler data callback mechanism
+ */
+ interface StreamDataCallback {
+ void pushStreamData(byte[] data);
+ }
+
+ /**
+ * Method to set configuration
+ */
void setConfig();
+ /**
+ * Method to start stream
+ */
void start();
+ /**
+ * Method to publish to a topic
+ * @param topic String topic to which data is published
+ * @param ip Ip of the receiver
+ * @param port Port of the receiver
+ * @param message Data to be published
+ * @return returns status
+ */
boolean publish(String topic, String ip, int port, byte[] message);
+ /**
+ * Method to disconnect from the broker
+ */
void disconnect();
+ /**
+ * Method to stop the stream
+ */
void stop();
+ /**
+ * Method to set state callback
+ */
void setStateCallback();
- void setReceiveCallback(ModuleHandler.HandlerDataCallback handlerDataCallback);
+ /**
+ * Method to set subscribe callback
+ * @param streamDataCallback subscribe callback object
+ */
+ void setReceiveCallback(AittStream.StreamDataCallback streamDataCallback);
+
}
*/
package com.samsung.android.aitt.stream;
-import com.samsung.android.aitt.handler.ModuleHandler;
+import android.util.Log;
+import com.samsung.android.modules.rtsp.RTSPClient;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Class to implement RTSPStream functionalities
+ */
public class RTSPStream implements AittStream {
+ private static String TAG = "RTSPStream";
+ private StreamRole streamRole;
+ private RTSPClient rtspClient;
+ private AittStream.StreamDataCallback subscribeCallback;
+
+ /**
+ * RTSPStream constructor
+ * @param topic Topic to which streaming is invoked
+ * @param streamRole Role of the RTSPStream object
+ */
+ private RTSPStream(String topic, StreamRole streamRole) {
+ //TODO: create and assign topic to a local variable (topic)
+ this.streamRole = streamRole;
+ }
+
+ /**
+ * Create and return RTSPStream object for subscriber role
+ * @param topic Topic to which Subscribe role is set
+ * @param streamRole Role of the RTSPStream object
+ * @return RTSPStream object
+ */
+ public static RTSPStream createSubscriberStream(String topic, StreamRole streamRole) {
+ if (streamRole != StreamRole.SUBSCRIBER)
+ throw new IllegalArgumentException("The role of this stream is not subscriber.");
+
+ return new RTSPStream(topic, streamRole);
+ }
+
+ /**
+ * Create and return RTSPStream object for publisher role
+ * @param topic Topic to which Publisher role is set
+ * @param streamRole Role of the RTSPStream object
+ * @return RTSPStream object
+ */
+ public static RTSPStream createPublisherStream(String topic, StreamRole streamRole) {
+ if (streamRole != StreamRole.PUBLISHER)
+ throw new IllegalArgumentException("The role of this stream is not publisher.");
+
+ return new RTSPStream(topic, streamRole);
+ }
+
+ /**
+ * Method to set configuration
+ */
@Override
public void setConfig() {
// TODO: implement this function.
}
+ /**
+ * Method to start stream
+ */
@Override
public void start() {
- // TODO: implement this function.
+ if (streamRole == StreamRole.SUBSCRIBER) {
+ RTSPClient.ReceiveDataCallback dataCallback = new RTSPClient.ReceiveDataCallback() {
+ @Override
+ public void pushData(byte[] frame) {
+ subscribeCallback.pushStreamData(frame);
+ }
+ };
+
+ rtspClient = new RTSPClient(new AtomicBoolean(false), dataCallback);
+ RTSPClient.SocketConnectCallback cb = socketSuccess -> {
+ if (socketSuccess) {
+ rtspClient.initRtspClient();
+ rtspClient.start();
+ } else {
+ Log.e(TAG, "Error creating socket");
+ }
+ };
+ rtspClient.createClientSocket(cb);
+ } else {
+ Log.d(TAG, "Publisher role is not yet supported");
+ }
}
+ /**
+ * Method to publish to a topic
+ * @param topic String topic to which data is published
+ * @param ip Ip of the receiver
+ * @param port Port of the receiver
+ * @param message Data to be published
+ * @return returns status
+ */
@Override
public boolean publish(String topic, String ip, int port, byte[] message) {
// TODO: implement this function.
return true;
}
+ /**
+ * Method to disconnect from the broker
+ */
@Override
public void disconnect() {
// TODO: implement this function.
}
+ /**
+ * Method to stop the stream
+ */
@Override
public void stop() {
- // TODO: implement this function.
+ if(streamRole == StreamRole.SUBSCRIBER)
+ rtspClient.stop();
+ else
+ Log.d(TAG, "Publisher role is not yet supported");
}
+ /**
+ * Method to pause the stream
+ */
public void pause() {
// TODO: implement this function.
}
+ /**
+ * Method to record the stream
+ */
public void record() {
// TODO: implement this function.
}
+ /**
+ * Method to set state callback
+ */
@Override
public void setStateCallback() {
// TODO: implement this function.
}
+ /**
+ * Method to set subscribe callback
+ * @param streamDataCallback subscribe callback object
+ */
@Override
- public void setReceiveCallback(ModuleHandler.HandlerDataCallback handlerDataCallback) {
- // TODO: implement this function.
+ public void setReceiveCallback(AittStream.StreamDataCallback streamDataCallback) {
+ if(streamRole == StreamRole.SUBSCRIBER)
+ subscribeCallback = streamDataCallback;
+ else
+ throw new IllegalArgumentException("The role of this stream is not subscriber");
}
}
import com.google.flatbuffers.FlexBuffersBuilder;
import com.samsung.android.aitt.Aitt;
import com.samsung.android.aitt.Definitions;
-import com.samsung.android.aitt.handler.ModuleHandler;
import com.samsung.android.aittnative.JniInterface;
import com.samsung.android.modules.webrtc.WebRTC;
import com.samsung.android.modules.webrtc.WebRTCServer;
}
@Override
- public void setReceiveCallback(ModuleHandler.HandlerDataCallback handlerDataCallback) {
- if (handlerDataCallback == null)
+ public void setReceiveCallback(AittStream.StreamDataCallback streamDataCallback) {
+ if (streamDataCallback == null)
throw new IllegalArgumentException("The given callback is null.");
if (streamRole == StreamRole.SUBSCRIBER) {
- WebRTC.ReceiveDataCallback cb = handlerDataCallback::pushHandlerData;
+ WebRTC.ReceiveDataCallback cb = streamDataCallback::pushStreamData;
ws.setDataCallback(cb);
} else if (streamRole == StreamRole.PUBLISHER) {
Log.e(TAG, "Invalid function call");
androidTestImplementation 'androidx.test.ext:junit:1.1.3'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.4.0'
androidTestImplementation project(path: ':android:aitt')
+ implementation 'com.github.alexeyvasilyev:rtsp-client-android:2.0.6'
}
import androidx.test.platform.app.InstrumentationRegistry;
import com.samsung.android.aitt.Aitt;
+import com.samsung.android.aitt.AittMessage;
+import com.samsung.android.aitt.handler.ModuleHandler;
import com.samsung.android.aitt.stream.AittStream;
import org.junit.BeforeClass;
aitt.connect(brokerIp, PORT);
AittStream subscriber = aitt.createStream(Aitt.Protocol.RTSP, TEST_TOPIC, SUBSCRIBER);
- subscriber.setReceiveCallback(/* TODO */null);
+
+ AittStream.StreamDataCallback callback = new AittStream.StreamDataCallback() {
+ @Override
+ public void pushStreamData(byte[] data) {
+ //Do something
+ }
+ };
+ subscriber.setReceiveCallback(callback);
subscriber.start();
AittStream publisher = aitt.createStream(Aitt.Protocol.RTSP, TEST_TOPIC, PUBLISHER);
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.samsung.android.modules.rtsp;
+
+import android.net.Uri;
+import android.util.Log;
+
+import androidx.annotation.NonNull;
+
+import com.alexvas.rtsp.RtspClient;
+import com.alexvas.utils.NetUtils;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * RTSPClient class to implement RTSP client side functionalities
+ */
+public class RTSPClient {
+ private static final String TAG = "RTSPClient";
+ private String rtspURL = "rtsp://192.168.1.2:5540/ch0"; //RTSP server URL is hardcoded for now //Todo - Update this using discovery mechanism
+ private static volatile Socket clientSocket;
+ private static int socketTimeout = 10000;
+ private AtomicBoolean exitFlag;
+ private RtspClient mRtspClient;
+ private ReceiveDataCallback streamCb;
+
+ /**
+ * Interface to implement DataCallback from RTSP module to RTSP stream
+ */
+ public interface ReceiveDataCallback {
+ void pushData(byte[] frame);
+ }
+
+ /**
+ * Interface to implement socket connection callback
+ */
+ public interface SocketConnectCallback {
+ void socketConnect(Boolean bool);
+ }
+
+ /**
+ * RTSPClient class constructor
+ * @param exitFlag AtomicBoolean flag to exit execution
+ * @param cb callback object to send data to upper layer
+ */
+ public RTSPClient(AtomicBoolean exitFlag, ReceiveDataCallback cb) {
+ this.exitFlag = exitFlag;
+ streamCb = cb;
+ }
+
+ /**
+ * Method to create a client socket for RTSP connection with RTSP server
+ * @param socketCB socket connection callback to notify success/failure of socket creation
+ */
+ public void createClientSocket(SocketConnectCallback socketCB){
+ Uri uri = Uri.parse(rtspURL);
+ try {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ clientSocket = NetUtils.createSocketAndConnect(uri.getHost(), uri.getPort(), socketTimeout);
+ if(clientSocket != null)
+ socketCB.socketConnect(true);
+ } catch (Exception e) {
+ socketCB.socketConnect(false);
+ Log.d(TAG, "Exception in RTSP client socket creation");
+ }
+ }
+ });
+
+ thread.start();
+
+ } catch (Exception e) {
+ Log.e(TAG, "Exception in RTSP client socket creation");
+ }
+ }
+
+ /**
+ * Method to create RtspClient object to access RTSP lib from dependency
+ */
+ public void initRtspClient() {
+
+ RtspClient.RtspClientListener clientlistener = new RtspClient.RtspClientListener() {
+ @Override
+ public void onRtspConnecting() {
+ Log.d(TAG, "Connecting to RTSP server");
+ }
+
+ @Override
+ public void onRtspConnected(@NonNull @NotNull RtspClient.SdpInfo sdpInfo) {
+ Log.d(TAG, "Connected to RTSP server");
+ }
+
+ @Override
+ public void onRtspVideoNalUnitReceived(@NonNull @NotNull byte[] bytes, int i, int i1, long l) {
+ Log.d(TAG, "RTSP video stream callback");
+ //TODO : Decode the Video Nal units received using H264 decoder
+ streamCb.pushData(bytes);
+ }
+
+ @Override
+ public void onRtspAudioSampleReceived(@NonNull @NotNull byte[] bytes, int i, int i1, long l) {
+ //TODO : Decode the Audio Nal units (AAC encoded) received using audio decoder
+ Log.d(TAG, "RTSP audio stream callback");
+ }
+
+ @Override
+ public void onRtspDisconnected() {
+ stopDecoders();
+ Log.d(TAG, "Disconnected from RTSP server");
+ }
+
+ @Override
+ public void onRtspFailedUnauthorized() {
+ Log.d(TAG, "onRtspFailedUnauthorized");
+ }
+
+ @Override
+ public void onRtspFailed(@androidx.annotation.Nullable @Nullable String s) {
+ Log.d(TAG, "onRtspFailed");
+ }
+ };
+
+ Uri uri = Uri.parse(rtspURL);
+ mRtspClient = new RtspClient.Builder(clientSocket, uri.toString(), exitFlag, clientlistener)
+ .requestAudio(false)
+ .requestVideo(true)
+ .withDebug(true)
+ .withUserAgent("RTSP sample Client")
+ .build();
+ }
+
+ /**
+ * Method to start RTSP streaming
+ */
+ public void start() {
+ mRtspClient.execute();
+ }
+
+ /**
+ * Method to stop RTSP streaming
+ */
+ public void stop() {
+ try{
+ NetUtils.closeSocket(clientSocket);
+ stopDecoders();
+ } catch (Exception E) {
+ Log.e(TAG, "Error closing socket");
+ }
+ }
+
+ /**
+ * Method to stop decoders
+ */
+ private void stopDecoders() {
+ //ToDO : Implement this function
+ }
+}
google()
jcenter()
mavenCentral()
+ maven { url 'https://jitpack.io' }
}
}