Add RTSP client support to AITT android
authorPraveen Naik S <praveen.s@samsung.com>
Mon, 28 Nov 2022 14:21:40 +0000 (19:51 +0530)
committerPraveen Naik S <praveen.s@samsung.com>
Mon, 12 Dec 2022 09:27:04 +0000 (18:27 +0900)
12 files changed:
android/aitt/build.gradle
android/aitt/src/main/java/com/samsung/android/aitt/Aitt.java
android/aitt/src/main/java/com/samsung/android/aitt/handler/RTSPHandler.java
android/aitt/src/main/java/com/samsung/android/aitt/handler/StreamHandler.java
android/aitt/src/main/java/com/samsung/android/aitt/handler/WebRTCHandler.java
android/aitt/src/main/java/com/samsung/android/aitt/stream/AittStream.java
android/aitt/src/main/java/com/samsung/android/aitt/stream/RTSPStream.java
android/aitt/src/main/java/com/samsung/android/aitt/stream/WebRTCStream.java
android/modules/rtsp/build.gradle
android/modules/rtsp/src/androidTest/java/com/samsung/android/modules/rtsp/RTSPInstrumentedTest.java
android/modules/rtsp/src/main/java/com/samsung/android/modules/rtsp/RTSPClient.java [new file with mode: 0644]
build.gradle

