Reduce blocking time when streaming audio data 50/284950/3
authorJi-hoon Lee <dalton.lee@samsung.com>
Wed, 30 Nov 2022 09:05:43 +0000 (18:05 +0900)
committerJi-hoon Lee <dalton.lee@samsung.com>
Thu, 1 Dec 2022 11:53:41 +0000 (20:53 +0900)
When the CPU is busy handling many tasks, there are times
that the streaming thread gets blocked while calling some
undelying platform API functions. To avoid this problem,
applied several methods for reducing the blocking time
and added log messages for displaying time information.

List of methods used for reducing the blocking time:
1. Cache appid / pid information that is acquired by
   calling AUL API functions, since they sometimes
   do not return immediately and take a while to complete.
2. Instead of passing a pointer to the shared audio data
   into the message transmission module while holding the
   lock to protect the shared audio data, make a copy of
   audio data and release the lock immediately so that
   the copy of audio data can be used without the risk of
   race condition. This is because sometimes the message
   transmission also takes a bit of time to complete, and
   the other threads waiting for the lock cannot proceed
   in such cases.

Change-Id: I6c6959fadddbc557dbb77849fa25ce4949155f4b

inc/service_common.h
plugins/wakeup-manager/src/wakeup_audio_manager.cpp
src/application_manager_aul.cpp
src/service_ipc_dbus.cpp
src/service_plugin.cpp

index 2a3ff4f0e190fe6bd5dbd75a56a85e2f1b87cdf7..58bbf7cf5d1b846da5764268ecb8eddbb5bf5496 100644 (file)
@@ -6,6 +6,9 @@
 #include <multi_assistant_common.h>
 #include <multi_assistant_service.h>
 
+// Enable this macro for profiling with detailed timeinfo
+// #define PRINT_DETAILED_TIMEINFO
+
 #ifdef  LOG_TAG
 #undef  LOG_TAG
 #endif
index b05fcc8c6f6582de95a77ed12ed282930be7e02b..8bab1d697fe42c8dfdf31f7774c76cbf5552753b 100644 (file)
@@ -9,6 +9,7 @@
 #include <sys/resource.h>
 
 #include <algorithm>
+#include <map>
 
 #include <Ecore.h>
 #include <audio_io.h>
@@ -23,6 +24,11 @@ namespace wakeup
 /* Need to check whether this value needs to be configurable */
 static int g_speech_pcm_wait_count = 800;
 
+#ifdef PRINT_DETAILED_TIMEINFO
+static atomic_int g_mutex_wait_index{0};
+static map<int, long long int> g_mutex_wait_time;
+#endif
+
 static long long get_current_milliseconds_after_epoch()
 {
        auto now = chrono::steady_clock::now();
@@ -33,6 +39,17 @@ static long long get_current_milliseconds_after_epoch()
        return value.count();
 }
 
+#ifdef PRINT_DETAILED_TIMEINFO
+static std::string get_current_milliseconds_after_epoch(chrono::time_point<chrono::steady_clock> point)
+{
+       auto now_ms = chrono::time_point_cast<chrono::milliseconds>(point);
+       /* number of milliseconds since the epoch of system_clock */
+       auto value = now_ms.time_since_epoch();
+
+       return std::to_string(value.count());
+}
+#endif
+
 CAudioManager::CAudioManager()
 {
 }
