APPLINK-5808
authorVladislav Smenyuk <VSmenyuk@luxoft.com>
Thu, 3 Apr 2014 14:14:58 +0000 (07:14 -0700)
committerJustin Dickow <jjdickow@gmail.com>
Tue, 8 Jul 2014 22:47:00 +0000 (18:47 -0400)
src/components/media_manager/include/media_manager/socket_streamer_adapter.h
src/components/media_manager/src/socket_streamer_adapter.cc
src/components/utils/include/utils/message_queue.h

index 899e3ed..c900ea5 100644 (file)
@@ -126,6 +126,7 @@ class SocketStreamerAdapter : public MediaAdapterImpl {
     int32_t                                       socket_fd_;
     bool                                          is_ready_;
     threads::Thread*                              thread_;
+    Streamer*                                     streamer_;
     MessageQueue<protocol_handler::RawMessagePtr> messages_;
 
     DISALLOW_COPY_AND_ASSIGN(SocketStreamerAdapter);
index 65e823d..ed4a7cf 100644 (file)
@@ -53,11 +53,13 @@ SocketStreamerAdapter::SocketStreamerAdapter()
   : socket_fd_(0),
     is_ready_(false),
     messages_(),
-    thread_(NULL) {
+    thread_(NULL),
+    streamer_(NULL) {
 }
 
 SocketStreamerAdapter::~SocketStreamerAdapter() {
   thread_->stop();
+  streamer_ = NULL;
   delete thread_;
   if (socket_fd_ != -1) {
     ::close(socket_fd_);
@@ -75,6 +77,8 @@ void SocketStreamerAdapter::StartActivity(int32_t application_key) {
   is_ready_ = true;
   current_application_ = application_key;
 
+  messages_.Reset();
+
   for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
        media_listeners_.end() != it;
        ++it) {
@@ -93,6 +97,11 @@ void SocketStreamerAdapter::StopActivity(int32_t application_key) {
   is_ready_ = false;
   current_application_ = 0;
 
+  if (streamer_) {
+    streamer_->stop();
+    messages_.Shutdown();
+  }
+
   for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
        media_listeners_.end() != it;
        ++it) {
@@ -108,7 +117,8 @@ bool SocketStreamerAdapter::is_app_performing_activity(
 void SocketStreamerAdapter::Init() {
   if (!thread_) {
     LOG4CXX_INFO(logger, "Create and start sending thread");
-    thread_ = new threads::Thread("PipeStreamerAdapter", new Streamer(this));
+    streamer_ = new Streamer(this);
+    thread_ = new threads::Thread("SocketStreamerAdapter", streamer_);
     const size_t kStackSize = 16384;
     thread_->startWithOptions(threads::ThreadOptions(kStackSize));
   }
@@ -159,6 +169,7 @@ void SocketStreamerAdapter::Streamer::threadMain() {
     }
 
     is_client_connected_ = true;
+    is_first_loop_ = true;
     while (is_client_connected_) {
       while (!server_->messages_.empty()) {
         protocol_handler::RawMessagePtr msg = server_->messages_.pop();
@@ -172,20 +183,24 @@ void SocketStreamerAdapter::Streamer::threadMain() {
         ++messsages_for_session;
 
         LOG4CXX_INFO(logger, "Handling map streaming message. This is "
-                     << messsages_for_session << " the message for "
-                     << server_->current_application_);
-        std::set<MediaListenerPtr>::iterator it =
-            server_->media_listeners_.begin();
+            << messsages_for_session << " the message for "
+            << server_->current_application_);
+        std::set<MediaListenerPtr>::iterator it = server_->media_listeners_
+            .begin();
         for (; server_->media_listeners_.end() != it; ++it) {
           (*it)->OnDataReceived(server_->current_application_,
                                 messsages_for_session);
         }
       }
+
+      if (!is_ready()) {
+        LOG4CXX_INFO(logger, "Client disconnected.");
+        stop();
+        break;
+      }
+
       server_->messages_.wait();
-      is_client_connected_ = is_ready();
     }
-
-    stop();
   }
 }
 
@@ -233,7 +248,7 @@ void SocketStreamerAdapter::Streamer::start() {
 }
 
 void SocketStreamerAdapter::Streamer::stop() {
-  is_client_connected_ = false;
+  LOG4CXX_INFO(logger, "SocketStreamerAdapter::Streamer::stop");
   if (!new_socket_fd_) {
     return;
   }
@@ -249,6 +264,7 @@ void SocketStreamerAdapter::Streamer::stop() {
   }
 
   new_socket_fd_ = -1;
+  is_client_connected_ = false;
 }
 
 bool SocketStreamerAdapter::Streamer::is_ready() const {
index e043440..8657957 100644 (file)
@@ -100,6 +100,12 @@ template<typename T, class Q = std::queue<T> > class MessageQueue {
      * shutting down
      */
     void Shutdown();
+
+    /**
+      * \brief Clears queue.
+      */
+    void Reset();
+
   private:
 
     /**
@@ -107,11 +113,12 @@ template<typename T, class Q = std::queue<T> > class MessageQueue {
      */
     Queue queue_;
     volatile bool shutting_down_;
+
     /**
      *\brief Platform specific syncronisation variable
      */
     mutable sync_primitives::Lock queue_lock_;
-    mutable sync_primitives::ConditionalVariable queue_new_items_;
+    sync_primitives::ConditionalVariable queue_new_items_;
 };
 
 template<typename T, class Q> MessageQueue<T, Q>::MessageQueue()
@@ -128,7 +135,7 @@ template<typename T, class Q> MessageQueue<T, Q>::~MessageQueue() {
 
 template<typename T, class Q> void MessageQueue<T, Q>::wait() {
   sync_primitives::AutoLock auto_lock(queue_lock_);
-  while (!shutting_down_ && queue_.empty()) {
+  while ((!shutting_down_) && queue_.empty()) {
     queue_new_items_.Wait(auto_lock);
   }
 }
@@ -178,4 +185,13 @@ template<typename T, class Q> void MessageQueue<T, Q>::Shutdown() {
   queue_new_items_.Broadcast();
 }
 
+template<typename T, class Q> void MessageQueue<T, Q>::Reset() {
+  sync_primitives::AutoLock auto_lock(queue_lock_);
+  shutting_down_ = false;
+  if (!queue_.empty()) {
+    Queue empty_queue;
+    queue_.swap(empty_queue);
+  }
+}
+
 #endif  //  MESSAGE_QUEUE_CLASS