Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for variable-based encoding in backends without step support #1484

Merged
merged 12 commits into from
Aug 9, 2023
Merged
8 changes: 8 additions & 0 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ class AbstractIOHandler
Access const m_frontendAccess;
internal::SeriesStatus m_seriesStatus = internal::SeriesStatus::Default;
std::queue<IOTask> m_work;
/**
* This is to avoid that the destructor tries flushing again if an error
* happened. Otherwise, this would lead to confusing error messages.
* Initialized as false, set to true after successful construction.
* If flushing results in an error, set this back to false.
* The destructor will only attempt flushing again if this is true.
*/
bool m_lastFlushSuccessful = false;
}; // AbstractIOHandler

} // namespace openPMD
9 changes: 9 additions & 0 deletions include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
#pragma once

#include "openPMD/Error.hpp"
franzpoeschel marked this conversation as resolved.
Show resolved Hide resolved
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/IOTask.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
Expand Down Expand Up @@ -263,6 +264,14 @@ class AbstractIOHandlerImpl
*/
virtual void advance(Writable *, Parameter<Operation::ADVANCE> &parameters)
{
if (parameters.isThisStepMandatory)
{
throw error::OperationUnsupportedInBackend(
m_handler->backendName(),
"Variable-based encoding requires backend support for IO steps "
"in order to store more than one iteration (only supported in "
"ADIOS2 backend).");
}
*parameters.status = AdvanceStatus::RANDOMACCESS;
}

Expand Down
1 change: 1 addition & 0 deletions include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ struct OPENPMDAPI_EXPORT Parameter<Operation::ADVANCE>

//! input parameter
AdvanceMode mode;
bool isThisStepMandatory = false;
//! output parameter
std::shared_ptr<AdvanceStatus> status =
std::make_shared<AdvanceStatus>(AdvanceStatus::OK);
Expand Down
22 changes: 18 additions & 4 deletions include/openPMD/ReadIterations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,20 @@ class SeriesIterator
std::set<Iteration::IterationIndex_t> ignoreIterations;
};

std::shared_ptr<SharedData> m_data;
/*
* The shared data is never empty, emptiness is indicated by std::optional
*/
std::shared_ptr<std::optional<SharedData>> m_data =
std::make_shared<std::optional<SharedData>>(std::nullopt);

SharedData &get()
{
return m_data->value();
}
SharedData const &get() const
{
return m_data->value();
}

