14 #include <boost/format.hpp> 15 #include <boost/numeric/conversion/cast.hpp> 16 #include <boost/core/ignore_unused.hpp> 26 using boost::numeric_cast;
30 : m_StateMachine(profilingStateMachine)
31 , m_BufferManager(buffer)
32 , m_SendCounterPacket(sendCounterPacket)
35 , m_KeepRunning(
false)
36 , m_SendThreadException(nullptr)
45 std::lock_guard<std::mutex> lck(m_WaitMutex);
49 m_WaitCondition.notify_one();
55 if (m_IsRunning.load())
61 if (m_SendThread.joinable())
67 m_IsRunning.store(
true);
70 m_KeepRunning.store(
true);
74 m_ReadyToRead =
false;
79 m_SendThread = std::thread(&SendThread::Send,
this, std::ref(profilingConnection));
85 m_KeepRunning.store(
false);
88 if (m_SendThread.joinable())
97 if (!rethrowSendThreadExceptions)
104 if (m_SendThreadException)
107 std::rethrow_exception(m_SendThreadException);
110 m_SendThreadException =
nullptr;
121 switch (currentState)
128 m_KeepRunning.store(
false);
129 m_IsRunning.store(
false);
133 m_SendThreadException =
134 std::make_exception_ptr(
RuntimeException(
"The send thread should not be running with the " 135 "profiling service not yet initialized or connected"));
148 FlushBuffer(profilingConnection);
156 std::unique_lock<std::mutex> lock(m_WaitMutex);
158 bool timeout = m_WaitCondition.wait_for(lock,
159 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
160 [&]{
return m_ReadyToRead; });
168 m_ReadyToRead =
false;
176 std::unique_lock<std::mutex> lock(m_WaitMutex);
183 m_WaitCondition.wait(lock, [&] {
return m_ReadyToRead; });
189 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] {
return m_ReadyToRead; });
193 m_ReadyToRead =
false;
200 FlushBuffer(profilingConnection);
201 }
while (m_KeepRunning.load());
205 FlushBuffer(profilingConnection,
false);
208 m_IsRunning.store(
false);
217 bool packetsSent =
false;
219 while (packetBuffer !=
nullptr)
222 const unsigned char* readBuffer = packetBuffer->GetReadableData();
223 unsigned int readBufferSize = packetBuffer->GetSize();
225 if (readBuffer ==
nullptr || readBufferSize == 0)
228 m_BufferManager.
MarkRead(packetBuffer);
235 if (profilingConnection.
IsOpen())
238 profilingConnection.
WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
245 m_BufferManager.
MarkRead(packetBuffer);
251 if (packetsSent && notifyWatchers)
255 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
259 m_PacketSentWaitCondition.notify_one();
265 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
267 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
268 std::chrono::milliseconds(timeout),
269 [&] {
return m_PacketSent; });
271 m_PacketSent =
false;
void Start(IProfilingConnection &profilingConnection) override
Start the thread.
bool WaitForPacketSent(uint32_t timeout)
std::unique_ptr< IPacketBuffer > IPacketBufferPtr
virtual void MarkRead(IPacketBufferPtr &packetBuffer)=0
void Stop(bool rethrowSendThreadExceptions=true) override
Stop the thread.
virtual void SendStreamMetaDataPacket()=0
Create and write a StreamMetaDataPacket in the buffer.
virtual bool WritePacket(const unsigned char *buffer, uint32_t length)=0
ProfilingState GetCurrentState() const
void SetReadyToRead() override
Set a "ready to read" flag in the buffer to notify the reading thread to start reading it...
SendThread(ProfilingStateMachine &profilingStateMachine, IBufferManager &buffer, ISendCounterPacket &sendCounterPacket, int timeout=1000)
virtual IPacketBufferPtr GetReadableBuffer()=0
virtual void SetConsumer(IConsumer *consumer)=0
virtual bool IsOpen() const =0