Use native discovery for webrtc signaling
authorKartik Anand <kartik.anand@samsung.com>
Wed, 1 Feb 2023 11:56:02 +0000 (17:26 +0530)
committerKartik Anand <kartik.anand@samsung.com>
Thu, 9 Mar 2023 02:20:50 +0000 (11:20 +0900)
13 files changed:
android/aitt-native/src/main/java/com/samsung/android/aittnative/JniInterface.java
android/aitt-native/src/main/jni/aitt_jni.cc
android/aitt/src/main/java/com/samsung/android/aitt/Aitt.java
android/aitt/src/main/java/com/samsung/android/aitt/stream/AittStream.java
android/aitt/src/main/java/com/samsung/android/aitt/stream/RTSPStream.java
android/aitt/src/main/java/com/samsung/android/aitt/stream/WebRTCStream.java
android/modules/rtsp/src/main/java/com/samsung/android/modules/rtsp/RTSPClient.java
android/modules/webrtc/src/androidTest/java/com/samsung/android/modules/webrtc/WebRTCInstrumentedTest.java
android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTC.java
android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCPublisher.java [new file with mode: 0644]
android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCServer.java [deleted file]
android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCSubscriber.java [new file with mode: 0644]
android/modules/webrtc/src/test/java/com/samsung/android/modules/webrtc/WebRTCUnitTest.java

