Connection Handler Component
authorJustin Dickow <jjdickow@gmail.com>
Tue, 15 Jul 2014 15:41:47 +0000 (11:41 -0400)
committerJustin Dickow <jjdickow@gmail.com>
Tue, 15 Jul 2014 15:41:47 +0000 (11:41 -0400)
Signed-off-by: Justin Dickow <jjdickow@gmail.com>
src/components/connection_handler/include/connection_handler/connection.h
src/components/connection_handler/include/connection_handler/connection_handler.h
src/components/connection_handler/include/connection_handler/connection_handler_impl.h
src/components/connection_handler/include/connection_handler/device.h
src/components/connection_handler/include/connection_handler/heartbeat_monitor.h
src/components/connection_handler/src/connection.cc
src/components/connection_handler/src/connection_handler_impl.cc
src/components/connection_handler/src/device.cc
src/components/connection_handler/src/heartbeat_monitor.cc

index df95460..a3d28e5 100644 (file)
@@ -40,7 +40,6 @@
 #include <map>
 #include <vector>
 
-#include "utils/logger.h"
 #include "utils/lock.h"
 #include "connection_handler/device.h"
 #include "connection_handler/heartbeat_monitor.h"
@@ -100,7 +99,7 @@ typedef std::map<uint8_t, ServiceList>::iterator SessionMapIterator;
 typedef std::map<uint8_t, ServiceList>::const_iterator SessionMapConstIterator;
 
 /**
- * \class Connection
+ * \class Connection
  * \brief Connection class
  */
 class Connection {
@@ -134,7 +133,7 @@ class Connection {
    * \brief Adds session to connection
    * \return sessionID or -1 in case of issues
    */
-  int32_t AddNewSession();
+  int32_t AddNewSession(const uint8_t& protocol_version);
 
   /**
    * \brief Removes session from connection
@@ -163,14 +162,19 @@ class Connection {
   const SessionMap session_map() const;
 
   /*
-   * \brief Close this connection and all associated sessions
+   * \brief Close session
    */
-  void Close();
+  void CloseSession(uint8_t session_id);
 
   /*
-   * \brief Prevent this connection from being closed by heartbeat timeout
+   * \brief Prevent session from being closed by heartbeat timeout
    */
-  void KeepAlive();
+  void KeepAlive(uint8_t session_id);
+
+  /*
+   * \brief Send heartbeat to  mobile app
+   */
+  void SendHeartBeat(uint8_t session_id);
 
  private:
   ConnectionHandler* connection_handler_;
@@ -195,14 +199,9 @@ class Connection {
   /*
    * \brief monitor that closes connection if there is no traffic over it
    */
-  HeartBeatMonitor heartbeat_monitor_;
+  HeartBeatMonitor* heartbeat_monitor_;
 
-  /**
-   * \brief For logging.
-   */
-#ifdef ENABLE_LOG
-  static log4cxx::LoggerPtr logger_;
-#endif // ENABLE_LOG
+  threads::Thread* heart_beat_monitor_thread_;
 };
 
 }/* namespace connection_handler */
index ca061bc..cf3b757 100644 (file)
@@ -78,6 +78,19 @@ class ConnectionHandler {
    */
   virtual void CloseConnection(ConnectionHandle connection_handle) = 0;
 
+  /*
+   * Close session
+   */
+  virtual void CloseSession(ConnectionHandle connection_handle,
+                            uint8_t session_id,
+                            const ServiceList& service_list) = 0;
+
+  /*
+   * Send heartbeat to mobile app
+   */
+  virtual void SendHeartBeat(ConnectionHandle connection_handle,
+                            uint8_t session_id) = 0;
+
 
  protected:
   /**
index 45f7502..1cc5b39 100644 (file)
@@ -151,19 +151,20 @@ class ConnectionHandlerImpl : public ConnectionHandler,
     /**
      * \brief Callback function used by ProtocolHandler
      * when Mobile Application initiates start of new session.
-     * \param connection_handle Connection identifier whithin which session has to be started.
+     * \param connection_handle Connection identifier within which session has to be started.
      * \param sessionId Identifier of the session to be started
      * \return int32_t Id (number) of new session if successful otherwise -1.
      */
     virtual int32_t OnSessionStartedCallback(
       const transport_manager::ConnectionUID& connection_handle,
       const uint8_t& session_id,
+      const uint8_t& protocol_version,
       const protocol_handler::ServiceType& service_type);
 
     /**
      * \brief Callback function used by ProtocolHandler
      * when Mobile Application initiates session ending.
-     * \param connection_handle Connection identifier whithin which session exists
+     * \param connection_handle Connection identifier within which session exists
      * \param sessionId Identifier of the session to be ended
      * \param hashCode Hash used only in second version of SmartDeviceLink protocol.
      * If not equal to hash assigned to session on start then operation fails.
@@ -178,7 +179,7 @@ class ConnectionHandlerImpl : public ConnectionHandler,
      * \brief Creates unique identifier of session (can be used as hash)
      * from given connection identifier
      * whithin which session exists and session number.
-     * \param  connection_handle Connection identifier whithin which session exists
+     * \param  connection_handle Connection identifier within which session exists
      * \param sessionId Identifier of the session
      * \return int32_t Unique key for session
      */
@@ -223,7 +224,7 @@ class ConnectionHandlerImpl : public ConnectionHandler,
 
 
     /**
-     * \brief Method which should start devices discoveryng
+     * \brief Method which should start devices discovering
      */
     virtual void StartDevicesDiscovery();
 
@@ -238,9 +239,27 @@ class ConnectionHandlerImpl : public ConnectionHandler,
     virtual void CloseConnection(ConnectionHandle connection_handle) OVERRIDE;
 
     /*
+     * Function used by HearbeatMonitior to close session on HB timeout
+     * \param connection_handle Connection handler within which session exists
+     * \param session_id Identifier of the session to be ended
+     * \param ServiceList list of services which associated with session
+     */
+    virtual void CloseSession(ConnectionHandle connection_handle,
+                              uint8_t session_id,
+                              const ServiceList& service_list);
+
+    void SetProtocolHandler(protocol_handler::ProtocolHandler* handler);
+
+    /*
+     * Send heartbeat message to mobile app
+     */
+    virtual void SendHeartBeat(ConnectionHandle connection_handle,
+                               uint8_t session_id);
+
+    /*
      * Keep connection associated with the key from being closed by heartbeat monitor
      */
-    void KeepConnectionAlive(uint32_t connection_key);
+    void KeepConnectionAlive(uint32_t connection_key, uint8_t session_id);
   private:
     /**
      * \brief Default class constructor
@@ -288,14 +307,9 @@ class ConnectionHandlerImpl : public ConnectionHandler,
      */
     utils::StlMapDeleter<ConnectionList> connection_list_deleter_;
 
-    /**
-     *\brief For logging.
-     */
-#ifdef ENABLE_LOG
-    static log4cxx::LoggerPtr logger_;
-#endif // ENABLE_LOG
-    DISALLOW_COPY_AND_ASSIGN(ConnectionHandlerImpl);
+    protocol_handler::ProtocolHandler* protocol_handler_;
 
+    DISALLOW_COPY_AND_ASSIGN(ConnectionHandlerImpl);
     FRIEND_BASE_SINGLETON_CLASS(ConnectionHandlerImpl);
 };
 }/* namespace connection_handler */
