From c69efb8b5f807c09dce572d573d6e4590decf190 Mon Sep 17 00:00:00 2001 From: Xie Ligang Date: Wed, 22 May 2019 10:35:23 +0800 Subject: [PATCH] Add previous steam related function. Change-Id: I20a725f78b2eafabd35b3b0603915eaa76562ca5 Signed-off-by: Xie Ligang --- .../wakeup-manager/inc/wakeup_audio_manager.h | 3 + .../src/wakeup_audio_manager.cpp | 64 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/plugins/wakeup-manager/inc/wakeup_audio_manager.h b/plugins/wakeup-manager/inc/wakeup_audio_manager.h index ba07975..d74f7a6 100644 --- a/plugins/wakeup-manager/inc/wakeup_audio_manager.h +++ b/plugins/wakeup-manager/inc/wakeup_audio_manager.h @@ -90,6 +90,7 @@ public: private: void recorder_thread_func(void); void streaming_speech_data_thread_func(); + void streaming_previous_speech_data_thread_func(); void streaming_background_data_thread_func(long start_time); vector mObservers; @@ -106,6 +107,8 @@ private: thread mStreamingThread; atomic_bool mStopStreamingThread{false}; + thread mStreamingPreviousThread; + atomic_bool mStopStreamingPreviousThread{false}; static constexpr long mBackgroundRecordingDurationMilliseconds = 10 * 1000; typedef struct { long time; diff --git a/plugins/wakeup-manager/src/wakeup_audio_manager.cpp b/plugins/wakeup-manager/src/wakeup_audio_manager.cpp index be49e0b..db40516 100644 --- a/plugins/wakeup-manager/src/wakeup_audio_manager.cpp +++ b/plugins/wakeup-manager/src/wakeup_audio_manager.cpp @@ -463,6 +463,59 @@ void CAudioManager::streaming_speech_data_thread_func() } } +/* Need to consider adapting conventional producer-consumer model */ +void CAudioManager::streaming_previous_speech_data_thread_func() +{ + MWR_LOGD("[ENTER]"); + + unique_lock lock(mMutex, defer_lock); + + while (!(mStopStreamingThread.load())) { + int ret = -1; + int cnt = 0; + + /* get feedback data */ + size_t speech_data_size = 0; + lock.lock(); + speech_data_size = mPreviousSpeechData.size(); + lock.unlock(); + /* waiting */ + while (0 >= speech_data_size) { + /* empty queue */ + MWR_LOGD("[DEBUG] No feedback data. Waiting mode : %d", ret); + this_thread::sleep_for(chrono::milliseconds(10)); + lock.lock(); + speech_data_size = mSpeechData.size(); + lock.unlock(); + if (0 < speech_data_size) { + MWR_LOGI("[INFO] Resume thread"); + break; + } + cnt++; + } + MWR_LOGI("[INFO] Finish to wait for new feedback data come"); + + lock.lock(); + for (int index = 0; index < speech_data_size; index++) { + wakeup_speech_data& speech_data = mPreviousSpeechData.at(index).data; + for (const auto& observer : mObservers) { + if (observer) { + if (!observer->on_streaming_audio_data( + speech_data.event, speech_data.buffer, speech_data.len)) { + LOGE("[Recorder WARNING] One of the observer returned false"); + } + } + + if (WAKEUP_SPEECH_STREAMING_EVENT_FINISH == speech_data.event) { + MWR_LOGI("[INFO] Finish to send previous speech data"); + return; + } + } + } + lock.unlock(); + } +} + void CAudioManager::streaming_background_data_thread_func(long start_time) { MWR_LOGD("[ENTER]"); @@ -660,10 +713,21 @@ void CAudioManager::stop_streaming_current_utterance_data() void CAudioManager::start_streaming_previous_utterance_data() { + if (mStreamingPreviousThread.joinable()) { + MWR_LOGE("ERROR : mStreamingPreviousThread is joinable, will not start a new thread"); + return; + } + mStreamingPreviousThread = thread(&CAudioManager::streaming_previous_speech_data_thread_func, this); } void CAudioManager::stop_streaming_previous_utterance_data() { + if (mStreamingPreviousThread.joinable()) { + MWR_LOGD("mStreamingPreviousThread is joinable, trying join()"); + mStopStreamingPreviousThread.store(true); + mStreamingPreviousThread.join(); + } + mStopStreamingThread.store(false); } void CAudioManager::start_streaming_follow_up_data() -- 2.34.1