modified observe exception, when channel is disconnected upstream_real
authorJung Seungho <shonest.jung@samsung.com>
Thu, 1 Sep 2016 07:43:14 +0000 (16:43 +0900)
committerJee Hyeok Kim <jihyeok13.kim@samsung.com>
Sun, 11 Sep 2016 09:02:05 +0000 (09:02 +0000)
Change-Id: I6309712296cb8269c9dba027d64e8656e8479428
Signed-off-by: Jung Seungho <shonest.jung@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/11213
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Jee Hyeok Kim <jihyeok13.kim@samsung.com>
cloud/interface/src/main/java/org/iotivity/cloud/ciserver/DeviceServerSystem.java
cloud/stack/src/main/java/org/iotivity/cloud/base/connector/CoapClient.java
cloud/stack/src/main/java/org/iotivity/cloud/base/connector/ConnectorPool.java
cloud/stack/src/main/java/org/iotivity/cloud/base/device/CoapDevice.java
cloud/stack/src/main/java/org/iotivity/cloud/base/protocols/MessageBuilder.java

index 14e2479..772a051 100644 (file)
@@ -22,6 +22,7 @@
 package org.iotivity.cloud.ciserver;
 
 import java.util.HashMap;
+import java.util.Iterator;
 
 import org.iotivity.cloud.base.OICConstants;
 import org.iotivity.cloud.base.ServerSystem;
@@ -29,6 +30,7 @@ import org.iotivity.cloud.base.connector.ConnectorPool;
 import org.iotivity.cloud.base.device.CoapDevice;
 import org.iotivity.cloud.base.device.Device;
 import org.iotivity.cloud.base.device.IRequestChannel;
+import org.iotivity.cloud.base.exception.ClientException;
 import org.iotivity.cloud.base.exception.ServerException;
 import org.iotivity.cloud.base.exception.ServerException.BadRequestException;
 import org.iotivity.cloud.base.exception.ServerException.UnAuthorizedException;
@@ -41,6 +43,7 @@ import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
 import org.iotivity.cloud.base.server.CoapServer;
 import org.iotivity.cloud.base.server.HttpServer;
 import org.iotivity.cloud.base.server.Server;
+import org.iotivity.cloud.util.Bytes;
 import org.iotivity.cloud.util.Cbor;
 import org.iotivity.cloud.util.Log;
 
@@ -67,13 +70,25 @@ public class DeviceServerSystem extends ServerSystem {
             }
         }
 
