Wait message queue flush 10 ms if rendering is too late 05/320205/6
authorEunki, Hong <eunkiki.hong@samsung.com>
Mon, 11 Nov 2024 01:04:35 +0000 (10:04 +0900)
committerEunki Hong <eunkiki.hong@samsung.com>
Thu, 14 Nov 2024 15:52:49 +0000 (00:52 +0900)
Let we wait message queue be flushed if the message queue count is too big.

It will be resolved some memory limitation issues when
event thread flush queue very quickly, but render thread cannot
follow up the messages.

To awake as fast as we can, let's use promise-future system.
Since we don't need to call future.get(), let's just use future<void>

Change-Id: I74b81d0267221f14b4447ebfa092dd76cd56d57a
Signed-off-by: Eunki, Hong <eunkiki.hong@samsung.com>
automated-tests/src/dali-internal/utc-Dali-Internal-Core.cpp
dali/internal/update/queue/update-message-queue.cpp

index 981eab0e401538527bb171879a0630e6493e25bf..9a20ada33a9bdb20c90a0ad71b79712733f969fe 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023 Samsung Electronics Co., Ltd.
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -79,6 +79,7 @@ public:
     RelayoutSignalHandler::RelayoutCallback(actor);
   }
 };
+constexpr size_t FORCIBLE_WAIT_FLUSHED_BUFFER_COUNT_THRESHOLD = 1024;
 
 } // anonymous namespace
 
@@ -109,6 +110,50 @@ int UtcDaliCoreProcessEvents(void)
   END_TEST;
 }
 
+int UtcDaliCoreProcessEventsStressTest(void)
+{
+  TestApplication application;
+  tet_infoline("Testing Dali::Integration::Core::ProcessEvents more than 1k times before render");
+
+  Vector3 size(100.0f, 100.0f, 0.0f);
+  Vector3 position(100.0f, 100.0f, 0.0f);
+
+  Actor actor = Actor::New();
+  actor.SetResizePolicy(ResizePolicy::FIXED, Dimension::ALL_DIMENSIONS);
+  actor.SetProperty(Actor::Property::SIZE, size);
+  actor.SetProperty(Actor::Property::POSITION, position);
+  application.GetScene().Add(actor);
+
+  RelayoutSignalHandler relayoutSignal(application);
+  actor.OnRelayoutSignal().Connect(&relayoutSignal, &RelayoutSignalHandler::RelayoutCallback);
+
+  application.SendNotification();
+
+  DALI_TEST_EQUALS(relayoutSignal.mSignalCalled, true, TEST_LOCATION);
+
+  DALI_TEST_EQUALS(actor.GetProperty(Actor::Property::SIZE).Get<Vector3>(), size, TEST_LOCATION);
+  DALI_TEST_EQUALS(actor.GetProperty(Actor::Property::POSITION).Get<Vector3>(), position, TEST_LOCATION);
+
+  relayoutSignal.mSignalCalled = false;
+
+  for(size_t i = 0; i < FORCIBLE_WAIT_FLUSHED_BUFFER_COUNT_THRESHOLD; ++i)
+  {
+    Vector3 newSize = size + Vector3(i + 1, i + 1, 0);
+    actor.SetProperty(Actor::Property::SIZE, newSize);
+    application.SendNotification();
+    DALI_TEST_EQUALS(relayoutSignal.mSignalCalled, true, TEST_LOCATION);
+    DALI_TEST_EQUALS(actor.GetProperty(Actor::Property::SIZE).Get<Vector3>(), newSize, TEST_LOCATION);
+
+    relayoutSignal.mSignalCalled = false;
+  }
+
+  application.Render();
+  application.SendNotification();
+  application.Render();
+
+  END_TEST;
+}
+
 int UtcDaliCoreForceRelayout(void)
 {
   TestApplication application;
@@ -176,4 +221,4 @@ int UtcDaliCoreForceRelayout2(void)
   application.SendNotification();
 
   END_TEST;
-}
\ No newline at end of file
+}
index cb35cc2d2cd65db2e2d5f128cb57413ebd9d5542..300b17808cf73a589c06cc5a47801abad296738c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023 Samsung Electronics Co., Ltd.
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 // CLASS HEADER
 #include <dali/internal/update/queue/update-message-queue.h>
 