index de6e262..86deaec 100644 (file)
@@ -53,6 +53,7 @@ dependencies {
     implementation project(path: ':android:modules:tcp')
     implementation project(path: ':android:modules:webrtc')
     implementation project(path: ':android:modules:ipc')
+    implementation project(path: ':android:modules:rtsp')
     implementation project(path: ':android:aitt-native')
 
     testImplementation 'junit:junit:4.13.2'
index 9843225..4d4bd88 100644 (file)
@@ -23,13 +23,13 @@ import android.content.Context;
 import android.util.Log;
 import android.util.Pair;
 
-import androidx.annotation.Nullable;
-
 import com.google.flatbuffers.FlexBuffers;
 import com.samsung.android.aitt.stream.AittStream;
 import com.samsung.android.aitt.stream.WebRTCStream;
 import com.samsung.android.aittnative.JniInterface;
 
+import androidx.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -384,7 +384,7 @@ public class Aitt {
      * @return true if the protocol is specific to android implementation
      */
     private boolean classifyProtocol(Protocol protocols) {
-        return protocols.equals(Protocol.WEBRTC) || protocols.equals(Protocol.IPC);
+        return protocols.equals(Protocol.WEBRTC) || protocols.equals(Protocol.IPC) || protocols.equals(Protocol.RTSP);
     }
 
     /**
index 731ca41..2d66a22 100644 (file)
@@ -17,15 +17,40 @@ package com.samsung.android.aitt.handler;
 
 import android.content.Context;
 
+import static android.content.ContentValues.TAG;
+import static com.samsung.android.aitt.stream.RTSPStream.createPublisherStream;
+import static com.samsung.android.aitt.stream.RTSPStream.createSubscriberStream;
+
+import android.util.Log;
+
 import com.samsung.android.aitt.Aitt;
 import com.samsung.android.aitt.stream.AittStream;
-import com.samsung.android.aitt.stream.RTSPStream;
+
+import java.security.InvalidParameterException;
 
 public class RTSPHandler extends StreamHandler {
 
     @Override
+    public void setAppContext(Context context) {
+    }
+
+    @Override
     public AittStream newStreamModule(Aitt.Protocol protocol, String topic, AittStream.StreamRole role, Context context) {
-        // TODO: implement this function.
-        return new RTSPStream();
+        if (protocol != Aitt.Protocol.RTSP)
+            throw new InvalidParameterException("Invalid protocol");
+
+        if (topic == null || topic.isEmpty())
+            throw new InvalidParameterException("Invalid topic");
+
+        try {
+            if (role == AittStream.StreamRole.SUBSCRIBER) {
+                return createSubscriberStream(topic, role);
+            } else {
+                return createPublisherStream(topic, role);
+            }
+        } catch (Exception e) {
+            Log.e(TAG, "Fail to create an AittStream instance.");
+        }
+        return null;
     }
 }
index d54e08c..bec11dc 100644 (file)
@@ -22,11 +22,23 @@ import com.samsung.android.aitt.stream.AittStream;
 
 public class StreamHandler implements ModuleHandler {
 
+    /**
+     * Method to create and return stream object
+     * @param protocol Streaming protocol
+     * @param topic String topic to which subscribe/publish is called
+     * @param role Role of the stream object
+     * @param context context of the application
+     * @return returns stream object
+     */
     AittStream newStreamModule(Aitt.Protocol protocol, String topic, AittStream.StreamRole role, Context context) {
         // TODO: Change this function properly after refactoring WebRTC modules.
         return null;
     }
 
+    /**
+     * Method to set application context to stream object
+     * @param appContext application context
+     */
     @Override
     public void setAppContext(Context appContext) {
         // TODO: implement this function.
index e2382f5..7142dc8 100644 (file)
@@ -22,6 +22,7 @@ import android.content.Context;
 import android.util.Log;
 
 import com.samsung.android.aitt.Aitt;
+
 import com.samsung.android.aitt.stream.AittStream;
 
 import java.security.InvalidParameterException;
index e7b08f0..3018fda 100644 (file)
  */
 package com.samsung.android.aitt.stream;
 
-import com.samsung.android.aitt.handler.ModuleHandler;
-
 public interface AittStream {
 
+    /**
+     * Role of the stream object
+     */
     enum StreamRole {
         PUBLISHER,
         SUBSCRIBER
     }
 
+    /**
+     * State of the stream object
+     */
     enum StreamState {
         INIT,
         READY,
         PLAYING
     }
 
+    /**
+     * Interface to implement handler data callback mechanism
+     */
+    interface StreamDataCallback {
+        void pushStreamData(byte[] data);
+    }
+
+    /**
+     * Method to set configuration
+     */
     void setConfig();
 
+    /**
+     * Method to start stream
+     */
     void start();
 
+    /**
+     * Method to publish to a topic
+     * @param topic String topic to which data is published
+     * @param ip Ip of the receiver
+     * @param port Port of the receiver
+     * @param message Data to be published
+     * @return returns status
+     */
     boolean publish(String topic, String ip, int port, byte[] message);
 
+    /**
+     * Method to disconnect from the broker
+     */
     void disconnect();
 
+    /**
+     * Method to stop the stream
+     */
     void stop();
 
+    /**
+     * Method to set state callback
+     */
     void setStateCallback();
 
-    void setReceiveCallback(ModuleHandler.HandlerDataCallback handlerDataCallback);
+    /**
+     * Method to set subscribe callback
+     * @param streamDataCallback subscribe callback object
+     */
+    void setReceiveCallback(AittStream.StreamDataCallback streamDataCallback);
+
 }
index 7044a4b..8533f78 100644 (file)
  */
 package com.samsung.android.aitt.stream;
 
-import com.samsung.android.aitt.handler.ModuleHandler;
+import android.util.Log;
 
+import com.samsung.android.modules.rtsp.RTSPClient;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Class to implement RTSPStream functionalities
+ */
 public class RTSPStream implements AittStream {
 
+    private static String TAG = "RTSPStream";
+    private StreamRole streamRole;
+    private RTSPClient rtspClient;
+    private AittStream.StreamDataCallback subscribeCallback;
+
+    /**
+     * RTSPStream constructor
+     * @param topic Topic to which streaming is invoked
+     * @param streamRole Role of the RTSPStream object
+     */
+    private RTSPStream(String topic, StreamRole streamRole) {
+        //TODO: create and assign topic to a local variable (topic)
+        this.streamRole = streamRole;
+    }
+
+    /**
+     * Create and return RTSPStream object for subscriber role
+     * @param topic  Topic to which Subscribe role is set
+     * @param streamRole Role of the RTSPStream object
+     * @return RTSPStream object
+     */
+    public static RTSPStream createSubscriberStream(String topic, StreamRole streamRole) {
+        if (streamRole != StreamRole.SUBSCRIBER)
+            throw new IllegalArgumentException("The role of this stream is not subscriber.");
+
+        return new RTSPStream(topic, streamRole);
+    }
+
+    /**
+     * Create and return RTSPStream object for publisher role
+     * @param topic  Topic to which Publisher role is set
+     * @param streamRole Role of the RTSPStream object
+     * @return RTSPStream object
+     */
+    public static RTSPStream createPublisherStream(String topic, StreamRole streamRole) {
+        if (streamRole != StreamRole.PUBLISHER)
+            throw new IllegalArgumentException("The role of this stream is not publisher.");
+
+        return new RTSPStream(topic, streamRole);
+    }
+
+    /**
+     * Method to set configuration
+     */
     @Override
     public void setConfig() {
         // TODO: implement this function.
     }
 
+    /**
+     * Method to start stream
+     */
     @Override
     public void start() {
-        // TODO: implement this function.
+        if (streamRole == StreamRole.SUBSCRIBER) {
+            RTSPClient.ReceiveDataCallback dataCallback = new RTSPClient.ReceiveDataCallback() {
+                @Override
+                public void pushData(byte[] frame) {
+                    subscribeCallback.pushStreamData(frame);
+                }
+            };
+
+            rtspClient = new RTSPClient(new AtomicBoolean(false), dataCallback);
+            RTSPClient.SocketConnectCallback cb = socketSuccess -> {
+                if (socketSuccess) {
+                    rtspClient.initRtspClient();
+                    rtspClient.start();
+                } else {
+                    Log.e(TAG, "Error creating socket");
+                }
+            };
+            rtspClient.createClientSocket(cb);
+        } else {
+            Log.d(TAG, "Publisher role is not yet supported");
+        }
     }
 
+    /**
+     * Method to publish to a topic
+     * @param topic String topic to which data is published
+     * @param ip Ip of the receiver
+     * @param port Port of the receiver
+     * @param message Data to be published
+     * @return returns status
+     */
     @Override
     public boolean publish(String topic, String ip, int port, byte[] message) {
         // TODO: implement this function.
         return true;
     }
 
+    /**
+     * Method to disconnect from the broker
+     */
     @Override
     public void disconnect() {
         // TODO: implement this function.
     }
 
+    /**
+     * Method to stop the stream
+     */
     @Override
     public void stop() {
-        // TODO: implement this function.
+        if(streamRole == StreamRole.SUBSCRIBER)
+            rtspClient.stop();
+        else
+            Log.d(TAG, "Publisher role is not yet supported");
     }
 
+    /**
+     * Method to pause the stream
+     */
     public void pause() {
         // TODO: implement this function.
     }
 
+    /**
+     * Method to record the stream
+     */
     public void record() {
         // TODO: implement this function.
     }
 
+    /**
+     * Method to set state callback
+     */
     @Override
     public void setStateCallback() {
         // TODO: implement this function.
     }
 
+    /**
+     * Method to set subscribe callback
+     * @param streamDataCallback subscribe callback object
+     */
     @Override
-    public void setReceiveCallback(ModuleHandler.HandlerDataCallback handlerDataCallback) {
-        // TODO: implement this function.
+    public void setReceiveCallback(AittStream.StreamDataCallback streamDataCallback) {
+        if(streamRole == StreamRole.SUBSCRIBER)
+            subscribeCallback = streamDataCallback;
+        else
+            throw new IllegalArgumentException("The role of this stream is not subscriber");
     }
 }
index db9d396..ba16299 100644 (file)
@@ -6,7 +6,6 @@ import android.util.Log;
 import com.google.flatbuffers.FlexBuffersBuilder;
 import com.samsung.android.aitt.Aitt;
 import com.samsung.android.aitt.Definitions;
-import com.samsung.android.aitt.handler.ModuleHandler;
 import com.samsung.android.aittnative.JniInterface;
 import com.samsung.android.modules.webrtc.WebRTC;
 import com.samsung.android.modules.webrtc.WebRTCServer;
@@ -87,12 +86,12 @@ public final class WebRTCStream implements AittStream {
     }
 
     @Override
-    public void setReceiveCallback(ModuleHandler.HandlerDataCallback handlerDataCallback) {
-        if (handlerDataCallback == null)
+    public void setReceiveCallback(AittStream.StreamDataCallback streamDataCallback) {
+        if (streamDataCallback == null)
             throw new IllegalArgumentException("The given callback is null.");
 
         if (streamRole == StreamRole.SUBSCRIBER) {
-            WebRTC.ReceiveDataCallback cb = handlerDataCallback::pushHandlerData;
+            WebRTC.ReceiveDataCallback cb = streamDataCallback::pushStreamData;
             ws.setDataCallback(cb);
         } else if (streamRole == StreamRole.PUBLISHER) {
             Log.e(TAG, "Invalid function call");
index dcaed71..adaecf9 100644 (file)
@@ -33,4 +33,5 @@ dependencies {
     androidTestImplementation 'androidx.test.ext:junit:1.1.3'
     androidTestImplementation 'androidx.test.espresso:espresso-core:3.4.0'
     androidTestImplementation project(path: ':android:aitt')
+    implementation 'com.github.alexeyvasilyev:rtsp-client-android:2.0.6'
 }
index 8e638d1..bf75a9b 100644 (file)
@@ -25,6 +25,8 @@ import android.content.Context;
 import androidx.test.platform.app.InstrumentationRegistry;
 
 import com.samsung.android.aitt.Aitt;
+import com.samsung.android.aitt.AittMessage;
+import com.samsung.android.aitt.handler.ModuleHandler;
 import com.samsung.android.aitt.stream.AittStream;
 
 import org.junit.BeforeClass;
@@ -54,7 +56,14 @@ public class RTSPInstrumentedTest {
             aitt.connect(brokerIp, PORT);
 
             AittStream subscriber = aitt.createStream(Aitt.Protocol.RTSP, TEST_TOPIC, SUBSCRIBER);
-            subscriber.setReceiveCallback(/* TODO */null);
+
+            AittStream.StreamDataCallback callback = new AittStream.StreamDataCallback() {
+                @Override
+                public void pushStreamData(byte[] data) {
+                    //Do something
+                }
+            };
+            subscriber.setReceiveCallback(callback);
             subscriber.start();
 
             AittStream publisher = aitt.createStream(Aitt.Protocol.RTSP, TEST_TOPIC, PUBLISHER);
diff --git a/android/modules/rtsp/src/main/java/com/samsung/android/modules/rtsp/RTSPClient.java b/android/modules/rtsp/src/main/java/com/samsung/android/modules/rtsp/RTSPClient.java
new file mode 100644 (file)
index 0000000..acc77be
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.samsung.android.modules.rtsp;
+
+import android.net.Uri;
+import android.util.Log;
+
+import androidx.annotation.NonNull;
+
+import com.alexvas.rtsp.RtspClient;
+import com.alexvas.utils.NetUtils;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * RTSPClient class to implement RTSP client side functionalities
+ */
+public class RTSPClient {
+    private static final String TAG = "RTSPClient";
+    private String rtspURL = "rtsp://192.168.1.2:5540/ch0";  //RTSP server URL is hardcoded for now //Todo - Update this using discovery mechanism
+    private static volatile Socket clientSocket;
+    private static int socketTimeout = 10000;
+    private AtomicBoolean exitFlag;
+    private RtspClient mRtspClient;
+    private ReceiveDataCallback streamCb;
+
+    /**
+     * Interface to implement DataCallback from RTSP module to RTSP stream
+     */
+    public interface ReceiveDataCallback {
+        void pushData(byte[] frame);
+    }
+
+    /**
+     * Interface to implement socket connection callback
+     */
+    public interface SocketConnectCallback {
+        void socketConnect(Boolean bool);
+    }
+
+    /**
+     * RTSPClient class constructor
+     * @param exitFlag AtomicBoolean flag to exit execution
+     * @param cb callback object to send data to upper layer
+     */
+    public RTSPClient(AtomicBoolean exitFlag, ReceiveDataCallback cb) {
+        this.exitFlag = exitFlag;
+        streamCb = cb;
+    }
+
+    /**
+     * Method to create a client socket for RTSP connection with RTSP server
+     * @param socketCB socket connection callback to notify success/failure of socket creation
+     */
+    public void createClientSocket(SocketConnectCallback socketCB){
+        Uri uri = Uri.parse(rtspURL);
+        try {
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try  {
+                        clientSocket = NetUtils.createSocketAndConnect(uri.getHost(), uri.getPort(), socketTimeout);
+                        if(clientSocket != null)
+                            socketCB.socketConnect(true);
+                    } catch (Exception e) {
+                        socketCB.socketConnect(false);
+                        Log.d(TAG, "Exception in RTSP client socket creation");
+                    }
+                }
+            });
+
+            thread.start();
+
+        } catch (Exception e) {
+            Log.e(TAG, "Exception in RTSP client socket creation");
+        }
+    }
+
+    /**
+     * Method to create RtspClient object to access RTSP lib from dependency
+     */
+    public void initRtspClient() {
+
+        RtspClient.RtspClientListener clientlistener = new RtspClient.RtspClientListener() {
+            @Override
+            public void onRtspConnecting() {
+                Log.d(TAG, "Connecting to RTSP server");
+            }
+
+            @Override
+            public void onRtspConnected(@NonNull @NotNull RtspClient.SdpInfo sdpInfo) {
+                Log.d(TAG, "Connected to RTSP server");
+            }
+
+            @Override
+            public void onRtspVideoNalUnitReceived(@NonNull @NotNull byte[] bytes, int i, int i1, long l) {
+                Log.d(TAG, "RTSP video stream callback");
+                //TODO : Decode the Video Nal units received using H264 decoder
+                streamCb.pushData(bytes);
+            }
+
+            @Override
+            public void onRtspAudioSampleReceived(@NonNull @NotNull byte[] bytes, int i, int i1, long l) {
+                //TODO : Decode the Audio Nal units (AAC encoded) received using audio decoder
+                Log.d(TAG, "RTSP audio stream callback");
+            }
+
+            @Override
+            public void onRtspDisconnected() {
+                stopDecoders();
+                Log.d(TAG, "Disconnected from RTSP server");
+            }
+
+            @Override
+            public void onRtspFailedUnauthorized() {
+                Log.d(TAG, "onRtspFailedUnauthorized");
+            }
+
+            @Override
+            public void onRtspFailed(@androidx.annotation.Nullable @Nullable String s) {
+                Log.d(TAG, "onRtspFailed");
+            }
+        };
+
+        Uri uri = Uri.parse(rtspURL);
+        mRtspClient = new RtspClient.Builder(clientSocket, uri.toString(), exitFlag, clientlistener)
+                .requestAudio(false)
+                .requestVideo(true)
+                .withDebug(true)
+                .withUserAgent("RTSP sample Client")
+                .build();
+    }
+
+    /**
+     * Method to start RTSP streaming
+     */
+    public void start() {
+        mRtspClient.execute();
+    }
+
+    /**
+     * Method to stop RTSP streaming
+     */
+    public void stop() {
+        try{
+            NetUtils.closeSocket(clientSocket);
+            stopDecoders();
+        } catch (Exception E) {
+            Log.e(TAG, "Error closing socket");
+        }
+    }
+
+    /**
+     * Method to stop decoders
+     */
+    private void stopDecoders() {
+        //ToDO : Implement this function
+    }
+}
index f998374..12322f2 100644 (file)
@@ -18,6 +18,7 @@ allprojects {
         google()
         jcenter()
         mavenCentral()
+        maven { url 'https://jitpack.io' }
     }
 }