From f3290962e19192074fb45061c313b8520e8f1151 Mon Sep 17 00:00:00 2001 From: Vladislav Smenyuk Date: Thu, 3 Apr 2014 07:14:58 -0700 Subject: [PATCH] APPLINK-5808 --- .../media_manager/socket_streamer_adapter.h | 1 + .../media_manager/src/socket_streamer_adapter.cc | 36 ++++++++++++++++------ src/components/utils/include/utils/message_queue.h | 20 ++++++++++-- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/src/components/media_manager/include/media_manager/socket_streamer_adapter.h b/src/components/media_manager/include/media_manager/socket_streamer_adapter.h index 899e3ed..c900ea5 100644 --- a/src/components/media_manager/include/media_manager/socket_streamer_adapter.h +++ b/src/components/media_manager/include/media_manager/socket_streamer_adapter.h @@ -126,6 +126,7 @@ class SocketStreamerAdapter : public MediaAdapterImpl { int32_t socket_fd_; bool is_ready_; threads::Thread* thread_; + Streamer* streamer_; MessageQueue messages_; DISALLOW_COPY_AND_ASSIGN(SocketStreamerAdapter); diff --git a/src/components/media_manager/src/socket_streamer_adapter.cc b/src/components/media_manager/src/socket_streamer_adapter.cc index 65e823d..ed4a7cf 100644 --- a/src/components/media_manager/src/socket_streamer_adapter.cc +++ b/src/components/media_manager/src/socket_streamer_adapter.cc @@ -53,11 +53,13 @@ SocketStreamerAdapter::SocketStreamerAdapter() : socket_fd_(0), is_ready_(false), messages_(), - thread_(NULL) { + thread_(NULL), + streamer_(NULL) { } SocketStreamerAdapter::~SocketStreamerAdapter() { thread_->stop(); + streamer_ = NULL; delete thread_; if (socket_fd_ != -1) { ::close(socket_fd_); @@ -75,6 +77,8 @@ void SocketStreamerAdapter::StartActivity(int32_t application_key) { is_ready_ = true; current_application_ = application_key; + messages_.Reset(); + for (std::set::iterator it = media_listeners_.begin(); media_listeners_.end() != it; ++it) { @@ -93,6 +97,11 @@ void SocketStreamerAdapter::StopActivity(int32_t application_key) { is_ready_ = false; current_application_ = 0; + if (streamer_) { + streamer_->stop(); + messages_.Shutdown(); + } + for (std::set::iterator it = media_listeners_.begin(); media_listeners_.end() != it; ++it) { @@ -108,7 +117,8 @@ bool SocketStreamerAdapter::is_app_performing_activity( void SocketStreamerAdapter::Init() { if (!thread_) { LOG4CXX_INFO(logger, "Create and start sending thread"); - thread_ = new threads::Thread("PipeStreamerAdapter", new Streamer(this)); + streamer_ = new Streamer(this); + thread_ = new threads::Thread("SocketStreamerAdapter", streamer_); const size_t kStackSize = 16384; thread_->startWithOptions(threads::ThreadOptions(kStackSize)); } @@ -159,6 +169,7 @@ void SocketStreamerAdapter::Streamer::threadMain() { } is_client_connected_ = true; + is_first_loop_ = true; while (is_client_connected_) { while (!server_->messages_.empty()) { protocol_handler::RawMessagePtr msg = server_->messages_.pop(); @@ -172,20 +183,24 @@ void SocketStreamerAdapter::Streamer::threadMain() { ++messsages_for_session; LOG4CXX_INFO(logger, "Handling map streaming message. This is " - << messsages_for_session << " the message for " - << server_->current_application_); - std::set::iterator it = - server_->media_listeners_.begin(); + << messsages_for_session << " the message for " + << server_->current_application_); + std::set::iterator it = server_->media_listeners_ + .begin(); for (; server_->media_listeners_.end() != it; ++it) { (*it)->OnDataReceived(server_->current_application_, messsages_for_session); } } + + if (!is_ready()) { + LOG4CXX_INFO(logger, "Client disconnected."); + stop(); + break; + } + server_->messages_.wait(); - is_client_connected_ = is_ready(); } - - stop(); } } @@ -233,7 +248,7 @@ void SocketStreamerAdapter::Streamer::start() { } void SocketStreamerAdapter::Streamer::stop() { - is_client_connected_ = false; + LOG4CXX_INFO(logger, "SocketStreamerAdapter::Streamer::stop"); if (!new_socket_fd_) { return; } @@ -249,6 +264,7 @@ void SocketStreamerAdapter::Streamer::stop() { } new_socket_fd_ = -1; + is_client_connected_ = false; } bool SocketStreamerAdapter::Streamer::is_ready() const { diff --git a/src/components/utils/include/utils/message_queue.h b/src/components/utils/include/utils/message_queue.h index e043440..8657957 100644 --- a/src/components/utils/include/utils/message_queue.h +++ b/src/components/utils/include/utils/message_queue.h @@ -100,6 +100,12 @@ template > class MessageQueue { * shutting down */ void Shutdown(); + + /** + * \brief Clears queue. + */ + void Reset(); + private: /** @@ -107,11 +113,12 @@ template > class MessageQueue { */ Queue queue_; volatile bool shutting_down_; + /** *\brief Platform specific syncronisation variable */ mutable sync_primitives::Lock queue_lock_; - mutable sync_primitives::ConditionalVariable queue_new_items_; + sync_primitives::ConditionalVariable queue_new_items_; }; template MessageQueue::MessageQueue() @@ -128,7 +135,7 @@ template MessageQueue::~MessageQueue() { template void MessageQueue::wait() { sync_primitives::AutoLock auto_lock(queue_lock_); - while (!shutting_down_ && queue_.empty()) { + while ((!shutting_down_) && queue_.empty()) { queue_new_items_.Wait(auto_lock); } } @@ -178,4 +185,13 @@ template void MessageQueue::Shutdown() { queue_new_items_.Broadcast(); } +template void MessageQueue::Reset() { + sync_primitives::AutoLock auto_lock(queue_lock_); + shutting_down_ = false; + if (!queue_.empty()) { + Queue empty_queue; + queue_.swap(empty_queue); + } +} + #endif // MESSAGE_QUEUE_CLASS -- 2.7.4