std::unique_ptr<IPacketBuffer> BufferManager::Reserve(unsigned int requestedSize, unsigned int& reservedSize)
{
+ reservedSize = 0;
std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock);
if (requestedSize > m_MaxBufferSize)
{
- throw armnn::InvalidArgumentException("The maximum buffer size that can be requested is [" +
- std::to_string(m_MaxBufferSize) + "] bytes");
+ return nullptr;
}
availableListLock.lock();
if (m_AvailableList.empty())
{
- throw armnn::profiling::BufferExhaustion("Buffer not available");
+ availableListLock.unlock();
+ return nullptr;
}
std::unique_ptr<IPacketBuffer> buffer = std::move(m_AvailableList.back());
m_AvailableList.pop_back();
std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved);
- if (reserved < totalSize)
+ if (writeBuffer == nullptr || reserved < totalSize)
{
CancelOperationAndThrow<BufferExhaustion>(
- writeBuffer,
- boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.")
- % totalSize));
- }
-
- if (writeBuffer == nullptr)
- {
- CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory.");
+ writeBuffer,
+ boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize));
}
try
uint32_t reserved = 0;
std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved);
- // Check that the reserved buffer size is enough to hold the counter directory packet
- if (reserved < totalSize)
+ if (writeBuffer == nullptr || reserved < totalSize)
{
CancelOperationAndThrow<BufferExhaustion>(
- writeBuffer,
- boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes")
- % totalSize));
- }
-
- // Check the buffer handle is valid
- if (writeBuffer == nullptr)
- {
- CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory");
+ writeBuffer,
+ boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize));
}
// Offset for writing to the buffer
std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved);
- if (reserved < totalSize)
+ if (writeBuffer == nullptr || reserved < totalSize)
{
CancelOperationAndThrow<BufferExhaustion>(
- writeBuffer,
- boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.")
- % totalSize));
- }
-
- if (writeBuffer == nullptr)
- {
- CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory.");
+ writeBuffer,
+ boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize));
}
// Create header.
std::unique_ptr<IPacketBuffer> writeBuffer = m_BufferManager.Reserve(totalSize, reserved);
- if (reserved < totalSize)
+ if (writeBuffer == nullptr || reserved < totalSize)
{
CancelOperationAndThrow<BufferExhaustion>(
- writeBuffer,
- boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.")
- % totalSize));
- }
-
- if (writeBuffer == nullptr)
- {
- CancelOperationAndThrow<RuntimeException>("Error reserving buffer memory.");
+ writeBuffer,
+ boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize));
}
// Create header.
// Lock the mutex to wait on it
std::unique_lock<std::mutex> lock(m_WaitMutex);
- // Wait until the thread is notified of something to read from the buffer, or check anyway after a second
- m_WaitCondition.wait_for(lock, std::chrono::seconds(1));
+ if (m_Timeout < 0)
+ {
+ // Wait indefinitely until notified that something to read has become available in the buffer
+ m_WaitCondition.wait(lock);
+ }
+ else
+ {
+ // Wait until the thread is notified of something to read from the buffer,
+ // or check anyway after a second
+ m_WaitCondition.wait_for(lock, std::chrono::seconds(m_Timeout));
+ }
}
// Wait condition lock scope - End
- // Get the buffer to read from
- std::unique_ptr<IPacketBuffer> packetBuffer = m_BufferManager.GetReadableBuffer();
- if (packetBuffer == nullptr)
- {
- // Nothing to read from, ignore and continue
- continue;
- }
+ FlushBuffer();
+ }
+ // Ensure that all readable data got written to the profiling connection before the thread is stopped
+ FlushBuffer();
+
+ // Mark the send thread as not running
+ m_IsRunning.store(false);
+}
+
+void SendCounterPacket::FlushBuffer()
+{
+ // Get the first available readable buffer
+ std::unique_ptr<IPacketBuffer> packetBuffer = m_BufferManager.GetReadableBuffer();
+
+ while (packetBuffer != nullptr)
+ {
// Get the data to send from the buffer
const unsigned char* readBuffer = packetBuffer->GetReadableData();
unsigned int readBufferSize = packetBuffer->GetSize();
+
if (readBuffer == nullptr || readBufferSize == 0)
{
- // Nothing to send, ignore and continue
+ // Nothing to send, get the next available readable buffer and continue
+ m_BufferManager.MarkRead(packetBuffer);
+ packetBuffer = m_BufferManager.GetReadableBuffer();
+
continue;
}
// Mark the packet buffer as read
m_BufferManager.MarkRead(packetBuffer);
- }
- // Mark the send thread as not running
- m_IsRunning.store(false);
+ // Get next available readable buffer
+ packetBuffer = m_BufferManager.GetReadableBuffer();
+ }
}
} // namespace profiling
#include "ISendCounterPacket.hpp"
#include "ICounterDirectory.hpp"
#include "IProfilingConnection.hpp"
+#include "ProfilingUtils.hpp"
#include <atomic>
+#include <condition_variable>
#include <mutex>
#include <thread>
-#include <condition_variable>
+#include <type_traits>
namespace armnn
{
using IndexValuePairsVector = std::vector<std::pair<uint16_t, uint32_t>>;
- SendCounterPacket(IProfilingConnection& profilingConnection, IBufferManager& buffer)
+ SendCounterPacket(IProfilingConnection& profilingConnection, IBufferManager& buffer, int timeout = 1)
: m_ProfilingConnection(profilingConnection)
, m_BufferManager(buffer)
, m_IsRunning(false)
, m_KeepRunning(false)
+ , m_Timeout(timeout)
{}
~SendCounterPacket() { Stop(); }
template <typename ExceptionType>
void CancelOperationAndThrow(std::unique_ptr<IPacketBuffer>& writerBuffer, const std::string& errorMessage)
{
+ if (std::is_same<ExceptionType, armnn::profiling::BufferExhaustion>::value)
+ {
+ SetReadyToRead();
+ }
if (writerBuffer != nullptr)
{
// Cancel the operation
throw ExceptionType(errorMessage);
}
+ void FlushBuffer();
+
IProfilingConnection& m_ProfilingConnection;
IBufferManager& m_BufferManager;
std::mutex m_WaitMutex;
std::thread m_SendThread;
std::atomic<bool> m_IsRunning;
std::atomic<bool> m_KeepRunning;
+ int m_Timeout;
protected:
// Helper methods, protected for testing
unsigned int reservedSize = 0;
// Cannot reserve buffer bigger than maximum buffer size
- BOOST_CHECK_THROW(bufferManager.Reserve(1024, reservedSize), armnn::InvalidArgumentException);
+ auto reservedBuffer = bufferManager.Reserve(1024, reservedSize);
+ BOOST_TEST(reservedSize == 0);
+ BOOST_TEST(!reservedBuffer.get());
}
BOOST_AUTO_TEST_CASE(BufferExhaustionTest)
BOOST_TEST(packetBuffer.get());
// Cannot reserve buffer when buffer is not available
- BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize), BufferExhaustion);
+ auto reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 0);
+ BOOST_TEST(!reservedBuffer.get());
}
BOOST_AUTO_TEST_CASE(BufferReserveMultipleTest)
// Cannot reserve when buffer is not available
unsigned int reservedSize3 = 0;
- BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize3), BufferExhaustion);
+ auto reservedBuffer = bufferManager.Reserve(512, reservedSize3);
+ BOOST_TEST(reservedSize3 == 0);
+ BOOST_TEST(!reservedBuffer.get());
}
BOOST_AUTO_TEST_CASE(BufferReleaseTest)
// Cannot reserve when buffer is not available
unsigned int reservedSize2 = 0;
- BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion);
+ auto reservedBuffer = bufferManager.Reserve(512, reservedSize2);
+ BOOST_TEST(reservedSize2 == 0);
+ BOOST_TEST(!reservedBuffer.get());
bufferManager.Release(packetBuffer0);
BOOST_TEST(packetBuffer1.get());
unsigned int reservedSize2 = 0;
- BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion);
+ auto reservedBuffer = bufferManager.Reserve(512, reservedSize2);
+ BOOST_TEST(reservedSize2 == 0);
+ BOOST_TEST(!reservedBuffer.get());
bufferManager.Commit(packetBuffer0, 256);
BOOST_TEST(packetBuffer2->GetSize() == 256);
// Buffer not set back to available list after commit
- BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion);
+ unsigned int reservedSize = 0;
+ reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 0);
+ BOOST_TEST(!reservedBuffer.get());
}
BOOST_AUTO_TEST_CASE(BufferMarkReadTest)
// Cannot reserve when buffer is not available
unsigned int reservedSize2 = 0;
- BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion);
+ auto reservedBuffer = bufferManager.Reserve(512, reservedSize2);
+ BOOST_TEST(reservedSize2 == 0);
+ BOOST_TEST(!reservedBuffer.get());
bufferManager.Commit(packetBuffer0, 256);
BOOST_TEST(packetBuffer2->GetSize() == 256);
// Buffer not set back to available list after commit
- BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion);
+ reservedBuffer = bufferManager.Reserve(512, reservedSize2);
+ BOOST_TEST(reservedSize2 == 0);
+ BOOST_TEST(!reservedBuffer.get());
bufferManager.MarkRead(packetBuffer2);
#include "SendCounterPacketTests.hpp"
+#include <CounterDirectory.hpp>
+#include <BufferManager.hpp>
#include <EncodeVersion.hpp>
#include <ProfilingUtils.hpp>
#include <SendCounterPacket.hpp>
-#include <CounterDirectory.hpp>
#include <armnn/Exceptions.hpp>
#include <armnn/Conversion.hpp>
BOOST_CHECK(mockStreamCounterBuffer.GetReadSize() <= mockStreamCounterBuffer.GetCommittedSize());
}
+BOOST_AUTO_TEST_CASE(SendThreadBufferTest)
+{
+ MockProfilingConnection mockProfilingConnection;
+ BufferManager bufferManager(1, 1024);
+ SendCounterPacket sendCounterPacket(mockProfilingConnection, bufferManager, -1);
+ sendCounterPacket.Start();
+
+ // Interleaving writes and reads to/from the buffer with pauses to test that the send thread actually waits for
+ // something to become available for reading
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // SendStreamMetaDataPacket
+ sendCounterPacket.SendStreamMetaDataPacket();
+
+ // Read data from the buffer
+ // Buffer should become readable after commit by SendStreamMetaDataPacket
+ auto packetBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(packetBuffer.get());
+
+ std::string processName = GetProcessName().substr(0, 60);
+ unsigned int processNameSize = processName.empty() ? 0 : boost::numeric_cast<unsigned int>(processName.size()) + 1;
+ unsigned int streamMetadataPacketsize = 118 + processNameSize;
+ BOOST_TEST(packetBuffer->GetSize() == streamMetadataPacketsize);
+
+ // Buffer is not available when SendStreamMetaDataPacket already occupied the buffer.
+ unsigned int reservedSize = 0;
+ auto reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(!reservedBuffer.get());
+
+ // Recommit to be read by sendCounterPacket
+ bufferManager.Commit(packetBuffer, streamMetadataPacketsize);
+
+ sendCounterPacket.SetReadyToRead();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // The buffer is read by the send thread so it should not be in the readable buffer.
+ auto readBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(!readBuffer);
+
+ // Successfully reserved the buffer with requested size
+ reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 512);
+ BOOST_TEST(reservedBuffer.get());
+
+ // Release the buffer to be used by sendCounterPacket
+ bufferManager.Release(reservedBuffer);
+
+ // SendCounterDirectoryPacket
+ CounterDirectory counterDirectory;
+ sendCounterPacket.SendCounterDirectoryPacket(counterDirectory);
+
+ // Read data from the buffer
+ // Buffer should become readable after commit by SendCounterDirectoryPacket
+ auto counterDirectoryPacketBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(counterDirectoryPacketBuffer.get());
+
+ // Get the size of the Counter Directory Packet
+ unsigned int counterDirectoryPacketSize = 32;
+ BOOST_TEST(counterDirectoryPacketBuffer->GetSize() == counterDirectoryPacketSize);
+
+ // Buffer is not available when SendCounterDirectoryPacket already occupied the buffer.
+ reservedSize = 0;
+ reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 0);
+ BOOST_TEST(!reservedBuffer.get());
+
+ // Recommit to be read by sendCounterPacket
+ bufferManager.Commit(counterDirectoryPacketBuffer, counterDirectoryPacketSize);
+
+ sendCounterPacket.SetReadyToRead();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // The buffer is read by the send thread so it should not be in the readable buffer.
+ readBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(!readBuffer);
+
+ // Successfully reserved the buffer with requested size
+ reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 512);
+ BOOST_TEST(reservedBuffer.get());
+
+ // Release the buffer to be used by sendCounterPacket
+ bufferManager.Release(reservedBuffer);
+
+ // SendPeriodicCounterCapturePacket
+ sendCounterPacket.SendPeriodicCounterCapturePacket(123u,
+ {
+ { 1u, 23u },
+ { 33u, 1207623u }
+ });
+
+ // Read data from the buffer
+ // Buffer should become readable after commit by SendPeriodicCounterCapturePacket
+ auto periodicCounterCapturePacketBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(periodicCounterCapturePacketBuffer.get());
+
+ // Get the size of the Periodic Counter Capture Packet
+ unsigned int periodicCounterCapturePacketSize = 28;
+ BOOST_TEST(periodicCounterCapturePacketBuffer->GetSize() == periodicCounterCapturePacketSize);
+
+ // Buffer is not available when SendPeriodicCounterCapturePacket already occupied the buffer.
+ reservedSize = 0;
+ reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 0);
+ BOOST_TEST(!reservedBuffer.get());
+
+ // Recommit to be read by sendCounterPacket
+ bufferManager.Commit(periodicCounterCapturePacketBuffer, periodicCounterCapturePacketSize);
+
+ sendCounterPacket.SetReadyToRead();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ // The buffer is read by the send thread so it should not be in the readable buffer.
+ readBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(!readBuffer);
+
+ // Successfully reserved the buffer with requested size
+ reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 512);
+ BOOST_TEST(reservedBuffer.get());
+
+ sendCounterPacket.Stop();
+}
+
+BOOST_AUTO_TEST_CASE(SendThreadBufferTest1)
+{
+ MockWriteProfilingConnection mockProfilingConnection;
+ BufferManager bufferManager(3, 1024);
+ SendCounterPacket sendCounterPacket(mockProfilingConnection, bufferManager, -1);
+ sendCounterPacket.Start();
+
+ // SendStreamMetaDataPacket
+ sendCounterPacket.SendStreamMetaDataPacket();
+
+ // Read data from the buffer
+ // Buffer should become readable after commit by SendStreamMetaDataPacket
+ auto packetBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(packetBuffer.get());
+
+ std::string processName = GetProcessName().substr(0, 60);
+ unsigned int processNameSize = processName.empty() ? 0 : boost::numeric_cast<unsigned int>(processName.size()) + 1;
+ unsigned int streamMetadataPacketsize = 118 + processNameSize;
+ BOOST_TEST(packetBuffer->GetSize() == streamMetadataPacketsize);
+
+ // Recommit to be read by sendCounterPacket
+ bufferManager.Commit(packetBuffer, streamMetadataPacketsize);
+
+ sendCounterPacket.SetReadyToRead();
+
+ // SendCounterDirectoryPacket
+ CounterDirectory counterDirectory;
+ sendCounterPacket.SendCounterDirectoryPacket(counterDirectory);
+
+ sendCounterPacket.SetReadyToRead();
+
+ // SendPeriodicCounterCapturePacket
+ sendCounterPacket.SendPeriodicCounterCapturePacket(123u,
+ {
+ { 1u, 23u },
+ { 33u, 1207623u }
+ });
+
+ sendCounterPacket.SetReadyToRead();
+
+ sendCounterPacket.Stop();
+
+ // The buffer is read by the send thread so it should not be in the readable buffer.
+ auto readBuffer = bufferManager.GetReadableBuffer();
+ BOOST_TEST(!readBuffer);
+
+ // Successfully reserved the buffer with requested size
+ unsigned int reservedSize = 0;
+ auto reservedBuffer = bufferManager.Reserve(512, reservedSize);
+ BOOST_TEST(reservedSize == 512);
+ BOOST_TEST(reservedBuffer.get());
+
+ // Check that data was actually written to the profiling connection in any order
+ std::vector<uint32_t> writtenData = mockProfilingConnection.GetWrittenData();
+ std::vector<uint32_t> expectedOutput{streamMetadataPacketsize, 32, 28};
+ BOOST_TEST(writtenData.size() == 3);
+ bool foundStreamMetaDataPacket =
+ std::find(writtenData.begin(), writtenData.end(), streamMetadataPacketsize) != writtenData.end();
+ bool foundCounterDirectoryPacket = std::find(writtenData.begin(), writtenData.end(), 32) != writtenData.end();
+ bool foundPeriodicCounterCapturePacket = std::find(writtenData.begin(), writtenData.end(), 28) != writtenData.end();
+ BOOST_TEST(foundStreamMetaDataPacket);
+ BOOST_TEST(foundCounterDirectoryPacket);
+ BOOST_TEST(foundPeriodicCounterCapturePacket);
+}
+
BOOST_AUTO_TEST_SUITE_END()
bool m_IsOpen;
};
+class MockWriteProfilingConnection : public IProfilingConnection
+{
+public:
+ MockWriteProfilingConnection()
+ : m_IsOpen(true)
+ {}
+
+ bool IsOpen() override { return m_IsOpen; }
+
+ void Close() override { m_IsOpen = false; }
+
+ bool WritePacket(const unsigned char* buffer, uint32_t length) override
+ {
+ m_WrittenData.push_back(length);
+ return buffer != nullptr && length > 0;
+ }
+
+ Packet ReadPacket(uint32_t timeout) override { return Packet(); }
+
+ std::vector<uint32_t> GetWrittenData()
+ {
+ return m_WrittenData;
+ }
+
+private:
+ bool m_IsOpen;
+ std::vector<uint32_t> m_WrittenData;
+};
+
class MockPacketBuffer : public IPacketBuffer
{
public: