import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import org.iotivity.cloud.base.CoapClient;
import org.iotivity.cloud.base.protocols.coap.enums.CoapStatus;
import org.iotivity.cloud.ciserver.Constants;
import org.iotivity.cloud.util.Logger;
-import org.iotivity.cloud.util.Net;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler.Sharable;
}
}
- private CoapClient rdClient = new CoapClient();
- ///////////
+ private CoapClient rdClient = null;
+ ///////////
////////// Handler for Account Server
private static final AttributeKey<List<CoapRequest>> keyAccountClient = AttributeKey
}
}
- private CoapClient asClient = new CoapClient();
+ private CoapClient asClient = null;
//////////
private SessionManager sessionManager = null;
- public CoapRelayHandler(SessionManager sessionManager, String rdAddress,
- int rdPort, String acAddress, int acPort) {
+ public CoapRelayHandler(SessionManager sessionManager) {
this.sessionManager = sessionManager;
+ rdClient = new CoapClient();
+
rdClient.addHandler(new RDHandler());
+ asClient = new CoapClient();
+
asClient.addHandler(new AccountHandler());
+ }
- try {
- rdClient.startClient(new InetSocketAddress(rdAddress, rdPort));
- asClient.startClient(new InetSocketAddress(acAddress, acPort));
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ public void startHandler(String rdAddress, int rdPort, String acAddress,
+ int acPort) throws Exception {
+ rdClient.startClient(new InetSocketAddress(rdAddress, rdPort));
+
+ asClient.startClient(new InetSocketAddress(acAddress, acPort));
asClient.getChannelFuture().channel().attr(keyAccountClient)
.set(new ArrayList<CoapRequest>());
}
+ public void stopHandler() throws Exception {
+ asClient.stopClient();
+
+ rdClient.stopClient();
+ }
+
private static final AttributeKey<ChannelHandlerContext> keyDevice = AttributeKey
.newInstance("deviceCtx");
- private HashMap<String, CoapClient> ciRelayClients = new HashMap<String, CoapClient>();
-
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
String uriPath = request.getUriPath();
CoapRequest accountRequest = null;
String userId, deviceId, authPayload;
- CoapResponse response = null;
+ CoapResponse response;
Logger.d("Request received, URI: " + uriPath);
- switch (uriPath) {
- case Constants.AUTH_URI:
- // This case user wants to logout
- if (request.getUriQuery().endsWith("logout")) {
- ctx.channel().attr(Constants.Attribute_UserId).remove();
- response = new CoapResponse(CoapStatus.DELETED);
- } else {
- response = new CoapResponse(CoapStatus.BAD_REQUEST);
- }
- ctx.writeAndFlush(response);
- break;
-
- case Constants.RD_URI:
- // RD POST means publish device to server
- switch (request.getRequestMethod()) {
- case POST:
- userId = ctx.channel()
- .attr(Constants.Attribute_UserId).get();
- deviceId = request.decodeDeviceId();
- authPayload = String.format(
- "{\"userid\":\"%s\",\"deviceid\":\"%s\"}",
- userId, deviceId);
- accountRequest = new CoapRequest(CoapMethod.POST);
- accountRequest.setUriPath(Constants.ACCOUNT_URI);
- accountRequest.setUriQuery("reqtype=publish");
- accountRequest.setToken(request.getToken());
- accountRequest.setPayload(authPayload.getBytes(StandardCharsets.UTF_8));
-
- // TODO: deviceId must be registered after session
- // granted
- Logger.d("Adding deviceId to session: " + deviceId);
- sessionManager.addSession(deviceId, ctx);
- break;
-
- default:
- Logger.e("Unsupported request type");
- break;
- }
-
- rdClient.getChannelFuture().channel().attr(keyRDClient)
- .set(ctx);
-
- // Add original request to list for future use
- asClient.getChannelFuture().channel().attr(keyAccountClient)
- .get().add(request);
- asClient.sendRequest(accountRequest);
- return;
-
- case Constants.WELL_KNOWN_URI:
- switch (request.getRequestMethod()) {
- case GET:
- userId = ctx.channel()
- .attr(Constants.Attribute_UserId).get();
- authPayload = String.format("{\"userid\":\"%s\"}",
- userId);
- accountRequest = new CoapRequest(CoapMethod.GET);
- accountRequest.setUriPath(Constants.ACCOUNT_URI);
- accountRequest.setUriQuery("reqtype=find");
- accountRequest.setToken(request.getToken());
- accountRequest.setPayload(authPayload.getBytes());
- break;
-
- default:
- Logger.e("Unsupported request type");
- break;
- }
-
- rdClient.getChannelFuture().channel().attr(keyRDClient)
- .set(ctx);
-
- // Add original request to list for future use
- asClient.getChannelFuture().channel().attr(keyAccountClient)
- .get().add(request);
- asClient.sendRequest(accountRequest);
- return;
-
- case Constants.KEEP_ALIVE_URI:
- break;
-
- default:
- List<String> uriPathList = request.getUriPathSegments();
- String originUriPathList = request.getUriPath();
- Logger.i("uriPahtList: " + uriPathList.toString());
- String ciAddress = uriPathList.get(0);
- String did = uriPathList.get(1);
-
- Logger.i("CI address: " + ciAddress);
- Logger.i("did: " + did);
-
- // TODO: getMyIP ?
- String hostAddress = Net.getMyIpAddress().replace("/", "");
- Logger.i("hostAddress : " + hostAddress);
- // if published CI is mine
- if (hostAddress.equals(ciAddress) == true) {
- // find ctx about did, and send msg
- Logger.d("published CI is mine");
- String resource = new String();
- List<String> pathSegments = uriPathList.subList(2,
- uriPathList.size());
- for (String path : pathSegments) {
- resource += "/";
- resource += path;
- }
- Logger.i("resource: " + resource);
- request.setUriPath(resource);
-
- ChannelHandlerContext deviceCtx = sessionManager
- .querySession(did);
- if (deviceCtx != null) {
- deviceCtx.attr(keyDevice).set(ctx);
- deviceCtx.writeAndFlush(request);
- } else {
- Logger.e("deviceCtx is null");
- response = new CoapResponse(CoapStatus.FORBIDDEN);
- response.setToken(request.getToken());
+ if (uriPath != null) {
+ switch (uriPath) {
+ case Constants.AUTH_URI:
+ // This case user wants to logout
+ String uriQuery = request.getUriQuery();
+ if (uriQuery != null) {
+ if (uriQuery.endsWith("logout")) {
+ ctx.channel().attr(Constants.Attribute_UserId)
+ .remove();
+ response = new CoapResponse(CoapStatus.DELETED);
+ } else {
+ response = new CoapResponse(
+ CoapStatus.BAD_REQUEST);
+ }
ctx.writeAndFlush(response);
}
- } else {
- // if CI is not connected, connect and send msg
- CoapClient otherCI = null;
- synchronized (ciRelayClients) {
- otherCI = ciRelayClients.get(ciAddress);
- if (otherCI == null) {
- otherCI = new CoapClient();
- otherCI.startClient(
- new InetSocketAddress(ciAddress, 5683));
- ciRelayClients.put(ciAddress, otherCI);
+ break;
+
+ case Constants.RD_URI:
+ // RD POST means publish device to server
+ switch (request.getRequestMethod()) {
+ case POST:
+ userId = ctx.channel()
+ .attr(Constants.Attribute_UserId).get();
+ deviceId = request.decodeDeviceId();
+ authPayload = String.format(
+ "{\"userid\":\"%s\",\"deviceid\":\"%s\"}",
+ userId, deviceId);
+ accountRequest = new CoapRequest(
+ CoapMethod.POST);
+ accountRequest
+ .setUriPath(Constants.ACCOUNT_URI);
+ accountRequest.setUriQuery("reqtype=publish");
+ accountRequest.setToken(request.getToken());
+ accountRequest.setPayload(authPayload
+ .getBytes(StandardCharsets.UTF_8));
+
+ // TODO: deviceId must be registered after
+ // session
+ // granted
+ Logger.d("Adding deviceId to session: "
+ + deviceId);
+ sessionManager.addSession(deviceId, ctx);
+ break;
+
+ default:
+ Logger.e("Unsupported request type");
+ break;
+ }
+
+ rdClient.getChannelFuture().channel().attr(keyRDClient)
+ .set(ctx);
+
+ // Add original request to list for future use
+ asClient.getChannelFuture().channel()
+ .attr(keyAccountClient).get().add(request);
+ asClient.sendRequest(accountRequest);
+ return;
+
+ case Constants.WELL_KNOWN_URI:
+ switch (request.getRequestMethod()) {
+ case GET:
+ userId = ctx.channel()
+ .attr(Constants.Attribute_UserId).get();
+ authPayload = String
+ .format("{\"userid\":\"%s\"}", userId);
+ accountRequest = new CoapRequest(
+ CoapMethod.GET);
+ accountRequest
+ .setUriPath(Constants.ACCOUNT_URI);
+ accountRequest.setUriQuery("reqtype=find");
+ accountRequest.setToken(request.getToken());
+ accountRequest.setPayload(authPayload
+ .getBytes(StandardCharsets.UTF_8));
+ break;
+
+ default:
+ Logger.e("Unsupported request type");
+ break;
+ }
+
+ rdClient.getChannelFuture().channel().attr(keyRDClient)
+ .set(ctx);
+
+ // Add original request to list for future use
+ asClient.getChannelFuture().channel()
+ .attr(keyAccountClient).get().add(request);
+ asClient.sendRequest(accountRequest);
+ return;
+
+ case Constants.KEEP_ALIVE_URI:
+ break;
+
+ default:
+ List<String> uriPathList = request.getUriPathSegments();
+ if (uriPathList != null) {
+ Logger.i("uriPahtList: " + uriPathList.toString());
+
+ String did = uriPathList.get(0);
+
+ Logger.i("did: " + did);
+
+ // TODO: Clustering algorithm required
+ // find ctx about did, and send msg
+ StringBuffer resource = new StringBuffer();
+ List<String> pathSegments = uriPathList.subList(1,
+ uriPathList.size());
+ for (String path : pathSegments) {
+ resource.append("/");
+ resource.append(path);
+ }
+ Logger.i("resource: " + resource);
+ request.setUriPath(resource.toString());
+
+ ChannelHandlerContext deviceCtx = sessionManager
+ .querySession(did);
+ if (deviceCtx != null) {
+ deviceCtx.attr(keyDevice).set(ctx);
+ deviceCtx.writeAndFlush(request);
+ } else {
+ Logger.e("deviceCtx is null");
+ response = new CoapResponse(
+ CoapStatus.FORBIDDEN);
+ response.setToken(request.getToken());
+ ctx.writeAndFlush(response);
}
}
- request.setUriPath(originUriPathList);
- otherCI.sendRequest(request);
- }
- return;
+ return;
+ }
}
} else if (msg instanceof CoapResponse) {
- if (ctx.attr(keyDevice).get() != null) {
- Logger.i("ctx.channel : "
- + ctx.attr(keyDevice).get().channel().toString());
- ctx.attr(keyDevice).get().writeAndFlush(msg);
+ ChannelHandlerContext resourceClient = ctx.attr(keyDevice).get();
+ if (resourceClient != null) {
+ Logger.i("Forwards message to client");
+
+ CoapResponse response = (CoapResponse) msg;
+
+ // If response contains path, add di
+ String did = sessionManager.queryDid(ctx);
+ if (response.getOption(11) != null && did != null) {
+ response.getOption(11).add(0,
+ did.getBytes(StandardCharsets.UTF_8));
+ }
+
+ Logger.i(
+ "ctx.channel : " + resourceClient.channel().toString());
+ resourceClient.writeAndFlush(response);
return;
}
}
}
@Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ Logger.d("Channel Inactive");
+ sessionManager.removeSessionByChannel(ctx);
+ super.channelInactive(ctx);
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();