Skip to content

Commit

Permalink
Merge pull request #2839 from pnorbert/core-time
Browse files Browse the repository at this point in the history
Moved Seconds and Timepoint definitions to CoreTypes.h …
  • Loading branch information
pnorbert authored Aug 25, 2021
2 parents 83349b7 + 21983d7 commit 688ca55
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 72 deletions.
9 changes: 9 additions & 0 deletions source/adios2/core/CoreTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#define ADIOS2_CORETYPES_H_

/// \cond EXCLUDE_FROM_DOXYGEN
#include <chrono>
#include <cstddef>
#include <cstdint>
/// \endcond
Expand All @@ -32,6 +33,14 @@ struct iovec
size_t iov_len;
};

typedef std::chrono::duration<double> Seconds;
typedef std::chrono::time_point<
std::chrono::steady_clock,
std::chrono::duration<double, std::chrono::steady_clock::period>>
TimePoint;

inline TimePoint Now() { return std::chrono::steady_clock::now(); }

} // end namespace core
} // end namespace adios2

Expand Down
16 changes: 7 additions & 9 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,17 @@ void BP4Reader::Init()

/* Do a collective wait for the file(s) to appear within timeout.
Make sure every process comes to the same conclusion */
const Seconds timeoutSeconds =
Seconds(m_BP4Deserializer.m_Parameters.OpenTimeoutSecs);
const Seconds timeoutSeconds(
m_BP4Deserializer.m_Parameters.OpenTimeoutSecs);

Seconds pollSeconds =
Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs);
Seconds pollSeconds(
m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs);
if (pollSeconds > timeoutSeconds)
{
pollSeconds = timeoutSeconds;
}

TimePoint timeoutInstant =
std::chrono::steady_clock::now() + timeoutSeconds;
TimePoint timeoutInstant = Now() + timeoutSeconds;

OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds);
if (!m_BP4Deserializer.m_Parameters.StreamReader)
Expand All @@ -180,7 +179,7 @@ void BP4Reader::Init()
bool BP4Reader::SleepOrQuit(const TimePoint &timeoutInstant,
const Seconds &pollSeconds)
{
auto now = std::chrono::steady_clock::now();
auto now = Now();
if (now + pollSeconds >= timeoutInstant)
{
return false;
Expand Down Expand Up @@ -684,8 +683,7 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
{
timeoutSeconds = Seconds(999999999); // max 1 billion seconds wait
}
const TimePoint timeoutInstant =
std::chrono::steady_clock::now() + timeoutSeconds;
const TimePoint timeoutInstant = Now() + timeoutSeconds;

auto pollSeconds =
Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs);
Expand Down
9 changes: 1 addition & 8 deletions source/adios2/engine/bp4/BP4Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
#define ADIOS2_ENGINE_BP4_BP4READER_H_

#include "adios2/common/ADIOSConfig.h"
#include "adios2/core/CoreTypes.h"
#include "adios2/core/Engine.h"
#include "adios2/helper/adiosComm.h"
#include "adios2/toolkit/format/bp/bp4/BP4Deserializer.h"
#include "adios2/toolkit/transportman/TransportMan.h"

#include <chrono>

namespace adios2
{
namespace core
Expand Down Expand Up @@ -52,12 +51,6 @@ class BP4Reader : public Engine
void PerformGets() final;

private:
typedef std::chrono::duration<double> Seconds;
typedef std::chrono::time_point<
std::chrono::steady_clock,
std::chrono::duration<double, std::chrono::steady_clock::period>>
TimePoint;

format::BP4Deserializer m_BP4Deserializer;
/* transport manager for metadata file */
transportman::TransportMan m_MDFileManager;
Expand Down
6 changes: 2 additions & 4 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include <adios2-perfstubs-interface.h>

#include <chrono>
#include <errno.h>

namespace adios2
Expand Down Expand Up @@ -224,8 +223,7 @@ void BP5Reader::Init()
pollSeconds = timeoutSeconds;
}

TimePoint timeoutInstant =
std::chrono::steady_clock::now() + timeoutSeconds;
TimePoint timeoutInstant = Now() + timeoutSeconds;

OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds);

