Traces throttling. Preliminary, please test before trusting.

git-svn-id: https://subversion.assembla.com/svn/qmcdev/trunk@6367 e5b18d87-469d-4833-9cc0-8cdfa06e9491
This commit is contained in:
Jaron Krogel 2014-10-03 21:44:09 +00:00
parent e63019fdee
commit 74ff608843
6 changed files with 64 additions and 65 deletions

View File

@ -1218,6 +1218,12 @@ struct TraceBuffer
buffer.resize(0,buffer.size(1));
}
inline bool empty()
{
return buffer.size(0)==0;
}
inline void set_samples(TraceSamples<T>& s)
{
@ -1569,6 +1575,7 @@ public:
bool method_allows_traces;
bool streaming_traces;
bool writing_traces;
int buffer_steps; //number of steps to buffer data between writes
bool verbose;
string format;
bool hdf_format;
@ -1587,6 +1594,7 @@ public:
reset_permissions();
master_copy = true;
communicator = comm;
buffer_steps = 10;
format = "hdf";
default_domain = "scalars";
request.set_scalar_domain(default_domain);
@ -1622,6 +1630,7 @@ public:
request = tm.request;
streaming_traces = tm.streaming_traces;
writing_traces = tm.writing_traces;
buffer_steps = tm.buffer_steps;
verbose = tm.verbose;
format = tm.format;
hdf_format = tm.hdf_format;
@ -1687,6 +1696,7 @@ public:
attrib.add(verbose_write, "verbose" );
attrib.add(array, "particle" );//legacy
attrib.add(array_defaults, "particle_defaults" );//legacy
attrib.add(buffer_steps, "buffer" );//legacy
attrib.put(cur);
writing_traces = writing == "yes";
bool scalars_on = scalar == "yes";
@ -1694,6 +1704,8 @@ public:
bool use_scalar_defaults = scalar_defaults == "yes";
bool use_array_defaults = array_defaults == "yes";
verbose = verbose_write == "yes";
if(buffer_steps<1)
APP_ABORT("TraceManager::put buffer steps cannot be less than 1");
tolower(format);
if(format=="hdf")
{
@ -2051,34 +2063,43 @@ public:
}
//write buffered trace data to file
inline void write_buffers(vector<TraceManager*>& clones, int block)
//monitor what step the run is on, and write when it is time
//(throttled write, threaded, critical)
inline void monitor_writes(int step,TraceManager* master)
{
if(master_copy)
if(step%buffer_steps)
{
if(writing_traces)
#pragma omp critical(TRACE_BUFFER_WRITE)
{
double tstart = MPI_Wtime();
if(verbose)
app_log()<<"TraceManager::write_buffers "<<master_copy<<endl;
if(hdf_format)
{
write_buffers_hdf(clones);
}
if(adios_format)
{
#ifdef HAVE_ADIOS
write_buffers_adios(clones, block);
#else
APP_ABORT("TraceManager::write_buffers (adios) ADIOS is not found");
#endif
//app_log()<<"TraceManager::write_buffers (adios) has not yet been implemented"<<endl;
app_log()<<" write_buffers() total time "<<MPI_Wtime()-tstart<<endl;
}
write_buffers(*master);
}
}
else
APP_ABORT("TraceManager::write_buffers should not be called from non-master copy");
}
//write buffered trace data to file
inline void write_buffers(TraceManager& master)
{
if(writing_traces && !buffers_empty())
{
if(hdf_format)
write_buffers_hdf(master);
if(adios_format)
{
#ifdef HAVE_ADIOS
write_buffers_adios(master);
#else
APP_ABORT("TraceManager::write_buffer (adios) ADIOS is not found");
#endif
}
reset_buffers();
}
}
inline bool buffers_empty()
{
return int_buffer.empty() && real_buffer.empty();
}
@ -2139,7 +2160,7 @@ public:
inline void startRun(int blocks,vector<TraceManager*>& clones)
inline void startRun(vector<TraceManager*>& clones)
{
if(verbose)
app_log()<<"TraceManager::startRun "<<master_copy<<endl;
@ -2148,18 +2169,23 @@ public:
initialize_traces();
check_clones(clones);
open_file(clones);
for(int ip=0; ip<clones.size(); ++ip)
clones[ip]->reset_buffers();
}
else
APP_ABORT("TraceManager::startRun should not be called from non-master copy");
}
inline void stopRun()
inline void stopRun(vector<TraceManager*>& clones)
{
if(verbose)
app_log()<<"TraceManager::stopRun "<<master_copy<<endl;
if(master_copy)
{
if(writing_traces)// flush any additional data from the buffers
for(int ip=0; ip<clones.size(); ++ip)
clones[ip]->write_buffers(*this);
close_file();
finalize_traces();
}
@ -2168,21 +2194,6 @@ public:
}
inline void startBlock(int nsteps)
{
if(verbose)
app_log()<<"TraceManager::startBlock "<<master_copy<<endl;
reset_buffers();
}
inline void stopBlock()
{
if(verbose)
app_log()<<"TraceManager::stopBlock "<<master_copy<<endl;
}
inline void write_summary(string pad=" ")
{
string pad2 = pad+" ";
@ -2252,16 +2263,10 @@ public:
}
inline void write_buffers_hdf(vector<TraceManager*>& clones)
inline void write_buffers_hdf(TraceManager& master)
{
if(verbose)
app_log()<<"TraceManager::write_buffers_hdf "<<master_copy<<endl;
for(int ip=0; ip<clones.size(); ++ip)
{
TraceManager& tm = *clones[ip];
tm.int_buffer.write_hdf(*hdf_file,int_buffer.hdf_file_pointer);
tm.real_buffer.write_hdf(*hdf_file,real_buffer.hdf_file_pointer);
}
int_buffer.write_hdf(*master.hdf_file,master.int_buffer.hdf_file_pointer);
real_buffer.write_hdf(*master.hdf_file,master.real_buffer.hdf_file_pointer);
}

