2 // Copyright © 2019 Arm Ltd. All rights reserved.
3 // SPDX-License-Identifier: MIT
6 #include "FileOnlyProfilingConnection.hpp"
7 #include "PacketVersionResolver.hpp"
9 #include <armnn/Exceptions.hpp>
12 #include <boost/numeric/conversion/cast.hpp>
/// Destructor of FileOnlyProfilingConnection.
/// NOTE(review): the body of the destructor is not visible in this excerpt; presumably it
/// shuts the connection down (see Close()) - confirm against the complete file.
22 FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
27 bool FileOnlyProfilingConnection::IsOpen() const
29 // This type of connection is always open.
33 void FileOnlyProfilingConnection::Close()
35 // Dump any unread packets out of the queue.
36 size_t initialSize = m_PacketQueue.size();
37 for (size_t i = 0; i < initialSize; ++i)
41 // dispose of the processing thread
42 m_KeepRunning.store(false);
43 if (m_LocalHandlersThread.joinable())
45 // make sure the thread wakes up and sees it has to stop
46 m_ConditionPacketReadable.notify_one();
47 m_LocalHandlersThread.join();
51 bool FileOnlyProfilingConnection::WaitForStreamMeta(const unsigned char* buffer, uint32_t length)
55 // The first word, stream_metadata_identifer, should always be 0.
56 if (ToUint32(buffer, TargetEndianness::BeWire) != 0)
58 Fail("Protocol error. The stream_metadata_identifer was not 0.");
61 // Before we interpret the length we need to read the pipe_magic word to determine endianness.
62 if (ToUint32(buffer + 8, TargetEndianness::BeWire) == PIPE_MAGIC)
64 m_Endianness = TargetEndianness::BeWire;
66 else if (ToUint32(buffer + 8, TargetEndianness::LeWire) == PIPE_MAGIC)
68 m_Endianness = TargetEndianness::LeWire;
72 Fail("Protocol read error. Unable to read PIPE_MAGIC value.");
/// Queues a connection acknowledgement packet (header 0x10000, length 0, no payload) on
/// m_PacketQueue and wakes one thread waiting on m_ConditionPacketAvailable.
77 void FileOnlyProfilingConnection::SendConnectionAck()
// NOTE(review): the lines surrounding this trace output (possibly a condition guarding it)
// are not visible in this excerpt.
81 std::cout << "Sending connection acknowledgement." << std::endl;
// The acknowledgement has no body, so a null data pointer is handed to the Packet.
83 std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
// m_PacketQueue is modified while m_PacketAvailableMutex is held.
85 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
86 m_PacketQueue.push(Packet(0x10000, 0, uniqueNullPtr));
// Tell a waiting reader that a packet is now available.
88 m_ConditionPacketAvailable.notify_one();
91 bool FileOnlyProfilingConnection::SendCounterSelectionPacket()
93 uint32_t uint16_t_size = sizeof(uint16_t);
94 uint32_t uint32_t_size = sizeof(uint32_t);
97 uint32_t bodySize = uint32_t_size + boost::numeric_cast<uint32_t>(m_IdList.size()) * uint16_t_size;
99 auto uniqueData = std::make_unique<unsigned char[]>(bodySize);
100 unsigned char* data = reinterpret_cast<unsigned char*>(uniqueData.get());
102 // Copy capturePeriod
103 WriteUint32(data, offset, m_Options.m_CapturePeriod);
106 offset += uint32_t_size;
107 for (const uint16_t& id : m_IdList)
109 WriteUint16(data, offset, id);
110 offset += uint16_t_size;
114 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
115 m_PacketQueue.push(Packet(0x40000, bodySize, uniqueData));
117 m_ConditionPacketAvailable.notify_one();
/// Handles a packet written to this connection: the bytes are turned into a Packet by
/// ReceivePacket, the packet type is classified by GetPackageActivity, stream metadata and
/// counter directory packets get extra processing, and the packet is then passed to
/// ForwardPacketToHandlers.
/// @param buffer raw packet bytes; asserted to be non-null.
/// @param length number of bytes in buffer. The counter directory branch skips the first 8
///               bytes and treats the remaining length - 8 bytes as the packet body.
/// NOTE(review): short lines of this function (braces, return/break statements and any
/// default case) are not visible in this excerpt, so the returned values are not documented here.
122 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
124 ARMNN_ASSERT(buffer);
125 Packet packet = ReceivePacket(buffer, length);
127 // Read Header and determine case
// outgoingHeaderAsWords[0] receives the packet header and [1] the packet length.
128 uint32_t outgoingHeaderAsWords[2];
129 PackageActivity packageActivity = GetPackageActivity(packet, outgoingHeaderAsWords);
131 switch (packageActivity)
133 case PackageActivity::StreamMetaData:
// WaitForStreamMeta validates the metadata and records the wire endianness.
135 if (!WaitForStreamMeta(buffer, length))
143 case PackageActivity::CounterDirectory:
// Copy the body (everything after the first 8 bytes) into its own buffer.
// NOTE(review): length is unsigned, so length - 8 wraps around if length < 8; no check is
// visible here.
145 std::unique_ptr<unsigned char[]> uniqueCounterData = std::make_unique<unsigned char[]>(length - 8);
147 std::memcpy(uniqueCounterData.get(), buffer + 8, length - 8);
149 Packet directoryPacket(outgoingHeaderAsWords[0], length - 8, uniqueCounterData);
// Run a local DirectoryCaptureCommandHandler over the copied packet to obtain a counter directory.
151 armnn::profiling::PacketVersionResolver packetVersionResolver;
152 DirectoryCaptureCommandHandler directoryCaptureCommandHandler(
153 0, 2, packetVersionResolver.ResolvePacketVersion(0, 2).GetEncodedValue());
154 directoryCaptureCommandHandler.operator()(directoryPacket);
155 const ICounterDirectory& counterDirectory = directoryCaptureCommandHandler.GetCounterDirectory();
// Append the translated uid of every counter of every category to m_IdList.
156 for (auto& category : counterDirectory.GetCategories())
158 // Remember we need to translate the Uid's from our CounterDirectory instance to the parent one.
159 std::vector<uint16_t> translatedCounters;
160 for (auto const& copyUid : category->m_Counters)
162 translatedCounters.emplace_back(directoryCaptureCommandHandler.TranslateUIDCopyToOriginal(copyUid));
164 m_IdList.insert(std::end(m_IdList), std::begin(translatedCounters), std::end(translatedCounters));
// Queue a counter selection packet built from m_IdList.
166 SendCounterSelectionPacket();
// Hand the packet over to the local packet handlers; the packet may be moved from by this call.
174 ForwardPacketToHandlers(packet);
178 Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
180 std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
182 // Here we are using m_PacketQueue.empty() as a predicate variable
183 // The conditional variable will wait until packetQueue is not empty or until a timeout
184 if(!m_ConditionPacketAvailable.wait_for(lck,
185 std::chrono::milliseconds(timeout),
186 [&]{return !m_PacketQueue.empty();}))
188 throw armnn::TimeoutException("Thread has timed out as per requested time limit");
191 Packet returnedPacket = std::move(m_PacketQueue.front());
193 return returnedPacket;
196 PackageActivity FileOnlyProfilingConnection::GetPackageActivity(const Packet& packet, uint32_t headerAsWords[2])
198 headerAsWords[0] = packet.GetHeader();
199 headerAsWords[1] = packet.GetLength();
200 if (headerAsWords[0] == 0x20000) // Packet family = 0 Packet Id = 2
202 return PackageActivity::CounterDirectory;
204 else if (headerAsWords[0] == 0) // Packet family = 0 Packet Id = 0
206 return PackageActivity::StreamMetaData;
210 return PackageActivity::Unknown;
214 uint32_t FileOnlyProfilingConnection::ToUint32(const unsigned char* data, TargetEndianness endianness)
216 // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
217 // specified endianness.
218 if (endianness == TargetEndianness::BeWire)
220 return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
221 static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
225 return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
226 static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
/// Reports a failure by throwing a RuntimeException that carries errorMessage.
/// @param errorMessage text passed to the RuntimeException.
/// NOTE(review): one short source line preceding the throw is not visible in this excerpt.
230 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
233 throw RuntimeException(errorMessage);
236 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
237 /// a processing thread that will ensure that processing of packets will happen on a separate
238 /// thread from the profiling services send thread and will therefore protect against the
239 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
240 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
242 m_PacketHandlers.push_back(std::move(localPacketHandler));
243 ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
244 localCopy->SetConnection(this);
245 if (localCopy->GetHeadersAccepted().empty())
247 //this is a universal handler
248 m_UniversalHandlers.push_back(localCopy);
252 for (uint32_t header : localCopy->GetHeadersAccepted())
254 auto iter = m_IndexedHandlers.find(header);
255 if (iter == m_IndexedHandlers.end())
257 std::vector<ILocalPacketHandlerSharedPtr> handlers;
258 handlers.push_back(localCopy);
259 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
263 iter->second.push_back(localCopy);
269 void FileOnlyProfilingConnection::StartProcessingThread()
271 // check if the thread has already started
272 if (m_IsRunning.load())
276 // make sure if there was one running before it is joined
277 if (m_LocalHandlersThread.joinable())
279 m_LocalHandlersThread.join();
281 m_IsRunning.store(true);
282 m_KeepRunning.store(true);
283 m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
286 void FileOnlyProfilingConnection::ForwardPacketToHandlers(Packet& packet)
288 if (m_PacketHandlers.empty())
292 if (m_KeepRunning.load() == false)
297 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
298 if (m_KeepRunning.load() == false)
302 m_ReadableList.push(std::move(packet));
304 m_ConditionPacketReadable.notify_one();
/// Entry point of the processing thread created in StartProcessingThread(). While
/// m_KeepRunning is true it takes packets off m_ReadableList and passes them to
/// DispatchPacketToHandlers(); when the loop ends it stores false in m_IsRunning.
/// NOTE(review): several short lines of this function (braces, the loop opener, the condition
/// selecting between the two waits, the assignment to readPacket) are not visible in this excerpt.
307 void FileOnlyProfilingConnection::ServiceLocalHandlers()
311 Packet returnedPacket;
// Set when a packet was actually taken off the list during this iteration.
312 bool readPacket = false;
313 { // only lock while we are taking the packet off the incoming list
314 std::unique_lock<std::mutex> lck(m_ReadableMutex);
// Untimed wait: returns only once m_ReadableList is non-empty.
// NOTE(review): this predicate does not look at m_KeepRunning, so a notify sent by Close()
// while the list is empty would not end this wait - confirm how shutdown is meant to work here.
317 m_ConditionPacketReadable.wait(lck,
318 [&] { return !m_ReadableList.empty(); });
// Timed wait: gives up after max(m_Timeout, 1000) milliseconds if no packet has arrived.
322 m_ConditionPacketReadable.wait_for(lck,
323 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
324 [&] { return !m_ReadableList.empty(); });
// Only take a packet if no stop has been requested in the meantime.
326 if (m_KeepRunning.load())
328 if (!m_ReadableList.empty())
330 returnedPacket = std::move(m_ReadableList.front());
331 m_ReadableList.pop();
// Dispatching happens after the locked scope above, so handlers do not run under m_ReadableMutex.
340 if (m_KeepRunning.load() && readPacket)
342 DispatchPacketToHandlers(returnedPacket);
344 } while (m_KeepRunning.load());
345 // make sure the readable list is cleared
// Signals that the processing thread is no longer running (read by StartProcessingThread()).
347 m_IsRunning.store(false);
350 void FileOnlyProfilingConnection::ClearReadableList()
352 // make sure the incoming packet queue gets emptied
353 size_t initialSize = m_ReadableList.size();
354 for (size_t i = 0; i < initialSize; ++i)
356 m_ReadableList.pop();
360 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const Packet& packet)
362 for (auto& delegate : m_UniversalHandlers)
364 delegate->HandlePacket(packet);
366 auto iter = m_IndexedHandlers.find(packet.GetHeader());
367 if (iter != m_IndexedHandlers.end())
369 for (auto &delegate : iter->second)
371 delegate->HandlePacket(packet);
376 } // namespace profiling