From a1d3be830d071ac07a8cd440fa0f168ed6f8f354 Mon Sep 17 00:00:00 2001 From: Jung Seungho Date: Thu, 1 Sep 2016 16:43:14 +0900 Subject: [PATCH] modified observe exception, when channel is disconnected Change-Id: I6309712296cb8269c9dba027d64e8656e8479428 Signed-off-by: Jung Seungho Reviewed-on: https://gerrit.iotivity.org/gerrit/11213 Tested-by: jenkins-iotivity Reviewed-by: Jee Hyeok Kim --- .../cloud/ciserver/DeviceServerSystem.java | 59 +++++++++++-- .../iotivity/cloud/base/connector/CoapClient.java | 28 ++++++- .../cloud/base/connector/ConnectorPool.java | 5 ++ .../org/iotivity/cloud/base/device/CoapDevice.java | 96 ++++++++++++++++++++-- .../cloud/base/protocols/MessageBuilder.java | 1 + 5 files changed, 170 insertions(+), 19 deletions(-) diff --git a/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/DeviceServerSystem.java b/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/DeviceServerSystem.java index 14e2479..772a051 100644 --- a/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/DeviceServerSystem.java +++ b/cloud/interface/src/main/java/org/iotivity/cloud/ciserver/DeviceServerSystem.java @@ -22,6 +22,7 @@ package org.iotivity.cloud.ciserver; import java.util.HashMap; +import java.util.Iterator; import org.iotivity.cloud.base.OICConstants; import org.iotivity.cloud.base.ServerSystem; @@ -29,6 +30,7 @@ import org.iotivity.cloud.base.connector.ConnectorPool; import org.iotivity.cloud.base.device.CoapDevice; import org.iotivity.cloud.base.device.Device; import org.iotivity.cloud.base.device.IRequestChannel; +import org.iotivity.cloud.base.exception.ClientException; import org.iotivity.cloud.base.exception.ServerException; import org.iotivity.cloud.base.exception.ServerException.BadRequestException; import org.iotivity.cloud.base.exception.ServerException.UnAuthorizedException; @@ -41,6 +43,7 @@ import org.iotivity.cloud.base.protocols.enums.ResponseStatus; import org.iotivity.cloud.base.server.CoapServer; import org.iotivity.cloud.base.server.HttpServer; import org.iotivity.cloud.base.server.Server; +import org.iotivity.cloud.util.Bytes; import org.iotivity.cloud.util.Cbor; import org.iotivity.cloud.util.Log; @@ -67,13 +70,25 @@ public class DeviceServerSystem extends ServerSystem { } } - public void removeDevice(Device device) { + public void removeDevice(Device device) throws ClientException { String deviceId = ((CoapDevice) device).getDeviceId(); synchronized (mMapDevice) { if (mMapDevice.get(deviceId) == device) { mMapDevice.remove(deviceId); } } + removeObserveDevice(device); + } + + private void removeObserveDevice(Device device) throws ClientException { + Iterator iterator = mMapDevice.keySet().iterator(); + while (iterator.hasNext()) { + String deviceId = iterator.next(); + CoapDevice getDevice = (CoapDevice) mDevicePool + .queryDevice(deviceId); + getDevice.removeObserveChannel( + ((CoapDevice) device).getRequestChannel()); + } } public Device queryDevice(String deviceId) { @@ -101,6 +116,31 @@ public class DeviceServerSystem extends ServerSystem { throw new UnAuthorizedException("token is expired"); } + CoapRequest coapRequest = (CoapRequest) msg; + IRequestChannel targetChannel = null; + if (coapRequest.getUriPathSegments() + .contains(Constants.REQ_DEVICE_ID)) { + CoapDevice targetDevice = (CoapDevice) mDevicePool + .queryDevice(coapRequest.getUriPathSegments() + .get(1)); + targetChannel = targetDevice.getRequestChannel(); + } + switch (coapRequest.getObserve()) { + case SUBSCRIBE: + coapDevice.addObserveRequest( + Bytes.bytesToLong(coapRequest.getToken()), + coapRequest); + coapDevice.addObserveChannel(targetChannel); + break; + case UNSUBSCRIBE: + coapDevice.removeObserveChannel(targetChannel); + coapDevice.removeObserveRequest( + Bytes.bytesToLong(coapRequest.getToken())); + break; + default: + break; + } + } catch (Throwable t) { Log.f(ctx.channel(), t); ResponseStatus responseStatus = t instanceof ServerException @@ -117,27 +157,30 @@ public class DeviceServerSystem extends ServerSystem { @Override public void channelActive(ChannelHandlerContext ctx) { - - // Authenticated device connected Device device = ctx.channel().attr(keyDevice).get(); - mDevicePool.addDevice(device); - device.onConnected(); + // Authenticated device connected sendDevicePresence(device.getDeviceId(), "on"); + mDevicePool.addDevice(device); + + device.onConnected(); } @Override - public void channelInactive(ChannelHandlerContext ctx) { + public void channelInactive(ChannelHandlerContext ctx) + throws ClientException { Device device = ctx.channel().attr(keyDevice).get(); // Some cases, this event occurs after new device connected using // same di. // So compare actual value, and remove if same. if (device != null) { - mDevicePool.removeDevice(device); + sendDevicePresence(device.getDeviceId(), "off"); + device.onDisconnected(); + + mDevicePool.removeDevice(device); ctx.channel().attr(keyDevice).remove(); - sendDevicePresence(device.getDeviceId(), "off"); } } diff --git a/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/CoapClient.java b/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/CoapClient.java index 6feb5cb..369e268 100644 --- a/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/CoapClient.java +++ b/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/CoapClient.java @@ -83,17 +83,17 @@ public class CoapClient implements IRequestChannel, IResponseEventHandler { switch (request.getObserve()) { case UNSUBSCRIBE: - newToken = mSubscription.remove(Bytes.bytesToLong(token)); + newToken = removeObserve(Bytes.bytesToLong(token)); break; case SUBSCRIBE: - mSubscription.put(Bytes.bytesToLong(token), newToken); + addObserve(Bytes.bytesToLong(token), newToken); default: // We create temp token // TODO: temporal handling if (request.getUriPath() .equals(OICConstants.RESOURCE_PRESENCE_FULL_URI)) { - mSubscription.put(Bytes.bytesToLong(token), newToken); + addObserve(Bytes.bytesToLong(token), newToken); observe = Observe.SUBSCRIBE; } synchronized (mToken) { @@ -133,9 +133,31 @@ public class CoapClient implements IRequestChannel, IResponseEventHandler { // Subscription response should stored if (reqInfo.observe != Observe.SUBSCRIBE) { mTokenExchanger.remove(Bytes.bytesToLong(coapResponse.getToken())); + if (reqInfo.observe == Observe.UNSUBSCRIBE && mSubscription + .containsKey(Bytes.bytesToLong(reqInfo.originToken))) { + mSubscription.remove(Bytes.bytesToLong(reqInfo.originToken)); + } } coapResponse.setToken(reqInfo.originToken); reqInfo.responseHandler.onResponseReceived(response); } + + private void addObserve(long token, long newtoken) { + + mSubscription.put(token, newtoken); + } + + private Long removeObserve(long token) { + + Long getToken = mSubscription.remove(token); + return getToken; + } + + public Long isObserveRequest(Long token) { + Long getToken = null; + getToken = mSubscription.get(token); + + return getToken; + } } diff --git a/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectorPool.java b/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectorPool.java index a4f3fef..bf57799 100644 --- a/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectorPool.java +++ b/cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectorPool.java @@ -22,6 +22,7 @@ package org.iotivity.cloud.base.connector; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashMap; import org.iotivity.cloud.base.device.IRequestChannel; @@ -44,4 +45,8 @@ public class ConnectorPool { public static IRequestChannel getConnection(String name) { return mConnection.get(name); } + + public static ArrayList getConnectionList() { + return new ArrayList(mConnection.values()); + } } diff --git a/cloud/stack/src/main/java/org/iotivity/cloud/base/device/CoapDevice.java b/cloud/stack/src/main/java/org/iotivity/cloud/base/device/CoapDevice.java index 1e9d37b..196ecca 100644 --- a/cloud/stack/src/main/java/org/iotivity/cloud/base/device/CoapDevice.java +++ b/cloud/stack/src/main/java/org/iotivity/cloud/base/device/CoapDevice.java @@ -21,23 +21,37 @@ */ package org.iotivity.cloud.base.device; +import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; import org.iotivity.cloud.base.connector.CoapClient; +import org.iotivity.cloud.base.connector.ConnectorPool; +import org.iotivity.cloud.base.exception.ClientException; +import org.iotivity.cloud.base.protocols.IRequest; import org.iotivity.cloud.base.protocols.IResponse; +import org.iotivity.cloud.base.protocols.MessageBuilder; +import org.iotivity.cloud.base.protocols.coap.CoapRequest; +import org.iotivity.cloud.base.protocols.coap.CoapResponse; +import org.iotivity.cloud.base.protocols.enums.Observe; +import org.iotivity.cloud.base.protocols.enums.ResponseStatus; +import org.iotivity.cloud.util.Bytes; import org.iotivity.cloud.util.Log; import io.netty.channel.ChannelHandlerContext; public class CoapDevice extends Device { - private CoapClient mCoapClient = null; - private String mUserId = null; - private String mDeviceId = null; - private String mAccessToken = null; - private Date mIssuedTime = null; - private int mExpiredPolicy = 0; + private CoapClient mCoapClient = null; + private String mUserId = null; + private String mDeviceId = null; + private String mAccessToken = null; + private Date mIssuedTime = null; + private int mExpiredPolicy = 0; + private ArrayList mObserveChannelList = new ArrayList<>(); + private HashMap mObserveRequestList = new HashMap<>(); - private static final int INFINITE_TIME = -1; + private static final int INFINITE_TIME = -1; public CoapDevice(ChannelHandlerContext ctx) { super(ctx); @@ -85,10 +99,60 @@ public class CoapDevice extends Device { this.mAccessToken = accessToken; } + public void addObserveChannel(IRequestChannel channel) { + + if (channel != null) { + mObserveChannelList.add(channel); + } + } + + public void removeObserveChannel(IRequestChannel channel) + throws ClientException { + if (channel != null && mObserveChannelList.contains(channel)) { + + Iterator iterator = mObserveRequestList.keySet().iterator(); + while (iterator.hasNext()) { + Long token = iterator.next(); + CoapClient coapClient = (CoapClient) channel; + if (coapClient.isObserveRequest(token) != null) { + IRequest getRequest = mObserveRequestList.get(token); + + CoapRequest coapRequest = (CoapRequest) getRequest; + coapRequest.setObserve(Observe.UNSUBSCRIBE); + + coapClient.onResponseReceived(MessageBuilder.createResponse( + coapRequest, ResponseStatus.CONTENT, null, null)); + } + } + mObserveChannelList.remove(channel); + } + } + + public void addObserveRequest(Long token, IRequest request) { + + mObserveRequestList.put(token, request); + } + + public void removeObserveRequest(Long token) { + + mObserveRequestList.remove(token); + } + // This is called by cloud resource model @Override public void sendResponse(IResponse response) { // This message must converted to CoapResponse + CoapResponse coapResp = (CoapResponse) response; + + Iterator iterator = mObserveRequestList.keySet().iterator(); + while (iterator.hasNext()) { + Long token = iterator.next(); + Long respToken = Bytes.bytesToLong(coapResp.getToken()); + if (respToken.equals(token) + && coapResp.getObserve().equals(Observe.UNSUBSCRIBE)) { + iterator.remove(); + } + } ctx.channel().writeAndFlush(response); } @@ -121,10 +185,26 @@ public class CoapDevice extends Device { @Override public void onConnected() { + mObserveChannelList.addAll(ConnectorPool.getConnectionList()); } @Override public void onDisconnected() { + for (IRequestChannel serverChannel : mObserveChannelList) { + Iterator iterator = mObserveRequestList.keySet().iterator(); + while (iterator.hasNext()) { + Long token = iterator.next(); + CoapClient coapClient = (CoapClient) serverChannel; + + if (coapClient.isObserveRequest(token) != null) { + CoapRequest coapRequest = (CoapRequest) mObserveRequestList + .get(token); + coapRequest.setObserve(Observe.UNSUBSCRIBE); + coapRequest.setToken(Bytes.longTo8Bytes(token)); + serverChannel.sendRequest(MessageBuilder.modifyRequest( + coapRequest, null, null, null, null), this); + } + } + } } - } diff --git a/cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/MessageBuilder.java b/cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/MessageBuilder.java index 5a703b7..e4fb793 100644 --- a/cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/MessageBuilder.java +++ b/cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/MessageBuilder.java @@ -46,6 +46,7 @@ public class MessageBuilder { CoapResponse coapResponse = new CoapResponse(responseStatus); coapResponse.setUriPath(coapRequest.getUriPath()); coapResponse.setToken(coapRequest.getToken()); + coapResponse.setObserve(coapRequest.getObserve()); if (payload != null) { coapResponse.setContentFormat(format); coapResponse.setPayload(payload); -- 2.7.4