projects
/
profile
/
ivi
/
smartdevicelink.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
b812fc5
)
APPLINK-5808
author
Vladislav Smenyuk
<VSmenyuk@luxoft.com>
Thu, 3 Apr 2014 14:14:58 +0000
(07:14 -0700)
committer
Justin Dickow
<jjdickow@gmail.com>
Tue, 8 Jul 2014 22:47:00 +0000
(18:47 -0400)
src/components/media_manager/include/media_manager/socket_streamer_adapter.h
patch
|
blob
|
history
src/components/media_manager/src/socket_streamer_adapter.cc
patch
|
blob
|
history
src/components/utils/include/utils/message_queue.h
patch
|
blob
|
history
diff --git
a/src/components/media_manager/include/media_manager/socket_streamer_adapter.h
b/src/components/media_manager/include/media_manager/socket_streamer_adapter.h
index
899e3ed
..
c900ea5
100644
(file)
--- a/
src/components/media_manager/include/media_manager/socket_streamer_adapter.h
+++ b/
src/components/media_manager/include/media_manager/socket_streamer_adapter.h
@@
-126,6
+126,7
@@
class SocketStreamerAdapter : public MediaAdapterImpl {
int32_t socket_fd_;
bool is_ready_;
threads::Thread* thread_;
int32_t socket_fd_;
bool is_ready_;
threads::Thread* thread_;
+ Streamer* streamer_;
MessageQueue<protocol_handler::RawMessagePtr> messages_;
DISALLOW_COPY_AND_ASSIGN(SocketStreamerAdapter);
MessageQueue<protocol_handler::RawMessagePtr> messages_;
DISALLOW_COPY_AND_ASSIGN(SocketStreamerAdapter);
diff --git
a/src/components/media_manager/src/socket_streamer_adapter.cc
b/src/components/media_manager/src/socket_streamer_adapter.cc
index
65e823d
..
ed4a7cf
100644
(file)
--- a/
src/components/media_manager/src/socket_streamer_adapter.cc
+++ b/
src/components/media_manager/src/socket_streamer_adapter.cc
@@
-53,11
+53,13
@@
SocketStreamerAdapter::SocketStreamerAdapter()
: socket_fd_(0),
is_ready_(false),
messages_(),
: socket_fd_(0),
is_ready_(false),
messages_(),
- thread_(NULL) {
+ thread_(NULL),
+ streamer_(NULL) {
}
SocketStreamerAdapter::~SocketStreamerAdapter() {
thread_->stop();
}
SocketStreamerAdapter::~SocketStreamerAdapter() {
thread_->stop();
+ streamer_ = NULL;
delete thread_;
if (socket_fd_ != -1) {
::close(socket_fd_);
delete thread_;
if (socket_fd_ != -1) {
::close(socket_fd_);
@@
-75,6
+77,8
@@
void SocketStreamerAdapter::StartActivity(int32_t application_key) {
is_ready_ = true;
current_application_ = application_key;
is_ready_ = true;
current_application_ = application_key;
+ messages_.Reset();
+
for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
media_listeners_.end() != it;
++it) {
for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
media_listeners_.end() != it;
++it) {
@@
-93,6
+97,11
@@
void SocketStreamerAdapter::StopActivity(int32_t application_key) {
is_ready_ = false;
current_application_ = 0;
is_ready_ = false;
current_application_ = 0;
+ if (streamer_) {
+ streamer_->stop();
+ messages_.Shutdown();
+ }
+
for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
media_listeners_.end() != it;
++it) {
for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
media_listeners_.end() != it;
++it) {
@@
-108,7
+117,8
@@
bool SocketStreamerAdapter::is_app_performing_activity(
void SocketStreamerAdapter::Init() {
if (!thread_) {
LOG4CXX_INFO(logger, "Create and start sending thread");
void SocketStreamerAdapter::Init() {
if (!thread_) {
LOG4CXX_INFO(logger, "Create and start sending thread");
- thread_ = new threads::Thread("PipeStreamerAdapter", new Streamer(this));
+ streamer_ = new Streamer(this);
+ thread_ = new threads::Thread("SocketStreamerAdapter", streamer_);
const size_t kStackSize = 16384;
thread_->startWithOptions(threads::ThreadOptions(kStackSize));
}
const size_t kStackSize = 16384;
thread_->startWithOptions(threads::ThreadOptions(kStackSize));
}
@@
-159,6
+169,7
@@
void SocketStreamerAdapter::Streamer::threadMain() {
}
is_client_connected_ = true;
}
is_client_connected_ = true;
+ is_first_loop_ = true;
while (is_client_connected_) {
while (!server_->messages_.empty()) {
protocol_handler::RawMessagePtr msg = server_->messages_.pop();
while (is_client_connected_) {
while (!server_->messages_.empty()) {
protocol_handler::RawMessagePtr msg = server_->messages_.pop();
@@
-172,20
+183,24
@@
void SocketStreamerAdapter::Streamer::threadMain() {
++messsages_for_session;
LOG4CXX_INFO(logger, "Handling map streaming message. This is "
++messsages_for_session;
LOG4CXX_INFO(logger, "Handling map streaming message. This is "
-
<< messsages_for_session << " the message for "
-
<< server_->current_application_);
- std::set<MediaListenerPtr>::iterator it =
-
server_->media_listeners_
.begin();
+ << messsages_for_session << " the message for "
+ << server_->current_application_);
+ std::set<MediaListenerPtr>::iterator it =
server_->media_listeners_
+ .begin();
for (; server_->media_listeners_.end() != it; ++it) {
(*it)->OnDataReceived(server_->current_application_,
messsages_for_session);
}
}
for (; server_->media_listeners_.end() != it; ++it) {
(*it)->OnDataReceived(server_->current_application_,
messsages_for_session);
}
}
+
+ if (!is_ready()) {
+ LOG4CXX_INFO(logger, "Client disconnected.");
+ stop();
+ break;
+ }
+
server_->messages_.wait();
server_->messages_.wait();
- is_client_connected_ = is_ready();
}
}
-
- stop();
}
}
}
}
@@
-233,7
+248,7
@@
void SocketStreamerAdapter::Streamer::start() {
}
void SocketStreamerAdapter::Streamer::stop() {
}
void SocketStreamerAdapter::Streamer::stop() {
-
is_client_connected_ = false
;
+
LOG4CXX_INFO(logger, "SocketStreamerAdapter::Streamer::stop")
;
if (!new_socket_fd_) {
return;
}
if (!new_socket_fd_) {
return;
}
@@
-249,6
+264,7
@@
void SocketStreamerAdapter::Streamer::stop() {
}
new_socket_fd_ = -1;
}
new_socket_fd_ = -1;
+ is_client_connected_ = false;
}
bool SocketStreamerAdapter::Streamer::is_ready() const {
}
bool SocketStreamerAdapter::Streamer::is_ready() const {
diff --git
a/src/components/utils/include/utils/message_queue.h
b/src/components/utils/include/utils/message_queue.h
index
e043440
..
8657957
100644
(file)
--- a/
src/components/utils/include/utils/message_queue.h
+++ b/
src/components/utils/include/utils/message_queue.h
@@
-100,6
+100,12
@@
template<typename T, class Q = std::queue<T> > class MessageQueue {
* shutting down
*/
void Shutdown();
* shutting down
*/
void Shutdown();
+
+ /**
+ * \brief Clears queue.
+ */
+ void Reset();
+
private:
/**
private:
/**
@@
-107,11
+113,12
@@
template<typename T, class Q = std::queue<T> > class MessageQueue {
*/
Queue queue_;
volatile bool shutting_down_;
*/
Queue queue_;
volatile bool shutting_down_;
+
/**
*\brief Platform specific syncronisation variable
*/
mutable sync_primitives::Lock queue_lock_;
/**
*\brief Platform specific syncronisation variable
*/
mutable sync_primitives::Lock queue_lock_;
-
mutable
sync_primitives::ConditionalVariable queue_new_items_;
+ sync_primitives::ConditionalVariable queue_new_items_;
};
template<typename T, class Q> MessageQueue<T, Q>::MessageQueue()
};
template<typename T, class Q> MessageQueue<T, Q>::MessageQueue()
@@
-128,7
+135,7
@@
template<typename T, class Q> MessageQueue<T, Q>::~MessageQueue() {
template<typename T, class Q> void MessageQueue<T, Q>::wait() {
sync_primitives::AutoLock auto_lock(queue_lock_);
template<typename T, class Q> void MessageQueue<T, Q>::wait() {
sync_primitives::AutoLock auto_lock(queue_lock_);
- while (
!shutting_down_
&& queue_.empty()) {
+ while (
(!shutting_down_)
&& queue_.empty()) {
queue_new_items_.Wait(auto_lock);
}
}
queue_new_items_.Wait(auto_lock);
}
}
@@
-178,4
+185,13
@@
template<typename T, class Q> void MessageQueue<T, Q>::Shutdown() {
queue_new_items_.Broadcast();
}
queue_new_items_.Broadcast();
}
+template<typename T, class Q> void MessageQueue<T, Q>::Reset() {
+ sync_primitives::AutoLock auto_lock(queue_lock_);
+ shutting_down_ = false;
+ if (!queue_.empty()) {
+ Queue empty_queue;
+ queue_.swap(empty_queue);
+ }
+}
+
#endif // MESSAGE_QUEUE_CLASS
#endif // MESSAGE_QUEUE_CLASS