14 #include <boost/format.hpp> 15 #include <boost/numeric/conversion/cast.hpp> 29 : m_StateMachine(profilingStateMachine)
30 , m_BufferManager(buffer)
31 , m_SendCounterPacket(sendCounterPacket)
34 , m_KeepRunning(
false)
35 , m_SendThreadException(nullptr)
44 std::lock_guard<std::mutex> lck(m_WaitMutex);
48 m_WaitCondition.notify_one();
54 if (m_IsRunning.load())
60 if (m_SendThread.joinable())
66 m_IsRunning.store(
true);
69 m_KeepRunning.store(
true);
73 m_ReadyToRead =
false;
78 m_SendThread = std::thread(&SendThread::Send,
this, std::ref(profilingConnection));
84 m_KeepRunning.store(
false);
87 if (m_SendThread.joinable())
96 if (!rethrowSendThreadExceptions)
103 if (m_SendThreadException)
106 std::rethrow_exception(m_SendThreadException);
109 m_SendThreadException =
nullptr;
120 switch (currentState)
127 m_KeepRunning.store(
false);
128 m_IsRunning.store(
false);
132 m_SendThreadException =
133 std::make_exception_ptr(
RuntimeException(
"The send thread should not be running with the " 134 "profiling service not yet initialized or connected"));
147 FlushBuffer(profilingConnection);
155 std::unique_lock<std::mutex> lock(m_WaitMutex);
157 bool timeout = m_WaitCondition.wait_for(lock,
158 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
159 [&]{
return m_ReadyToRead; });
167 m_ReadyToRead =
false;
175 std::unique_lock<std::mutex> lock(m_WaitMutex);
182 m_WaitCondition.wait(lock, [&] {
return m_ReadyToRead; });
188 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] {
return m_ReadyToRead; });
192 m_ReadyToRead =
false;
199 FlushBuffer(profilingConnection);
200 }
while (m_KeepRunning.load());
204 FlushBuffer(profilingConnection,
false);
207 m_IsRunning.store(
false);
216 bool packetsSent =
false;
218 while (packetBuffer !=
nullptr)
221 const unsigned char* readBuffer = packetBuffer->GetReadableData();
222 unsigned int readBufferSize = packetBuffer->GetSize();
224 if (readBuffer ==
nullptr || readBufferSize == 0)
227 m_BufferManager.
MarkRead(packetBuffer);
234 if (profilingConnection.
IsOpen())
237 profilingConnection.
WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
244 m_BufferManager.
MarkRead(packetBuffer);
250 if (packetsSent && notifyWatchers)
254 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
258 m_PacketSentWaitCondition.notify_one();
264 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
266 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
267 std::chrono::milliseconds(timeout),
268 [&] {
return m_PacketSent; });
270 m_PacketSent =
false;
ProfilingState GetCurrentState() const
virtual IPacketBufferPtr GetReadableBuffer()=0
Copyright (c) 2020 ARM Limited.
virtual void MarkRead(IPacketBufferPtr &packetBuffer)=0
bool WaitForPacketSent(uint32_t timeout)
SendThread(ProfilingStateMachine &profilingStateMachine, IBufferManager &buffer, ISendCounterPacket &sendCounterPacket, int timeout=1000)
std::enable_if_t< std::is_unsigned< Source >::value &&std::is_unsigned< Dest >::value, Dest > numeric_cast(Source source)
void SetReadyToRead() override
Set a "ready to read" flag in the buffer to notify the reading thread to start reading it...
virtual void SendStreamMetaDataPacket()=0
Create and write a StreamMetaDataPacket in the buffer.
virtual void SetConsumer(IConsumer *consumer)=0
virtual bool IsOpen() const =0
void Stop(bool rethrowSendThreadExceptions=true) override
Stop the thread.
std::unique_ptr< IPacketBuffer > IPacketBufferPtr
void Start(IProfilingConnection &profilingConnection) override
Start the thread.
virtual bool WritePacket(const unsigned char *buffer, uint32_t length)=0