private final JniInterface mJniInterface;
private final String ip;
private final Context appContext;
- private final long instance;
private Map<String, ArrayList<SubscribeCallback>> subscribeCallbacks = new HashMap<>();
private Map<String, Long> aittSubId = new HashMap<>();
throw new IllegalArgumentException("Invalid ip");
mJniInterface = new JniInterface();
- instance = mJniInterface.init(id, ip, clearSession);
+ long instance = mJniInterface.init(id, ip, clearSession);
if (instance == 0L)
throw new InstantiationException("Failed to instantiate native instance");
public void disconnect() {
mJniInterface.publish(Definitions.JAVA_SPECIFIC_DISCOVERY_TOPIC, new byte[0], 0, Protocol.MQTT.getValue(), QoS.AT_LEAST_ONCE.ordinal(), true);
+ // ToDo : Disconnect the running stream instances
mJniInterface.disconnect();
try {
close();
jniProtocols += p.getValue();
protocols.remove(p);
}
+ if (isStreamProtocol(p)) {
+ Log.w(TAG, "Use AittStream interface to publish with " + p + " protocol");
+ protocols.remove(p);
+ }
}
if (jniProtocols > 0)
mJniInterface.publish(topic, message, message.length, jniProtocols, qos.ordinal(), retain);
- publishTransportProtocols(topic, message, protocols);
+ if (!protocols.isEmpty())
+ publishTransportProtocols(topic, message, protocols);
}
/**
* @param protocols Protocols to be used to publish message
*/
private void publishTransportProtocols(String topic, byte[] message, EnumSet<Protocol> protocols) {
- for (Protocol protocol : protocols) {
- try {
- synchronized (this) {
- HostTable hostTable = getHostTable(topic);
- for (String hostIp : hostTable.hostMap.keySet()) {
- PortTable portTable = hostTable.hostMap.get(hostIp);
- if (portTable == null) {
- Log.e(TAG, "Port table for host [" + hostIp + "] is null.");
- continue;
- }
- for (Integer port : portTable.portMap.keySet()) {
- Pair<Protocol, Object> protocolPair = portTable.portMap.get(port);
- if (protocolPair == null) {
- Log.e(TAG, "Pair for port: " + port + "is null.");
- continue;
- }
- if (protocolPair.first == protocol)
- publishHandler(protocol, portTable, topic, protocolPair.second, hostIp, port, message);
- }
- }
- }
- } catch (Exception e) {
- Log.e(TAG, "Error during publishing transport protocols", e);
+ synchronized (this) {
+ if (!publishTable.containsKey(topic)) {
+ Log.e(TAG, "No subscriber for the topic: " + topic);
+ return;
}
- }
- }
- /**
- * Method to get a host table related to a specific topic
- *
- * @param topic String to which message needs to be published
- */
- private HostTable getHostTable(String topic) {
- if (!publishTable.containsKey(topic))
- throw new IllegalArgumentException("Invalid publish request over an unsubscribed topic");
-
- HostTable hostTable = publishTable.get(topic);
- if (hostTable == null)
- throw new IllegalArgumentException("Host table for topic [" + topic + "] is null.");
+ HostTable hostTable = publishTable.get(topic);
+ if (hostTable == null) {
+ Log.e(TAG, "No host table entry for topic: " + topic);
+ return;
+ }
- return hostTable;
+ for (Map.Entry<String, PortTable> entry : hostTable.hostMap.entrySet()) {
+ String hostIp = entry.getKey();
+ PortTable portTable = entry.getValue();
+ if (portTable == null)
+ continue;
+ for (Integer port : portTable.portMap.keySet()) {
+ Pair<Protocol, Object> protocolHandlerPair = portTable.portMap.get(port);
+ if (protocolHandlerPair == null || !protocols.contains(protocolHandlerPair.first))
+ continue;
+ publishHandler(protocolHandlerPair.first, portTable, topic, protocolHandlerPair.second, hostIp, port, message);
+ }
+ }
+ }
}
/**
* @param message Data to be transferred over WebRTC
*/
private void publishHandler(Protocol protocol, PortTable portTable, String topic, Object moduleHandlerObject, String ip, int port, byte[] message) {
- // TODO: Validate protocol type.
- TransportHandler transportHandler;
- if (moduleHandlerObject == null) {
- transportHandler = (TransportHandler) createModuleHandler(protocol);
+ try {
+ TransportHandler transportHandler;
+ if (moduleHandlerObject == null) {
+ transportHandler = (TransportHandler) createModuleHandler(protocol);
+ if (transportHandler != null)
+ transportHandler.setAppContext(appContext);
+ portTable.portMap.replace(port, new Pair<>(protocol, transportHandler));
+ } else {
+ transportHandler = (TransportHandler) moduleHandlerObject;
+ }
+
if (transportHandler != null)
- transportHandler.setAppContext(appContext);
- portTable.portMap.replace(port, new Pair<>(protocol, transportHandler));
- } else {
- transportHandler = (TransportHandler) moduleHandlerObject;
+ transportHandler.publish(topic, ip, port, message);
+ } catch (Exception e) {
+ Log.e(TAG, "Error during publishing transport protocols " + e.getMessage());
}
-
- if (transportHandler != null)
- transportHandler.publish(topic, ip, port, message);
}
/**
* Method to differentiate android specific protocol
*
- * @param protocols Protocol to be classified
+ * @param protocol Protocol to be classified
* @return true if the protocol is using native publish/subscribe
*/
- private boolean isUsingNativePubSub(Protocol protocols) {
- return protocols.equals(Protocol.MQTT) || protocols.equals(Protocol.TCP) || protocols.equals(Protocol.TCP_SECURE);
+ private boolean isUsingNativePubSub(Protocol protocol) {
+ return protocol.equals(Protocol.MQTT) || protocol.equals(Protocol.TCP) || protocol.equals(Protocol.TCP_SECURE);
+ }
+
+ /**
+ * Method to check if it is a stream protocol
+ *
+ * @param protocol Protocol to be classified
+ * @return true if the protocol is a stream protocol
+ */
+ private boolean isStreamProtocol(Protocol protocol) {
+ return protocol.equals(Protocol.RTSP) || protocol.equals(Protocol.WEBRTC);
}
/**
HostTable hostTable = publishTable.get(topic);
if (hostTable == null) {
- Log.d(TAG, "Host table for topic[" + topic + "] is null,");
+ Log.d(TAG, "[updatePublishTable] No host table entry for topic: " + topic);
return;
}
if (!hostTable.hostMap.containsKey(host)) {
PortTable portTable = hostTable.hostMap.get(host);
if (portTable == null) {
- Log.d(TAG, "Port table for host[" + host + "] is null.");
+ Log.d(TAG, "[updatePublishTable] No port table entry for host: " + host);
return;
}
if (portTable.portMap.containsKey(port)) {
portLists.append(p);
portLists.append(", ");
}
- Log.d(TAG, "Existing ports list: " + portLists);
+ Log.d(TAG, "[updatePublishTable] Existing ports list: " + portLists);
portTable.portMap.clear();
}
- Log.d(TAG, "[updatePublishTable] port " + port + "is added. (Topic,Host) = (" + topic + "," + host + ")");
+ Log.d(TAG, "[updatePublishTable] Port " + port + "is added. (Topic,Host) = (" + topic + "," + host + ")");
portTable.portMap.put(port, new Pair<>(protocol, null));
}
}
}
@Test
- public void testPublishWebRTC_P() {
- try {
- shadowJniInterface.setInitReturn(true);
- Aitt aitt = new Aitt(appContext, aittId);
- aitt.connect(brokerIp, port);
-
- byte[] payload = message.getBytes();
- aitt.publish(topic, payload, Aitt.Protocol.WEBRTC, Aitt.QoS.AT_MOST_ONCE, false);
-
- aitt.disconnect();
- } catch (Exception e) {
- fail("Failed testPublishWebRTC " + e);
- }
- }
-
- @Test
- public void testPublishWebRTCInvalidTopic_N() {
- try {
- shadowJniInterface.setInitReturn(true);
- Aitt aitt = new Aitt(appContext, aittId);
- aitt.connect(brokerIp, port);
-
- String _topic = "";
- byte[] payload = message.getBytes();
- assertThrows(IllegalArgumentException.class, () -> aitt.publish(_topic, payload, Aitt.Protocol.WEBRTC, Aitt.QoS.AT_MOST_ONCE, false));
-
- aitt.disconnect();
- } catch (Exception e) {
- fail("Failed testPublishWebRTCInvalidTopic" + e);
- }
- }
-
- @Test
public void testPublishIpc_P() {
try {
shadowJniInterface.setInitReturn(true);
aitt.connect(brokerIp, port);
byte[] payload = message.getBytes();
- aitt.publish(topic, payload, Aitt.Protocol.TCP, Aitt.QoS.AT_LEAST_ONCE, false);
+ aitt.publish(topic, payload, Aitt.Protocol.MQTT);
+ aitt.publish(topic, payload, Aitt.Protocol.TCP, Aitt.QoS.EXACTLY_ONCE);
+ aitt.publish(topic, payload, Aitt.Protocol.TCP, Aitt.QoS.AT_LEAST_ONCE, true);
aitt.disconnect();
} catch (Exception e) {