@@ -253,6 +270,9 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time)
        lock.unlock();
 
        while (!(mStopStreamingThread.load())) {
+               const size_t SEND_BUFFER_SIZE = 4096;
+               unsigned char send_buffer[SEND_BUFFER_SIZE];
+
                int ret = -1;
                int cnt = 0;
 
@@ -318,14 +338,37 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time)
 
                if (lead != end) {
                        iter = lead;
-                       mas_speech_data& speech_data = iter->data;
+
+                       mas_speech_data speech_data = iter->data;
+                       size_t len = speech_data.len;
+                       if (len > SEND_BUFFER_SIZE) {
+                               LOGE("ERROR : SPEECH DATA contains data bigger than the buffer size : %d, truncating", len);
+                               len = SEND_BUFFER_SIZE;
+                       }
+                       memcpy(send_buffer, speech_data.buffer, len);
+                       speech_data.buffer = send_buffer;
+                       speech_data.len = len;
+                       lock.unlock();
+
                        for (const auto& observer : observers) {
                                if (observer) {
                                        validate_audio_data_event_field(speech_data);
+
+#ifdef PRINT_DETAILED_TIMEINFO
+                                       auto started = std::chrono::steady_clock::now();
+#endif
                                        if (!observer->on_streaming_audio_data(
                                                speech_data.event, speech_data.buffer, speech_data.len)) {
                                                LOGE("[Recorder WARNING] One of the observer returned false");
                                        }
+#ifdef PRINT_DETAILED_TIMEINFO
+                                       auto finished = std::chrono::steady_clock::now();
+                                       auto interval = finished - started;
+                                       long long int count = static_cast<long long int>(
+                                               chrono::duration_cast<chrono::milliseconds>(interval).count());
+                                       int index = g_mutex_wait_index;
+                                       g_mutex_wait_time[index] += count;
+#endif
                                }
                        }
 
@@ -336,8 +379,9 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time)
                        }
 
                        advance(lead, 1);
+               } else {
+                       lock.unlock();
                }
-               lock.unlock();
        }
 
        if (true != finish_event_sent) {
@@ -359,8 +403,29 @@ void CAudioManager::streaming_audio_data_thread_func(long long start_time)
        MWR_LOGE("[EXIT]");
 }
 
