From 44f2384e2f877503b963bdf37243081e347f8061 Mon Sep 17 00:00:00 2001 From: Ye Luo Date: Tue, 9 Jan 2018 22:48:50 -0600 Subject: [PATCH 1/4] Fix nonblocking send/recv with maxCopy>2 --- src/Particle/Walker.h | 9 +- src/QMCDrivers/DMC/WalkerControlMPI.cpp | 198 +++++++++++------------- src/QMCDrivers/WalkerControlBase.cpp | 5 - 3 files changed, 97 insertions(+), 115 deletions(-) diff --git a/src/Particle/Walker.h b/src/Particle/Walker.h index d3aaa06c9..b47216585 100644 --- a/src/Particle/Walker.h +++ b/src/Particle/Walker.h @@ -121,8 +121,8 @@ struct Walker * When Multiplicity = 0, this walker will be destroyed. */ RealType Multiplicity; - ////Number of copies sent only for MPI - int NumSentCopies; + /// mark ture if this walker is being sent. + bool SendInProgress; /** The configuration vector (3N-dimensional vector to store the positions of all the particles for a single walker)*/ @@ -439,7 +439,6 @@ struct Walker DataSet.add(Age); DataSet.add(ReleasedNodeAge); DataSet.add(ReleasedNodeWeight); - DataSet.add(NumSentCopies); // vectors DataSet.add(R.first_address(), R.last_address()); #if !defined(SOA_MEMORY_OPTIMIZED) @@ -469,7 +468,7 @@ struct Walker void copyFromBuffer() { DataSet.rewind(); - DataSet >> ID >> ParentID >> Generation >> Age >> ReleasedNodeAge >> ReleasedNodeWeight >> NumSentCopies; + DataSet >> ID >> ParentID >> Generation >> Age >> ReleasedNodeAge >> ReleasedNodeWeight; // vectors DataSet.get(R.first_address(), R.last_address()); #if !defined(SOA_MEMORY_OPTIMIZED) @@ -516,7 +515,7 @@ struct Walker void updateBuffer() { DataSet.rewind(); - DataSet << ID << ParentID << Generation << Age << ReleasedNodeAge << ReleasedNodeWeight << NumSentCopies; + DataSet << ID << ParentID << Generation << Age << ReleasedNodeAge << ReleasedNodeWeight; // vectors DataSet.put(R.first_address(), R.last_address()); #if !defined(SOA_MEMORY_OPTIMIZED) diff --git a/src/QMCDrivers/DMC/WalkerControlMPI.cpp b/src/QMCDrivers/DMC/WalkerControlMPI.cpp index 46f099629..216eb9d0f 100644 --- a/src/QMCDrivers/DMC/WalkerControlMPI.cpp +++ b/src/QMCDrivers/DMC/WalkerControlMPI.cpp @@ -57,11 +57,6 @@ WalkerControlMPI::WalkerControlMPI(Communicate* c): WalkerControlBase(c) SwapMode=1; Cur_min=0; Cur_max=0; -#ifdef MCWALKERSET_MPI_DEBUG - char fname[128]; - sprintf(fname,"test.%d",MyContext); - std::ofstream fout(fname); -#endif setup_timers(myTimers, DMCMPITimerNames, timer_level_medium); } @@ -208,7 +203,12 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W) std::sort(ncopy_pairs.begin(), ncopy_pairs.end()); int nsend=0; - std::vector requests; + struct job { + const int walkerID; + const int target; + job(int wid, int target_in): walkerID(wid), target(target_in) {}; + }; + std::vector job_list; for(int ic=0; icNumSentCopies; - nsentcopy = 0; + int nsentcopy = 0; for(int id=ic+1; id0) @@ -231,18 +230,13 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W) break; } - // pack data and send - size_t byteSize = awalker->byteSize(); - awalker->updateBuffer(); - OOMPI_Message sendBuffer(awalker->DataSet.data(), byteSize); - if(use_nonblocking) - requests.push_back(myComm->getComm()[minus[ic]].Isend(sendBuffer)); - else - { - myTimers[DMC_MPI_send]->start(); - myComm->getComm()[minus[ic]].Send(sendBuffer); - myTimers[DMC_MPI_send]->stop(); - } + // send the number of copies to the target + myComm->getComm()[minus[ic]].Send(OOMPI_Message(nsentcopy)); + job_list.push_back(job(ncopy_pairs.back().second, minus[ic])); +#ifdef MCWALKERSET_MPI_DEBUG + fout << "rank " << plus[ic] << " sends a walker with " << nsentcopy << " copies to rank " << minus[ic] << std::endl; +#endif + // update counter and cursor ++nsend; ic+=nsentcopy; @@ -261,20 +255,57 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W) } if(minus[ic]==MyContext) { - // count receive pairs, (source,copy) - nrecv_pairs.push_back(std::make_pair(plus[ic],0)); - for(int id=ic+1; idgetComm()[plus[ic]].Recv(OOMPI_Message(nsentcopy)); + job_list.push_back(job(newW.size(), plus[ic])); + if(plus[ic]!=plus[ic+nsentcopy]||minus[ic]!=minus[ic+nsentcopy]) + APP_ABORT("WalkerControlMPI::swapWalkersSimple send/recv pair checking failed!"); +#ifdef MCWALKERSET_MPI_DEBUG + fout << "rank " << minus[ic] << " recvs a walker with " << nsentcopy << " copies from rank " << plus[ic] << std::endl; +#endif + + // save the new walker + newW.push_back(awalker); + ncopy_newW.push_back(nsentcopy); // update cursor - ic+=nrecv_pairs.back().second; + ic+=nsentcopy; } } if(nsend>0) { + std::vector requests; + // mark all walkers not in send + for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++) + good_w[jobit->walkerID]->SendInProgress=false; + for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++) + { + // pack data and send + Walker_t* &awalker = good_w[jobit->walkerID]; + size_t byteSize = awalker->byteSize(); + if(!awalker->SendInProgress) + { + awalker->updateBuffer(); + awalker->SendInProgress=true; + } + OOMPI_Message sendBuffer(awalker->DataSet.data(), byteSize); + if(use_nonblocking) + requests.push_back(myComm->getComm()[jobit->target].Isend(sendBuffer)); + else + { + myTimers[DMC_MPI_send]->start(); + myComm->getComm()[jobit->target].Send(sendBuffer); + myTimers[DMC_MPI_send]->stop(); + } + } if(use_nonblocking) { // wait all the isend @@ -289,88 +320,45 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W) } else { - struct job { - OOMPI_Request request; - int walkerID; - int queueID; - job(const OOMPI_Request &req, int wid, int qid): request(req), walkerID(wid), queueID(qid) {}; - }; - std::vector job_list; - std::vector queue_status(nrecv_pairs.size(),true); - - bool completed=false; - while(!completed) + std::vector requests; + for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++) { - // receive data - for(int ic=0; ic=0) - { - Walker_t* awalker; - if(bad_w.empty()) - { - awalker=new Walker_t(wRef); - } - else - { - awalker=bad_w.back(); - bad_w.pop_back(); - } - - size_t byteSize = awalker->byteSize(); - myTimers[DMC_MPI_recv]->start(); - OOMPI_Message recvBuffer(awalker->DataSet.data(), byteSize); - if(use_nonblocking) - { - job_list.push_back(job(myComm->getComm()[nrecv_pairs[ic].first].Irecv(recvBuffer), - newW.size(),ic)); - queue_status[ic]=false; - } - else - { - myComm->getComm()[nrecv_pairs[ic].first].Recv(recvBuffer); - job_list.push_back(job(OOMPI_Request(),newW.size(),ic)); - } - myTimers[DMC_MPI_recv]->stop(); - - newW.push_back(awalker); - ncopy_newW.push_back(0); - } - + // recv and unpack data + Walker_t* &awalker = newW[jobit->walkerID]; + if(!awalker) awalker=new Walker_t(wRef); + size_t byteSize = awalker->byteSize(); + OOMPI_Message recvBuffer(awalker->DataSet.data(), byteSize); if(use_nonblocking) - { - OOMPI_Status status; - for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++) - if(jobit->request.Test(status)) - { - auto &awalker=newW[jobit->walkerID]; - // unpack data - awalker->copyFromBuffer(); - ncopy_newW[jobit->walkerID]=awalker->NumSentCopies; - // update counter - nrecv_pairs[jobit->queueID].second-=(awalker->NumSentCopies+1); - queue_status[jobit->queueID]=true; - job_list.erase(jobit); - break; - } - } + requests.push_back(myComm->getComm()[jobit->target].Irecv(recvBuffer)); else { - for(auto jobit=job_list.begin(); jobit!=job_list.end(); jobit++) - { - auto &awalker=newW[jobit->walkerID]; - // unpack data - awalker->copyFromBuffer(); - ncopy_newW[jobit->walkerID]=awalker->NumSentCopies; - // update counter - nrecv_pairs[jobit->queueID].second-=(awalker->NumSentCopies+1); - } - job_list.clear(); + myTimers[DMC_MPI_recv]->start(); + myComm->getComm()[jobit->target].Recv(recvBuffer); + awalker->copyFromBuffer(); + myTimers[DMC_MPI_recv]->stop(); } - - // check the completion of queues - completed=true; - for(int ic=0; ic not_completed(requests.size(),true); + bool completed = false; + while(!completed) + { + OOMPI_Status status; + completed = true; + for(int im=0; imcopyFromBuffer(); + not_completed[im] = false; + } + else + completed = false; + } + } + requests.clear(); } } //save the number of walkers sent diff --git a/src/QMCDrivers/WalkerControlBase.cpp b/src/QMCDrivers/WalkerControlBase.cpp index 887052697..ef54d81e4 100644 --- a/src/QMCDrivers/WalkerControlBase.cpp +++ b/src/QMCDrivers/WalkerControlBase.cpp @@ -517,11 +517,6 @@ bool WalkerControlBase::put(xmlNodePtr cur) if(nonblocking=="yes") { use_nonblocking = true; - if(MaxCopy>2) - { - app_warning() << "use_nonblocking==\"yes\" doesn't support maxCopy>2. Overwriting it to 2." << std::endl; - MaxCopy=2; - } } else if(nonblocking=="no") { From b1b9648f1845e96b677c428df662035c7ea10430 Mon Sep 17 00:00:00 2001 From: Ye Luo Date: Wed, 10 Jan 2018 10:53:19 -0600 Subject: [PATCH 2/4] Update comments. --- src/QMCDrivers/DMC/WalkerControlMPI.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/QMCDrivers/DMC/WalkerControlMPI.cpp b/src/QMCDrivers/DMC/WalkerControlMPI.cpp index 216eb9d0f..e33382a81 100644 --- a/src/QMCDrivers/DMC/WalkerControlMPI.cpp +++ b/src/QMCDrivers/DMC/WalkerControlMPI.cpp @@ -153,8 +153,9 @@ void determineNewWalkerPopulation(int Cur_pop, int NumContexts, int MyContext, c * The algorithm ensures that the load per node can differ only by one walker. * Each MPI rank can only send or receive or be silent. * The communication is one-dimensional and very local. - * If multiple copies of a walker need to be sent to the target rank, - * only one walker is sent and the number of copies is encoded in the message. + * If multiple copies of a walker need to be sent to the target rank, only send one. + * The number of copies is communicated ahead via blocking send/recv. + * Then the walkers are transferred via blocking or non-blocking send/recv. * The blocking send/recv may become serialized and worsen load imbalance. * Non blocking send/recv algorithm avoids serialization completely. */ From 175357e48e9be977ba2885065356678aea6d90cf Mon Sep 17 00:00:00 2001 From: Ye Luo Date: Wed, 10 Jan 2018 16:19:13 -0600 Subject: [PATCH 3/4] Minor change. --- src/QMCDrivers/DMC/WalkerControlMPI.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/QMCDrivers/DMC/WalkerControlMPI.cpp b/src/QMCDrivers/DMC/WalkerControlMPI.cpp index e33382a81..17aaef559 100644 --- a/src/QMCDrivers/DMC/WalkerControlMPI.cpp +++ b/src/QMCDrivers/DMC/WalkerControlMPI.cpp @@ -198,7 +198,7 @@ void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W) int nswap=plus.size(); // sort good walkers by the number of copies assert(good_w.size()==ncopy_w.size()); - std::vector > ncopy_pairs, nrecv_pairs; + std::vector > ncopy_pairs; for(int iw=0; iw Date: Fri, 12 Jan 2018 12:03:13 -0500 Subject: [PATCH 4/4] Typo fix --- src/Particle/Walker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Particle/Walker.h b/src/Particle/Walker.h index b47216585..290999435 100644 --- a/src/Particle/Walker.h +++ b/src/Particle/Walker.h @@ -121,7 +121,7 @@ struct Walker * When Multiplicity = 0, this walker will be destroyed. */ RealType Multiplicity; - /// mark ture if this walker is being sent. + /// mark true if this walker is being sent. bool SendInProgress; /** The configuration vector (3N-dimensional vector to store