IVGCVSW-5301 Remove all boost::numeric_cast from armnn/src/profiling
[platform/upstream/armnn.git] / src / profiling / FileOnlyProfilingConnection.cpp
1 //
2 // Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "FileOnlyProfilingConnection.hpp"
7 #include "PacketVersionResolver.hpp"
8
9 #include <armnn/Exceptions.hpp>
10 #include <common/include/Constants.hpp>
11 #include <common/include/ProfilingException.hpp>
12
13 #include <algorithm>
14 #include <iostream>
15 #include <thread>
16
17 namespace armnn
18 {
19
20 namespace profiling
21 {
22
23 std::vector<uint32_t> StreamMetaDataProcessor::GetHeadersAccepted()
24 {
25     std::vector<uint32_t> headers;
26     headers.push_back(m_MetaDataPacketHeader);
27     return headers;
28 }
29
30 void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
31 {
32     if (packet.GetHeader() != m_MetaDataPacketHeader)
33     {
34         throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
35     }
36     // determine the endianness of the protocol
37     TargetEndianness endianness;
38     if (ToUint32(packet.GetData(),TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
39     {
40         endianness = TargetEndianness::BeWire;
41     }
42     else if (ToUint32(packet.GetData(), TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
43     {
44         endianness = TargetEndianness::LeWire;
45     }
46     else
47     {
48         throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
49     }
50     m_FileOnlyProfilingConnection->SetEndianess(endianness);
51     // send back the acknowledgement
52     std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
53     arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
54     m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
55 }
56
57 uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
58 {
59     // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
60     // specified endianness.
61     if (endianness == TargetEndianness::BeWire)
62     {
63         return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
64                static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
65     }
66     else
67     {
68         return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
69                static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
70     }
71 }
72
73 FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
74 {
75     try
76     {
77         Close();
78     }
79     catch (...)
80     {
81         // do nothing
82     }
83 }
84
85 bool FileOnlyProfilingConnection::IsOpen() const
86 {
87     // This type of connection is always open.
88     return true;
89 }
90
91 void FileOnlyProfilingConnection::Close()
92 {
93     // Dump any unread packets out of the queue.
94     size_t initialSize = m_PacketQueue.size();
95     for (size_t i = 0; i < initialSize; ++i)
96     {
97         m_PacketQueue.pop();
98     }
99     // dispose of the processing thread
100     m_KeepRunning.store(false);
101     if (m_LocalHandlersThread.joinable())
102     {
103         // make sure the thread wakes up and sees it has to stop
104         m_ConditionPacketReadable.notify_one();
105         m_LocalHandlersThread.join();
106     }
107 }
108
109 bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
110 {
111     ARMNN_ASSERT(buffer);
112     arm::pipe::Packet packet = ReceivePacket(buffer, length);
113     ForwardPacketToHandlers(packet);
114     return true;
115 }
116
117 void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
118 {
119     {
120         std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
121         m_PacketQueue.push(std::move(packet));
122     }
123     m_ConditionPacketAvailable.notify_one();
124 }
125
126 arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
127 {
128     std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
129
130     // Here we are using m_PacketQueue.empty() as a predicate variable
131     // The conditional variable will wait until packetQueue is not empty or until a timeout
132     if (!m_ConditionPacketAvailable.wait_for(lck,
133                                              std::chrono::milliseconds(timeout),
134                                              [&]{return !m_PacketQueue.empty();}))
135     {
136         arm::pipe::Packet empty;
137         return empty;
138     }
139
140     arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
141     m_PacketQueue.pop();
142     return returnedPacket;
143 }
144
145 void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
146 {
147     Close();
148     throw RuntimeException(errorMessage);
149 }
150
151 /// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
152 /// a processing thread that will ensure that processing of packets will happen on a separate
153 /// thread from the profiling services send thread and will therefore protect against the
154 /// profiling message buffer becoming exhausted because packet handling slows the dispatch.
155 void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
156 {
157     m_PacketHandlers.push_back(std::move(localPacketHandler));
158     ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
159     localCopy->SetConnection(this);
160     if (localCopy->GetHeadersAccepted().empty())
161     {
162         //this is a universal handler
163         m_UniversalHandlers.push_back(localCopy);
164     }
165     else
166     {
167         for (uint32_t header : localCopy->GetHeadersAccepted())
168         {
169             auto iter = m_IndexedHandlers.find(header);
170             if (iter == m_IndexedHandlers.end())
171             {
172                 std::vector<ILocalPacketHandlerSharedPtr> handlers;
173                 handlers.push_back(localCopy);
174                 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
175             }
176             else
177             {
178                 iter->second.push_back(localCopy);
179             }
180         }
181     }
182 }
183
184 void FileOnlyProfilingConnection::StartProcessingThread()
185 {
186     // check if the thread has already started
187     if (m_IsRunning.load())
188     {
189         return;
190     }
191     // make sure if there was one running before it is joined
192     if (m_LocalHandlersThread.joinable())
193     {
194         m_LocalHandlersThread.join();
195     }
196     m_IsRunning.store(true);
197     m_KeepRunning.store(true);
198     m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
199 }
200
201 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
202 {
203     if (m_PacketHandlers.empty())
204     {
205         return;
206     }
207     if (!m_KeepRunning.load())
208     {
209         return;
210     }
211     {
212         std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
213         if (!m_KeepRunning.load())
214         {
215             return;
216         }
217         m_ReadableList.push(std::move(packet));
218     }
219     m_ConditionPacketReadable.notify_one();
220 }
221
222 void FileOnlyProfilingConnection::ServiceLocalHandlers()
223 {
224     do
225     {
226         arm::pipe::Packet returnedPacket;
227         bool readPacket = false;
228         {   // only lock while we are taking the packet off the incoming list
229             std::unique_lock<std::mutex> lck(m_ReadableMutex);
230             if (m_Timeout < 0)
231             {
232                 m_ConditionPacketReadable.wait(lck,
233                                                [&] { return !m_ReadableList.empty(); });
234             }
235             else
236             {
237                 m_ConditionPacketReadable.wait_for(lck,
238                                                    std::chrono::milliseconds(std::max(m_Timeout, 1000)),
239                                                    [&] { return !m_ReadableList.empty(); });
240             }
241             if (m_KeepRunning.load())
242             {
243                 if (!m_ReadableList.empty())
244                 {
245                     returnedPacket = std::move(m_ReadableList.front());
246                     m_ReadableList.pop();
247                     readPacket = true;
248                 }
249             }
250             else
251             {
252                 ClearReadableList();
253             }
254         }
255         if (m_KeepRunning.load() && readPacket)
256         {
257             DispatchPacketToHandlers(returnedPacket);
258         }
259     } while (m_KeepRunning.load());
260     // make sure the readable list is cleared
261     ClearReadableList();
262     m_IsRunning.store(false);
263 }
264
265 void FileOnlyProfilingConnection::ClearReadableList()
266 {
267     // make sure the incoming packet queue gets emptied
268     size_t initialSize = m_ReadableList.size();
269     for (size_t i = 0; i < initialSize; ++i)
270     {
271         m_ReadableList.pop();
272     }
273 }
274
275 void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
276 {
277     for (auto& delegate : m_UniversalHandlers)
278     {
279         delegate->HandlePacket(packet);
280     }
281     auto iter = m_IndexedHandlers.find(packet.GetHeader());
282     if (iter != m_IndexedHandlers.end())
283     {
284         for (auto& delegate : iter->second)
285         {
286             try
287             {
288                 delegate->HandlePacket(packet);
289             }
290             catch (const arm::pipe::ProfilingException& ex)
291             {
292                 Fail(ex.what());
293             }
294             catch (const std::exception& ex)
295             {
296                 Fail(ex.what());
297             }
298             catch (...)
299             {
300                 Fail("handler failed");
301             }
302         }
303     }
304 }
305
306 }    // namespace profiling
307
308 }    // namespace armnn