+#ifdef PRINT_DETAILED_TIMEINFO
+static void print_duration(std::chrono::time_point<std::chrono::steady_clock> started)
+{
+       const std::chrono::milliseconds threshold(100);
+       auto finished = std::chrono::steady_clock::now();
+       auto interval = finished - started;
+       if (interval > threshold) {
+               long long int count = static_cast<long long int>(
+                       std::chrono::duration_cast<std::chrono::milliseconds>(interval).count());
+               int index = g_mutex_wait_index;
+               MAS_LOGE("Mutex wait time : %d %lld %lld, [%s~]", index, count,
+                       g_mutex_wait_time[index],
+                       get_current_milliseconds_after_epoch(started).c_str());
+       }
+}
+#endif
+
 void CAudioManager::add_audio_data(mas_speech_data& data, long long time)
 {
+#ifdef PRINT_DETAILED_TIMEINFO
+       ++g_mutex_wait_index;
+#endif
+
        long long delta = mAudioRecordingDurationMilliseconds;
 
        bool print_log = false;
@@ -416,6 +481,10 @@ void CAudioManager::add_audio_data(mas_speech_data& data, long long time)
 
        lock_guard<mutex> lock(mMutex);
 
+#ifdef PRINT_DETAILED_TIMEINFO
+       print_duration(now);
+#endif
+
        /* Pop items only when the streaming is not activated */
        if (!mStreamingThreadActive.load()) {
                while(false == mAudioData.empty() && mAudioData.front().time < time - delta) {
@@ -483,9 +552,7 @@ void CAudioManager::notify_audio_data_recording(long time, void* data, int len)
        lock.unlock();
        for (const auto& observer : observers) {
                if (observer) {
-                       if (!observer->on_recording_audio_data(time, data, len)) {
-                               LOGE("[Recorder WARNING] One of the observer returned false");
-                       }
+                       observer->on_recording_audio_data(time, data, len);
                }
        }
 }
index e2e293e60c4fade6f4b975fa90b80db25dc3357a..ea0d7590c7b16852a9837889528c5eabd7f1c9a7 100644 (file)
@@ -18,6 +18,7 @@
 #include "service_common.h"
 
 #include <chrono>
+#include <map>
 
 #include <aul.h>
 #include <aul_svc.h>
@@ -128,14 +129,32 @@ boost::optional<std::string> CApplicationManagerAul::get_appid_by_pid(pid_t pid)
        int retry_num = 0;
        char appid[MAX_APPID_LEN] = {'\0', };
 
-       do {
-               if (AUL_R_OK == aul_app_get_appid_bypid(pid, appid, sizeof(appid))) {
-                       appid[MAX_APPID_LEN - 1] = '\0';
-                       ret = std::string{appid};
-                       succeeded = true;
+       typedef struct {
+               std::string appid;
+               std::chrono::time_point<std::chrono::steady_clock> updated;
+       } AppInfo;
+
+       static std::map<pid_t, AppInfo> appids;
+       if (appids.find(pid) != appids.end()) {
+               auto info = appids[pid];
+               auto now = std::chrono::steady_clock::now();
+               if (now - info.updated < std::chrono::seconds(60)) {
+                       ret = info.appid;
                }
-               retry_num++;
-       } while (!succeeded && retry_num < max_retry_num);
+       }
+
+       if (!ret) {
+               do {
+                       if (AUL_R_OK == aul_app_get_appid_bypid(pid, appid, sizeof(appid))) {
+                               appid[MAX_APPID_LEN - 1] = '\0';
+                               ret = std::string{appid};
+                               succeeded = true;
+
+                               appids[pid] = AppInfo{*ret, std::chrono::steady_clock::now()};
+                       }
+                       retry_num++;
+               } while (!succeeded && retry_num < max_retry_num);
+       }
 
        return ret;
 }
@@ -144,9 +163,26 @@ boost::optional<pid_t> CApplicationManagerAul::get_pid_by_appid(const std::strin
 {
        boost::optional<pid_t> ret;
 
-       pid_t pid = aul_app_get_pid(appid.c_str());
-       if (pid >= 0) {
-               ret = pid;
+       typedef struct {
+               pid_t pid;
+               std::chrono::time_point<std::chrono::steady_clock> updated;
+       } AppInfo;
+
+       static std::map<std::string, AppInfo> pids;
+       if (pids.find(appid) != pids.end()) {
+               auto info = pids[appid];
+               auto now = std::chrono::steady_clock::now();
+               if (now - info.updated < std::chrono::seconds(10)) {
+                       ret = info.pid;
+               }
+       }
+
+       if (!ret) {
+               pid_t pid = aul_app_get_pid(appid.c_str());
+               if (pid >= 0) {
+                       ret = pid;
+                       pids[appid] = AppInfo{pid, std::chrono::steady_clock::now()};
+               }
        }
 
        return ret;
index 915b1edef6c75ab19c7e5842aac79b5b7b55215e..d8101adba6c86af49a8f20a6dcc62d82dcc708d3 100644 (file)
 #include "service_main.h"
 #include "service_ipc_dbus.h"
 
+#ifdef PRINT_DETAILED_TIMEINFO
+static long long int get_time_interval_count(
+       std::chrono::time_point<std::chrono::steady_clock> time_point_1,
+       std::chrono::time_point<std::chrono::steady_clock> time_point_2) {
+       auto interval = time_point_2 - time_point_1;
+       return static_cast<long long int>(
+               std::chrono::duration_cast<std::chrono::milliseconds>(interval).count());
+}
+#endif
+
 std::atomic_size_t gAudioDataMileage{0};
 
 int CServiceIpcDbus::reconnect()
@@ -228,6 +238,11 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data,
                return -1;
        }
 
+#ifdef PRINT_DETAILED_TIMEINFO
+       bool flushed = false;
+       auto started = std::chrono::steady_clock::now();
+#endif
+
        static unsigned char pending_buffer[STREAMING_BUFFER_SIZE];
        static size_t pending_buffer_size = 0;
 
@@ -244,6 +259,10 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data,
        size_t total_size = 0;
        size_t new_size = 0;
 
+#ifdef PRINT_DETAILED_TIMEINFO
+       auto checkpoint_1 = std::chrono::steady_clock::now();
+#endif
+
        new_size = sizeof(header);
        if (new_size + total_size <= STREAMING_BUFFER_SIZE) {
                memcpy(buffer, &header, new_size);
@@ -282,19 +301,46 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data,
                MAS_LOGE("queueing streaming data, serial : %d %d %zu",
                        last_serial_waiting_for_flush, event, gAudioDataMileage.load());
        }
+
+#ifdef PRINT_DETAILED_TIMEINFO
+       auto checkpoint_2 = std::chrono::steady_clock::now();
+       auto checkpoint_2_1 = checkpoint_2;
+       auto checkpoint_2_2 = checkpoint_2;
+       auto checkpoint_2_3 = checkpoint_2;
+       auto checkpoint_2_4 = checkpoint_2;
+#endif
+
        if (pending_buffer_size + total_size > STREAMING_BUFFER_SIZE ||
                MAS_SPEECH_STREAMING_EVENT_FINISH == event ||
                current_time - last_flush_time > minimum_flush_interval) {
+
+#ifdef PRINT_DETAILED_TIMEINFO
+               flushed = true;
+#endif
+
                last_flush_time = current_time;
                bundle *b = bundle_create();
                if (b) {
                        bundle_add_byte(b, "content", pending_buffer, pending_buffer_size);
+#ifdef PRINT_DETAILED_TIMEINFO
+                       checkpoint_2_1 = std::chrono::steady_clock::now();
+#endif
                        boost::optional<std::string> appid = mApplicationManager->get_appid_by_pid(pid);
+#ifdef PRINT_DETAILED_TIMEINFO
+                       checkpoint_2_2 = std::chrono::steady_clock::now();
+#endif
+
                        if (appid) {
+#ifdef PRINT_DETAILED_TIMEINFO
+                               checkpoint_2_3 = std::chrono::steady_clock::now();
+#endif
 #if USE_TRUSTED_MESSAGE_PORT
                                int ret = message_port_send_trusted_message((*appid).c_str(), message_port, b);
 #else
                                int ret = message_port_send_message((*appid).c_str(), message_port, b);
+#endif
+#ifdef PRINT_DETAILED_TIMEINFO
+                               checkpoint_2_4 = std::chrono::steady_clock::now();
 #endif
                                if (MESSAGE_PORT_ERROR_NONE != ret)
                                        masc_message_port_error(ret);
@@ -315,6 +361,9 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data,
                }
        }
 
+#ifdef PRINT_DETAILED_TIMEINFO
+       auto checkpoint_3 = std::chrono::steady_clock::now();
+#endif
        if (MAS_SPEECH_STREAMING_EVENT_FINISH == event) {
                MAS_LOGE("Sending FINISH event : %zu", gAudioDataMileage.load());
                bundle *b = bundle_create();
@@ -345,6 +394,28 @@ int CServiceIpcDbus::send_streaming_audio_data(pid_t pid, int event, void* data,
                        MAS_LOGE("Buffer overflow : %zu %zu", pending_buffer_size, total_size);
                }
        }
+
+#ifdef PRINT_DETAILED_TIMEINFO
+       auto finished = std::chrono::steady_clock::now();
+       long long int count = get_time_interval_count(started, finished);
+
+       if (count > 30) {
+               long long int count1 = get_time_interval_count(started, checkpoint_1);
+               long long int count2 = get_time_interval_count(checkpoint_1, checkpoint_2);
+               long long int count3 = get_time_interval_count(checkpoint_2, checkpoint_3);
+               long long int count4 = get_time_interval_count(checkpoint_3, finished);
+
+               long long int count2_1 = get_time_interval_count(checkpoint_2, checkpoint_2_1);
+               long long int count2_2 = get_time_interval_count(checkpoint_2_1, checkpoint_2_2);
+               long long int count2_3 = get_time_interval_count(checkpoint_2_2, checkpoint_2_3);
+               long long int count2_4 = get_time_interval_count(checkpoint_2_3, checkpoint_2_4);
+               long long int count2_5 = get_time_interval_count(checkpoint_2_4, checkpoint_3);
+
+               MAS_LOGE("Spent %lld for sending a single message %d [%lld %lld %lld %lld] (%lld %lld %lld %lld %lld)",
+                               count, flushed, count1, count2, count3, count4, count2_1, count2_2, count2_3, count2_4, count2_5);
+       }
+#endif
+
        return 0;
 }
 
index 0df29ed3cc78e91eabbe98170a1d236e8bf29f36..0c99bd061a55135c964302a138b7fe02c7fac36c 100644 (file)
@@ -26,6 +26,8 @@
 #include <dlfcn.h>
 #include <new>
 
+#include <chrono>
+
 #include "service_main.h"
 #include "service_plugin.h"
 #include "service_ipc_dbus.h"
 #define BUF_SAVE_MODE
 #endif
 
+#ifdef PRINT_DETAILED_TIMEINFO
+static long long int get_time_interval_count(
+       std::chrono::time_point<std::chrono::steady_clock> time_point_1,
+       std::chrono::time_point<std::chrono::steady_clock> time_point_2) {
+       auto interval = time_point_2 - time_point_1;
+       return static_cast<long long int>(
+               std::chrono::duration_cast<std::chrono::milliseconds>(interval).count());
+}
+#endif
+
 static int g_last_wakeup_event_id = 0;
 
 typedef struct {
@@ -420,6 +432,13 @@ void handle_speech_streaming_event_failure(void* data)
 
 static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffer, int len, void *user_data)
 {
+#ifdef PRINT_DETAILED_TIMEINFO
+       auto started = std::chrono::steady_clock::now();
+       auto checkpoint_1 = started;
+       auto checkpoint_2 = started;
+       auto checkpoint_3 = started;
+#endif
+
        CServicePlugin* plugin = static_cast<CServicePlugin*>(user_data);
 
        if (event == MAS_SPEECH_STREAMING_EVENT_FAIL) {
@@ -450,6 +469,10 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe
                service_main = plugin->get_service_main();
        }
 
+#ifdef PRINT_DETAILED_TIMEINFO
+       checkpoint_1 = std::chrono::steady_clock::now();
+#endif
+
        if (service_ipc && service_main) {
                /* First check if we have dedicated audio processing app for current client */
                pid_t pid = service_main->get_current_audio_processing_pid();
@@ -461,6 +484,10 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe
                        MAS_LOGE("[ERROR] Fail to retrieve pid of current MA client");
                } else {
                        if (__validate_streaming_event_order(pid, &event)) {
+#ifdef PRINT_DETAILED_TIMEINFO
+                               checkpoint_2 = std::chrono::steady_clock::now();
+#endif
+
                                int ret = service_ipc->send_streaming_audio_data(pid,
                                        event, buffer, len);
                                if (0 != ret) {
@@ -473,6 +500,9 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe
                                                MAS_LOGE("[ERROR] Fail to send speech data to preprocessing client, ret(%d)", ret);
                                        }
                                }
+#ifdef PRINT_DETAILED_TIMEINFO
+                               checkpoint_3 = std::chrono::steady_clock::now();
+#endif
                        }
                }
        }
@@ -490,6 +520,21 @@ static void __audio_streaming_cb(mas_speech_streaming_event_e event, void* buffe
                }
        }
 #endif
+
+#ifdef PRINT_DETAILED_TIMEINFO
+       auto finished = std::chrono::steady_clock::now();
+       long long int total_count = get_time_interval_count(started, finished);
+
+       if(total_count > 30) {
+               long long int count1 = get_time_interval_count(started, checkpoint_1);
+               long long int count2 = get_time_interval_count(checkpoint_1, checkpoint_2);
+               long long int count3 = get_time_interval_count(checkpoint_2, checkpoint_3);
+               long long int count4 = get_time_interval_count(checkpoint_3, finished);
+
+               MAS_LOGE("Spent audio streaming callback time : %lld ms [%lld %lld %lld %lld]",
+                               total_count, count1, count2, count3, count4);
+       }
+#endif
 }
 
 static void __speech_status_cb(mas_speech_status_e status, void *user_data)