X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=resource%2Fsrc%2FInProcClientWrapper.cpp;h=abea511e674978a6ecdf61db0d60b50d8c2327f4;hb=8229635f6d207516ccbbdf23b13be164e0fc1787;hp=1ba07f0cc45c471e2f3ef7263937d82dab3b782d;hpb=fe404d601435da7bdf5d8d1b4f48afdcf09ebe60;p=platform%2Fupstream%2Fiotivity.git diff --git a/resource/src/InProcClientWrapper.cpp b/resource/src/InProcClientWrapper.cpp index 1ba07f0..abea511 100644 --- a/resource/src/InProcClientWrapper.cpp +++ b/resource/src/InProcClientWrapper.cpp @@ -25,6 +25,13 @@ #include "OCResource.h" #include "ocpayload.h" #include +#include "logger.h" +#ifdef TCP_ADAPTER +#include "oickeepalive.h" +#endif + +#define TAG "OIC_CLIENT_WRAPPER" + using namespace std; namespace OC @@ -36,6 +43,24 @@ namespace OC { // if the config type is server, we ought to never get called. If the config type // is both, we count on the server to run the thread and do the initialize + start(); + } + + InProcClientWrapper::~InProcClientWrapper() + { + try + { + stop(); + } + catch (InitializeException &e) + { + oclog() << "Exception in stop"<< e.what() << std::flush; + } + } + + OCStackResult InProcClientWrapper::start() + { + OIC_LOG_V(INFO, TAG, "start ocplatform for client : %d", m_cfg.transportType); if (m_cfg.mode == ModeType::Client) { @@ -43,32 +68,45 @@ namespace OC static_cast(m_cfg.serverConnectivity & CT_MASK_FLAGS); OCTransportFlags clientFlags = static_cast(m_cfg.clientConnectivity & CT_MASK_FLAGS); - OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags); + OCStackResult result = OCInit2(OC_CLIENT, serverFlags, clientFlags, + m_cfg.transportType); if (OC_STACK_OK != result) { throw InitializeException(OC::InitException::STACK_INIT_ERROR, result); } - m_threadRun = true; - m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this); + if (false == m_threadRun) + { + m_threadRun = true; + m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this); + } } + return OC_STACK_OK; } - InProcClientWrapper::~InProcClientWrapper() + OCStackResult InProcClientWrapper::stop() { + OIC_LOG(INFO, TAG, "stop ocplatform"); + if (m_threadRun && m_listeningThread.joinable()) { m_threadRun = false; m_listeningThread.join(); } - // only stop if we are the ones who actually called 'init'. We are counting + // only stop if we are the ones who actually called 'start'. We are counting // on the server to do the stop. if (m_cfg.mode == ModeType::Client) { - OCStop(); + OCStackResult result = OCStop(); + + if (OC_STACK_OK != result) + { + throw InitializeException(OC::InitException::STACK_TERMINATE_ERROR, result); + } } + return OC_STACK_OK; } void InProcClientWrapper::listeningFunc() @@ -101,19 +139,15 @@ namespace OC { if (clientResponse->payload == nullptr || ( - clientResponse->payload->type != PAYLOAD_TYPE_DEVICE && - clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM && clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION ) ) { - //OCPayloadDestroy(clientResponse->payload); return OCRepresentation(); } MessageContainer oc; oc.setPayload(clientResponse->payload); - //OCPayloadDestroy(clientResponse->payload); std::vector::const_iterator it = oc.representations().begin(); if (it == oc.representations().end()) @@ -122,21 +156,25 @@ namespace OC } // first one is considered the root, everything else is considered a child of this one. - OCRepresentation root = *it; - root.setDevAddr(clientResponse->devAddr); - root.setUri(clientResponse->resourceUri); - ++it; + OCRepresentation root = *it; + root.setDevAddr(clientResponse->devAddr); + root.setUri(clientResponse->resourceUri); + ++it; std::for_each(it, oc.representations().end(), [&root](const OCRepresentation& repItr) {root.addChild(repItr);}); return root; - } OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/, OCClientResponse* clientResponse) { + if (!ctx || !clientResponse) + { + return OC_STACK_KEEP_TRANSACTION; + } + ClientCallbackContext::ListenContext* context = static_cast(ctx); @@ -165,7 +203,8 @@ namespace OC return OC_STACK_KEEP_TRANSACTION; } - try{ + try + { ListenOCContainer container(clientWrapper, clientResponse->devAddr, reinterpret_cast(clientResponse->payload)); // loop to ensure valid construction of all resources @@ -176,7 +215,8 @@ namespace OC exec.detach(); } } - catch (std::exception &e){ + catch (std::exception &e) + { oclog() << "Exception in listCallback, ignoring response: " << e.what() << std::flush; } @@ -195,18 +235,14 @@ namespace OC ClientCallbackContext::ListenErrorContext* context = static_cast(ctx); - if (!context) - { - return OC_STACK_KEEP_TRANSACTION; - } OCStackResult result = clientResponse->result; if (result == OC_STACK_OK) { if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY) { - oclog() << "listenCallback(): clientResponse payload was null or the wrong type" - << std::flush; + OIC_LOG_V(DEBUG, TAG, "%s: clientResponse payload was null or the wrong type", + __func__); return OC_STACK_KEEP_TRANSACTION; } @@ -214,8 +250,8 @@ namespace OC if (!clientWrapper) { - oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper" - << std::flush; + OIC_LOG_V(DEBUG, TAG, "%s: failed to get a shared_ptr to the client wrapper", + __func__); return OC_STACK_KEEP_TRANSACTION; } @@ -230,10 +266,16 @@ namespace OC return OC_STACK_KEEP_TRANSACTION; } - std::string resourceURI = clientResponse->resourceUri; + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); + std::string resourceURI; + if(NULL != clientResponse->resourceUri) + { + resourceURI = clientResponse->resourceUri; + } + std::thread exec(context->errorCallback, resourceURI, result); exec.detach(); - return OC_STACK_DELETE_TRANSACTION; + return OC_STACK_KEEP_TRANSACTION; } OCStackResult InProcClientWrapper::ListenForResource( @@ -325,6 +367,212 @@ namespace OC } return result; } + + OCStackApplicationResult listenResListCallback(void* ctx, OCDoHandle /*handle*/, + OCClientResponse* clientResponse) + { + if (!ctx || !clientResponse) + { + return OC_STACK_KEEP_TRANSACTION; + } + + ClientCallbackContext::ListenResListContext* context = + static_cast(ctx); + + if (clientResponse->result != OC_STACK_OK) + { + oclog() << "listenResListCallback(): failed to create resource. clientResponse: " + << clientResponse->result + << std::flush; + + return OC_STACK_KEEP_TRANSACTION; + } + + if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY) + { + oclog() << "listenResListCallback(): clientResponse payload was null or the wrong type" + << std::flush; + return OC_STACK_KEEP_TRANSACTION; + } + + auto clientWrapper = context->clientWrapper.lock(); + + if (!clientWrapper) + { + oclog() << "listenResListCallback(): failed to get a shared_ptr to the client wrapper" + << std::flush; + return OC_STACK_KEEP_TRANSACTION; + } + + try + { + ListenOCContainer container(clientWrapper, clientResponse->devAddr, + reinterpret_cast(clientResponse->payload)); + + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); + std::thread exec(context->callback, container.Resources()); + exec.detach(); + } + catch (std::exception &e) + { + oclog() << "Exception in listenResListCallback(), ignoring response: " + << e.what() << std::flush; + } + + return OC_STACK_KEEP_TRANSACTION; + } + + OCStackResult InProcClientWrapper::ListenForResourceList( + const std::string& serviceUrl, + const std::string& resourceType, + OCConnectivityType connectivityType, + FindResListCallback& callback, QualityOfService QoS) + { + if (!callback) + { + return OC_STACK_INVALID_PARAM; + } + + OCStackResult result; + ostringstream resourceUri; + resourceUri << serviceUrl << resourceType; + + ClientCallbackContext::ListenResListContext* context = + new ClientCallbackContext::ListenResListContext(callback, shared_from_this()); + OCCallbackData cbdata; + cbdata.context = static_cast(context), + cbdata.cb = listenResListCallback; + cbdata.cd = [](void* c){delete (ClientCallbackContext::ListenResListContext*)c;}; + + auto cLock = m_csdkLock.lock(); + if (cLock) + { + std::lock_guard lock(*cLock); + result = OCDoResource(nullptr, OC_REST_DISCOVER, + resourceUri.str().c_str(), + nullptr, nullptr, connectivityType, + static_cast(QoS), + &cbdata, + nullptr, 0); + } + else + { + delete context; + result = OC_STACK_ERROR; + } + return result; + } + + OCStackApplicationResult listenResListWithErrorCallback(void* ctx, OCDoHandle /*handle*/, + OCClientResponse* clientResponse) + { + if (!ctx || !clientResponse) + { + return OC_STACK_KEEP_TRANSACTION; + } + + ClientCallbackContext::ListenResListWithErrorContext* context = + static_cast(ctx); + + OCStackResult result = clientResponse->result; + if (result != OC_STACK_OK) + { + oclog() << "listenResListWithErrorCallback(): failed to create resource. clientResponse: " + << result << std::flush; + + //send the error callback + std::string uri; + if(NULL != clientResponse->resourceUri) + { + uri = clientResponse->resourceUri; + } + std::thread exec(context->errorCallback, uri, result); + exec.detach(); + return OC_STACK_KEEP_TRANSACTION; + } + + if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY) + { + oclog() << "listenResListWithErrorCallback(): clientResponse payload was null or the wrong type" + << std::flush; + return OC_STACK_KEEP_TRANSACTION; + } + + auto clientWrapper = context->clientWrapper.lock(); + + if (!clientWrapper) + { + oclog() << "listenResListWithErrorCallback(): failed to get a shared_ptr to the client wrapper" + << std::flush; + return OC_STACK_KEEP_TRANSACTION; + } + + try + { + ListenOCContainer container(clientWrapper, clientResponse->devAddr, + reinterpret_cast(clientResponse->payload)); + + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); + std::thread exec(context->callback, container.Resources()); + exec.detach(); + } + catch (std::exception &e) + { + oclog() << "Exception in listenResListWithErrorCallback(), ignoring response: " + << e.what() << std::flush; + } + + return OC_STACK_KEEP_TRANSACTION; + } + + OCStackResult InProcClientWrapper::ListenForResourceListWithError( + const std::string& serviceUrl, + const std::string& resourceType, + OCConnectivityType connectivityType, + FindResListCallback& callback, + FindErrorCallback& errorCallback, QualityOfService QoS) + { + if (!callback) + { + return OC_STACK_INVALID_PARAM; + } + + OCStackResult result; + ostringstream resourceUri; + resourceUri << serviceUrl << resourceType; + + ClientCallbackContext::ListenResListWithErrorContext* context = + new ClientCallbackContext::ListenResListWithErrorContext(callback, errorCallback, + shared_from_this()); + if (!context) + { + return OC_STACK_ERROR; + } + + OCCallbackData cbdata; + cbdata.context = static_cast(context), + cbdata.cb = listenResListWithErrorCallback; + cbdata.cd = [](void* c){delete (ClientCallbackContext::ListenResListWithErrorContext*)c;}; + + auto cLock = m_csdkLock.lock(); + if (cLock) + { + std::lock_guard lock(*cLock); + result = OCDoResource(nullptr, OC_REST_DISCOVER, + resourceUri.str().c_str(), + nullptr, nullptr, connectivityType, + static_cast(QoS), + &cbdata, + nullptr, 0); + } + else + { + delete context; + result = OC_STACK_ERROR; + } + return result; + } + #ifdef WITH_MQ OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/, OCClientResponse* clientResponse) @@ -337,6 +585,12 @@ namespace OC return OC_STACK_DELETE_TRANSACTION; } + std::string resourceURI; + if(NULL != clientResponse->resourceUri) + { + resourceURI = clientResponse->resourceUri; + } + if (clientResponse->result != OC_STACK_OK) { oclog() << "listenMQCallback(): failed to create resource. clientResponse: " @@ -344,7 +598,7 @@ namespace OC << std::flush; std::thread exec(context->callback, clientResponse->result, - clientResponse->resourceUri, nullptr); + resourceURI, nullptr); exec.detach(); return OC_STACK_DELETE_TRANSACTION; @@ -358,7 +612,8 @@ namespace OC return OC_STACK_DELETE_TRANSACTION; } - try{ + try + { ListenOCContainer container(clientWrapper, clientResponse->devAddr, (OCRepPayload *) clientResponse->payload); @@ -366,11 +621,12 @@ namespace OC for (auto resource : container.Resources()) { std::thread exec(context->callback, clientResponse->result, - clientResponse->resourceUri, resource); + resourceURI, resource); exec.detach(); } } - catch (std::exception &e){ + catch (std::exception &e) + { oclog() << "Exception in listCallback, ignoring response: " << e.what() << std::flush; } @@ -435,6 +691,7 @@ namespace OC try { + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); OCRepresentation rep = parseGetSetCallback(clientResponse); std::thread exec(context->callback, rep); exec.detach(); @@ -551,9 +808,9 @@ namespace OC } } - if (!isLocationOption) + if (!isLocationOption && NULL != clientResponse->resourceUri) { - createdUri = clientResponse->resourceUri; + createdUri = std::string(clientResponse->resourceUri); } auto clientWrapper = context->clientWrapper.lock(); @@ -565,7 +822,8 @@ namespace OC return OC_STACK_DELETE_TRANSACTION; } - try{ + try + { if (OC_STACK_OK == result || OC_STACK_RESOURCE_CREATED == result) { @@ -573,17 +831,23 @@ namespace OC createdUri); for (auto resource : container.Resources()) { - std::thread exec(context->callback, result, createdUri, resource); + std::thread exec(context->callback, result, + createdUri, + resource); exec.detach(); } } else { - std::thread exec(context->callback, result, createdUri, nullptr); + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); + std::thread exec(context->callback, result, + createdUri, + nullptr); exec.detach(); } } - catch (std::exception &e){ + catch (std::exception &e) + { oclog() << "Exception in createMQTopicCallback, ignoring response: " << e.what() << std::flush; } @@ -642,23 +906,21 @@ namespace OC { ClientCallbackContext::GetContext* context = static_cast(ctx); - OCRepresentation rep; HeaderOptions serverHeaderOptions; OCStackResult result = clientResponse->result; - if (result == OC_STACK_OK) + + parseServerHeaderOptions(clientResponse, serverHeaderOptions); + try { - parseServerHeaderOptions(clientResponse, serverHeaderOptions); - try - { - rep = parseGetSetCallback(clientResponse); - } - catch(OC::OCException& e) - { - result = e.code(); - } + rep = parseGetSetCallback(clientResponse); + } + catch(OC::OCException& e) + { + result = e.code(); } + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); std::thread exec(context->callback, serverHeaderOptions, rep, result); exec.detach(); return OC_STACK_DELETE_TRANSACTION; @@ -678,12 +940,12 @@ namespace OC OCStackResult result; ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext(callback); + OCCallbackData cbdata; - cbdata.context = static_cast(ctx), + cbdata.context = static_cast(ctx); cbdata.cb = getResourceCallback; cbdata.cd = [](void* c){delete (ClientCallbackContext::GetContext*)c;}; - std::string uri = assembleSetResourceUri(resourceUri, queryParams); auto cLock = m_csdkLock.lock(); @@ -722,22 +984,18 @@ namespace OC HeaderOptions serverHeaderOptions; OCStackResult result = clientResponse->result; - if (OC_STACK_OK == result || - OC_STACK_RESOURCE_CREATED == result || - OC_STACK_RESOURCE_DELETED == result || - OC_STACK_RESOURCE_CHANGED == result) + + parseServerHeaderOptions(clientResponse, serverHeaderOptions); + try { - parseServerHeaderOptions(clientResponse, serverHeaderOptions); - try - { - attrs = parseGetSetCallback(clientResponse); - } - catch(OC::OCException& e) - { - result = e.code(); - } + attrs = parseGetSetCallback(clientResponse); + } + catch(OC::OCException& e) + { + result = e.code(); } + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); std::thread exec(context->callback, serverHeaderOptions, attrs, result); exec.detach(); return OC_STACK_DELETE_TRANSACTION; @@ -936,10 +1194,9 @@ namespace OC static_cast(ctx); HeaderOptions serverHeaderOptions; - if (clientResponse->result == OC_STACK_OK) - { - parseServerHeaderOptions(clientResponse, serverHeaderOptions); - } + parseServerHeaderOptions(clientResponse, serverHeaderOptions); + + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); std::thread exec(context->callback, serverHeaderOptions, clientResponse->result); exec.detach(); return OC_STACK_DELETE_TRANSACTION; @@ -1002,21 +1259,25 @@ namespace OC HeaderOptions serverHeaderOptions; uint32_t sequenceNumber = clientResponse->sequenceNumber; OCStackResult result = clientResponse->result; - if (clientResponse->result == OC_STACK_OK) + + parseServerHeaderOptions(clientResponse, serverHeaderOptions); + try { - parseServerHeaderOptions(clientResponse, serverHeaderOptions); - try - { - attrs = parseGetSetCallback(clientResponse); - } - catch(OC::OCException& e) - { - result = e.code(); - } + attrs = parseGetSetCallback(clientResponse); } + catch(OC::OCException& e) + { + result = e.code(); + } + + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); std::thread exec(context->callback, serverHeaderOptions, attrs, result, sequenceNumber); exec.detach(); + if (sequenceNumber == MAX_SEQUENCE_NUMBER + 1) + { + return OC_STACK_DELETE_TRANSACTION; + } return OC_STACK_KEEP_TRANSACTION; } @@ -1122,6 +1383,7 @@ namespace OC */ std::string url = clientResponse->devAddr.addr; + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); std::thread exec(context->callback, clientResponse->result, clientResponse->sequenceNumber, url); @@ -1214,7 +1476,7 @@ namespace OC std::lock_guard lock(*cLock); std::ostringstream os; - os << host << OCF_RSRVD_DEVICE_PRESENCE_URI; + os << host << OC_RSRVD_DEVICE_PRESENCE_URI; QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}}); std::string url = assembleSetResourceUri(os.str(), queryParams); @@ -1256,7 +1518,9 @@ namespace OC options[i].protocolID = OC_COAP_ID; options[i].optionID = it->getOptionID(); options[i].optionLength = it->getOptionData().length() + 1; - strcpy((char*)options[i].optionData, (it->getOptionData().c_str())); + strncpy((char*)options[i].optionData, it->getOptionData().c_str(), + sizeof(options[i].optionLength) -1 ); + options[i].optionData[sizeof(options[i].optionLength) - 1] = 0; i++; } @@ -1311,6 +1575,7 @@ namespace OC << std::flush; } else { + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); convert(list, dpDeviceList); std::thread exec(callback, dpDeviceList); exec.detach(); @@ -1346,10 +1611,10 @@ namespace OC if (NULL == list) { result = OC_STACK_NO_RESOURCE; - oclog() << "findDirectPairingDevices(): No device found for direct pairing" - << std::flush; + OIC_LOG_V(DEBUG, TAG, "%s: No device found for direct pairing", __func__); } else { + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); convert(list, dpDeviceList); std::thread exec(callback, dpDeviceList); exec.detach(); @@ -1371,6 +1636,7 @@ namespace OC ClientCallbackContext::DirectPairingContext* context = static_cast(ctx); + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); std::thread exec(context->callback, cloneDevice(peer), result); exec.detach(); } @@ -1402,4 +1668,100 @@ namespace OC } return result; } +#ifdef TCP_ADAPTER + OCStackApplicationResult KeepAliveRespCallback(void* ctx, + OCDoHandle /*handle*/, + OCClientResponse* clientResponse) + { + ClientCallbackContext::KeepAliveContext* context = + static_cast(ctx); + OCRepresentation attrs; + OCStackResult result = clientResponse->result; + + try + { + attrs = parseGetSetCallback(clientResponse); + } + catch(OC::OCException& e) + { + result = e.code(); + } + + OIC_LOG_V(DEBUG, TAG, "%s: call response callback", __func__); + std::thread exec(context->callback, result, attrs); + exec.detach(); + return OC_STACK_DELETE_TRANSACTION; + } + + OCStackResult InProcClientWrapper::findKeepAliveResource(std::string host, + KeepAliveCallback resultCallback) + { + if (host.empty() || !resultCallback) + { + oclog() << "Invalid parameters" << std::flush; + return OC_STACK_INVALID_PARAM; + } + + OCStackResult result = OC_STACK_ERROR; + + ClientCallbackContext::KeepAliveContext* ctx = + new ClientCallbackContext::KeepAliveContext(resultCallback); + OCCallbackData cbdata; + cbdata.context = static_cast(ctx), + cbdata.cb = KeepAliveRespCallback; + cbdata.cd = [](void* c){delete (ClientCallbackContext::KeepAliveContext*)c;}; + + auto cLock = m_csdkLock.lock(); + + if (cLock) + { + std::lock_guard lock(*cLock); + result = OCFindKeepAliveResource(nullptr, host.c_str(), &cbdata); + } + else + { + delete ctx; + result = OC_STACK_ERROR; + } + return result; + } + + OCStackResult InProcClientWrapper::sendKeepAliveRequest(std::string host, + const OCRepresentation& rep, + KeepAliveCallback resultCallback) + { + if (!resultCallback) + { + oclog() << "Invalid parameters" << std::flush; + return OC_STACK_INVALID_PARAM; + } + + OCStackResult result = OC_STACK_ERROR; + + ClientCallbackContext::KeepAliveContext* ctx = new ClientCallbackContext::KeepAliveContext(resultCallback); + OCCallbackData cbdata; + cbdata.context = static_cast(ctx), + cbdata.cb = KeepAliveRespCallback; + cbdata.cd = [](void* c){delete (ClientCallbackContext::KeepAliveContext*)c;}; + + auto cLock = m_csdkLock.lock(); + + if (cLock) + { + std::lock_guard lock(*cLock); + OCRepPayload *payload = rep.getPayload(); + result = OCSendKeepAliveRequest (nullptr, host.c_str(), (OCPayload*)payload, &cbdata); + if (result != OC_STACK_OK) + { + OCRepPayloadDestroy(payload); + } + } + else + { + delete ctx; + result = OC_STACK_ERROR; + } + return result; + } +#endif }