From: kush.agrawal Date: Mon, 15 Apr 2019 13:24:54 +0000 (+0900) Subject: Make OCProcessEvent method. X-Git-Tag: accepted/tizen/unified/20190418.145712~3 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=refs%2Fchanges%2F89%2F203789%2F1;p=platform%2Fupstream%2Fiotivity.git Make OCProcessEvent method. OCProcess() need to poll periodically to check if any work to process. This can be burden in some cases. OCProcessEvent can get wake-up time using out parameter. So we can use timed_wait not polling every periodic seconds. To wake up for handling any receive event, we need to register oc_event and this will send a signal to waited one. Following patches are taken frm 2.0-rel branch : 1. https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/385 2. https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/393 3. https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/401 4. https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/431 5. https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/450 6. https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/470 https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/479 (cherry picked from 7ced5d4ee1b0fc3ee3cf0f4478f8df085b540b8e) Change-Id: I27d1e17dcf21c5a45a7ff4a8b1a988b5c5edaf0a Signed-off-by: kush.agrawal Signed-off-by: DoHyun Pyun --- diff --git a/build_common/SConscript b/build_common/SConscript old mode 100644 new mode 100755 index 8612220..f02da23 --- a/build_common/SConscript +++ b/build_common/SConscript @@ -138,6 +138,7 @@ help_vars.Add(EnumVariable('TIZEN_4', 'Build with tizen 4.0 api', 'False', allow help_vars.Add(PathVariable('TIZENRT_OS_DIR', 'Absolute Path to TizenRT OS directory', None, PathVariable.PathAccept)) help_vars.Add(EnumVariable('PLATFORM_TLS', 'Use platform tls instead of local mbedtls', '0', allowed_values=('0', '1'))) help_vars.Add(EnumVariable('OIC_SUPPORT_TIZEN_TRACE', 'Tizen Trace(T-trace) api availability', 'False', allowed_values=('True', 'False'))) +help_vars.Add(EnumVariable('WITH_PROCESS_EVENT','Build including procee event logics in ocstack', 'False', allowed_values=('True', 'False'))) AddOption('--prefix', dest='prefix', type='string', @@ -366,6 +367,9 @@ if env.get('DISABLE_PRESENCE') == False: if env.get('DISABLE_BLE_SERVER'): defines.append('-DDISABLE_BLE_SERVER=1') +if env.get('WITH_PROCESS_EVENT'): + env.AppendUnique(CPPDEFINES=['WITH_PROCESS_EVENT']) + libs = [] if env.get('SECURED') == '1': defines.append('-D__WITH_DTLS__=1') diff --git a/resource/c_common/SConscript b/resource/c_common/SConscript old mode 100644 new mode 100755 index 0a9ff8e..c023903 --- a/resource/c_common/SConscript +++ b/resource/c_common/SConscript @@ -149,7 +149,8 @@ env.AppendUnique(CPPPATH = [ os.path.join(Dir('.').abspath, 'oic_string', 'include'), os.path.join(Dir('.').abspath, 'oic_time', 'include'), os.path.join(Dir('.').abspath, 'ocrandom', 'include'), - os.path.join(Dir('.').abspath, 'octhread', 'include') + os.path.join(Dir('.').abspath, 'octhread', 'include'), + os.path.join(Dir('.').abspath, 'ocevent', 'include') ]) if target_os == 'tizen': @@ -179,6 +180,11 @@ elif target_os in ['windows']: else: common_src.append('octhread/src/noop/octhread.c') +if target_os in ['windows']: + common_src.append('ocevent/src/windows/ocevent.c') +else: + common_src.append('ocevent/src/others/ocevent.c') + commonlib = common_env.StaticLibrary('c_common', common_src) common_env.InstallTarget(commonlib, 'c_common') common_env.UserInstallTargetLib(commonlib, 'c_common') diff --git a/resource/c_common/iotivity_debug.h b/resource/c_common/iotivity_debug.h new file mode 100755 index 0000000..9a0141a --- /dev/null +++ b/resource/c_common/iotivity_debug.h @@ -0,0 +1,42 @@ +//****************************************************************** +// +// Copyright 2016 Microsoft +// +//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//****************************************************************** + +/** + * @file + * + * This file contains debug helpers. + */ + +#ifndef IOTIVITY_DEBUG_H_ +#define IOTIVITY_DEBUG_H_ + +#include + +// Macro used to avoid the need for a local variable just for an assert. Using +// a local variable just for assert, instead of this macro, can cause compiler +// warnings on NDEBUG builds. Example: use OC_VERIFY(foo() == 0) instead of +// {int local = foo(); assert(local == 0);} +#if defined(NDEBUG) +#define OC_VERIFY(condition) ((void)(condition)) +#else +#define OC_VERIFY(condition) assert(condition) +#endif + +#endif // #ifndef IOTIVITY_DEBUG_H_ diff --git a/resource/c_common/ocevent/include/ocevent.h b/resource/c_common/ocevent/include/ocevent.h new file mode 100755 index 0000000..26de762 --- /dev/null +++ b/resource/c_common/ocevent/include/ocevent.h @@ -0,0 +1,81 @@ +/* ***************************************************************** + * + * Copyright 2017 Microsoft + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************/ + +/** + * @file + * + * This file defines the event object. + */ + +#ifndef OC_EVENT_H_ +#define OC_EVENT_H_ + +#include "octhread.h" +#include + +#ifdef __cplusplus +extern "C" +{ +#endif /* __cplusplus */ + +typedef struct oc_event_t *oc_event; + +/** + * Creates a new event. + * + * @return Reference to newly created event, NULL on allocation failure. + */ +oc_event oc_event_new(void); + +/** + * Frees the oc_event. + * + * @param[in] event Optional event to be freed. + */ +void oc_event_free(oc_event event); + +/** + * Waits infinitely for the event to be signaled. + * + * @param[in] event Event to wait on. + */ +void oc_event_wait(oc_event event); + +/** + * Waits for the event to be signaled or timed out. + * + * @param[in] event Event to wait on. + * @param[in] milliseconds Timeout in milliseconds. + * @return OC_WAIT_SUCCESS Event was signaled before timeout expired. + * OC_WAIT_TIMEDOUT Timeout interval elapsed. + */ +OCWaitResult_t oc_event_wait_for(oc_event event, uint32_t milliseconds); + +/** + * Signals the event. + * + * @param[in] event Event to signal. + */ +void oc_event_signal(oc_event event); + +#ifdef __cplusplus +} /* extern "C" */ +#endif /* __cplusplus */ + +#endif /* OC_EVENT_H_ */ diff --git a/resource/c_common/ocevent/src/others/ocevent.c b/resource/c_common/ocevent/src/others/ocevent.c new file mode 100755 index 0000000..75ee446 --- /dev/null +++ b/resource/c_common/ocevent/src/others/ocevent.c @@ -0,0 +1,159 @@ +/* ***************************************************************** + * + * Copyright 2017 Microsoft + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************/ + +/** + * @file + * This file implements Event object for allowing threads to wait on. + */ + +#include "ocevent.h" +#include "oic_malloc.h" +#include "oic_time.h" +#include "octhread.h" +#include "logger.h" +#include "iotivity_debug.h" + +#include +#include +#include + +/** + * TAG + * Logging tag for module name + */ +#define TAG "OIC_EVENT" + +typedef struct oc_event_t +{ + /* Mutex for protecting the members. */ + oc_mutex mutex; + /* The conditional variable to wait on. */ + oc_cond cond; + /* Whether the event is signaled. */ + bool signaled; +} oc_event_t; + +oc_event oc_event_new(void) +{ + oc_event event = (oc_event)OICCalloc(1, sizeof(oc_event_t)); + if (!event) + { + OIC_LOG(ERROR, TAG, "Failed to allocate oc_event"); + return NULL; + } + + event->mutex = oc_mutex_new(); + event->cond = oc_cond_new(); + event->signaled = false; + + if (!event->mutex || !event->cond) + { + oc_event_free(event); + return NULL; + } + + return event; +} + +void oc_event_free(oc_event event) +{ + if (event) + { + oc_mutex_free(event->mutex); + oc_cond_free(event->cond); + OICFree(event); + } +} + +void oc_event_wait(oc_event event) +{ + OC_VERIFY(OC_WAIT_SUCCESS == oc_event_wait_for(event, UINT32_MAX)); +} + +OCWaitResult_t oc_event_wait_for(oc_event event, uint32_t milliseconds) +{ + bool timedOut = false; + oc_mutex_assert_owner(event->mutex, false); + oc_mutex_lock(event->mutex); + + if (!event->signaled) + { + if (0 != milliseconds) + { + const uint64_t startTime = OICGetCurrentTime(TIME_IN_MS); + uint64_t remaining = milliseconds; + // This while loop is to filter spurious wakeups caused by conditional variable. + while (!event->signaled) + { + oc_mutex_assert_owner(event->mutex, true); + OCWaitResult_t waitResult = oc_cond_wait_for(event->cond, event->mutex, + (remaining * US_PER_MS)); + if (OC_WAIT_TIMEDOUT == waitResult) + { + timedOut = true; + break; + } + assert(OC_WAIT_SUCCESS == waitResult); + + // Not timed out, see if the event is in signaled state and reset it. + if (event->signaled) + { + timedOut = false; + break; + } + + // Not timed out and not signaled => spurious wakeup, see if we ran out of time. + const uint64_t elapsed = (OICGetCurrentTime(TIME_IN_MS) - startTime); + if (elapsed >= (uint64_t)milliseconds) + { + timedOut = true; + break; + } + + // Encountered spurious wakeup and still has time to wait, recalculate the + // remaining time and wait again. + // Spurious wakeup: depending on the platform, waiting on a Condition Variable can + // occasionally (though rarely) return from the wait state even when the condition + // isn't set, so we always need to revalidate the state in a loop here. + remaining = (uint64_t)milliseconds - elapsed; + } + } + else + { + // Zero timeout provided and the event has not been signaled. + timedOut = true; + } + } + oc_mutex_assert_owner(event->mutex, true); + event->signaled = false; + oc_mutex_unlock(event->mutex); + return timedOut ? OC_WAIT_TIMEDOUT : OC_WAIT_SUCCESS; +} + +void oc_event_signal(oc_event event) +{ + oc_mutex_assert_owner(event->mutex, false); + oc_mutex_lock(event->mutex); + if (!event->signaled) + { + event->signaled = true; + oc_cond_signal(event->cond); + } + oc_mutex_unlock(event->mutex); +} diff --git a/resource/c_common/ocevent/src/windows/ocevent.c b/resource/c_common/ocevent/src/windows/ocevent.c new file mode 100755 index 0000000..babdc0a --- /dev/null +++ b/resource/c_common/ocevent/src/windows/ocevent.c @@ -0,0 +1,98 @@ +/* ***************************************************************** + * + * Copyright 2017 Microsoft + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************/ + +/** + * @file + * This file implements Event object for allowing threads to wait on. + */ + +#include "ocevent.h" +#include "oic_malloc.h" +#include "experimental/logger.h" +#include "iotivity_debug.h" + +#include +#include +#include +#include + +/** + * TAG + * Logging tag for module name + */ +#define TAG "OIC_EVENT" + +typedef struct oc_event_t +{ + /* The event handle */ + HANDLE event; +} oc_event_t; + +oc_event oc_event_new() +{ + oc_event event = (oc_event)OICCalloc(1, sizeof(oc_event_t)); + if (!event) + { + OIC_LOG(ERROR, TAG, "Failed to allocate oc_event"); + return NULL; + } + + /* Create an auto-reset event */ + event->event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (!event->event) + { + oc_event_free(event); + return NULL; + } + + return event; +} + +void oc_event_free(oc_event event) +{ + if (event && event->event) + { + OC_VERIFY(0 != CloseHandle(event->event)); + } + OICFree(event); +} + +void oc_event_wait(oc_event event) +{ + OC_VERIFY(OC_WAIT_SUCCESS == WaitForSingleObject(event->event, INFINITE)); +} + +OCWaitResult_t oc_event_wait_for(oc_event event, uint32_t milliseconds) +{ + DWORD waitResult = WaitForSingleObject(event->event, milliseconds); + switch (waitResult) + { + case WAIT_OBJECT_0: + return OC_WAIT_SUCCESS; + case WAIT_TIMEOUT: + return OC_WAIT_TIMEDOUT; + default: + return OC_WAIT_INVAL; + } +} + +void oc_event_signal(oc_event event) +{ + OC_VERIFY(0 != SetEvent(event->event)); +} \ No newline at end of file diff --git a/resource/c_common/ocevent/test/SConscript b/resource/c_common/ocevent/test/SConscript new file mode 100755 index 0000000..8424a56 --- /dev/null +++ b/resource/c_common/ocevent/test/SConscript @@ -0,0 +1,53 @@ +#****************************************************************** +# +# Copyright 2017 Microsoft +# +#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + +import os +import os.path +from tools.scons.RunTest import * + +Import('test_env') + +eventtests_env = test_env.Clone() +target_os = eventtests_env.get('TARGET_OS') + +###################################################################### +# Build flags +###################################################################### +eventtests_env.PrependUnique(CPPPATH=['#resource/c_common/ocevent/include']) + +eventtests_env.AppendUnique(LIBPATH=[eventtests_env.get('BUILD_DIR')]) +eventtests_env.Append(LIBS=['logger']) + +if eventtests_env.get('LOGGING'): + eventtests_env.AppendUnique(CPPDEFINES=['TB_LOG']) + +###################################################################### +# Source files and Targets +###################################################################### +eventtests = eventtests_env.Program('eventtests', ['eventtest.cpp']) + +Alias("test", [eventtests]) + +eventtests_env.AppendTarget('test') +if eventtests_env.get('TEST') == '1': + if target_os in ['linux', 'windows']: + run_test(eventtests_env, + 'resource_c_common_event_test.memcheck', + 'resource/c_common/ocevent/test/eventtests') diff --git a/resource/c_common/ocevent/test/eventtest.cpp b/resource/c_common/ocevent/test/eventtest.cpp new file mode 100755 index 0000000..1a2c41e --- /dev/null +++ b/resource/c_common/ocevent/test/eventtest.cpp @@ -0,0 +1,97 @@ +/* ***************************************************************** + * + * Copyright 2017 Microsoft + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************/ + +/** + * @file + * + * This file implement tests for the Event object. + */ + +#include "ocevent.h" +#include "gtest/gtest.h" +#include +#include + +class EventTester : public testing::Test +{ + protected: + virtual void SetUp() + { + m_event = oc_event_new(); + ASSERT_TRUE(nullptr != m_event); + } + virtual void TearDown() + { + oc_event_free(m_event); + } + void TestSignalBeforeWait(uint32_t timeout); + oc_event m_event; +}; + +void EventTester::TestSignalBeforeWait(uint32_t timeout) +{ + std::thread thread([this]() + { + oc_event_signal(m_event); + }); + + // Make sure the signal occurs before the wait. + thread.join(); + + if (UINT_MAX == timeout) + { + oc_event_wait(m_event); + } + else + { + EXPECT_EQ(OC_WAIT_SUCCESS, oc_event_wait_for(m_event, timeout)); + } +} + +TEST_F(EventTester, InfiniteTimeout_SignaledAfterWait) +{ + std::thread thread([this]() + { + // This sleep allows the main thread to enter the wait state before the signal. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + oc_event_signal(m_event); + }); + oc_event_wait(m_event); + thread.join(); +} + +TEST_F(EventTester, InfiniteTimeout_SignaledBeforeWait) +{ + TestSignalBeforeWait(UINT_MAX); +} + +TEST_F(EventTester, ZeroTimeout_NotSignaled) +{ + EXPECT_EQ(OC_WAIT_TIMEDOUT, oc_event_wait_for(m_event, 0)); +} + +TEST_F(EventTester, ZeroTimeout_Signaled) +{ + TestSignalBeforeWait(0); +} + +TEST_F(EventTester, TimedOut) +{ + EXPECT_EQ(OC_WAIT_TIMEDOUT, oc_event_wait_for(m_event, 10)); +} diff --git a/resource/c_common/octhread/include/octhread.h b/resource/c_common/octhread/include/octhread.h old mode 100644 new mode 100755 index 8f96de8..3caffa0 --- a/resource/c_common/octhread/include/octhread.h +++ b/resource/c_common/octhread/include/octhread.h @@ -37,6 +37,11 @@ extern "C" { #endif /* __cplusplus */ +/** + * Value used for the owner field of an oc_mutex that doesn't have an owner. + */ +#define OC_INVALID_THREAD_ID 0 + typedef struct oc_mutex_internal *oc_mutex; typedef struct oc_cond_internal *oc_cond; typedef struct oc_thread_internal *oc_thread; @@ -164,6 +169,18 @@ void oc_mutex_unlock(oc_mutex mutex); bool oc_mutex_free(oc_mutex mutex); /** + * On Debug builds, assert that the current thread owns or does not own a mutex. + * + * This function has no effect on Release builds. + * + * @param[in] mutex The mutex to assert on. + * @param[in] currentThreadIsOwner true if the current thread is expected to + * be the mutex owner, false otherwise. + * + */ +void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner); + +/** * Creates new condition. * * @return Reference to newly created oc_cond, otherwise NULL. diff --git a/resource/c_common/octhread/src/noop/octhread.c b/resource/c_common/octhread/src/noop/octhread.c old mode 100644 new mode 100755 index 43e2bb6..39cfff9 --- a/resource/c_common/octhread/src/noop/octhread.c +++ b/resource/c_common/octhread/src/noop/octhread.c @@ -94,6 +94,11 @@ void oc_mutex_unlock(oc_mutex mutex) return; } +void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner) +{ + return; +} + oc_cond oc_cond_new(void) { return (oc_cond)&g_condInfo; diff --git a/resource/c_common/octhread/src/posix/octhread.c b/resource/c_common/octhread/src/posix/octhread.c old mode 100644 new mode 100755 index 4a0ca43..7214ebf --- a/resource/c_common/octhread/src/posix/octhread.c +++ b/resource/c_common/octhread/src/posix/octhread.c @@ -88,6 +88,8 @@ static const uint64_t NANOSECS_PER_SEC = 1000000000L; typedef struct _tagMutexInfo_t { pthread_mutex_t mutex; + pthread_t owner; + uint32_t recursionCount; } oc_mutex_internal; typedef struct _tagEventInfo_t @@ -102,6 +104,13 @@ typedef struct _tagThreadInfo_t pthread_attr_t threadattr; } oc_thread_internal; +static pthread_t oc_get_current_thread_id() +{ + pthread_t id = pthread_self(); + assert(OC_INVALID_THREAD_ID != id); + return id; +} + #ifndef __TIZENRT__ OCThreadResult_t oc_thread_new(oc_thread *t, void *(*start_routine)(void *), void *arg) #else @@ -300,6 +309,23 @@ void oc_mutex_unlock(oc_mutex mutex) } } +void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner) +{ + assert(NULL != mutex); + const oc_mutex_internal *mutexInfo = (const oc_mutex_internal*) mutex; + + pthread_t currentThreadID = oc_get_current_thread_id(); + if (currentThreadIsOwner) + { + assert(pthread_equal(mutexInfo->owner, currentThreadID)); + assert(mutexInfo->recursionCount != 0); + } + else + { + assert(!pthread_equal(mutexInfo->owner, currentThreadID)); + } +} + oc_cond oc_cond_new(void) { oc_cond retVal = NULL; diff --git a/resource/c_common/octhread/src/windows/octhread.c b/resource/c_common/octhread/src/windows/octhread.c old mode 100644 new mode 100755 index 2613883..826a68a --- a/resource/c_common/octhread/src/windows/octhread.c +++ b/resource/c_common/octhread/src/windows/octhread.c @@ -38,8 +38,17 @@ static const uint64_t USECS_PER_MSEC = 1000; typedef struct _tagMutexInfo_t { CRITICAL_SECTION mutex; + DWORD owner; + uint32_t recursionCount; } oc_mutex_internal; +static DWORD oc_get_current_thread_id() +{ + DWORD id = GetCurrentThreadId(); + assert(OC_INVALID_THREAD_ID != id); + return id; +} + typedef struct _tagEventInfo_t { CONDITION_VARIABLE cond; @@ -174,6 +183,23 @@ void oc_mutex_unlock(oc_mutex mutex) } } +void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner) +{ + assert(NULL != mutex); + const oc_mutex_internal *mutexInfo = (const oc_mutex_internal*) mutex; + + DWORD currentThreadID = oc_get_current_thread_id(); + if (currentThreadIsOwner) + { + assert(mutexInfo->owner == currentThreadID); + assert(mutexInfo->recursionCount != 0); + } + else + { + assert(mutexInfo->owner != currentThreadID); + } +} + oc_cond oc_cond_new(void) { oc_cond retVal = NULL; diff --git a/resource/csdk/connectivity/api/cacommon.h b/resource/csdk/connectivity/api/cacommon.h index 3145bdd..7810b0d 100755 --- a/resource/csdk/connectivity/api/cacommon.h +++ b/resource/csdk/connectivity/api/cacommon.h @@ -27,6 +27,9 @@ #define CA_COMMON_H_ #include "iotivity_config.h" +#ifdef WITH_PROCESS_EVENT +#include "ocevent.h" +#endif #ifndef WITH_ARDUINO #ifdef TCP_ADAPTER diff --git a/resource/csdk/connectivity/api/cainterface.h b/resource/csdk/connectivity/api/cainterface.h old mode 100644 new mode 100755 index b667766..4abe1a0 --- a/resource/csdk/connectivity/api/cainterface.h +++ b/resource/csdk/connectivity/api/cainterface.h @@ -245,6 +245,10 @@ CAResult_t CASetRAInfo(const CARAInfo_t *caraInfo); CAResult_t CASetProxyUri(const char *uri); #endif +#ifdef WITH_PROCESS_EVENT +void CARegisterProcessEvent(oc_event event); +#endif + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/resource/csdk/connectivity/inc/camessagehandler.h b/resource/csdk/connectivity/inc/camessagehandler.h old mode 100644 new mode 100755 index 10203b9..9b4e83a --- a/resource/csdk/connectivity/inc/camessagehandler.h +++ b/resource/csdk/connectivity/inc/camessagehandler.h @@ -129,6 +129,10 @@ void CAAddDataToSendThread(CAData_t *data); void CAAddDataToReceiveThread(CAData_t *data); #endif +#ifdef WITH_PROCESS_EVENT +void CARegisterMessageProcessEvent(oc_event event); +#endif + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/resource/csdk/connectivity/src/caconnectivitymanager.c b/resource/csdk/connectivity/src/caconnectivitymanager.c old mode 100644 new mode 100755 index c3247d2..bd5df0e --- a/resource/csdk/connectivity/src/caconnectivitymanager.c +++ b/resource/csdk/connectivity/src/caconnectivitymanager.c @@ -610,3 +610,10 @@ void CARegisterKeepAliveHandler(CAKeepAliveConnectionCallback ConnHandler) CATCPSetKeepAliveCallbacks(ConnHandler); } #endif + +#ifdef WITH_PROCESS_EVENT +void CARegisterProcessEvent(oc_event event) +{ + CARegisterMessageProcessEvent(event); +} +#endif // WITH_PROCESS_EVENT \ No newline at end of file diff --git a/resource/csdk/connectivity/src/camessagehandler.c b/resource/csdk/connectivity/src/camessagehandler.c index 882ddd4..638610e 100644 --- a/resource/csdk/connectivity/src/camessagehandler.c +++ b/resource/csdk/connectivity/src/camessagehandler.c @@ -60,6 +60,10 @@ static ca_thread_pool_t g_threadPoolHandle = NULL; static CAQueueingThread_t g_sendThread; static CAQueueingThread_t g_receiveThread; +#ifdef WITH_PROCESS_EVENT +static oc_event g_processEvent = NULL; +#endif // WITH_PROCESS_EVENT + #else #define CA_MAX_RT_ARRAY_SIZE 3 #endif // SINGLE_THREAD @@ -121,6 +125,13 @@ void CAAddDataToReceiveThread(CAData_t *data) // add thread CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t)); + +#ifdef WITH_PROCESS_EVENT + if (g_processEvent) + { + oc_event_signal(g_processEvent); + } +#endif } #endif @@ -336,7 +347,14 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin CAProcessReceivedData(cadata); #else CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); -#endif + +#ifdef WITH_PROCESS_EVENT + if (g_processEvent) + { + oc_event_signal(g_processEvent); + } +#endif//WITH_PROCESS_EVENT +#endif// SINGLE_THREAD } static void CADestroyData(void *data, uint32_t size) @@ -865,7 +883,7 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep, if (CA_NOT_SUPPORTED == res || CA_REQUEST_TIMEOUT == res) { OIC_LOG(DEBUG, TAG, "this message does not have block option"); - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + CAAddDataToReceiveThread(cadata); } else { @@ -876,6 +894,13 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep, #endif { CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + +#ifdef WITH_PROCESS_EVENT + if (g_processEvent) + { + oc_event_signal(g_processEvent); + } +#endif } #endif // SINGLE_THREAD @@ -932,6 +957,18 @@ static void CAConnectionStateChangedCallback(const CAEndpoint_t *info, bool isCo } } +static u_queue_message_t *get_receive_queue_item(void) +{ + u_queue_message_t *item = NULL; + + oc_mutex_lock(g_receiveThread.threadMutex); + item = u_queue_get_element(g_receiveThread.dataQueue); + oc_mutex_unlock(g_receiveThread.threadMutex); + + return item; +} + + void CAHandleRequestResponseCallbacks() { #ifdef SINGLE_THREAD @@ -943,39 +980,44 @@ void CAHandleRequestResponseCallbacks() // #1 parse the data // #2 get endpoint - oc_mutex_lock(g_receiveThread.threadMutex); - - u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue); - - oc_mutex_unlock(g_receiveThread.threadMutex); + u_queue_message_t *item = NULL; +#ifdef WITH_PROCESS_EVENT + while ((item = get_receive_queue_item()) != NULL) +#else + if ((item = get_receive_queue_item()) != NULL) +#endif + { if (NULL == item->msg) + { + OICFree(item); +#ifdef WITH_PROCESS_EVENT + continue; +#else + return; +#endif + } - if (NULL == item || NULL == item->msg) - { - return; - } + // get endpoint + CAData_t *td = (CAData_t *) item->msg; - // get endpoint - CAData_t *td = (CAData_t *) item->msg; + if (td->requestInfo && g_requestHandler) + { + OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions); + g_requestHandler(td->remoteEndpoint, td->requestInfo); + } + else if (td->responseInfo && g_responseHandler) + { + OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions); + g_responseHandler(td->remoteEndpoint, td->responseInfo); + } + else if (td->errorInfo && g_errorHandler) + { + OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result); + g_errorHandler(td->remoteEndpoint, td->errorInfo); + } - if (td->requestInfo && g_requestHandler) - { - OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions); - g_requestHandler(td->remoteEndpoint, td->requestInfo); - } - else if (td->responseInfo && g_responseHandler) - { - OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions); - g_responseHandler(td->remoteEndpoint, td->responseInfo); - } - else if (td->errorInfo && g_errorHandler) - { - OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result); - g_errorHandler(td->remoteEndpoint, td->errorInfo); + CADestroyData(item->msg, sizeof(CAData_t)); + OICFree(item); } - - CADestroyData(item->msg, sizeof(CAData_t)); - OICFree(item); - #endif // SINGLE_HANDLE #endif // SINGLE_THREAD } @@ -1323,6 +1365,10 @@ void CATerminateMessageHandler() u_arraylist_t *list = CAGetSelectedNetworkList(); uint32_t length = u_arraylist_length(list); + #ifdef WITH_PROCESS_EVENT + g_processEvent = NULL; +#endif + uint32_t i = 0; for (i = 0; i < length; i++) { @@ -1455,6 +1501,13 @@ void CAErrorHandler(const CAEndpoint_t *endpoint, cadata->errorInfo->result = result; CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + +#ifdef WITH_PROCESS_EVENT + if (g_processEvent) + { + oc_event_signal(g_processEvent); + } +#endif coap_delete_pdu(pdu); #else (void)result; @@ -1508,6 +1561,13 @@ static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info, cadata->dataType = CA_ERROR_DATA; CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + +#ifdef WITH_PROCESS_EVENT + if (g_processEvent) + { + oc_event_signal(g_processEvent); + } +#endif//WITH_PROCESS_EVENT #endif OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT"); } @@ -1819,3 +1879,10 @@ static void CALogPDUInfo(const CAData_t *data, const coap_pdu_t *pdu) pdu->transport_hdr->udp.token_length); } #endif + +#ifdef WITH_PROCESS_EVENT +void CARegisterMessageProcessEvent(oc_event event) +{ + g_processEvent = event; +} +#endif // WITH_PROCESS_EVENT diff --git a/resource/csdk/routing/src/routingmanager.c b/resource/csdk/routing/src/routingmanager.c old mode 100644 new mode 100755 index 9f87d2f..fb4afac --- a/resource/csdk/routing/src/routingmanager.c +++ b/resource/csdk/routing/src/routingmanager.c @@ -714,7 +714,21 @@ exit: return result; } -void RMProcess() +#ifdef WITH_PROCESS_EVENT +static void compareAndApplyTimeout(uint32_t *nextEventTime, + uint64_t timeoutSec) +{ + uint32_t timeoutMs = timeoutSec * MS_PER_SEC; + if (timeoutMs < *nextEventTime) + { + *nextEventTime = timeoutMs; + } +} + +void RMProcess(uint32_t *nextEventTime) +#else // WITH_PROCESS_EVENT +void RMProcess(void) +#endif // !WITH_PROCESS_EVENT { if (!g_isRMInitialized) { @@ -741,7 +755,17 @@ void RMProcess() result = RMSendNotificationToAll(payload); RMPFreePayload(payload); RM_VERIFY_SUCCESS(result, OC_STACK_OK); +#ifdef WITH_PROCESS_EVENT + compareAndApplyTimeout(nextEventTime, GATEWAY_ALIVE_TIMEOUT); +#endif // WITH_PROCESS_EVENT + } +#ifdef WITH_PROCESS_EVENT + else + { + compareAndApplyTimeout(nextEventTime, + GATEWAY_ALIVE_TIMEOUT - (currentTime - g_aliveTime)); } +#endif // WITH_PROCESS_EVENT if (ROUTINGTABLE_VALIDATION_TIMEOUT <= currentTime - g_refreshTableTime) { @@ -770,8 +794,18 @@ void RMProcess() g_refreshTableTime = currentTime; g_isValidated = false; u_linklist_free(&removedEntries); +#ifdef WITH_PROCESS_EVENT + compareAndApplyTimeout(nextEventTime, ROUTINGTABLE_VALIDATION_TIMEOUT); +#endif // WITH_PROCESS_EVENT goto exit; } +#ifdef WITH_PROCESS_EVENT + else + { + compareAndApplyTimeout(nextEventTime, + ROUTINGTABLE_VALIDATION_TIMEOUT - (currentTime - g_refreshTableTime)); + } +#endif // WITH_PROCESS_EVENT if (!g_isValidated && ROUTINGTABLE_REFRESH_TIMEOUT <= (currentTime - g_refreshTableTime)) { @@ -799,7 +833,17 @@ void RMProcess() g_isValidated = true; RTMPrintTable(g_routingGatewayTable, g_routingEndpointTable); u_linklist_free(&invalidInterfaces); +#ifdef WITH_PROCESS_EVENT + compareAndApplyTimeout(nextEventTime, ROUTINGTABLE_REFRESH_TIMEOUT); +#endif // WITH_PROCESS_EVENT + } +#ifdef WITH_PROCESS_EVENT + else + { + compareAndApplyTimeout(nextEventTime, + ROUTINGTABLE_REFRESH_TIMEOUT - (currentTime - g_refreshTableTime)); } +#endif // WITH_PROCESS_EVENT exit: return; diff --git a/resource/csdk/stack/include/internal/ocstackinternal.h b/resource/csdk/stack/include/internal/ocstackinternal.h old mode 100644 new mode 100755 index 4885b56..47003a4 --- a/resource/csdk/stack/include/internal/ocstackinternal.h +++ b/resource/csdk/stack/include/internal/ocstackinternal.h @@ -367,6 +367,13 @@ OCStackResult ParseRequestUri(const char *fullUri, */ void FixUpClientResponse(OCClientResponse *cr); +#ifdef WITH_PROCESS_EVENT +/** + * Send a signal to processEvent if event is registered. + */ +void OCSendProcessEventSignal(void); +#endif // WITH_PROCESS_EVENT + #ifdef __cplusplus } #endif // __cplusplus diff --git a/resource/csdk/stack/include/internal/oickeepaliveinternal.h b/resource/csdk/stack/include/internal/oickeepaliveinternal.h old mode 100644 new mode 100755 index caccdac..61cfb42 --- a/resource/csdk/stack/include/internal/oickeepaliveinternal.h +++ b/resource/csdk/stack/include/internal/oickeepaliveinternal.h @@ -77,7 +77,11 @@ OCStackResult OCHandleKeepAliveResponse(const CAEndpoint_t *endPoint, const OCPa /** * Process the KeepAlive timer to send ping message to OIC Server. */ +#ifdef WITH_PROCESS_EVENT +void OCProcessKeepAlive(uint32_t *nextEventTime); +#else // WITH_PROCESS_EVENT void OCProcessKeepAlive(); +#endif // !WITH_PROCESS_EVENT /** * This API will be called from RI layer whenever there is a request for KeepAlive. diff --git a/resource/csdk/stack/include/ocstack.h b/resource/csdk/stack/include/ocstack.h old mode 100644 new mode 100755 index 96bae43..a1f5cd2 --- a/resource/csdk/stack/include/ocstack.h +++ b/resource/csdk/stack/include/ocstack.h @@ -31,6 +31,9 @@ #include #include #include "octypes.h" +#ifdef WITH_PROCESS_EVENT +#include "ocevent.h" +#endif #ifdef __cplusplus extern "C" { @@ -125,7 +128,14 @@ OCStackResult OCStopMulticastServer(); * * @return ::OC_STACK_OK on success, some other value upon failure. */ + +#ifdef WITH_PROCESS_EVENT OCStackResult OCProcess(); +OCStackResult OCProcessEvent(uint32_t *nextEventTime); +void OCRegisterProcessEvent(oc_event event); +#else +OCStackResult OCProcess(); +#endif /** * This function discovers or Perform requests on a specified resource diff --git a/resource/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp b/resource/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp old mode 100644 new mode 100755 index b832193..e9e481a --- a/resource/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp +++ b/resource/csdk/stack/samples/linux/SimpleClientServer/ocserver.cpp @@ -51,6 +51,9 @@ static int gObserveNotifyType = 3; int gQuitFlag = 0; int gLightUnderObservation = 0; +#ifdef WITH_PROCESS_EVENT +static oc_event processEvent = NULL; +#endif static LightResource Light; // This variable determines instance number of the Light resource. @@ -698,6 +701,10 @@ void handleSigInt(int signum) if (signum == SIGINT) { gQuitFlag = 1; +#ifdef WITH_PROCESS_EVENT + if (processEvent) + oc_event_signal(processEvent); +#endif } } @@ -1116,6 +1123,16 @@ int main(int argc, char* argv[]) OIC_LOG(INFO, TAG, "Device Registration failed!"); exit (EXIT_FAILURE); } +#ifdef WITH_PROCESS_EVENT + processEvent = oc_event_new(); + if (!processEvent) + { + OIC_LOG(INFO, TAG, "oc_event_new failed!"); + exit (EXIT_FAILURE); + } + + OCRegisterProcessEvent(processEvent); +#endif /* * Declare and create the example resource: Light @@ -1148,11 +1165,21 @@ int main(int argc, char* argv[]) while (!gQuitFlag) { +#ifdef WITH_PROCESS_EVENT + uint32_t nextEventTime; + if (OCProcessEvent(&nextEventTime) != OC_STACK_OK) + { + OIC_LOG(ERROR, TAG, "OCStack process error"); + break; + } + oc_event_wait_for(processEvent, nextEventTime); +#else if (OCProcess() != OC_STACK_OK) { OIC_LOG(ERROR, TAG, "OCStack process error"); return 0; } +#endif } if (observeThreadStarted) @@ -1170,6 +1197,14 @@ int main(int argc, char* argv[]) OIC_LOG(INFO, TAG, "Exiting ocserver main loop..."); +#ifdef WITH_PROCESS_EVENT + if (processEvent) + { + oc_event_free(processEvent); + processEvent = NULL; + } +#endif + if (OCStop() != OC_STACK_OK) { OIC_LOG(ERROR, TAG, "OCStack process error"); diff --git a/resource/csdk/stack/src/ocstack.c b/resource/csdk/stack/src/ocstack.c index e3aef64..63eef55 100644 --- a/resource/csdk/stack/src/ocstack.c +++ b/resource/csdk/stack/src/ocstack.c @@ -155,6 +155,10 @@ static const char CORESPEC[] = "core"; static OCOtmEventHandler_t g_otmEventHandler = {NULL, NULL}; #endif +#ifdef WITH_PROCESS_EVENT +static oc_event g_ocProcessEvent = NULL; +#endif // WITH_PROCESS_EVENT + //----------------------------------------------------------------------------- // Macros //----------------------------------------------------------------------------- @@ -3145,6 +3149,9 @@ OCStackResult OCDoRequest(OCDoHandle *handle, { *handle = resHandle; } +#ifdef WITH_PROCESS_EVENT + OCSendProcessEventSignal(); +#endif // WITH_PROCESS_EVENT goto exit; } @@ -3329,7 +3336,11 @@ OCStackResult OCRegisterPersistentStorageHandler(OCPersistentStorage* persistent #ifdef WITH_PRESENCE -OCStackResult OCProcessPresence() +#ifdef WITH_PROCESS_EVENT +OCStackResult OCProcessPresence(uint32_t *nextEventTime) +#else // WITH_PROCESS_EVENT +OCStackResult OCProcessPresence(void) +#endif // !WITH_PROCESS_EVENT { OCStackResult result = OC_STACK_OK; @@ -3390,8 +3401,15 @@ OCStackResult OCProcessPresence() continue; } - if (now < cbNode->presence->timeOut[cbNode->presence->TTLlevel]) + uint32_t timeout = cbNode->presence->timeOut[cbNode->presence->TTLlevel]; + if (now < timeout) { +#ifdef WITH_PROCESS_EVENT + if (nextEventTime && (timeout - now) < *nextEventTime) + { + *nextEventTime = timeout - now; + } +#endif // WITH_PROCESS_EVENT continue; } @@ -3429,6 +3447,56 @@ exit: } #endif // WITH_PRESENCE +#ifdef WITH_PROCESS_EVENT +OCStackResult OCProcess(void) +{ + uint32_t nextEventTime; + return OCProcessEvent(&nextEventTime); +} + +OCStackResult OCProcessEvent(uint32_t *nextEventTime) +{ + if (stackState == OC_STACK_UNINITIALIZED) + { + OIC_LOG(ERROR, TAG, "OCProcess has failed. ocstack is not initialized"); + return OC_STACK_ERROR; + } + + *nextEventTime = UINT32_MAX; + +#ifdef WITH_PRESENCE + OCProcessPresence(nextEventTime); + OIC_LOG_V(INFO, TAG, "OCProcessPresence next event time : %u", *nextEventTime); +#endif + CAHandleRequestResponse(); + +// TODO +#ifdef ROUTING_GATEWAY + RMProcess(nextEventTime); +#endif + +#ifdef TCP_ADAPTER + OCProcessKeepAlive(nextEventTime); + OIC_LOG_V(INFO, TAG, "OCProcessKeepAlive next event time : %u", *nextEventTime); +#endif + return OC_STACK_OK; +} + +void OCRegisterProcessEvent(oc_event event) +{ + g_ocProcessEvent = event; + CARegisterProcessEvent(event); +} + +void OCSendProcessEventSignal(void) +{ + if (g_ocProcessEvent) + { + oc_event_signal(g_ocProcessEvent); + } +} +#else // WITH_PROCESS_EVENT + OCStackResult OCProcess() { if (stackState == OC_STACK_UNINITIALIZED) @@ -3450,6 +3518,7 @@ OCStackResult OCProcess() #endif return OC_STACK_OK; } +#endif // !WITH_PROCESS_EVENT #ifdef WITH_PRESENCE OCStackResult OCStartPresence(const uint32_t ttl) @@ -5499,7 +5568,7 @@ OCStackResult OCGetHeaderOption(OCHeaderOption* ocHdrOpt, size_t numOptions, uin void OCDefaultAdapterStateChangedHandler(CATransportAdapter_t adapter, bool enabled) { OIC_LOG(DEBUG, TAG, "OCDefaultAdapterStateChangedHandler"); - + OC_UNUSED(adapter); OC_UNUSED(enabled); } diff --git a/resource/csdk/stack/src/oickeepalive.c b/resource/csdk/stack/src/oickeepalive.c old mode 100644 new mode 100755 index 7c29f72..451d9dc --- a/resource/csdk/stack/src/oickeepalive.c +++ b/resource/csdk/stack/src/oickeepalive.c @@ -614,13 +614,20 @@ OCEntityHandlerResult OCHandleKeepAlivePOSTRequest(OCServerRequest *request, return OC_EH_OK; } +#ifdef WITH_PROCESS_EVENT +void OCProcessKeepAlive(uint32_t *nextEventTime) +#else // WITH_PROCESS_EVENT void OCProcessKeepAlive() +#endif // !WITH_PROCESS_EVENT { oc_mutex_lock(g_mutexObjectList); KeepAliveEntry_t *entry = NULL; KeepAliveEntry_t *tmp = NULL; LL_FOREACH_SAFE(g_keepAliveConnectionTable, entry, tmp) { +#ifdef WITH_PROCESS_EVENT + uint64_t nextPingMicroSeconds = UINT64_MAX; +#endif // WITH_PROCESS_EVENT if (entry) { uint64_t currentTime = OICGetCurrentTime(TIME_IN_US); @@ -640,6 +647,13 @@ void OCProcessKeepAlive() // Send message to disconnect session. OCSendDisconnectMessage(entry); } +#ifdef WITH_PROCESS_EVENT + else + { + nextPingMicroSeconds = (KEEPALIVE_RESPONSE_TIMEOUT_SEC * USECS_PER_SEC) - + (currentTime - entry->timeStamp); + } +#endif // WITH_PROCESS_EVENT } } else if (OC_SERVER == entry->mode) @@ -655,8 +669,33 @@ void OCProcessKeepAlive() OIC_LOG(DEBUG, TAG, "Server does not receive a PUT/POST request."); OCSendDisconnectMessage(entry); } +#ifdef WITH_PROCESS_EVENT + else + { + nextPingMicroSeconds = + (entry->interval * KEEPALIVE_RESPONSE_TIMEOUT_SEC * USECS_PER_SEC) - + (currentTime - entry->timeStamp); + } +#endif // WITH_PROCESS_EVENT + } + +#ifdef WITH_PROCESS_EVENT + if (nextEventTime && (nextPingMicroSeconds < UINT64_MAX)) + { + uint32_t nextPingMiliSeconds = (uint32_t)(nextPingMicroSeconds / US_PER_MS); + uint32_t nextPingMicroRemain = (uint32_t)(nextPingMicroSeconds % US_PER_MS); + if (nextPingMicroRemain > 0) + { + nextPingMiliSeconds += 1; // To round up the remaining microsecond. + } + + if (nextPingMiliSeconds < *nextEventTime) + { + *nextEventTime = nextPingMiliSeconds; + } + } +#endif // WITH_PROCESS_EVENT } - } } oc_mutex_unlock(g_mutexObjectList); } @@ -744,6 +783,9 @@ KeepAliveEntry_t *OCAddKeepAliveEntry(const CAEndpoint_t *endpoint, OCMode mode, entry->interval = interval; LL_APPEND(g_keepAliveConnectionTable, entry); +#ifdef WITH_PROCESS_EVENT + OCSendProcessEventSignal(); +#endif // WITH_PROCESS_EVENT return entry; } diff --git a/resource/include/InProcClientWrapper.h b/resource/include/InProcClientWrapper.h old mode 100644 new mode 100755 index 05b1472..95143d6 --- a/resource/include/InProcClientWrapper.h +++ b/resource/include/InProcClientWrapper.h @@ -30,6 +30,9 @@ #include #include #include +#ifdef WITH_PROCESS_EVENT +#include "ocevent.h" +#endif namespace OC { @@ -273,6 +276,9 @@ namespace OC private: PlatformConfig m_cfg; +#ifdef WITH_PROCESS_EVENT + oc_event m_processEvent; +#endif }; } diff --git a/resource/include/InProcServerWrapper.h b/resource/include/InProcServerWrapper.h old mode 100644 new mode 100755 index b81a73a..37b94e9 --- a/resource/include/InProcServerWrapper.h +++ b/resource/include/InProcServerWrapper.h @@ -25,6 +25,9 @@ #include #include +#ifdef WITH_PROCESS_EVENT +#include "ocevent.h" +#endif namespace OC { @@ -99,6 +102,9 @@ namespace OC bool m_threadRun; std::weak_ptr m_csdkLock; PlatformConfig m_cfg; +#ifdef WITH_PROCESS_EVENT + oc_event m_processEvent; +#endif }; } diff --git a/resource/src/InProcClientWrapper.cpp b/resource/src/InProcClientWrapper.cpp old mode 100644 new mode 100755 index 75b7b46..0dfc3b6 --- a/resource/src/InProcClientWrapper.cpp +++ b/resource/src/InProcClientWrapper.cpp @@ -79,6 +79,15 @@ namespace OC if (false == m_threadRun) { m_threadRun = true; +#ifdef WITH_PROCESS_EVENT + m_processEvent = oc_event_new(); + if (!m_processEvent) + { + OIC_LOG(INFO, TAG, "oc_event_new failed!"); + return OC_STACK_ERROR; + } + OCRegisterProcessEvent(m_processEvent); +#endif m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this); } } @@ -92,9 +101,23 @@ namespace OC if (m_threadRun && m_listeningThread.joinable()) { m_threadRun = false; +#ifdef WITH_PROCESS_EVENT + if (m_processEvent) + { + oc_event_signal(m_processEvent); + } +#endif m_listeningThread.join(); } +#ifdef WITH_PROCESS_EVENT + if (m_processEvent) + { + oc_event_free(m_processEvent); + m_processEvent = NULL; + } +#endif + // only stop if we are the ones who actually called 'start'. We are counting // on the server to do the stop. if (m_cfg.mode == ModeType::Client) @@ -114,11 +137,18 @@ namespace OC while(m_threadRun) { OCStackResult result; +#ifdef WITH_PROCESS_EVENT + uint32_t nextEventTime; +#endif auto cLock = m_csdkLock.lock(); if (cLock) { std::lock_guard lock(*cLock); +#ifdef WITH_PROCESS_EVENT + result = OCProcessEvent(&nextEventTime); +#else result = OCProcess(); +#endif } else { @@ -130,8 +160,12 @@ namespace OC // TODO: do something with result if failed? } +#ifdef WITH_PROCESS_EVENT + oc_event_wait_for(m_processEvent, nextEventTime); +#else // To minimize CPU utilization we may wish to do this with sleep std::this_thread::sleep_for(std::chrono::milliseconds(10)); +#endif } } diff --git a/resource/src/InProcServerWrapper.cpp b/resource/src/InProcServerWrapper.cpp old mode 100644 new mode 100755 index 6bc0bae..f36a1f3 --- a/resource/src/InProcServerWrapper.cpp +++ b/resource/src/InProcServerWrapper.cpp @@ -260,7 +260,7 @@ namespace OC std::weak_ptr csdkLock, PlatformConfig cfg) : m_threadRun(false), m_csdkLock(csdkLock), - m_cfg { cfg } + m_cfg { cfg } { start(); } @@ -303,6 +303,15 @@ namespace OC if (false == m_threadRun) { m_threadRun = true; +#ifdef WITH_PROCESS_EVENT + m_processEvent = oc_event_new(); + if (!m_processEvent) + { + OIC_LOG(INFO, TAG, "oc_event_new failed!"); + return OC_STACK_ERROR; + } + OCRegisterProcessEvent(m_processEvent); +#endif m_processThread = std::thread(&InProcServerWrapper::processFunc, this); } return OC_STACK_OK; @@ -315,9 +324,23 @@ namespace OC if(m_processThread.joinable()) { m_threadRun = false; +#ifdef WITH_PROCESS_EVENT + if (m_processEvent) + { + oc_event_signal(m_processEvent); + } +#endif m_processThread.join(); } +#ifdef WITH_PROCESS_EVENT + if (m_processEvent) + { + oc_event_free(m_processEvent); + m_processEvent = NULL; + } +#endif + OCStackResult res = OCStop(); if (OC_STACK_OK != res) @@ -330,6 +353,9 @@ namespace OC void InProcServerWrapper::processFunc() { +#ifdef WITH_PROCESS_EVENT + uint32_t nextEventTime; +#endif auto cLock = m_csdkLock.lock(); while(cLock && m_threadRun) { @@ -337,7 +363,11 @@ namespace OC { std::lock_guard lock(*cLock); +#ifdef WITH_PROCESS_EVENT + result = OCProcessEvent(&nextEventTime); +#else result = OCProcess(); +#endif } if(OC_STACK_ERROR == result) @@ -346,7 +376,11 @@ namespace OC // ...the value of variable result is simply ignored for now. } + #ifdef WITH_PROCESS_EVENT + oc_event_wait_for(m_processEvent, nextEventTime); +#else std::this_thread::sleep_for(std::chrono::milliseconds(10)); +#endif } }