Protect member variables shared across multiple threads
authorJi-hoon Lee <dalton.lee@samsung.com>
Mon, 8 Apr 2019 01:57:18 +0000 (10:57 +0900)
committerJi-hoon Lee <dalton.lee@samsung.com>
Fri, 12 Apr 2019 13:20:53 +0000 (13:20 +0000)
Change-Id: I1cbefaa78ab53a537851dbdd532fbf9646e64cd1

plugins/wakeup-manager/inc/wakeup_audio_manager.h
plugins/wakeup-manager/src/wakeup_audio_manager.cpp

index 1ba4b77..6de354f 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <atomic>
 #include <list>
+#include <mutex>
 #include <thread>
 #include <vector>
 
@@ -102,6 +103,7 @@ private:
        vector<wakeup_speech_data_with_time> mSpeechData;
        vector<wakeup_speech_data_with_time> mPreviousSpeechData;
        list<wakeup_speech_data_with_time> mBackgroundData;
+       mutex mMutex;
        bool mVoiceKeyPressed{false};
 };
 
index 9c8a228..238028c 100644 (file)
@@ -160,10 +160,7 @@ int CAudioManager::deinitialize(void)
 {
        MWR_LOGD("[ENTER]");
 
-       for (const auto &data : mSpeechData) {
-               free(data.data.buffer);
-       }
-       mSpeechData.clear();
+       clear_speech_data();
 
 #ifdef TV_PRODUCT
        bt_hid_unset_audio_data_receive_cb();
@@ -219,7 +216,7 @@ static long get_current_milliseconds_after_epoch()
 void CAudioManager::recorder_thread_func()
 {
 #ifndef TV_PRODUCT
-
+       unique_lock<mutex> lock(mMutex, defer_lock);
        static int buffer_count = 0;
 
        while (!(mStopRecorderThread.load())) {
@@ -233,7 +230,7 @@ void CAudioManager::recorder_thread_func()
                        LOGE("[Recorder WARNING] Fail to read audio : %d", read_bytes);
                        break;
                }
-               // LOCK REQUIRED
+
                for (const auto& observer : mObservers) {
                        if (observer) {
                                if (!observer->on_recording_audio_data(time, buffer, read_bytes)) {
@@ -243,6 +240,7 @@ void CAudioManager::recorder_thread_func()
                        }
                }
 
+               lock.lock();
                long delta = mBackgroundRecordingDurationMilliseconds;
                if (mBackgroundData.size() > 0) {
                        while(mBackgroundData.size() > 0 && mBackgroundData.front().time < time - delta) {
@@ -253,6 +251,7 @@ void CAudioManager::recorder_thread_func()
                                mBackgroundData.pop_front();
                        }
                }
+               lock.unlock();
 
                wakeup_speech_data_with_time data;
                data.data.buffer = malloc(read_bytes);
@@ -261,10 +260,11 @@ void CAudioManager::recorder_thread_func()
                        data.data.event = WAKEUP_SPEECH_STREAMING_EVENT_CONTINUE;
                        data.data.len = read_bytes;
                        memcpy(data.data.buffer, buffer, read_bytes);
+                       lock.lock();
                        mBackgroundData.push_back(data);
+                       lock.unlock();
                }
 
-               // UNLOCK REQUIRED
                /* Audio read log */
                if (0 == buffer_count % 300) {
                        LOGD("[Recorder][%d] Recording... : read_size(%d)", buffer_count, read_bytes);
@@ -331,27 +331,34 @@ void CAudioManager::start_recording()
        }
 }
 
-
+/* Need to consider adapting conventional producer-consumer model */
 void CAudioManager::streaming_speech_data_thread_func()
 {
        MWR_LOGD("[ENTER]");
 
+       unique_lock<mutex> lock(mMutex, defer_lock);
        int index = 0;
-       MWR_LOGD("data_count : %zu", mSpeechData.size());
 
        while (!(mStopStreamingThread.load())) {
                int ret = -1;
                int cnt = 0;
 
                /* get feedback data */
-               if (index >= mSpeechData.size()) {
+               size_t speech_data_size = 0;
+               lock.lock();
+               speech_data_size = mSpeechData.size();
+               lock.unlock();
+               if (index >= speech_data_size) {
                        /* empty queue */
                        MWR_LOGD("[DEBUG] No feedback data. Waiting mode : %d", ret);
 
                        /* waiting */
                        while (1) {
                                this_thread::sleep_for(chrono::milliseconds(10));
-                               if (index < mSpeechData.size()) {
+                               lock.lock();
+                               speech_data_size = mSpeechData.size();
+                               lock.unlock();
+                               if (index < speech_data_size) {
                                        MWR_LOGI("[INFO] Resume thread");
                                        break;
                                }
@@ -375,6 +382,7 @@ void CAudioManager::streaming_speech_data_thread_func()
                        continue;
                }
 
+               lock.lock();
                wakeup_speech_data& speech_data = mSpeechData.at(index).data;
                for (const auto& observer : mObservers) {
                        if (observer) {
@@ -384,11 +392,14 @@ void CAudioManager::streaming_speech_data_thread_func()
                                }
                        }
                }
+               lock.unlock();
 
                if (WAKEUP_SPEECH_STREAMING_EVENT_FINISH == speech_data.event) {
                        MWR_LOGI("[INFO] Finish to get and send speech data");
                        /* Now move all the speech data to previous speech data for later use */
+                       lock.lock();
                        mPreviousSpeechData = move(mSpeechData);
+                       lock.unlock();
                        break;
                }
 
@@ -400,35 +411,45 @@ void CAudioManager::streaming_background_data_thread_func(long start_time)
 {
        MWR_LOGD("[ENTER]");
 
+       unique_lock<mutex> lock(mMutex, defer_lock);
+
+       lock.lock();
        auto lead = mBackgroundData.begin();
        auto iter = lead;
        while (lead != mBackgroundData.end() && lead->time < start_time) {
                iter = lead;
                advance(lead, 1);
        }
-
        MWR_LOGD("data_count : %zu", mBackgroundData.size());
+       lock.unlock();
 
        while (!(mStopStreamingThread.load())) {
                int ret = -1;
                int cnt = 0;
 
                /* get feedback data */
-               if (lead == mBackgroundData.end()) {
+               lock.lock();
+               auto end = mBackgroundData.end();
+               lock.unlock();
+               if (lead == end) {
                        /* empty queue */
                        MWR_LOGD("[DEBUG] No feedback data. Waiting mode : %d", ret);
 
                        /* waiting */
                        while (1) {
                                this_thread::sleep_for(chrono::milliseconds(10));
-                               if (iter == mBackgroundData.end()) {
-                                       iter = mBackgroundData.begin();
+                               lock.lock();
+                               end = mBackgroundData.end();
+                               auto begin = mBackgroundData.begin();
+                               lock.unlock();
+                               if (iter == end) {
+                                       iter = begin;
                                }
                                lead = iter;
-                               if (lead != mBackgroundData.end()) {
+                               if (lead != end) {
                                        advance(lead, 1);
                                }
-                               if (lead != mBackgroundData.end()) {
+                               if (lead != end) {
                                        MWR_LOGI("[INFO] Resume thread");
                                        break;
                                }
@@ -454,7 +475,9 @@ void CAudioManager::streaming_background_data_thread_func(long start_time)
 
                iter = lead;
                /* Extracted background data will be used as previous utterance data*/
+               lock.lock();
                mSpeechData.push_back(*iter);
+               lock.unlock();
 
                wakeup_speech_data& speech_data = iter->data;
                for (const auto& observer : mObservers) {
@@ -469,7 +492,9 @@ void CAudioManager::streaming_background_data_thread_func(long start_time)
                if (WAKEUP_SPEECH_STREAMING_EVENT_FINISH == speech_data.event) {
                        MWR_LOGI("[INFO] Finish to get and send speech data");
                        /* Now move all the speech data to previous speech data for later use */
+                       lock.lock();
                        mPreviousSpeechData = move(mSpeechData);
+                       lock.unlock();
                        break;
                }
 
@@ -482,11 +507,15 @@ void CAudioManager::add_speech_data(wakeup_speech_data& speech_data)
        wakeup_speech_data_with_time data;
        data.data = speech_data;
        data.time = get_current_milliseconds_after_epoch();
+
+       lock_guard<mutex> lock(mMutex);
        mSpeechData.push_back(data);
 }
 
 void CAudioManager::clear_speech_data()
 {
+       lock_guard<mutex> lock(mMutex);
+
        for (const auto &data : mSpeechData) {
                if (data.data.buffer) free(data.data.buffer);
        }
@@ -505,6 +534,8 @@ void CAudioManager::finalize_speech_data()
                wakeup_speech_data_with_time data;
                data.data = speech_data;
                data.time = get_current_milliseconds_after_epoch();
+
+               lock_guard<mutex> lock(mMutex);
                mSpeechData.push_back(data);
        }
 }
@@ -516,7 +547,7 @@ void CAudioManager::start_streaming_current_utterance_data(bool from_start_time,
                return;
        }
        if (from_start_time) {
-               mSpeechData.clear();
+               clear_speech_data();
                mStreamingThread = thread(&CAudioManager::streaming_background_data_thread_func, this, start_time);
        } else {
                mStreamingThread = thread(&CAudioManager::streaming_speech_data_thread_func, this);
@@ -533,6 +564,7 @@ void CAudioManager::stop_streaming_current_utterance_data()
        mStopStreamingThread.store(false);
 
        /* Now move all the speech data to previous speech data for later use */
+       lock_guard<mutex> lock(mMutex);
        mPreviousSpeechData = move(mSpeechData);
 }