From fb6fb40273cd2a445072f32b3b1e3e5c47d8a8fd Mon Sep 17 00:00:00 2001 From: Justin Dickow Date: Thu, 17 Jul 2014 13:59:46 -0400 Subject: [PATCH] connection_handler update Authors Alex Kutsan Dmitry Chmerev Newton Kim Signed-off-by: Justin Dickow --- .../include/connection_handler/connection.h | 7 +- .../connection_handler/connection_handler.h | 21 ++++- .../connection_handler/connection_handler_impl.h | 36 +++++++-- .../connection_handler_observer.h | 9 +++ .../include/connection_handler/heartbeat_monitor.h | 7 +- .../connection_handler/src/connection.cc | 44 +++++----- .../src/connection_handler_impl.cc | 93 +++++++++++++++++----- .../connection_handler/src/heartbeat_monitor.cc | 16 +++- 8 files changed, 179 insertions(+), 54 deletions(-) diff --git a/src/components/connection_handler/include/connection_handler/connection.h b/src/components/connection_handler/include/connection_handler/connection.h index a3d28e5..913a63f 100644 --- a/src/components/connection_handler/include/connection_handler/connection.h +++ b/src/components/connection_handler/include/connection_handler/connection.h @@ -133,7 +133,7 @@ class Connection { * \brief Adds session to connection * \return sessionID or -1 in case of issues */ - int32_t AddNewSession(const uint8_t& protocol_version); + int32_t AddNewSession(); /** * \brief Removes session from connection @@ -172,6 +172,11 @@ class Connection { void KeepAlive(uint8_t session_id); /* + * \brief Start heartbeat for specified session + */ + void StartHeartBeat(uint8_t session_id); + + /* * \brief Send heartbeat to mobile app */ void SendHeartBeat(uint8_t session_id); diff --git a/src/components/connection_handler/include/connection_handler/connection_handler.h b/src/components/connection_handler/include/connection_handler/connection_handler.h index cf3b757..e480103 100644 --- a/src/components/connection_handler/include/connection_handler/connection_handler.h +++ b/src/components/connection_handler/include/connection_handler/connection_handler.h @@ -79,11 +79,28 @@ class ConnectionHandler { virtual void CloseConnection(ConnectionHandle connection_handle) = 0; /* + * \brief Return count of session for specified connection + * \param connection_key pair of connection handle and session id + */ + virtual uint32_t GetConnectionSessionsCount(uint32_t connection_key) = 0; + + /* + * Close session associated with the key + */ + virtual void CloseSession(uint32_t key) = 0; + + /* * Close session */ virtual void CloseSession(ConnectionHandle connection_handle, - uint8_t session_id, - const ServiceList& service_list) = 0; + uint8_t session_id) = 0; + + /* + * \brief Start heartbeat for specified session + * + * \param connection_key pair of connection and session id + */ + virtual void StartSessionHeartBeat(uint32_t connection_key) = 0; /* * Send heartbeat to mobile app diff --git a/src/components/connection_handler/include/connection_handler/connection_handler_impl.h b/src/components/connection_handler/include/connection_handler/connection_handler_impl.h index 1cc5b39..72e4908 100644 --- a/src/components/connection_handler/include/connection_handler/connection_handler_impl.h +++ b/src/components/connection_handler/include/connection_handler/connection_handler_impl.h @@ -102,6 +102,14 @@ class ConnectionHandlerImpl : public ConnectionHandler, const std::vector&); /** + * @brief Reaction on event, when new applications are started on device + * and SDL found this application + * + * @param device_handle Unique ID of device with new application list + */ + virtual void OnApplicationListUpdated(DeviceHandle device_handle); + + /** * \brief Available devices list updated. * * Called when device scanning initiated with scanForNewDevices @@ -157,8 +165,7 @@ class ConnectionHandlerImpl : public ConnectionHandler, */ virtual int32_t OnSessionStartedCallback( const transport_manager::ConnectionUID& connection_handle, - const uint8_t& session_id, - const uint8_t& protocol_version, + const uint8_t session_id, const protocol_handler::ServiceType& service_type); /** @@ -172,7 +179,7 @@ class ConnectionHandlerImpl : public ConnectionHandler, */ virtual uint32_t OnSessionEndedCallback( const transport_manager::ConnectionUID& connection_handle, - const uint8_t& session_id, const uint32_t& hashCode, + const uint8_t session_id, const uint32_t& hashCode, const protocol_handler::ServiceType& service_type); /** @@ -239,17 +246,27 @@ class ConnectionHandlerImpl : public ConnectionHandler, virtual void CloseConnection(ConnectionHandle connection_handle) OVERRIDE; /* + * Close session associated with the key + */ + virtual void CloseSession(uint32_t key); + + /* * Function used by HearbeatMonitior to close session on HB timeout * \param connection_handle Connection handler within which session exists * \param session_id Identifier of the session to be ended - * \param ServiceList list of services which associated with session */ virtual void CloseSession(ConnectionHandle connection_handle, - uint8_t session_id, - const ServiceList& service_list); + uint8_t session_id); void SetProtocolHandler(protocol_handler::ProtocolHandler* handler); + + /* + * \brief Return count of session for specified connection + * \param connection_key pair of connection handle and session id + */ + virtual uint32_t GetConnectionSessionsCount(uint32_t connection_key); + /* * Send heartbeat message to mobile app */ @@ -257,6 +274,13 @@ class ConnectionHandlerImpl : public ConnectionHandler, uint8_t session_id); /* + * \brief Start heartbeat for specified session + * + * \param connection_key pair of connection and session id + */ + virtual void StartSessionHeartBeat(uint32_t connection_key); + + /* * Keep connection associated with the key from being closed by heartbeat monitor */ void KeepConnectionAlive(uint32_t connection_key, uint8_t session_id); diff --git a/src/components/connection_handler/include/connection_handler/connection_handler_observer.h b/src/components/connection_handler/include/connection_handler/connection_handler_observer.h index 6143113..b3d6b91 100644 --- a/src/components/connection_handler/include/connection_handler/connection_handler_observer.h +++ b/src/components/connection_handler/include/connection_handler/connection_handler_observer.h @@ -64,6 +64,15 @@ class ConnectionHandlerObserver { const connection_handler::DeviceList& device_list) = 0; /** + * @brief Reaction on event, when new applications are started on device + * and SDL found this application + * + * @param device_handle Unique ID of device with new application list + */ + virtual void OnApplicationListUpdated( + const connection_handler::DeviceHandle& device_handle) = 0; + + /** * \brief Removes device. * * Called when device has been removed from a list. diff --git a/src/components/connection_handler/include/connection_handler/heartbeat_monitor.h b/src/components/connection_handler/include/connection_handler/heartbeat_monitor.h index a7380b0..550f208 100644 --- a/src/components/connection_handler/include/connection_handler/heartbeat_monitor.h +++ b/src/components/connection_handler/include/connection_handler/heartbeat_monitor.h @@ -58,7 +58,12 @@ class HeartBeatMonitor: public threads::ThreadDelegate { */ virtual void threadMain(); - void AddSession(uint8_t session_id); + /** + * \brief add new session + * + * \return true if first session with heartbeat added + */ + bool AddSession(uint8_t session_id); void RemoveSession(uint8_t session_id); /* diff --git a/src/components/connection_handler/src/connection.cc b/src/components/connection_handler/src/connection.cc index 6bc3236..1cf0971 100644 --- a/src/components/connection_handler/src/connection.cc +++ b/src/components/connection_handler/src/connection.cc @@ -66,7 +66,7 @@ Connection::~Connection() { delete heart_beat_monitor_thread_; } -int32_t Connection::AddNewSession(const uint8_t& protocol_version) { +int32_t Connection::AddNewSession() { sync_primitives::AutoLock lock(session_map_lock_); int32_t result = -1; @@ -75,24 +75,16 @@ int32_t Connection::AddNewSession(const uint8_t& protocol_version) { int32_t size = session_map_.size(); if (max_connections > size) { - ++size; - /* whenever new session created RPC and Bulk services are - established automatically */ - session_map_[size].push_back( - static_cast(protocol_handler::kRpc)); - session_map_[size].push_back( - static_cast(protocol_handler::kBulk)); - - if (protocol_handler::PROTOCOL_VERSION_3 == protocol_version) { - heartbeat_monitor_->AddSession(size); - - // start monitoring thread when first session with heartbeat added - if (1 == size) { - heart_beat_monitor_thread_->start(); - } - } - - result = size; + for (uint8_t session_id = 1; session_id <= 255; ++session_id) { + if (session_map_.end() == session_map_.find(session_id)) { + /* whenever new session created RPC and Bulk services are + established automatically */ + session_map_[session_id].push_back(protocol_handler::kRpc); + session_map_[session_id].push_back(protocol_handler::kBulk); + result = session_id; + break; + } + } } return result; @@ -176,7 +168,6 @@ const SessionMap Connection::session_map() const { void Connection::CloseSession(uint8_t session_id) { size_t size; - ServiceList service_list; { sync_primitives::AutoLock lock(session_map_lock_); @@ -187,18 +178,25 @@ void Connection::CloseSession(uint8_t session_id) { } size = session_map_.size(); - service_list = session_map_[session_id]; } //Close connection if it is last session if (1 == size) { connection_handler_->CloseConnection(connection_handle_); } else { - connection_handler_->CloseSession(connection_handle_, session_id, - service_list); + connection_handler_->CloseSession(connection_handle_, session_id); } } +void Connection::StartHeartBeat(uint8_t session_id) { + bool is_first_session = heartbeat_monitor_->AddSession(session_id); + + // start monitoring thread when first session with heartbeat added + if (is_first_session) { + heart_beat_monitor_thread_->start(); + } +} + void Connection::SendHeartBeat(uint8_t session_id) { connection_handler_->SendHeartBeat(connection_handle_, session_id); } diff --git a/src/components/connection_handler/src/connection_handler_impl.cc b/src/components/connection_handler/src/connection_handler_impl.cc index f8800c7..1d095c0 100644 --- a/src/components/connection_handler/src/connection_handler_impl.cc +++ b/src/components/connection_handler/src/connection_handler_impl.cc @@ -78,16 +78,23 @@ ConnectionHandlerImpl::~ConnectionHandlerImpl() { void ConnectionHandlerImpl::set_connection_handler_observer( ConnectionHandlerObserver* observer) { LOG4CXX_INFO(logger_, "ConnectionHandlerImpl::set_connection_handler_observer()"); - if (!observer) { - LOG4CXX_ERROR(logger_, "Null pointer to observer."); - return; - } connection_handler_observer_ = observer; } void ConnectionHandlerImpl::OnDeviceListUpdated( const std::vector& device_info_list) { LOG4CXX_INFO(logger_, "ConnectionHandlerImpl::OnDeviceListUpdated()"); + if (connection_handler_observer_) { + connection_handler_observer_->OnDeviceListUpdated(device_list_); + } +} + +void ConnectionHandlerImpl::OnApplicationListUpdated(DeviceHandle device_handle) { + LOG4CXX_DEBUG(logger_,"ConnectionHandlerImpl::OnApplicationListUpdated() device_handle " + << device_handle); + if (connection_handler_observer_) { + connection_handler_observer_->OnApplicationListUpdated(device_handle); + } } void ConnectionHandlerImpl::OnDeviceFound( @@ -206,8 +213,7 @@ void ConnectionHandlerImpl::RemoveConnection( int32_t ConnectionHandlerImpl::OnSessionStartedCallback( const transport_manager::ConnectionUID& connection_handle, - const uint8_t& sessionId, - const uint8_t& protocol_version, + const uint8_t sessionId, const protocol_handler::ServiceType& service_type) { LOG4CXX_INFO(logger_, "ConnectionHandlerImpl::OnSessionStartedCallback()"); @@ -221,7 +227,7 @@ void ConnectionHandlerImpl::RemoveConnection( } if ((0 == sessionId) && (protocol_handler::kRpc == service_type)) { - new_session_id = (it->second)->AddNewSession(protocol_version); + new_session_id = (it->second)->AddNewSession(); if (0 > new_session_id) { LOG4CXX_ERROR(logger_, "Not possible to start session!"); return -1; @@ -256,7 +262,7 @@ void ConnectionHandlerImpl::RemoveConnection( } uint32_t ConnectionHandlerImpl::OnSessionEndedCallback( - const uint32_t& connection_handle, const uint8_t& sessionId, + const uint32_t& connection_handle, const uint8_t sessionId, const uint32_t& hashCode, const protocol_handler::ServiceType& service_type) { LOG4CXX_INFO(logger_, "ConnectionHandlerImpl::OnSessionEndedCallback()"); @@ -400,7 +406,10 @@ int32_t ConnectionHandlerImpl::GetDataOnDeviceID( const SessionMap& session_map = (itr->second)->session_map(); for (SessionMapConstIterator session_it = session_map.begin(), end = session_map.end(); session_it != end; ++session_it) { - applications_list->push_back(it->first); + const transport_manager::ConnectionUID& connection_handle = itr->first; + const uint32_t session_id = session_it->first; + uint32_t session_key = KeyFromPair(connection_handle, session_id); //application_id + applications_list->push_back(session_key); } } } @@ -492,24 +501,56 @@ void ConnectionHandlerImpl::CloseConnection(ConnectionHandle connection_handle) transport_manager_->DisconnectForce(connection_uid); } -void ConnectionHandlerImpl::CloseSession(ConnectionHandle connection_handle, - uint8_t session_id, - const ServiceList& service_list) { - if (0 != connection_handler_observer_) { - ServiceListConstIterator it = service_list.begin(); - for (;it != service_list.end(); ++it) { - connection_handler_observer_->OnServiceEndedCallback( - session_id, static_cast(*it)); - } +uint32_t ConnectionHandlerImpl::GetConnectionSessionsCount( + uint32_t connection_key) { + uint32_t connection_handle = 0; + uint8_t session_id = 0; + PairFromKey(connection_key, &connection_handle, &session_id); + + sync_primitives::AutoLock lock(connection_list_lock_); + ConnectionListIterator itr = connection_list_.find(connection_handle); + + if (connection_list_.end() != itr) { + return itr->second->session_map().size(); } + return 0; +} + +void ConnectionHandlerImpl::CloseSession(uint32_t key) { + LOG4CXX_INFO(logger_, "ConnectionHandlerImpl::CloseSession"); + + uint32_t connection_handle = 0; + uint8_t session_id = 0; + PairFromKey(key, &connection_handle, &session_id); + + CloseSession(connection_handle, session_id); +} + +void ConnectionHandlerImpl::CloseSession(ConnectionHandle connection_handle, + uint8_t session_id) { + protocol_handler_->SendEndSession(connection_handle, session_id); + transport_manager::ConnectionUID connection_id = ConnectionUIDFromHandle(connection_handle); - sync_primitives::AutoLock lock(connection_list_lock_); - ConnectionListIterator itr = connection_list_.find(connection_id); + sync_primitives::AutoLock connection_list_lock(connection_list_lock_); + ConnectionListIterator itr = connection_list_.find(connection_id); if (connection_list_.end() != itr) { + if (0 != connection_handler_observer_) { + SessionMap session_map = itr->second->session_map(); + SessionMapIterator session_it = session_map.find(session_id); + if (session_it == session_map.end()) { + ServiceList service_list = session_it->second; + ServiceListConstIterator it = service_list.begin(); + for (;it != service_list.end(); ++it) { + connection_handler_observer_->OnServiceEndedCallback( + session_id, static_cast(*it)); + } + } + } + itr->second->RemoveSession(session_id); } } @@ -519,6 +560,18 @@ void ConnectionHandlerImpl::SetProtocolHandler( protocol_handler_ = handler; } +void ConnectionHandlerImpl::StartSessionHeartBeat(uint32_t connection_key) { + uint32_t connection_handle = 0; + uint8_t session_id = 0; + PairFromKey(connection_key, &connection_handle, &session_id); + + sync_primitives::AutoLock lock(connection_list_lock_); + ConnectionListIterator it = connection_list_.find(connection_handle); + if (connection_list_.end() != it) { + it->second->StartHeartBeat(session_id); + } +} + void ConnectionHandlerImpl::SendHeartBeat(ConnectionHandle connection_handle, uint8_t session_id) { diff --git a/src/components/connection_handler/src/heartbeat_monitor.cc b/src/components/connection_handler/src/heartbeat_monitor.cc index b0338d4..7a91f1e 100644 --- a/src/components/connection_handler/src/heartbeat_monitor.cc +++ b/src/components/connection_handler/src/heartbeat_monitor.cc @@ -90,18 +90,32 @@ void HeartBeatMonitor::threadMain() { } } -void HeartBeatMonitor::AddSession(uint8_t session_id) { +bool HeartBeatMonitor::AddSession(uint8_t session_id) { LOG4CXX_INFO(logger_, "Add session with id" << static_cast(session_id)); AutoLock auto_lock(sessions_list_lock_); + if (sessions_.end() != sessions_.find(session_id)) { + return false; + } + SessionState session_state; session_state.heartbeat_expiration_ = date_time::DateTime::getCurrentTime(); session_state.heartbeat_expiration_.tv_sec += heartbeat_timeout_seconds_; session_state.is_heartbeat_sent_ = false; sessions_[session_id] = session_state; + + LOG4CXX_INFO(logger_, "Start heartbeat for session: " << + static_cast(session_id)); + + //first session added, so we need to start monitoring thread + if (1 == sessions_.size()) { + return true; + } + + return false; } void HeartBeatMonitor::RemoveSession(uint8_t session_id) { -- 2.7.4