IVGCVSW-5301 Remove all boost::numeric_cast from armnn/src/profiling
[platform/upstream/armnn.git] / src / profiling / SendThread.cpp
1 //
2 // Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "SendThread.hpp"
7 #include "ProfilingUtils.hpp"
8
9 #include <armnn/Exceptions.hpp>
10 #include <armnn/Conversion.hpp>
11 #include <armnn/utility/NumericCast.hpp>
12
13 #include <Processes.hpp>
14
15 #include <boost/format.hpp>
16
17 #include <cstring>
18
19 namespace armnn
20 {
21
22 namespace profiling
23 {
24
25 SendThread::SendThread(armnn::profiling::ProfilingStateMachine& profilingStateMachine,
26                        armnn::profiling::IBufferManager& buffer,
27                        armnn::profiling::ISendCounterPacket& sendCounterPacket,
28                        int timeout)
29     : m_StateMachine(profilingStateMachine)
30     , m_BufferManager(buffer)
31     , m_SendCounterPacket(sendCounterPacket)
32     , m_Timeout(timeout)
33     , m_IsRunning(false)
34     , m_KeepRunning(false)
35     , m_SendThreadException(nullptr)
36 {
37     m_BufferManager.SetConsumer(this);
38 }
39
40 void SendThread::SetReadyToRead()
41 {
42     // We need to wait for the send thread to release its mutex
43     {
44         std::lock_guard<std::mutex> lck(m_WaitMutex);
45         m_ReadyToRead = true;
46     }
47     // Signal the send thread that there's something to read in the buffer
48     m_WaitCondition.notify_one();
49 }
50
51 void SendThread::Start(IProfilingConnection& profilingConnection)
52 {
53     // Check if the send thread is already running
54     if (m_IsRunning.load())
55     {
56         // The send thread is already running
57         return;
58     }
59
60     if (m_SendThread.joinable())
61     {
62         m_SendThread.join();
63     }
64
65     // Mark the send thread as running
66     m_IsRunning.store(true);
67
68     // Keep the send procedure going until the send thread is signalled to stop
69     m_KeepRunning.store(true);
70
71     // Make sure the send thread will not flush the buffer until signaled to do so
72     // no need for a mutex as the send thread can not be running at this point
73     m_ReadyToRead = false;
74
75     m_PacketSent = false;
76
77     // Start the send thread
78     m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
79 }
80
81 void SendThread::Stop(bool rethrowSendThreadExceptions)
82 {
83     // Signal the send thread to stop
84     m_KeepRunning.store(false);
85
86     // Check that the send thread is running
87     if (m_SendThread.joinable())
88     {
89         // Kick the send thread out of the wait condition
90         SetReadyToRead();
91         // Wait for the send thread to complete operations
92         m_SendThread.join();
93     }
94
95     // Check if the send thread exception has to be rethrown
96     if (!rethrowSendThreadExceptions)
97     {
98         // No need to rethrow the send thread exception, return immediately
99         return;
100     }
101
102     // Check if there's an exception to rethrow
103     if (m_SendThreadException)
104     {
105         // Rethrow the send thread exception
106         std::rethrow_exception(m_SendThreadException);
107
108         // Nullify the exception as it has been rethrown
109         m_SendThreadException = nullptr;
110     }
111 }
112
113 void SendThread::Send(IProfilingConnection& profilingConnection)
114 {
115     // Run once and keep the sending procedure looping until the thread is signalled to stop
116     do
117     {
118         // Check the current state of the profiling service
119         ProfilingState currentState = m_StateMachine.GetCurrentState();
120         switch (currentState)
121         {
122         case ProfilingState::Uninitialised:
123         case ProfilingState::NotConnected:
124
125             // The send thread cannot be running when the profiling service is uninitialized or not connected,
126             // stop the thread immediately
127             m_KeepRunning.store(false);
128             m_IsRunning.store(false);
129
130             // An exception should be thrown here, save it to be rethrown later from the main thread so that
131             // it can be caught by the consumer
132             m_SendThreadException =
133                     std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
134                                                              "profiling service not yet initialized or connected"));
135
136             return;
137         case ProfilingState::WaitingForAck:
138
139             // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
140             // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
141             // updated by the command handler
142
143             // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
144             m_SendCounterPacket.SendStreamMetaDataPacket();
145
146              // Flush the buffer manually to send the packet
147             FlushBuffer(profilingConnection);
148
149             // Wait for a connection ack from the remote server. We should expect a response within timeout value.
150             // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
151             // StreamMetadata again.
152
153             // Wait condition lock scope - Begin
154             {
155                 std::unique_lock<std::mutex> lock(m_WaitMutex);
156
157                 bool timeout = m_WaitCondition.wait_for(lock,
158                                                         std::chrono::milliseconds(std::max(m_Timeout, 1000)),
159                                                         [&]{ return m_ReadyToRead; });
160                 // If we get notified we need to flush the buffer again
161                 if(timeout)
162                 {
163                     // Otherwise if we just timed out don't flush the buffer
164                     continue;
165                 }
166                 //reset condition variable predicate for next use
167                 m_ReadyToRead = false;
168             }
169             // Wait condition lock scope - End
170             break;
171         case ProfilingState::Active:
172         default:
173             // Wait condition lock scope - Begin
174             {
175                 std::unique_lock<std::mutex> lock(m_WaitMutex);
176
177                 // Normal working state for the send thread
178                 // Check if the send thread is required to enforce a timeout wait policy
179                 if (m_Timeout < 0)
180                 {
181                     // Wait indefinitely until notified that something to read has become available in the buffer
182                     m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
183                 }
184                 else
185                 {
186                     // Wait until the thread is notified of something to read from the buffer,
187                     // or check anyway after the specified number of milliseconds
188                     m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
189                 }
190
191                 //reset condition variable predicate for next use
192                 m_ReadyToRead = false;
193             }
194             // Wait condition lock scope - End
195             break;
196         }
197
198         // Send all the available packets in the buffer
199         FlushBuffer(profilingConnection);
200     } while (m_KeepRunning.load());
201
202     // Ensure that all readable data got written to the profiling connection before the thread is stopped
203     // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
204     FlushBuffer(profilingConnection, false);
205
206     // Mark the send thread as not running
207     m_IsRunning.store(false);
208 }
209
210 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
211 {
212     // Get the first available readable buffer
213     IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
214
215     // Initialize the flag that indicates whether at least a packet has been sent
216     bool packetsSent = false;
217
218     while (packetBuffer != nullptr)
219     {
220         // Get the data to send from the buffer
221         const unsigned char* readBuffer = packetBuffer->GetReadableData();
222         unsigned int readBufferSize = packetBuffer->GetSize();
223
224         if (readBuffer == nullptr || readBufferSize == 0)
225         {
226             // Nothing to send, get the next available readable buffer and continue
227             m_BufferManager.MarkRead(packetBuffer);
228             packetBuffer = m_BufferManager.GetReadableBuffer();
229
230             continue;
231         }
232
233         // Check that the profiling connection is open, silently drop the data and continue if it's closed
234         if (profilingConnection.IsOpen())
235         {
236             // Write a packet to the profiling connection. Silently ignore any write error and continue
237             profilingConnection.WritePacket(readBuffer, armnn::numeric_cast<uint32_t>(readBufferSize));
238
239             // Set the flag that indicates whether at least a packet has been sent
240             packetsSent = true;
241         }
242
243         // Mark the packet buffer as read
244         m_BufferManager.MarkRead(packetBuffer);
245
246         // Get the next available readable buffer
247         packetBuffer = m_BufferManager.GetReadableBuffer();
248     }
249     // Check whether at least a packet has been sent
250     if (packetsSent && notifyWatchers)
251     {
252         // Wait for the parent thread to release its mutex if necessary
253         {
254             std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
255             m_PacketSent = true;
256         }
257         // Notify to any watcher that something has been sent
258         m_PacketSentWaitCondition.notify_one();
259     }
260 }
261
262 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
263 {
264     std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
265     // Blocks until notified that at least a packet has been sent or until timeout expires.
266     bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
267                                                        std::chrono::milliseconds(timeout),
268                                                        [&] { return m_PacketSent; });
269
270     m_PacketSent = false;
271
272     return timedOut;
273 }
274
275 } // namespace profiling
276
277 } // namespace armnn