Improve OpenMP threadprivate implementation.

Patch by Terry Wilmarth

Differential Revision: https://reviews.llvm.org/D41914

llvm-svn: 326733
Andrey Churbanov 2018-03-05 18:42:01 +00:00
parent 47f42c2167
commit 9e9333aa8a
3 changed files with 182 additions and 111 deletions
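For context, the compiler lowers each access to a threadprivate variable into a call to the runtime's cache entry point, passing the address of the global (data) and the address of a per-variable cache pointer (cache). The sketch below is illustrative only, not the literal codegen; the __kmpc_threadprivate_cached prototype is taken from kmp.h, while gvar, gvar_cache and get_my_gvar are made-up names. It shows why this patch now records both addresses in kmp_cached_addr_t.

// Rough sketch of what a compiler lowers a threadprivate access to.
// gvar, gvar_cache and get_my_gvar are hypothetical; only the
// __kmpc_threadprivate_cached prototype comes from the runtime.
#include "kmp.h"

int gvar;                        // hypothetical global marked threadprivate
static void **gvar_cache = NULL; // compiler-managed cache, one per variable

int *get_my_gvar(ident_t *loc, kmp_int32 gtid) {
  // Returns this thread's private copy; the runtime remembers both &gvar
  // (data) and &gvar_cache (compiler_cache) so it can later find, resize,
  // or clear the cache -- which is what this patch adds.
  return (int *)__kmpc_threadprivate_cached(loc, gtid, (void *)&gvar,
                                            sizeof(gvar), &gvar_cache);
}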

View File

@@ -1444,6 +1444,8 @@ typedef void *(*kmpc_cctor_vec)(void *, void *,
 /* keeps tracked of threadprivate cache allocations for cleanup later */
 typedef struct kmp_cached_addr {
   void **addr; /* address of allocated cache */
+  void ***compiler_cache; /* pointer to compiler's cache */
+  void *data; /* pointer to global data */
   struct kmp_cached_addr *next; /* pointer to next cached address */
 } kmp_cached_addr_t;
 
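The bookkeeping node above is not allocated on its own: later in this commit the runtime carves it out of the tail of the cache allocation itself (the &my_cache[__kmp_tp_capacity] expression), and the two new fields remember which global the cache serves and where the compiler keeps its pointer to it. A minimal, self-contained sketch of that layout, using stand-in names rather than the runtime's allocator and types:

// Minimal sketch of the assumed cache layout: one allocation holds the
// per-thread slots followed by the bookkeeping node. Illustrative only;
// the runtime uses __kmp_allocate and kmp_cached_addr_t.
#include <cstdlib>

struct cached_addr {           // stand-in for kmp_cached_addr_t
  void **addr;                 // start of the slot array
  void ***compiler_cache;      // where the compiler keeps its cache pointer
  void *data;                  // the global variable this cache serves
  cached_addr *next;
};

static cached_addr *make_cache(int capacity, void *data,
                               void ***compiler_cache) {
  void **slots =
      (void **)std::calloc(1, sizeof(void *) * capacity + sizeof(cached_addr));
  cached_addr *node = (cached_addr *)&slots[capacity]; // node lives after slots
  node->addr = slots;
  node->data = data;
  node->compiler_cache = compiler_cache;
  node->next = nullptr;
  return node;                 // freeing slots later frees node too
}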
@@ -3774,6 +3776,8 @@ void kmp_threadprivate_insert_private_data(int gtid, void *pc_addr,
 struct private_common *kmp_threadprivate_insert(int gtid, void *pc_addr,
                                                 void *data_addr,
                                                 size_t pc_size);
+void __kmp_threadprivate_resize_cache(int newCapacity);
+void __kmp_cleanup_threadprivate_caches();
 
 // ompc_, kmpc_ entries moved from omp.h.
 #if KMP_OS_WINDOWS

View File

@@ -3508,8 +3508,14 @@ static int __kmp_reclaim_dead_roots(void) {
    If any argument is negative, the behavior is undefined. */
 static int __kmp_expand_threads(int nNeed) {
   int added = 0;
-  int old_tp_cached;
-  int __kmp_actual_max_nth;
+  int minimumRequiredCapacity;
+  int newCapacity;
+  kmp_info_t **newThreads;
+  kmp_root_t **newRoot;
+
+  // All calls to __kmp_expand_threads should be under __kmp_forkjoin_lock, so
+  // resizing __kmp_threads does not need additional protection if foreign
+  // threads are present
 
 #if KMP_OS_WINDOWS && !defined KMP_DYNAMIC_LIB
   /* only for Windows static library */
@@ -3525,91 +3531,64 @@ static int __kmp_expand_threads(int nNeed) {
   if (nNeed <= 0)
     return added;
 
-  while (1) {
-    int nTarget;
-    int minimumRequiredCapacity;
-    int newCapacity;
-    kmp_info_t **newThreads;
-    kmp_root_t **newRoot;
-
-    // Note that __kmp_threads_capacity is not bounded by __kmp_max_nth. If
-    // __kmp_max_nth is set to some value less than __kmp_sys_max_nth by the
-    // user via KMP_DEVICE_THREAD_LIMIT, then __kmp_threads_capacity may become
-    // > __kmp_max_nth in one of two ways:
-    //
-    // 1) The initialization thread (gtid = 0) exits. __kmp_threads[0]
-    //    may not be resused by another thread, so we may need to increase
-    //    __kmp_threads_capacity to __kmp_max_nth + 1.
-    //
-    // 2) New foreign root(s) are encountered. We always register new foreign
-    //    roots. This may cause a smaller # of threads to be allocated at
-    //    subsequent parallel regions, but the worker threads hang around (and
-    //    eventually go to sleep) and need slots in the __kmp_threads[] array.
-    //
-    // Anyway, that is the reason for moving the check to see if
-    // __kmp_max_nth was exceeded into __kmp_reserve_threads()
-    // instead of having it performed here. -BB
-    old_tp_cached = __kmp_tp_cached;
-    __kmp_actual_max_nth =
-        old_tp_cached ? __kmp_tp_capacity : __kmp_sys_max_nth;
-    KMP_DEBUG_ASSERT(__kmp_actual_max_nth >= __kmp_threads_capacity);
-
-    /* compute expansion headroom to check if we can expand */
-    nTarget = nNeed;
-    if (__kmp_actual_max_nth - __kmp_threads_capacity < nTarget) {
-      /* possible expansion too small -- give up */
-      break;
-    }
-    minimumRequiredCapacity = __kmp_threads_capacity + nTarget;
-
-    newCapacity = __kmp_threads_capacity;
-    do {
-      newCapacity = newCapacity <= (__kmp_actual_max_nth >> 1)
-                        ? (newCapacity << 1)
-                        : __kmp_actual_max_nth;
-    } while (newCapacity < minimumRequiredCapacity);
-
-    newThreads = (kmp_info_t **)__kmp_allocate(
-        (sizeof(kmp_info_t *) + sizeof(kmp_root_t *)) * newCapacity +
-        CACHE_LINE);
-    newRoot = (kmp_root_t **)((char *)newThreads +
-                              sizeof(kmp_info_t *) * newCapacity);
-    KMP_MEMCPY(newThreads, __kmp_threads,
-               __kmp_threads_capacity * sizeof(kmp_info_t *));
-    KMP_MEMCPY(newRoot, __kmp_root,
-               __kmp_threads_capacity * sizeof(kmp_root_t *));
-    memset(newThreads + __kmp_threads_capacity, 0,
-           (newCapacity - __kmp_threads_capacity) * sizeof(kmp_info_t *));
-    memset(newRoot + __kmp_threads_capacity, 0,
-           (newCapacity - __kmp_threads_capacity) * sizeof(kmp_root_t *));
-
-    if (!old_tp_cached && __kmp_tp_cached && newCapacity > __kmp_tp_capacity) {
-      /* __kmp_tp_cached has changed, i.e. __kmpc_threadprivate_cached has
-         allocated a threadprivate cache while we were allocating the expanded
-         array, and our new capacity is larger than the threadprivate cache
-         capacity, so we should deallocate the expanded arrays and try again.
-         This is the first check of a double-check pair. */
-      __kmp_free(newThreads);
-      continue; /* start over and try again */
-    }
-    __kmp_acquire_bootstrap_lock(&__kmp_tp_cached_lock);
-    if (!old_tp_cached && __kmp_tp_cached && newCapacity > __kmp_tp_capacity) {
-      /* Same check as above, but this time with the lock so we can be sure if
-         we can succeed. */
-      __kmp_release_bootstrap_lock(&__kmp_tp_cached_lock);
-      __kmp_free(newThreads);
-      continue; /* start over and try again */
-    } else {
-      /* success */
-      // __kmp_free( __kmp_threads ); // ATT: It leads to crash. Need to be
-      // investigated.
-      *(kmp_info_t * *volatile *)&__kmp_threads = newThreads;
-      *(kmp_root_t * *volatile *)&__kmp_root = newRoot;
-      added += newCapacity - __kmp_threads_capacity;
-      *(volatile int *)&__kmp_threads_capacity = newCapacity;
-      __kmp_release_bootstrap_lock(&__kmp_tp_cached_lock);
-      break; /* succeeded, so we can exit the loop */
-    }
-  }
+  // Note that __kmp_threads_capacity is not bounded by __kmp_max_nth. If
+  // __kmp_max_nth is set to some value less than __kmp_sys_max_nth by the
+  // user via KMP_DEVICE_THREAD_LIMIT, then __kmp_threads_capacity may become
+  // > __kmp_max_nth in one of two ways:
+  //
+  // 1) The initialization thread (gtid = 0) exits. __kmp_threads[0]
+  //    may not be resused by another thread, so we may need to increase
+  //    __kmp_threads_capacity to __kmp_max_nth + 1.
+  //
+  // 2) New foreign root(s) are encountered. We always register new foreign
+  //    roots. This may cause a smaller # of threads to be allocated at
+  //    subsequent parallel regions, but the worker threads hang around (and
+  //    eventually go to sleep) and need slots in the __kmp_threads[] array.
+  //
+  // Anyway, that is the reason for moving the check to see if
+  // __kmp_max_nth was exceeded into __kmp_reserve_threads()
+  // instead of having it performed here. -BB
+  KMP_DEBUG_ASSERT(__kmp_sys_max_nth >= __kmp_threads_capacity);
+
+  /* compute expansion headroom to check if we can expand */
+  if (__kmp_sys_max_nth - __kmp_threads_capacity < nNeed) {
+    /* possible expansion too small -- give up */
+    return added;
+  }
+  minimumRequiredCapacity = __kmp_threads_capacity + nNeed;
+
+  newCapacity = __kmp_threads_capacity;
+  do {
+    newCapacity = newCapacity <= (__kmp_sys_max_nth >> 1) ? (newCapacity << 1)
+                                                          : __kmp_sys_max_nth;
+  } while (newCapacity < minimumRequiredCapacity);
+
+  newThreads = (kmp_info_t **)__kmp_allocate(
+      (sizeof(kmp_info_t *) + sizeof(kmp_root_t *)) * newCapacity + CACHE_LINE);
+  newRoot =
+      (kmp_root_t **)((char *)newThreads + sizeof(kmp_info_t *) * newCapacity);
+  KMP_MEMCPY(newThreads, __kmp_threads,
+             __kmp_threads_capacity * sizeof(kmp_info_t *));
+  KMP_MEMCPY(newRoot, __kmp_root,
+             __kmp_threads_capacity * sizeof(kmp_root_t *));
+
+  kmp_info_t **temp_threads = __kmp_threads;
+  *(kmp_info_t * *volatile *)&__kmp_threads = newThreads;
+  *(kmp_root_t * *volatile *)&__kmp_root = newRoot;
+  __kmp_free(temp_threads);
+  added += newCapacity - __kmp_threads_capacity;
+  *(volatile int *)&__kmp_threads_capacity = newCapacity;
+
+  if (newCapacity > __kmp_tp_capacity) {
+    __kmp_acquire_bootstrap_lock(&__kmp_tp_cached_lock);
+    if (__kmp_tp_cached && newCapacity > __kmp_tp_capacity) {
+      __kmp_threadprivate_resize_cache(newCapacity);
+    } else { // increase __kmp_tp_capacity to correspond with kmp_threads size
+      *(volatile int *)&__kmp_tp_capacity = newCapacity;
+    }
+    __kmp_release_bootstrap_lock(&__kmp_tp_cached_lock);
+  }
+
   return added;
 }
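The rewritten __kmp_expand_threads above no longer loops and retries: it grows the capacity by doubling until the request is covered, clamped at __kmp_sys_max_nth, swaps the arrays in place, and only then resizes the threadprivate caches if they became too small. A standalone illustration of just the growth rule (hypothetical helper, not part of the runtime):

// Illustrative helper mirroring the capacity computation above: double until
// the request is covered, clamping at the system maximum. Assumes current > 0
// and that the caller has already checked sys_max - current >= needed, as
// __kmp_expand_threads does.
static int grow_capacity(int current, int needed, int sys_max) {
  int required = current + needed;
  int capacity = current;
  do {
    capacity = capacity <= (sys_max >> 1) ? (capacity << 1) : sys_max;
  } while (capacity < required);
  return capacity;
}
// e.g. grow_capacity(32, 5, 1024) == 64; grow_capacity(600, 100, 1024) == 1024.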
@@ -7333,6 +7312,8 @@ void __kmp_cleanup(void) {
     __kmp_init_serial = FALSE;
   }
 
+  __kmp_cleanup_threadprivate_caches();
+
   for (f = 0; f < __kmp_threads_capacity; f++) {
     if (__kmp_root[f] != NULL) {
       __kmp_free(__kmp_root[f]);

View File

@@ -594,6 +594,13 @@ void *__kmpc_threadprivate(ident_t *loc, kmp_int32 global_tid, void *data,
   return ret;
 }
 
+static kmp_cached_addr_t *__kmp_find_cache(void *data) {
+  kmp_cached_addr_t *ptr = __kmp_threadpriv_cache_list;
+  while (ptr && ptr->data != data)
+    ptr = ptr->next;
+  return ptr;
+}
+
 /*!
 @ingroup THREADPRIVATE
 @param loc source location information
@@ -620,35 +627,40 @@ __kmpc_threadprivate_cached(ident_t *loc,
   if (TCR_PTR(*cache) == 0) {
     __kmp_acquire_bootstrap_lock(&__kmp_tp_cached_lock);
-    __kmp_tp_cached = 1;
-    __kmp_release_bootstrap_lock(&__kmp_tp_cached_lock);
+    // Compiler often passes in NULL cache, even if it's already been created
     void **my_cache;
-    KMP_ITT_IGNORE(
-        my_cache = (void **)__kmp_allocate(
-            sizeof(void *) * __kmp_tp_capacity + sizeof(kmp_cached_addr_t)););
-    // No need to zero the allocated memory; __kmp_allocate does that.
-    KC_TRACE(
-        50,
-        ("__kmpc_threadprivate_cached: T#%d allocated cache at address %p\n",
-         global_tid, my_cache));
-
-    /* TODO: free all this memory in __kmp_common_destroy using
-     * __kmp_threadpriv_cache_list */
-
-    /* Add address of mycache to linked list for cleanup later */
     kmp_cached_addr_t *tp_cache_addr;
-
-    tp_cache_addr = (kmp_cached_addr_t *)&my_cache[__kmp_tp_capacity];
-    tp_cache_addr->addr = my_cache;
-    tp_cache_addr->next = __kmp_threadpriv_cache_list;
-    __kmp_threadpriv_cache_list = tp_cache_addr;
+    // Look for an existing cache
+    tp_cache_addr = __kmp_find_cache(data);
+    if (!tp_cache_addr) { // Cache was never created; do it now
+      __kmp_tp_cached = 1;
+      KMP_ITT_IGNORE(my_cache = (void **)__kmp_allocate(
+                         sizeof(void *) * __kmp_tp_capacity +
+                         sizeof(kmp_cached_addr_t)););
+      // No need to zero the allocated memory; __kmp_allocate does that.
+      KC_TRACE(50, ("__kmpc_threadprivate_cached: T#%d allocated cache at "
+                    "address %p\n",
+                    global_tid, my_cache));
+      /* TODO: free all this memory in __kmp_common_destroy using
+       * __kmp_threadpriv_cache_list */
+      /* Add address of mycache to linked list for cleanup later */
+      tp_cache_addr = (kmp_cached_addr_t *)&my_cache[__kmp_tp_capacity];
+      tp_cache_addr->addr = my_cache;
+      tp_cache_addr->data = data;
+      tp_cache_addr->compiler_cache = cache;
+      tp_cache_addr->next = __kmp_threadpriv_cache_list;
+      __kmp_threadpriv_cache_list = tp_cache_addr;
+    } else { // A cache was already created; use it
+      my_cache = tp_cache_addr->addr;
+      tp_cache_addr->compiler_cache = cache;
+    }
 
     KMP_MB();
 
     TCW_PTR(*cache, my_cache);
+    __kmp_release_bootstrap_lock(&__kmp_tp_cached_lock);
 
     KMP_MB();
   }
   __kmp_release_lock(&__kmp_global_lock, global_tid);
 }
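The new body above is a find-or-create under the bootstrap lock: the compiler may pass a NULL cache pointer even though a cache for this global already exists, so the runtime first walks __kmp_threadpriv_cache_list via __kmp_find_cache and only allocates when nothing is found, holding __kmp_tp_cached_lock until the cache is published. A standalone sketch of that pattern with hypothetical names (std::mutex standing in for the bootstrap lock):

// Standalone sketch of the find-or-create pattern used above; cache_node,
// cache_list, cache_lock and find_or_create are hypothetical names.
#include <mutex>

struct cache_node {
  void *data;        // global variable the cache serves
  void **slots;      // per-thread slot array
  cache_node *next;
};

static cache_node *cache_list = nullptr;
static std::mutex cache_lock;

static void **find_or_create(void *data, int capacity) {
  std::lock_guard<std::mutex> guard(cache_lock);
  for (cache_node *p = cache_list; p; p = p->next)
    if (p->data == data)
      return p->slots;                 // reuse the existing cache
  cache_node *n = new cache_node{data, new void *[capacity](), cache_list};
  cache_list = n;                      // publish while still holding the lock
  return n->slots;
}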
@@ -661,10 +673,68 @@ __kmpc_threadprivate_cached(ident_t *loc,
   KC_TRACE(10,
            ("__kmpc_threadprivate_cached: T#%d exiting; return value = %p\n",
             global_tid, ret));
 
   return ret;
 }
+
+// This function should only be called when both __kmp_tp_cached_lock and
+// kmp_forkjoin_lock are held.
+void __kmp_threadprivate_resize_cache(int newCapacity) {
+  KC_TRACE(10, ("__kmp_threadprivate_resize_cache: called with size: %d\n",
+                newCapacity));
+  kmp_cached_addr_t *ptr = __kmp_threadpriv_cache_list;
+  while (ptr) {
+    if (ptr->data) { // this location has an active cache; resize it
+      void **my_cache;
+      KMP_ITT_IGNORE(my_cache =
+                         (void **)__kmp_allocate(sizeof(void *) * newCapacity +
+                                                 sizeof(kmp_cached_addr_t)););
+      // No need to zero the allocated memory; __kmp_allocate does that.
+      KC_TRACE(50, ("__kmp_threadprivate_resize_cache: allocated cache at %p\n",
+                    my_cache));
+      // Now copy old cache into new cache
+      void **old_cache = ptr->addr;
+      for (int i = 0; i < __kmp_tp_capacity; ++i) {
+        my_cache[i] = old_cache[i];
+      }
+
+      // Add address of new my_cache to linked list for cleanup later
+      kmp_cached_addr_t *tp_cache_addr;
+      tp_cache_addr = (kmp_cached_addr_t *)&my_cache[newCapacity];
+      tp_cache_addr->addr = my_cache;
+      tp_cache_addr->data = ptr->data;
+      tp_cache_addr->compiler_cache = ptr->compiler_cache;
+      tp_cache_addr->next = __kmp_threadpriv_cache_list;
+      __kmp_threadpriv_cache_list = tp_cache_addr;
+
+      // Copy new cache to compiler's location: We can copy directly
+      // to (*compiler_cache) if compiler guarantees it will keep
+      // using the same location for the cache. This is not yet true
+      // for some compilers, in which case we have to check if
+      // compiler_cache is still pointing at old cache, and if so, we
+      // can point it at the new cache with an atomic compare&swap
+      // operation. (Old method will always work, but we should shift
+      // to new method (commented line below) when Intel and Clang
+      // compilers use new method.)
+      (void)KMP_COMPARE_AND_STORE_PTR(tp_cache_addr->compiler_cache, old_cache,
+                                      my_cache);
+      // TCW_PTR(*(tp_cache_addr->compiler_cache), my_cache);
+
+      // If the store doesn't happen here, the compiler's old behavior will
+      // inevitably call __kmpc_threadprivate_cache with a new location for the
+      // cache, and that function will store the resized cache there at that
+      // point.
+
+      // Nullify old cache's data pointer so we skip it next time
+      ptr->data = NULL;
+    }
+    ptr = ptr->next;
+  }
+  // After all caches are resized, update __kmp_tp_capacity to the new size
+  *(volatile int *)&__kmp_tp_capacity = newCapacity;
+}
 
 /*!
 @ingroup THREADPRIVATE
 @param loc source location information
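The compare-and-swap discussed in the comment above redirects the compiler's cache pointer only if it still refers to the old cache; otherwise the stale pointer is left alone and the next __kmpc_threadprivate_cached call installs the resized cache. A minimal illustration of that "publish only if unchanged" step, using std::atomic in place of KMP_COMPARE_AND_STORE_PTR (the function name is hypothetical):

// Minimal illustration of the publish-only-if-unchanged idea from the comment
// above, using std::atomic instead of the runtime's KMP_COMPARE_AND_STORE_PTR.
#include <atomic>

static void publish_resized_cache(std::atomic<void **> *compiler_cache,
                                  void **old_cache, void **new_cache) {
  void **expected = old_cache;
  // Only redirect the compiler's pointer if it still refers to the old cache;
  // otherwise leave it alone and let the next cache lookup install the
  // resized cache.
  compiler_cache->compare_exchange_strong(expected, new_cache);
}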
@@ -701,14 +771,30 @@ void __kmpc_threadprivate_register_vec(ident_t *loc, void *data,
       d_tn->dt.dtorv = dtor;
     d_tn->is_vec = TRUE;
     d_tn->vec_len = (size_t)vector_length;
-    /*
-        d_tn->obj_init = 0;  // AC: commented out because __kmp_allocate
-                             zeroes the memory
-        d_tn->pod_init = 0;
-    */
+    // d_tn->obj_init = 0;  // AC: __kmp_allocate zeroes the memory
+    // d_tn->pod_init = 0;
     lnk_tn = &(__kmp_threadprivate_d_table.data[KMP_HASH(data)]);
 
     d_tn->next = *lnk_tn;
     *lnk_tn = d_tn;
   }
 }
+
+void __kmp_cleanup_threadprivate_caches() {
+  kmp_cached_addr_t *ptr = __kmp_threadpriv_cache_list;
+
+  while (ptr) {
+    void **cache = ptr->addr;
+    __kmp_threadpriv_cache_list = ptr->next;
+    if (*ptr->compiler_cache)
+      *ptr->compiler_cache = NULL;
+    ptr->compiler_cache = NULL;
+    ptr->data = NULL;
+    ptr->addr = NULL;
+    ptr->next = NULL;
+    // Threadprivate data pointed at by cache entries are destroyed at end of
+    // __kmp_launch_thread with __kmp_common_destroy_gtid.
+    __kmp_free(cache); // implicitly frees ptr too
+    ptr = __kmp_threadpriv_cache_list;
+  }
+}
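Because each kmp_cached_addr_t node lives inside the block it describes, the cleanup loop above has to advance the list head and clear the compiler's pointer before freeing the block, since the node disappears along with the cache. A condensed, self-contained sketch of that teardown order (hypothetical names, plain free() in place of __kmp_free):

// Condensed sketch of the teardown order used above: detach the node from the
// list and clear the compiler's pointer before freeing the block that also
// contains the node itself. tp_cache_node and cleanup_caches are hypothetical.
#include <cstdlib>

struct tp_cache_node {
  void **addr;             // slot array; the node lives at its tail
  void ***compiler_cache;  // compiler's pointer to this cache
  tp_cache_node *next;
};

static void cleanup_caches(tp_cache_node *&list) {
  while (tp_cache_node *node = list) {
    void **slots = node->addr;
    list = node->next;            // advance first: 'node' dies with the block
    if (node->compiler_cache && *node->compiler_cache)
      *node->compiler_cache = nullptr; // don't let a freed cache be reused
    std::free(slots);             // frees the slots and the embedded node
  }
}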