View File

@ -196,7 +196,7 @@ bool CSVMC::run()
Estimators->start(nBlocks);
for (int ip=0; ip<NumThreads; ++ip)
CSMovers[ip]->startRun(nBlocks,false);
Traces->startRun(nBlocks,traceClones);
Traces->startRun(traceClones);
const bool has_collectables=W.Collectables.size();
ADIOS_PROFILE::profile_adios_init(nBlocks);
for (int block=0; block<nBlocks; ++block)
@ -227,6 +227,7 @@ bool CSVMC::run()
wClones[ip]->saveEnsemble(wit,wit_end);
// if(storeConfigs && (now_loc%storeConfigs == 0))
// ForwardWalkingHistory.storeConfigsForForwardWalking(*wClones[ip]);
traceClones[ip]->monitor_writes(now_loc,Traces);
}
CSMovers[ip]->stopBlock(false);
// app_log()<<"THREAD "<<ip<<endl;
@ -236,11 +237,6 @@ bool CSVMC::run()
//Estimators->accumulateCollectables(wClones,nSteps);
CurrentStep+=nSteps;
Estimators->stopBlock(estimatorClones);
ADIOS_PROFILE::profile_adios_end_comp(block);
ADIOS_PROFILE::profile_adios_start_trace(block);
Traces->write_buffers(traceClones, block);
ADIOS_PROFILE::profile_adios_end_trace(block);
ADIOS_PROFILE::profile_adios_start_checkpoint(block);
if(storeConfigs)
recordBlock(block);
ADIOS_PROFILE::profile_adios_end_checkpoint(block);
@ -249,7 +245,7 @@ bool CSVMC::run()
Estimators->stop(estimatorClones);
for (int ip=0; ip<NumThreads; ++ip)
CSMovers[ip]->stopRun2();
Traces->stopRun();
Traces->stopRun(traceClones);
//copy back the random states
for (int ip=0; ip<NumThreads; ++ip)
*(RandomNumberControl::Children[ip])=*(Rng[ip]);

