2 // Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
3 // SPDX-License-Identifier: MIT
6 #include "SendThread.hpp"
7 #include "ProfilingUtils.hpp"
9 #include <armnn/Exceptions.hpp>
10 #include <armnn/Conversion.hpp>
11 #include <armnn/utility/NumericCast.hpp>
13 #include <Processes.hpp>
15 #include <boost/format.hpp>
// Constructs a SendThread bound to the given profiling collaborators.
// Stores references to the state machine, the buffer manager and the counter-packet sender,
// initialises m_KeepRunning to false and m_SendThreadException to nullptr, and then passes
// `this` to the buffer manager through SetConsumer().
// @param profilingStateMachine  State machine stored in m_StateMachine.
// @param buffer                 Buffer manager stored in m_BufferManager.
// @param sendCounterPacket      Packet sender stored in m_SendCounterPacket.
// NOTE(review): the parameter list and the initializer list look truncated in this listing
// (no closing parenthesis, no initializer for m_Timeout or m_IsRunning although both are used
// elsewhere in this file) - confirm against the complete file.
25 SendThread::SendThread(armnn::profiling::ProfilingStateMachine& profilingStateMachine,
26 armnn::profiling::IBufferManager& buffer,
27 armnn::profiling::ISendCounterPacket& sendCounterPacket,
29 : m_StateMachine(profilingStateMachine)
30 , m_BufferManager(buffer)
31 , m_SendCounterPacket(sendCounterPacket)
34 , m_KeepRunning(false)
35 , m_SendThreadException(nullptr)
// Hand this object to the buffer manager as its consumer
// (presumably so the manager can call SetReadyToRead() when data is available - confirm).
37 m_BufferManager.SetConsumer(this);
// Wakes the send thread: acquires m_WaitMutex and then notifies one waiter on m_WaitCondition.
// The waits in Send() use m_ReadyToRead as their predicate, so a notification only ends a wait
// if that flag is true.
// NOTE(review): no assignment to m_ReadyToRead is visible in this listing - confirm that the
// flag is set to true while m_WaitMutex is held, before the notification is issued.
40 void SendThread::SetReadyToRead()
42 // We need to wait for the send thread to release its mutex
44 std::lock_guard<std::mutex> lck(m_WaitMutex);
47 // Signal the send thread that there's something to read in the buffer
48 m_WaitCondition.notify_one();
// Starts the send thread.
// Checks m_IsRunning and m_SendThread.joinable() first, then sets m_IsRunning and m_KeepRunning
// to true, clears m_ReadyToRead and launches a std::thread that runs SendThread::Send on this
// object.
// @param profilingConnection  Connection the send thread writes to. It is passed with std::ref,
//                             so the thread holds a reference, not a copy: the caller must keep
//                             the connection alive for as long as the thread runs.
// NOTE(review): the bodies of the two `if` statements are not visible in this listing; going by
// the comments, the first one returns early when the thread is already running - confirm.
51 void SendThread::Start(IProfilingConnection& profilingConnection)
53 // Check if the send thread is already running
54 if (m_IsRunning.load())
56 // The send thread is already running
// A joinable m_SendThread at this point means an earlier thread object has not been joined yet
// (presumably it is joined here before being replaced - confirm).
60 if (m_SendThread.joinable())
65 // Mark the send thread as running
66 m_IsRunning.store(true);
68 // Keep the send procedure going until the send thread is signalled to stop
69 m_KeepRunning.store(true);
71 // Make sure the send thread will not flush the buffer until signaled to do so
72 // no need for a mutex as the send thread can not be running at this point
73 m_ReadyToRead = false;
77 // Start the send thread
78 m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
// Stops the send thread and optionally surfaces an exception it recorded.
// Clears m_KeepRunning so that the loop in Send() ends, deals with the thread object if it is
// joinable, and then, depending on the flag, rethrows the exception that Send() stored in
// m_SendThreadException.
// @param rethrowSendThreadExceptions  When false, the stored exception is not rethrown
//                                     (per the comments below, the function returns first).
// Throws whatever exception m_SendThreadException refers to, when rethrowing is requested and
// the pointer is non-null.
// NOTE(review): the statements that wake and join the thread are not visible in this listing -
// confirm against the complete file.
81 void SendThread::Stop(bool rethrowSendThreadExceptions)
83 // Signal the send thread to stop
84 m_KeepRunning.store(false);
86 // Check that the send thread is running
87 if (m_SendThread.joinable())
89 // Kick the send thread out of the wait condition
91 // Wait for the send thread to complete operations
95 // Check if the send thread exception has to be rethrown
96 if (!rethrowSendThreadExceptions)
98 // No need to rethrow the send thread exception, return immediately
102 // Check if there's an exception to rethrow
103 if (m_SendThreadException)
105 // Rethrow the send thread exception
106 std::rethrow_exception(m_SendThreadException);
// NOTE(review): std::rethrow_exception never returns. If the assignment below sits in the same
// branch as the call above, it is unreachable and m_SendThreadException stays set after the
// rethrow, so a later Stop(true) would throw the same exception again - confirm and, if so,
// clear the member (e.g. move it into a local) before rethrowing.
108 // Nullify the exception as it has been rethrown
109 m_SendThreadException = nullptr;
// Body of the send thread (Start() launches it through std::thread).
// The loop repeats while m_KeepRunning is true (see the `while` condition near the end) and
// dispatches on the current profiling state on every pass:
//  - Uninitialised / NotConnected: clears m_KeepRunning and m_IsRunning and stores a
//    RuntimeException in m_SendThreadException, so that it can be rethrown from Stop() rather
//    than thrown on this thread.
//  - WaitingForAck: writes a StreamMetadata packet, flushes the buffer, then waits on
//    m_WaitCondition for at most max(m_Timeout, 1000) milliseconds.
//  - Active: waits on m_WaitCondition, either without a time limit or for m_Timeout
//    milliseconds, then clears m_ReadyToRead.
// After the loop a final FlushBuffer() is issued with notifyWatchers == false and m_IsRunning
// is cleared.
// @param profilingConnection  Connection forwarded to every FlushBuffer() call.
// NOTE(review): the loop opener, the `break`/`continue`/`return` statements and the conditions
// selecting between the two Active waits are not visible in this listing - confirm the exact
// control flow against the complete file.
113 void SendThread::Send(IProfilingConnection& profilingConnection)
115 // Run once and keep the sending procedure looping until the thread is signalled to stop
118 // Check the current state of the profiling service
119 ProfilingState currentState = m_StateMachine.GetCurrentState();
120 switch (currentState)
122 case ProfilingState::Uninitialised:
123 case ProfilingState::NotConnected:
125 // The send thread cannot be running when the profiling service is uninitialized or not connected,
126 // stop the thread immediately
127 m_KeepRunning.store(false);
128 m_IsRunning.store(false);
130 // An exception should be thrown here, save it to be rethrown later from the main thread so that
131 // it can be caught by the consumer
132 m_SendThreadException =
133 std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
134 "profiling service not yet initialized or connected"));
137 case ProfilingState::WaitingForAck:
139 // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
140 // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
141 // updated by the command handler
143 // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
144 m_SendCounterPacket.SendStreamMetaDataPacket();
146 // Flush the buffer manually to send the packet
147 FlushBuffer(profilingConnection);
149 // Wait for a connection ack from the remote server. We should expect a response within timeout value.
150 // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
151 // StreamMetadata again.
153 // Wait condition lock scope - Begin
155 std::unique_lock<std::mutex> lock(m_WaitMutex);
// NOTE(review): the predicate overload of wait_for returns the final value of the predicate,
// so `timeout` is true when m_ReadyToRead was set and false when the wait expired - the name
// reads the other way round. The wait lasts at least 1000 ms because of the std::max.
157 bool timeout = m_WaitCondition.wait_for(lock,
158 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
159 [&]{ return m_ReadyToRead; });
160 // If we get notified we need to flush the buffer again
163 // Otherwise if we just timed out don't flush the buffer
166 //reset condition variable predicate for next use
167 m_ReadyToRead = false;
169 // Wait condition lock scope - End
171 case ProfilingState::Active:
173 // Wait condition lock scope - Begin
175 std::unique_lock<std::mutex> lock(m_WaitMutex);
177 // Normal working state for the send thread
178 // Check if the send thread is required to enforce a timeout wait policy
181 // Wait indefinitely until notified that something to read has become available in the buffer
182 m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
186 // Wait until the thread is notified of something to read from the buffer,
187 // or check anyway after the specified number of milliseconds
188 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
191 //reset condition variable predicate for next use
192 m_ReadyToRead = false;
194 // Wait condition lock scope - End
198 // Send all the available packets in the buffer
199 FlushBuffer(profilingConnection);
200 } while (m_KeepRunning.load());
202 // Ensure that all readable data got written to the profiling connection before the thread is stopped
203 // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
204 FlushBuffer(profilingConnection, false);
206 // Mark the send thread as not running
207 m_IsRunning.store(false);
// Drains the buffer manager: keeps fetching readable packet buffers until GetReadableBuffer()
// returns nullptr.
// A buffer whose data pointer is null or whose size is 0 is marked as read and skipped. Any
// other buffer is written to the connection when IsOpen() reports true, and is marked as read
// afterwards. If `packetsSent` and `notifyWatchers` are both true after the loop, one waiter on
// m_PacketSentWaitCondition is notified while m_PacketSentWaitMutex has been acquired.
// @param profilingConnection  Connection the packets are written to.
// @param notifyWatchers       Whether to notify a waiter (see WaitForPacketSent()) after packets
//                             have been sent. Some calls in Send() omit it, so a default value
//                             must be declared elsewhere - TODO confirm its value.
// NOTE(review): the assignment that sets `packetsSent` (and presumably m_PacketSent) is not
// visible in this listing - confirm against the complete file.
210 void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
212 // Get the first available readable buffer
213 IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
215 // Initialize the flag that indicates whether at least a packet has been sent
216 bool packetsSent = false;
218 while (packetBuffer != nullptr)
220 // Get the data to send from the buffer
221 const unsigned char* readBuffer = packetBuffer->GetReadableData();
222 unsigned int readBufferSize = packetBuffer->GetSize();
224 if (readBuffer == nullptr || readBufferSize == 0)
226 // Nothing to send, get the next available readable buffer and continue
227 m_BufferManager.MarkRead(packetBuffer);
228 packetBuffer = m_BufferManager.GetReadableBuffer();
233 // Check that the profiling connection is open, silently drop the data and continue if it's closed
234 if (profilingConnection.IsOpen())
// The result of WritePacket() is discarded here, so a failed write is not reported to the caller.
236 // Write a packet to the profiling connection. Silently ignore any write error and continue
237 profilingConnection.WritePacket(readBuffer, armnn::numeric_cast<uint32_t>(readBufferSize));
239 // Set the flag that indicates whether at least a packet has been sent
243 // Mark the packet buffer as read
244 m_BufferManager.MarkRead(packetBuffer);
246 // Get the next available readable buffer
247 packetBuffer = m_BufferManager.GetReadableBuffer();
249 // Check whether at least a packet has been sent
250 if (packetsSent && notifyWatchers)
252 // Wait for the parent thread to release its mutex if necessary
254 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
257 // Notify to any watcher that something has been sent
258 m_PacketSentWaitCondition.notify_one();
// Blocks the caller until m_PacketSent becomes true or the timeout elapses, then clears
// m_PacketSent for the next use.
// @param timeout  Maximum time to wait, in milliseconds (default 1000).
// NOTE(review): the predicate overload of wait_for returns the final value of the predicate,
// so `timedOut` is true when a packet WAS sent and false when the wait expired - the name reads
// the other way round. The return statement is not visible in this listing; presumably
// `timedOut` is returned - confirm.
// NOTE(review): the default argument appears on this out-of-class definition, which is only
// valid if the in-class declaration does not specify one as well - confirm against the header.
262 bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
264 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
265 // Blocks until notified that at least a packet has been sent or until timeout expires.
266 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
267 std::chrono::milliseconds(timeout),
268 [&] { return m_PacketSent; });
270 m_PacketSent = false;
275 } // namespace profiling