Merge pull request #639 from ye-luo/fix-nonblocking-maxcopy

Fix nonblocking maxcopy
This commit is contained in:
Paul R. C. Kent 2018-01-12 12:38:26 -05:00 committed by GitHub
commit 78727adc86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 118 deletions

View File

@@ -121,8 +121,8 @@ struct Walker
* When Multiplicity = 0, this walker will be destroyed.
*/
RealType Multiplicity;
////Number of copies sent only for MPI
int NumSentCopies;
/// mark true if this walker is being sent.
bool SendInProgress;
/** The configuration vector (3N-dimensional vector to store
the positions of all the particles for a single walker)*/
@@ -439,7 +439,6 @@ struct Walker
DataSet.add(Age);
DataSet.add(ReleasedNodeAge);
DataSet.add(ReleasedNodeWeight);
DataSet.add(NumSentCopies);
// vectors
DataSet.add(R.first_address(), R.last_address());
#if !defined(SOA_MEMORY_OPTIMIZED)
@@ -469,7 +468,7 @@ struct Walker
void copyFromBuffer()
{
DataSet.rewind();
DataSet >> ID >> ParentID >> Generation >> Age >> ReleasedNodeAge >> ReleasedNodeWeight >> NumSentCopies;
DataSet >> ID >> ParentID >> Generation >> Age >> ReleasedNodeAge >> ReleasedNodeWeight;
// vectors
DataSet.get(R.first_address(), R.last_address());
#if !defined(SOA_MEMORY_OPTIMIZED)
@@ -516,7 +515,7 @@ struct Walker
void updateBuffer()
{
DataSet.rewind();
DataSet << ID << ParentID << Generation << Age << ReleasedNodeAge << ReleasedNodeWeight << NumSentCopies;
DataSet << ID << ParentID << Generation << Age << ReleasedNodeAge << ReleasedNodeWeight;
// vectors
DataSet.put(R.first_address(), R.last_address());
#if !defined(SOA_MEMORY_OPTIMIZED)

View File

@@ -57,11 +57,6 @@ WalkerControlMPI::WalkerControlMPI(Communicate* c): WalkerControlBase(c)
SwapMode=1;
Cur_min=0;
Cur_max=0;
#ifdef MCWALKERSET_MPI_DEBUG
char fname[128];
sprintf(fname,"test.%d",MyContext);
std::ofstream fout(fname);
#endif
setup_timers(myTimers, DMCMPITimerNames, timer_level_medium);
}
@@ -158,8 +153,9 @@ void determineNewWalkerPopulation(int Cur_pop, int NumContexts, int MyContext, c
* The algorithm ensures that the load per node can differ only by one walker.
* Each MPI rank can only send or receive or be silent.
* The communication is one-dimensional and very local.
* If multiple copies of a walker need to be sent to the target rank,
* only one walker is sent and the number of copies is encoded in the message.
* If multiple copies of a walker need to be sent to the target rank, only send one.
* The number of copies is communicated ahead via blocking send/recv.
* Then the walkers are transferred via blocking or non-blocking send/recv.
* The blocking send/recv may become serialized and worsen load imbalance.
* Non blocking send/recv algorithm avoids serialization completely.
*/
@@ -202,13 +198,18 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W)
int nswap=plus.size();
// sort good walkers by the number of copies
assert(good_w.size()==ncopy_w.size());
std::vector<std::pair<int,int> > ncopy_pairs, nrecv_pairs;
std::vector<std::pair<int,int> > ncopy_pairs;
for(int iw=0; iw<ncopy_w.size(); iw++)
ncopy_pairs.push_back(std::make_pair(ncopy_w[iw],iw));
std::sort(ncopy_pairs.begin(), ncopy_pairs.end());
int nsend=0;
std::vector<OOMPI_Request> requests;
struct job {
const int walkerID;
const int target;
job(int wid, int target_in): walkerID(wid), target(target_in) {};
};
std::vector<job> job_list;
for(int ic=0; ic<nswap; ic++)
{
if(plus[ic]==MyContext)
@@ -217,8 +218,7 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W)
Walker_t* &awalker = good_w[ncopy_pairs.back().second];
// count the possible copies in one send
auto &nsentcopy = awalker->NumSentCopies;
nsentcopy = 0;
int nsentcopy = 0;
for(int id=ic+1; id<nswap; id++)
if(plus[ic]==plus[id]&&minus[ic]==minus[id]&&ncopy_pairs.back().first>0)
@@ -231,18 +231,13 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W)
break;
}
// pack data and send
size_t byteSize = awalker->byteSize();
awalker->updateBuffer();
OOMPI_Message sendBuffer(awalker->DataSet.data(), byteSize);
if(use_nonblocking)
requests.push_back(myComm->getComm()[minus[ic]].Isend(sendBuffer));
else
{
myTimers[DMC_MPI_send]->start();
myComm->getComm()[minus[ic]].Send(sendBuffer);
myTimers[DMC_MPI_send]->stop();
}
// send the number of copies to the target
myComm->getComm()[minus[ic]].Send(OOMPI_Message(nsentcopy));
job_list.push_back(job(ncopy_pairs.back().second, minus[ic]));
#ifdef MCWALKERSET_MPI_DEBUG
fout << "rank " << plus[ic] << " sends a walker with " << nsentcopy << " copies to rank " << minus[ic] << std::endl;
#endif
// update counter and cursor
++nsend;
ic+=nsentcopy;
@@ -261,20 +256,57 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W)
}
if(minus[ic]==MyContext)
{
// count receive pairs, (source,copy)
nrecv_pairs.push_back(std::make_pair(plus[ic],0));
for(int id=ic+1; id<nswap; id++)
if(plus[ic]==plus[id]&&minus[ic]==minus[id])
nrecv_pairs.back().second++;
else
break;
Walker_t* awalker(nullptr);
if(!bad_w.empty())
{
awalker=bad_w.back();
bad_w.pop_back();
}
int nsentcopy = 0;
// recv the number of copies from the target
myComm->getComm()[plus[ic]].Recv(OOMPI_Message(nsentcopy));
job_list.push_back(job(newW.size(), plus[ic]));
if(plus[ic]!=plus[ic+nsentcopy]||minus[ic]!=minus[ic+nsentcopy])
APP_ABORT("WalkerControlMPI::swapWalkersSimple send/recv pair checking failed!");
#ifdef MCWALKERSET_MPI_DEBUG
fout << "rank " << minus[ic] << " recvs a walker with " << nsentcopy << " copies from rank " << plus[ic] << std::endl;
#endif
// save the new walker
newW.push_back(awalker);
ncopy_newW.push_back(nsentcopy);
// update cursor
ic+=nrecv_pairs.back().second;
ic+=nsentcopy;
}
}
if(nsend>0)
{
std::vector<OOMPI_Request> requests;
// mark all walkers not in send
for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++)
good_w[jobit->walkerID]->SendInProgress=false;
for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++)
{
// pack data and send
Walker_t* &awalker = good_w[jobit->walkerID];
size_t byteSize = awalker->byteSize();
if(!awalker->SendInProgress)
{
awalker->updateBuffer();
awalker->SendInProgress=true;
}
OOMPI_Message sendBuffer(awalker->DataSet.data(), byteSize);
if(use_nonblocking)
requests.push_back(myComm->getComm()[jobit->target].Isend(sendBuffer));
else
{
myTimers[DMC_MPI_send]->start();
myComm->getComm()[jobit->target].Send(sendBuffer);
myTimers[DMC_MPI_send]->stop();
}
}
if(use_nonblocking)
{
// wait all the isend
@@ -289,88 +321,45 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W)
}
else
{
struct job {
OOMPI_Request request;
int walkerID;
int queueID;
job(const OOMPI_Request &req, int wid, int qid): request(req), walkerID(wid), queueID(qid) {};
};
std::vector<job> job_list;
std::vector<bool> queue_status(nrecv_pairs.size(),true);
bool completed=false;
while(!completed)
std::vector<OOMPI_Request> requests;
for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++)
{
// receive data
for(int ic=0; ic<nrecv_pairs.size(); ic++)
if(queue_status[ic]&&nrecv_pairs[ic].second>=0)
{
Walker_t* awalker;
if(bad_w.empty())
{
awalker=new Walker_t(wRef);
}
else
{
awalker=bad_w.back();
bad_w.pop_back();
}
size_t byteSize = awalker->byteSize();
myTimers[DMC_MPI_recv]->start();
OOMPI_Message recvBuffer(awalker->DataSet.data(), byteSize);
if(use_nonblocking)
{
job_list.push_back(job(myComm->getComm()[nrecv_pairs[ic].first].Irecv(recvBuffer),
newW.size(),ic));
queue_status[ic]=false;
}
else
{
myComm->getComm()[nrecv_pairs[ic].first].Recv(recvBuffer);
job_list.push_back(job(OOMPI_Request(),newW.size(),ic));
}
myTimers[DMC_MPI_recv]->stop();
newW.push_back(awalker);
ncopy_newW.push_back(0);
}
// recv and unpack data
Walker_t* &awalker = newW[jobit->walkerID];
if(!awalker) awalker=new Walker_t(wRef);
size_t byteSize = awalker->byteSize();
OOMPI_Message recvBuffer(awalker->DataSet.data(), byteSize);
if(use_nonblocking)
{
OOMPI_Status status;
for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++)
if(jobit->request.Test(status))
{
auto &awalker=newW[jobit->walkerID];
// unpack data
awalker->copyFromBuffer();
ncopy_newW[jobit->walkerID]=awalker->NumSentCopies;
// update counter
nrecv_pairs[jobit->queueID].second-=(awalker->NumSentCopies+1);
queue_status[jobit->queueID]=true;
job_list.erase(jobit);
break;
}
}
requests.push_back(myComm->getComm()[jobit->target].Irecv(recvBuffer));
else
{
for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++)
{
auto &awalker=newW[jobit->walkerID];
// unpack data
awalker->copyFromBuffer();
ncopy_newW[jobit->walkerID]=awalker->NumSentCopies;
// update counter
nrecv_pairs[jobit->queueID].second-=(awalker->NumSentCopies+1);
}
job_list.clear();
myTimers[DMC_MPI_recv]->start();
myComm->getComm()[jobit->target].Recv(recvBuffer);
awalker->copyFromBuffer();
myTimers[DMC_MPI_recv]->stop();
}
// check the completion of queues
completed=true;
for(int ic=0; ic<nrecv_pairs.size(); ic++)
completed = completed && (nrecv_pairs[ic].second==-1);
}
if(use_nonblocking)
{
std::vector<bool> not_completed(requests.size(),true);
bool completed = false;
while(!completed)
{
OOMPI_Status status;
completed = true;
for(int im=0; im<requests.size(); im++)
if(not_completed[im])
{
if(requests[im].Test(status))
{
newW[job_list[im].walkerID]->copyFromBuffer();
not_completed[im] = false;
}
else
completed = false;
}
}
requests.clear();
}
}
//save the number of walkers sent

View File

@@ -517,11 +517,6 @@ bool WalkerControlBase::put(xmlNodePtr cur)
if(nonblocking=="yes")
{
use_nonblocking = true;
if(MaxCopy>2)
{
app_warning() << "use_nonblocking==\"yes\" doesn't support maxCopy>2. Overwriting it to 2." << std::endl;
MaxCopy=2;
}
}
else if(nonblocking=="no")
{