Expand All @@ -236,7 +234,7 @@ void BP5Reader::Init()
bool BP5Reader::SleepOrQuit(const TimePoint &timeoutInstant,
const Seconds &pollSeconds)
{
auto now = std::chrono::steady_clock::now();
auto now = Now();
if (now + pollSeconds >= timeoutInstant)
{
return false;
Expand Down
7 changes: 1 addition & 6 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define ADIOS2_ENGINE_BP5_BP5READER_H_

#include "adios2/common/ADIOSConfig.h"
#include "adios2/core/CoreTypes.h"
#include "adios2/core/Engine.h"
#include "adios2/engine/bp5/BP5Engine.h"
#include "adios2/helper/adiosComm.h"
Expand Down Expand Up @@ -55,12 +56,6 @@ class BP5Reader : public BP5Engine, public Engine
MinVarInfo *MinBlocksInfo(const VariableBase &, const size_t Step) const;

private:
typedef std::chrono::duration<double> Seconds;
typedef std::chrono::time_point<
std::chrono::steady_clock,
std::chrono::duration<double, std::chrono::steady_clock::period>>
TimePoint;

format::BP5Deserializer *m_BP5Deserializer = nullptr;
/* transport manager for metadata file */
transportman::TransportMan m_MDFileManager;
Expand Down
85 changes: 40 additions & 45 deletions source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>

#include "../../common/ADIOSTypes.h"
#include "../../core/CoreTypes.h"

/// \endcond
#if defined(__has_feature)
Expand Down Expand Up @@ -54,23 +55,17 @@ void FileDrainerSingleThread::Finish()
finishMutex.unlock();
}

typedef std::chrono::duration<double> Seconds;
typedef std::chrono::time_point<
std::chrono::steady_clock,
std::chrono::duration<double, std::chrono::steady_clock::period>>
TimePoint;

void FileDrainerSingleThread::Join()
{
if (th.joinable())
{
const auto tTotalStart = std::chrono::steady_clock::now();
Seconds timeTotal = Seconds(0.0);
const auto tTotalStart = core::Now();
core::Seconds timeTotal(0.0);

Finish();
th.join();

const auto tTotalEnd = std::chrono::steady_clock::now();
const auto tTotalEnd = core::Now();
timeTotal = tTotalEnd - tTotalStart;
if (m_Verbose)
{
Expand All @@ -89,13 +84,13 @@ void FileDrainerSingleThread::Join()
*/
void FileDrainerSingleThread::DrainThread()
{
const auto tTotalStart = std::chrono::steady_clock::now();
Seconds timeTotal = Seconds(0.0);
Seconds timeSleep = Seconds(0.0);
Seconds timeRead = Seconds(0.0);
Seconds timeWrite = Seconds(0.0);
Seconds timeClose = Seconds(0.0);
TimePoint ts, te;
const auto tTotalStart = core::Now();
core::Seconds timeTotal(0.0);
core::Seconds timeSleep(0.0);
core::Seconds timeRead(0.0);
core::Seconds timeWrite(0.0);
core::Seconds timeClose(0.0);
core::TimePoint ts, te;
size_t maxQueueSize = 0;
std::vector<char> buffer; // fixed, preallocated buffer to read/write data
buffer.resize(bufferSize);
Expand All @@ -110,18 +105,18 @@ void FileDrainerSingleThread::DrainThread()
auto lf_Copy = [&](FileDrainOperation &fdo, InputFile fdr, OutputFile fdw,
size_t count) {
nReadBytesTasked += count;
ts = std::chrono::steady_clock::now();
ts = core::Now();
std::pair<size_t, double> ret =
Read(fdr, count, buffer.data(), fdo.fromFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeRead += te - ts;
nReadBytesSucc += ret.first;
sleptForWaitingOnRead += ret.second;

nWriteBytesTasked += count;
ts = std::chrono::steady_clock::now();
ts = core::Now();
size_t n = Write(fdw, count, buffer.data(), fdo.toFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
nWriteBytesSucc += n;
};
Expand All @@ -141,9 +136,9 @@ void FileDrainerSingleThread::DrainThread()
{
break;
}
ts = std::chrono::steady_clock::now();
ts = core::Now();
std::this_thread::sleep_for(d);
te = std::chrono::steady_clock::now();
te = core::Now();
timeSleep += te - ts;
continue;
}
Expand All @@ -162,15 +157,15 @@ void FileDrainerSingleThread::DrainThread()
case DrainOperation::CopyAt:
case DrainOperation::Copy:
{
ts = std::chrono::steady_clock::now();
ts = core::Now();
auto fdr = GetFileForRead(fdo.fromFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeRead += te - ts;

ts = std::chrono::steady_clock::now();
ts = core::Now();
bool append = (fdo.op == DrainOperation::Copy);
auto fdw = GetFileForWrite(fdo.toFileName, append);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;

if (m_Verbose >= 2)
Expand Down Expand Up @@ -198,14 +193,14 @@ void FileDrainerSingleThread::DrainThread()
{
if (fdo.op == DrainOperation::CopyAt)
{
ts = std::chrono::steady_clock::now();
ts = core::Now();
Seek(fdr, fdo.fromOffset, fdo.fromFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeRead += te - ts;

ts = std::chrono::steady_clock::now();
ts = core::Now();
Seek(fdw, fdo.toOffset, fdo.toFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
}
const size_t batches = fdo.countBytes / bufferSize;
Expand Down Expand Up @@ -236,10 +231,10 @@ void FileDrainerSingleThread::DrainThread()
<< fdo.toFileName << std::endl;
#endif
}
ts = std::chrono::steady_clock::now();
ts = core::Now();
auto fdw = GetFileForWrite(fdo.toFileName);
SeekEnd(fdw);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
break;
}
Expand All @@ -255,12 +250,12 @@ void FileDrainerSingleThread::DrainThread()
#endif
}
nWriteBytesTasked += fdo.countBytes;
ts = std::chrono::steady_clock::now();
ts = core::Now();
auto fdw = GetFileForWrite(fdo.toFileName);
Seek(fdw, fdo.toOffset, fdo.toFileName);
size_t n = Write(fdw, fdo.countBytes, fdo.dataToWrite.data(),
fdo.toFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
nWriteBytesSucc += n;
break;
Expand All @@ -277,11 +272,11 @@ void FileDrainerSingleThread::DrainThread()
#endif
}
nWriteBytesTasked += fdo.countBytes;
ts = std::chrono::steady_clock::now();
ts = core::Now();
auto fdw = GetFileForWrite(fdo.toFileName);
size_t n = Write(fdw, fdo.countBytes, fdo.dataToWrite.data(),
fdo.toFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
nWriteBytesSucc += n;
break;
Expand All @@ -295,9 +290,9 @@ void FileDrainerSingleThread::DrainThread()
<< fdo.toFileName << std::endl;
#endif
}
ts = std::chrono::steady_clock::now();
ts = core::Now();
GetFileForWrite(fdo.toFileName, false);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
break;
}
Expand All @@ -310,9 +305,9 @@ void FileDrainerSingleThread::DrainThread()
<< fdo.toFileName << " for append " << std::endl;
#endif
}
ts = std::chrono::steady_clock::now();
ts = core::Now();
GetFileForWrite(fdo.toFileName, true);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
break;
}
Expand All @@ -325,10 +320,10 @@ void FileDrainerSingleThread::DrainThread()
<< fdo.toFileName << std::endl;
#endif
}
ts = std::chrono::steady_clock::now();
ts = core::Now();
auto fdw = GetFileForWrite(fdo.toFileName, true);
Delete(fdw, fdo.toFileName);
te = std::chrono::steady_clock::now();
te = core::Now();
timeWrite += te - ts;
break;
}
Expand All @@ -349,12 +344,12 @@ void FileDrainerSingleThread::DrainThread()
#endif
}

ts = std::chrono::steady_clock::now();
ts = core::Now();
CloseAll();
te = std::chrono::steady_clock::now();
te = core::Now();
timeClose += te - ts;

const auto tTotalEnd = std::chrono::steady_clock::now();
const auto tTotalEnd = core::Now();
timeTotal = tTotalEnd - tTotalStart;
const bool shouldReport =
(m_Verbose || (nReadBytesTasked != nReadBytesSucc) ||
Expand Down

0 comments on commit 688ca55

Please sign in to comment.