help_vars.Add(PathVariable('TIZENRT_OS_DIR', 'Absolute Path to TizenRT OS directory', None, PathVariable.PathAccept))
help_vars.Add(EnumVariable('PLATFORM_TLS', 'Use platform tls instead of local mbedtls', '0', allowed_values=('0', '1')))
help_vars.Add(EnumVariable('OIC_SUPPORT_TIZEN_TRACE', 'Tizen Trace(T-trace) api availability', 'False', allowed_values=('True', 'False')))
+help_vars.Add(EnumVariable('WITH_PROCESS_EVENT','Build including procee event logics in ocstack', 'False', allowed_values=('True', 'False')))
AddOption('--prefix',
dest='prefix',
type='string',
if env.get('DISABLE_BLE_SERVER'):
defines.append('-DDISABLE_BLE_SERVER=1')
+if env.get('WITH_PROCESS_EVENT'):
+ env.AppendUnique(CPPDEFINES=['WITH_PROCESS_EVENT'])
+
libs = []
if env.get('SECURED') == '1':
defines.append('-D__WITH_DTLS__=1')
os.path.join(Dir('.').abspath, 'oic_string', 'include'),
os.path.join(Dir('.').abspath, 'oic_time', 'include'),
os.path.join(Dir('.').abspath, 'ocrandom', 'include'),
- os.path.join(Dir('.').abspath, 'octhread', 'include')
+ os.path.join(Dir('.').abspath, 'octhread', 'include'),
+ os.path.join(Dir('.').abspath, 'ocevent', 'include')
])
if target_os == 'tizen':
else:
common_src.append('octhread/src/noop/octhread.c')
+if target_os in ['windows']:
+ common_src.append('ocevent/src/windows/ocevent.c')
+else:
+ common_src.append('ocevent/src/others/ocevent.c')
+
commonlib = common_env.StaticLibrary('c_common', common_src)
common_env.InstallTarget(commonlib, 'c_common')
common_env.UserInstallTargetLib(commonlib, 'c_common')
--- /dev/null
+//******************************************************************
+//
+// Copyright 2016 Microsoft
+//
+//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//******************************************************************
+
+/**
+ * @file
+ *
+ * This file contains debug helpers.
+ */
+
+#ifndef IOTIVITY_DEBUG_H_
+#define IOTIVITY_DEBUG_H_
+
+#include <assert.h>
+
+// Macro used to avoid the need for a local variable just for an assert. Using
+// a local variable just for assert, instead of this macro, can cause compiler
+// warnings on NDEBUG builds. Example: use OC_VERIFY(foo() == 0) instead of
+// {int local = foo(); assert(local == 0);}
+#if defined(NDEBUG)
+#define OC_VERIFY(condition) ((void)(condition))
+#else
+#define OC_VERIFY(condition) assert(condition)
+#endif
+
+#endif // #ifndef IOTIVITY_DEBUG_H_
--- /dev/null
+/* *****************************************************************
+ *
+ * Copyright 2017 Microsoft
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************/
+
+/**
+ * @file
+ *
+ * This file defines the event object.
+ */
+
+#ifndef OC_EVENT_H_
+#define OC_EVENT_H_
+
+#include "octhread.h"
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif /* __cplusplus */
+
+typedef struct oc_event_t *oc_event;
+
+/**
+ * Creates a new event.
+ *
+ * @return Reference to newly created event, NULL on allocation failure.
+ */
+oc_event oc_event_new(void);
+
+/**
+ * Frees the oc_event.
+ *
+ * @param[in] event Optional event to be freed.
+ */
+void oc_event_free(oc_event event);
+
+/**
+ * Waits infinitely for the event to be signaled.
+ *
+ * @param[in] event Event to wait on.
+ */
+void oc_event_wait(oc_event event);
+
+/**
+ * Waits for the event to be signaled or timed out.
+ *
+ * @param[in] event Event to wait on.
+ * @param[in] milliseconds Timeout in milliseconds.
+ * @return OC_WAIT_SUCCESS Event was signaled before timeout expired.
+ * OC_WAIT_TIMEDOUT Timeout interval elapsed.
+ */
+OCWaitResult_t oc_event_wait_for(oc_event event, uint32_t milliseconds);
+
+/**
+ * Signals the event.
+ *
+ * @param[in] event Event to signal.
+ */
+void oc_event_signal(oc_event event);
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif /* __cplusplus */
+
+#endif /* OC_EVENT_H_ */
--- /dev/null
+/* *****************************************************************
+ *
+ * Copyright 2017 Microsoft
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************/
+
+/**
+ * @file
+ * This file implements Event object for allowing threads to wait on.
+ */
+
+#include "ocevent.h"
+#include "oic_malloc.h"
+#include "oic_time.h"
+#include "octhread.h"
+#include "logger.h"
+#include "iotivity_debug.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+/**
+ * TAG
+ * Logging tag for module name
+ */
+#define TAG "OIC_EVENT"
+
+typedef struct oc_event_t
+{
+ /* Mutex for protecting the members. */
+ oc_mutex mutex;
+ /* The conditional variable to wait on. */
+ oc_cond cond;
+ /* Whether the event is signaled. */
+ bool signaled;
+} oc_event_t;
+
+oc_event oc_event_new(void)
+{
+ oc_event event = (oc_event)OICCalloc(1, sizeof(oc_event_t));
+ if (!event)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to allocate oc_event");
+ return NULL;
+ }
+
+ event->mutex = oc_mutex_new();
+ event->cond = oc_cond_new();
+ event->signaled = false;
+
+ if (!event->mutex || !event->cond)
+ {
+ oc_event_free(event);
+ return NULL;
+ }
+
+ return event;
+}
+
+void oc_event_free(oc_event event)
+{
+ if (event)
+ {
+ oc_mutex_free(event->mutex);
+ oc_cond_free(event->cond);
+ OICFree(event);
+ }
+}
+
+void oc_event_wait(oc_event event)
+{
+ OC_VERIFY(OC_WAIT_SUCCESS == oc_event_wait_for(event, UINT32_MAX));
+}
+
+OCWaitResult_t oc_event_wait_for(oc_event event, uint32_t milliseconds)
+{
+ bool timedOut = false;
+ oc_mutex_assert_owner(event->mutex, false);
+ oc_mutex_lock(event->mutex);
+
+ if (!event->signaled)
+ {
+ if (0 != milliseconds)
+ {
+ const uint64_t startTime = OICGetCurrentTime(TIME_IN_MS);
+ uint64_t remaining = milliseconds;
+ // This while loop is to filter spurious wakeups caused by conditional variable.
+ while (!event->signaled)
+ {
+ oc_mutex_assert_owner(event->mutex, true);
+ OCWaitResult_t waitResult = oc_cond_wait_for(event->cond, event->mutex,
+ (remaining * US_PER_MS));
+ if (OC_WAIT_TIMEDOUT == waitResult)
+ {
+ timedOut = true;
+ break;
+ }
+ assert(OC_WAIT_SUCCESS == waitResult);
+
+ // Not timed out, see if the event is in signaled state and reset it.
+ if (event->signaled)
+ {
+ timedOut = false;
+ break;
+ }
+
+ // Not timed out and not signaled => spurious wakeup, see if we ran out of time.
+ const uint64_t elapsed = (OICGetCurrentTime(TIME_IN_MS) - startTime);
+ if (elapsed >= (uint64_t)milliseconds)
+ {
+ timedOut = true;
+ break;
+ }
+
+ // Encountered spurious wakeup and still has time to wait, recalculate the
+ // remaining time and wait again.
+ // Spurious wakeup: depending on the platform, waiting on a Condition Variable can
+ // occasionally (though rarely) return from the wait state even when the condition
+ // isn't set, so we always need to revalidate the state in a loop here.
+ remaining = (uint64_t)milliseconds - elapsed;
+ }
+ }
+ else
+ {
+ // Zero timeout provided and the event has not been signaled.
+ timedOut = true;
+ }
+ }
+ oc_mutex_assert_owner(event->mutex, true);
+ event->signaled = false;
+ oc_mutex_unlock(event->mutex);
+ return timedOut ? OC_WAIT_TIMEDOUT : OC_WAIT_SUCCESS;
+}
+
+void oc_event_signal(oc_event event)
+{
+ oc_mutex_assert_owner(event->mutex, false);
+ oc_mutex_lock(event->mutex);
+ if (!event->signaled)
+ {
+ event->signaled = true;
+ oc_cond_signal(event->cond);
+ }
+ oc_mutex_unlock(event->mutex);
+}
--- /dev/null
+/* *****************************************************************
+ *
+ * Copyright 2017 Microsoft
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************/
+
+/**
+ * @file
+ * This file implements Event object for allowing threads to wait on.
+ */
+
+#include "ocevent.h"
+#include "oic_malloc.h"
+#include "experimental/logger.h"
+#include "iotivity_debug.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <windows.h>
+
+/**
+ * TAG
+ * Logging tag for module name
+ */
+#define TAG "OIC_EVENT"
+
+typedef struct oc_event_t
+{
+ /* The event handle */
+ HANDLE event;
+} oc_event_t;
+
+oc_event oc_event_new()
+{
+ oc_event event = (oc_event)OICCalloc(1, sizeof(oc_event_t));
+ if (!event)
+ {
+ OIC_LOG(ERROR, TAG, "Failed to allocate oc_event");
+ return NULL;
+ }
+
+ /* Create an auto-reset event */
+ event->event = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (!event->event)
+ {
+ oc_event_free(event);
+ return NULL;
+ }
+
+ return event;
+}
+
+void oc_event_free(oc_event event)
+{
+ if (event && event->event)
+ {
+ OC_VERIFY(0 != CloseHandle(event->event));
+ }
+ OICFree(event);
+}
+
+void oc_event_wait(oc_event event)
+{
+ OC_VERIFY(OC_WAIT_SUCCESS == WaitForSingleObject(event->event, INFINITE));
+}
+
+OCWaitResult_t oc_event_wait_for(oc_event event, uint32_t milliseconds)
+{
+ DWORD waitResult = WaitForSingleObject(event->event, milliseconds);
+ switch (waitResult)
+ {
+ case WAIT_OBJECT_0:
+ return OC_WAIT_SUCCESS;
+ case WAIT_TIMEOUT:
+ return OC_WAIT_TIMEDOUT;
+ default:
+ return OC_WAIT_INVAL;
+ }
+}
+
+void oc_event_signal(oc_event event)
+{
+ OC_VERIFY(0 != SetEvent(event->event));
+}
\ No newline at end of file
--- /dev/null
+#******************************************************************
+#
+# Copyright 2017 Microsoft
+#
+#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+import os
+import os.path
+from tools.scons.RunTest import *
+
+Import('test_env')
+
+eventtests_env = test_env.Clone()
+target_os = eventtests_env.get('TARGET_OS')
+
+######################################################################
+# Build flags
+######################################################################
+eventtests_env.PrependUnique(CPPPATH=['#resource/c_common/ocevent/include'])
+
+eventtests_env.AppendUnique(LIBPATH=[eventtests_env.get('BUILD_DIR')])
+eventtests_env.Append(LIBS=['logger'])
+
+if eventtests_env.get('LOGGING'):
+ eventtests_env.AppendUnique(CPPDEFINES=['TB_LOG'])
+
+######################################################################
+# Source files and Targets
+######################################################################
+eventtests = eventtests_env.Program('eventtests', ['eventtest.cpp'])
+
+Alias("test", [eventtests])
+
+eventtests_env.AppendTarget('test')
+if eventtests_env.get('TEST') == '1':
+ if target_os in ['linux', 'windows']:
+ run_test(eventtests_env,
+ 'resource_c_common_event_test.memcheck',
+ 'resource/c_common/ocevent/test/eventtests')
--- /dev/null
+/* *****************************************************************
+ *
+ * Copyright 2017 Microsoft
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************/
+
+/**
+ * @file
+ *
+ * This file implement tests for the Event object.
+ */
+
+#include "ocevent.h"
+#include "gtest/gtest.h"
+#include <limits.h>
+#include <thread>
+
+class EventTester : public testing::Test
+{
+ protected:
+ virtual void SetUp()
+ {
+ m_event = oc_event_new();
+ ASSERT_TRUE(nullptr != m_event);
+ }
+ virtual void TearDown()
+ {
+ oc_event_free(m_event);
+ }
+ void TestSignalBeforeWait(uint32_t timeout);
+ oc_event m_event;
+};
+
+void EventTester::TestSignalBeforeWait(uint32_t timeout)
+{
+ std::thread thread([this]()
+ {
+ oc_event_signal(m_event);
+ });
+
+ // Make sure the signal occurs before the wait.
+ thread.join();
+
+ if (UINT_MAX == timeout)
+ {
+ oc_event_wait(m_event);
+ }
+ else
+ {
+ EXPECT_EQ(OC_WAIT_SUCCESS, oc_event_wait_for(m_event, timeout));
+ }
+}
+
+TEST_F(EventTester, InfiniteTimeout_SignaledAfterWait)
+{
+ std::thread thread([this]()
+ {
+ // This sleep allows the main thread to enter the wait state before the signal.
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ oc_event_signal(m_event);
+ });
+ oc_event_wait(m_event);
+ thread.join();
+}
+
+TEST_F(EventTester, InfiniteTimeout_SignaledBeforeWait)
+{
+ TestSignalBeforeWait(UINT_MAX);
+}
+
+TEST_F(EventTester, ZeroTimeout_NotSignaled)
+{
+ EXPECT_EQ(OC_WAIT_TIMEDOUT, oc_event_wait_for(m_event, 0));
+}
+
+TEST_F(EventTester, ZeroTimeout_Signaled)
+{
+ TestSignalBeforeWait(0);
+}
+
+TEST_F(EventTester, TimedOut)
+{
+ EXPECT_EQ(OC_WAIT_TIMEDOUT, oc_event_wait_for(m_event, 10));
+}
{
#endif /* __cplusplus */
+/**
+ * Value used for the owner field of an oc_mutex that doesn't have an owner.
+ */
+#define OC_INVALID_THREAD_ID 0
+
typedef struct oc_mutex_internal *oc_mutex;
typedef struct oc_cond_internal *oc_cond;
typedef struct oc_thread_internal *oc_thread;
bool oc_mutex_free(oc_mutex mutex);
/**
+ * On Debug builds, assert that the current thread owns or does not own a mutex.
+ *
+ * This function has no effect on Release builds.
+ *
+ * @param[in] mutex The mutex to assert on.
+ * @param[in] currentThreadIsOwner true if the current thread is expected to
+ * be the mutex owner, false otherwise.
+ *
+ */
+void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner);
+
+/**
* Creates new condition.
*
* @return Reference to newly created oc_cond, otherwise NULL.
return;
}
+void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner)
+{
+ return;
+}
+
oc_cond oc_cond_new(void)
{
return (oc_cond)&g_condInfo;
typedef struct _tagMutexInfo_t
{
pthread_mutex_t mutex;
+ pthread_t owner;
+ uint32_t recursionCount;
} oc_mutex_internal;
typedef struct _tagEventInfo_t
pthread_attr_t threadattr;
} oc_thread_internal;
+static pthread_t oc_get_current_thread_id()
+{
+ pthread_t id = pthread_self();
+ assert(OC_INVALID_THREAD_ID != id);
+ return id;
+}
+
#ifndef __TIZENRT__
OCThreadResult_t oc_thread_new(oc_thread *t, void *(*start_routine)(void *), void *arg)
#else
}
}
+void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner)
+{
+ assert(NULL != mutex);
+ const oc_mutex_internal *mutexInfo = (const oc_mutex_internal*) mutex;
+
+ pthread_t currentThreadID = oc_get_current_thread_id();
+ if (currentThreadIsOwner)
+ {
+ assert(pthread_equal(mutexInfo->owner, currentThreadID));
+ assert(mutexInfo->recursionCount != 0);
+ }
+ else
+ {
+ assert(!pthread_equal(mutexInfo->owner, currentThreadID));
+ }
+}
+
oc_cond oc_cond_new(void)
{
oc_cond retVal = NULL;
typedef struct _tagMutexInfo_t
{
CRITICAL_SECTION mutex;
+ DWORD owner;
+ uint32_t recursionCount;
} oc_mutex_internal;
+static DWORD oc_get_current_thread_id()
+{
+ DWORD id = GetCurrentThreadId();
+ assert(OC_INVALID_THREAD_ID != id);
+ return id;
+}
+
typedef struct _tagEventInfo_t
{
CONDITION_VARIABLE cond;
}
}
+void oc_mutex_assert_owner(const oc_mutex mutex, bool currentThreadIsOwner)
+{
+ assert(NULL != mutex);
+ const oc_mutex_internal *mutexInfo = (const oc_mutex_internal*) mutex;
+
+ DWORD currentThreadID = oc_get_current_thread_id();
+ if (currentThreadIsOwner)
+ {
+ assert(mutexInfo->owner == currentThreadID);
+ assert(mutexInfo->recursionCount != 0);
+ }
+ else
+ {
+ assert(mutexInfo->owner != currentThreadID);
+ }
+}
+
oc_cond oc_cond_new(void)
{
oc_cond retVal = NULL;
#define CA_COMMON_H_
#include "iotivity_config.h"
+#ifdef WITH_PROCESS_EVENT
+#include "ocevent.h"
+#endif
#ifndef WITH_ARDUINO
#ifdef TCP_ADAPTER
CAResult_t CASetProxyUri(const char *uri);
#endif
+#ifdef WITH_PROCESS_EVENT
+void CARegisterProcessEvent(oc_event event);
+#endif
+
#ifdef __cplusplus
} /* extern "C" */
#endif
void CAAddDataToReceiveThread(CAData_t *data);
#endif
+#ifdef WITH_PROCESS_EVENT
+void CARegisterMessageProcessEvent(oc_event event);
+#endif
+
#ifdef __cplusplus
} /* extern "C" */
#endif
CATCPSetKeepAliveCallbacks(ConnHandler);
}
#endif
+
+#ifdef WITH_PROCESS_EVENT
+void CARegisterProcessEvent(oc_event event)
+{
+ CARegisterMessageProcessEvent(event);
+}
+#endif // WITH_PROCESS_EVENT
\ No newline at end of file
static CAQueueingThread_t g_sendThread;
static CAQueueingThread_t g_receiveThread;
+#ifdef WITH_PROCESS_EVENT
+static oc_event g_processEvent = NULL;
+#endif // WITH_PROCESS_EVENT
+
#else
#define CA_MAX_RT_ARRAY_SIZE 3
#endif // SINGLE_THREAD
// add thread
CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
}
#endif
CAProcessReceivedData(cadata);
#else
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
-#endif
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif//WITH_PROCESS_EVENT
+#endif// SINGLE_THREAD
}
static void CADestroyData(void *data, uint32_t size)
if (CA_NOT_SUPPORTED == res || CA_REQUEST_TIMEOUT == res)
{
OIC_LOG(DEBUG, TAG, "this message does not have block option");
- CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+ CAAddDataToReceiveThread(cadata);
}
else
{
#endif
{
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
}
#endif // SINGLE_THREAD
}
}
+static u_queue_message_t *get_receive_queue_item(void)
+{
+ u_queue_message_t *item = NULL;
+
+ oc_mutex_lock(g_receiveThread.threadMutex);
+ item = u_queue_get_element(g_receiveThread.dataQueue);
+ oc_mutex_unlock(g_receiveThread.threadMutex);
+
+ return item;
+}
+
+
void CAHandleRequestResponseCallbacks()
{
#ifdef SINGLE_THREAD
// #1 parse the data
// #2 get endpoint
- oc_mutex_lock(g_receiveThread.threadMutex);
-
- u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
-
- oc_mutex_unlock(g_receiveThread.threadMutex);
+ u_queue_message_t *item = NULL;
+#ifdef WITH_PROCESS_EVENT
+ while ((item = get_receive_queue_item()) != NULL)
+#else
+ if ((item = get_receive_queue_item()) != NULL)
+#endif
+ { if (NULL == item->msg)
+ {
+ OICFree(item);
+#ifdef WITH_PROCESS_EVENT
+ continue;
+#else
+ return;
+#endif
+ }
- if (NULL == item || NULL == item->msg)
- {
- return;
- }
+ // get endpoint
+ CAData_t *td = (CAData_t *) item->msg;
- // get endpoint
- CAData_t *td = (CAData_t *) item->msg;
+ if (td->requestInfo && g_requestHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
+ g_requestHandler(td->remoteEndpoint, td->requestInfo);
+ }
+ else if (td->responseInfo && g_responseHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
+ g_responseHandler(td->remoteEndpoint, td->responseInfo);
+ }
+ else if (td->errorInfo && g_errorHandler)
+ {
+ OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
+ g_errorHandler(td->remoteEndpoint, td->errorInfo);
+ }
- if (td->requestInfo && g_requestHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
- g_requestHandler(td->remoteEndpoint, td->requestInfo);
- }
- else if (td->responseInfo && g_responseHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
- g_responseHandler(td->remoteEndpoint, td->responseInfo);
- }
- else if (td->errorInfo && g_errorHandler)
- {
- OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
- g_errorHandler(td->remoteEndpoint, td->errorInfo);
+ CADestroyData(item->msg, sizeof(CAData_t));
+ OICFree(item);
}
-
- CADestroyData(item->msg, sizeof(CAData_t));
- OICFree(item);
-
#endif // SINGLE_HANDLE
#endif // SINGLE_THREAD
}
u_arraylist_t *list = CAGetSelectedNetworkList();
uint32_t length = u_arraylist_length(list);
+ #ifdef WITH_PROCESS_EVENT
+ g_processEvent = NULL;
+#endif
+
uint32_t i = 0;
for (i = 0; i < length; i++)
{
cadata->errorInfo->result = result;
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif
coap_delete_pdu(pdu);
#else
(void)result;
cadata->dataType = CA_ERROR_DATA;
CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+
+#ifdef WITH_PROCESS_EVENT
+ if (g_processEvent)
+ {
+ oc_event_signal(g_processEvent);
+ }
+#endif//WITH_PROCESS_EVENT
#endif
OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
}
pdu->transport_hdr->udp.token_length);
}
#endif
+
+#ifdef WITH_PROCESS_EVENT
+void CARegisterMessageProcessEvent(oc_event event)
+{
+ g_processEvent = event;
+}
+#endif // WITH_PROCESS_EVENT
return result;
}
-void RMProcess()
+#ifdef WITH_PROCESS_EVENT
+static void compareAndApplyTimeout(uint32_t *nextEventTime,
+ uint64_t timeoutSec)
+{
+ uint32_t timeoutMs = timeoutSec * MS_PER_SEC;
+ if (timeoutMs < *nextEventTime)
+ {
+ *nextEventTime = timeoutMs;
+ }
+}
+
+void RMProcess(uint32_t *nextEventTime)
+#else // WITH_PROCESS_EVENT
+void RMProcess(void)
+#endif // !WITH_PROCESS_EVENT
{
if (!g_isRMInitialized)
{
result = RMSendNotificationToAll(payload);
RMPFreePayload(payload);
RM_VERIFY_SUCCESS(result, OC_STACK_OK);
+#ifdef WITH_PROCESS_EVENT
+ compareAndApplyTimeout(nextEventTime, GATEWAY_ALIVE_TIMEOUT);
+#endif // WITH_PROCESS_EVENT
+ }
+#ifdef WITH_PROCESS_EVENT
+ else
+ {
+ compareAndApplyTimeout(nextEventTime,
+ GATEWAY_ALIVE_TIMEOUT - (currentTime - g_aliveTime));
}
+#endif // WITH_PROCESS_EVENT
if (ROUTINGTABLE_VALIDATION_TIMEOUT <= currentTime - g_refreshTableTime)
{
g_refreshTableTime = currentTime;
g_isValidated = false;
u_linklist_free(&removedEntries);
+#ifdef WITH_PROCESS_EVENT
+ compareAndApplyTimeout(nextEventTime, ROUTINGTABLE_VALIDATION_TIMEOUT);
+#endif // WITH_PROCESS_EVENT
goto exit;
}
+#ifdef WITH_PROCESS_EVENT
+ else
+ {
+ compareAndApplyTimeout(nextEventTime,
+ ROUTINGTABLE_VALIDATION_TIMEOUT - (currentTime - g_refreshTableTime));
+ }
+#endif // WITH_PROCESS_EVENT
if (!g_isValidated && ROUTINGTABLE_REFRESH_TIMEOUT <= (currentTime - g_refreshTableTime))
{
g_isValidated = true;
RTMPrintTable(g_routingGatewayTable, g_routingEndpointTable);
u_linklist_free(&invalidInterfaces);
+#ifdef WITH_PROCESS_EVENT
+ compareAndApplyTimeout(nextEventTime, ROUTINGTABLE_REFRESH_TIMEOUT);
+#endif // WITH_PROCESS_EVENT
+ }
+#ifdef WITH_PROCESS_EVENT
+ else
+ {
+ compareAndApplyTimeout(nextEventTime,
+ ROUTINGTABLE_REFRESH_TIMEOUT - (currentTime - g_refreshTableTime));
}
+#endif // WITH_PROCESS_EVENT
exit:
return;
*/
void FixUpClientResponse(OCClientResponse *cr);
+#ifdef WITH_PROCESS_EVENT
+/**
+ * Send a signal to processEvent if event is registered.
+ */
+void OCSendProcessEventSignal(void);
+#endif // WITH_PROCESS_EVENT
+
#ifdef __cplusplus
}
#endif // __cplusplus
/**
* Process the KeepAlive timer to send ping message to OIC Server.
*/
+#ifdef WITH_PROCESS_EVENT
+void OCProcessKeepAlive(uint32_t *nextEventTime);
+#else // WITH_PROCESS_EVENT
void OCProcessKeepAlive();
+#endif // !WITH_PROCESS_EVENT
/**
* This API will be called from RI layer whenever there is a request for KeepAlive.
#include <stdio.h>
#include <stdint.h>
#include "octypes.h"
+#ifdef WITH_PROCESS_EVENT
+#include "ocevent.h"
+#endif
#ifdef __cplusplus
extern "C" {
*
* @return ::OC_STACK_OK on success, some other value upon failure.
*/
+
+#ifdef WITH_PROCESS_EVENT
OCStackResult OCProcess();
+OCStackResult OCProcessEvent(uint32_t *nextEventTime);
+void OCRegisterProcessEvent(oc_event event);
+#else
+OCStackResult OCProcess();
+#endif
/**
* This function discovers or Perform requests on a specified resource
int gQuitFlag = 0;
int gLightUnderObservation = 0;
+#ifdef WITH_PROCESS_EVENT
+static oc_event processEvent = NULL;
+#endif
static LightResource Light;
// This variable determines instance number of the Light resource.
if (signum == SIGINT)
{
gQuitFlag = 1;
+#ifdef WITH_PROCESS_EVENT
+ if (processEvent)
+ oc_event_signal(processEvent);
+#endif
}
}
OIC_LOG(INFO, TAG, "Device Registration failed!");
exit (EXIT_FAILURE);
}
+#ifdef WITH_PROCESS_EVENT
+ processEvent = oc_event_new();
+ if (!processEvent)
+ {
+ OIC_LOG(INFO, TAG, "oc_event_new failed!");
+ exit (EXIT_FAILURE);
+ }
+
+ OCRegisterProcessEvent(processEvent);
+#endif
/*
* Declare and create the example resource: Light
while (!gQuitFlag)
{
+#ifdef WITH_PROCESS_EVENT
+ uint32_t nextEventTime;
+ if (OCProcessEvent(&nextEventTime) != OC_STACK_OK)
+ {
+ OIC_LOG(ERROR, TAG, "OCStack process error");
+ break;
+ }
+ oc_event_wait_for(processEvent, nextEventTime);
+#else
if (OCProcess() != OC_STACK_OK)
{
OIC_LOG(ERROR, TAG, "OCStack process error");
return 0;
}
+#endif
}
if (observeThreadStarted)
OIC_LOG(INFO, TAG, "Exiting ocserver main loop...");
+#ifdef WITH_PROCESS_EVENT
+ if (processEvent)
+ {
+ oc_event_free(processEvent);
+ processEvent = NULL;
+ }
+#endif
+
if (OCStop() != OC_STACK_OK)
{
OIC_LOG(ERROR, TAG, "OCStack process error");
static OCOtmEventHandler_t g_otmEventHandler = {NULL, NULL};
#endif
+#ifdef WITH_PROCESS_EVENT
+static oc_event g_ocProcessEvent = NULL;
+#endif // WITH_PROCESS_EVENT
+
//-----------------------------------------------------------------------------
// Macros
//-----------------------------------------------------------------------------
{
*handle = resHandle;
}
+#ifdef WITH_PROCESS_EVENT
+ OCSendProcessEventSignal();
+#endif // WITH_PROCESS_EVENT
goto exit;
}
#ifdef WITH_PRESENCE
-OCStackResult OCProcessPresence()
+#ifdef WITH_PROCESS_EVENT
+OCStackResult OCProcessPresence(uint32_t *nextEventTime)
+#else // WITH_PROCESS_EVENT
+OCStackResult OCProcessPresence(void)
+#endif // !WITH_PROCESS_EVENT
{
OCStackResult result = OC_STACK_OK;
continue;
}
- if (now < cbNode->presence->timeOut[cbNode->presence->TTLlevel])
+ uint32_t timeout = cbNode->presence->timeOut[cbNode->presence->TTLlevel];
+ if (now < timeout)
{
+#ifdef WITH_PROCESS_EVENT
+ if (nextEventTime && (timeout - now) < *nextEventTime)
+ {
+ *nextEventTime = timeout - now;
+ }
+#endif // WITH_PROCESS_EVENT
continue;
}
}
#endif // WITH_PRESENCE
+#ifdef WITH_PROCESS_EVENT
+OCStackResult OCProcess(void)
+{
+ uint32_t nextEventTime;
+ return OCProcessEvent(&nextEventTime);
+}
+
+OCStackResult OCProcessEvent(uint32_t *nextEventTime)
+{
+ if (stackState == OC_STACK_UNINITIALIZED)
+ {
+ OIC_LOG(ERROR, TAG, "OCProcess has failed. ocstack is not initialized");
+ return OC_STACK_ERROR;
+ }
+
+ *nextEventTime = UINT32_MAX;
+
+#ifdef WITH_PRESENCE
+ OCProcessPresence(nextEventTime);
+ OIC_LOG_V(INFO, TAG, "OCProcessPresence next event time : %u", *nextEventTime);
+#endif
+ CAHandleRequestResponse();
+
+// TODO
+#ifdef ROUTING_GATEWAY
+ RMProcess(nextEventTime);
+#endif
+
+#ifdef TCP_ADAPTER
+ OCProcessKeepAlive(nextEventTime);
+ OIC_LOG_V(INFO, TAG, "OCProcessKeepAlive next event time : %u", *nextEventTime);
+#endif
+ return OC_STACK_OK;
+}
+
+void OCRegisterProcessEvent(oc_event event)
+{
+ g_ocProcessEvent = event;
+ CARegisterProcessEvent(event);
+}
+
+void OCSendProcessEventSignal(void)
+{
+ if (g_ocProcessEvent)
+ {
+ oc_event_signal(g_ocProcessEvent);
+ }
+}
+#else // WITH_PROCESS_EVENT
+
OCStackResult OCProcess()
{
if (stackState == OC_STACK_UNINITIALIZED)
#endif
return OC_STACK_OK;
}
+#endif // !WITH_PROCESS_EVENT
#ifdef WITH_PRESENCE
OCStackResult OCStartPresence(const uint32_t ttl)
void OCDefaultAdapterStateChangedHandler(CATransportAdapter_t adapter, bool enabled)
{
OIC_LOG(DEBUG, TAG, "OCDefaultAdapterStateChangedHandler");
-
+
OC_UNUSED(adapter);
OC_UNUSED(enabled);
}
return OC_EH_OK;
}
+#ifdef WITH_PROCESS_EVENT
+void OCProcessKeepAlive(uint32_t *nextEventTime)
+#else // WITH_PROCESS_EVENT
void OCProcessKeepAlive()
+#endif // !WITH_PROCESS_EVENT
{
oc_mutex_lock(g_mutexObjectList);
KeepAliveEntry_t *entry = NULL;
KeepAliveEntry_t *tmp = NULL;
LL_FOREACH_SAFE(g_keepAliveConnectionTable, entry, tmp)
{
+#ifdef WITH_PROCESS_EVENT
+ uint64_t nextPingMicroSeconds = UINT64_MAX;
+#endif // WITH_PROCESS_EVENT
if (entry)
{
uint64_t currentTime = OICGetCurrentTime(TIME_IN_US);
// Send message to disconnect session.
OCSendDisconnectMessage(entry);
}
+#ifdef WITH_PROCESS_EVENT
+ else
+ {
+ nextPingMicroSeconds = (KEEPALIVE_RESPONSE_TIMEOUT_SEC * USECS_PER_SEC) -
+ (currentTime - entry->timeStamp);
+ }
+#endif // WITH_PROCESS_EVENT
}
}
else if (OC_SERVER == entry->mode)
OIC_LOG(DEBUG, TAG, "Server does not receive a PUT/POST request.");
OCSendDisconnectMessage(entry);
}
+#ifdef WITH_PROCESS_EVENT
+ else
+ {
+ nextPingMicroSeconds =
+ (entry->interval * KEEPALIVE_RESPONSE_TIMEOUT_SEC * USECS_PER_SEC) -
+ (currentTime - entry->timeStamp);
+ }
+#endif // WITH_PROCESS_EVENT
+ }
+
+#ifdef WITH_PROCESS_EVENT
+ if (nextEventTime && (nextPingMicroSeconds < UINT64_MAX))
+ {
+ uint32_t nextPingMiliSeconds = (uint32_t)(nextPingMicroSeconds / US_PER_MS);
+ uint32_t nextPingMicroRemain = (uint32_t)(nextPingMicroSeconds % US_PER_MS);
+ if (nextPingMicroRemain > 0)
+ {
+ nextPingMiliSeconds += 1; // To round up the remaining microsecond.
+ }
+
+ if (nextPingMiliSeconds < *nextEventTime)
+ {
+ *nextEventTime = nextPingMiliSeconds;
+ }
+ }
+#endif // WITH_PROCESS_EVENT
}
- }
}
oc_mutex_unlock(g_mutexObjectList);
}
entry->interval = interval;
LL_APPEND(g_keepAliveConnectionTable, entry);
+#ifdef WITH_PROCESS_EVENT
+ OCSendProcessEventSignal();
+#endif // WITH_PROCESS_EVENT
return entry;
}
#include <IClientWrapper.h>
#include <InitializeException.h>
#include <ResourceInitException.h>
+#ifdef WITH_PROCESS_EVENT
+#include "ocevent.h"
+#endif
namespace OC
{
private:
PlatformConfig m_cfg;
+#ifdef WITH_PROCESS_EVENT
+ oc_event m_processEvent;
+#endif
};
}
#include <mutex>
#include <IServerWrapper.h>
+#ifdef WITH_PROCESS_EVENT
+#include "ocevent.h"
+#endif
namespace OC
{
bool m_threadRun;
std::weak_ptr<std::recursive_mutex> m_csdkLock;
PlatformConfig m_cfg;
+#ifdef WITH_PROCESS_EVENT
+ oc_event m_processEvent;
+#endif
};
}
if (false == m_threadRun)
{
m_threadRun = true;
+#ifdef WITH_PROCESS_EVENT
+ m_processEvent = oc_event_new();
+ if (!m_processEvent)
+ {
+ OIC_LOG(INFO, TAG, "oc_event_new failed!");
+ return OC_STACK_ERROR;
+ }
+ OCRegisterProcessEvent(m_processEvent);
+#endif
m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
}
}
if (m_threadRun && m_listeningThread.joinable())
{
m_threadRun = false;
+#ifdef WITH_PROCESS_EVENT
+ if (m_processEvent)
+ {
+ oc_event_signal(m_processEvent);
+ }
+#endif
m_listeningThread.join();
}
+#ifdef WITH_PROCESS_EVENT
+ if (m_processEvent)
+ {
+ oc_event_free(m_processEvent);
+ m_processEvent = NULL;
+ }
+#endif
+
// only stop if we are the ones who actually called 'start'. We are counting
// on the server to do the stop.
if (m_cfg.mode == ModeType::Client)
while(m_threadRun)
{
OCStackResult result;
+#ifdef WITH_PROCESS_EVENT
+ uint32_t nextEventTime;
+#endif
auto cLock = m_csdkLock.lock();
if (cLock)
{
std::lock_guard<std::recursive_mutex> lock(*cLock);
+#ifdef WITH_PROCESS_EVENT
+ result = OCProcessEvent(&nextEventTime);
+#else
result = OCProcess();
+#endif
}
else
{
// TODO: do something with result if failed?
}
+#ifdef WITH_PROCESS_EVENT
+ oc_event_wait_for(m_processEvent, nextEventTime);
+#else
// To minimize CPU utilization we may wish to do this with sleep
std::this_thread::sleep_for(std::chrono::milliseconds(10));
+#endif
}
}
std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
: m_threadRun(false),
m_csdkLock(csdkLock),
- m_cfg { cfg }
+ m_cfg { cfg }
{
start();
}
if (false == m_threadRun)
{
m_threadRun = true;
+#ifdef WITH_PROCESS_EVENT
+ m_processEvent = oc_event_new();
+ if (!m_processEvent)
+ {
+ OIC_LOG(INFO, TAG, "oc_event_new failed!");
+ return OC_STACK_ERROR;
+ }
+ OCRegisterProcessEvent(m_processEvent);
+#endif
m_processThread = std::thread(&InProcServerWrapper::processFunc, this);
}
return OC_STACK_OK;
if(m_processThread.joinable())
{
m_threadRun = false;
+#ifdef WITH_PROCESS_EVENT
+ if (m_processEvent)
+ {
+ oc_event_signal(m_processEvent);
+ }
+#endif
m_processThread.join();
}
+#ifdef WITH_PROCESS_EVENT
+ if (m_processEvent)
+ {
+ oc_event_free(m_processEvent);
+ m_processEvent = NULL;
+ }
+#endif
+
OCStackResult res = OCStop();
if (OC_STACK_OK != res)
void InProcServerWrapper::processFunc()
{
+#ifdef WITH_PROCESS_EVENT
+ uint32_t nextEventTime;
+#endif
auto cLock = m_csdkLock.lock();
while(cLock && m_threadRun)
{
{
std::lock_guard<std::recursive_mutex> lock(*cLock);
+#ifdef WITH_PROCESS_EVENT
+ result = OCProcessEvent(&nextEventTime);
+#else
result = OCProcess();
+#endif
}
if(OC_STACK_ERROR == result)
// ...the value of variable result is simply ignored for now.
}
+ #ifdef WITH_PROCESS_EVENT
+ oc_event_wait_for(m_processEvent, nextEventTime);
+#else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
+#endif
}
}