From 0ec068f220daf03049a9ffb5ac53118162f50106 Mon Sep 17 00:00:00 2001 From: Narumol Prangnawarat Date: Mon, 30 Sep 2019 16:20:20 +0100 Subject: [PATCH] IVGCVSW-3904 Add more unit tests for send thread with BufferManager * Add timeout parameter to wait for readable data * Write all readable data to the profiling connection when ready to read * Set ready to read when buffer exhaust * Ensure that readable data get written to profiling connection before the send thread is stopped * Add MockWriteProfilingConnection to be able to test WritePacket * Refactor BufferManager and the unit tests Signed-off-by: Narumol Prangnawarat Change-Id: I80ae01bd8d0119a3a3a957069ae8ac521c005a12 --- src/profiling/BufferManager.cpp | 7 +- src/profiling/SendCounterPacket.cpp | 98 +++++++------ src/profiling/SendCounterPacket.hpp | 14 +- src/profiling/test/BufferTests.cpp | 33 +++-- src/profiling/test/SendCounterPacketTests.cpp | 195 +++++++++++++++++++++++++- src/profiling/test/SendCounterPacketTests.hpp | 29 ++++ 6 files changed, 311 insertions(+), 65 deletions(-) diff --git a/src/profiling/BufferManager.cpp b/src/profiling/BufferManager.cpp index dbf4466..6ac3ee1 100644 --- a/src/profiling/BufferManager.cpp +++ b/src/profiling/BufferManager.cpp @@ -29,16 +29,17 @@ BufferManager::BufferManager(unsigned int numberOfBuffers, unsigned int maxPacke std::unique_ptr BufferManager::Reserve(unsigned int requestedSize, unsigned int& reservedSize) { + reservedSize = 0; std::unique_lock availableListLock(m_AvailableMutex, std::defer_lock); if (requestedSize > m_MaxBufferSize) { - throw armnn::InvalidArgumentException("The maximum buffer size that can be requested is [" + - std::to_string(m_MaxBufferSize) + "] bytes"); + return nullptr; } availableListLock.lock(); if (m_AvailableList.empty()) { - throw armnn::profiling::BufferExhaustion("Buffer not available"); + availableListLock.unlock(); + return nullptr; } std::unique_ptr buffer = std::move(m_AvailableList.back()); m_AvailableList.pop_back(); diff --git a/src/profiling/SendCounterPacket.cpp b/src/profiling/SendCounterPacket.cpp index 0a2f08b..9aafa2c 100644 --- a/src/profiling/SendCounterPacket.cpp +++ b/src/profiling/SendCounterPacket.cpp @@ -67,17 +67,11 @@ void SendCounterPacket::SendStreamMetaDataPacket() std::unique_ptr writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } try @@ -795,19 +789,11 @@ void SendCounterPacket::SendCounterDirectoryPacket(const ICounterDirectory& coun uint32_t reserved = 0; std::unique_ptr writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - // Check that the reserved buffer size is enough to hold the counter directory packet - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes") - % totalSize)); - } - - // Check the buffer handle is valid - if (writeBuffer == nullptr) - { - CancelOperationAndThrow("Error reserving buffer memory"); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Offset for writing to the buffer @@ -837,17 +823,11 @@ void SendCounterPacket::SendPeriodicCounterCapturePacket(uint64_t timestamp, con std::unique_ptr writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Create header. @@ -888,17 +868,11 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio std::unique_ptr writeBuffer = m_BufferManager.Reserve(totalSize, reserved); - if (reserved < totalSize) + if (writeBuffer == nullptr || reserved < totalSize) { CancelOperationAndThrow( - writeBuffer, - boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") - % totalSize)); - } - - if (writeBuffer == nullptr) - { - CancelOperationAndThrow("Error reserving buffer memory."); + writeBuffer, + boost::str(boost::format("No space left in buffer. Unable to reserve (%1%) bytes.") % totalSize)); } // Create header. @@ -972,25 +946,47 @@ void SendCounterPacket::Send() // Lock the mutex to wait on it std::unique_lock lock(m_WaitMutex); - // Wait until the thread is notified of something to read from the buffer, or check anyway after a second - m_WaitCondition.wait_for(lock, std::chrono::seconds(1)); + if (m_Timeout < 0) + { + // Wait indefinitely until notified that something to read has become available in the buffer + m_WaitCondition.wait(lock); + } + else + { + // Wait until the thread is notified of something to read from the buffer, + // or check anyway after a second + m_WaitCondition.wait_for(lock, std::chrono::seconds(m_Timeout)); + } } // Wait condition lock scope - End - // Get the buffer to read from - std::unique_ptr packetBuffer = m_BufferManager.GetReadableBuffer(); - if (packetBuffer == nullptr) - { - // Nothing to read from, ignore and continue - continue; - } + FlushBuffer(); + } + // Ensure that all readable data got written to the profiling connection before the thread is stopped + FlushBuffer(); + + // Mark the send thread as not running + m_IsRunning.store(false); +} + +void SendCounterPacket::FlushBuffer() +{ + // Get the first available readable buffer + std::unique_ptr packetBuffer = m_BufferManager.GetReadableBuffer(); + + while (packetBuffer != nullptr) + { // Get the data to send from the buffer const unsigned char* readBuffer = packetBuffer->GetReadableData(); unsigned int readBufferSize = packetBuffer->GetSize(); + if (readBuffer == nullptr || readBufferSize == 0) { - // Nothing to send, ignore and continue + // Nothing to send, get the next available readable buffer and continue + m_BufferManager.MarkRead(packetBuffer); + packetBuffer = m_BufferManager.GetReadableBuffer(); + continue; } @@ -1003,10 +999,10 @@ void SendCounterPacket::Send() // Mark the packet buffer as read m_BufferManager.MarkRead(packetBuffer); - } - // Mark the send thread as not running - m_IsRunning.store(false); + // Get next available readable buffer + packetBuffer = m_BufferManager.GetReadableBuffer(); + } } } // namespace profiling diff --git a/src/profiling/SendCounterPacket.hpp b/src/profiling/SendCounterPacket.hpp index c57546d..748371b 100644 --- a/src/profiling/SendCounterPacket.hpp +++ b/src/profiling/SendCounterPacket.hpp @@ -9,11 +9,13 @@ #include "ISendCounterPacket.hpp" #include "ICounterDirectory.hpp" #include "IProfilingConnection.hpp" +#include "ProfilingUtils.hpp" #include +#include #include #include -#include +#include namespace armnn { @@ -31,11 +33,12 @@ public: using IndexValuePairsVector = std::vector>; - SendCounterPacket(IProfilingConnection& profilingConnection, IBufferManager& buffer) + SendCounterPacket(IProfilingConnection& profilingConnection, IBufferManager& buffer, int timeout = 1) : m_ProfilingConnection(profilingConnection) , m_BufferManager(buffer) , m_IsRunning(false) , m_KeepRunning(false) + , m_Timeout(timeout) {} ~SendCounterPacket() { Stop(); } @@ -70,6 +73,10 @@ private: template void CancelOperationAndThrow(std::unique_ptr& writerBuffer, const std::string& errorMessage) { + if (std::is_same::value) + { + SetReadyToRead(); + } if (writerBuffer != nullptr) { // Cancel the operation @@ -80,6 +87,8 @@ private: throw ExceptionType(errorMessage); } + void FlushBuffer(); + IProfilingConnection& m_ProfilingConnection; IBufferManager& m_BufferManager; std::mutex m_WaitMutex; @@ -87,6 +96,7 @@ private: std::thread m_SendThread; std::atomic m_IsRunning; std::atomic m_KeepRunning; + int m_Timeout; protected: // Helper methods, protected for testing diff --git a/src/profiling/test/BufferTests.cpp b/src/profiling/test/BufferTests.cpp index a2f3c30..7a06843 100644 --- a/src/profiling/test/BufferTests.cpp +++ b/src/profiling/test/BufferTests.cpp @@ -130,7 +130,9 @@ BOOST_AUTO_TEST_CASE(BufferReserveExceedingSpaceTest) unsigned int reservedSize = 0; // Cannot reserve buffer bigger than maximum buffer size - BOOST_CHECK_THROW(bufferManager.Reserve(1024, reservedSize), armnn::InvalidArgumentException); + auto reservedBuffer = bufferManager.Reserve(1024, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferExhaustionTest) @@ -144,7 +146,9 @@ BOOST_AUTO_TEST_CASE(BufferExhaustionTest) BOOST_TEST(packetBuffer.get()); // Cannot reserve buffer when buffer is not available - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferReserveMultipleTest) @@ -173,7 +177,9 @@ BOOST_AUTO_TEST_CASE(BufferReserveMultipleTest) // Cannot reserve when buffer is not available unsigned int reservedSize3 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize3), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize3); + BOOST_TEST(reservedSize3 == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferReleaseTest) @@ -195,7 +201,9 @@ BOOST_AUTO_TEST_CASE(BufferReleaseTest) // Cannot reserve when buffer is not available unsigned int reservedSize2 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.Release(packetBuffer0); @@ -222,7 +230,9 @@ BOOST_AUTO_TEST_CASE(BufferCommitTest) BOOST_TEST(packetBuffer1.get()); unsigned int reservedSize2 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.Commit(packetBuffer0, 256); @@ -232,7 +242,10 @@ BOOST_AUTO_TEST_CASE(BufferCommitTest) BOOST_TEST(packetBuffer2->GetSize() == 256); // Buffer not set back to available list after commit - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + unsigned int reservedSize = 0; + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); } BOOST_AUTO_TEST_CASE(BufferMarkReadTest) @@ -252,7 +265,9 @@ BOOST_AUTO_TEST_CASE(BufferMarkReadTest) // Cannot reserve when buffer is not available unsigned int reservedSize2 = 0; - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + auto reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.Commit(packetBuffer0, 256); @@ -262,7 +277,9 @@ BOOST_AUTO_TEST_CASE(BufferMarkReadTest) BOOST_TEST(packetBuffer2->GetSize() == 256); // Buffer not set back to available list after commit - BOOST_CHECK_THROW(bufferManager.Reserve(512, reservedSize2), BufferExhaustion); + reservedBuffer = bufferManager.Reserve(512, reservedSize2); + BOOST_TEST(reservedSize2 == 0); + BOOST_TEST(!reservedBuffer.get()); bufferManager.MarkRead(packetBuffer2); diff --git a/src/profiling/test/SendCounterPacketTests.cpp b/src/profiling/test/SendCounterPacketTests.cpp index ba1d470..0d21ec0 100644 --- a/src/profiling/test/SendCounterPacketTests.cpp +++ b/src/profiling/test/SendCounterPacketTests.cpp @@ -5,10 +5,11 @@ #include "SendCounterPacketTests.hpp" +#include +#include #include #include #include -#include #include #include @@ -2044,4 +2045,196 @@ BOOST_AUTO_TEST_CASE(SendThreadTest3) BOOST_CHECK(mockStreamCounterBuffer.GetReadSize() <= mockStreamCounterBuffer.GetCommittedSize()); } +BOOST_AUTO_TEST_CASE(SendThreadBufferTest) +{ + MockProfilingConnection mockProfilingConnection; + BufferManager bufferManager(1, 1024); + SendCounterPacket sendCounterPacket(mockProfilingConnection, bufferManager, -1); + sendCounterPacket.Start(); + + // Interleaving writes and reads to/from the buffer with pauses to test that the send thread actually waits for + // something to become available for reading + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // SendStreamMetaDataPacket + sendCounterPacket.SendStreamMetaDataPacket(); + + // Read data from the buffer + // Buffer should become readable after commit by SendStreamMetaDataPacket + auto packetBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(packetBuffer.get()); + + std::string processName = GetProcessName().substr(0, 60); + unsigned int processNameSize = processName.empty() ? 0 : boost::numeric_cast(processName.size()) + 1; + unsigned int streamMetadataPacketsize = 118 + processNameSize; + BOOST_TEST(packetBuffer->GetSize() == streamMetadataPacketsize); + + // Buffer is not available when SendStreamMetaDataPacket already occupied the buffer. + unsigned int reservedSize = 0; + auto reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(!reservedBuffer.get()); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(packetBuffer, streamMetadataPacketsize); + + sendCounterPacket.SetReadyToRead(); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // The buffer is read by the send thread so it should not be in the readable buffer. + auto readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + // Release the buffer to be used by sendCounterPacket + bufferManager.Release(reservedBuffer); + + // SendCounterDirectoryPacket + CounterDirectory counterDirectory; + sendCounterPacket.SendCounterDirectoryPacket(counterDirectory); + + // Read data from the buffer + // Buffer should become readable after commit by SendCounterDirectoryPacket + auto counterDirectoryPacketBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(counterDirectoryPacketBuffer.get()); + + // Get the size of the Counter Directory Packet + unsigned int counterDirectoryPacketSize = 32; + BOOST_TEST(counterDirectoryPacketBuffer->GetSize() == counterDirectoryPacketSize); + + // Buffer is not available when SendCounterDirectoryPacket already occupied the buffer. + reservedSize = 0; + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(counterDirectoryPacketBuffer, counterDirectoryPacketSize); + + sendCounterPacket.SetReadyToRead(); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // The buffer is read by the send thread so it should not be in the readable buffer. + readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + // Release the buffer to be used by sendCounterPacket + bufferManager.Release(reservedBuffer); + + // SendPeriodicCounterCapturePacket + sendCounterPacket.SendPeriodicCounterCapturePacket(123u, + { + { 1u, 23u }, + { 33u, 1207623u } + }); + + // Read data from the buffer + // Buffer should become readable after commit by SendPeriodicCounterCapturePacket + auto periodicCounterCapturePacketBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(periodicCounterCapturePacketBuffer.get()); + + // Get the size of the Periodic Counter Capture Packet + unsigned int periodicCounterCapturePacketSize = 28; + BOOST_TEST(periodicCounterCapturePacketBuffer->GetSize() == periodicCounterCapturePacketSize); + + // Buffer is not available when SendPeriodicCounterCapturePacket already occupied the buffer. + reservedSize = 0; + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 0); + BOOST_TEST(!reservedBuffer.get()); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(periodicCounterCapturePacketBuffer, periodicCounterCapturePacketSize); + + sendCounterPacket.SetReadyToRead(); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // The buffer is read by the send thread so it should not be in the readable buffer. + readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + sendCounterPacket.Stop(); +} + +BOOST_AUTO_TEST_CASE(SendThreadBufferTest1) +{ + MockWriteProfilingConnection mockProfilingConnection; + BufferManager bufferManager(3, 1024); + SendCounterPacket sendCounterPacket(mockProfilingConnection, bufferManager, -1); + sendCounterPacket.Start(); + + // SendStreamMetaDataPacket + sendCounterPacket.SendStreamMetaDataPacket(); + + // Read data from the buffer + // Buffer should become readable after commit by SendStreamMetaDataPacket + auto packetBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(packetBuffer.get()); + + std::string processName = GetProcessName().substr(0, 60); + unsigned int processNameSize = processName.empty() ? 0 : boost::numeric_cast(processName.size()) + 1; + unsigned int streamMetadataPacketsize = 118 + processNameSize; + BOOST_TEST(packetBuffer->GetSize() == streamMetadataPacketsize); + + // Recommit to be read by sendCounterPacket + bufferManager.Commit(packetBuffer, streamMetadataPacketsize); + + sendCounterPacket.SetReadyToRead(); + + // SendCounterDirectoryPacket + CounterDirectory counterDirectory; + sendCounterPacket.SendCounterDirectoryPacket(counterDirectory); + + sendCounterPacket.SetReadyToRead(); + + // SendPeriodicCounterCapturePacket + sendCounterPacket.SendPeriodicCounterCapturePacket(123u, + { + { 1u, 23u }, + { 33u, 1207623u } + }); + + sendCounterPacket.SetReadyToRead(); + + sendCounterPacket.Stop(); + + // The buffer is read by the send thread so it should not be in the readable buffer. + auto readBuffer = bufferManager.GetReadableBuffer(); + BOOST_TEST(!readBuffer); + + // Successfully reserved the buffer with requested size + unsigned int reservedSize = 0; + auto reservedBuffer = bufferManager.Reserve(512, reservedSize); + BOOST_TEST(reservedSize == 512); + BOOST_TEST(reservedBuffer.get()); + + // Check that data was actually written to the profiling connection in any order + std::vector writtenData = mockProfilingConnection.GetWrittenData(); + std::vector expectedOutput{streamMetadataPacketsize, 32, 28}; + BOOST_TEST(writtenData.size() == 3); + bool foundStreamMetaDataPacket = + std::find(writtenData.begin(), writtenData.end(), streamMetadataPacketsize) != writtenData.end(); + bool foundCounterDirectoryPacket = std::find(writtenData.begin(), writtenData.end(), 32) != writtenData.end(); + bool foundPeriodicCounterCapturePacket = std::find(writtenData.begin(), writtenData.end(), 28) != writtenData.end(); + BOOST_TEST(foundStreamMetaDataPacket); + BOOST_TEST(foundCounterDirectoryPacket); + BOOST_TEST(foundPeriodicCounterCapturePacket); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/profiling/test/SendCounterPacketTests.hpp b/src/profiling/test/SendCounterPacketTests.hpp index 3c3427c..584df5e 100644 --- a/src/profiling/test/SendCounterPacketTests.hpp +++ b/src/profiling/test/SendCounterPacketTests.hpp @@ -42,6 +42,35 @@ private: bool m_IsOpen; }; +class MockWriteProfilingConnection : public IProfilingConnection +{ +public: + MockWriteProfilingConnection() + : m_IsOpen(true) + {} + + bool IsOpen() override { return m_IsOpen; } + + void Close() override { m_IsOpen = false; } + + bool WritePacket(const unsigned char* buffer, uint32_t length) override + { + m_WrittenData.push_back(length); + return buffer != nullptr && length > 0; + } + + Packet ReadPacket(uint32_t timeout) override { return Packet(); } + + std::vector GetWrittenData() + { + return m_WrittenData; + } + +private: + bool m_IsOpen; + std::vector m_WrittenData; +}; + class MockPacketBuffer : public IPacketBuffer { public: -- 2.7.4