From b024d6c4d2e9742a3bb2dbe31833dffa1cc7efe3 Mon Sep 17 00:00:00 2001 From: Ji-hoon Lee Date: Wed, 30 Nov 2022 18:05:43 +0900 Subject: [PATCH] Reduce blocking time when streaming audio data When the CPU is busy handling many tasks, there are times that the streaming thread gets blocked while calling some undelying platform API functions. To avoid this problem, applied several methods for reducing the blocking time and added log messages for displaying time information. List of methods used for reducing the blocking time: 1. Cache appid / pid information that is acquired by calling AUL API functions, since they sometimes do not return immediately and take a while to complete. 2. Instead of passing a pointer to the shared audio data into the message transmission module while holding the lock to protect the shared audio data, make a copy of audio data and release the lock immediately so that the copy of audio data can be used without the risk of race condition. This is because sometimes the message transmission also takes a bit of time to complete, and the other threads waiting for the lock cannot proceed in such cases. Change-Id: I6c6959fadddbc557dbb77849fa25ce4949155f4b --- inc/service_common.h | 3 + .../wakeup-manager/src/wakeup_audio_manager.cpp | 77 ++++++++++++++++++++-- src/application_manager_aul.cpp | 56 +++++++++++++--- src/service_ipc_dbus.cpp | 71 ++++++++++++++++++++ src/service_plugin.cpp | 45 +++++++++++++ 5 files changed, 237 insertions(+), 15 deletions(-) diff --git a/inc/service_common.h b/inc/service_common.h index 2a3ff4f..58bbf7c 100644 --- a/inc/service_common.h +++ b/inc/service_common.h @@ -6,6 +6,9 @@ #include #include +// Enable this macro for profiling with detailed timeinfo +// #define PRINT_DETAILED_TIMEINFO + #ifdef LOG_TAG #undef LOG_TAG #endif diff --git a/plugins/wakeup-manager/src/wakeup_audio_manager.cpp b/plugins/wakeup-manager/src/wakeup_audio_manager.cpp index b05fcc8..8bab1d6 100644 --- a/plugins/wakeup-manager/src/wakeup_audio_manager.cpp +++ b/plugins/wakeup-manager/src/wakeup_audio_manager.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -23,6 +24,11 @@ namespace wakeup /* Need to check whether this value needs to be configurable */ static int g_speech_pcm_wait_count = 800; +#ifdef PRINT_DETAILED_TIMEINFO +static atomic_int g_mutex_wait_index{0}; +static map g_mutex_wait_time; +#endif + static long long get_current_milliseconds_after_epoch() { auto now = chrono::steady_clock::now(); @@ -33,6 +39,17 @@ static long long get_current_milliseconds_after_epoch() return value.count(); } +#ifdef PRINT_DETAILED_TIMEINFO +static std::string get_current_milliseconds_after_epoch(chrono::time_point point) +{ + auto now_ms = chrono::time_point_cast(point); + /* number of milliseconds since the epoch of system_clock */ + auto value = now_ms.time_since_epoch(); + + return std::to_string(value.count()); +} +#endif + CAudioManager::CAudioManager() { } @@ -253,6 +270,9 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time) lock.unlock(); while (!(mStopStreamingThread.load())) { + const size_t SEND_BUFFER_SIZE = 4096; + unsigned char send_buffer[SEND_BUFFER_SIZE]; + int ret = -1; int cnt = 0; @@ -318,14 +338,37 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time) if (lead != end) { iter = lead; - mas_speech_data& speech_data = iter->data; + + mas_speech_data speech_data = iter->data; + size_t len = speech_data.len; + if (len > SEND_BUFFER_SIZE) { + LOGE("ERROR : SPEECH DATA contains data bigger than the buffer size : %d, truncating", len); + len = SEND_BUFFER_SIZE; + } + memcpy(send_buffer, speech_data.buffer, len); + speech_data.buffer = send_buffer; + speech_data.len = len; + lock.unlock(); + for (const auto& observer : observers) { if (observer) { validate_audio_data_event_field(speech_data); + +#ifdef PRINT_DETAILED_TIMEINFO + auto started = std::chrono::steady_clock::now(); +#endif if (!observer->on_streaming_audio_data( speech_data.event, speech_data.buffer, speech_data.len)) { LOGE("[Recorder WARNING] One of the observer returned false"); } +#ifdef PRINT_DETAILED_TIMEINFO + auto finished = std::chrono::steady_clock::now(); + auto interval = finished - started; + long long int count = static_cast( + chrono::duration_cast(interval).count()); + int index = g_mutex_wait_index; + g_mutex_wait_time[index] += count; +#endif } } @@ -336,8 +379,9 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time) } advance(lead, 1); + } else { + lock.unlock(); } - lock.unlock(); } if (true != finish_event_sent) { @@ -359,8 +403,29 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time) MWR_LOGE("[EXIT]"); } +#ifdef PRINT_DETAILED_TIMEINFO +static void print_duration(std::chrono::time_point started) +{ + const std::chrono::milliseconds threshold(100); + auto finished = std::chrono::steady_clock::now(); + auto interval = finished - started; + if (interval > threshold) { + long long int count = static_cast( + std::chrono::duration_cast(interval).count()); + int index = g_mutex_wait_index; + MAS_LOGE("Mutex wait time : %d %lld %lld, [%s~]", index, count, + g_mutex_wait_time[index], + get_current_milliseconds_after_epoch(started).c_str()); + } +} +#endif + void CAudioManager::add_audio_data(mas_speech_data& data, long long time) { +#ifdef PRINT_DETAILED_TIMEINFO + ++g_mutex_wait_index; +#endif + long long delta = mAudioRecordingDurationMilliseconds; bool print_log = false; @@ -416,6 +481,10 @@ void CAudioManager::add_audio_data(mas_speech_data& data, long long time) lock_guard lock(mMutex); +#ifdef PRINT_DETAILED_TIMEINFO + print_duration(now); +#endif + /* Pop items only when the streaming is not activated */ if (!mStreamingThreadActive.load()) { while(false == mAudioData.empty() && mAudioData.front().time < time - delta) { @@ -483,9 +552,7 @@ void CAudioManager::notify_audio_data_recording(long time, void* data, int len) lock.unlock(); for (const auto& observer : observers) { if (observer) { - if (!observer->on_recording_audio_data(time, data, len)) { - LOGE("[Recorder WARNING] One of the observer returned false"); - } + observer->on_recording_audio_data(time, data, len); } } } diff --git a/src/application_manager_aul.cpp b/src/application_manager_aul.cpp index e2e293e..ea0d759 100644 --- a/src/application_manager_aul.cpp +++ b/src/application_manager_aul.cpp @@ -18,6 +18,7 @@ #include "service_common.h" #include +#include #include #include @@ -128,14 +129,32 @@ boost::optional CApplicationManagerAul::get_appid_by_pid(pid_t pid) int retry_num = 0; char appid[MAX_APPID_LEN] = {'\0', }; - do { - if (AUL_R_OK == aul_app_get_appid_bypid(pid, appid, sizeof(appid))) { - appid[MAX_APPID_LEN - 1] = '\0'; - ret = std::string{appid}; - succeeded = true; + typedef struct { + std::string appid; + std::chrono::time_point updated; + } AppInfo; + + static std::map appids; + if (appids.find(pid) != appids.end()) { + auto info = appids[pid]; + auto now = std::chrono::steady_clock::now(); + if (now - info.updated < std::chrono::seconds(60)) { + ret = info.appid; } - retry_num++; - } while (!succeeded && retry_num < max_retry_num); + } + + if (!ret) { + do { + if (AUL_R_OK == aul_app_get_appid_bypid(pid, appid, sizeof(appid))) { + appid[MAX_APPID_LEN - 1] = '\0'; + ret = std::string{appid}; + succeeded = true; + + appids[pid] = AppInfo{*ret, std::chrono::steady_clock::now()}; + } + retry_num++; + } while (!succeeded && retry_num < max_retry_num); + } return ret; } @@ -144,9 +163,26 @@ boost::optional CApplicationManagerAul::get_pid_by_appid(const std::strin { boost::optional ret; - pid_t pid = aul_app_get_pid(appid.c_str()); - if (pid >= 0) { - ret = pid; + typedef struct { + pid_t pid; + std::chrono::time_point updated; + } AppInfo; + + static std::map pids; + if (pids.find(appid) != pids.end()) { + auto info = pids[appid]; + auto now = std::chrono::steady_clock::now(); + if (now - info.updated < std::chrono::seconds(10)) { + ret = info.pid; + } + } + + if (!ret) { + pid_t pid = aul_app_get_pid(appid.c_str()); + if (pid >= 0) { + ret = pid; + pids[appid] = AppInfo{pid, std::chrono::steady_clock::now()}; + } } return ret; diff --git a/src/service_ipc_dbus.cpp b/src/service_ipc_dbus.cpp index 915b1ed..d8101ad 100644 --- a/src/service_ipc_dbus.cpp +++ b/src/service_ipc_dbus.cpp @@ -26,6 +26,16 @@ #include "service_main.h" #include "service_ipc_dbus.h" +#ifdef PRINT_DETAILED_TIMEINFO +static long long int get_time_interval_count( + std::chrono::time_point time_point_1, + std::chrono::time_point time_point_2) { + auto interval = time_point_2 - time_point_1; + return static_cast( + std::chrono::duration_cast(interval).count()); +} +#endif + std::atomic_size_t gAudioDataMileage{0}; int CServiceIpcDbus::reconnect() @@ -228,6 +238,11 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data, return -1; } +#ifdef PRINT_DETAILED_TIMEINFO + bool flushed = false; + auto started = std::chrono::steady_clock::now(); +#endif + static unsigned char pending_buffer[STREAMING_BUFFER_SIZE]; static size_t pending_buffer_size = 0; @@ -244,6 +259,10 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data, size_t total_size = 0; size_t new_size = 0; +#ifdef PRINT_DETAILED_TIMEINFO + auto checkpoint_1 = std::chrono::steady_clock::now(); +#endif + new_size = sizeof(header); if (new_size + total_size <= STREAMING_BUFFER_SIZE) { memcpy(buffer, &header, new_size); @@ -282,20 +301,47 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data, MAS_LOGE("queueing streaming data, serial : %d %d %zu", last_serial_waiting_for_flush, event, gAudioDataMileage.load()); } + +#ifdef PRINT_DETAILED_TIMEINFO + auto checkpoint_2 = std::chrono::steady_clock::now(); + auto checkpoint_2_1 = checkpoint_2; + auto checkpoint_2_2 = checkpoint_2; + auto checkpoint_2_3 = checkpoint_2; + auto checkpoint_2_4 = checkpoint_2; +#endif + if (pending_buffer_size + total_size > STREAMING_BUFFER_SIZE || MAS_SPEECH_STREAMING_EVENT_FINISH == event || current_time - last_flush_time > minimum_flush_interval) { + +#ifdef PRINT_DETAILED_TIMEINFO + flushed = true; +#endif + last_flush_time = current_time; bundle *b = bundle_create(); if (b) { bundle_add_byte(b, "content", pending_buffer, pending_buffer_size); +#ifdef PRINT_DETAILED_TIMEINFO + checkpoint_2_1 = std::chrono::steady_clock::now(); +#endif boost::optional appid = mApplicationManager->get_appid_by_pid(pid); +#ifdef PRINT_DETAILED_TIMEINFO + checkpoint_2_2 = std::chrono::steady_clock::now(); +#endif + if (appid) { +#ifdef PRINT_DETAILED_TIMEINFO + checkpoint_2_3 = std::chrono::steady_clock::now(); +#endif #if USE_TRUSTED_MESSAGE_PORT int ret = message_port_send_trusted_message((*appid).c_str(), message_port, b); #else int ret = message_port_send_message((*appid).c_str(), message_port, b); #endif +#ifdef PRINT_DETAILED_TIMEINFO + checkpoint_2_4 = std::chrono::steady_clock::now(); +#endif if (MESSAGE_PORT_ERROR_NONE != ret) masc_message_port_error(ret); } else { @@ -315,6 +361,9 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data, } } +#ifdef PRINT_DETAILED_TIMEINFO + auto checkpoint_3 = std::chrono::steady_clock::now(); +#endif if (MAS_SPEECH_STREAMING_EVENT_FINISH == event) { MAS_LOGE("Sending FINISH event : %zu", gAudioDataMileage.load()); bundle *b = bundle_create(); @@ -345,6 +394,28 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data, MAS_LOGE("Buffer overflow : %zu %zu", pending_buffer_size, total_size); } } + +#ifdef PRINT_DETAILED_TIMEINFO + auto finished = std::chrono::steady_clock::now(); + long long int count = get_time_interval_count(started, finished); + + if (count > 30) { + long long int count1 = get_time_interval_count(started, checkpoint_1); + long long int count2 = get_time_interval_count(checkpoint_1, checkpoint_2); + long long int count3 = get_time_interval_count(checkpoint_2, checkpoint_3); + long long int count4 = get_time_interval_count(checkpoint_3, finished); + + long long int count2_1 = get_time_interval_count(checkpoint_2, checkpoint_2_1); + long long int count2_2 = get_time_interval_count(checkpoint_2_1, checkpoint_2_2); + long long int count2_3 = get_time_interval_count(checkpoint_2_2, checkpoint_2_3); + long long int count2_4 = get_time_interval_count(checkpoint_2_3, checkpoint_2_4); + long long int count2_5 = get_time_interval_count(checkpoint_2_4, checkpoint_3); + + MAS_LOGE("Spent %lld for sending a single message %d [%lld %lld %lld %lld] (%lld %lld %lld %lld %lld)", + count, flushed, count1, count2, count3, count4, count2_1, count2_2, count2_3, count2_4, count2_5); + } +#endif + return 0; } diff --git a/src/service_plugin.cpp b/src/service_plugin.cpp index 0df29ed..0c99bd0 100644 --- a/src/service_plugin.cpp +++ b/src/service_plugin.cpp @@ -26,6 +26,8 @@ #include #include +#include + #include "service_main.h" #include "service_plugin.h" #include "service_ipc_dbus.h" @@ -35,6 +37,16 @@ #define BUF_SAVE_MODE #endif +#ifdef PRINT_DETAILED_TIMEINFO +static long long int get_time_interval_count( + std::chrono::time_point time_point_1, + std::chrono::time_point time_point_2) { + auto interval = time_point_2 - time_point_1; + return static_cast( + std::chrono::duration_cast(interval).count()); +} +#endif + static int g_last_wakeup_event_id = 0; typedef struct { @@ -420,6 +432,13 @@ void handle_speech_streaming_event_failure(void* data) static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffer, int len, void *user_data) { +#ifdef PRINT_DETAILED_TIMEINFO + auto started = std::chrono::steady_clock::now(); + auto checkpoint_1 = started; + auto checkpoint_2 = started; + auto checkpoint_3 = started; +#endif + CServicePlugin* plugin = static_cast(user_data); if (event == MAS_SPEECH_STREAMING_EVENT_FAIL) { @@ -450,6 +469,10 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe service_main = plugin->get_service_main(); } +#ifdef PRINT_DETAILED_TIMEINFO + checkpoint_1 = std::chrono::steady_clock::now(); +#endif + if (service_ipc && service_main) { /* First check if we have dedicated audio processing app for current client */ pid_t pid = service_main->get_current_audio_processing_pid(); @@ -461,6 +484,10 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe MAS_LOGE("[ERROR] Fail to retrieve pid of current MA client"); } else { if (__validate_streaming_event_order(pid, &event)) { +#ifdef PRINT_DETAILED_TIMEINFO + checkpoint_2 = std::chrono::steady_clock::now(); +#endif + int ret = service_ipc->send_streaming_audio_data(pid, event, buffer, len); if (0 != ret) { @@ -473,6 +500,9 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe MAS_LOGE("[ERROR] Fail to send speech data to preprocessing client, ret(%d)", ret); } } +#ifdef PRINT_DETAILED_TIMEINFO + checkpoint_3 = std::chrono::steady_clock::now(); +#endif } } } @@ -490,6 +520,21 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe } } #endif + +#ifdef PRINT_DETAILED_TIMEINFO + auto finished = std::chrono::steady_clock::now(); + long long int total_count = get_time_interval_count(started, finished); + + if(total_count > 30) { + long long int count1 = get_time_interval_count(started, checkpoint_1); + long long int count2 = get_time_interval_count(checkpoint_1, checkpoint_2); + long long int count3 = get_time_interval_count(checkpoint_2, checkpoint_3); + long long int count4 = get_time_interval_count(checkpoint_3, finished); + + MAS_LOGE("Spent audio streaming callback time : %lld ms [%lld %lld %lld %lld]", + total_count, count1, count2, count3, count4); + } +#endif } static void __speech_status_cb(mas_speech_status_e status, void *user_data) -- 2.7.4