: socket_fd_(0),
is_ready_(false),
messages_(),
- thread_(NULL) {
+ thread_(NULL),
+ streamer_(NULL) {
}
SocketStreamerAdapter::~SocketStreamerAdapter() {
thread_->stop();
+ streamer_ = NULL;
delete thread_;
if (socket_fd_ != -1) {
::close(socket_fd_);
is_ready_ = true;
current_application_ = application_key;
+ messages_.Reset();
+
for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
media_listeners_.end() != it;
++it) {
is_ready_ = false;
current_application_ = 0;
+ if (streamer_) {
+ streamer_->stop();
+ messages_.Shutdown();
+ }
+
for (std::set<MediaListenerPtr>::iterator it = media_listeners_.begin();
media_listeners_.end() != it;
++it) {
void SocketStreamerAdapter::Init() {
if (!thread_) {
LOG4CXX_INFO(logger, "Create and start sending thread");
- thread_ = new threads::Thread("PipeStreamerAdapter", new Streamer(this));
+ streamer_ = new Streamer(this);
+ thread_ = new threads::Thread("SocketStreamerAdapter", streamer_);
const size_t kStackSize = 16384;
thread_->startWithOptions(threads::ThreadOptions(kStackSize));
}
}
is_client_connected_ = true;
+ is_first_loop_ = true;
while (is_client_connected_) {
while (!server_->messages_.empty()) {
protocol_handler::RawMessagePtr msg = server_->messages_.pop();
++messsages_for_session;
LOG4CXX_INFO(logger, "Handling map streaming message. This is "
- << messsages_for_session << " the message for "
- << server_->current_application_);
- std::set<MediaListenerPtr>::iterator it =
- server_->media_listeners_.begin();
+ << messsages_for_session << " the message for "
+ << server_->current_application_);
+ std::set<MediaListenerPtr>::iterator it = server_->media_listeners_
+ .begin();
for (; server_->media_listeners_.end() != it; ++it) {
(*it)->OnDataReceived(server_->current_application_,
messsages_for_session);
}
}
+
+ if (!is_ready()) {
+ LOG4CXX_INFO(logger, "Client disconnected.");
+ stop();
+ break;
+ }
+
server_->messages_.wait();
- is_client_connected_ = is_ready();
}
-
- stop();
}
}
}
void SocketStreamerAdapter::Streamer::stop() {
- is_client_connected_ = false;
+ LOG4CXX_INFO(logger, "SocketStreamerAdapter::Streamer::stop");
if (!new_socket_fd_) {
return;
}
}
new_socket_fd_ = -1;
+ is_client_connected_ = false;
}
bool SocketStreamerAdapter::Streamer::is_ready() const {
* shutting down
*/
void Shutdown();
+
+ /**
+ * \brief Clears queue.
+ */
+ void Reset();
+
private:
/**
*/
Queue queue_;
volatile bool shutting_down_;
+
/**
*\brief Platform specific syncronisation variable
*/
mutable sync_primitives::Lock queue_lock_;
- mutable sync_primitives::ConditionalVariable queue_new_items_;
+ sync_primitives::ConditionalVariable queue_new_items_;
};
template<typename T, class Q> MessageQueue<T, Q>::MessageQueue()
template<typename T, class Q> void MessageQueue<T, Q>::wait() {
sync_primitives::AutoLock auto_lock(queue_lock_);
- while (!shutting_down_ && queue_.empty()) {
+ while ((!shutting_down_) && queue_.empty()) {
queue_new_items_.Wait(auto_lock);
}
}
queue_new_items_.Broadcast();
}
+template<typename T, class Q> void MessageQueue<T, Q>::Reset() {
+ sync_primitives::AutoLock auto_lock(queue_lock_);
+ shutting_down_ = false;
+ if (!queue_.empty()) {
+ Queue empty_queue;
+ queue_.swap(empty_queue);
+ }
+}
+
#endif // MESSAGE_QUEUE_CLASS