From 7bff336458e1ea5a50dc87b45db5b57b301bcacc Mon Sep 17 00:00:00 2001 From: Nikolay Khlopkov Date: Fri, 14 Mar 2014 10:49:45 +0400 Subject: [PATCH] APPLINK-5071: int type according to the style guide. No subject for review: trivial change Signed-off-by: Justin Dickow Conflicts: src/components/protocol_handler/src/protocol_handler_impl.cc --- .../protocol_handler/src/protocol_handler_impl.cc | 146 ++++++++++++++++++--- 1 file changed, 127 insertions(+), 19 deletions(-) diff --git a/src/components/protocol_handler/src/protocol_handler_impl.cc b/src/components/protocol_handler/src/protocol_handler_impl.cc index 3eb4b68..2e01ebf 100644 --- a/src/components/protocol_handler/src/protocol_handler_impl.cc +++ b/src/components/protocol_handler/src/protocol_handler_impl.cc @@ -55,18 +55,105 @@ std::string ConvertPacketDataToString(const uint8_t *data, const size_t kStackSize = 32768; +class ProtocolHandlerImpl::IncomingDataHandler { + public: + IncomingDataHandler() : connections_data_() {} + + bool ProcessData(const RawMessagePtr tm_message, + std::vector* out_frames) { + const ConnectionID connection_id = tm_message->connection_key(); + const uint8_t* data = tm_message->data(); + const std::size_t size = tm_message->data_size(); + LOG4CXX_TRACE(logger_, "Start of processing incoming data of size " + << size << " for connection " << connection_id); + const uint32_t kBytesForSizeDetection = 8; + ConnectionsData::iterator it = connections_data_.find(connection_id); + if (connections_data_.end() == it) { + LOG4CXX_ERROR(logger_, "ProcessData requested for unknown connection"); + return false; + } + std::vector& connection_data = it->second; + connection_data.insert(connection_data.end(), data, data + size); + + LOG4CXX_TRACE(logger_, "Total data size for connection " + << connection_id << " is " + << connection_data.size()); + while (connection_data.size() >= kBytesForSizeDetection) { + const uint32_t packet_size = GetPacketSize(&connection_data[0]); + if (0 == packet_size) { + LOG4CXX_ERROR(logger_, "Failed to get packet size"); + return false; + } + LOG4CXX_TRACE(logger_, "Packet size " << packet_size); + if (connection_data.size() >= packet_size) { + ProtocolFramePtr frame(new protocol_handler::ProtocolPacket( + connection_id, &connection_data[0], packet_size)); + out_frames->push_back(frame); + connection_data.erase(connection_data.begin(), + connection_data.begin() + packet_size); + LOG4CXX_TRACE(logger_, + "Packet created and passed, new data size for connection " + << connection_id << " is " << connection_data.size()); + } else { + LOG4CXX_TRACE(logger_, "Packet data is not available yet"); + return true; + } + } + return true; + } + + void AddConnection(ConnectionID connection_id) { + connections_data_[connection_id]; + } + + void RemoveConnection(ConnectionID connection_id) { + connections_data_.erase(connection_id); + } + + private: + /** + * @brief Returns size of frame to be formed from raw bytes. + * expects first bytes of message which will be treated as frame header. + */ + uint32_t GetPacketSize(unsigned char* received_bytes) { + DCHECK(received_bytes); + unsigned char offset = sizeof(uint32_t); + unsigned char version = received_bytes[0] >> 4u; + uint32_t frame_body_size = received_bytes[offset++] << 24u; + frame_body_size |= received_bytes[offset++] << 16u; + frame_body_size |= received_bytes[offset++] << 8u; + frame_body_size |= received_bytes[offset++]; + + uint32_t required_size = frame_body_size; + switch (version) { + case PROTOCOL_VERSION_1: + required_size += PROTOCOL_HEADER_V1_SIZE; + break; + case PROTOCOL_VERSION_2: + required_size += PROTOCOL_HEADER_V2_SIZE; + break; + default: + LOG4CXX_ERROR(logger_, "Unknown protocol version."); + return 0; + } + return required_size; + } + + typedef std::map > ConnectionsData; + ConnectionsData connections_data_; +}; + ProtocolHandlerImpl::ProtocolHandlerImpl( transport_manager::TransportManager* transport_manager_param) : protocol_observers_(), session_observer_(0), transport_manager_(transport_manager_param), kPeriodForNaviAck(5), - raw_ford_messages_from_mobile_( - "MessagesFromMobileAppHandler", this, - threads::ThreadOptions(kStackSize)), - raw_ford_messages_to_mobile_( - "MessagesToMobileAppHandler", this, - threads::ThreadOptions(kStackSize)) { + incoming_data_handler_(new IncomingDataHandler), + raw_ford_messages_from_mobile_("MessagesFromMobileAppHandler", this, + threads::ThreadOptions(kStackSize)), + raw_ford_messages_to_mobile_("MessagesToMobileAppHandler", this, + threads::ThreadOptions(kStackSize)) { LOG4CXX_TRACE_ENTER(logger_); LOG4CXX_TRACE_EXIT(logger_); @@ -114,7 +201,7 @@ void ProtocolHandlerImpl::SendStartSessionAck(ConnectionID connection_id, LOG4CXX_TRACE_ENTER(logger_); ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket(connection_id, - PROTOCOL_VERSION_2, COMPRESS_OFF, FRAME_TYPE_CONTROL, + protocol_version, COMPRESS_OFF, FRAME_TYPE_CONTROL, service_type, FRAME_DATA_START_SERVICE_ACK, session_id, 0, hash_code)); @@ -274,32 +361,42 @@ void ProtocolHandlerImpl::SendMessageToMobileApp(const RawMessagePtr& message, LOG4CXX_TRACE_EXIT(logger_); } -void ProtocolHandlerImpl::OnTMMessageReceived(const RawMessagePtr message) { +void ProtocolHandlerImpl::OnTMMessageReceived(const RawMessagePtr tm_message) { LOG4CXX_TRACE_ENTER(logger_); connection_handler::ConnectionHandlerImpl* connection_handler = connection_handler::ConnectionHandlerImpl::instance(); // Connection handler should be accessed from TM thread only - connection_handler->KeepConnectionAlive(message->connection_key()); + connection_handler->KeepConnectionAlive(tm_message->connection_key()); - if (message.valid()) { + if (tm_message) { LOG4CXX_INFO_EXT( logger_, - "Received from TM " << message->data() << - " with connection id " << message->connection_key() - << " msg data_size " << message->data_size()); - - ProtocolFramePtr ptr(new protocol_handler::ProtocolPacket( - message->connection_key(), message->data(), message->data_size())); - - raw_ford_messages_from_mobile_.PostMessage( - impl::RawFordMessageFromMobile(ptr)); + "Received from TM " << tm_message->data() << " with connection id " + << tm_message->connection_key() << " msg data_size " + << tm_message->data_size()); } else { LOG4CXX_ERROR( logger_, "Invalid incoming message received in" << " ProtocolHandler from Transport Manager."); + return; + } + + std::vector protocol_frames; + const bool ok = + incoming_data_handler_->ProcessData(tm_message, &protocol_frames); + if (!ok) { + LOG4CXX_ERROR(logger_, + "Incoming data processing failed. Terminating connection."); + transport_manager_->DisconnectForce(tm_message->connection_key()); } + for (std::vector::const_iterator it = + protocol_frames.begin(); + it != protocol_frames.end(); ++it) { + raw_ford_messages_from_mobile_.PostMessage( + impl::RawFordMessageFromMobile(*it)); + } LOG4CXX_TRACE_EXIT(logger_); } @@ -333,6 +430,17 @@ void ProtocolHandlerImpl::OnTMMessageSendFailed( message-> data() << " failed."); } +void ProtocolHandlerImpl::OnConnectionEstablished( + const transport_manager::DeviceInfo& device_info, + const transport_manager::ConnectionUID& connection_id) { + incoming_data_handler_->AddConnection(connection_id); +} + +void ProtocolHandlerImpl::OnConnectionClosed( + const transport_manager::ConnectionUID& connection_id) { + incoming_data_handler_->RemoveConnection(connection_id); +} + RESULT_CODE ProtocolHandlerImpl::SendFrame(ConnectionID connection_id, const ProtocolPacket& packet) { LOG4CXX_TRACE_ENTER(logger_); -- 2.7.4