import com.google.flatbuffers.FlexBuffers;
import com.google.flatbuffers.FlexBuffersBuilder;
-import com.samsung.android.modules.webrtc.WebRTC;
-import com.samsung.android.modules.webrtc.WebRTCServer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
TCP(0x1 << 1), // Publish message to peers using the TCP
UDP(0x1 << 2), // Publish message to peers using the UDP
SRTP(0x1 << 3), // Publish message to peers using the SRTP
- WEBRTC(0x1 << 4); // Publish message to peers using the WEBRTC
+ WEBRTC(0x1 << 4), // Publish message to peers using the WEBRTC
+ IPC(0x1 << 5); // Publish message to peers using the IPC
private final int value;
if (protocols.isEmpty()) {
throw new IllegalArgumentException("Invalid protocols");
}
- if (protocols.contains(Protocol.WEBRTC)) {
- try {
- synchronized (this) {
- if (!publishTable.containsKey(topic)) {
- Log.e(TAG, "Invalid publish request over unsubscribed topic");
- return;
- }
- HostTable hostTable = publishTable.get(topic);
- for (String hostIp : hostTable.hostMap.keySet()) {
- PortTable portTable = hostTable.hostMap.get(hostIp);
- for (Integer port : portTable.portMap.keySet()) {
- Object transportHandler = portTable.portMap.get(port).second;
- publishWebRTC(portTable, topic, transportHandler, hostIp, port, message);
+
+ int jniProtocols = 0;
+
+ for (Protocol p : protocols) {
+ if (!classifyProtocol(p)) {
+ jniProtocols += p.getValue();
+ protocols.remove(p);
+ }
+ }
+
+ if(jniProtocols > 0) {
+ publishJNI(instance, topic, message, message.length, jniProtocols, qos.ordinal(), retain);
+ }
+
+ for(Protocol pro : protocols) {
+ try {
+ synchronized (this) {
+ if (!publishTable.containsKey(topic)) {
+ Log.e(TAG, "Invalid publish request over unsubscribed topic");
+ return;
+ }
+ HostTable hostTable = publishTable.get(topic);
+ for (String hostIp : hostTable.hostMap.keySet()) {
+ PortTable portTable = hostTable.hostMap.get(hostIp);
+ for (Integer port : portTable.portMap.keySet()) {
+ Object transportHandler = portTable.portMap.get(port).second;
+ if(portTable.portMap.get(port).first == pro)
+ publishHandler(pro, portTable, topic, transportHandler, hostIp, port, message);
+ }
}
}
+ } catch (Exception e) {
+ Log.e(TAG, "Error during publish", e);
}
- } catch (Exception e) {
- Log.e(TAG, "Error during publish", e);
- }
- } else {
- int proto = protocolsToInt(protocols);
- publishJNI(instance, topic, message, message.length, proto, qos.ordinal(), retain);
}
}
/**
- * Method used to identify data type for webRTC channel and transfer data
+ * Method to create transportHandler and publish message based on protocol
+ * @param protocol protocol using which data needs to be published
* @param portTable portTable has information about port and associated protocol with transport Handler object
* @param topic The topic to which data is published
- * @param transportHandler WebRTC object instance
+ * @param transportHandlerObject transportHandler object used to publish message
* @param ip IP address of the destination
* @param port Port number of the destination
* @param message Data to be tranferred over WebRTC
*/
- private void publishWebRTC(PortTable portTable, String topic, Object transportHandler, String ip, int port, byte[] message) {
- WebRTC.DataType dataType = topic.endsWith(RESPONSE_POSTFIX) ? WebRTC.DataType.MESSAGE : WebRTC.DataType.VIDEOFRAME;
- WebRTC webrtcHandler;
- if (transportHandler == null) {
- webrtcHandler = new WebRTC(dataType, appContext);
- transportHandler = webrtcHandler;
- portTable.portMap.replace(port, new Pair<>(Protocol.WEBRTC, transportHandler));
- webrtcHandler.connect(ip, port);
- } else {
- webrtcHandler = (WebRTC) transportHandler;
+ private void publishHandler(Protocol protocol, PortTable portTable, String topic, Object transportHandlerObject, String ip, int port, byte[] message){
+ TransportHandler transportHandler;
+ if(transportHandlerObject == null){
+ transportHandler = TransportFactory.createTransport(protocol);
+ if(transportHandler!=null)
+ transportHandler.setAppContext(appContext);
+ portTable.portMap.replace(port, new Pair<>(protocol, transportHandler));
+ }else{
+ transportHandler = (TransportHandler) transportHandlerObject;
}
- if (dataType == WebRTC.DataType.MESSAGE) {
- webrtcHandler.sendMessageData(message);
- } else if (dataType == WebRTC.DataType.VIDEOFRAME) {
- webrtcHandler.sendVideoData(message, frameWidth, frameHeight);
+
+ if(transportHandler!=null){
+ transportHandler.publish(topic,ip, port, message);
}
}
/**
+ * Method to differentiate android specific protocol
+ * @param protocols Protocol to be classified
+ * @return true if the protocol is specific to android implementation
+ */
+ private boolean classifyProtocol(Protocol protocols){
+ return protocols.equals(Protocol.WEBRTC) || protocols.equals(Protocol.IPC);
+ }
+
+ /**
* Method to subscribe to a specific topic
* @param topic String to which applications can subscribe, to receive data
* @param callback Callback object specific to a subscribe call
if (protocols.isEmpty()) {
throw new IllegalArgumentException("Invalid protocols");
}
- try {
- if (protocols.contains(Protocol.WEBRTC)) {
- WebRTC.ReceiveDataCallback cb = frame -> {
- AittMessage message = new AittMessage(frame);
- message.setTopic(topic);
- messageReceived(message);
- };
- WebRTC.DataType dataType = topic.endsWith(RESPONSE_POSTFIX) ? WebRTC.DataType.MESSAGE : WebRTC.DataType.VIDEOFRAME;
- WebRTCServer ws = new WebRTCServer(appContext, dataType, cb);
- int serverPort = ws.start();
- if (serverPort < 0) {
- throw new IllegalArgumentException("Failed to start webRTC server-socket");
- }
- synchronized (this) {
- subscribeMap.put(topic, new Pair(Protocol.WEBRTC, ws));
- }
- byte[] data = wrapPublishData(topic, serverPort);
- publishJNI(instance, JAVA_SPECIFIC_DISCOVERY_TOPIC, data, data.length, Protocol.MQTT.value, QoS.EXACTLY_ONCE.ordinal(), true);
- } else {
- int proto = protocolsToInt(protocols);
- Long pObject = subscribeJNI(instance, topic, proto, qos.ordinal());
- synchronized (this) {
- aittSubId.put(topic, pObject);
- }
+
+ int jniProtocols = 0;
+
+ for (Protocol p : protocols) {
+ if (!classifyProtocol(p)) {
+ jniProtocols += p.getValue();
+ protocols.remove(p);
}
- } catch (Exception e) {
- Log.e(TAG, "Error during subscribe", e);
}
- addCallBackToSubscribeMap(topic, callback);
- }
- /**
- * Method to wrap topic, device IP address, webRTC server instance port number for publishing
- * @param topic Topic to which the application has subscribed to
- * @param serverPort Port number of the WebRTC server instance
- * @return Byte data wrapped, contains topic, device IP, webRTC server port number
- */
- private byte[] wrapPublishData(String topic, int serverPort) {
- FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
- {
- int smap = fbb.startMap();
- fbb.putString(STATUS, JOIN_NETWORK);
- fbb.putString("host", this.ip);
- {
- int smap1 = fbb.startMap();
- fbb.putInt("protocol", Protocol.WEBRTC.value);
- fbb.putInt("port", serverPort);
- fbb.endMap(topic, smap1);
+ if(jniProtocols > 0) {
+ Long pObject = subscribeJNI(instance, topic, jniProtocols, qos.ordinal());
+ synchronized (this) {
+ aittSubId.put(topic, pObject);
+ }
+ }
+
+ for(Protocol pro : protocols) {
+ try {
+ TransportHandler transportHandler = TransportFactory.createTransport(pro);
+
+ if (transportHandler != null) {
+ synchronized (this) {
+ subscribeMap.put(topic, new Pair(pro, transportHandler));
+ }
+
+ transportHandler.setAppContext(appContext);
+ transportHandler.setSelfIP(ip);
+ transportHandler.subscribe(topic, data -> {
+ AittMessage message = new AittMessage(data);
+ message.setTopic(topic);
+ messageReceived(message);
+ });
+ byte[] data = transportHandler.getPublishData();
+ publishJNI(instance, JAVA_SPECIFIC_DISCOVERY_TOPIC, data, data.length, Protocol.MQTT.value, QoS.EXACTLY_ONCE.ordinal(), true);
+ }
+ } catch (Exception e) {
+ Log.e(TAG, "Error during subscribe", e);
}
- fbb.endMap(null, smap);
+ addCallBackToSubscribeMap(topic, callback);
}
- ByteBuffer buffer = fbb.finish();
- byte[] data = new byte[buffer.remaining()];
- buffer.get(data, 0, data.length);
- return data;
}
/**
try {
synchronized (this) {
if (subscribeMap.containsKey(topic) && subscribeMap.get(topic).first == Protocol.WEBRTC) {
- WebRTCServer ws = (WebRTCServer) subscribeMap.get(topic).second;
- ws.stop();
+ TransportHandler transportHandler = (TransportHandler) subscribeMap.get(topic).second;
+ transportHandler.unsubscribe();
subscribeMap.remove(topic);
isRemoved = true;
}
}
/**
- * Method used to convert EnumSet protocol into int
- * @param protocols List of protocols
- * @return The protocol value
- */
- private int protocolsToInt(EnumSet<Protocol> protocols) {
- int proto = 0;
- for (Protocol p : Protocol.values()) {
- if (protocols.contains(p)) {
- proto += p.getValue();
- }
- }
- return proto;
- }
-
- /**
* Method to close all the callbacks and release resources
*/
public void close() {
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.samsung.android.aitt;
+
+import android.content.Context;
+
+/**
+ * An interface to create a transportHandler and provide APIs for protocol specific implementation
+ */
+public interface TransportHandler {
+ /**
+ * Method to implement protocol specific subscribe functionalities
+ * @param topic String topic to which subscribe is called
+ * @param handlerDataCallback callback object to send data from transport handler layer to aitt layer
+ */
+ void subscribe(String topic, HandlerDataCallback handlerDataCallback);
+
+ /**
+ * Method to implement protocol specific publish functionalities
+ * @param topic String topic to which publish is called
+ * @param ip IP address of the destination
+ * @param port port number of the destination
+ * @param message message to be published to specific topic
+ */
+ void publish(String topic, String ip, int port, byte[] message);
+
+ /**
+ * Method to implement protocol specific unsubscribe functionalities
+ */
+ void unsubscribe();
+
+ /**
+ * Method to implement protocol specific disconnect functionalities
+ */
+ void disconnect();
+
+ /**
+ * Method to set application context to transport handler
+ * @param appContext application context
+ */
+ void setAppContext(Context appContext);
+
+ /**
+ * Method to set IP address of self device to transport handler
+ * @param ip IP address of the device
+ */
+ void setSelfIP(String ip);
+
+ /**
+ * Method to get data to publish self details
+ * @return returns details of self device
+ */
+ byte[] getPublishData();
+
+
+ /**
+ * Interface to implement handler data callback mechanism
+ */
+ interface HandlerDataCallback{
+ void pushHandlerData(byte[] frame);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.samsung.android.aitt;
+
+import android.content.Context;
+
+import com.google.flatbuffers.FlexBuffersBuilder;
+import com.samsung.android.modules.webrtc.WebRTC;
+import com.samsung.android.modules.webrtc.WebRTCServer;
+
+import java.nio.ByteBuffer;
+
+public class WebRTCHandler implements TransportHandler{
+
+ private static final String RESPONSE_POSTFIX = "_AittRe_";
+ private static final String JOIN_NETWORK = "connected";
+ private static final String STATUS = "status";
+ private Context appContext;
+ private String ip;
+ private byte[] publishData;
+ private WebRTC webrtc;
+ private WebRTCServer ws;
+ //ToDo - For now using sample app parameters, later fetch frameWidth & frameHeight from app
+ private final Integer frameWidth = 640;
+ private final Integer frameHeight = 480;
+
+ @Override
+ public void setAppContext(Context appContext) {
+ this.appContext = appContext;
+ }
+
+ @Override
+ public void setSelfIP(String ip) {
+ this.ip = ip;
+ }
+
+ @Override
+ public void subscribe(String topic, HandlerDataCallback handlerDataCallback) {
+ WebRTC.ReceiveDataCallback cb = data -> {
+ handlerDataCallback.pushHandlerData(data);
+ };
+ WebRTC.DataType dataType = topic.endsWith(RESPONSE_POSTFIX) ? WebRTC.DataType.MESSAGE : WebRTC.DataType.VIDEOFRAME;
+ ws = new WebRTCServer(appContext, dataType, cb);
+ int serverPort = ws.start();
+ if (serverPort < 0) {
+ throw new IllegalArgumentException("Failed to start webRTC server-socket");
+ }
+
+ publishData = wrapPublishData(topic, serverPort);
+ }
+
+ @Override
+ public byte[] getPublishData() {
+ return publishData;
+ }
+
+ /**
+ * Method to wrap topic, device IP address, webRTC server instance port number for publishing
+ * @param topic Topic to which the application has subscribed to
+ * @param serverPort Port number of the WebRTC server instance
+ * @return Byte data wrapped, contains topic, device IP, webRTC server port number
+ */
+ private byte[] wrapPublishData(String topic, int serverPort) {
+ FlexBuffersBuilder fbb = new FlexBuffersBuilder(ByteBuffer.allocate(512));
+ {
+ int smap = fbb.startMap();
+ fbb.putString(STATUS, JOIN_NETWORK);
+ fbb.putString("host", ip);
+ {
+ int smap1 = fbb.startMap();
+ fbb.putInt("protocol", Aitt.Protocol.WEBRTC.getValue());
+ fbb.putInt("port", serverPort);
+ fbb.endMap(topic, smap1);
+ }
+ fbb.endMap(null, smap);
+ }
+ ByteBuffer buffer = fbb.finish();
+ byte[] data = new byte[buffer.remaining()];
+ buffer.get(data, 0, data.length);
+ return data;
+ }
+
+ @Override
+ public void publish(String topic, String ip, int port, byte[] message) {
+ WebRTC.DataType dataType = topic.endsWith(RESPONSE_POSTFIX) ? WebRTC.DataType.MESSAGE : WebRTC.DataType.VIDEOFRAME;
+ if (webrtc == null) {
+ webrtc = new WebRTC(dataType, appContext);
+ webrtc.connect(ip, port);
+ }
+ if (dataType == WebRTC.DataType.MESSAGE) {
+ webrtc.sendMessageData(message);
+ } else if (dataType == WebRTC.DataType.VIDEOFRAME) {
+ webrtc.sendVideoData(message, frameWidth, frameHeight);
+ }
+ }
+
+ @Override
+ public void unsubscribe() {
+ ws.stop();
+ }
+
+ @Override
+ public void disconnect() {
+
+ }
+}