-        public void removeDevice(Device device) {
+        public void removeDevice(Device device) throws ClientException {
             String deviceId = ((CoapDevice) device).getDeviceId();
             synchronized (mMapDevice) {
                 if (mMapDevice.get(deviceId) == device) {
                     mMapDevice.remove(deviceId);
                 }
             }
+            removeObserveDevice(device);
+        }
+
+        private void removeObserveDevice(Device device) throws ClientException {
+            Iterator<String> iterator = mMapDevice.keySet().iterator();
+            while (iterator.hasNext()) {
+                String deviceId = iterator.next();
+                CoapDevice getDevice = (CoapDevice) mDevicePool
+                        .queryDevice(deviceId);
+                getDevice.removeObserveChannel(
+                        ((CoapDevice) device).getRequestChannel());
+            }
         }
 
         public Device queryDevice(String deviceId) {
@@ -101,6 +116,31 @@ public class DeviceServerSystem extends ServerSystem {
                         throw new UnAuthorizedException("token is expired");
                     }
 
+                    CoapRequest coapRequest = (CoapRequest) msg;
+                    IRequestChannel targetChannel = null;
+                    if (coapRequest.getUriPathSegments()
+                            .contains(Constants.REQ_DEVICE_ID)) {
+                        CoapDevice targetDevice = (CoapDevice) mDevicePool
+                                .queryDevice(coapRequest.getUriPathSegments()
+                                        .get(1));
+                        targetChannel = targetDevice.getRequestChannel();
+                    }
+                    switch (coapRequest.getObserve()) {
+                        case SUBSCRIBE:
+                            coapDevice.addObserveRequest(
+                                    Bytes.bytesToLong(coapRequest.getToken()),
+                                    coapRequest);
+                            coapDevice.addObserveChannel(targetChannel);
+                            break;
+                        case UNSUBSCRIBE:
+                            coapDevice.removeObserveChannel(targetChannel);
+                            coapDevice.removeObserveRequest(
+                                    Bytes.bytesToLong(coapRequest.getToken()));
+                            break;
+                        default:
+                            break;
+                    }
+
                 } catch (Throwable t) {
                     Log.f(ctx.channel(), t);
                     ResponseStatus responseStatus = t instanceof ServerException
@@ -117,27 +157,30 @@ public class DeviceServerSystem extends ServerSystem {
 
         @Override
         public void channelActive(ChannelHandlerContext ctx) {
-
-            // Authenticated device connected
             Device device = ctx.channel().attr(keyDevice).get();
-            mDevicePool.addDevice(device);
-            device.onConnected();
+            // Authenticated device connected
 
             sendDevicePresence(device.getDeviceId(), "on");
+            mDevicePool.addDevice(device);
+
+            device.onConnected();
         }
 
         @Override
-        public void channelInactive(ChannelHandlerContext ctx) {
+        public void channelInactive(ChannelHandlerContext ctx)
+                throws ClientException {
             Device device = ctx.channel().attr(keyDevice).get();
             // Some cases, this event occurs after new device connected using
             // same di.
             // So compare actual value, and remove if same.
             if (device != null) {
-                mDevicePool.removeDevice(device);
+                sendDevicePresence(device.getDeviceId(), "off");
+
                 device.onDisconnected();
+
+                mDevicePool.removeDevice(device);
                 ctx.channel().attr(keyDevice).remove();
 
-                sendDevicePresence(device.getDeviceId(), "off");
             }
         }
 
index 6feb5cb..369e268 100644 (file)
@@ -83,17 +83,17 @@ public class CoapClient implements IRequestChannel, IResponseEventHandler {
 
             switch (request.getObserve()) {
                 case UNSUBSCRIBE:
-                    newToken = mSubscription.remove(Bytes.bytesToLong(token));
+                    newToken = removeObserve(Bytes.bytesToLong(token));
                     break;
 
                 case SUBSCRIBE:
-                    mSubscription.put(Bytes.bytesToLong(token), newToken);
+                    addObserve(Bytes.bytesToLong(token), newToken);
                 default:
                     // We create temp token
                     // TODO: temporal handling
                     if (request.getUriPath()
                             .equals(OICConstants.RESOURCE_PRESENCE_FULL_URI)) {
-                        mSubscription.put(Bytes.bytesToLong(token), newToken);
+                        addObserve(Bytes.bytesToLong(token), newToken);
                         observe = Observe.SUBSCRIBE;
                     }
                     synchronized (mToken) {
@@ -133,9 +133,31 @@ public class CoapClient implements IRequestChannel, IResponseEventHandler {
         // Subscription response should stored
         if (reqInfo.observe != Observe.SUBSCRIBE) {
             mTokenExchanger.remove(Bytes.bytesToLong(coapResponse.getToken()));
+            if (reqInfo.observe == Observe.UNSUBSCRIBE && mSubscription
+                    .containsKey(Bytes.bytesToLong(reqInfo.originToken))) {
+                mSubscription.remove(Bytes.bytesToLong(reqInfo.originToken));
+            }
         }
 
         coapResponse.setToken(reqInfo.originToken);
         reqInfo.responseHandler.onResponseReceived(response);
     }
+
+    private void addObserve(long token, long newtoken) {
+
+        mSubscription.put(token, newtoken);
+    }
+
+    private Long removeObserve(long token) {
+
+        Long getToken = mSubscription.remove(token);
+        return getToken;
+    }
+
+    public Long isObserveRequest(Long token) {
+        Long getToken = null;
+        getToken = mSubscription.get(token);
+
+        return getToken;
+    }
 }
index a4f3fef..bf57799 100644 (file)
@@ -22,6 +22,7 @@
 package org.iotivity.cloud.base.connector;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.iotivity.cloud.base.device.IRequestChannel;
@@ -44,4 +45,8 @@ public class ConnectorPool {
     public static IRequestChannel getConnection(String name) {
         return mConnection.get(name);
     }
+
+    public static ArrayList<IRequestChannel> getConnectionList() {
+        return new ArrayList<IRequestChannel>(mConnection.values());
+    }
 }
index 1e9d37b..196ecca 100644 (file)
  */
 package org.iotivity.cloud.base.device;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
 
 import org.iotivity.cloud.base.connector.CoapClient;
+import org.iotivity.cloud.base.connector.ConnectorPool;
+import org.iotivity.cloud.base.exception.ClientException;
+import org.iotivity.cloud.base.protocols.IRequest;
 import org.iotivity.cloud.base.protocols.IResponse;
+import org.iotivity.cloud.base.protocols.MessageBuilder;
+import org.iotivity.cloud.base.protocols.coap.CoapRequest;
+import org.iotivity.cloud.base.protocols.coap.CoapResponse;
+import org.iotivity.cloud.base.protocols.enums.Observe;
+import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
+import org.iotivity.cloud.util.Bytes;
 import org.iotivity.cloud.util.Log;
 
 import io.netty.channel.ChannelHandlerContext;
 
 public class CoapDevice extends Device {
-    private CoapClient       mCoapClient    = null;
-    private String           mUserId        = null;
-    private String           mDeviceId      = null;
-    private String           mAccessToken   = null;
-    private Date             mIssuedTime    = null;
-    private int              mExpiredPolicy = 0;
+    private CoapClient                 mCoapClient         = null;
+    private String                     mUserId             = null;
+    private String                     mDeviceId           = null;
+    private String                     mAccessToken        = null;
+    private Date                       mIssuedTime         = null;
+    private int                        mExpiredPolicy      = 0;
+    private ArrayList<IRequestChannel> mObserveChannelList = new ArrayList<>();
+    private HashMap<Long, IRequest>    mObserveRequestList = new HashMap<>();
 
-    private static final int INFINITE_TIME  = -1;
+    private static final int           INFINITE_TIME       = -1;
 
     public CoapDevice(ChannelHandlerContext ctx) {
         super(ctx);
@@ -85,10 +99,60 @@ public class CoapDevice extends Device {
         this.mAccessToken = accessToken;
     }
 
+    public void addObserveChannel(IRequestChannel channel) {
+
+        if (channel != null) {
+            mObserveChannelList.add(channel);
+        }
+    }
+
+    public void removeObserveChannel(IRequestChannel channel)
+            throws ClientException {
+        if (channel != null && mObserveChannelList.contains(channel)) {
+
+            Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
+            while (iterator.hasNext()) {
+                Long token = iterator.next();
+                CoapClient coapClient = (CoapClient) channel;
+                if (coapClient.isObserveRequest(token) != null) {
+                    IRequest getRequest = mObserveRequestList.get(token);
+
+                    CoapRequest coapRequest = (CoapRequest) getRequest;
+                    coapRequest.setObserve(Observe.UNSUBSCRIBE);
+
+                    coapClient.onResponseReceived(MessageBuilder.createResponse(
+                            coapRequest, ResponseStatus.CONTENT, null, null));
+                }
+            }
+            mObserveChannelList.remove(channel);
+        }
+    }
+
+    public void addObserveRequest(Long token, IRequest request) {
+
+        mObserveRequestList.put(token, request);
+    }
+
+    public void removeObserveRequest(Long token) {
+
+        mObserveRequestList.remove(token);
+    }
+
     // This is called by cloud resource model
     @Override
     public void sendResponse(IResponse response) {
         // This message must converted to CoapResponse
+        CoapResponse coapResp = (CoapResponse) response;
+
+        Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
+        while (iterator.hasNext()) {
+            Long token = iterator.next();
+            Long respToken = Bytes.bytesToLong(coapResp.getToken());
+            if (respToken.equals(token)
+                    && coapResp.getObserve().equals(Observe.UNSUBSCRIBE)) {
+                iterator.remove();
+            }
+        }
         ctx.channel().writeAndFlush(response);
     }
 
@@ -121,10 +185,26 @@ public class CoapDevice extends Device {
 
     @Override
     public void onConnected() {
+        mObserveChannelList.addAll(ConnectorPool.getConnectionList());
     }
 
     @Override
     public void onDisconnected() {
+        for (IRequestChannel serverChannel : mObserveChannelList) {
+            Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
+            while (iterator.hasNext()) {
+                Long token = iterator.next();
+                CoapClient coapClient = (CoapClient) serverChannel;
+
+                if (coapClient.isObserveRequest(token) != null) {
+                    CoapRequest coapRequest = (CoapRequest) mObserveRequestList
+                            .get(token);
+                    coapRequest.setObserve(Observe.UNSUBSCRIBE);
+                    coapRequest.setToken(Bytes.longTo8Bytes(token));
+                    serverChannel.sendRequest(MessageBuilder.modifyRequest(
+                            coapRequest, null, null, null, null), this);
+                }
+            }
+        }
     }
-
 }
index 5a703b7..e4fb793 100644 (file)
@@ -46,6 +46,7 @@ public class MessageBuilder {
             CoapResponse coapResponse = new CoapResponse(responseStatus);
             coapResponse.setUriPath(coapRequest.getUriPath());
             coapResponse.setToken(coapRequest.getToken());
+            coapResponse.setObserve(coapRequest.getObserve());
             if (payload != null) {
                 coapResponse.setContentFormat(format);
                 coapResponse.setPayload(payload);