index 7850c80..f7783d5 100644 (file)
@@ -62,7 +62,7 @@ public class JniInterface {
      * JNI callback interface to receive discovery messages
      */
     public interface JniDiscoveryCallback {
-        void onDiscoveryMessageReceived(String status, byte[] data);
+        void onDiscoveryMessageReceived(String clientId, String status, byte[] data);
     }
 
     /**
@@ -209,11 +209,11 @@ public class JniInterface {
         }
     }
 
-    void discoveryMessageCallback(String topic, String status, byte[] message) {
+    void discoveryMessageCallback(String topic, String clientId, String status, byte[] message) {
         synchronized (this) {
             Pair<Integer, JniDiscoveryCallback> pair = discoveryCallbacks.get(topic);
             if (pair != null) {
-                pair.second.onDiscoveryMessageReceived(status, message);
+                pair.second.onDiscoveryMessageReceived(clientId, status, message);
             }
         }
     }
index d842157..b49727d 100644 (file)
@@ -453,6 +453,13 @@ void AittNativeInterface::DiscoveryMessageCallback(const std::string &topic, con
             return;
         }
 
+        jstring _clientId = env->NewStringUTF(clientId.c_str());
+        if (env->ExceptionCheck() == true) {
+            JNI_LOG(ANDROID_LOG_ERROR, TAG, "Failed to create new UTF string");
+            cbContext.jvm->DetachCurrentThread();
+            return;
+        }
+
         jstring _status = env->NewStringUTF(status.c_str());
         if (env->ExceptionCheck() == true) {
             JNI_LOG(ANDROID_LOG_ERROR, TAG, "Failed to create new UTF string");
@@ -469,7 +476,7 @@ void AittNativeInterface::DiscoveryMessageCallback(const std::string &topic, con
             return;
         }
 
-        env->CallVoidMethod(cbObject, cbContext.discoveryCallbackMethodID, _topic, _status, array);
+        env->CallVoidMethod(cbObject, cbContext.discoveryCallbackMethodID, _topic, _clientId, _status, array);
         if (env->ExceptionCheck() == true) {
             JNI_LOG(ANDROID_LOG_ERROR, TAG, "Failed to call void method");
             cbContext.jvm->DetachCurrentThread();
@@ -532,7 +539,7 @@ jlong AittNativeInterface::Init(JNIEnv *env, jobject jni_interface_object, jstri
         cbContext.connectionCallbackMethodID =
                 env->GetMethodID(callbackClass, "connectionStatusCallback", "(I)V");
         cbContext.discoveryCallbackMethodID =
-                env->GetMethodID(callbackClass, "discoveryMessageCallback", "(Ljava/lang/String;Ljava/lang/String;[B)V");
+                env->GetMethodID(callbackClass, "discoveryMessageCallback", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;[B)V");
         env->DeleteLocalRef(callbackClass);
     } catch (std::exception &e) {
         JNI_LOG(ANDROID_LOG_ERROR, TAG, e.what());
index 7ed8f04..f099b76 100644 (file)
@@ -350,41 +350,13 @@ public class Aitt {
     }
 
     // TODO: Update publish with proper stream interface.
-    public boolean publish(AittStream stream, String topic, byte[] message, Protocol protocol) {
+    public boolean publish(AittStream stream, String topic, byte[] message) {
         if (stream == null) {
             Log.e(TAG, "Stream is null.");
             return false;
         }
 
-        try {
-            synchronized (this) {
-                HostTable hostTable = getHostTable(topic);
-                for (String hostIp : hostTable.hostMap.keySet()) {
-                    PortTable portTable = hostTable.hostMap.get(hostIp);
-                    if (portTable == null) {
-                        Log.e(TAG, "Port table for host [" + hostIp + "] is null.");
-                        continue;
-                    }
-                    for (Integer port : portTable.portMap.keySet()) {
-                        Pair<Protocol, Object> protocolPair = portTable.portMap.get(port);
-                        if (protocolPair == null) {
-                            Log.e(TAG, "Pair for port: " + port + "is null.");
-                            continue;
-                        }
-                        if (protocolPair.first != protocol) {
-                            Log.d(TAG, "protocol is not matched.");
-                            continue;
-                        }
-
-                        return stream.publish(topic, hostIp, port, message);
-                    }
-                }
-            }
-        } catch (Exception e) {
-            Log.e(TAG, "Error during publish", e);
-        }
-
-        return false;
+        return stream.publish(topic, message);
     }
 
     /**
@@ -820,10 +792,8 @@ public class Aitt {
         switch (protocol) {
             case WEBRTC:
                 WebRTCStream webRTCStream = (WebRTCStream) ((WebRTCHandler) moduleHandler).newStreamModule(protocol, topic, streamRole, appContext);
-                if (webRTCStream != null && streamRole == AittStream.StreamRole.SUBSCRIBER) {
-                    webRTCStream.setSelfIP(ip);
+                if (webRTCStream != null)
                     webRTCStream.setJNIInterface(mJniInterface);
-                }
                 return webRTCStream;
             case RTSP:
                 RTSPStream rtspStream = (RTSPStream) ((RTSPHandler) moduleHandler).newStreamModule(protocol, topic, streamRole, appContext);
index a5d855a..734230d 100644 (file)
@@ -61,12 +61,10 @@ public interface AittStream {
     /**
      * Method to publish to a topic
      * @param topic String topic to which data is published
-     * @param ip Ip of the receiver
-     * @param port Port of the receiver
      * @param message Data to be published
      * @return returns status
      */
-    boolean publish(String topic, String ip, int port, byte[] message);
+    boolean publish(String topic, byte[] message);
 
     /**
      * Method to disconnect from the broker
index bb19c0d..92d82e4 100644 (file)
@@ -65,9 +65,9 @@ public class RTSPStream implements AittStream {
         this.streamRole = streamRole;
 
         if (streamRole == StreamRole.SUBSCRIBER) {
-            RTSPClient.ReceiveDataCallback dataCallback = frame -> {
+            RTSPClient.ReceiveDataCallback dataCallback = data -> {
                 if (streamCallback != null)
-                    streamCallback.pushStreamData(frame);
+                    streamCallback.pushStreamData(data);
             };
 
             rtspClient = new RTSPClient(new AtomicBoolean(false), dataCallback);
@@ -164,13 +164,11 @@ public class RTSPStream implements AittStream {
      * Method to publish to a topic
      *
      * @param topic   String topic to which data is published
-     * @param ip      Ip of the receiver
-     * @param port    Port of the receiver
      * @param message Data to be published
      * @return returns status
      */
     @Override
-    public boolean publish(String topic, String ip, int port, byte[] message) {
+    public boolean publish(String topic, byte[] message) {
         // TODO: implement this function.
         return true;
     }
@@ -263,7 +261,7 @@ public class RTSPStream implements AittStream {
     public void setJNIInterface(JniInterface jniInterface) {
         this.jniInterface = jniInterface;
 
-        jniInterface.setDiscoveryCallback(topic, (status, data) -> {
+        jniInterface.setDiscoveryCallback(topic, (clientId, status, data) -> {
             Log.d(TAG, "Received discovery callback");
             if (streamRole == StreamRole.PUBLISHER)
                 return;
index d25c2a8..fdcee20 100644 (file)
@@ -18,37 +18,62 @@ package com.samsung.android.aitt.stream;
 import android.content.Context;
 import android.util.Log;
 
+import com.google.flatbuffers.FlexBuffers;
 import com.google.flatbuffers.FlexBuffersBuilder;
-import com.samsung.android.aitt.Aitt;
 import com.samsung.android.aitt.internal.Definitions;
 import com.samsung.android.aittnative.JniInterface;
 import com.samsung.android.modules.webrtc.WebRTC;
-import com.samsung.android.modules.webrtc.WebRTCServer;
+import com.samsung.android.modules.webrtc.WebRTCPublisher;
+import com.samsung.android.modules.webrtc.WebRTCSubscriber;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 
 public final class WebRTCStream implements AittStream {
 
     private static final String TAG = "WebRTCStream";
-
-    private final String topic;
+    private static final String SINK = "/SINK";
+    private static final String SRC = "/SRC";
+    private static final String START = "START";
+    private static final String STOP = "STOP";
+    private static final String ID = "id";
+    private static final String PEER_ID = "peer_id";
+    private static final String SDP = "sdp";
+    private static final String ICE_CANDIDATES = "ice_candidates";
+
+    private final String publishTopic;
+    private final String watchTopic;
     private final StreamRole streamRole;
+    private final String id;
 
-    private int serverPort;
-    private String ip;
-    private JniInterface jniInterface;
     private WebRTC webrtc;
-    private WebRTCServer ws = null;
+    private JniInterface jniInterface;
     private StreamState streamState = StreamState.INIT;
+    private StreamStateChangeCallback stateChangeCallback = null;
 
     WebRTCStream(String topic, StreamRole streamRole, Context context) {
-        this.topic = topic;
         this.streamRole = streamRole;
 
-        if (streamRole == StreamRole.PUBLISHER)
-            webrtc = new WebRTC(context);       // TODO: Currently, native stream servers are publishers.
-        else
-            ws = new WebRTCServer(context);     // TODO: Currently, native stream clients are subscribers.
+        id = createId();
+        if (streamRole == StreamRole.PUBLISHER) {
+            publishTopic = topic + SRC;
+            watchTopic = topic + SINK;
+            try {
+                webrtc = new WebRTCPublisher(context);
+            } catch (InstantiationException e) {
+                Log.e(TAG, "Failed to create WebRTC instance");
+            }
+        } else {
+            publishTopic = topic + SINK;
+            watchTopic = topic + SRC;
+            try {
+                webrtc = new WebRTCSubscriber(context);
+            }  catch (InstantiationException e) {
+                Log.e(TAG, "Failed to create WebRTC instance");
+            }
+        }
     }
 
     public static WebRTCStream createSubscriberStream(String topic, StreamRole streamRole, Context context) {
@@ -67,47 +92,64 @@ public final class WebRTCStream implements AittStream {
 
     @Override
     public void setConfig(AittStreamConfig config) {
-
+        // ToDo : use this method to get frame width and height
     }
 
     @Override
     public void start() {
-        if (streamRole == StreamRole.SUBSCRIBER) {
-            serverPort = ws.start();
-            if (serverPort < 0)
-                throw new RuntimeException("Failed to start a WebRTC server socket.");
-            Log.d(TAG, "Start a WebRTC Server, port = " + serverPort);
+        if (invalidWebRTC())
+            return;
+
+        webrtc.registerIceCandidateAddedCallback(this::updateDiscoveryMessage);
+
+        FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
+        fbb.putString(START);
+        ByteBuffer buffer = fbb.finish();
+        byte[] data = new byte[buffer.remaining()];
+        buffer.get(data, 0, data.length);
+        if (jniInterface != null)
+            jniInterface.updateDiscoveryMessage(publishTopic, data);
 
-            byte[] publishData = wrapPublishData(topic, serverPort);
-            jniInterface.publish(Definitions.JAVA_SPECIFIC_DISCOVERY_TOPIC, publishData, publishData.length, Aitt.Protocol.MQTT.getValue(), Aitt.QoS.EXACTLY_ONCE.ordinal(), true);
+        if (streamRole == StreamRole.SUBSCRIBER && webrtc.getPeerDiscoveryId() != null) {
+            updateDiscoveryMessage();
         }
+
+        updateState(StreamState.READY);
     }
 
     @Override
     public void disconnect() {
-        if (webrtc != null)
-            webrtc.disconnect();
+        stop();
+        if (jniInterface != null)
+            jniInterface.removeDiscoveryCallback(watchTopic);
     }
 
     @Override
     public void stop() {
-        if (ws != null)
-            ws.stop();
+        FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
+        fbb.putString(STOP);
+        ByteBuffer buffer = fbb.finish();
+        byte[] data = new byte[buffer.remaining()];
+        buffer.get(data, 0, data.length);
+        if (jniInterface != null)
+            jniInterface.updateDiscoveryMessage(publishTopic, data);
+        updateState(StreamState.INIT);
     }
 
     @Override
     public void setStateCallback(StreamStateChangeCallback callback) {
-
+        stateChangeCallback = callback;
     }
 
     @Override
-    public void setReceiveCallback(AittStream.StreamDataCallback streamDataCallback) {
+    public void setReceiveCallback(StreamDataCallback streamDataCallback) {
         if (streamDataCallback == null)
             throw new IllegalArgumentException("The given callback is null.");
 
         if (streamRole == StreamRole.SUBSCRIBER) {
-            WebRTC.ReceiveDataCallback cb = streamDataCallback::pushStreamData;
-            ws.setDataCallback(cb);
+            if (invalidWebRTC())
+                return;
+            webrtc.registerDataCallback(streamDataCallback::pushStreamData);
         } else if (streamRole == StreamRole.PUBLISHER) {
             Log.e(TAG, "Invalid function call");
         }
@@ -125,61 +167,18 @@ public final class WebRTCStream implements AittStream {
         return 0;
     }
 
-    public int getServerPort() {
-        return serverPort;
-    }
-
-    public void setSelfIP(String ip) {
-        this.ip = ip;
-    }
-
-    /**
-     * Method to wrap topic, device IP address, and the port number of a webRTC server instance for publishing
-     *
-     * @param topic      Topic to which the application has subscribed to
-     * @param serverPort Port number of the WebRTC server instance
-     * @return Byte data wrapped, contains topic, device IP, webRTC server port number
-     */
-    private byte[] wrapPublishData(String topic, int serverPort) {
-        FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
-        {
-            int smap = fbb.startMap();
-            fbb.putString(Definitions.STATUS, Definitions.JOIN_NETWORK);
-            fbb.putString("host", ip);
-            {
-                int smap1 = fbb.startMap();
-                fbb.putInt("protocol", Aitt.Protocol.WEBRTC.getValue());
-                fbb.putInt("port", serverPort);
-                fbb.endMap(topic, smap1);
-            }
-            fbb.endMap(null, smap);
-        }
-        ByteBuffer buffer = fbb.finish();
-        byte[] data = new byte[buffer.remaining()];
-        buffer.get(data, 0, data.length);
-        return data;
-    }
-
     @Override
-    public boolean publish(String topic, String ip, int port, byte[] message) {
-        if (streamRole == StreamRole.PUBLISHER) {
-            if (webrtc == null) {
-                Log.e(TAG, "A WebRTC instance is null.");
-                return false;
-            }
-
-            if (streamState == StreamState.INIT) {
-                webrtc.connect(ip, port);
-                streamState = StreamState.READY;
-                Log.d(TAG, "A WebRTC client is connected into a server.");
-            }
+    public boolean publish(String topic, byte[] message) {
+        if (invalidWebRTC())
+            return false;
 
+        if (streamRole == StreamRole.PUBLISHER) {
             if (topic.endsWith(Definitions.RESPONSE_POSTFIX)) {
                 Log.d(TAG, "A message is sent through a WebRTC publisher stream.");
                 return webrtc.sendMessageData(message);
             } else {
                 Log.d(TAG, "Video data are sent through a WebRTC publisher stream.");
-                //ToDo - For now using sample app parameters, later fetch frameWidth & frameHeight from app
+                //ToDo - Fetch frameWidth & frameHeight from app
                 int frameWidth = 640;
                 int frameHeight = 480;
                 webrtc.sendVideoData(message, frameWidth, frameHeight);
@@ -192,6 +191,119 @@ public final class WebRTCStream implements AittStream {
     }
 
     public void setJNIInterface(JniInterface jniInterface) {
+        if (invalidWebRTC())
+            return;
+
         this.jniInterface = jniInterface;
+
+        jniInterface.setDiscoveryCallback(watchTopic, (clientId, status, data) -> {
+            if (status.compareTo(Definitions.WILL_LEAVE_NETWORK) == 0) {
+                webrtc.removePeer(clientId);
+                updateState(StreamState.INIT);
+                return;
+            }
+
+            ByteBuffer buffer = ByteBuffer.wrap(data);
+            if (FlexBuffers.getRoot(buffer).isString())
+                handleStreamState(clientId, buffer);
+            else
+                handleStreamInfo(clientId, buffer);
+        });
+    }
+
+    private String createId() {
+        return UUID.randomUUID().toString();
+    }
+
+    private boolean invalidWebRTC() {
+        if (webrtc == null) {
+            Log.e(TAG, "Invalid WebRTCStream");
+            return true;
+        }
+        return false;
+    }
+
+    private void handleStreamState(String clientId, ByteBuffer buffer) {
+        String peerState = FlexBuffers.getRoot(buffer).asString();
+        if (STOP.compareTo(peerState) == 0) {
+            updateState(StreamState.INIT);
+            webrtc.removePeer(clientId);
+        } else if (START.compareTo(peerState) == 0) {
+            if (streamRole == StreamRole.SUBSCRIBER)
+                webrtc.setPeerDiscoveryId(clientId);
+        } else {
+            Log.e(TAG, "Invalid message");
+        }
+    }
+
+    private void handleStreamInfo(String clientId, ByteBuffer buffer) {
+        FlexBuffers.Map map = FlexBuffers.getRoot(buffer).asMap();
+        if (!isValidInfo(map)) {
+            Log.e(TAG, "Invalid WebRTC stream information");
+            return;
+        }
+
+        Log.d(TAG, "Got discovery message");
+        String id = map.get(ID).asString();
+        String peerId = map.get(PEER_ID).asString();
+        String sdp = map.get(SDP).asString();
+        FlexBuffers.Vector vector = map.get(ICE_CANDIDATES).asVector();
+        int size = vector.size();
+        List<String> iceCandidates = new ArrayList<>();
+        for (int i = 0; i < size; i++)
+            iceCandidates.add(vector.get(i).asString());
+
+        if (webrtc.getPeerDiscoveryId() == null) {
+            if (webrtc.setPeerDiscoveryId(clientId))
+                webrtc.addPeer(id, sdp, iceCandidates);
+        }
+        else if(webrtc.getPeerDiscoveryId().compareTo(clientId) != 0)
+            Log.e(TAG, "Invalid peer discovery ID");
+        else if (this.id.compareTo(peerId) != 0)
+            Log.e(TAG, "Discovery message for different id");
+        else if(webrtc.getPeerId() == null)
+            webrtc.addPeer(id, sdp, iceCandidates);
+        else
+            webrtc.updatePeer(iceCandidates);
+    }
+
+    private boolean isValidInfo(FlexBuffers.Map map) {
+        boolean hasId = map.get(ID).isString();
+        boolean hasPeerId = map.get(PEER_ID).isString();
+        boolean hasSdp = map.get(SDP).isString();
+        boolean hasIceCandidate = map.get(ICE_CANDIDATES).isVector();
+        return hasId & hasPeerId & hasSdp & hasIceCandidate;
+    }
+
+    private void updateDiscoveryMessage() {
+        FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
+        {
+            int smap = fbb.startMap();
+            fbb.putString(ID, id);
+            String peerId = webrtc.getPeerId();
+            if (peerId == null)
+                peerId = webrtc.getPeerDiscoveryId();
+            fbb.putString(PEER_ID, peerId);
+            fbb.putString(SDP, webrtc.getLocalDescription());
+            int svec = fbb.startVector();
+            List<String> iceCandidates = webrtc.getIceCandidates();
+            for (String candidate : iceCandidates) {
+                fbb.putString(candidate);
+            }
+            fbb.endVector(ICE_CANDIDATES, svec, false, false);
+            fbb.endMap(null, smap);
+        }
+        ByteBuffer buffer = fbb.finish();
+        byte[] data = new byte[buffer.remaining()];
+        buffer.get(data, 0, data.length);
+
+        if (jniInterface != null)
+            jniInterface.updateDiscoveryMessage(publishTopic, data);
+    }
+
+    private void updateState(StreamState state) {
+        streamState = state;
+        if (stateChangeCallback != null)
+            stateChangeCallback.pushStataChange(streamState);
     }
 }
index cd2d7a0..6eee75d 100644 (file)
@@ -59,7 +59,7 @@ public class RTSPClient {
      * Interface to implement DataCallback from RTSP module to RTSP stream
      */
     public interface ReceiveDataCallback {
-        void pushData(byte[] frame);
+        void pushData(byte[] data);
     }
 
     /**
index eb619d1..ab9ae71 100644 (file)
@@ -37,7 +37,6 @@ import androidx.test.platform.app.InstrumentationRegistry;
 
 import com.samsung.android.aitt.Aitt;
 import com.samsung.android.aitt.stream.AittStream;
-import com.samsung.android.aitt.stream.WebRTCStream;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -147,11 +146,9 @@ public class WebRTCInstrumentedTest {
                 intervalSum += SLEEP_INTERVAL;
             }
 
-            int serverPort = ((WebRTCStream) serverSubscriberStream).getServerPort();
-            Log.d(TAG, "Server port = " + serverPort);
             while (true) {
                 // TODO: Replace publish
-                boolean isPublished = clientPublisher.publish(clientPublisherStream, TEST_MESSAGE_TOPIC, message.getBytes(), Aitt.Protocol.WEBRTC);
+                boolean isPublished = clientPublisher.publish(clientPublisherStream, TEST_MESSAGE_TOPIC, message.getBytes());
                 if (isPublished)
                     break;
             }
@@ -204,11 +201,9 @@ public class WebRTCInstrumentedTest {
                 intervalSum += SLEEP_INTERVAL;
             }
 
-            int serverPort = ((WebRTCStream) serverSubscriberStream).getServerPort();
-            Log.d(TAG, "Server port = " + serverPort);
             while (true) {
                 // TODO: Replace publish
-                boolean isPublished = clientPublisher.publish(clientPublisherStream, TEST_LARGE_MESSAGE_TOPIC, largeBytes, Aitt.Protocol.WEBRTC);
+                boolean isPublished = clientPublisher.publish(clientPublisherStream, TEST_LARGE_MESSAGE_TOPIC, largeBytes);
                 if (isPublished)
                     break;
                 Thread.sleep(SLEEP_INTERVAL);
@@ -259,12 +254,9 @@ public class WebRTCInstrumentedTest {
                 intervalSum += SLEEP_INTERVAL;
             }
 
-            int serverPort = ((WebRTCStream) serverSubscriberStream).getServerPort();
-            Log.d(TAG, "Server port = " + serverPort);
             while (true) {
                 // TODO: Replace publish
-                boolean isPublished = clientPublisher.publish(clientPublisherStream, TEST_VIDEO_TOPIC, frameImageBytes
-                        , Aitt.Protocol.WEBRTC);
+                boolean isPublished = clientPublisher.publish(clientPublisherStream, TEST_VIDEO_TOPIC, frameImageBytes);
                 if (isPublished)
                     break;
             }
index 4dbcc18..5abd5d5 100644 (file)
  */
 package com.samsung.android.modules.webrtc;
 
-import static org.webrtc.SessionDescription.Type.ANSWER;
-import static org.webrtc.SessionDescription.Type.OFFER;
-
-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static java.lang.Math.toIntExact;
-
 import android.content.Context;
-import android.os.SystemClock;
 import android.util.Log;
 
-import com.github.luben.zstd.Zstd;
-
 import org.json.JSONException;
 import org.json.JSONObject;
-import org.webrtc.CapturerObserver;
 import org.webrtc.DataChannel;
 import org.webrtc.DefaultVideoDecoderFactory;
 import org.webrtc.DefaultVideoEncoderFactory;
 import org.webrtc.EglBase;
 import org.webrtc.IceCandidate;
-import org.webrtc.MediaConstraints;
-import org.webrtc.MediaStream;
-import org.webrtc.MediaStreamTrack;
-import org.webrtc.NV21Buffer;
 import org.webrtc.PeerConnection;
 import org.webrtc.PeerConnectionFactory;
-import org.webrtc.RtpReceiver;
 import org.webrtc.SdpObserver;
 import org.webrtc.SessionDescription;
-import org.webrtc.SurfaceTextureHelper;
-import org.webrtc.VideoCapturer;
 import org.webrtc.VideoDecoderFactory;
 import org.webrtc.VideoEncoderFactory;
-import org.webrtc.VideoFrame;
-import org.webrtc.VideoSink;
-import org.webrtc.VideoSource;
-import org.webrtc.VideoTrack;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
 
 /**
  * WebRTC class to implement webRTC functionalities
  */
-public final class WebRTC {
+public abstract class WebRTC {
 
-    public static final String VIDEO_TRACK_ID = "ARDAMSv0";
     public static final String EOF_MESSAGE = "EOF";
     public static final int MAX_MESSAGE_SIZE = 32768;
     public static final int MAX_UNCOMPRESSED_MESSAGE_SIZE = 295936;
     private static final String TAG = "WebRTC";
-    private static final String CANDIDATE = "candidate";
-
-    private final Context appContext;
-    private final boolean isReceiver;
-
-    private java.net.Socket socket;
-    private boolean isInitiator;
-    private boolean isChannelReady;
-    private boolean isStarted;
-    private PeerConnection peerConnection;
-    private PeerConnectionFactory factory;
-    private VideoTrack videoTrackFromSource;
-    private ObjectOutputStream outStream;
-    private ObjectInputStream inputStream;
-    private SDPThread sdpThread;
-    private DataChannel localDataChannel;
-    private FrameVideoCapturer videoCapturer;
-    private ReceiveDataCallback dataCallback;
-    private String receiverIP;
-    private Integer receiverPort;
-    private ByteArrayOutputStream baos;
-    private boolean recvLargeChunk = false;
+
+    protected PeerConnection peerConnection;
+    protected PeerConnectionFactory connectionFactory;
+    protected DataChannel localDataChannel;
+    protected ReceiveDataCallback dataCallback;
+    protected IceCandidateAddedCallback iceCandidateAddedCallback;
+    protected String localDescription;
+    protected List<String> iceCandidates = new ArrayList<>();
+    protected String peerDiscoveryId;
+
+    protected final Context appContext;
+    private String peerId;
 
     /**
-     * WebRTC constructor to create webRTC instance
-     *
-     * @param appContext Application context creating webRTC instance
+     * Interface to create data call back mechanism
      */
-    public WebRTC(Context appContext) {
-        if (appContext == null)
-            throw new IllegalArgumentException("App context is null.");
+    public interface ReceiveDataCallback {
+        void pushData(byte[] data);
+    }
 
-        this.appContext = appContext;
-        this.isReceiver = false;
+    /**
+     * Callback triggered when ICE candidates are added
+     */
+    public interface IceCandidateAddedCallback {
+        void onIceCandidate();
     }
 
     /**
+     * Method to set remote description of peer
+     *
+     * @param sdp String containing SDP of peer
+     */
+    public abstract void setRemoteDescription(String sdp);
+
+    protected abstract void initializePeerConnection();
+
+    /**
      * WebRTC constructor to create webRTC instance
      *
      * @param appContext Application context creating webRTC instance
-     * @param socket     Java server socket for webrtc signalling
      */
-    WebRTC(Context appContext, Socket socket) {
-        Log.d(TAG, "InWebRTC Constructor");
+    public WebRTC(Context appContext) throws InstantiationException {
+        if (appContext == null)
+            throw new IllegalArgumentException("App context is null.");
+
         this.appContext = appContext;
-        this.socket = socket;
-        this.isReceiver = true;
+        initializePeerConnectionFactory();
+        initializePeerConnection();
+        if (peerConnection == null)
+            throw new InstantiationException("Failed to create peer connection");
     }
 
     /**
      * To create data call-back mechanism
      *
-     * @param cb aitt callback registered to receive a webrtc data
+     * @param cb ReceiveDataCallback registered to receive a webrtc data
      */
-    void registerDataCallback(ReceiveDataCallback cb) {
+    public void registerDataCallback(ReceiveDataCallback cb) {
         if (cb == null)
             throw new IllegalArgumentException("Callback is null.");
 
-        this.dataCallback = cb;
+        dataCallback = cb;
     }
 
     /**
-     * Method to disconnect the connection from peer
+     * To notify when ice-candidate is added
+     *
+     * @param cb IceCandidateAddedCallback triggered when a new ICE candidate has been found
      */
-    public void disconnect() {
-        if (sdpThread != null) {
-            sdpThread.stop();
-        }
+    public void registerIceCandidateAddedCallback(IceCandidateAddedCallback cb) {
+        if (cb == null)
+            throw new IllegalArgumentException("Callback is null.");
 
-        if (socket != null) {
-            new Thread(() -> {
-                try {
-                    sendMessage(false, "bye");
-                    socket.close();
-                    if (outStream != null) {
-                        outStream.close();
-                    }
-                    if (inputStream != null) {
-                        inputStream.close();
-                    }
-                } catch (IOException e) {
-                    Log.e(TAG, "Error during disconnect", e);
-                }
-            }).start();
-        }
+        iceCandidateAddedCallback = cb;
     }
 
     /**
-     * Method to establish a socket connection with peer node
+     * Method to disconnect the connection from peer
      */
-    void connect() {
-        initialize();
+    public void stop() {
+        // ToDo : Stop webRTC stream
     }
 
     /**
-     * Method to establish communication with peer node
+     * Method to get peer ID
      *
-     * @param receiverIP   IP Address of the destination(peer) node
-     * @param receiverPort Port number of the destination(peer) node
-     */
-    public void connect(String receiverIP, Integer receiverPort) {
-        this.receiverIP = receiverIP;
-        this.receiverPort = receiverPort;
-        initialize();
-        Log.i(TAG, "A WebRTC client is connected.");
-    }
-
-    /**
-     * Method to initialize webRTC APIs while establishing connection
+     * @return String ID of the peer
      */
-    private void initialize() {
-        baos = new ByteArrayOutputStream();
-
-        initializePeerConnectionFactory();
-        initializePeerConnections();
-        Log.i(TAG, "Peer connections are initialized.");
-
-        if (!isReceiver) {
-            createVideoTrack();
-            addVideoTrack();
-        }
-        isInitiator = isReceiver;
-
-        sdpThread = new SDPThread();
-        new Thread(sdpThread).start();
+    public String getPeerId() {
+        return peerId;
     }
 
     /**
-     * Method to create webRTC offer for sdp negotiation
+     * Method to get peer discovery ID
+     *
+     * @return String Discovery ID of the peer
      */
-    private void doCall() {
-        MediaConstraints sdpMediaConstraints = new MediaConstraints();
-        sdpMediaConstraints.mandatory.add(new MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"));
-
-        peerConnection.createOffer(new SimpleSdpObserver() {
-            @Override
-            public void onCreateSuccess(SessionDescription sessionDescription) {
-                Log.d(TAG, "onCreateSuccess: ");
-                peerConnection.setLocalDescription(new SimpleSdpObserver(), sessionDescription);
-                JSONObject message = new JSONObject();
-                try {
-                    message.put("type", "offer");
-                    message.put("sdp", sessionDescription.description);
-                    sendMessage(true, message);
-                } catch (JSONException | IOException e) {
-                    Log.e(TAG, "Error during create offer", e);
-                }
-            }
-        }, sdpMediaConstraints);
+    public String getPeerDiscoveryId() {
+        return peerDiscoveryId;
     }
 
     /**
-     * Method to send signalling messages over socket connection
+     * Method to set peer discovery ID
      *
-     * @param isJSON  Boolean to check if message is JSON
-     * @param message Data to be sent over webRTC connection
-     * @throws IOException Throws IOException if writing to outStream fails
+     * @param id Discovery ID of the peer
+     * @return true if peer is not already added
      */
-    private void sendMessage(boolean isJSON, Object message) throws IOException {
-        Log.d(TAG, message.toString());
-        try {
-            if (outStream != null) {
-                if (isJSON) {
-                    outStream.writeObject(new Packet((JSONObject) message));
-                } else {
-                    outStream.writeObject(new Packet((String) message));
-                }
-            }
-        } catch (SocketException e) {
-            Log.e(TAG, "Error during sending a message.", e);
+    public boolean setPeerDiscoveryId(String id) {
+        if (peerDiscoveryId != null && !peerDiscoveryId.isEmpty()) {
+            Log.e(TAG, "Stream peer already added");
+            return false;
         }
+        this.peerDiscoveryId = id;
+        return true;
     }
 
     /**
-     * Class to create proxy video sink
+     * Method to get local description
+     *
+     * @return Local description
      */
-    private static class ProxyVideoSink implements VideoSink {
-
-        private final ReceiveDataCallback dataCallback;
-
-        /**
-         * ProxyVideoSink constructor to create its instance
-         *
-         * @param dataCb DataCall back to be set to self-object
-         */
-        ProxyVideoSink(ReceiveDataCallback dataCb) {
-            this.dataCallback = dataCb;
-        }
-
-        /**
-         * Method to send data through data call back
-         *
-         * @param frame VideoFrame to be transferred using media channel
-         */
-        @Override
-        synchronized public void onFrame(VideoFrame frame) {
-            byte[] rawFrame = createNV21Data(frame.getBuffer().toI420());
-            dataCallback.pushData(rawFrame);
-        }
-
-        /**
-         * Method used to convert VideoFrame to NV21 data format
-         *
-         * @param i420Buffer VideoFrame in I420 buffer format
-         * @return the video frame in NV21 data format
-         */
-        public byte[] createNV21Data(VideoFrame.I420Buffer i420Buffer) {
-            final int width = i420Buffer.getWidth();
-            final int height = i420Buffer.getHeight();
-            final int chromaWidth = (width + 1) / 2;
-            final int chromaHeight = (height + 1) / 2;
-            final int ySize = width * height;
-            final ByteBuffer nv21Buffer = ByteBuffer.allocateDirect(ySize + width * chromaHeight);
-            final byte[] nv21Data = nv21Buffer.array();
-            for (int y = 0; y < height; ++y) {
-                for (int x = 0; x < width; ++x) {
-                    final byte yValue = i420Buffer.getDataY().get(y * i420Buffer.getStrideY() + x);
-                    nv21Data[y * width + x] = yValue;
-                }
-            }
-            for (int y = 0; y < chromaHeight; ++y) {
-                for (int x = 0; x < chromaWidth; ++x) {
-                    final byte uValue = i420Buffer.getDataU().get(y * i420Buffer.getStrideU() + x);
-                    final byte vValue = i420Buffer.getDataV().get(y * i420Buffer.getStrideV() + x);
-                    nv21Data[ySize + y * width + 2 * x] = vValue;
-                    nv21Data[ySize + y * width + 2 * x + 1] = uValue;
-                }
-            }
-            return nv21Data;
-        }
+    public String getLocalDescription() {
+        return localDescription;
     }
 
     /**
-     * Method to initialize peer connection factory
+     * Method to get ICE candidates
+     *
+     * @return List of ICE candidates
      */
-    private void initializePeerConnectionFactory() {
-        EglBase mRootEglBase;
-        mRootEglBase = EglBase.create();
-        VideoEncoderFactory encoderFactory = new DefaultVideoEncoderFactory(mRootEglBase.getEglBaseContext(), true /* enableIntelVp8Encoder */, true);
-        VideoDecoderFactory decoderFactory = new DefaultVideoDecoderFactory(mRootEglBase.getEglBaseContext());
-
-        PeerConnectionFactory.initialize(PeerConnectionFactory.InitializationOptions.builder(appContext).setEnableInternalTracer(true).createInitializationOptions());
-        PeerConnectionFactory.Builder builder = PeerConnectionFactory.builder().setVideoEncoderFactory(encoderFactory).setVideoDecoderFactory(decoderFactory);
-        builder.setOptions(null);
-        factory = builder.createPeerConnectionFactory();
+    public List<String> getIceCandidates() {
+        return iceCandidates;
     }
 
     /**
-     * Method to create video track
+     * Method to add peer information
+     *
+     * @param peerId ID of the peer
+     * @param sdp SDP sent by peer
+     * @param candidates List of ICE candidates
      */
-    private void createVideoTrack() {
-        videoCapturer = new FrameVideoCapturer();
-        VideoSource videoSource = factory.createVideoSource(false);
-        videoCapturer.initialize(null, null, videoSource.getCapturerObserver());
-        videoTrackFromSource = factory.createVideoTrack(VIDEO_TRACK_ID, videoSource);
-        videoTrackFromSource.setEnabled(true);
+    public void addPeer(String peerId, String sdp, List<String> candidates) {
+        this.peerId = peerId;
+        setRemoteDescription(sdp);
+        updatePeer(candidates);
     }
 
     /**
-     * Method to initialize peer connections
+     * Method to update ICE candidates
+     *
+     * @param candidates List of ICE candidates
      */
-    private void initializePeerConnections() {
-        peerConnection = createPeerConnection(factory);
-        if (peerConnection != null) {
-            localDataChannel = peerConnection.createDataChannel("sendDataChannel", new DataChannel.Init());
-        }
-    }
+    public void updatePeer(List<String> candidates) {
+        if (candidates == null)
+            return;
 
-    /**
-     * Method to add video track
-     */
-    private void addVideoTrack() {
-        MediaStream mediaStream = factory.createLocalMediaStream("ARDAMS");
-        mediaStream.addTrack(videoTrackFromSource);
-        if (peerConnection != null) {
-            peerConnection.addStream(mediaStream);
+        for (String candidate : candidates) {
+            Log.d(TAG, "Received ICE candidate: " + candidate);
+            String ice = extractICE(candidate);
+            IceCandidate iceCandidate = new IceCandidate("video", 0, ice);
+            peerConnection.addIceCandidate(iceCandidate);
         }
     }
 
     /**
-     * Method to create peer connection
+     * Method to remove connected peer stream
      *
-     * @param factory Peer connection factory object
-     * @return return factory object
+     * @param discoveryId Discovery ID of the peer
      */
-    private PeerConnection createPeerConnection(PeerConnectionFactory factory) {
-        PeerConnection.RTCConfiguration rtcConfig = new PeerConnection.RTCConfiguration(new ArrayList<>());
-        MediaConstraints pcConstraints = new MediaConstraints();
-
-        PeerConnection.Observer pcObserver = new PeerConnection.Observer() {
-            @Override
-            public void onSignalingChange(PeerConnection.SignalingState signalingState) {
-                Log.d(TAG, "onSignalingChange: ");
-            }
-
-            @Override
-            public void onIceConnectionChange(PeerConnection.IceConnectionState iceConnectionState) {
-                Log.d(TAG, "onIceConnectionChange: ");
-            }
-
-            @Override
-            public void onIceConnectionReceivingChange(boolean b) {
-                Log.d(TAG, "onIceConnectionReceivingChange: ");
-            }
-
-            @Override
-            public void onIceGatheringChange(PeerConnection.IceGatheringState iceGatheringState) {
-                Log.d(TAG, "onIceGatheringChange: ");
-            }
-
-            @Override
-            public void onIceCandidate(IceCandidate iceCandidate) {
-                Log.d(TAG, "onIceCandidate: ");
-                JSONObject message = new JSONObject();
-                try {
-                    message.put("type", CANDIDATE);
-                    message.put("label", iceCandidate.sdpMLineIndex);
-                    message.put("id", iceCandidate.sdpMid);
-                    message.put(CANDIDATE, iceCandidate.sdp);
-                    Log.d(TAG, "onIceCandidate: sending candidate " + message);
-                    sendMessage(true, message);
-                } catch (JSONException | IOException e) {
-                    Log.e(TAG, "Error during onIceCandidate", e);
-                }
-            }
-
-            @Override
-            public void onIceCandidatesRemoved(IceCandidate[] iceCandidates) {
-                Log.d(TAG, "onIceCandidatesRemoved: ");
-            }
-
-            @Override
-            public void onAddStream(MediaStream mediaStream) {
-                Log.d(TAG, "onAddStream: " + mediaStream.videoTracks.size());
-                VideoTrack remoteVideoTrack = mediaStream.videoTracks.get(0);
-                remoteVideoTrack.setEnabled(true);
-            }
-
-            @Override
-            public void onRemoveStream(MediaStream mediaStream) {
-                Log.d(TAG, "onRemoveStream: ");
-            }
-
-            @Override
-            public void onDataChannel(DataChannel dataChannel) {
-                Log.d(TAG, "onDataChannel: ");
-                dataChannel.registerObserver(new DataChannel.Observer() {
-                    @Override
-                    public void onBufferedAmountChange(long l) {
-                        //Keep this callback for future usage
-                        Log.d(TAG, "onBufferedAmountChange:");
-                    }
-
-                    @Override
-                    public void onStateChange() {
-                        Log.d(TAG, "onStateChange: remote data channel state: " + dataChannel.state().toString());
-                    }
-
-                    @Override
-                    public void onMessage(DataChannel.Buffer buffer) {
-                        Log.d(TAG, "onMessage: got message");
-                        if (!recvLargeChunk && buffer.data.capacity() < MAX_MESSAGE_SIZE) {
-                            byte[] array = new byte[buffer.data.capacity()];
-                            buffer.data.rewind();
-                            buffer.data.get(array);
-                            dataCallback.pushData(array);
-                        } else {
-                            String message = StandardCharsets.UTF_8.decode(buffer.data).toString();
-                            handleLargeMessage(message, buffer);
-                        }
-                    }
-                });
-            }
-
-            @Override
-            public void onRenegotiationNeeded() {
-                Log.d(TAG, "onRenegotiationNeeded: ");
-            }
-
-            @Override
-            public void onAddTrack(RtpReceiver rtpReceiver, MediaStream[] mediaStreams) {
-                MediaStreamTrack track = rtpReceiver.track();
-                if (track instanceof VideoTrack && isReceiver) {
-                    Log.i(TAG, "onAddVideoTrack");
-                    VideoTrack remoteVideoTrack = (VideoTrack) track;
-                    remoteVideoTrack.setEnabled(true);
-                    ProxyVideoSink videoSink = new ProxyVideoSink(dataCallback);
-                    remoteVideoTrack.addSink(videoSink);
-                }
-            }
-        };
-        return factory.createPeerConnection(rtcConfig, pcConstraints, pcObserver);
-    }
+    public void removePeer(String discoveryId) {
+        if (peerDiscoveryId == null || peerDiscoveryId.isEmpty())
+            return;
 
-    private void handleLargeMessage(String message, DataChannel.Buffer buffer) {
-        if (EOF_MESSAGE.equals(message)) {
-            // Received data is in compressed format, un compress it
-            try {
-                byte[] compBytes = baos.toByteArray();
-                long deCompSize = Zstd.decompressedSize(compBytes);
-                byte[] deCompBytes = Zstd.decompress(compBytes, max(toIntExact(deCompSize), MAX_UNCOMPRESSED_MESSAGE_SIZE));
-                Log.d(TAG, "UnCompressed Byte array size: " + deCompBytes.length);
-                dataCallback.pushData(deCompBytes);
-                baos.reset();
-                recvLargeChunk = false;
-            } catch (Exception e) {
-                Log.e(TAG, "Error during UnCompression: ");
-            }
-        } else {
-            recvLargeChunk = true;
-            try {
-                byte[] array = new byte[buffer.data.capacity()];
-                buffer.data.rewind();
-                buffer.data.get(array);
-                baos.write(array);
-            } catch (IOException e) {
-                Log.e(TAG, "Failed to write to byteArrayOutputStream " + e);
-            }
+        if (peerDiscoveryId.compareTo(discoveryId) != 0) {
+            Log.d(TAG, "No stream for peer: " + discoveryId);
+            return;
         }
+        stop();
     }
 
     /**
@@ -506,7 +232,7 @@ public final class WebRTC {
      * @param height height of the video frame
      */
     public void sendVideoData(byte[] frame, int width, int height) {
-        videoCapturer.send(frame, width, height);
+        Log.d(TAG, "Send video data");
     }
 
     /**
@@ -515,62 +241,56 @@ public final class WebRTC {
      * @param message message to be sent in byte format
      */
     public boolean sendMessageData(byte[] message) {
-        ByteBuffer chunkData;
-        if (message.length < MAX_MESSAGE_SIZE) {
-            ByteBuffer data = ByteBuffer.wrap(message);
-            return localDataChannel.send(new DataChannel.Buffer(data, false));
-        }
+        Log.d(TAG, "Send message data");
+        return true;
+    }
+
+    protected String extractSDP(String sdpMessage) {
+        String sdp = null;
         try {
-            byte[] compressData = Zstd.compress(message);
-            int len = compressData.length;
-            int i = 0;
-            while (i < compressData.length) {
-                byte[] chunk = Arrays.copyOfRange(compressData, i, i + min(len, MAX_MESSAGE_SIZE));
-                chunkData = ByteBuffer.wrap(chunk);
-                localDataChannel.send(new DataChannel.Buffer(chunkData, false));
-                i += MAX_MESSAGE_SIZE;
-                len -= MAX_MESSAGE_SIZE;
-            }
-        } catch (Exception e) {
-            Log.e(TAG, "Error during message sending", e);
-            return false;
+            JSONObject jsonMessage = new JSONObject(sdpMessage);
+            sdp = jsonMessage.getJSONObject("sdp").getString("sdp");
+        } catch (JSONException e) {
+            Log.e(TAG, "Failed to extract sdp: " + e.getMessage());
         }
-        chunkData = ByteBuffer.wrap(EOF_MESSAGE.getBytes(StandardCharsets.UTF_8));
-        return localDataChannel.send(new DataChannel.Buffer(chunkData, false));
+        return sdp;
     }
 
-    /**
-     * Interface to create data call back mechanism
-     */
-    public interface ReceiveDataCallback {
-        void pushData(byte[] frame);
+    private String extractICE(String iceMessage) {
+        String ice = null;
+        try {
+            JSONObject jsonMessage = new JSONObject(iceMessage);
+            ice = jsonMessage.getJSONObject("ice").getString("candidate");
+        } catch (JSONException e) {
+            Log.e(TAG, "Failed to extract sdp: " + e.getMessage());
+        }
+        return ice;
     }
 
     /**
-     * Class packet to create a packet
+     * Method to initialize peer connection factory
      */
-    private static class Packet implements Serializable {
-        boolean isString;
-        String obj;
-
-        Packet(String s) {
-            isString = true;
-            obj = s;
-        }
+    private void initializePeerConnectionFactory() {
+        EglBase mRootEglBase;
+        mRootEglBase = EglBase.create();
+        VideoEncoderFactory encoderFactory = new DefaultVideoEncoderFactory(mRootEglBase.getEglBaseContext(), true , true);
+        VideoDecoderFactory decoderFactory = new DefaultVideoDecoderFactory(mRootEglBase.getEglBaseContext());
 
-        Packet(JSONObject json) {
-            isString = false;
-            obj = json.toString();
-        }
+        PeerConnectionFactory.initialize(PeerConnectionFactory.InitializationOptions.builder(appContext).setEnableInternalTracer(true).createInitializationOptions());
+        PeerConnectionFactory.Builder builder = PeerConnectionFactory.builder().setVideoEncoderFactory(encoderFactory).setVideoDecoderFactory(decoderFactory);
+        builder.setOptions(null);
+        connectionFactory = builder.createPeerConnectionFactory();
     }
 
     /**
      * Class to implement SDP observer
      */
-    private static class SimpleSdpObserver implements SdpObserver {
+    protected static class SimpleSdpObserver implements SdpObserver {
+        private static final String TAG = "SimpleSdpObserver";
+
         @Override
         public void onCreateSuccess(SessionDescription sessionDescription) {
-            //Required for future reference
+            Log.d(TAG, "onCreateSuccess");
         }
 
         @Override
@@ -588,188 +308,4 @@ public final class WebRTC {
             Log.d(TAG, "onSetFailure: Reason = " + s);
         }
     }
-
-    /**
-     * Class to implement Frame video capturer
-     */
-    private static class FrameVideoCapturer implements VideoCapturer {
-        private CapturerObserver capturerObserver;
-
-        void send(byte[] frame, int width, int height) {
-            long timestampNS = TimeUnit.MILLISECONDS.toNanos(SystemClock.elapsedRealtime());
-            NV21Buffer buffer = new NV21Buffer(frame, width, height, null);
-            VideoFrame videoFrame = new VideoFrame(buffer, 0, timestampNS);
-            this.capturerObserver.onFrameCaptured(videoFrame);
-            videoFrame.release();
-        }
-
-        @Override
-        public void initialize(SurfaceTextureHelper surfaceTextureHelper, Context context, CapturerObserver capturerObserver) {
-            this.capturerObserver = capturerObserver;
-        }
-
-        public void startCapture(int width, int height, int frameRate) {
-            //Required for future reference
-        }
-
-        public void stopCapture() {
-            //Required for future reference
-        }
-
-        public void changeCaptureFormat(int width, int height, int frameRate) {
-            //Required for future reference
-        }
-
-        public void dispose() {
-            //Required for future reference
-        }
-
-        public boolean isScreencast() {
-            return false;
-        }
-    }
-
-    /**
-     * Class to implement SDP thread
-     */
-    private class SDPThread implements Runnable {
-        private volatile boolean isRunning = true;
-
-        @Override
-        public void run() {
-            isChannelReady = true;
-
-            createSocket();
-            invokeSendMessage();
-            Log.i(TAG, "The SDP thread of WebRTC client started.");
-
-            while (isRunning) {
-                try {
-                    Packet recvPacketNew = (Packet) inputStream.readObject();
-                    if (recvPacketNew != null) {
-                        if (recvPacketNew.isString) {
-                            String message = recvPacketNew.obj;
-                            checkPacketMessage(message);
-                        } else {
-                            JSONObject message = new JSONObject(recvPacketNew.obj);
-                            Log.d(TAG, "connectToSignallingServer: got message " + message);
-                            decodeMessage(message);
-                        }
-                    }
-                } catch (ClassNotFoundException | JSONException | IOException e) {
-                    isRunning = false;
-                    Log.e(TAG, "Error during JSON read", e);
-                }
-            }
-        }
-
-        /**
-         * Method to decode message
-         *
-         * @param message Message received in JSON object format
-         */
-        private void decodeMessage(JSONObject message) {
-            try {
-                if (message.getString("type").equals("offer")) {
-                    Log.d(TAG, "connectToSignallingServer: received an offer " + isInitiator + " " + isStarted);
-                    invokeMaybeStart();
-                    peerConnection.setRemoteDescription(new SimpleSdpObserver(), new SessionDescription(OFFER, message.getString("sdp")));
-                    doAnswer();
-                } else if (message.getString("type").equals("answer") && isStarted) {
-                    peerConnection.setRemoteDescription(new SimpleSdpObserver(), new SessionDescription(ANSWER, message.getString("sdp")));
-                } else if (message.getString("type").equals(CANDIDATE) && isStarted) {
-                    Log.d(TAG, "connectToSignallingServer: receiving candidates");
-                    IceCandidate candidate = new IceCandidate(message.getString("id"), message.getInt("label"), message.getString(CANDIDATE));
-                    peerConnection.addIceCandidate(candidate);
-                }
-            } catch (JSONException e) {
-                Log.e(TAG, "Error during message decoding", e);
-            }
-        }
-
-        /**
-         * Method to create SDP answer for a given SDP offer
-         */
-        private void doAnswer() {
-            peerConnection.createAnswer(new SimpleSdpObserver() {
-                @Override
-                public void onCreateSuccess(SessionDescription sessionDescription) {
-                    peerConnection.setLocalDescription(new SimpleSdpObserver(), sessionDescription);
-                    JSONObject message = new JSONObject();
-                    try {
-                        message.put("type", "answer");
-                        message.put("sdp", sessionDescription.description);
-                        sendMessage(true, message);
-                    } catch (JSONException | IOException e) {
-                        Log.e(TAG, "Error during sdp answer", e);
-                    }
-                }
-            }, new MediaConstraints());
-        }
-
-        /**
-         * Method used to create a socket for SDP negotiation
-         */
-        private void createSocket() {
-            try {
-                if (!isReceiver) {
-                    socket = new Socket(receiverIP, receiverPort);
-                }
-                outStream = new ObjectOutputStream(socket.getOutputStream());
-                inputStream = new ObjectInputStream(socket.getInputStream());
-                Log.i(TAG, "A WebRTC client socket and input/output streams are created.");
-            } catch (Exception e) {
-                Log.e(TAG, "Error during create socket", e);
-            }
-        }
-
-        /**
-         * Method to invoke Signalling handshake message
-         */
-        private void invokeSendMessage() {
-            try {
-                sendMessage(false, "got user media");
-            } catch (Exception e) {
-                Log.e(TAG, "Error during invoke send message", e);
-            }
-        }
-
-        /**
-         * Method to check if the message in received packet is "got user media"
-         */
-        private void checkPacketMessage(String message) {
-            if (message.equals("got user media")) {
-                maybeStart();
-            }
-        }
-
-        /**
-         * Method to invoke MaybeStart()
-         */
-        private void invokeMaybeStart() {
-            if (!isInitiator && !isStarted) {
-                maybeStart();
-            }
-        }
-
-        /**
-         * Method to begin SDP negotiation by sending SDP offer to peer
-         */
-        private void maybeStart() {
-            Log.d(TAG, "maybeStart: " + isStarted + " " + isChannelReady);
-            if (!isStarted && isChannelReady) {
-                isStarted = true;
-                if (isInitiator) {
-                    doCall();
-                }
-            }
-        }
-
-        /**
-         * Method to stop thread
-         */
-        public void stop() {
-            isRunning = false;
-        }
-    }
 }
diff --git a/android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCPublisher.java b/android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCPublisher.java
new file mode 100644 (file)
index 0000000..b07be00
--- /dev/null
@@ -0,0 +1,283 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.samsung.android.modules.webrtc;
+
+import static org.webrtc.SessionDescription.Type.OFFER;
+
+import static java.lang.Math.min;
+
+import android.content.Context;
+import android.os.SystemClock;
+import android.util.Log;
+
+import com.github.luben.zstd.Zstd;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.webrtc.CapturerObserver;
+import org.webrtc.DataChannel;
+import org.webrtc.IceCandidate;
+import org.webrtc.MediaConstraints;
+import org.webrtc.MediaStream;
+import org.webrtc.MediaStreamTrack;
+import org.webrtc.NV21Buffer;
+import org.webrtc.PeerConnection;
+import org.webrtc.PeerConnection.SignalingState;
+import org.webrtc.RtpReceiver;
+import org.webrtc.SessionDescription;
+import org.webrtc.SurfaceTextureHelper;
+import org.webrtc.VideoCapturer;
+import org.webrtc.VideoFrame;
+import org.webrtc.VideoSource;
+import org.webrtc.VideoTrack;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+public final class WebRTCPublisher extends WebRTC {
+
+    private static final String TAG = "WebRTCPublisher";
+    private static final String VIDEO_TRACK_ID = "AITT_WEBRTC_v0";
+    private static final String MEDIA_STREAM_ID = "AITT_WEBRTC";
+
+    private VideoTrack videoTrackFromSource;
+    private FrameVideoCapturer videoCapturer;
+
+    public WebRTCPublisher(Context appContext) throws InstantiationException {
+        super(appContext);
+
+        VideoSource videoSource = connectionFactory.createVideoSource(false);
+        createVideoTrack(videoSource);
+        addVideoTrack();
+        createVideoCapturer(videoSource);
+    }
+
+    @Override
+    public void setRemoteDescription(String sdp) {
+        if (sdp == null || sdp.isEmpty())
+            return;
+
+        Log.d(TAG, "Got offer " + sdp);
+        String offer = extractSDP(sdp);
+        peerConnection.setRemoteDescription(new SimpleSdpObserver(), new SessionDescription(OFFER, offer));
+    }
+
+    @Override
+    public void sendVideoData(byte[] frame, int width, int height) {
+        videoCapturer.send(frame, width, height);
+    }
+
+    @Override
+    public boolean sendMessageData(byte[] message) {
+        ByteBuffer chunkData;
+        if (message.length < MAX_MESSAGE_SIZE) {
+            ByteBuffer data = ByteBuffer.wrap(message);
+            return localDataChannel.send(new DataChannel.Buffer(data, false));
+        }
+        try {
+            byte[] compressData = Zstd.compress(message);
+            int len = compressData.length;
+            int i = 0;
+            while (i < compressData.length) {
+                byte[] chunk = Arrays.copyOfRange(compressData, i, i + min(len, MAX_MESSAGE_SIZE));
+                chunkData = ByteBuffer.wrap(chunk);
+                localDataChannel.send(new DataChannel.Buffer(chunkData, false));
+                i += MAX_MESSAGE_SIZE;
+                len -= MAX_MESSAGE_SIZE;
+            }
+        } catch (Exception e) {
+            Log.e(TAG, "Error during message sending", e);
+            return false;
+        }
+        chunkData = ByteBuffer.wrap(EOF_MESSAGE.getBytes(StandardCharsets.UTF_8));
+        return localDataChannel.send(new DataChannel.Buffer(chunkData, false));
+    }
+
+    @Override
+    protected void initializePeerConnection() {
+        PeerConnection.RTCConfiguration rtcConfig = new PeerConnection.RTCConfiguration(new ArrayList<>());
+
+        PeerConnection.Observer pcObserver = new PeerConnection.Observer() {
+            @Override
+            public void onSignalingChange(PeerConnection.SignalingState signalingState) {
+                Log.d(TAG, "onSignalingChange: ");
+                if (signalingState == SignalingState.HAVE_REMOTE_OFFER) {
+                    createAnswer();
+                }
+            }
+
+            @Override
+            public void onIceConnectionChange(PeerConnection.IceConnectionState iceConnectionState) {
+                Log.d(TAG, "onIceConnectionChange: ");
+            }
+
+            @Override
+            public void onIceConnectionReceivingChange(boolean b) {
+                Log.d(TAG, "onIceConnectionReceivingChange: ");
+            }
+
+            @Override
+            public void onIceGatheringChange(PeerConnection.IceGatheringState iceGatheringState) {
+                Log.d(TAG, "onIceGatheringChange: ");
+            }
+
+            @Override
+            public void onIceCandidate(IceCandidate iceCandidate) {
+                Log.d(TAG, "onIceCandidate: " + iceCandidate.sdp);
+                try {
+                    JSONObject candidateMessge = new JSONObject();
+                    candidateMessge.put("candidate", iceCandidate.sdp);
+                    candidateMessge.put("sdpMLineIndex", iceCandidate.sdpMLineIndex);
+
+                    JSONObject iceMessage = new JSONObject();
+                    iceMessage.put("ice", candidateMessge);
+                    iceCandidates.add(iceMessage.toString());
+
+                    if (iceCandidateAddedCallback != null)
+                        iceCandidateAddedCallback.onIceCandidate();
+                } catch (JSONException e) {
+                    Log.e(TAG, "Failed to set ice candidate: " + e.getMessage());
+                }
+            }
+
+            @Override
+            public void onIceCandidatesRemoved(IceCandidate[] iceCandidates) {
+                Log.d(TAG, "onIceCandidatesRemoved: ");
+            }
+
+            @Override
+            public void onAddStream(MediaStream mediaStream) {
+                Log.d(TAG, "onAddStream: " + mediaStream.videoTracks.size());
+                VideoTrack remoteVideoTrack = mediaStream.videoTracks.get(0);
+                remoteVideoTrack.setEnabled(true);
+                // ToDo : update stream state as playing
+            }
+
+            @Override
+            public void onRemoveStream(MediaStream mediaStream) {
+                Log.d(TAG, "onRemoveStream: ");
+            }
+
+            @Override
+            public void onDataChannel(DataChannel dataChannel) {
+                Log.d(TAG, "onDataChannel: ");
+            }
+
+            @Override
+            public void onRenegotiationNeeded() {
+                Log.d(TAG, "onRenegotiationNeeded: ");
+            }
+
+            @Override
+            public void onAddTrack(RtpReceiver rtpReceiver, MediaStream[] mediaStreams) {
+                MediaStreamTrack track = rtpReceiver.track();
+                if (track instanceof VideoTrack) {
+                    Log.i(TAG, "onAddTrack");
+                    VideoTrack remoteVideoTrack = (VideoTrack) track;
+                    remoteVideoTrack.setEnabled(true);
+                }
+            }
+        };
+
+        peerConnection = connectionFactory.createPeerConnection(rtcConfig, pcObserver);
+        if (peerConnection == null)
+            return;
+
+        localDataChannel = peerConnection.createDataChannel("sendDataChannel", new DataChannel.Init());
+    }
+
+    private void createAnswer() {
+        peerConnection.createAnswer(new SimpleSdpObserver() {
+            @Override
+            public void onCreateSuccess(SessionDescription sessionDescription) {
+                Log.d(TAG, "CreateAnswer : onCreateSuccess");
+                peerConnection.setLocalDescription(new SimpleSdpObserver(), sessionDescription);
+                try {
+                    JSONObject sdpMessage = new JSONObject();
+                    sdpMessage.put("type", "answer");
+                    sdpMessage.put("sdp", sessionDescription.description);
+
+                    JSONObject answerMessage = new JSONObject();
+                    answerMessage.put("sdp", sdpMessage);
+                    localDescription = answerMessage.toString();
+                } catch (JSONException e) {
+                    Log.e(TAG, "Failed to set local description: " + e.getMessage());
+                }
+            }
+        }, new MediaConstraints());
+    }
+
+    private void createVideoCapturer(VideoSource videoSource) {
+        videoCapturer = new FrameVideoCapturer();
+        videoCapturer.initialize(null, null, videoSource.getCapturerObserver());
+    }
+
+    /**
+     * Method to create video track
+     */
+    private void createVideoTrack(VideoSource videoSource) {
+        videoTrackFromSource = connectionFactory.createVideoTrack(VIDEO_TRACK_ID, videoSource);
+        videoTrackFromSource.setEnabled(true);
+    }
+
+    /**
+     * Method to add video track
+     */
+    private void addVideoTrack() {
+        MediaStream mediaStream = connectionFactory.createLocalMediaStream(MEDIA_STREAM_ID);
+        mediaStream.addTrack(videoTrackFromSource);
+        peerConnection.addStream(mediaStream);
+    }
+
+    /**
+     * Class to implement Frame video capturer
+     */
+    private static class FrameVideoCapturer implements VideoCapturer {
+        private CapturerObserver capturerObserver;
+
+        void send(byte[] frame, int width, int height) {
+            long timestampNS = TimeUnit.MILLISECONDS.toNanos(SystemClock.elapsedRealtime());
+            NV21Buffer buffer = new NV21Buffer(frame, width, height, null);
+            VideoFrame videoFrame = new VideoFrame(buffer, 0, timestampNS);
+            this.capturerObserver.onFrameCaptured(videoFrame);
+            videoFrame.release();
+        }
+
+        @Override
+        public void initialize(SurfaceTextureHelper surfaceTextureHelper, Context context, CapturerObserver capturerObserver) {
+            this.capturerObserver = capturerObserver;
+        }
+
+        @Override
+        public void startCapture(int i, int i1, int i2) {}
+
+        @Override
+        public void stopCapture() {}
+
+        @Override
+        public void changeCaptureFormat(int i, int i1, int i2) {}
+
+        @Override
+        public void dispose() {}
+
+        public boolean isScreencast() {
+            return false;
+        }
+    }
+}
diff --git a/android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCServer.java b/android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCServer.java
deleted file mode 100644 (file)
index 6548f32..0000000
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.samsung.android.modules.webrtc;
-
-import android.content.Context;
-import android.util.Log;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Class to implement WebRTC server related functionalities
- */
-public final class WebRTCServer {
-
-    private static final String TAG = "WebRTCServer";
-
-    private final Context appContext;
-    private final List<WebRTC> connectionList = new ArrayList<>();
-
-    private WebRTC.ReceiveDataCallback dataCallback;
-    private ServerSocket serverSocket = null;
-    private ServerThread serverThread = null;
-    private Thread thread;
-
-    /**
-     * WebRTCServer constructor to create its instance
-     *
-     * @param appContext Application context of the app creating WebRTCServer instance
-     */
-    public WebRTCServer(Context appContext) {
-        if (appContext == null)
-            throw new IllegalArgumentException("App context is null.");
-
-        this.appContext = appContext;
-    }
-
-    /**
-     * Setter to set a WebRTC ReceiveDataCallback
-     *
-     * @param dataCallback Data callback object to create call back mechanism
-     */
-    public void setDataCallback(WebRTC.ReceiveDataCallback dataCallback) {
-        this.dataCallback = dataCallback;
-    }
-
-    /**
-     * Method to start WebRTCServer instance
-     *
-     * @return Returns Port number on success and -1 on failure
-     */
-    public int start() {
-        if (dataCallback == null) {
-            Log.e(TAG, "Data callback is null.");
-            return -1;
-        }
-
-        try {
-            serverSocket = new ServerSocket(0);
-        } catch (IOException e) {
-            Log.e(TAG, "Error during start", e);
-            return -1;
-        }
-        serverThread = new ServerThread();
-        thread = new Thread(serverThread);
-        thread.start();
-        return serverSocket.getLocalPort();
-    }
-
-    /**
-     * Method to stop running WebRTC server instance
-     */
-    public void stop() {
-        if (serverThread != null) {
-            serverThread.stop();
-        }
-        try {
-            if (serverSocket != null) {
-                serverSocket.close();
-            }
-        } catch (IOException e) {
-            Log.e(TAG, "Error during stop", e);
-        }
-        for (WebRTC web : connectionList) {
-            web.disconnect();
-        }
-    }
-
-    /**
-     * Class to implement a server thread
-     */
-    private class ServerThread implements Runnable {
-        private volatile boolean isRunning = true;
-
-        @Override
-        public void run() {
-            while (isRunning) {
-                try {
-                    Socket socket = serverSocket.accept();
-                    WebRTC web = new WebRTC(appContext, socket);
-                    web.connect();
-                    web.registerDataCallback(dataCallback);
-                    connectionList.add(web);
-                } catch (IOException e) {
-                    isRunning = false;
-                    Log.e(TAG, "Error during run", e);
-                }
-            }
-        }
-
-        public void stop() {
-            isRunning = false;
-        }
-    }
-}
diff --git a/android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCSubscriber.java b/android/modules/webrtc/src/main/java/com/samsung/android/modules/webrtc/WebRTCSubscriber.java
new file mode 100644 (file)
index 0000000..9dbe3f0
--- /dev/null
@@ -0,0 +1,298 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.samsung.android.modules.webrtc;
+
+import static org.webrtc.SessionDescription.Type.ANSWER;
+import static java.lang.Math.max;
+import static java.lang.Math.toIntExact;
+
+import android.content.Context;
+import android.util.Log;
+
+import com.github.luben.zstd.Zstd;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.webrtc.DataChannel;
+import org.webrtc.IceCandidate;
+import org.webrtc.MediaConstraints;
+import org.webrtc.MediaStream;
+import org.webrtc.MediaStreamTrack;
+import org.webrtc.PeerConnection;
+import org.webrtc.RtpReceiver;
+import org.webrtc.SessionDescription;
+import org.webrtc.VideoFrame;
+import org.webrtc.VideoSink;
+import org.webrtc.VideoTrack;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+public final class WebRTCSubscriber extends WebRTC {
+
+   private static final String TAG = "WebRTCSubscriber";
+
+   private ByteArrayOutputStream baos;
+   private boolean recvLargeChunk = false;
+
+   public WebRTCSubscriber(Context appContext) throws InstantiationException {
+      super(appContext);
+      baos = new ByteArrayOutputStream();
+   }
+
+   @Override
+   public void setRemoteDescription(String sdp) {
+      if (sdp == null || sdp.isEmpty())
+         return;
+
+      Log.d(TAG, "Got answer " + sdp);
+      String answer = extractSDP(sdp);
+      peerConnection.setRemoteDescription(new SimpleSdpObserver(), new SessionDescription(ANSWER, answer));
+   }
+
+   @Override
+   public boolean setPeerDiscoveryId(String id) {
+      if (peerDiscoveryId != null && !peerDiscoveryId.isEmpty()) {
+         Log.e(TAG, "Stream peer already added");
+         return false;
+      }
+      this.peerDiscoveryId = id;
+      createOffer();
+      return true;
+   }
+
+   @Override
+   protected void initializePeerConnection() {
+      PeerConnection.RTCConfiguration rtcConfig = new PeerConnection.RTCConfiguration(new ArrayList<>());
+
+      PeerConnection.Observer pcObserver = new PeerConnection.Observer() {
+         @Override
+         public void onSignalingChange(PeerConnection.SignalingState signalingState) {
+            Log.d(TAG, "onSignalingChange: ");
+         }
+
+         @Override
+         public void onIceConnectionChange(PeerConnection.IceConnectionState iceConnectionState) {
+            Log.d(TAG, "onIceConnectionChange: ");
+         }
+
+         @Override
+         public void onIceConnectionReceivingChange(boolean b) {
+            Log.d(TAG, "onIceConnectionReceivingChange: ");
+         }
+
+         @Override
+         public void onIceGatheringChange(PeerConnection.IceGatheringState iceGatheringState) {
+            Log.d(TAG, "onIceGatheringChange: ");
+         }
+
+         @Override
+         public void onIceCandidate(IceCandidate iceCandidate) {
+            Log.d(TAG, "onIceCandidate: " + iceCandidate.sdp);
+
+            try {
+               JSONObject candidateMessge = new JSONObject();
+               candidateMessge.put("candidate", iceCandidate.sdp);
+               candidateMessge.put("sdpMLineIndex", iceCandidate.sdpMLineIndex);
+
+               JSONObject iceMessage = new JSONObject();
+               iceMessage.put("ice", candidateMessge);
+               iceCandidates.add(iceMessage.toString());
+
+               if (iceCandidateAddedCallback != null)
+                  iceCandidateAddedCallback.onIceCandidate();
+            } catch (JSONException e) {
+               Log.e(TAG, "Failed to set ice candidate: " + e.getMessage());
+            }
+         }
+
+         @Override
+         public void onIceCandidatesRemoved(IceCandidate[] iceCandidates) {
+            Log.d(TAG, "onIceCandidatesRemoved: ");
+         }
+
+         @Override
+         public void onAddStream(MediaStream mediaStream) {
+            Log.d(TAG, "onAddStream: " + mediaStream.videoTracks.size());
+            VideoTrack remoteVideoTrack = mediaStream.videoTracks.get(0);
+            remoteVideoTrack.setEnabled(true);
+            // ToDo : update stream state as playing
+         }
+
+         @Override
+         public void onRemoveStream(MediaStream mediaStream) {
+            Log.d(TAG, "onRemoveStream: ");
+         }
+
+         @Override
+         public void onDataChannel(DataChannel dataChannel) {
+            Log.d(TAG, "onDataChannel: ");
+            dataChannel.registerObserver(new DataChannel.Observer() {
+               @Override
+               public void onBufferedAmountChange(long l) {
+                  //Keep this callback for future usage
+                  Log.d(TAG, "onBufferedAmountChange:");
+               }
+
+               @Override
+               public void onStateChange() {
+                  Log.d(TAG, "onStateChange: remote data channel state: " + dataChannel.state().toString());
+               }
+
+               @Override
+               public void onMessage(DataChannel.Buffer buffer) {
+                  Log.d(TAG, "onMessage: got message");
+                  if (!recvLargeChunk && buffer.data.capacity() < MAX_MESSAGE_SIZE) {
+                     byte[] array = new byte[buffer.data.capacity()];
+                     buffer.data.rewind();
+                     buffer.data.get(array);
+                     dataCallback.pushData(array);
+                  } else {
+                     String message = StandardCharsets.UTF_8.decode(buffer.data).toString();
+                     handleLargeMessage(message, buffer);
+                  }
+               }
+            });
+         }
+
+         @Override
+         public void onRenegotiationNeeded() {
+            Log.d(TAG, "onRenegotiationNeeded: ");
+         }
+
+         @Override
+         public void onAddTrack(RtpReceiver rtpReceiver, MediaStream[] mediaStreams) {
+            MediaStreamTrack track = rtpReceiver.track();
+            if (track instanceof VideoTrack) {
+               Log.i(TAG, "onAddTrack");
+               VideoTrack remoteVideoTrack = (VideoTrack) track;
+               remoteVideoTrack.setEnabled(true);
+               ProxyVideoSink videoSink = new ProxyVideoSink();
+               remoteVideoTrack.addSink(videoSink);
+            }
+         }
+      };
+
+      peerConnection = connectionFactory.createPeerConnection(rtcConfig, pcObserver);
+      if (peerConnection == null)
+         return;
+
+      localDataChannel = peerConnection.createDataChannel("DataChannel", new DataChannel.Init());
+   }
+
+   private void createOffer() {
+      MediaConstraints sdpMediaConstraints = new MediaConstraints();
+      sdpMediaConstraints.mandatory.add(new MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true"));
+
+      peerConnection.createOffer(new SimpleSdpObserver() {
+         @Override
+         public void onCreateSuccess(SessionDescription sessionDescription) {
+            Log.d(TAG, "CreateOffer : onCreateSuccess");
+            peerConnection.setLocalDescription(new SimpleSdpObserver(), sessionDescription);
+
+            try {
+               JSONObject sdpMessage = new JSONObject();
+               sdpMessage.put("type", "offer");
+               sdpMessage.put("sdp", sessionDescription.description);
+
+               JSONObject offerMessage = new JSONObject();
+               offerMessage.put("sdp", sdpMessage);
+               localDescription = offerMessage.toString();
+            } catch (JSONException e) {
+               Log.e(TAG, "Failed to set local description: " + e.getMessage());
+            }
+         }
+      }, sdpMediaConstraints);
+   }
+
+   private void handleLargeMessage(String message, DataChannel.Buffer buffer) {
+      if (EOF_MESSAGE.equals(message)) {
+         // Received data is in compressed format, un compress it
+         try {
+            byte[] compBytes = baos.toByteArray();
+            long deCompSize = Zstd.decompressedSize(compBytes);
+            byte[] deCompBytes = Zstd.decompress(compBytes, max(toIntExact(deCompSize), MAX_UNCOMPRESSED_MESSAGE_SIZE));
+            Log.d(TAG, "UnCompressed Byte array size: " + deCompBytes.length);
+            dataCallback.pushData(deCompBytes);
+            baos.reset();
+            recvLargeChunk = false;
+         } catch (Exception e) {
+            Log.e(TAG, "Error during UnCompression: ");
+         }
+      } else {
+         recvLargeChunk = true;
+         try {
+            byte[] array = new byte[buffer.data.capacity()];
+            buffer.data.rewind();
+            buffer.data.get(array);
+            baos.write(array);
+         } catch (IOException e) {
+            Log.e(TAG, "Failed to write to byteArrayOutputStream " + e);
+         }
+      }
+   }
+
+   /**
+    * Class to create proxy video sink
+    */
+   private class ProxyVideoSink implements VideoSink {
+
+      /**
+       * Method to send data through data call back
+       *
+       * @param frame VideoFrame to be transferred using media channel
+       */
+      @Override
+      synchronized public void onFrame(VideoFrame frame) {
+         byte[] rawFrame = createNV21Data(frame.getBuffer().toI420());
+         dataCallback.pushData(rawFrame);
+      }
+
+      /**
+       * Method used to convert VideoFrame to NV21 data format
+       *
+       * @param i420Buffer VideoFrame in I420 buffer format
+       * @return the video frame in NV21 data format
+       */
+      public byte[] createNV21Data(VideoFrame.I420Buffer i420Buffer) {
+         final int width = i420Buffer.getWidth();
+         final int height = i420Buffer.getHeight();
+         final int chromaWidth = (width + 1) / 2;
+         final int chromaHeight = (height + 1) / 2;
+         final int ySize = width * height;
+         final ByteBuffer nv21Buffer = ByteBuffer.allocateDirect(ySize + width * chromaHeight);
+         final byte[] nv21Data = nv21Buffer.array();
+         for (int y = 0; y < height; ++y) {
+            for (int x = 0; x < width; ++x) {
+               final byte yValue = i420Buffer.getDataY().get(y * i420Buffer.getStrideY() + x);
+               nv21Data[y * width + x] = yValue;
+            }
+         }
+         for (int y = 0; y < chromaHeight; ++y) {
+            for (int x = 0; x < chromaWidth; ++x) {
+               final byte uValue = i420Buffer.getDataU().get(y * i420Buffer.getStrideU() + x);
+               final byte vValue = i420Buffer.getDataV().get(y * i420Buffer.getStrideV() + x);
+               nv21Data[ySize + y * width + 2 * x] = vValue;
+               nv21Data[ySize + y * width + 2 * x + 1] = uValue;
+            }
+         }
+         return nv21Data;
+      }
+   }
+}
index 80e7814..35c5e7c 100644 (file)
@@ -7,10 +7,8 @@ import org.mockito.Mock;
 import org.mockito.MockedStatic;
 
 import static org.junit.Assert.*;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;
 
 import android.content.Context;
 import android.util.Log;
@@ -31,53 +29,23 @@ public class WebRTCUnitTest {
         mockedLog.close();
     }
 
-    @Test
-    public void testWebRTCServerConstructor_P() {
-        WebRTCServer webRTCServer = new WebRTCServer(context);
-        assertNotNull("WebRTCServer instance not null", webRTCServer);
-    }
-
     @Test(expected = IllegalArgumentException.class)
-    public void testWebRTCServerConstructor_N() throws IllegalArgumentException {
-        WebRTCServer webRTCServer = new WebRTCServer(null);
-        assertNotNull("WebRTCServer instance not null", webRTCServer);
-    }
-
-    @Test
-    public void testWebRTCServerStart_P() {
-        WebRTCServer webRTCServer = new WebRTCServer(context);
-        webRTCServer.setDataCallback(frame -> {
-        });
-        webRTCServer.start();
-    }
-
-    @Test
-    public void testWebRTCServerStart_N() {
-        when(Log.e(any(String.class), any(String.class))).thenReturn(any(Integer.class), any(Integer.class));
-        WebRTCServer webRTCServer = new WebRTCServer(context);
-        webRTCServer.setDataCallback(null);
-        assertEquals("Fail to start a WebRTC server", -1, webRTCServer.start());
-    }
-
-    @Test
-    public void testWebRTCServerStop_P() {
-        WebRTCServer webRTCServer = new WebRTCServer(context);
-        webRTCServer.setDataCallback(frame -> {
-        });
-        webRTCServer.start();
-
-        webRTCServer.stop();
-    }
-
-    @Test
-    public void testWebRTCConstructor_P() {
-        WebRTC webRTC = new WebRTC(context);
-        assertNotNull("WebRTC instance not null", webRTC);
+    public void testWebRTCPublisherConstructor_N() throws IllegalArgumentException {
+        try {
+            WebRTC webRTC = new WebRTCPublisher(null);
+            assertNotNull("WebRTC instance not null", webRTC);
+        }  catch (InstantiationException e) {
+            fail("Failed to create WebRTCPublisher" + e);
+        }
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testWebRTCConstructor_N() throws IllegalArgumentException {
-        WebRTC webRTC = new WebRTC(null);
-        assertNotNull("WebRTC instance not null", webRTC);
+    public void testWebRTCSubscriberConstructor_N() throws IllegalArgumentException {
+        try {
+            WebRTC webRTC = new WebRTCSubscriber(null);
+            assertNotNull("WebRTC instance not null", webRTC);
+        } catch (InstantiationException e) {
+            fail("Failed to create WebRTCSubscriber" + e);
+        }
     }
 }