[OpenMP] Fix thread_limits to work properly for teams construct

The thread-limit-var ICV and the omp_get_thread_limit API were not handled
correctly for the teams construct. Now, when modified by the thread_limit
clause, omp_get_thread_limit reports the correct value. In addition, when
leaving the teams construct, the value is restored to what it was in the
encountering context.
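
A minimal illustration of the intended behavior (a sketch, not part of the
patch; it assumes a compiler that accepts a host teams construct, and the
values printed depend on OMP_THREAD_LIMIT in the environment):

#include <omp.h>
#include <stdio.h>

int main(void) {
  // Suppose thread-limit-var starts at the OMP_THREAD_LIMIT value, e.g. 8.
  printf("before teams: %d\n", omp_get_thread_limit());
#pragma omp teams num_teams(2) thread_limit(4)
  {
    // With this fix, each team reports the thread_limit clause value, 4.
    if (omp_get_team_num() == 0)
      printf("inside teams: %d\n", omp_get_thread_limit());
  }
  // On exit, thread-limit-var is restored to the encountering context's value.
  printf("after teams: %d\n", omp_get_thread_limit());
  return 0;
}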

This is done partly by introducing the notion of a Contention Group root (CG
root), which keeps track of the thread at the root of each separate CG, the
thread-limit-var associated with the CG, and a counter of the active threads
within the contention group.

Thread limits are passed from master to worker threads via an entry in the
ICV data structure. When a "contention group switch" occurs, a new CG root
record is made and passed from master to worker. A thread could potentially
have several CG root records if it encounters multiple nested teams
constructs (but at the moment the spec doesn't allow nested teams, so the
most one could currently have is 2). The master of the teams masters gets the
thread_limit clause value stored to its local ICV structure, and the other
teams masters copy it from the master. The thread limit is set from that ICV
copy on entering the teams construct and restored to it on leaving.
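
The list discipline described above can be sketched as follows (an
illustrative simplification with hypothetical names, not the libomp code; the
real record is the kmp_cg_root_t struct added to kmp.h below):

#include <stdlib.h>

// Simplified stand-in for kmp_cg_root_t.
typedef struct cg_root {
  void *owner;        // thread at the root of this contention group
  int thread_limit;   // thread-limit-var for this group
  int nthreads;       // count of active threads in this group
  struct cg_root *up; // next-older record in this thread's list
} cg_root_t;

// Entering a teams construct: the teams master now roots a new CG, so it
// pushes a record whose limit came from the thread_limit clause (via the
// ICV copy) onto the front of its list.
static cg_root_t *push_cg_root(cg_root_t *list, void *thr, int limit) {
  cg_root_t *node = malloc(sizeof(cg_root_t));
  node->owner = thr;
  node->thread_limit = limit;
  node->nthreads = 1; // the master itself
  node->up = list;
  return node;
}

// Leaving the teams construct: pop the record and restore the encountering
// context's limit from the record beneath it (one always remains, as the
// KMP_DEBUG_ASSERTs in the patch check).
static cg_root_t *pop_cg_root(cg_root_t *list, int *restored_limit) {
  cg_root_t *up = list->up;
  *restored_limit = up->thread_limit;
  free(list);
  return up;
}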

This change also fixes a bug that occurs when the top-level team of a teams
construct is reused while OMP_DYNAMIC is true, which could cause the expected
size of this team to be smaller than what was actually allocated. The fix
updates the size of the team after its threads have been reserved.
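
The failing pattern looks roughly like this (a hypothetical reproducer shape;
whether the team actually shrinks depends on how dyn-var adjusts the thread
counts at run time):

#include <omp.h>

int main(void) {
  omp_set_dynamic(1); // OMP_DYNAMIC=true: runtime may grant fewer threads
#pragma omp teams num_teams(2) thread_limit(8)
  { /* first league: dyn-var may shrink each team below 8 threads */ }
#pragma omp teams num_teams(2) thread_limit(8)
  { /* second league: reuses the top-level team, whose recorded size must
       match what was actually reserved */ }
  return 0;
}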

Patch by Terry Wilmarth

Differential Revision: https://reviews.llvm.org/D56804

llvm-svn: 353747
Committed by Jonathan Peyton on 2019-02-11 21:04:23 +00:00
commit 65ebfeecf8 (parent 24e0af6906)
5 changed files with 198 additions and 26 deletions

runtime/cmake/config-ix.cmake

@@ -10,6 +10,7 @@
include(CheckCCompilerFlag)
include(CheckCSourceCompiles)
include(CheckCXXSourceCompiles)
include(CheckCXXCompilerFlag)
include(CheckIncludeFile)
include(CheckLibraryExists)
@@ -38,6 +39,42 @@ function(libomp_check_version_symbols retval)
file(REMOVE ${CMAKE_CURRENT_BINARY_DIR}/__version_script.txt)
endfunction()
function(libomp_check_attribute_fallthrough retval)
set(fallthroughs "[[fallthrough]]" "[[clang::fallthrough]]" "__attribute__((__fallthrough__))")
foreach(fallthrough IN LISTS fallthroughs)
string(MAKE_C_IDENTIFIER "${fallthrough}" test_name)
set(source_code
"#include <stdio.h>
enum class foo_e { zero, one, two, three, four };
int main(int argc, char** argv) {
foo_e foo;
if (argc == 0) foo = foo_e::zero;
else if (argc == 1) foo = foo_e::one;
else if (argc == 2) foo = foo_e::two;
else if (argc == 3) foo = foo_e::three;
else if (argc == 4) foo = foo_e::four;
switch (foo) {
case foo_e::zero:
${fallthrough}
case foo_e::one:
return 1;
case foo_e::two:
return 2;
case foo_e::three:
return 3;
case foo_e::four:
return 4;
}
return 0;
}")
check_cxx_source_compiles("${source_code}" ${test_name})
if(${test_name})
set(${retval} ${fallthrough} PARENT_SCOPE)
break()
endif()
endforeach()
endfunction()
# Includes the architecture flag in both compile and link phase
function(libomp_check_architecture_flag flag retval)
set(CMAKE_REQUIRED_FLAGS "${flag}")
@@ -172,6 +209,7 @@ endif()
# Checking features
# Check if version symbol assembler directives are supported
libomp_check_version_symbols(LIBOMP_HAVE_VERSION_SYMBOLS)
libomp_check_attribute_fallthrough(LIBOMP_FALLTHROUGH)
# Check if quad precision types are available
if(CMAKE_C_COMPILER_ID STREQUAL "GNU")

runtime/src/kmp.h

@@ -1847,6 +1847,7 @@ typedef struct kmp_internal_control {
#endif
int nproc; /* internal control for #threads for next parallel region (per
thread) */
int thread_limit; /* internal control for thread-limit-var */
int max_active_levels; /* internal control for max_active_levels */
kmp_r_sched_t
sched; /* internal control for runtime schedule {sched,chunk} pair */
@@ -2077,6 +2078,9 @@ typedef struct kmp_local {
#define set__nproc(xthread, xval) \
(((xthread)->th.th_current_task->td_icvs.nproc) = (xval))
#define set__thread_limit(xthread, xval) \
(((xthread)->th.th_current_task->td_icvs.thread_limit) = (xval))
#define set__max_active_levels(xthread, xval) \
(((xthread)->th.th_current_task->td_icvs.max_active_levels) = (xval))
@@ -2458,6 +2462,26 @@ typedef struct kmp_teams_size {
} kmp_teams_size_t;
#endif
// This struct stores a thread that acts as a "root" for a contention
// group. Contention groups are rooted at kmp_root threads, but also at
// each master thread of each team created in the teams construct.
// This struct therefore also stores a thread_limit associated with
// that contention group, and a counter to track the number of threads
// active in that contention group. Each thread has a list of these: CG
// root threads have an entry in their list in which cg_root refers to
// the thread itself, whereas other workers in the CG will have a
// single entry where cg_root is same as the entry containing their CG
// root. When a thread encounters a teams construct, it will add a new
// entry to the front of its list, because it now roots a new CG.
typedef struct kmp_cg_root {
kmp_info_p *cg_root; // "root" thread for a contention group
// The CG root's limit comes from OMP_THREAD_LIMIT for root threads, or
// thread_limit clause for teams masters
kmp_int32 cg_thread_limit;
kmp_int32 cg_nthreads; // Count of active threads in CG rooted at cg_root
struct kmp_cg_root *up; // pointer to higher level CG root in list
} kmp_cg_root_t;
// OpenMP thread data structures
typedef struct KMP_ALIGN_CACHE kmp_base_info {
@@ -2605,6 +2629,7 @@ typedef struct KMP_ALIGN_CACHE kmp_base_info {
#if KMP_OS_UNIX
std::atomic<bool> th_blocking;
#endif
kmp_cg_root_t *th_cg_roots; // list of cg_roots associated with this thread
} kmp_base_info_t;
typedef union KMP_ALIGN_CACHE kmp_info {
@@ -2796,7 +2821,6 @@ typedef struct kmp_base_root {
kmp_lock_t r_begin_lock;
volatile int r_begin;
int r_blocktime; /* blocktime for this root and descendants */
- int r_cg_nthreads; // count of active threads in a contention group
} kmp_base_root_t;
typedef union KMP_ALIGN_CACHE kmp_root {

runtime/src/kmp_csupport.cpp

@@ -425,6 +425,19 @@ void __kmpc_fork_teams(ident_t *loc, kmp_int32 argc, kmpc_micro microtask,
#endif
);
// Pop current CG root off list
KMP_DEBUG_ASSERT(this_thr->th.th_cg_roots);
kmp_cg_root_t *tmp = this_thr->th.th_cg_roots;
this_thr->th.th_cg_roots = tmp->up;
KA_TRACE(100, ("__kmpc_fork_teams: Thread %p popping node %p and moving up"
" to node %p. cg_nthreads was %d\n",
this_thr, tmp, this_thr->th.th_cg_roots, tmp->cg_nthreads));
__kmp_free(tmp);
// Restore current task's thread_limit from CG root
KMP_DEBUG_ASSERT(this_thr->th.th_cg_roots);
this_thr->th.th_current_task->td_icvs.thread_limit =
this_thr->th.th_cg_roots->cg_thread_limit;
this_thr->th.th_teams_microtask = NULL;
this_thr->th.th_teams_level = 0;
*(kmp_int64 *)(&this_thr->th.th_teams_size) = 0L;

runtime/src/kmp_ftn_entry.h

@@ -734,11 +734,15 @@ int FTN_STDCALL KMP_EXPAND_NAME(FTN_GET_THREAD_LIMIT)(void) {
#ifdef KMP_STUB
return 1; // TO DO: clarify whether it returns 1 or 0?
#else
+ int gtid;
+ kmp_info_t *thread;
if (!__kmp_init_serial) {
__kmp_serial_initialize();
}
- /* global ICV */
- return __kmp_cg_max_nth;
+ gtid = __kmp_entry_gtid();
+ thread = __kmp_threads[gtid];
+ return thread->th.th_current_task->td_icvs.thread_limit;
#endif
}

runtime/src/kmp_runtime.cpp

@@ -827,6 +827,7 @@ static int __kmp_reserve_threads(kmp_root_t *root, kmp_team_t *parent_team,
int new_nthreads;
KMP_DEBUG_ASSERT(__kmp_init_serial);
KMP_DEBUG_ASSERT(root && parent_team);
kmp_info_t *this_thr = parent_team->t.t_threads[master_tid];
// If dyn-var is set, dynamically adjust the number of desired threads,
// according to the method specified by dynamic_mode.
@@ -916,10 +917,12 @@
}
// Respect OMP_THREAD_LIMIT
- if (root->r.r_cg_nthreads + new_nthreads -
+ int cg_nthreads = this_thr->th.th_cg_roots->cg_nthreads;
+ int max_cg_threads = this_thr->th.th_cg_roots->cg_thread_limit;
+ if (cg_nthreads + new_nthreads -
(root->r.r_active ? 1 : root->r.r_hot_team->t.t_nproc) >
- __kmp_cg_max_nth) {
- int tl_nthreads = __kmp_cg_max_nth - root->r.r_cg_nthreads +
+ max_cg_threads) {
+ int tl_nthreads = max_cg_threads - cg_nthreads +
(root->r.r_active ? 1 : root->r.r_hot_team->t.t_nproc);
if (tl_nthreads <= 0) {
tl_nthreads = 1;
@@ -2362,7 +2365,6 @@ void __kmp_join_call(ident_t *loc, int gtid
kmp_info_t *master_th;
kmp_root_t *root;
int master_active;
- int i;
KA_TRACE(20, ("__kmp_join_call: enter T#%d\n", gtid));
@@ -2485,21 +2487,24 @@ void __kmp_join_call(ident_t *loc, int gtid
team->t.t_active_level--;
KMP_ATOMIC_DEC(&root->r.r_in_parallel);
- /* Restore number of threads in the team if needed */
+ // Restore number of threads in the team if needed. This code relies on
+ // the proper adjustment of th_teams_size.nth after the fork in
+ // __kmp_teams_master on each teams master in the case that
+ // __kmp_reserve_threads reduced it.
if (master_th->th.th_team_nproc < master_th->th.th_teams_size.nth) {
int old_num = master_th->th.th_team_nproc;
int new_num = master_th->th.th_teams_size.nth;
kmp_info_t **other_threads = team->t.t_threads;
team->t.t_nproc = new_num;
- for (i = 0; i < old_num; ++i) {
+ for (int i = 0; i < old_num; ++i) {
other_threads[i]->th.th_team_nproc = new_num;
}
// Adjust states of non-used threads of the team
- for (i = old_num; i < new_num; ++i) {
+ for (int i = old_num; i < new_num; ++i) {
// Re-initialize thread's barrier data.
- int b;
KMP_DEBUG_ASSERT(other_threads[i]);
kmp_balign_t *balign = other_threads[i]->th.th_bar;
- for (b = 0; b < bs_last_barrier; ++b) {
+ for (int b = 0; b < bs_last_barrier; ++b) {
balign[b].bb.b_arrived = team->t.t_bar[b].b_arrived;
KMP_DEBUG_ASSERT(balign[b].bb.wait_flag != KMP_BARRIER_PARENT_FLAG);
#if USE_DEBUGGER
@@ -3177,6 +3182,7 @@ static kmp_internal_control_t __kmp_get_global_icvs(void) {
__kmp_dflt_team_nth, // int nproc; //internal control for # of threads for
// next parallel region (per thread)
// (use a max ub on value if __kmp_parallel_initialize not called yet)
__kmp_cg_max_nth, // int thread_limit;
__kmp_dflt_max_active_levels, // int max_active_levels; //internal control
// for max_active_levels
r_sched, // kmp_r_sched_t sched; //internal control for runtime schedule
@@ -3220,7 +3226,6 @@ static void __kmp_initialize_root(kmp_root_t *root) {
root->r.r_in_parallel = 0;
root->r.r_blocktime = __kmp_dflt_blocktime;
root->r.r_nested = __kmp_dflt_nested;
- root->r.r_cg_nthreads = 1;
/* setup the root team for this task */
/* allocate the root team structure */
@@ -3861,6 +3866,16 @@ int __kmp_register_root(int initial_thread) {
root_thread->th.th_prev_num_threads = 1;
#endif
kmp_cg_root_t *tmp = (kmp_cg_root_t *)__kmp_allocate(sizeof(kmp_cg_root_t));
tmp->cg_root = root_thread;
tmp->cg_thread_limit = __kmp_cg_max_nth;
tmp->cg_nthreads = 1;
KA_TRACE(100, ("__kmp_register_root: Thread %p created node %p with"
" cg_nthreads init to 1\n",
root_thread, tmp));
tmp->up = NULL;
root_thread->th.th_cg_roots = tmp;
__kmp_root_counter++;
#if OMPT_SUPPORT
@@ -3977,7 +3992,11 @@ static int __kmp_reset_root(int gtid, kmp_root_t *root) {
TCW_4(__kmp_nth,
__kmp_nth - 1); // __kmp_reap_thread will decrement __kmp_all_nth.
- root->r.r_cg_nthreads--;
+ root->r.r_uber_thread->th.th_cg_roots->cg_nthreads--;
+ KA_TRACE(100, ("__kmp_reset_root: Thread %p decrement cg_nthreads on node %p"
+                " to %d\n",
+                root->r.r_uber_thread, root->r.r_uber_thread->th.th_cg_roots,
+                root->r.r_uber_thread->th.th_cg_roots->cg_nthreads));
__kmp_reap_thread(root->r.r_uber_thread, 1);
@@ -4152,6 +4171,22 @@ static void __kmp_initialize_info(kmp_info_t *this_thr, kmp_team_t *team,
this_thr->th.th_pri_head = NULL;
}
if (this_thr != master && // Master's CG root is initialized elsewhere
this_thr->th.th_cg_roots != master->th.th_cg_roots) { // CG root not set
// Make new thread's CG root same as master's
KMP_DEBUG_ASSERT(master->th.th_cg_roots);
this_thr->th.th_cg_roots = master->th.th_cg_roots;
// Increment new thread's CG root's counter to add the new thread
this_thr->th.th_cg_roots->cg_nthreads++;
KA_TRACE(100, ("__kmp_initialize_info: Thread %p increment cg_nthreads on"
" node %p of thread %p to %d\n",
this_thr, this_thr->th.th_cg_roots,
this_thr->th.th_cg_roots->cg_root,
this_thr->th.th_cg_roots->cg_nthreads));
this_thr->th.th_current_task->td_icvs.thread_limit =
this_thr->th.th_cg_roots->cg_thread_limit;
}
/* Initialize dynamic dispatch */
{
volatile kmp_disp_t *dispatch = this_thr->th.th_dispatch;
@@ -4233,7 +4268,6 @@ kmp_info_t *__kmp_allocate_thread(kmp_root_t *root, kmp_team_t *team,
/* first, try to get one from the thread pool */
if (__kmp_thread_pool) {
new_thr = CCAST(kmp_info_t *, __kmp_thread_pool);
__kmp_thread_pool = (volatile kmp_info_t *)new_thr->th.th_next_pool;
if (new_thr == __kmp_thread_pool_insert_pt) {
@@ -4256,7 +4290,6 @@ kmp_info_t *__kmp_allocate_thread(kmp_root_t *root, kmp_team_t *team,
KMP_DEBUG_ASSERT(new_thr->th.th_serial_team);
TCW_4(__kmp_nth, __kmp_nth + 1);
- root->r.r_cg_nthreads++;
new_thr->th.th_task_state = 0;
new_thr->th.th_task_state_top = 0;
@@ -4412,8 +4445,6 @@ kmp_info_t *__kmp_allocate_thread(kmp_root_t *root, kmp_team_t *team,
__kmp_all_nth++;
__kmp_nth++;
- root->r.r_cg_nthreads++;
// if __kmp_adjust_gtid_mode is set, then we use method #1 (sp search) for low
// numbers of procs, and method #2 (keyed API call) for higher numbers.
if (__kmp_adjust_gtid_mode) {
@@ -4965,7 +4996,7 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
#endif
// Optimization to use a "hot" team
if (use_hot_team && new_nproc > 1) {
- KMP_DEBUG_ASSERT(new_nproc == max_nproc);
+ KMP_DEBUG_ASSERT(new_nproc <= max_nproc);
#if KMP_NESTED_HOT_TEAMS
team = hot_teams[level].hot_team;
#else
@@ -5071,10 +5102,11 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
__kmp_reinitialize_team(team, new_icvs,
root->r.r_uber_thread->th.th_ident);
- /* update the remaining threads */
+ // Update remaining threads
for (f = 0; f < new_nproc; ++f) {
team->t.t_threads[f]->th.th_team_nproc = new_nproc;
}
// restore the current task state of the master thread: should be the
// implicit task
KF_TRACE(10, ("__kmp_allocate_team: T#%d, this_thread=%p team=%p\n", 0,
@@ -5202,6 +5234,7 @@ __kmp_allocate_team(kmp_root_t *root, int new_nproc, int max_nproc,
for (f = 0; f < team->t.t_nproc; ++f)
__kmp_initialize_info(team->t.t_threads[f], team, f,
__kmp_gtid_from_tid(f, team));
if (level) { // set th_task_state for new threads in nested hot team
// __kmp_initialize_info() no longer zeroes th_task_state, so we should
// only need to set the th_task_state for the new threads. th_task_state
@@ -5490,8 +5523,8 @@ void __kmp_free_team(kmp_root_t *root,
for (tt_idx = 0; tt_idx < 2; ++tt_idx) {
kmp_task_team_t *task_team = team->t.t_task_team[tt_idx];
if (task_team != NULL) {
- for (f = 0; f < team->t.t_nproc;
-      ++f) { // Have all threads unref task teams
+ for (f = 0; f < team->t.t_nproc; ++f) { // threads unref task teams
KMP_DEBUG_ASSERT(team->t.t_threads[f]);
team->t.t_threads[f]->th.th_task_team = NULL;
}
KA_TRACE(
@@ -5522,6 +5555,29 @@ void __kmp_free_team(kmp_root_t *root,
/* TODO limit size of team pool, call reap_team if pool too large */
team->t.t_next_pool = CCAST(kmp_team_t *, __kmp_team_pool);
__kmp_team_pool = (volatile kmp_team_t *)team;
} else { // Check if team was created for the masters in a teams construct
// See if first worker is a CG root
KMP_DEBUG_ASSERT(team->t.t_threads[1] &&
team->t.t_threads[1]->th.th_cg_roots);
if (team->t.t_threads[1]->th.th_cg_roots->cg_root == team->t.t_threads[1]) {
// Clean up the CG root nodes on workers so that this team can be re-used
for (f = 1; f < team->t.t_nproc; ++f) {
kmp_info_t *thr = team->t.t_threads[f];
KMP_DEBUG_ASSERT(thr && thr->th.th_cg_roots &&
thr->th.th_cg_roots->cg_root == thr);
// Pop current CG root off list
kmp_cg_root_t *tmp = thr->th.th_cg_roots;
thr->th.th_cg_roots = tmp->up;
KA_TRACE(100, ("__kmp_free_team: Thread %p popping node %p and moving"
" up to node %p. cg_nthreads was %d\n",
thr, tmp, thr->th.th_cg_roots, tmp->cg_nthreads));
__kmp_free(tmp);
// Restore current task's thread_limit from CG root
if (thr->th.th_cg_roots)
thr->th.th_current_task->td_icvs.thread_limit =
thr->th.th_cg_roots->cg_thread_limit;
}
}
}
KMP_MB();
@@ -5577,7 +5633,6 @@ kmp_team_t *__kmp_reap_team(kmp_team_t *team) {
void __kmp_free_thread(kmp_info_t *this_th) {
int gtid;
kmp_info_t **scan;
- kmp_root_t *root = this_th->th.th_root;
KA_TRACE(20, ("__kmp_free_thread: T#%d putting T#%d back on free pool.\n",
__kmp_get_gtid(), this_th->th.th_info.ds.ds_gtid));
@@ -5602,6 +5657,26 @@ void __kmp_free_thread(kmp_info_t *this_th) {
TCW_PTR(this_th->th.th_root, NULL);
TCW_PTR(this_th->th.th_dispatch, NULL); /* NOT NEEDED */
while (this_th->th.th_cg_roots) {
this_th->th.th_cg_roots->cg_nthreads--;
KA_TRACE(100, ("__kmp_free_thread: Thread %p decrement cg_nthreads on node"
" %p of thread %p to %d\n",
this_th, this_th->th.th_cg_roots,
this_th->th.th_cg_roots->cg_root,
this_th->th.th_cg_roots->cg_nthreads));
kmp_cg_root_t *tmp = this_th->th.th_cg_roots;
if (tmp->cg_root == this_th) { // Thread is a cg_root
KMP_DEBUG_ASSERT(tmp->cg_nthreads == 0);
KA_TRACE(
5, ("__kmp_free_thread: Thread %p freeing node %p\n", this_th, tmp));
this_th->th.th_cg_roots = tmp->up;
__kmp_free(tmp);
} else { // Worker thread
this_th->th.th_cg_roots = NULL;
break;
}
}
/* If the implicit task assigned to this thread can be used by other threads
* -> multiple threads can share the data and try to free the task at
* __kmp_reap_thread at exit. This duplicate use of the task data can happen
@@ -5645,7 +5720,6 @@ void __kmp_free_thread(kmp_info_t *this_th) {
__kmp_thread_pool_nth++;
TCW_4(__kmp_nth, __kmp_nth - 1);
- root->r.r_cg_nthreads--;
#ifdef KMP_ADJUST_BLOCKTIME
/* Adjust blocktime back to user setting or default if necessary */
@@ -7102,6 +7176,19 @@ void __kmp_teams_master(int gtid) {
KMP_DEBUG_ASSERT(thr->th.th_set_nproc);
KA_TRACE(20, ("__kmp_teams_master: T#%d, Tid %d, microtask %p\n", gtid,
__kmp_tid_from_gtid(gtid), thr->th.th_teams_microtask));
// This thread is a new CG root. Set up the proper variables.
kmp_cg_root_t *tmp = (kmp_cg_root_t *)__kmp_allocate(sizeof(kmp_cg_root_t));
tmp->cg_root = thr; // Make thr the CG root
// Init to thread limit that was stored when league masters were forked
tmp->cg_thread_limit = thr->th.th_current_task->td_icvs.thread_limit;
tmp->cg_nthreads = 1; // Init counter to one active thread, this one
KA_TRACE(100, ("__kmp_teams_master: Thread %p created node %p and init"
" cg_threads to 1\n",
thr, tmp));
tmp->up = thr->th.th_cg_roots;
thr->th.th_cg_roots = tmp;
// Launch league of teams now, but not let workers execute
// (they hang on fork barrier until next parallel)
#if INCLUDE_SSC_MARKS
@@ -7113,7 +7200,9 @@
#if INCLUDE_SSC_MARKS
SSC_MARK_JOINING();
#endif
// If the team size was reduced from the limit, set it to the new size
if (thr->th.th_team_nproc < thr->th.th_teams_size.nth)
thr->th.th_teams_size.nth = thr->th.th_team_nproc;
// AC: last parameter "1" eliminates join barrier which won't work because
// worker threads are in a fork barrier waiting for more parallel regions
__kmp_join_call(loc, gtid
@@ -7187,10 +7276,14 @@ void __kmp_push_num_teams(ident_t *id, int gtid, int num_teams,
num_threads = __kmp_teams_max_nth / num_teams;
}
} else {
// This thread will be the master of the league masters
// Store new thread limit; old limit is saved in th_cg_roots list
thr->th.th_current_task->td_icvs.thread_limit = num_threads;
if (num_teams * num_threads > __kmp_teams_max_nth) {
int new_threads = __kmp_teams_max_nth / num_teams;
if (!__kmp_reserve_warn) { // user asked for too many threads
- __kmp_reserve_warn = 1; // that conflicts with KMP_TEAMS_THREAD_LIMIT
+ __kmp_reserve_warn = 1; // conflicts with KMP_TEAMS_THREAD_LIMIT
__kmp_msg(kmp_ms_warning,
KMP_MSG(CantFormThrTeam, num_threads, new_threads),
KMP_HNT(Unset_ALL_THREADS), __kmp_msg_null);