+// EXTERNAL INCLUDES
+#include <future> ///< for std::future and std::promise
+#include <chrono> ///< for std::chrono::milliseconds
+
 // INTERNAL INCLUDES
 #include <dali/devel-api/threading/mutex.h>
+#include <dali/integration-api/debug.h>
 #include <dali/integration-api/render-controller.h>
 #include <dali/internal/common/message-buffer.h>
 #include <dali/internal/common/message.h>
@@ -43,6 +48,11 @@ static const std::size_t INITIAL_BUFFER_SIZE   = 32768;
 static const std::size_t MAX_BUFFER_CAPACITY   = 73728; // Avoid keeping buffers which exceed this
 static const std::size_t MAX_FREE_BUFFER_COUNT = 3;     // Allow this number of buffers to be recycled
 
+// Threshold of flushed buffers count to keep in the message queue.
+// If the buffer exceeded orver the max allowed count, main thread will be sleep to avoid too much message flushing.
+constexpr std::size_t MAX_MESSAGES_ALLOWED_IN_PROCESS_QUEUE            = 1024;
+constexpr uint32_t    TIME_TO_WAIT_FOR_MESSAGE_PROCESSING_MILLISECONDS = 10; // milliseconds
+
 // A queue of message buffers
 typedef vector<MessageBuffer*> MessageBufferQueue;
 using MessageBufferIter = MessageBufferQueue::iterator;
@@ -128,6 +138,9 @@ struct MessageQueue::Impl
   MessageBufferQueue processQueue; ///< to process in the next update
   MessageBufferQueue recycleQueue; ///< to recycle MessageBuffers after the messages have been processed
 
+  std::promise<void> messagePromise; ///< promise for message queue processing. Should be created and set value under mutex.
+  std::future<void>  messageFuture;  ///< future for message queue processing done. Should be created under mutex. Will be wait rarely.
+
   MessageBuffer*     currentMessageBuffer; ///< can be used without locking
   MessageBufferQueue freeQueue;            ///< buffers from the recycleQueue; can be used without locking
 };
@@ -214,6 +227,19 @@ bool MessageQueue::FlushQueue()
     mImpl->processQueue.push_back(mImpl->currentMessageBuffer);
     mImpl->currentMessageBuffer = nullptr;
 
+    // Reset message promise and future.
+    mImpl->messagePromise = std::promise<void>();
+    mImpl->messageFuture  = mImpl->messagePromise.get_future();
+    if(DALI_UNLIKELY(mImpl->processQueue.size() >= MAX_MESSAGES_ALLOWED_IN_PROCESS_QUEUE))
+    {
+      DALI_LOG_ERROR("MessageQueue count exceeded [%zu >= %zu] Wait maximum %u ms\n", mImpl->processQueue.size(), MAX_MESSAGES_ALLOWED_IN_PROCESS_QUEUE, TIME_TO_WAIT_FOR_MESSAGE_PROCESSING_MILLISECONDS);
+    }
+    else
+    {
+      // Promise value immediatly.
+      mImpl->messagePromise.set_value();
+    }
+
     // Grab any recycled MessageBuffers
     while(!mImpl->recycleQueue.empty())
     {
@@ -239,6 +265,15 @@ bool MessageQueue::FlushQueue()
     }
   }
 
+  // Block if too much message queued without processing.
+  // It will be unlocked whenever ProcessMessages called, or time expired.
+  auto futureState = mImpl->messageFuture.wait_for(std::chrono::milliseconds(TIME_TO_WAIT_FOR_MESSAGE_PROCESSING_MILLISECONDS));
+
+  if(DALI_UNLIKELY(futureState != std::future_status::ready))
+  {
+    DALI_LOG_ERROR("MessageQueue not processed for overhead cases.\n");
+  }
+
   return messagesToProcess;
 }
 
@@ -258,6 +293,11 @@ bool MessageQueue::ProcessMessages(BufferIndex updateBufferIndex)
     mImpl->queueWasEmpty = mImpl->processQueue.empty(); // Flag whether we processed anything
 
     copiedProcessQueue = std::move(mImpl->processQueue); // Move message queue
+
+    if(DALI_UNLIKELY(copiedProcessQueue.size() >= MAX_MESSAGES_ALLOWED_IN_PROCESS_QUEUE))
+    {
+      mImpl->messagePromise.set_value();
+    }
   }
 
   for(auto&& buffer : copiedProcessQueue)