public:
//! construct the end() iterator
Expand All @@ -79,7 +92,7 @@ class SeriesIterator
private:
inline bool setCurrentIteration()
{
auto &data = *m_data;
auto &data = get();
if (data.iterationsInCurrentStep.empty())
{
std::cerr << "[ReadIterations] Encountered a step without "
Expand All @@ -94,7 +107,7 @@ class SeriesIterator

inline std::optional<uint64_t> peekCurrentIteration()
{
auto &data = *m_data;
auto &data = get();
if (data.iterationsInCurrentStep.empty())
{
return std::nullopt;
Expand Down Expand Up @@ -124,6 +137,8 @@ class SeriesIterator
void deactivateDeadIteration(iteration_index_t);

void initSeriesInLinearReadMode();

void close();
};

/**
Expand Down Expand Up @@ -151,7 +166,6 @@ class ReadIterations
using iterator_t = SeriesIterator;

Series m_series;
std::optional<SeriesIterator> alreadyOpened;
std::optional<internal::ParsePreference> m_parsePreference;

ReadIterations(
Expand Down
51 changes: 44 additions & 7 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
namespace openPMD
{
class ReadIterations;
class SeriesIterator;
class Series;
class Series;

Expand Down Expand Up @@ -88,6 +89,23 @@ namespace internal
* the same instance.
*/
std::optional<WriteIterations> m_writeIterations;

/**
* Series::readIterations() returns an iterator type that modifies the
* state of the Series (by proceeding through IO steps).
* Hence, we need to make sure that there is only one of them, otherwise
* they will both make modifications to the Series that the other
* iterator is not aware of.
*
* Plan: At some point, we should add a second iterator type that does
* not change the state. Series::readIterations() should then return
* either this or that iterator depending on read mode (linear or
* random-access) and backend capabilities.
*
* Due to include order, this member needs to be a pointer instead of
* an optional.
*/
std::unique_ptr<SeriesIterator> m_sharedStatefulIterator;
/**
* For writing: Remember which iterations have been written in the
* currently active output step. Use this later when writing the
Expand Down Expand Up @@ -151,14 +169,15 @@ namespace internal
* True if a user opts into lazy parsing.
*/
bool m_parseLazily = false;

/**
* This is to avoid that the destructor tries flushing again if an error
* happened. Otherwise, this would lead to confusing error messages.
* Initialized as false, set to true after successful construction.
* If flushing results in an error, set this back to false.
* The destructor will only attempt flushing again if this is true.
* In variable-based encoding, all backends except ADIOS2 can only write
* one single iteration. So, we remember if we already had a step,
* and if yes, Parameter<Operation::ADVANCE>::isThisStepMandatory is
* set as true in variable-based encoding.
* The backend will then throw if it has no support for steps.
*/
bool m_lastFlushSuccessful = false;
bool m_wroteAtLeastOneIOStep = false;

/**
* Remember the preference that the backend specified for parsing.
Expand Down Expand Up @@ -188,6 +207,7 @@ class Series : public Attributable
friend class Attributable;
friend class Iteration;
friend class Writable;
friend class ReadIterations;
friend class SeriesIterator;
friend class internal::SeriesData;
friend class WriteIterations;
Expand Down Expand Up @@ -502,6 +522,22 @@ class Series : public Attributable
*/
ReadIterations readIterations();

/**
* @brief Parse the Series.
*
* Only necessary in linear read mode.
* In linear read mode, the Series constructor does not do any IO accesses.
* This call effectively triggers the side effects of
* Series::readIterations(), for use cases where data needs to be accessed
* before iterating through the iterations.
*
* The reason for introducing this restricted alias to
* Series::readIterations() is that the name "readIterations" is misleading
* for that use case: When using IO steps, this call only ensures that the
* first step is parsed.
*/
void parseBase();

/**
* @brief Entry point to the writing end of the streaming API.
*
Expand Down Expand Up @@ -696,7 +732,8 @@ OPENPMD_private

/**
* @brief Called at the end of an IO step to store the iterations defined
* in the IO step to the snapshot attribute.
* in the IO step to the snapshot attribute and to store that at
* least one step was written.
*
* @param doFlush If true, flush the IO handler.
*/
Expand Down
13 changes: 12 additions & 1 deletion src/IO/AbstractIOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,18 @@ namespace openPMD
std::future<void> AbstractIOHandler::flush(internal::FlushParams const &params)
{
internal::ParsedFlushParams parsedParams{params};
auto future = this->flush(parsedParams);
auto future = [this, &parsedParams]() {
try
{
return this->flush(parsedParams);
}
catch (...)
{
m_lastFlushSuccessful = false;
throw;
}
}();
m_lastFlushSuccessful = true;
json::warnGlobalUnusedOptions(parsedParams.backendConfig);
return future;
}
Expand Down
30 changes: 19 additions & 11 deletions src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,25 @@ void Iteration::flushGroupBased(
void Iteration::flushVariableBased(
IterationIndex_t i, internal::FlushParams const &flushParams)
{
if (!written())
{
/* create iteration path */
Parameter<Operation::OPEN_PATH> pOpen;
pOpen.path = "";
IOHandler()->enqueue(IOTask(this, pOpen));
}

switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
return;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}

if (!written())
{
/* create iteration path */
Expand All @@ -282,17 +301,6 @@ void Iteration::flushVariableBased(
wAttr.dtype = Datatype::ULONGLONG;
IOHandler()->enqueue(IOTask(this, wAttr));
}

switch (flushParams.flushLevel)
{
case FlushLevel::CreateOrOpenFiles:
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::InternalFlush:
case FlushLevel::UserFlush:
flush(flushParams);
break;
}
}

void Iteration::flush(internal::FlushParams const &flushParams)
Expand Down
Loading
Loading