View File

@ -243,7 +243,7 @@ bool DMCOMP::run()
Estimators->start(nBlocks);
for(int ip=0; ip<NumThreads; ip++)
Movers[ip]->startRun(nBlocks,false);
Traces->startRun(nBlocks,traceClones);
Traces->startRun(traceClones);
Timer myclock;
IndexType block = 0;
IndexType updatePeriod=(QMCDriverMode[QMC_UPDATE_MODE])?Period4CheckProperties:(nBlocks+1)*nSteps;
@ -280,6 +280,7 @@ bool DMCOMP::run()
Movers[ip]->setMultiplicity(wit,wit_end);
if(QMCDriverMode[QMC_UPDATE_MODE] && now%updatePeriod == 0)
Movers[ip]->updateWalkers(wit, wit_end);
traceClones[ip]->monitor_writes(now,Traces);
}
prof.pop(); //close dmc_advance
@ -304,7 +305,6 @@ bool DMCOMP::run()
}
// branchEngine->debugFWconfig();
Estimators->stopBlock(acceptRatio());
Traces->write_buffers(traceClones, block);
block++;
if(DumpConfig &&block%Period4CheckPoint == 0)
{
@ -322,7 +322,7 @@ bool DMCOMP::run()
Estimators->stop();
for (int ip=0; ip<NumThreads; ++ip)
Movers[ip]->stopRun2();
Traces->stopRun();
Traces->stopRun(traceClones);
return finalize(nBlocks);
}

View File

@ -154,7 +154,6 @@ void QMCUpdateBase::stopRun2()
void QMCUpdateBase::startBlock(int steps)
{
Estimators->startBlock(steps);
Traces->startBlock(steps);
nAccept = 0;
nReject=0;
nAllRejected=0;
@ -165,7 +164,6 @@ void QMCUpdateBase::startBlock(int steps)
void QMCUpdateBase::stopBlock(bool collectall)
{
Estimators->stopBlock(acceptRatio(),collectall);
Traces->stopBlock();
}
void QMCUpdateBase::initWalkers(WalkerIter_t it, WalkerIter_t it_end)

View File

@ -52,7 +52,7 @@ bool RMCSingleOMP::run()
for (int ip=0; ip<NumThreads; ++ip)
Movers[ip]->startRun(nBlocks,false);
Traces->startRun(nBlocks,traceClones);
Traces->startRun(traceClones);
const bool has_collectables=W.Collectables.size();
for (int block=0; block<nBlocks; ++block)
{

View File

@ -53,7 +53,7 @@ bool VMCSingleOMP::run()
Estimators->start(nBlocks);
for (int ip=0; ip<NumThreads; ++ip)
Movers[ip]->startRun(nBlocks,false);
Traces->startRun(nBlocks,traceClones);
Traces->startRun(traceClones);
const bool has_collectables=W.Collectables.size();
for (int block=0; block<nBlocks; ++block)
{
@ -82,20 +82,20 @@ bool VMCSingleOMP::run()
wClones[ip]->saveEnsemble(wit,wit_end);
// if(storeConfigs && (now_loc%storeConfigs == 0))
// ForwardWalkingHistory.storeConfigsForForwardWalking(*wClones[ip]);
traceClones[ip]->monitor_writes(now_loc,Traces);
}
Movers[ip]->stopBlock(false);
}//end-of-parallel for
//Estimators->accumulateCollectables(wClones,nSteps);
CurrentStep+=nSteps;
Estimators->stopBlock(estimatorClones);
Traces->write_buffers(traceClones, block);
if(storeConfigs)
recordBlock(block);
}//block
Estimators->stop(estimatorClones);
for (int ip=0; ip<NumThreads; ++ip)
Movers[ip]->stopRun2();
Traces->stopRun();
Traces->stopRun(traceClones);
//copy back the random states
for (int ip=0; ip<NumThreads; ++ip)
*(RandomNumberControl::Children[ip])=*(Rng[ip]);