From f3a81ae06eb91d234d7a3cbf7bf25b7c458cfcbb Mon Sep 17 00:00:00 2001 From: =?utf8?q?Aleksey=20Kliger=20=28=CE=BBgeek=29?= Date: Wed, 27 Nov 2019 16:06:17 -0500 Subject: [PATCH] [threadpool] Decrement max_working when worker times out (mono/mono#17927) [threadpool] Decrement max_working when worker times out The corresponding code in CoreCLR: https://github.com/dotnet/runtime/blob/mono/mono@bf88f146412d1e2d41779422337184a802f186c9/src/coreclr/src/vm/win32threadpool.cpp#L2156-L2164 The issue is that counter._.max_working is only ever changed by: 1. the monitor_thread when it detects starvation (increment by 1). 2. the hill climbing algorithm (increment or decrement). Creating a periodic load of creating many workers at once followed by a minute or more of quiescence increases the variable worker.counters._.max_working in monitor_thread(). Once that counter matches worker.limit_worker_max, monitor_thread() keeps looping without unparking or creating a thread, even though the actual number of threads/workers is small. If the existing threads are all waiting on work that needs a new thread, then a deadlock occurs. With this change, when a parked worker times out, it will lower max_working to the number of active (working + parked + starting) threads minus itself (but no less than limit_worker_min). As a result, monitor_thread will only increment max_working as long as none of the already running workers are timing out. Attempt to address https://github.com/mono/mono/issues/17833 --- Also allow monitor_thread to start workers if there aren't enough active, even if the max worker limit has been reached. Work around a pathological condition where the work_item_count is non-zero, and the max_working limit has been reached, but the number of active threads is still below the max. In that case, unpark some workers and start some new threads. May address http://work.azdo.io/827206 Commit migrated from https://github.com/mono/mono/commit/b6a351c330836dd59f1458beecda62de6cbee3e5 --- src/mono/mono/metadata/threadpool-worker-default.c | 67 ++++++++++++++++++---- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/src/mono/mono/metadata/threadpool-worker-default.c b/src/mono/mono/metadata/threadpool-worker-default.c index 1491597..a7119f3 100644 --- a/src/mono/mono/metadata/threadpool-worker-default.c +++ b/src/mono/mono/metadata/threadpool-worker-default.c @@ -194,6 +194,14 @@ COUNTER_READ (void) return counter; } +static gint16 +counter_num_active (ThreadPoolWorkerCounter counter) +{ + gint16 num_active = counter._.starting + counter._.working + counter._.parked; + g_assert (num_active >= 0); + return num_active; +} + static guint32 rand_next (guint32 min, guint32 max) { @@ -442,6 +450,8 @@ worker_try_unpark (void) return res; } +static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); + static gsize WINAPI worker_thread (gpointer unused) { @@ -462,6 +472,7 @@ worker_thread (gpointer unused) thread = mono_thread_internal_current (); g_assert (thread); + gboolean worker_timed_out = FALSE; while (!mono_runtime_is_shutting_down ()) { if (mono_thread_interruption_checkpoint_bool ()) continue; @@ -477,8 +488,10 @@ worker_thread (gpointer unused) if (!work_item_try_pop ()) { gboolean const timeout = worker_park (); - if (timeout) + if (timeout) { + worker_timed_out = TRUE; break; + } continue; } @@ -493,6 +506,19 @@ worker_thread (gpointer unused) counter._.working --; }); + if (worker_timed_out) { + gint16 decr_max_working; + COUNTER_ATOMIC (counter, { + decr_max_working = MAX (worker.limit_worker_min, MIN (counter_num_active (counter), counter._.max_working)); + counter._.max_working = decr_max_working; + }); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker timed out, starting = %d working = %d parked = %d, setting max_working to %d", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), + counter._.starting, counter._.working, counter._.parked, + decr_max_working); + hill_climbing_force_change (decr_max_working, TRANSITION_THREAD_TIMED_OUT); + } + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] worker finishing", GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ()))); @@ -653,8 +679,6 @@ monitor_sufficient_delay_since_last_dequeue (void) return mono_msec_ticks () >= worker.heuristic_last_dequeue + threshold; } -static void hill_climbing_force_change (gint16 new_thread_count, ThreadPoolHeuristicStateTransition transition); - static gsize WINAPI monitor_thread (gpointer unused) { @@ -682,9 +706,16 @@ monitor_thread (gpointer unused) g_assert (worker.monitor_status != MONITOR_STATUS_NOT_RUNNING); - // counter = COUNTER_READ (); - // printf ("monitor_thread: starting = %d working = %d parked = %d max_working = %d\n", - // counter._.starting, counter._.working, counter._.parked, counter._.max_working); +#if 0 + // This is ifdef'd out because otherwise we flood the log every + // MONITOR_INTERVAL ms, which is pretty noisy. + if (mono_trace_is_traced (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL)) { + ThreadPoolWorkerCounter trace_counter = COUNTER_READ (); + gint32 work_items = work_item_count (); + mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "monitor_thread: work items = %d, starting = %d working = %d parked = %d max_working = %d\n", + work_items, trace_counter._.starting, trace_counter._.working, trace_counter._.parked, trace_counter._.max_working); + } +#endif do { gint64 ts; @@ -715,20 +746,33 @@ monitor_thread (gpointer unused) if (!monitor_sufficient_delay_since_last_dequeue ()) continue; - limit_worker_max_reached = FALSE; + gboolean active_max_reached; COUNTER_ATOMIC (counter, { + limit_worker_max_reached = FALSE; + active_max_reached = FALSE; if (counter._.max_working >= worker.limit_worker_max) { limit_worker_max_reached = TRUE; + if (counter_num_active (counter) >= counter._.max_working) + active_max_reached = TRUE; break; } counter._.max_working ++; }); - if (limit_worker_max_reached) - continue; - - hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION); + if (limit_worker_max_reached) { + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] monitor thread, limit_worker_max (%d) reached", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), + worker.limit_worker_max); + if (active_max_reached) + continue; + else + mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] monitor thread, num_active (%d) < max_working, allowing active thread increase", + GUINT_TO_POINTER (MONO_NATIVE_THREAD_ID_TO_UINT (mono_native_thread_id_get ())), + counter_num_active (counter)); + } + else + hill_climbing_force_change (counter._.max_working, TRANSITION_STARVATION); for (i = 0; i < 5; ++i) { if (mono_runtime_is_shutting_down ()) @@ -1084,6 +1128,7 @@ heuristic_adjust (void) counter._.max_working = new_thread_count; }); + /* FIXME: this can never be true. we only leave COUNTER_ATOMIC() if the assignment and CAS succeeded */ if (new_thread_count > counter._.max_working) worker_request (); -- 2.7.4