IVGCVSW-4221 Fix SendCounterPacket hanging for indefinite time
authorFinn Williams <Finn.Williams@arm.com>
Tue, 3 Dec 2019 11:55:31 +0000 (11:55 +0000)
committerFinn Williams <Finn.Williams@arm.com>
Wed, 4 Dec 2019 11:47:50 +0000 (11:47 +0000)
Signed-off-by: Finn Williams <Finn.Williams@arm.com>
Change-Id: I612f4d0162e7f35296f7d484350a937f6344fcfb

src/profiling/SendCounterPacket.cpp
src/profiling/SendCounterPacket.hpp

index 2d64583..e3a9b77 100644 (file)
@@ -903,6 +903,11 @@ void SendCounterPacket::SendPeriodicCounterSelectionPacket(uint32_t capturePerio
 
 void SendCounterPacket::SetReadyToRead()
 {
+    // We need to wait for the send thread to release its mutex
+    {
+        std::lock_guard<std::mutex> lck(m_WaitMutex);
+        m_ReadyToRead = true;
+    }
     // Signal the send thread that there's something to read in the buffer
     m_WaitCondition.notify_one();
 }
@@ -927,6 +932,10 @@ void SendCounterPacket::Start(IProfilingConnection& profilingConnection)
     // Keep the send procedure going until the send thread is signalled to stop
     m_KeepRunning.store(true);
 
+    // Make sure the send thread will not flush the buffer until signaled to do so
+    // no need for a mutex as the send thread can not be running at this point
+    m_ReadyToRead = false;
+
     // Start the send thread
     m_SendThread = std::thread(&SendCounterPacket::Send, this, std::ref(profilingConnection));
 }
@@ -940,8 +949,7 @@ void SendCounterPacket::Stop(bool rethrowSendThreadExceptions)
     if (m_SendThread.joinable())
     {
         // Kick the send thread out of the wait condition
-        m_WaitCondition.notify_one();
-
+        SetReadyToRead();
         // Wait for the send thread to complete operations
         m_SendThread.join();
     }
@@ -953,22 +961,15 @@ void SendCounterPacket::Stop(bool rethrowSendThreadExceptions)
         return;
     }
 
-    // Exception handling lock scope - Begin
+    // Check if there's an exception to rethrow
+    if (m_SendThreadException)
     {
-        // Lock the mutex to handle any exception coming from the send thread
-        std::lock_guard<std::mutex> lock(m_WaitMutex);
-
-        // Check if there's an exception to rethrow
-        if (m_SendThreadException)
-        {
-            // Rethrow the send thread exception
-            std::rethrow_exception(m_SendThreadException);
+        // Rethrow the send thread exception
+        std::rethrow_exception(m_SendThreadException);
 
-            // Nullify the exception as it has been rethrown
-            m_SendThreadException = nullptr;
-        }
+        // Nullify the exception as it has been rethrown
+        m_SendThreadException = nullptr;
     }
-    // Exception handling lock scope - End
 }
 
 void SendCounterPacket::Send(IProfilingConnection& profilingConnection)
@@ -976,70 +977,80 @@ void SendCounterPacket::Send(IProfilingConnection& profilingConnection)
     // Keep the sending procedure looping until the thread is signalled to stop
     while (m_KeepRunning.load())
     {
-        // Wait condition lock scope - Begin
+        // Check the current state of the profiling service
+        ProfilingState currentState = m_StateMachine.GetCurrentState();
+        switch (currentState)
         {
-            // Lock the mutex to wait on it
-            std::unique_lock<std::mutex> lock(m_WaitMutex);
+        case ProfilingState::Uninitialised:
+        case ProfilingState::NotConnected:
+
+            // The send thread cannot be running when the profiling service is uninitialized or not connected,
+            // stop the thread immediately
+            m_KeepRunning.store(false);
+            m_IsRunning.store(false);
+
+            // An exception should be thrown here, save it to be rethrown later from the main thread so that
+            // it can be caught by the consumer
+            m_SendThreadException =
+                    std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
+                                                             "profiling service not yet initialized or connected"));
+
+            return;
+        case ProfilingState::WaitingForAck:
+
+            // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
+            // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
+            // updated by the command handler
 
-            // Check the current state of the profiling service
-            ProfilingState currentState = m_StateMachine.GetCurrentState();
-            switch (currentState)
+            // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
+            SendStreamMetaDataPacket();
+
+             // Flush the buffer manually to send the packet
+            FlushBuffer(profilingConnection);
+
+            // Wait for a connection ack from the remote server. We should expect a response within timeout value.
+            // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
+            // StreamMetadata again.
+
+            // Wait condition lock scope - Begin
             {
-            case ProfilingState::Uninitialised:
-            case ProfilingState::NotConnected:
-
-                // The send thread cannot be running when the profiling service is uninitialized or not connected,
-                // stop the thread immediately
-                m_KeepRunning.store(false);
-                m_IsRunning.store(false);
-
-                // An exception should be thrown here, save it to be rethrown later from the main thread so that
-                // it can be caught by the consumer
-                m_SendThreadException =
-                        std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
-                                                                 "profiling service not yet initialized or connected"));
-
-                return;
-            case ProfilingState::WaitingForAck:
-
-                // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
-                // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
-                // updated by the command handler
-
-                // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
-                SendStreamMetaDataPacket();
-
-                 // Flush the buffer manually to send the packet
-                FlushBuffer(profilingConnection);
-
-                // Wait for a connection ack from the remote server. We should expect a response within timeout value.
-                // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
-                // StreamMetadata again.
-                m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout));
-
-                // Do not flush the buffer again
-                continue;
-            case ProfilingState::Active:
-            default:
-                // Normal working state for the send thread
+                std::unique_lock<std::mutex> lock(m_WaitMutex);
+
+                m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&]{ return m_ReadyToRead; });
 
+                //reset condition variable predicate for next use
+                m_ReadyToRead = false;
+            }
+            // Wait condition lock scope - End
+
+            // Do not flush the buffer again
+            continue;
+        case ProfilingState::Active:
+        default:
+            // Wait condition lock scope - Begin
+            {
+                std::unique_lock<std::mutex> lock(m_WaitMutex);
+
+                // Normal working state for the send thread
                 // Check if the send thread is required to enforce a timeout wait policy
                 if (m_Timeout < 0)
                 {
                     // Wait indefinitely until notified that something to read has become available in the buffer
-                    m_WaitCondition.wait(lock);
+                    m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
                 }
                 else
                 {
                     // Wait until the thread is notified of something to read from the buffer,
                     // or check anyway after the specified number of milliseconds
-                    m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout));
+                    m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
                 }
 
-                break;
+                //reset condition variable predicate for next use
+                m_ReadyToRead = false;
             }
+            // Wait condition lock scope - End
+            break;
         }
-        // Wait condition lock scope - End
 
         // Send all the available packets in the buffer
         FlushBuffer(profilingConnection);
index 42e8432..1158755 100644 (file)
@@ -109,6 +109,8 @@ private:
     std::thread m_SendThread;
     std::atomic<bool> m_IsRunning;
     std::atomic<bool> m_KeepRunning;
+    // m_ReadyToRead will be protected by m_WaitMutex
+    bool m_ReadyToRead;
     std::exception_ptr m_SendThreadException;
     std::mutex m_PacketSentWaitMutex;
     std::condition_variable m_PacketSentWaitCondition;