IVGCVSW-4760 Change the offsets in the counter directory body_header to be from the...
[platform/upstream/armnn.git] / src / profiling / FileOnlyProfilingConnection.cpp
1 //
2 // Copyright © 2019 Arm Ltd. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "FileOnlyProfilingConnection.hpp"
7 #include "PacketVersionResolver.hpp"
8
9 #include <armnn/Exceptions.hpp>
10
11 #include <algorithm>
12 #include <boost/numeric/conversion/cast.hpp>
13 #include <iostream>
14 #include <thread>
15
16 namespace armnn
17 {
18
19 namespace profiling
20 {
21
22 FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
23 {
24     Close();
25 }
26
27 bool FileOnlyProfilingConnection::IsOpen() const
28 {
29     // This type of connection is always open.
30     return true;
31 }
32
33 void FileOnlyProfilingConnection::Close()
34 {
35     // Dump any unread packets out of the queue.
36     size_t initialSize = m_PacketQueue.size();
37     for (size_t i = 0; i < initialSize; ++i)
38     {
39         m_PacketQueue.pop();
40     }
41     // dispose of the processing thread
42     m_KeepRunning.store(false);
43     if (m_LocalHandlersThread.joinable())
44     {
45         // make sure the thread wakes up and sees it has to stop
46         m_ConditionPacketReadable.notify_one();
47         m_LocalHandlersThread.join();
48     }
49 }
50
51 bool FileOnlyProfilingConnection::WaitForStreamMeta(const unsigned char* buffer, uint32_t length)
52 {
53     IgnoreUnused(length);
54
55     // The first word, stream_metadata_identifer, should always be 0.
56     if (ToUint32(buffer, TargetEndianness::BeWire) != 0)
57     {
58         Fail("Protocol error. The stream_metadata_identifer was not 0.");
59     }
60
61     // Before we interpret the length we need to read the pipe_magic word to determine endianness.
62     if (ToUint32(buffer + 8, TargetEndianness::BeWire) == PIPE_MAGIC)
63     {
64         m_Endianness = TargetEndianness::BeWire;
65     }
66     else if (ToUint32(buffer + 8, TargetEndianness::LeWire) == PIPE_MAGIC)
67     {
68         m_Endianness = TargetEndianness::LeWire;
69     }
70     else
71     {
72         Fail("Protocol read error. Unable to read PIPE_MAGIC value.");
73     }
74     return true;
75 }
76
77 void FileOnlyProfilingConnection::SendConnectionAck()
78 {
79     if (!m_QuietOp)
80     {
81         std::cout << "Sending connection acknowledgement." << std::endl;
82     }
83     std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
84     {
85         std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
86         m_PacketQueue.push(Packet(0x10000, 0, uniqueNullPtr));
87     }
88     m_ConditionPacketAvailable.notify_one();
89 }
90
91 bool FileOnlyProfilingConnection::SendCounterSelectionPacket()
92 {
93     uint32_t uint16_t_size = sizeof(uint16_t);
94     uint32_t uint32_t_size = sizeof(uint32_t);
95
96     uint32_t offset   = 0;
97     uint32_t bodySize = uint32_t_size + boost::numeric_cast<uint32_t>(m_IdList.size()) * uint16_t_size;
98
99     auto uniqueData     = std::make_unique<unsigned char[]>(bodySize);
100     unsigned char* data = reinterpret_cast<unsigned char*>(uniqueData.get());
101
102     // Copy capturePeriod
103     WriteUint32(data, offset, m_Options.m_CapturePeriod);
104
105     // Copy m_IdList
106     offset += uint32_t_size;
107     for (const uint16_t& id : m_IdList)
108     {
109         WriteUint16(data, offset, id);
110         offset += uint16_t_size;
111     }
112
113     {
114         std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
115         m_PacketQueue.push(Packet(0x40000, bodySize, uniqueData));
116     }
117     m_ConditionPacketAvailable.notify_one();
118
119     return true;
120 }
121
122 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
123 {
124     ARMNN_ASSERT(buffer);
125     Packet packet = ReceivePacket(buffer, length);
126
127     // Read Header and determine case
128     uint32_t outgoingHeaderAsWords[2];
129     PackageActivity packageActivity = GetPackageActivity(packet, outgoingHeaderAsWords);
130
131     switch (packageActivity)
132     {
133         case PackageActivity::StreamMetaData:
134         {
135             if (!WaitForStreamMeta(buffer, length))
136             {
137                 return EXIT_FAILURE;
138             }
139
140             SendConnectionAck();
141             break;
142         }
143         case PackageActivity::CounterDirectory:
144         {
145             std::unique_ptr<unsigned char[]> uniqueCounterData = std::make_unique<unsigned char[]>(length - 8);
146
147             std::memcpy(uniqueCounterData.get(), buffer + 8, length - 8);
148
149             Packet directoryPacket(outgoingHeaderAsWords[0], length - 8, uniqueCounterData);
150
151             armnn::profiling::PacketVersionResolver packetVersionResolver;
152             DirectoryCaptureCommandHandler directoryCaptureCommandHandler(
153                 0, 2, packetVersionResolver.ResolvePacketVersion(0, 2).GetEncodedValue());
154             directoryCaptureCommandHandler.operator()(directoryPacket);
155             const ICounterDirectory& counterDirectory = directoryCaptureCommandHandler.GetCounterDirectory();
156             for (auto& category : counterDirectory.GetCategories())
157             {
158                 // Remember we need to translate the Uid's from our CounterDirectory instance to the parent one.
159                 std::vector<uint16_t> translatedCounters;
160                 for (auto const& copyUid : category->m_Counters)
161                 {
162                     translatedCounters.emplace_back(directoryCaptureCommandHandler.TranslateUIDCopyToOriginal(copyUid));
163                 }
164                 m_IdList.insert(std::end(m_IdList), std::begin(translatedCounters), std::end(translatedCounters));
165             }
166             SendCounterSelectionPacket();
167             break;
168         }
169         default:
170         {
171             break;
172         }
173     }
174     ForwardPacketToHandlers(packet);
175     return true;
176 }
177
178 Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
179 {
180     std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
181
182     // Here we are using m_PacketQueue.empty() as a predicate variable
183     // The conditional variable will wait until packetQueue is not empty or until a timeout
184     if(!m_ConditionPacketAvailable.wait_for(lck,
185                                             std::chrono::milliseconds(timeout),
186                                             [&]{return !m_PacketQueue.empty();}))
187     {
188         throw armnn::TimeoutException("Thread has timed out as per requested time limit");
189     }
190
191     Packet returnedPacket = std::move(m_PacketQueue.front());
192     m_PacketQueue.pop();
193     return returnedPacket;
194 }
195
196 PackageActivity FileOnlyProfilingConnection::GetPackageActivity(const Packet& packet, uint32_t headerAsWords[2])
197 {
198     headerAsWords[0] = packet.GetHeader();
199     headerAsWords[1] = packet.GetLength();
200     if (headerAsWords[0] == 0x20000)    // Packet family = 0 Packet Id = 2
201     {
202         return PackageActivity::CounterDirectory;
203     }
204     else if (headerAsWords[0] == 0)    // Packet family = 0 Packet Id = 0
205     {
206         return PackageActivity::StreamMetaData;
207     }
208     else
209     {
210         return PackageActivity::Unknown;
211     }
212 }
213
214 uint32_t FileOnlyProfilingConnection::ToUint32(const unsigned char* data, TargetEndianness endianness)
215 {
216     // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
217     // specified endianness.
218     if (endianness == TargetEndianness::BeWire)
219     {
220         return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
221                static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
222     }
223     else
224     {
225         return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
226                static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
227     }
228 }
229
230 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
231 {
232     Close();
233     throw RuntimeException(errorMessage);
234 }
235
236 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
237 /// a processing thread that will ensure that processing of packets will happen on a separate
238 /// thread from the profiling services send thread and will therefore protect against the
239 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
240 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
241 {
242     m_PacketHandlers.push_back(std::move(localPacketHandler));
243     ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
244     localCopy->SetConnection(this);
245     if (localCopy->GetHeadersAccepted().empty())
246     {
247         //this is a universal handler
248         m_UniversalHandlers.push_back(localCopy);
249     }
250     else
251     {
252         for (uint32_t header : localCopy->GetHeadersAccepted())
253         {
254             auto iter = m_IndexedHandlers.find(header);
255             if (iter == m_IndexedHandlers.end())
256             {
257                 std::vector<ILocalPacketHandlerSharedPtr> handlers;
258                 handlers.push_back(localCopy);
259                 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
260             }
261             else
262             {
263                 iter->second.push_back(localCopy);
264             }
265         }
266     }
267 }
268
269 void FileOnlyProfilingConnection::StartProcessingThread()
270 {
271     // check if the thread has already started
272     if (m_IsRunning.load())
273     {
274         return;
275     }
276     // make sure if there was one running before it is joined
277     if (m_LocalHandlersThread.joinable())
278     {
279         m_LocalHandlersThread.join();
280     }
281     m_IsRunning.store(true);
282     m_KeepRunning.store(true);
283     m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
284 }
285
286 void FileOnlyProfilingConnection::ForwardPacketToHandlers(Packet& packet)
287 {
288     if (m_PacketHandlers.empty())
289     {
290         return;
291     }
292     if (m_KeepRunning.load() == false)
293     {
294         return;
295     }
296     {
297         std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
298         if (m_KeepRunning.load() == false)
299         {
300             return;
301         }
302         m_ReadableList.push(std::move(packet));
303     }
304     m_ConditionPacketReadable.notify_one();
305 }
306
307 void FileOnlyProfilingConnection::ServiceLocalHandlers()
308 {
309     do
310     {
311         Packet returnedPacket;
312         bool readPacket = false;
313         {   // only lock while we are taking the packet off the incoming list
314             std::unique_lock<std::mutex> lck(m_ReadableMutex);
315             if (m_Timeout < 0)
316             {
317                 m_ConditionPacketReadable.wait(lck,
318                                                [&] { return !m_ReadableList.empty(); });
319             }
320             else
321             {
322                 m_ConditionPacketReadable.wait_for(lck,
323                                                    std::chrono::milliseconds(std::max(m_Timeout, 1000)),
324                                                    [&] { return !m_ReadableList.empty(); });
325             }
326             if (m_KeepRunning.load())
327             {
328                 if (!m_ReadableList.empty())
329                 {
330                     returnedPacket = std::move(m_ReadableList.front());
331                     m_ReadableList.pop();
332                     readPacket = true;
333                 }
334             }
335             else
336             {
337                 ClearReadableList();
338             }
339         }
340         if (m_KeepRunning.load() && readPacket)
341         {
342             DispatchPacketToHandlers(returnedPacket);
343         }
344     } while (m_KeepRunning.load());
345     // make sure the readable list is cleared
346     ClearReadableList();
347     m_IsRunning.store(false);
348 }
349
350 void FileOnlyProfilingConnection::ClearReadableList()
351 {
352     // make sure the incoming packet queue gets emptied
353     size_t initialSize = m_ReadableList.size();
354     for (size_t i = 0; i < initialSize; ++i)
355     {
356         m_ReadableList.pop();
357     }
358 }
359
360 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const Packet& packet)
361 {
362     for (auto& delegate : m_UniversalHandlers)
363     {
364         delegate->HandlePacket(packet);
365     }
366     auto iter = m_IndexedHandlers.find(packet.GetHeader());
367     if (iter != m_IndexedHandlers.end())
368     {
369         for (auto &delegate : iter->second)
370         {
371             delegate->HandlePacket(packet);
372         }
373     }
374 }
375
376 }    // namespace profiling
377
378 }    // namespace armnn