[OpenMP][Offloading] Added support for multiple streams so that multiple kernels can be executed concurrently

Reviewed By: jdoerfert

Differential Revision: https://reviews.llvm.org/D74145
This commit is contained in:
Johannes Doerfert 2020-02-11 22:06:32 -06:00
parent cd5b308b82
commit a5153dbc36
2 changed files with 141 additions and 8 deletions

View File

@ -10,10 +10,12 @@
//
//===----------------------------------------------------------------------===//
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cuda.h>
#include <list>
#include <memory>
#include <string>
#include <vector>
@ -90,11 +92,13 @@ std::list<KernelTy> KernelsList;
/// Class containing all the device information.
class RTLDeviceInfoTy {
std::vector<std::list<FuncOrGblEntryTy>> FuncGblEntries;
std::vector<std::unique_ptr<std::atomic_uint>> NextStreamId;
public:
int NumberOfDevices;
std::vector<CUmodule> Modules;
std::vector<CUcontext> Contexts;
std::vector<std::vector<CUstream>> Streams;
// Device properties
std::vector<int> ThreadsPerBlock;
@ -108,6 +112,7 @@ public:
// OpenMP Environment properties
int EnvNumTeams;
int EnvTeamLimit;
int EnvNumStreams;
// OpenMP Requires Flags
int64_t RequiresFlags;
@ -173,6 +178,15 @@ public:
E.Table.EntriesBegin = E.Table.EntriesEnd = 0;
}
// Get the next stream on a given device in a round robin manner
CUstream &getNextStream(const int DeviceId) {
assert(DeviceId >= 0 &&
static_cast<size_t>(DeviceId) < NextStreamId.size() &&
"Unexpected device id!");
const unsigned int Id = NextStreamId[DeviceId]->fetch_add(1);
return Streams[DeviceId][Id % EnvNumStreams];
}
RTLDeviceInfoTy() {
#ifdef OMPTARGET_DEBUG
if (char *envStr = getenv("LIBOMPTARGET_DEBUG")) {
@ -205,6 +219,8 @@ public:
FuncGblEntries.resize(NumberOfDevices);
Contexts.resize(NumberOfDevices);
Streams.resize(NumberOfDevices);
NextStreamId.resize(NumberOfDevices);
ThreadsPerBlock.resize(NumberOfDevices);
BlocksPerGrid.resize(NumberOfDevices);
WarpSize.resize(NumberOfDevices);
@ -229,6 +245,23 @@ public:
EnvNumTeams = -1;
}
// By default let's create 256 streams per device
EnvNumStreams = 256;
envStr = getenv("LIBOMPTARGET_NUM_STREAMS");
if (envStr) {
EnvNumStreams = std::stoi(envStr);
}
// Initialize streams for each device
for (std::vector<CUstream> &S : Streams) {
S.resize(EnvNumStreams);
}
// Initialize the next stream id
for (std::unique_ptr<std::atomic_uint> &Ptr : NextStreamId) {
Ptr = std::make_unique<std::atomic_uint>(0);
}
// Default state.
RequiresFlags = OMP_REQ_UNDEFINED;
}
@ -244,6 +277,24 @@ public:
}
}
// Destroy streams before contexts
for (int I = 0; I < NumberOfDevices; ++I) {
CUresult err = cuCtxSetCurrent(Contexts[I]);
if (err != CUDA_SUCCESS) {
DP("Error when setting current CUDA context\n");
CUDA_ERR_STRING(err);
}
for (auto &S : Streams[I])
if (S) {
err = cuStreamDestroy(S);
if (err != CUDA_SUCCESS) {
DP("Error when destroying CUDA stream\n");
CUDA_ERR_STRING(err);
}
}
}
// Destroy contexts
for (auto &ctx : Contexts)
if (ctx) {
@ -294,6 +345,20 @@ int32_t __tgt_rtl_init_device(int32_t device_id) {
return OFFLOAD_FAIL;
}
err = cuCtxSetCurrent(DeviceInfo.Contexts[device_id]);
if (err != CUDA_SUCCESS) {
DP("Error when setting current CUDA context\n");
CUDA_ERR_STRING(err);
}
for (CUstream &Stream : DeviceInfo.Streams[device_id]) {
err = cuStreamCreate(&Stream, CU_STREAM_NON_BLOCKING);
if (err != CUDA_SUCCESS) {
DP("Error when creating CUDA stream\n");
CUDA_ERR_STRING(err);
}
}
// Query attributes to determine number of threads/block and blocks/grid.
int maxGridDimX;
err = cuDeviceGetAttribute(&maxGridDimX, CU_DEVICE_ATTRIBUTE_MAX_GRID_DIM_X,
@ -607,14 +672,26 @@ int32_t __tgt_rtl_data_submit(int32_t device_id, void *tgt_ptr, void *hst_ptr,
return OFFLOAD_FAIL;
}
err = cuMemcpyHtoD((CUdeviceptr)tgt_ptr, hst_ptr, size);
CUstream &Stream = DeviceInfo.getNextStream(device_id);
err = cuMemcpyHtoDAsync((CUdeviceptr)tgt_ptr, hst_ptr, size, Stream);
if (err != CUDA_SUCCESS) {
DP("Error when copying data from host to device. Pointers: host = " DPxMOD
", device = " DPxMOD ", size = %" PRId64 "\n", DPxPTR(hst_ptr),
DPxPTR(tgt_ptr), size);
", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}
err = cuStreamSynchronize(Stream);
if (err != CUDA_SUCCESS) {
DP("Error when synchronizing async data transfer from host to device. "
"Pointers: host = " DPxMOD ", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}
return OFFLOAD_SUCCESS;
}
@ -628,14 +705,26 @@ int32_t __tgt_rtl_data_retrieve(int32_t device_id, void *hst_ptr, void *tgt_ptr,
return OFFLOAD_FAIL;
}
err = cuMemcpyDtoH(hst_ptr, (CUdeviceptr)tgt_ptr, size);
CUstream &Stream = DeviceInfo.getNextStream(device_id);
err = cuMemcpyDtoHAsync(hst_ptr, (CUdeviceptr)tgt_ptr, size, Stream);
if (err != CUDA_SUCCESS) {
DP("Error when copying data from device to host. Pointers: host = " DPxMOD
", device = " DPxMOD ", size = %" PRId64 "\n", DPxPTR(hst_ptr),
DPxPTR(tgt_ptr), size);
", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}
err = cuStreamSynchronize(Stream);
if (err != CUDA_SUCCESS) {
DP("Error when synchronizing async data transfer from device to host. "
"Pointers: host = " DPxMOD ", device = " DPxMOD ", size = %" PRId64 "\n",
DPxPTR(hst_ptr), DPxPTR(tgt_ptr), size);
CUDA_ERR_STRING(err);
return OFFLOAD_FAIL;
}
return OFFLOAD_SUCCESS;
}
@ -755,8 +844,11 @@ int32_t __tgt_rtl_run_target_team_region(int32_t device_id, void *tgt_entry_ptr,
DP("Launch kernel with %d blocks and %d threads\n", cudaBlocksPerGrid,
cudaThreadsPerBlock);
CUstream &Stream = DeviceInfo.getNextStream(device_id);
err = cuLaunchKernel(KernelInfo->Func, cudaBlocksPerGrid, 1, 1,
cudaThreadsPerBlock, 1, 1, 0 /*bytes of shared memory*/, 0, &args[0], 0);
cudaThreadsPerBlock, 1, 1, 0 /*bytes of shared memory*/,
Stream, &args[0], 0);
if (err != CUDA_SUCCESS) {
DP("Device kernel launch failed!\n");
CUDA_ERR_STRING(err);
@ -766,7 +858,7 @@ int32_t __tgt_rtl_run_target_team_region(int32_t device_id, void *tgt_entry_ptr,
DP("Launch of entry point at " DPxMOD " successful!\n",
DPxPTR(tgt_entry_ptr));
CUresult sync_err = cuCtxSynchronize();
CUresult sync_err = cuStreamSynchronize(Stream);
if (sync_err != CUDA_SUCCESS) {
DP("Kernel execution error at " DPxMOD "!\n", DPxPTR(tgt_entry_ptr));
CUDA_ERR_STRING(sync_err);

View File

@ -0,0 +1,41 @@
// RUN: %libomptarget-compilexx-run-and-check-aarch64-unknown-linux-gnu
// RUN: %libomptarget-compilexx-run-and-check-powerpc64-ibm-linux-gnu
// RUN: %libomptarget-compilexx-run-and-check-powerpc64le-ibm-linux-gnu
// RUN: %libomptarget-compilexx-run-and-check-x86_64-pc-linux-gnu
#include <assert.h>
#include <stdio.h>
int main(int argc, char *argv[]) {
const int num_threads = 64, N = 128;
int array[num_threads] = {0};
#pragma omp parallel for
for (int i = 0; i < num_threads; ++i) {
int tmp[N];
for (int j = 0; j < N; ++j) {
tmp[j] = i;
}
#pragma omp target teams distribute parallel for map(tofrom : tmp)
for (int j = 0; j < N; ++j) {
tmp[j] += j;
}
for (int j = 0; j < N; ++j) {
array[i] += tmp[j];
}
}
// Verify
for (int i = 0; i < num_threads; ++i) {
const int ref = (0 + N - 1) * N / 2 + i * N;
assert(array[i] == ref);
}
printf("PASS\n");
return 0;
}
// CHECK: PASS