From 5dd4d0d46fb892975bbb3a086da5a3a9996ced4d Mon Sep 17 00:00:00 2001 From: AndreyChurbanov Date: Tue, 22 Jun 2021 16:29:01 +0300 Subject: [PATCH] [OpenMP] libomp: fix dynamic loop dispatcher Restructured dynamic loop dispatcher code. Fixed use of dispatch buffers for nonmonotonic dynamic (static_steal) schedule: - eliminated possibility of stealing iterations of the wrong loop when victim thread changed its buffer to work on another loop; - fixed race when victim thread changed its buffer to work in nested parallel; - eliminated "static" property of the schedule, that is now a single thread can execute whole loop. Differential Revision: https://reviews.llvm.org/D103648 --- openmp/runtime/src/kmp.h | 29 +- openmp/runtime/src/kmp_dispatch.cpp | 455 ++++++++++++--------- openmp/runtime/src/kmp_dispatch.h | 9 +- openmp/runtime/src/kmp_dispatch_hier.h | 2 +- openmp/runtime/src/kmp_settings.cpp | 7 +- openmp/runtime/test/env/kmp_set_dispatch_buf.c | 6 +- .../test/worksharing/for/kmp_set_dispatch_buf.c | 6 +- .../worksharing/for/omp_for_schedule_runtime.c | 4 +- .../runtime/test/worksharing/for/omp_par_in_loop.c | 28 ++ 9 files changed, 332 insertions(+), 214 deletions(-) create mode 100644 openmp/runtime/test/worksharing/for/omp_par_in_loop.c diff --git a/openmp/runtime/src/kmp.h b/openmp/runtime/src/kmp.h index 0eb3b91..835d4ad 100644 --- a/openmp/runtime/src/kmp.h +++ b/openmp/runtime/src/kmp.h @@ -1675,14 +1675,12 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info32 { kmp_int32 lb; kmp_int32 st; kmp_int32 tc; - kmp_int32 static_steal_counter; /* for static_steal only; maybe better to put - after ub */ - kmp_lock_t *th_steal_lock; // lock used for chunk stealing - // KMP_ALIGN( 16 ) ensures ( if the KMP_ALIGN macro is turned on ) + kmp_lock_t *steal_lock; // lock used for chunk stealing + // KMP_ALIGN(32) ensures (if the KMP_ALIGN macro is turned on) // a) parm3 is properly aligned and - // b) all parm1-4 are in the same cache line. + // b) all parm1-4 are on the same cache line. // Because of parm1-4 are used together, performance seems to be better - // if they are in the same line (not measured though). + // if they are on the same cache line (not measured though). struct KMP_ALIGN(32) { // AC: changed 16 to 32 in order to simplify template kmp_int32 parm1; // structures in kmp_dispatch.cpp. This should @@ -1694,9 +1692,6 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info32 { kmp_uint32 ordered_lower; kmp_uint32 ordered_upper; #if KMP_OS_WINDOWS - // This var can be placed in the hole between 'tc' and 'parm1', instead of - // 'static_steal_counter'. It would be nice to measure execution times. - // Conditional if/endif can be removed at all. kmp_int32 last_upper; #endif /* KMP_OS_WINDOWS */ } dispatch_private_info32_t; @@ -1708,9 +1703,7 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info64 { kmp_int64 lb; /* lower-bound */ kmp_int64 st; /* stride */ kmp_int64 tc; /* trip count (number of iterations) */ - kmp_int64 static_steal_counter; /* for static_steal only; maybe better to put - after ub */ - kmp_lock_t *th_steal_lock; // lock used for chunk stealing + kmp_lock_t *steal_lock; // lock used for chunk stealing /* parm[1-4] are used in different ways by different scheduling algorithms */ // KMP_ALIGN( 32 ) ensures ( if the KMP_ALIGN macro is turned on ) @@ -1729,9 +1722,6 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info64 { kmp_uint64 ordered_lower; kmp_uint64 ordered_upper; #if KMP_OS_WINDOWS - // This var can be placed in the hole between 'tc' and 'parm1', instead of - // 'static_steal_counter'. It would be nice to measure execution times. - // Conditional if/endif can be removed at all. kmp_int64 last_upper; #endif /* KMP_OS_WINDOWS */ } dispatch_private_info64_t; @@ -1785,9 +1775,8 @@ typedef struct KMP_ALIGN_CACHE dispatch_private_info { } u; enum sched_type schedule; /* scheduling algorithm */ kmp_sched_flags_t flags; /* flags (e.g., ordered, nomerge, etc.) */ + std::atomic steal_flag; // static_steal only, state of a buffer kmp_int32 ordered_bumped; - // To retain the structure size after making ordered_iteration scalar - kmp_int32 ordered_dummy[KMP_MAX_ORDERED - 3]; // Stack of buffers for nest of serial regions struct dispatch_private_info *next; kmp_int32 type_size; /* the size of types in private_info */ @@ -1802,7 +1791,7 @@ typedef struct dispatch_shared_info32 { /* chunk index under dynamic, number of idle threads under static-steal; iteration index otherwise */ volatile kmp_uint32 iteration; - volatile kmp_uint32 num_done; + volatile kmp_int32 num_done; volatile kmp_uint32 ordered_iteration; // Dummy to retain the structure size after making ordered_iteration scalar kmp_int32 ordered_dummy[KMP_MAX_ORDERED - 1]; @@ -1812,7 +1801,7 @@ typedef struct dispatch_shared_info64 { /* chunk index under dynamic, number of idle threads under static-steal; iteration index otherwise */ volatile kmp_uint64 iteration; - volatile kmp_uint64 num_done; + volatile kmp_int64 num_done; volatile kmp_uint64 ordered_iteration; // Dummy to retain the structure size after making ordered_iteration scalar kmp_int64 ordered_dummy[KMP_MAX_ORDERED - 3]; @@ -1848,7 +1837,7 @@ typedef struct kmp_disp { dispatch_private_info_t *th_dispatch_pr_current; dispatch_private_info_t *th_disp_buffer; - kmp_int32 th_disp_index; + kmp_uint32 th_disp_index; kmp_int32 th_doacross_buf_idx; // thread's doacross buffer index volatile kmp_uint32 *th_doacross_flags; // pointer to shared array of flags kmp_int64 *th_doacross_info; // info on loop bounds diff --git a/openmp/runtime/src/kmp_dispatch.cpp b/openmp/runtime/src/kmp_dispatch.cpp index c2e60fc..6687c37 100644 --- a/openmp/runtime/src/kmp_dispatch.cpp +++ b/openmp/runtime/src/kmp_dispatch.cpp @@ -90,6 +90,22 @@ static inline int __kmp_get_monotonicity(ident_t *loc, enum sched_type schedule, return monotonicity; } +#if KMP_STATIC_STEAL_ENABLED +enum { // values for steal_flag (possible states of private per-loop buffer) + UNUSED = 0, + CLAIMED = 1, // owner thread started initialization + READY = 2, // available for stealing + THIEF = 3 // finished by owner, or claimed by thief + // possible state changes: + // 0 -> 1 owner only, sync + // 0 -> 3 thief only, sync + // 1 -> 2 owner only, async + // 2 -> 3 owner only, async + // 3 -> 2 owner only, async + // 3 -> 0 last thread finishing the loop, async +}; +#endif + // Initialize a dispatch_private_info_template buffer for a particular // type of schedule,chunk. The loop description is found in lb (lower bound), // ub (upper bound), and st (stride). nproc is the number of threads relevant @@ -187,6 +203,8 @@ void __kmp_dispatch_init_algorithm(ident_t *loc, int gtid, schedule = team->t.t_sched.r_sched_type; monotonicity = __kmp_get_monotonicity(loc, schedule, use_hier); schedule = SCHEDULE_WITHOUT_MODIFIERS(schedule); + if (pr->flags.ordered) // correct monotonicity for ordered loop if needed + monotonicity = SCHEDULE_MONOTONIC; // Detail the schedule if needed (global controls are differentiated // appropriately) if (schedule == kmp_sch_guided_chunked) { @@ -346,7 +364,7 @@ void __kmp_dispatch_init_algorithm(ident_t *loc, int gtid, } switch (schedule) { -#if (KMP_STATIC_STEAL_ENABLED) +#if KMP_STATIC_STEAL_ENABLED case kmp_sch_static_steal: { T ntc, init; @@ -359,32 +377,37 @@ void __kmp_dispatch_init_algorithm(ident_t *loc, int gtid, KMP_COUNT_BLOCK(OMP_LOOP_STATIC_STEAL); T id = tid; T small_chunk, extras; - + kmp_uint32 old = UNUSED; + int claimed = pr->steal_flag.compare_exchange_strong(old, CLAIMED); + if (traits_t::type_size > 4) { + // AC: TODO: check if 16-byte CAS available and use it to + // improve performance (probably wait for explicit request + // before spending time on this). + // For now use dynamically allocated per-private-buffer lock, + // free memory in __kmp_dispatch_next when status==0. + pr->u.p.steal_lock = (kmp_lock_t *)__kmp_allocate(sizeof(kmp_lock_t)); + __kmp_init_lock(pr->u.p.steal_lock); + } small_chunk = ntc / nproc; extras = ntc % nproc; init = id * small_chunk + (id < extras ? id : extras); pr->u.p.count = init; - pr->u.p.ub = init + small_chunk + (id < extras ? 1 : 0); - - pr->u.p.parm2 = lb; + if (claimed) { // are we succeeded in claiming own buffer? + pr->u.p.ub = init + small_chunk + (id < extras ? 1 : 0); + // Other threads will inspect steal_flag when searching for a victim. + // READY means other threads may steal from this thread from now on. + KMP_ATOMIC_ST_REL(&pr->steal_flag, READY); + } else { + // other thread has stolen whole our range + KMP_DEBUG_ASSERT(pr->steal_flag == THIEF); + pr->u.p.ub = init; // mark there is no iterations to work on + } + pr->u.p.parm2 = ntc; // save number of chunks // parm3 is the number of times to attempt stealing which is - // proportional to the number of chunks per thread up until - // the maximum value of nproc. - pr->u.p.parm3 = KMP_MIN(small_chunk + extras, nproc); + // nproc (just a heuristics, could be optimized later on). + pr->u.p.parm3 = nproc; pr->u.p.parm4 = (id + 1) % nproc; // remember neighbour tid - pr->u.p.st = st; - if (traits_t::type_size > 4) { - // AC: TODO: check if 16-byte CAS available and use it to - // improve performance (probably wait for explicit request - // before spending time on this). - // For now use dynamically allocated per-thread lock, - // free memory in __kmp_dispatch_next when status==0. - KMP_DEBUG_ASSERT(pr->u.p.th_steal_lock == NULL); - pr->u.p.th_steal_lock = - (kmp_lock_t *)__kmp_allocate(sizeof(kmp_lock_t)); - __kmp_init_lock(pr->u.p.th_steal_lock); - } break; } else { /* too few chunks: switching to kmp_sch_dynamic_chunked */ @@ -881,6 +904,18 @@ __kmp_dispatch_init(ident_t *loc, int gtid, enum sched_type schedule, T lb, &team->t.t_disp_buffer[my_buffer_index % __kmp_dispatch_num_buffers]); KD_TRACE(10, ("__kmp_dispatch_init: T#%d my_buffer_index:%d\n", gtid, my_buffer_index)); + if (sh->buffer_index != my_buffer_index) { // too many loops in progress? + KD_TRACE(100, ("__kmp_dispatch_init: T#%d before wait: my_buffer_index:%d" + " sh->buffer_index:%d\n", + gtid, my_buffer_index, sh->buffer_index)); + __kmp_wait(&sh->buffer_index, my_buffer_index, + __kmp_eq USE_ITT_BUILD_ARG(NULL)); + // Note: KMP_WAIT() cannot be used there: buffer index and + // my_buffer_index are *always* 32-bit integers. + KD_TRACE(100, ("__kmp_dispatch_init: T#%d after wait: my_buffer_index:%d " + "sh->buffer_index:%d\n", + gtid, my_buffer_index, sh->buffer_index)); + } } __kmp_dispatch_init_algorithm(loc, gtid, pr, schedule, lb, ub, st, @@ -897,24 +932,6 @@ __kmp_dispatch_init(ident_t *loc, int gtid, enum sched_type schedule, T lb, th->th.th_dispatch->th_deo_fcn = __kmp_dispatch_deo; th->th.th_dispatch->th_dxo_fcn = __kmp_dispatch_dxo; } - } - - if (active) { - /* The name of this buffer should be my_buffer_index when it's free to use - * it */ - - KD_TRACE(100, ("__kmp_dispatch_init: T#%d before wait: my_buffer_index:%d " - "sh->buffer_index:%d\n", - gtid, my_buffer_index, sh->buffer_index)); - __kmp_wait(&sh->buffer_index, my_buffer_index, - __kmp_eq USE_ITT_BUILD_ARG(NULL)); - // Note: KMP_WAIT() cannot be used there: buffer index and - // my_buffer_index are *always* 32-bit integers. - KMP_MB(); /* is this necessary? */ - KD_TRACE(100, ("__kmp_dispatch_init: T#%d after wait: my_buffer_index:%d " - "sh->buffer_index:%d\n", - gtid, my_buffer_index, sh->buffer_index)); - th->th.th_dispatch->th_dispatch_pr_current = (dispatch_private_info_t *)pr; th->th.th_dispatch->th_dispatch_sh_current = CCAST(dispatch_shared_info_t *, (volatile dispatch_shared_info_t *)sh); @@ -978,21 +995,6 @@ __kmp_dispatch_init(ident_t *loc, int gtid, enum sched_type schedule, T lb, __kmp_str_free(&buff); } #endif -#if (KMP_STATIC_STEAL_ENABLED) - // It cannot be guaranteed that after execution of a loop with some other - // schedule kind all the parm3 variables will contain the same value. Even if - // all parm3 will be the same, it still exists a bad case like using 0 and 1 - // rather than program life-time increment. So the dedicated variable is - // required. The 'static_steal_counter' is used. - if (pr->schedule == kmp_sch_static_steal) { - // Other threads will inspect this variable when searching for a victim. - // This is a flag showing that other threads may steal from this thread - // since then. - volatile T *p = &pr->u.p.static_steal_counter; - *p = *p + 1; - } -#endif // ( KMP_STATIC_STEAL_ENABLED ) - #if OMPT_SUPPORT && OMPT_OPTIONAL if (ompt_enabled.ompt_callback_work) { ompt_team_info_t *team_info = __ompt_get_teaminfo(0, NULL); @@ -1082,7 +1084,6 @@ static void __kmp_dispatch_finish_chunk(int gtid, ident_t *loc) { KD_TRACE(100, ("__kmp_dispatch_finish_chunk: T#%d called\n", gtid)); if (!th->th.th_team->t.t_serialized) { - // int cid; dispatch_private_info_template *pr = reinterpret_cast *>( th->th.th_dispatch->th_dispatch_pr_current); @@ -1094,7 +1095,6 @@ static void __kmp_dispatch_finish_chunk(int gtid, ident_t *loc) { KMP_DEBUG_ASSERT(th->th.th_dispatch == &th->th.th_team->t.t_dispatch[th->th.th_info.ds.ds_tid]); - // for (cid = 0; cid < KMP_MAX_ORDERED; ++cid) { UT lower = pr->u.p.ordered_lower; UT upper = pr->u.p.ordered_upper; UT inc = upper - lower + 1; @@ -1200,10 +1200,10 @@ int __kmp_dispatch_next_algorithm(int gtid, } switch (pr->schedule) { -#if (KMP_STATIC_STEAL_ENABLED) +#if KMP_STATIC_STEAL_ENABLED case kmp_sch_static_steal: { T chunk = pr->u.p.parm1; - + UT nchunks = pr->u.p.parm2; KD_TRACE(100, ("__kmp_dispatch_next_algorithm: T#%d kmp_sch_static_steal case\n", gtid)); @@ -1211,11 +1211,12 @@ int __kmp_dispatch_next_algorithm(int gtid, trip = pr->u.p.tc - 1; if (traits_t::type_size > 4) { - // use lock for 8-byte and CAS for 4-byte induction - // variable. TODO (optional): check and use 16-byte CAS - kmp_lock_t *lck = pr->u.p.th_steal_lock; + // use lock for 8-byte induction variable. + // TODO (optional): check presence and use 16-byte CAS + kmp_lock_t *lck = pr->u.p.steal_lock; KMP_DEBUG_ASSERT(lck != NULL); if (pr->u.p.count < (UT)pr->u.p.ub) { + KMP_DEBUG_ASSERT(pr->steal_flag == READY); __kmp_acquire_lock(lck, gtid); // try to get own chunk of iterations init = (pr->u.p.count)++; @@ -1225,76 +1226,122 @@ int __kmp_dispatch_next_algorithm(int gtid, status = 0; // no own chunks } if (!status) { // try to steal - kmp_info_t **other_threads = team->t.t_threads; + kmp_lock_t *lckv; // victim buffer's lock T while_limit = pr->u.p.parm3; T while_index = 0; - T id = pr->u.p.static_steal_counter; // loop id int idx = (th->th.th_dispatch->th_disp_index - 1) % __kmp_dispatch_num_buffers; // current loop index // note: victim thread can potentially execute another loop - // TODO: algorithm of searching for a victim - // should be cleaned up and measured + KMP_ATOMIC_ST_REL(&pr->steal_flag, THIEF); // mark self buffer inactive while ((!status) && (while_limit != ++while_index)) { - dispatch_private_info_template *victim; + dispatch_private_info_template *v; T remaining; - T victimIdx = pr->u.p.parm4; - T oldVictimIdx = victimIdx ? victimIdx - 1 : nproc - 1; - victim = reinterpret_cast *>( - &other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]); - KMP_DEBUG_ASSERT(victim); - while ((victim == pr || id != victim->u.p.static_steal_counter) && - oldVictimIdx != victimIdx) { - victimIdx = (victimIdx + 1) % nproc; - victim = reinterpret_cast *>( - &other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]); - KMP_DEBUG_ASSERT(victim); + T victimId = pr->u.p.parm4; + T oldVictimId = victimId ? victimId - 1 : nproc - 1; + v = reinterpret_cast *>( + &team->t.t_dispatch[victimId].th_disp_buffer[idx]); + KMP_DEBUG_ASSERT(v); + while ((v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) && + oldVictimId != victimId) { + victimId = (victimId + 1) % nproc; + v = reinterpret_cast *>( + &team->t.t_dispatch[victimId].th_disp_buffer[idx]); + KMP_DEBUG_ASSERT(v); } - if (victim == pr || id != victim->u.p.static_steal_counter) { + if (v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) { continue; // try once more (nproc attempts in total) - // no victim is ready yet to participate in stealing - // because no victim passed kmp_init_dispatch yet } - if (victim->u.p.count + 2 > (UT)victim->u.p.ub) { - pr->u.p.parm4 = (victimIdx + 1) % nproc; // shift start tid - continue; // not enough chunks to steal, goto next victim + if (KMP_ATOMIC_LD_RLX(&v->steal_flag) == UNUSED) { + kmp_uint32 old = UNUSED; + // try to steal whole range from inactive victim + status = v->steal_flag.compare_exchange_strong(old, THIEF); + if (status) { + // initialize self buffer with victim's whole range of chunks + T id = victimId; + T small_chunk, extras; + small_chunk = nchunks / nproc; // chunks per thread + extras = nchunks % nproc; + init = id * small_chunk + (id < extras ? id : extras); + __kmp_acquire_lock(lck, gtid); + pr->u.p.count = init + 1; // exclude one we execute immediately + pr->u.p.ub = init + small_chunk + (id < extras ? 1 : 0); + __kmp_release_lock(lck, gtid); + pr->u.p.parm4 = (id + 1) % nproc; // remember neighbour tid + // no need to reinitialize other thread invariants: lb, st, etc. +#ifdef KMP_DEBUG + { + char *buff; + // create format specifiers before the debug output + buff = __kmp_str_format( + "__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, " + "count:%%%s ub:%%%s\n", + traits_t::spec, traits_t::spec); + KD_TRACE(10, (buff, gtid, id, pr->u.p.count, pr->u.p.ub)); + __kmp_str_free(&buff); + } +#endif + // activate non-empty buffer and let others steal from us + if (pr->u.p.count < (UT)pr->u.p.ub) + KMP_ATOMIC_ST_REL(&pr->steal_flag, READY); + break; + } } - - lck = victim->u.p.th_steal_lock; - KMP_ASSERT(lck != NULL); - __kmp_acquire_lock(lck, gtid); - limit = victim->u.p.ub; // keep initial ub - if (victim->u.p.count >= limit || - (remaining = limit - victim->u.p.count) < 2) { - __kmp_release_lock(lck, gtid); - pr->u.p.parm4 = (victimIdx + 1) % nproc; // next victim - continue; // not enough chunks to steal + if (KMP_ATOMIC_LD_RLX(&v->steal_flag) != READY || + v->u.p.count >= (UT)v->u.p.ub) { + pr->u.p.parm4 = (victimId + 1) % nproc; // shift start victim tid + continue; // no chunks to steal, try next victim } - // stealing succeeded, reduce victim's ub by 1/4 of undone chunks or - // by 1 - if (remaining > 3) { + lckv = v->u.p.steal_lock; + KMP_ASSERT(lckv != NULL); + __kmp_acquire_lock(lckv, gtid); + limit = v->u.p.ub; // keep initial ub + if (v->u.p.count >= limit) { + __kmp_release_lock(lckv, gtid); + pr->u.p.parm4 = (victimId + 1) % nproc; // shift start victim tid + continue; // no chunks to steal, try next victim + } + + // stealing succeded, reduce victim's ub by 1/4 of undone chunks + // TODO: is this heuristics good enough?? + remaining = limit - v->u.p.count; + if (remaining > 7) { // steal 1/4 of remaining KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_stolen, remaining >> 2); - init = (victim->u.p.ub -= (remaining >> 2)); + init = (v->u.p.ub -= (remaining >> 2)); } else { - // steal 1 chunk of 2 or 3 remaining + // steal 1 chunk of 1..7 remaining KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_stolen, 1); - init = (victim->u.p.ub -= 1); + init = (v->u.p.ub -= 1); } - __kmp_release_lock(lck, gtid); - + __kmp_release_lock(lckv, gtid); +#ifdef KMP_DEBUG + { + char *buff; + // create format specifiers before the debug output + buff = __kmp_str_format( + "__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, " + "count:%%%s ub:%%%s\n", + traits_t::spec, traits_t::spec); + KD_TRACE(10, (buff, gtid, victimId, init, limit)); + __kmp_str_free(&buff); + } +#endif KMP_DEBUG_ASSERT(init + 1 <= limit); - pr->u.p.parm4 = victimIdx; // remember victim to steal from + pr->u.p.parm4 = victimId; // remember victim to steal from status = 1; - while_index = 0; - // now update own count and ub with stolen range but init chunk - __kmp_acquire_lock(pr->u.p.th_steal_lock, gtid); + // now update own count and ub with stolen range excluding init chunk + __kmp_acquire_lock(lck, gtid); pr->u.p.count = init + 1; pr->u.p.ub = limit; - __kmp_release_lock(pr->u.p.th_steal_lock, gtid); + __kmp_release_lock(lck, gtid); + // activate non-empty buffer and let others steal from us + if (init + 1 < limit) + KMP_ATOMIC_ST_REL(&pr->steal_flag, READY); } // while (search for victim) } // if (try to find victim and steal) } else { // 4-byte induction variable, use 8-byte CAS for pair (count, ub) + // as all operations on pair (count, ub) must be done atomically typedef union { struct { UT count; @@ -1302,86 +1349,129 @@ int __kmp_dispatch_next_algorithm(int gtid, } p; kmp_int64 b; } union_i4; - // All operations on 'count' or 'ub' must be combined atomically - // together. - { - union_i4 vold, vnew; + union_i4 vold, vnew; + if (pr->u.p.count < (UT)pr->u.p.ub) { + KMP_DEBUG_ASSERT(pr->steal_flag == READY); vold.b = *(volatile kmp_int64 *)(&pr->u.p.count); - vnew = vold; - vnew.p.count++; - while (!KMP_COMPARE_AND_STORE_ACQ64( + vnew.b = vold.b; + vnew.p.count++; // get chunk from head of self range + while (!KMP_COMPARE_AND_STORE_REL64( (volatile kmp_int64 *)&pr->u.p.count, *VOLATILE_CAST(kmp_int64 *) & vold.b, *VOLATILE_CAST(kmp_int64 *) & vnew.b)) { KMP_CPU_PAUSE(); vold.b = *(volatile kmp_int64 *)(&pr->u.p.count); - vnew = vold; + vnew.b = vold.b; vnew.p.count++; } - vnew = vold; - init = vnew.p.count; - status = (init < (UT)vnew.p.ub); + init = vold.p.count; + status = (init < (UT)vold.p.ub); + } else { + status = 0; // no own chunks } - - if (!status) { - kmp_info_t **other_threads = team->t.t_threads; + if (!status) { // try to steal T while_limit = pr->u.p.parm3; T while_index = 0; - T id = pr->u.p.static_steal_counter; // loop id int idx = (th->th.th_dispatch->th_disp_index - 1) % __kmp_dispatch_num_buffers; // current loop index // note: victim thread can potentially execute another loop - // TODO: algorithm of searching for a victim - // should be cleaned up and measured + KMP_ATOMIC_ST_REL(&pr->steal_flag, THIEF); // mark self buffer inactive while ((!status) && (while_limit != ++while_index)) { - dispatch_private_info_template *victim; - union_i4 vold, vnew; + dispatch_private_info_template *v; T remaining; - T victimIdx = pr->u.p.parm4; - T oldVictimIdx = victimIdx ? victimIdx - 1 : nproc - 1; - victim = reinterpret_cast *>( - &other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]); - KMP_DEBUG_ASSERT(victim); - while ((victim == pr || id != victim->u.p.static_steal_counter) && - oldVictimIdx != victimIdx) { - victimIdx = (victimIdx + 1) % nproc; - victim = reinterpret_cast *>( - &other_threads[victimIdx]->th.th_dispatch->th_disp_buffer[idx]); - KMP_DEBUG_ASSERT(victim); + T victimId = pr->u.p.parm4; + T oldVictimId = victimId ? victimId - 1 : nproc - 1; + v = reinterpret_cast *>( + &team->t.t_dispatch[victimId].th_disp_buffer[idx]); + KMP_DEBUG_ASSERT(v); + while ((v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) && + oldVictimId != victimId) { + victimId = (victimId + 1) % nproc; + v = reinterpret_cast *>( + &team->t.t_dispatch[victimId].th_disp_buffer[idx]); + KMP_DEBUG_ASSERT(v); } - if (victim == pr || id != victim->u.p.static_steal_counter) { + if (v == pr || KMP_ATOMIC_LD_RLX(&v->steal_flag) == THIEF) { continue; // try once more (nproc attempts in total) - // no victim is ready yet to participate in stealing - // because no victim passed kmp_init_dispatch yet } - pr->u.p.parm4 = victimIdx; // new victim found - while (1) { // CAS loop if victim has enough chunks to steal - vold.b = *(volatile kmp_int64 *)(&victim->u.p.count); - vnew = vold; - - KMP_DEBUG_ASSERT((vnew.p.ub - 1) * (UT)chunk <= trip); - if (vnew.p.count >= (UT)vnew.p.ub || - (remaining = vnew.p.ub - vnew.p.count) < 2) { - pr->u.p.parm4 = (victimIdx + 1) % nproc; // shift start victim id - break; // not enough chunks to steal, goto next victim + if (KMP_ATOMIC_LD_RLX(&v->steal_flag) == UNUSED) { + kmp_uint32 old = UNUSED; + // try to steal whole range from inactive victim + status = v->steal_flag.compare_exchange_strong(old, THIEF); + if (status) { + // initialize self buffer with victim's whole range of chunks + T id = victimId; + T small_chunk, extras; + small_chunk = nchunks / nproc; // chunks per thread + extras = nchunks % nproc; + init = id * small_chunk + (id < extras ? id : extras); + vnew.p.count = init + 1; + vnew.p.ub = init + small_chunk + (id < extras ? 1 : 0); + // write pair (count, ub) at once atomically +#if KMP_ARCH_X86 + KMP_XCHG_FIXED64((volatile kmp_int64 *)(&pr->u.p.count), vnew.b); +#else + *(volatile kmp_int64 *)(&pr->u.p.count) = vnew.b; +#endif + pr->u.p.parm4 = (id + 1) % nproc; // remember neighbour tid + // no need to initialize other thread invariants: lb, st, etc. +#ifdef KMP_DEBUG + { + char *buff; + // create format specifiers before the debug output + buff = __kmp_str_format( + "__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, " + "count:%%%s ub:%%%s\n", + traits_t::spec, traits_t::spec); + KD_TRACE(10, (buff, gtid, id, pr->u.p.count, pr->u.p.ub)); + __kmp_str_free(&buff); + } +#endif + // activate non-empty buffer and let others steal from us + if (pr->u.p.count < (UT)pr->u.p.ub) + KMP_ATOMIC_ST_REL(&pr->steal_flag, READY); + break; } - if (remaining > 3) { - // try to steal 1/4 of remaining - vnew.p.ub -= remaining >> 2; + } + while (1) { // CAS loop with check if victim still has enough chunks + // many threads may be stealing concurrently from same victim + vold.b = *(volatile kmp_int64 *)(&v->u.p.count); + if (KMP_ATOMIC_LD_ACQ(&v->steal_flag) != READY || + vold.p.count >= (UT)vold.p.ub) { + pr->u.p.parm4 = (victimId + 1) % nproc; // shift start victim id + break; // no chunks to steal, try next victim + } + vnew.b = vold.b; + remaining = vold.p.ub - vold.p.count; + // try to steal 1/4 of remaining + // TODO: is this heuristics good enough?? + if (remaining > 7) { + vnew.p.ub -= remaining >> 2; // steal from tail of victim's range } else { - vnew.p.ub -= 1; // steal 1 chunk of 2 or 3 remaining + vnew.p.ub -= 1; // steal 1 chunk of 1..7 remaining } KMP_DEBUG_ASSERT((vnew.p.ub - 1) * (UT)chunk <= trip); - // TODO: Should this be acquire or release? - if (KMP_COMPARE_AND_STORE_ACQ64( - (volatile kmp_int64 *)&victim->u.p.count, + if (KMP_COMPARE_AND_STORE_REL64( + (volatile kmp_int64 *)&v->u.p.count, *VOLATILE_CAST(kmp_int64 *) & vold.b, *VOLATILE_CAST(kmp_int64 *) & vnew.b)) { - // stealing succeeded + // stealing succedded +#ifdef KMP_DEBUG + { + char *buff; + // create format specifiers before the debug output + buff = __kmp_str_format( + "__kmp_dispatch_next: T#%%d stolen chunks from T#%%d, " + "count:%%%s ub:%%%s\n", + traits_t::spec, traits_t::spec); + KD_TRACE(10, (buff, gtid, victimId, vnew.p.ub, vold.p.ub)); + __kmp_str_free(&buff); + } +#endif KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_stolen, vold.p.ub - vnew.p.ub); status = 1; - while_index = 0; + pr->u.p.parm4 = victimId; // keep victim id // now update own count and ub init = vnew.p.ub; vold.p.count = init + 1; @@ -1390,6 +1480,9 @@ int __kmp_dispatch_next_algorithm(int gtid, #else *(volatile kmp_int64 *)(&pr->u.p.count) = vold.b; #endif + // activate non-empty buffer and let others steal from us + if (vold.p.count < (UT)vold.p.ub) + KMP_ATOMIC_ST_REL(&pr->steal_flag, READY); break; } // if (check CAS result) KMP_CPU_PAUSE(); // CAS failed, repeatedly attempt @@ -1403,13 +1496,16 @@ int __kmp_dispatch_next_algorithm(int gtid, if (p_st != NULL) *p_st = 0; } else { - start = pr->u.p.parm2; + start = pr->u.p.lb; init *= chunk; limit = chunk + init - 1; incr = pr->u.p.st; KMP_COUNT_DEVELOPER_VALUE(FOR_static_steal_chunks, 1); KMP_DEBUG_ASSERT(init <= trip); + // keep track of done chunks for possible early exit from stealing + // TODO: count executed chunks locally with rare update of shared location + // test_then_inc((volatile ST *)&sh->u.s.iteration); if ((last = (limit >= trip)) != 0) limit = trip; if (p_st != NULL) @@ -1422,15 +1518,10 @@ int __kmp_dispatch_next_algorithm(int gtid, *p_lb = start + init * incr; *p_ub = start + limit * incr; } - - if (pr->flags.ordered) { - pr->u.p.ordered_lower = init; - pr->u.p.ordered_upper = limit; - } // if } // if break; } // case -#endif // ( KMP_STATIC_STEAL_ENABLED ) +#endif // KMP_STATIC_STEAL_ENABLED case kmp_sch_static_balanced: { KD_TRACE( 10, @@ -2075,16 +2166,15 @@ static int __kmp_dispatch_next(ident_t *loc, int gtid, kmp_int32 *p_last, th->th.th_info.ds.ds_tid); // status == 0: no more iterations to execute if (status == 0) { - UT num_done; - - num_done = test_then_inc((volatile ST *)&sh->u.s.num_done); + ST num_done; + num_done = test_then_inc(&sh->u.s.num_done); #ifdef KMP_DEBUG { char *buff; // create format specifiers before the debug output buff = __kmp_str_format( "__kmp_dispatch_next: T#%%d increment num_done:%%%s\n", - traits_t::spec); + traits_t::spec); KD_TRACE(10, (buff, gtid, sh->u.s.num_done)); __kmp_str_free(&buff); } @@ -2093,28 +2183,31 @@ static int __kmp_dispatch_next(ident_t *loc, int gtid, kmp_int32 *p_last, #if KMP_USE_HIER_SCHED pr->flags.use_hier = FALSE; #endif - if ((ST)num_done == th->th.th_team_nproc - 1) { -#if (KMP_STATIC_STEAL_ENABLED) - if (pr->schedule == kmp_sch_static_steal && - traits_t::type_size > 4) { + if (num_done == th->th.th_team_nproc - 1) { +#if KMP_STATIC_STEAL_ENABLED + if (pr->schedule == kmp_sch_static_steal) { int i; int idx = (th->th.th_dispatch->th_disp_index - 1) % __kmp_dispatch_num_buffers; // current loop index - kmp_info_t **other_threads = team->t.t_threads; // loop complete, safe to destroy locks used for stealing for (i = 0; i < th->th.th_team_nproc; ++i) { dispatch_private_info_template *buf = reinterpret_cast *>( - &other_threads[i]->th.th_dispatch->th_disp_buffer[idx]); - kmp_lock_t *lck = buf->u.p.th_steal_lock; - KMP_ASSERT(lck != NULL); - __kmp_destroy_lock(lck); - __kmp_free(lck); - buf->u.p.th_steal_lock = NULL; + &team->t.t_dispatch[i].th_disp_buffer[idx]); + KMP_ASSERT(buf->steal_flag == THIEF); // buffer must be inactive + KMP_ATOMIC_ST_RLX(&buf->steal_flag, UNUSED); + if (traits_t::type_size > 4) { + // destroy locks used for stealing + kmp_lock_t *lck = buf->u.p.steal_lock; + KMP_ASSERT(lck != NULL); + __kmp_destroy_lock(lck); + __kmp_free(lck); + buf->u.p.steal_lock = NULL; + } } } #endif - /* NOTE: release this buffer to be reused */ + /* NOTE: release shared buffer to be reused */ KMP_MB(); /* Flush all pending memory write invalidates. */ @@ -2126,8 +2219,6 @@ static int __kmp_dispatch_next(ident_t *loc, int gtid, kmp_int32 *p_last, sh->u.s.ordered_iteration = 0; } - KMP_MB(); /* Flush all pending memory write invalidates. */ - sh->buffer_index += __kmp_dispatch_num_buffers; KD_TRACE(100, ("__kmp_dispatch_next: T#%d change buffer_index:%d\n", gtid, sh->buffer_index)); diff --git a/openmp/runtime/src/kmp_dispatch.h b/openmp/runtime/src/kmp_dispatch.h index 1f98e4b..ae11361 100644 --- a/openmp/runtime/src/kmp_dispatch.h +++ b/openmp/runtime/src/kmp_dispatch.h @@ -74,8 +74,7 @@ template struct dispatch_private_infoXX_template { T lb; ST st; // signed UT tc; // unsigned - T static_steal_counter; // for static_steal only; maybe better to put after ub - kmp_lock_t *th_steal_lock; // lock used for chunk stealing + kmp_lock_t *steal_lock; // lock used for chunk stealing /* parm[1-4] are used in different ways by different scheduling algorithms */ // KMP_ALIGN( 32 ) ensures ( if the KMP_ALIGN macro is turned on ) @@ -134,9 +133,8 @@ template struct KMP_ALIGN_CACHE dispatch_private_info_template { } u; enum sched_type schedule; /* scheduling algorithm */ kmp_sched_flags_t flags; /* flags (e.g., ordered, nomerge, etc.) */ + std::atomic steal_flag; // static_steal only, state of a buffer kmp_uint32 ordered_bumped; - // to retain the structure size after making order - kmp_int32 ordered_dummy[KMP_MAX_ORDERED - 3]; dispatch_private_info *next; /* stack of buffers for nest of serial regions */ kmp_uint32 type_size; #if KMP_USE_HIER_SCHED @@ -153,10 +151,11 @@ template struct KMP_ALIGN_CACHE dispatch_private_info_template { // dispatch_shared_info{32,64}_t types template struct dispatch_shared_infoXX_template { typedef typename traits_t::unsigned_t UT; + typedef typename traits_t::signed_t ST; /* chunk index under dynamic, number of idle threads under static-steal; iteration index otherwise */ volatile UT iteration; - volatile UT num_done; + volatile ST num_done; volatile UT ordered_iteration; // to retain the structure size making ordered_iteration scalar UT ordered_dummy[KMP_MAX_ORDERED - 3]; diff --git a/openmp/runtime/src/kmp_dispatch_hier.h b/openmp/runtime/src/kmp_dispatch_hier.h index 1181970..dbea088 100644 --- a/openmp/runtime/src/kmp_dispatch_hier.h +++ b/openmp/runtime/src/kmp_dispatch_hier.h @@ -924,7 +924,7 @@ void __kmp_dispatch_init_hierarchy(ident_t *loc, int n, T lb, T ub, typename traits_t::signed_t st) { int tid, gtid, num_hw_threads, num_threads_per_layer1, active; - int my_buffer_index; + unsigned int my_buffer_index; kmp_info_t *th; kmp_team_t *team; dispatch_private_info_template *pr; diff --git a/openmp/runtime/src/kmp_settings.cpp b/openmp/runtime/src/kmp_settings.cpp index 8e4da0b..a98a2a4 100644 --- a/openmp/runtime/src/kmp_settings.cpp +++ b/openmp/runtime/src/kmp_settings.cpp @@ -4022,8 +4022,11 @@ static const char *__kmp_parse_single_omp_schedule(const char *name, else if (!__kmp_strcasecmp_with_sentinel("static", ptr, *delim)) sched = kmp_sch_static; #if KMP_STATIC_STEAL_ENABLED - else if (!__kmp_strcasecmp_with_sentinel("static_steal", ptr, *delim)) - sched = kmp_sch_static_steal; + else if (!__kmp_strcasecmp_with_sentinel("static_steal", ptr, *delim)) { + // replace static_steal with dynamic to better cope with ordered loops + sched = kmp_sch_dynamic_chunked; + sched_modifier = sched_type::kmp_sch_modifier_nonmonotonic; + } #endif else { // If there is no proper schedule kind, then this schedule is invalid diff --git a/openmp/runtime/test/env/kmp_set_dispatch_buf.c b/openmp/runtime/test/env/kmp_set_dispatch_buf.c index 0177623..539bb5a 100644 --- a/openmp/runtime/test/env/kmp_set_dispatch_buf.c +++ b/openmp/runtime/test/env/kmp_set_dispatch_buf.c @@ -9,7 +9,7 @@ // RUN: env KMP_DISP_NUM_BUFFERS=3 %libomp-run // RUN: env KMP_DISP_NUM_BUFFERS=4 %libomp-run // RUN: env KMP_DISP_NUM_BUFFERS=7 %libomp-run -// UNSUPPORTED: clang-11, clang-12, clang-13 +// UNSUPPORTED: clang-11, clang-12 #include #include #include @@ -78,5 +78,9 @@ int main(int argc, char** argv) num_failed++; } } + if (num_failed == 0) + printf("passed\n"); + else + printf("failed %d\n", num_failed); return num_failed; } diff --git a/openmp/runtime/test/worksharing/for/kmp_set_dispatch_buf.c b/openmp/runtime/test/worksharing/for/kmp_set_dispatch_buf.c index 3386618..efcca74 100644 --- a/openmp/runtime/test/worksharing/for/kmp_set_dispatch_buf.c +++ b/openmp/runtime/test/worksharing/for/kmp_set_dispatch_buf.c @@ -3,7 +3,7 @@ // RUN: %libomp-run 1 && %libomp-run 2 && %libomp-run 5 // RUN: %libomp-compile -DMY_SCHEDULE=guided && %libomp-run 7 // RUN: %libomp-run 1 && %libomp-run 2 && %libomp-run 5 -// UNSUPPORTED: clang-11, clang-12, clang-13 +// UNSUPPORTED: clang-11, clang-12 #include #include #include @@ -88,5 +88,9 @@ int main(int argc, char** argv) num_failed++; } } + if (num_failed == 0) + printf("passed\n"); + else + printf("failed %d\n", num_failed); return num_failed; } diff --git a/openmp/runtime/test/worksharing/for/omp_for_schedule_runtime.c b/openmp/runtime/test/worksharing/for/omp_for_schedule_runtime.c index 27a7656..e2d34f1 100644 --- a/openmp/runtime/test/worksharing/for/omp_for_schedule_runtime.c +++ b/openmp/runtime/test/worksharing/for/omp_for_schedule_runtime.c @@ -8,8 +8,8 @@ // RUN: env OMP_SCHEDULE=auto %libomp-run 4 1 // RUN: env OMP_SCHEDULE=trapezoidal %libomp-run 101 1 // RUN: env OMP_SCHEDULE=trapezoidal,13 %libomp-run 101 13 -// RUN: env OMP_SCHEDULE=static_steal %libomp-run 102 1 -// RUN: env OMP_SCHEDULE=static_steal,14 %libomp-run 102 14 +// RUN: env OMP_SCHEDULE=static_steal %libomp-run 2 1 +// RUN: env OMP_SCHEDULE=static_steal,14 %libomp-run 2 14 #include #include diff --git a/openmp/runtime/test/worksharing/for/omp_par_in_loop.c b/openmp/runtime/test/worksharing/for/omp_par_in_loop.c new file mode 100644 index 0000000..d80de5d --- /dev/null +++ b/openmp/runtime/test/worksharing/for/omp_par_in_loop.c @@ -0,0 +1,28 @@ +// RUN: %libomp-compile-and-run +// +#include +#include +#include +#include + +#define TYPE long +#define MAX_ITER (TYPE)((TYPE)1000000) +#define EVERY (TYPE)((TYPE)100000) + +int main(int argc, char* argv[]) { + TYPE x = MAX_ITER; + omp_set_max_active_levels(2); + omp_set_num_threads(2); + #pragma omp parallel for schedule(nonmonotonic:dynamic,1) + for (TYPE i = 0; i < x; i++) { + int tid = omp_get_thread_num(); + omp_set_num_threads(1); + #pragma omp parallel proc_bind(spread) + { + if (i % EVERY == (TYPE)0) + printf("Outer thread %d at iter %ld\n", tid, i); + } + } + printf("passed\n"); + return 0; +} -- 2.7.4