index 355d72a..99b3e7b 100644 (file)
@@ -40,7 +40,6 @@
 #include <string>
 #include <map>
 #include <vector>
-#include "utils/logger.h"
 
 /**
  * \namespace connection_handler
@@ -55,11 +54,11 @@ typedef uint32_t DeviceHandle;
 typedef std::vector<int32_t> AppList;
 
 /**
- * \class Device
+ * \class Device
  * \brief Connection class
  */
 class Device {
-  public:
+public:
     /**
      * \brief Class constructor
      */
@@ -83,9 +82,13 @@ class Device {
      */
     std::string user_friendly_name() const;
 
+    /**
+        *\brief Also should be used for iOS USB connections
+        *(better not know why this same method)
+        */
     std::string mac_address() const;
 
-  private:
+private:
     /**
      * \brief Uniq device handle.
      */
@@ -101,12 +104,6 @@ class Device {
      */
     std::string mac_address_;
 
-    /**
-     * \brief For logging.
-     */
-#ifdef ENABLE_LOG
-    static log4cxx::LoggerPtr logger_;
-#endif // ENABLE_LOG
 };
 
 /**
index b43f0e9..a7380b0 100644 (file)
 #include <stdint.h>
 #include <map>
 
-#include "utils/timer_thread.h"
-#include "utils/threads/thread_validator.h"
+#include "utils/threads/thread.h"
+#include "utils/threads/thread_delegate.h"
+#include "utils/date_time.h"
 #include "utils/macro.h"
+#include "utils/lock.h"
 
 namespace connection_handler {
 
 class Connection;
 
 /*
- * Starts timer and when it elapses closes associated connection
+ * Starts hearbeat timer for session and when it elapses closes it
  */
-class HeartBeatMonitor: public threads::SingleThreadSimpleValidator {
+class HeartBeatMonitor: public threads::ThreadDelegate {
  public:
   HeartBeatMonitor(int32_t heartbeat_timeout_seconds,
                    Connection* connection);
   ~HeartBeatMonitor();
+
+  /**
+   * Thread procedure.
+   */
+  virtual void threadMain();
+
+  void AddSession(uint8_t session_id);
+  void RemoveSession(uint8_t session_id);
+
   /*
-   * \brief Starts connection monitoring.
-   * Should be called when first session was opened.
+   * Resets timer preventing session from being killed
    */
-  void BeginMonitoring();
+  void KeepAlive(uint8_t session_id);
+
   /*
-   * Resets keepalive timer preventing connection from being killed
+   * Thread exit procedure.
    */
-  void KeepAlive();
- private:
-  void TimeOut();
+  virtual bool exitThreadMain();
+
  private:
   // \brief Heartbeat timeout, should be read from profile
   const int32_t heartbeat_timeout_seconds_;
   // \brief Connection that must be closed when timeout elapsed
   Connection* connection_;
-  timer::TimerThread<HeartBeatMonitor> timer_;
+
+  static const int32_t kdefault_cycle_timeout = 1000000;
+
+  struct SessionState {
+    TimevalStruct heartbeat_expiration_;
+    bool is_heartbeat_sent_;
+  };
+
+  // \brief monitored sessions collection
+  std::map<uint8_t, SessionState> sessions_;
+
+  sync_primitives::Lock sessions_list_lock_;
+
+  volatile bool stop_flag_;
+
   DISALLOW_COPY_AND_ASSIGN(HeartBeatMonitor);
 };
 
index 38596d6..6bc3236 100644 (file)
@@ -1,8 +1,5 @@
 /**
- * \file Connection.cpp
- * \brief Connection class implementation.
- *
- * Copyright (c) 2013, Ford Motor Company
+ * Copyright (c) 2014, Ford Motor Company
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -37,6 +34,8 @@
 
 #include "connection_handler/connection.h"
 #include "connection_handler/connection_handler.h"
+#include "protocol_handler/protocol_packet.h"
+#include "utils/logger.h"
 #include "utils/macro.h"
 
 /**
  */
 namespace connection_handler {
 
-#ifdef ENABLE_LOG
-log4cxx::LoggerPtr Connection::logger_ = log4cxx::LoggerPtr(
-    log4cxx::Logger::getLogger("ConnectionHandler"));
-#endif // ENABLE_LOG
+CREATE_LOGGERPTR_GLOBAL(logger_, "ConnectionHandler")
 
 Connection::Connection(ConnectionHandle connection_handle,
                        DeviceHandle connection_device_handle,
@@ -56,24 +52,25 @@ Connection::Connection(ConnectionHandle connection_handle,
                        int32_t heartbeat_timeout)
     : connection_handler_(connection_handler),
       connection_handle_(connection_handle),
-      connection_device_handle_(connection_device_handle),
-      heartbeat_monitor_(heartbeat_timeout, this) {
+      connection_device_handle_(connection_device_handle) {
   DCHECK(connection_handler_);
+
+  heartbeat_monitor_ = new HeartBeatMonitor(heartbeat_timeout, this);
+  heart_beat_monitor_thread_ = new threads::Thread("HeartBeatMonitorThread",
+                                                   heartbeat_monitor_);
 }
 
 Connection::~Connection() {
   session_map_.clear();
+  heart_beat_monitor_thread_->stop();
+  delete heart_beat_monitor_thread_;
 }
 
-int32_t Connection::AddNewSession() {
+int32_t Connection::AddNewSession(const uint8_t& protocol_version) {
   sync_primitives::AutoLock lock(session_map_lock_);
 
   int32_t result = -1;
 
-  if (session_map_.empty()) {
-    heartbeat_monitor_.BeginMonitoring();
-  }
-
   const uint8_t max_connections = 255;
   int32_t size = session_map_.size();
   if (max_connections > size) {
@@ -86,6 +83,15 @@ int32_t Connection::AddNewSession() {
     session_map_[size].push_back(
         static_cast<uint8_t>(protocol_handler::kBulk));
 
+    if (protocol_handler::PROTOCOL_VERSION_3 == protocol_version) {
+      heartbeat_monitor_->AddSession(size);
+
+      // start monitoring thread when first session with heartbeat added
+      if (1 == size) {
+        heart_beat_monitor_thread_->start();
+      }
+    }
+
     result = size;
   }
 
@@ -99,6 +105,7 @@ int32_t Connection::RemoveSession(uint8_t session) {
   if (session_map_.end() == it) {
     LOG4CXX_ERROR(logger_, "Session not found in this connection!");
   } else {
+    heartbeat_monitor_->RemoveSession(session);
     session_map_.erase(session);
     result = session;
   }
@@ -119,7 +126,8 @@ bool Connection::AddNewService(uint8_t session, uint8_t service) {
   ServiceListIterator service_it = find(session_it->second.begin(),
                                         session_it->second.end(), service);
   if (service_it != session_it->second.end()) {
-    LOG4CXX_ERROR(logger_, "Session " << session << " already established"
+    LOG4CXX_ERROR(logger_, "Session " << static_cast<int32_t>(session) <<
+                  " already established"
                   " service " << service);
   } else {
     session_it->second.push_back(service);
@@ -145,7 +153,8 @@ bool Connection::RemoveService(uint8_t session, uint8_t service) {
     session_it->second.erase(service_it);
     result = true;
   } else {
-    LOG4CXX_ERROR(logger_, "Session " << session << " didn't established"
+    LOG4CXX_ERROR(logger_, "Session " << static_cast<int32_t>(session) <<
+                  " didn't established"
                   " service " << service);
   }
 
@@ -165,12 +174,37 @@ const SessionMap Connection::session_map() const {
   return session_map_;
 }
 
-void Connection::Close() {
-  connection_handler_->CloseConnection(connection_handle_);
+void Connection::CloseSession(uint8_t session_id) {
+  size_t size;
+  ServiceList service_list;
+
+  {
+    sync_primitives::AutoLock lock(session_map_lock_);
+
+    SessionMapIterator session_it = session_map_.find(session_id);
+    if (session_it == session_map_.end()) {
+      return;
+    }
+
+    size = session_map_.size();
+    service_list = session_map_[session_id];
+  }
+
+  //Close connection if it is last session
+  if (1 == size) {
+    connection_handler_->CloseConnection(connection_handle_);
+  } else {
+    connection_handler_->CloseSession(connection_handle_, session_id,
+                                      service_list);
+  }
+}
+
+void Connection::SendHeartBeat(uint8_t session_id) {
+  connection_handler_->SendHeartBeat(connection_handle_, session_id);
 }
 
-void Connection::KeepAlive() {
-  heartbeat_monitor_.KeepAlive();
+void Connection::KeepAlive(uint8_t session_id) {
+  heartbeat_monitor_->KeepAlive(session_id);
 }
 
 }/* namespace connection_handler */
index 7be9d97..f8800c7 100644 (file)
@@ -46,7 +46,7 @@ namespace {
 int32_t HeartBeatTimeout() {
   return profile::Profile::instance()->heart_beat_timeout();
 }
-}
+}  // namespace
 
 /**
  * \namespace connection_handler
@@ -54,6 +54,8 @@ int32_t HeartBeatTimeout() {
  */
 namespace connection_handler {
 
+CREATE_LOGGERPTR_GLOBAL(logger_, "ConnectionHandler")
+
 ConnectionHandle HandleFromConnectionUID(transport_manager::ConnectionUID uid) {
   return ConnectionHandle(uid);
 }
@@ -62,15 +64,11 @@ transport_manager::ConnectionUID ConnectionUIDFromHandle(ConnectionHandle handle
   return transport_manager::ConnectionUID(handle);
 }
 
-#ifdef ENABLE_LOG
-log4cxx::LoggerPtr ConnectionHandlerImpl::logger_ = log4cxx::LoggerPtr(
-    log4cxx::Logger::getLogger("ConnectionHandler"));
-#endif // ENABLE_LOG
-
 ConnectionHandlerImpl::ConnectionHandlerImpl()
   : connection_handler_observer_(NULL),
     transport_manager_(NULL),
-    connection_list_deleter_(&connection_list_){
+    connection_list_deleter_(&connection_list_),
+    protocol_handler_(NULL) {
 }
 
 ConnectionHandlerImpl::~ConnectionHandlerImpl() {
@@ -209,6 +207,7 @@ void ConnectionHandlerImpl::RemoveConnection(
  int32_t ConnectionHandlerImpl::OnSessionStartedCallback(
   const transport_manager::ConnectionUID& connection_handle,
   const uint8_t& sessionId,
+  const uint8_t& protocol_version,
   const protocol_handler::ServiceType& service_type) {
   LOG4CXX_INFO(logger_, "ConnectionHandlerImpl::OnSessionStartedCallback()");
 
@@ -222,7 +221,7 @@ void ConnectionHandlerImpl::RemoveConnection(
   }
 
   if ((0 == sessionId) && (protocol_handler::kRpc == service_type)) {
-    new_session_id = (it->second)->AddNewSession();
+    new_session_id = (it->second)->AddNewSession(protocol_version);
     if (0 > new_session_id) {
       LOG4CXX_ERROR(logger_, "Not possible to start session!");
       return -1;
@@ -287,10 +286,10 @@ uint32_t ConnectionHandlerImpl::OnSessionEndedCallback(
   }
 
   if (0 != connection_handler_observer_) {
-    int32_t sessionKey = KeyFromPair(connection_handle, sessionId);
-    connection_handler_observer_->OnServiceEndedCallback(sessionKey,
+    int32_t session_key = KeyFromPair(connection_handle, sessionId);
+    connection_handler_observer_->OnServiceEndedCallback(session_key,
                                                          service_type);
-    result = sessionKey;
+    result = session_key;
   }
 
   return result;
@@ -433,7 +432,8 @@ void ConnectionHandlerImpl::StartDevicesDiscovery() {
     LOG4CXX_ERROR(logger_, "Null pointer to TransportManager.");
     return;
   }
-  //transport_manager_->SearchDevices();
+
+  transport_manager_->SearchDevices();
   if (connection_handler_observer_) {
     connection_handler_observer_->OnDeviceListUpdated(device_list_);
   }
@@ -489,18 +489,57 @@ void ConnectionHandlerImpl::CloseConnection(ConnectionHandle connection_handle)
   }
   transport_manager::ConnectionUID connection_uid =
       ConnectionUIDFromHandle(connection_handle);
-  transport_manager_->Disconnect(connection_uid);
+  transport_manager_->DisconnectForce(connection_uid);
 }
 
-void ConnectionHandlerImpl::KeepConnectionAlive(uint32_t connection_key) {
+void ConnectionHandlerImpl::CloseSession(ConnectionHandle connection_handle,
+                                         uint8_t session_id,
+                                         const ServiceList& service_list) {
+  if (0 != connection_handler_observer_) {
+    ServiceListConstIterator it = service_list.begin();
+    for (;it != service_list.end(); ++it) {
+      connection_handler_observer_->OnServiceEndedCallback(
+          session_id, static_cast<protocol_handler::ServiceType>(*it));
+    }
+  }
+
+  transport_manager::ConnectionUID connection_id =
+        ConnectionUIDFromHandle(connection_handle);
+
+  sync_primitives::AutoLock lock(connection_list_lock_);
+  ConnectionListIterator itr = connection_list_.find(connection_id);
+
+  if (connection_list_.end() != itr) {
+    itr->second->RemoveSession(session_id);
+  }
+}
+
+void ConnectionHandlerImpl::SetProtocolHandler(
+    protocol_handler::ProtocolHandler* handler) {
+  protocol_handler_ = handler;
+}
+
+void ConnectionHandlerImpl::SendHeartBeat(ConnectionHandle connection_handle,
+                                          uint8_t session_id) {
+
+  transport_manager::ConnectionUID connection_uid =
+      ConnectionUIDFromHandle(connection_handle);
+  protocol_handler_->SendHeartBeat(connection_uid, session_id);
+}
+
+void ConnectionHandlerImpl::KeepConnectionAlive(uint32_t connection_key,
+                                                uint8_t session_id) {
   uint32_t connection_handle = 0;
-  uint8_t session_id = 0;
-  PairFromKey(connection_key, &connection_handle, &session_id);
+  uint8_t session = 0;
+  PairFromKey(connection_key, &connection_handle, &session);
+
+  LOG4CXX_INFO(logger_, "Keep alive for session: " <<
+               static_cast<int32_t>(session_id));
 
   sync_primitives::AutoLock lock(connection_list_lock_);
   ConnectionListIterator it = connection_list_.find(connection_handle);
   if (connection_list_.end() != it) {
-    it->second->KeepAlive();
+    it->second->KeepAlive(session_id);
   }
 }
 
index 7746d6d..99d9614 100644 (file)
@@ -34,6 +34,8 @@
  */
 
 #include "connection_handler/device.h"
+#include "encryption/hashing.h"
+#include "utils/logger.h"
 
 /**
  * \namespace connection_handler
  */
 namespace connection_handler {
 
-#ifdef ENABLE_LOG
-log4cxx::LoggerPtr Device::logger_ = log4cxx::LoggerPtr(
-                                       log4cxx::Logger::getLogger("ConnectionHandler"));
-#endif // ENABLE_LOG
+CREATE_LOGGERPTR_GLOBAL(logger_, "ConnectionHandler")
 
 Device::Device(DeviceHandle device_handle,
                const std::string& user_friendly_name,
                const std::string& mac_address)
-  : device_handle_(device_handle),
-    user_friendly_name_(user_friendly_name),
-    mac_address_(mac_address) {
+    : device_handle_(device_handle),
+      user_friendly_name_(user_friendly_name),
+      mac_address_(mac_address) {
+    mac_address_ = encryption::MakeHash(mac_address);
 }
 
 Device::~Device() {
 }
 
 DeviceHandle Device::device_handle() const {
-  return device_handle_;
+    return device_handle_;
 }
 
 std::string Device::user_friendly_name() const {
-  return user_friendly_name_;
+    return user_friendly_name_;
 }
 
 std::string Device::mac_address() const {
-  return mac_address_;
+    return mac_address_;
 }
 }/* namespace connection_handler */
index 7058ad0..b0338d4 100644 (file)
  * POSSIBILITY OF SUCH DAMAGE.
  */
 #include "connection_handler/heartbeat_monitor.h"
+#include <unistd.h>
 #include "connection_handler/connection.h"
-#include "log4cxx/logger.h"
+#include "utils/logger.h"
 
 namespace connection_handler {
 
-namespace {
-log4cxx::LoggerPtr g_logger =
-    log4cxx::LoggerPtr(log4cxx::Logger::getLogger("ConnectionHandler"));
-}
+using namespace sync_primitives;
+
+CREATE_LOGGERPTR_GLOBAL(logger_, "HeartBeatMonitor")
 
 HeartBeatMonitor::HeartBeatMonitor(int32_t heartbeat_timeout_seconds,
                                    Connection* connection)
     : heartbeat_timeout_seconds_(heartbeat_timeout_seconds),
       connection_(connection),
-      timer_(this, &HeartBeatMonitor::TimeOut) {
+      stop_flag_(false) {
 }
 
 HeartBeatMonitor::~HeartBeatMonitor() {
-  AssertRunningOnCreationThread();
+  LOG4CXX_TRACE_ENTER(logger_);
 }
 
-void HeartBeatMonitor::BeginMonitoring() {
-  AssertRunningOnCreationThread();
-  if (heartbeat_timeout_seconds_ != 0) {
-    LOG4CXX_INFO(g_logger, "Heart beat monitor: monitoring connection "
-                 << connection_->connection_handle() << ", timeout "
-                 << heartbeat_timeout_seconds_ << " seconds");
-    timer_.start(heartbeat_timeout_seconds_);
-  } else {
-    LOG4CXX_INFO(g_logger, "Heart beat monitor: disabled");
+
+void HeartBeatMonitor::threadMain() {
+   std::map<uint8_t, SessionState>::iterator it;
+
+  while (!stop_flag_) {
+    usleep(kdefault_cycle_timeout);
+
+    AutoLock auto_lock(sessions_list_lock_);
+
+    it = sessions_.begin();
+    while (it != sessions_.end()) {
+      if (it->second.heartbeat_expiration_.tv_sec < date_time::DateTime::getCurrentTime().tv_sec) {
+        if (it->second.is_heartbeat_sent_) {
+          LOG4CXX_INFO(logger_, "Session with id " << static_cast<int32_t>(it->first) <<" timed out, closing");
+          uint8_t session_id = it->first;
+          sessions_.erase(it);
+          it = sessions_.begin();
+          if (sessions_.empty()) {
+            stop_flag_ = true;
+          }
+          connection_->CloseSession(session_id);
+        } else {
+          it->second.heartbeat_expiration_ =
+              date_time::DateTime::getCurrentTime();
+          it->second.heartbeat_expiration_.tv_sec +=
+              heartbeat_timeout_seconds_;
+
+          connection_->SendHeartBeat(it->first);
+
+          it->second.is_heartbeat_sent_ = true;
+          ++it;
+        }
+      } else {
+        ++it;
+      }
+    }
   }
 }
 
-void HeartBeatMonitor::TimeOut() {
-  LOG4CXX_INFO(g_logger, "Heart beat monitor: connection "
-               << connection_->connection_handle() << " timed out, closing");
-  connection_->Close();
+void HeartBeatMonitor::AddSession(uint8_t session_id) {
+  LOG4CXX_INFO(logger_, "Add session with id" <<
+               static_cast<int32_t>(session_id));
+
+  AutoLock auto_lock(sessions_list_lock_);
+
+  SessionState session_state;
+  session_state.heartbeat_expiration_ = date_time::DateTime::getCurrentTime();
+  session_state.heartbeat_expiration_.tv_sec +=  heartbeat_timeout_seconds_;
+  session_state.is_heartbeat_sent_ = false;
+
+  sessions_[session_id] = session_state;
+}
+
+void HeartBeatMonitor::RemoveSession(uint8_t session_id) {
+  AutoLock auto_lock(sessions_list_lock_);
+
+  if (sessions_.end() != sessions_.find(session_id)) {
+    LOG4CXX_INFO(logger_, "Remove session with id" <<
+                 static_cast<int32_t>(session_id));
+    sessions_.erase(session_id);
+  }
 }
 
-void HeartBeatMonitor::KeepAlive() {
-  AssertRunningOnCreationThread();
-  if (heartbeat_timeout_seconds_ != 0) {
-    LOG4CXX_INFO(
-        g_logger,
-        "Resetting heart beat timer for connection "
-          << connection_->connection_handle());
-    timer_.stop();
-    timer_.start(heartbeat_timeout_seconds_);
+void HeartBeatMonitor::KeepAlive(uint8_t session_id) {
+  AutoLock auto_lock(sessions_list_lock_);
+
+  if (sessions_.end() != sessions_.find(session_id)) {
+    LOG4CXX_INFO(logger_, "Resetting heart beat timer for session with id"
+                 << static_cast<int32_t>(session_id));
+
+    sessions_[session_id].heartbeat_expiration_ =
+        date_time::DateTime::getCurrentTime();
+    sessions_[session_id].heartbeat_expiration_.tv_sec +=
+        heartbeat_timeout_seconds_;
+    sessions_[session_id].is_heartbeat_sent_ = false;
   }
 }
 
+bool HeartBeatMonitor::exitThreadMain() {
+  stop_flag_ = true;
+  LOG4CXX_INFO(logger_, "exitThreadMain");
+  return true;
+}
 
 } // namespace connection_handler