2 // Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
6 #include "FileOnlyProfilingConnection.hpp"
7 #include "PacketVersionResolver.hpp"
9 #include <armnn/Exceptions.hpp>
10 #include <common/include/Constants.hpp>
11 #include <common/include/ProfilingException.hpp>
23 std::vector<uint32_t> StreamMetaDataProcessor::GetHeadersAccepted()
25 std::vector<uint32_t> headers;
26 headers.push_back(m_MetaDataPacketHeader);
30 void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
32 if (packet.GetHeader() != m_MetaDataPacketHeader)
34 throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
36 // determine the endianness of the protocol
37 TargetEndianness endianness;
38 if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
40 endianness = TargetEndianness::BeWire;
42 else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
44 endianness = TargetEndianness::LeWire;
48 throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
50 m_FileOnlyProfilingConnection->SetEndianess(endianness);
51 // send back the acknowledgement
52 std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
53 arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
54 m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
57 uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
59 // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
60 // specified endianness.
61 if (endianness == TargetEndianness::BeWire)
63 return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
64 static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
68 return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
69 static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
73 FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
85 bool FileOnlyProfilingConnection::IsOpen() const
87 // This type of connection is always open.
91 void FileOnlyProfilingConnection::Close()
93 // Dump any unread packets out of the queue.
94 size_t initialSize = m_PacketQueue.size();
95 for (size_t i = 0; i < initialSize; ++i)
99 // dispose of the processing thread
100 m_KeepRunning.store(false);
101 if (m_LocalHandlersThread.joinable())
103 // make sure the thread wakes up and sees it has to stop
104 m_ConditionPacketReadable.notify_one();
105 m_LocalHandlersThread.join();
109 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
111 ARMNN_ASSERT(buffer);
112 arm::pipe::Packet packet = ReceivePacket(buffer, length);
113 ForwardPacketToHandlers(packet);
117 void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
120 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
121 m_PacketQueue.push(std::move(packet));
123 m_ConditionPacketAvailable.notify_one();
126 arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
128 std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
130 // Here we are using m_PacketQueue.empty() as a predicate variable
131 // The conditional variable will wait until packetQueue is not empty or until a timeout
132 if (!m_ConditionPacketAvailable.wait_for(lck,
133 std::chrono::milliseconds(timeout),
134 [&]{return !m_PacketQueue.empty();}))
136 arm::pipe::Packet empty;
140 arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
142 return returnedPacket;
145 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
148 throw RuntimeException(errorMessage);
151 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
152 /// a processing thread that will ensure that processing of packets will happen on a separate
153 /// thread from the profiling services send thread and will therefore protect against the
154 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
155 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
157 m_PacketHandlers.push_back(std::move(localPacketHandler));
158 ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
159 localCopy->SetConnection(this);
160 if (localCopy->GetHeadersAccepted().empty())
162 //this is a universal handler
163 m_UniversalHandlers.push_back(localCopy);
167 for (uint32_t header : localCopy->GetHeadersAccepted())
169 auto iter = m_IndexedHandlers.find(header);
170 if (iter == m_IndexedHandlers.end())
172 std::vector<ILocalPacketHandlerSharedPtr> handlers;
173 handlers.push_back(localCopy);
174 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
178 iter->second.push_back(localCopy);
184 void FileOnlyProfilingConnection::StartProcessingThread()
186 // check if the thread has already started
187 if (m_IsRunning.load())
191 // make sure if there was one running before it is joined
192 if (m_LocalHandlersThread.joinable())
194 m_LocalHandlersThread.join();
196 m_IsRunning.store(true);
197 m_KeepRunning.store(true);
198 m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
201 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
203 if (m_PacketHandlers.empty())
207 if (!m_KeepRunning.load())
212 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
213 if (!m_KeepRunning.load())
217 m_ReadableList.push(std::move(packet));
219 m_ConditionPacketReadable.notify_one();
222 void FileOnlyProfilingConnection::ServiceLocalHandlers()
226 arm::pipe::Packet returnedPacket;
227 bool readPacket = false;
228 { // only lock while we are taking the packet off the incoming list
229 std::unique_lock<std::mutex> lck(m_ReadableMutex);
232 m_ConditionPacketReadable.wait(lck,
233 [&] { return !m_ReadableList.empty(); });
237 m_ConditionPacketReadable.wait_for(lck,
238 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
239 [&] { return !m_ReadableList.empty(); });
241 if (m_KeepRunning.load())
243 if (!m_ReadableList.empty())
245 returnedPacket = std::move(m_ReadableList.front());
246 m_ReadableList.pop();
255 if (m_KeepRunning.load() && readPacket)
257 DispatchPacketToHandlers(returnedPacket);
259 } while (m_KeepRunning.load());
260 // make sure the readable list is cleared
262 m_IsRunning.store(false);
265 void FileOnlyProfilingConnection::ClearReadableList()
267 // make sure the incoming packet queue gets emptied
268 size_t initialSize = m_ReadableList.size();
269 for (size_t i = 0; i < initialSize; ++i)
271 m_ReadableList.pop();
275 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
277 for (auto& delegate : m_UniversalHandlers)
279 delegate->HandlePacket(packet);
281 auto iter = m_IndexedHandlers.find(packet.GetHeader());
282 if (iter != m_IndexedHandlers.end())
284 for (auto& delegate : iter->second)
288 delegate->HandlePacket(packet);
290 catch (const arm::pipe::ProfilingException& ex)
294 catch (const std::exception& ex)
300 Fail("handler failed");
306 } // namespace profiling