package org.iotivity.cloud.ciserver;
import java.util.HashMap;
+import java.util.Iterator;
import org.iotivity.cloud.base.OICConstants;
import org.iotivity.cloud.base.ServerSystem;
import org.iotivity.cloud.base.device.CoapDevice;
import org.iotivity.cloud.base.device.Device;
import org.iotivity.cloud.base.device.IRequestChannel;
+import org.iotivity.cloud.base.exception.ClientException;
import org.iotivity.cloud.base.exception.ServerException;
import org.iotivity.cloud.base.exception.ServerException.BadRequestException;
import org.iotivity.cloud.base.exception.ServerException.UnAuthorizedException;
import org.iotivity.cloud.base.server.CoapServer;
import org.iotivity.cloud.base.server.HttpServer;
import org.iotivity.cloud.base.server.Server;
+import org.iotivity.cloud.util.Bytes;
import org.iotivity.cloud.util.Cbor;
import org.iotivity.cloud.util.Log;
}
}
- public void removeDevice(Device device) {
+ public void removeDevice(Device device) throws ClientException {
String deviceId = ((CoapDevice) device).getDeviceId();
synchronized (mMapDevice) {
if (mMapDevice.get(deviceId) == device) {
mMapDevice.remove(deviceId);
}
}
+ removeObserveDevice(device);
+ }
+
+ private void removeObserveDevice(Device device) throws ClientException {
+ Iterator<String> iterator = mMapDevice.keySet().iterator();
+ while (iterator.hasNext()) {
+ String deviceId = iterator.next();
+ CoapDevice getDevice = (CoapDevice) mDevicePool
+ .queryDevice(deviceId);
+ getDevice.removeObserveChannel(
+ ((CoapDevice) device).getRequestChannel());
+ }
}
public Device queryDevice(String deviceId) {
throw new UnAuthorizedException("token is expired");
}
+ CoapRequest coapRequest = (CoapRequest) msg;
+ IRequestChannel targetChannel = null;
+ if (coapRequest.getUriPathSegments()
+ .contains(Constants.REQ_DEVICE_ID)) {
+ CoapDevice targetDevice = (CoapDevice) mDevicePool
+ .queryDevice(coapRequest.getUriPathSegments()
+ .get(1));
+ targetChannel = targetDevice.getRequestChannel();
+ }
+ switch (coapRequest.getObserve()) {
+ case SUBSCRIBE:
+ coapDevice.addObserveRequest(
+ Bytes.bytesToLong(coapRequest.getToken()),
+ coapRequest);
+ coapDevice.addObserveChannel(targetChannel);
+ break;
+ case UNSUBSCRIBE:
+ coapDevice.removeObserveChannel(targetChannel);
+ coapDevice.removeObserveRequest(
+ Bytes.bytesToLong(coapRequest.getToken()));
+ break;
+ default:
+ break;
+ }
+
} catch (Throwable t) {
Log.f(ctx.channel(), t);
ResponseStatus responseStatus = t instanceof ServerException
@Override
public void channelActive(ChannelHandlerContext ctx) {
-
- // Authenticated device connected
Device device = ctx.channel().attr(keyDevice).get();
- mDevicePool.addDevice(device);
- device.onConnected();
+ // Authenticated device connected
sendDevicePresence(device.getDeviceId(), "on");
+ mDevicePool.addDevice(device);
+
+ device.onConnected();
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
+ public void channelInactive(ChannelHandlerContext ctx)
+ throws ClientException {
Device device = ctx.channel().attr(keyDevice).get();
// Some cases, this event occurs after new device connected using
// same di.
// So compare actual value, and remove if same.
if (device != null) {
- mDevicePool.removeDevice(device);
+ sendDevicePresence(device.getDeviceId(), "off");
+
device.onDisconnected();
+
+ mDevicePool.removeDevice(device);
ctx.channel().attr(keyDevice).remove();
- sendDevicePresence(device.getDeviceId(), "off");
}
}
switch (request.getObserve()) {
case UNSUBSCRIBE:
- newToken = mSubscription.remove(Bytes.bytesToLong(token));
+ newToken = removeObserve(Bytes.bytesToLong(token));
break;
case SUBSCRIBE:
- mSubscription.put(Bytes.bytesToLong(token), newToken);
+ addObserve(Bytes.bytesToLong(token), newToken);
default:
// We create temp token
// TODO: temporal handling
if (request.getUriPath()
.equals(OICConstants.RESOURCE_PRESENCE_FULL_URI)) {
- mSubscription.put(Bytes.bytesToLong(token), newToken);
+ addObserve(Bytes.bytesToLong(token), newToken);
observe = Observe.SUBSCRIBE;
}
synchronized (mToken) {
// Subscription response should stored
if (reqInfo.observe != Observe.SUBSCRIBE) {
mTokenExchanger.remove(Bytes.bytesToLong(coapResponse.getToken()));
+ if (reqInfo.observe == Observe.UNSUBSCRIBE && mSubscription
+ .containsKey(Bytes.bytesToLong(reqInfo.originToken))) {
+ mSubscription.remove(Bytes.bytesToLong(reqInfo.originToken));
+ }
}
coapResponse.setToken(reqInfo.originToken);
reqInfo.responseHandler.onResponseReceived(response);
}
+
+ private void addObserve(long token, long newtoken) {
+
+ mSubscription.put(token, newtoken);
+ }
+
+ private Long removeObserve(long token) {
+
+ Long getToken = mSubscription.remove(token);
+ return getToken;
+ }
+
+ public Long isObserveRequest(Long token) {
+ Long getToken = null;
+ getToken = mSubscription.get(token);
+
+ return getToken;
+ }
}
package org.iotivity.cloud.base.connector;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
import org.iotivity.cloud.base.device.IRequestChannel;
public static IRequestChannel getConnection(String name) {
return mConnection.get(name);
}
+
+ public static ArrayList<IRequestChannel> getConnectionList() {
+ return new ArrayList<IRequestChannel>(mConnection.values());
+ }
}
*/
package org.iotivity.cloud.base.device;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
import org.iotivity.cloud.base.connector.CoapClient;
+import org.iotivity.cloud.base.connector.ConnectorPool;
+import org.iotivity.cloud.base.exception.ClientException;
+import org.iotivity.cloud.base.protocols.IRequest;
import org.iotivity.cloud.base.protocols.IResponse;
+import org.iotivity.cloud.base.protocols.MessageBuilder;
+import org.iotivity.cloud.base.protocols.coap.CoapRequest;
+import org.iotivity.cloud.base.protocols.coap.CoapResponse;
+import org.iotivity.cloud.base.protocols.enums.Observe;
+import org.iotivity.cloud.base.protocols.enums.ResponseStatus;
+import org.iotivity.cloud.util.Bytes;
import org.iotivity.cloud.util.Log;
import io.netty.channel.ChannelHandlerContext;
public class CoapDevice extends Device {
- private CoapClient mCoapClient = null;
- private String mUserId = null;
- private String mDeviceId = null;
- private String mAccessToken = null;
- private Date mIssuedTime = null;
- private int mExpiredPolicy = 0;
+ private CoapClient mCoapClient = null;
+ private String mUserId = null;
+ private String mDeviceId = null;
+ private String mAccessToken = null;
+ private Date mIssuedTime = null;
+ private int mExpiredPolicy = 0;
+ private ArrayList<IRequestChannel> mObserveChannelList = new ArrayList<>();
+ private HashMap<Long, IRequest> mObserveRequestList = new HashMap<>();
- private static final int INFINITE_TIME = -1;
+ private static final int INFINITE_TIME = -1;
public CoapDevice(ChannelHandlerContext ctx) {
super(ctx);
this.mAccessToken = accessToken;
}
+ public void addObserveChannel(IRequestChannel channel) {
+
+ if (channel != null) {
+ mObserveChannelList.add(channel);
+ }
+ }
+
+ public void removeObserveChannel(IRequestChannel channel)
+ throws ClientException {
+ if (channel != null && mObserveChannelList.contains(channel)) {
+
+ Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
+ while (iterator.hasNext()) {
+ Long token = iterator.next();
+ CoapClient coapClient = (CoapClient) channel;
+ if (coapClient.isObserveRequest(token) != null) {
+ IRequest getRequest = mObserveRequestList.get(token);
+
+ CoapRequest coapRequest = (CoapRequest) getRequest;
+ coapRequest.setObserve(Observe.UNSUBSCRIBE);
+
+ coapClient.onResponseReceived(MessageBuilder.createResponse(
+ coapRequest, ResponseStatus.CONTENT, null, null));
+ }
+ }
+ mObserveChannelList.remove(channel);
+ }
+ }
+
+ public void addObserveRequest(Long token, IRequest request) {
+
+ mObserveRequestList.put(token, request);
+ }
+
+ public void removeObserveRequest(Long token) {
+
+ mObserveRequestList.remove(token);
+ }
+
// This is called by cloud resource model
@Override
public void sendResponse(IResponse response) {
// This message must converted to CoapResponse
+ CoapResponse coapResp = (CoapResponse) response;
+
+ Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
+ while (iterator.hasNext()) {
+ Long token = iterator.next();
+ Long respToken = Bytes.bytesToLong(coapResp.getToken());
+ if (respToken.equals(token)
+ && coapResp.getObserve().equals(Observe.UNSUBSCRIBE)) {
+ iterator.remove();
+ }
+ }
ctx.channel().writeAndFlush(response);
}
@Override
public void onConnected() {
+ mObserveChannelList.addAll(ConnectorPool.getConnectionList());
}
@Override
public void onDisconnected() {
+ for (IRequestChannel serverChannel : mObserveChannelList) {
+ Iterator<Long> iterator = mObserveRequestList.keySet().iterator();
+ while (iterator.hasNext()) {
+ Long token = iterator.next();
+ CoapClient coapClient = (CoapClient) serverChannel;
+
+ if (coapClient.isObserveRequest(token) != null) {
+ CoapRequest coapRequest = (CoapRequest) mObserveRequestList
+ .get(token);
+ coapRequest.setObserve(Observe.UNSUBSCRIBE);
+ coapRequest.setToken(Bytes.longTo8Bytes(token));
+ serverChannel.sendRequest(MessageBuilder.modifyRequest(
+ coapRequest, null, null, null, null), this);
+ }
+ }
+ }
}
-
}
CoapResponse coapResponse = new CoapResponse(responseStatus);
coapResponse.setUriPath(coapRequest.getUriPath());
coapResponse.setToken(coapRequest.getToken());
+ coapResponse.setObserve(coapRequest.getObserve());
if (payload != null) {
coapResponse.setContentFormat(format);
coapResponse.setPayload(payload);