Skip to content

Commit

Permalink
Pass-through flushing parameters (#1226)
Browse files Browse the repository at this point in the history
* Pass-through flushing parameters

* CI fixes
  • Loading branch information
franzpoeschel authored Mar 14, 2022
1 parent 272b137 commit ff8b413
Show file tree
Hide file tree
Showing 46 changed files with 259 additions and 220 deletions.
4 changes: 2 additions & 2 deletions include/openPMD/IO/ADIOS/ADIOS1IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class OPENPMDAPI_EXPORT ADIOS1IOHandler : public AbstractIOHandler
return "ADIOS1";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

void enqueue(IOTask const &) override;

Expand All @@ -72,7 +72,7 @@ class OPENPMDAPI_EXPORT ADIOS1IOHandler : public AbstractIOHandler
return "DUMMY_ADIOS1";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
std::unique_ptr<ADIOS1IOHandlerImpl> m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class OPENPMDAPI_EXPORT ADIOS1IOHandlerImpl

virtual void init();

std::future<void> flush() override;
std::future<void> flush();

virtual int64_t open_write(Writable *);
virtual ADIOS_FILE *open_read(std::string const &name);
Expand Down
6 changes: 3 additions & 3 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class ADIOS2IOHandlerImpl

~ADIOS2IOHandlerImpl() override;

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &);

void
createFile(Writable *, Parameter<Operation::CREATE_FILE> const &) override;
Expand Down Expand Up @@ -1262,7 +1262,7 @@ class ADIOS2IOHandler : public AbstractIOHandler
// we must not throw in a destructor
try
{
this->flush();
this->flush(internal::defaultFlushParams);
}
catch (std::exception const &ex)
{
Expand Down Expand Up @@ -1301,6 +1301,6 @@ class ADIOS2IOHandler : public AbstractIOHandler
return "ADIOS2";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;
}; // ADIOS2IOHandler
} // namespace openPMD
2 changes: 1 addition & 1 deletion include/openPMD/IO/ADIOS/ParallelADIOS1IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class OPENPMDAPI_EXPORT ParallelADIOS1IOHandler : public AbstractIOHandler
return "MPI_ADIOS1";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;
#if openPMD_HAVE_ADIOS1
void enqueue(IOTask const &) override;
#endif
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class OPENPMDAPI_EXPORT ParallelADIOS1IOHandlerImpl

virtual void init();

std::future<void> flush() override;
std::future<void> flush();

virtual int64_t open_write(Writable *);
virtual ADIOS_FILE *open_read(std::string const &name);
Expand Down
21 changes: 19 additions & 2 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ enum class FlushLevel : unsigned char
SkeletonOnly
};

namespace internal
{
/**
* Parameters recursively passed through the openPMD hierarchy when
* flushing.
*
*/
struct FlushParams
{
FlushLevel flushLevel = FlushLevel::InternalFlush;
};

/*
* To be used for reading
*/
constexpr FlushParams defaultFlushParams{};
} // namespace internal

/** Interface for communicating between logical and physically persistent data.
*
* Input and output operations are channeled through a task queue that is
Expand Down Expand Up @@ -123,7 +141,7 @@ class AbstractIOHandler
* @return Future indicating the completion state of the operation for
* backends that decide to implement this operation asynchronously.
*/
virtual std::future<void> flush() = 0;
virtual std::future<void> flush(internal::FlushParams const &) = 0;

/** The currently used backend */
virtual std::string backendName() const = 0;
Expand All @@ -132,7 +150,6 @@ class AbstractIOHandler
Access const m_backendAccess;
Access const m_frontendAccess;
std::queue<IOTask> m_work;
FlushLevel m_flushLevel = FlushLevel::InternalFlush;
}; // AbstractIOHandler

} // namespace openPMD
2 changes: 1 addition & 1 deletion include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class AbstractIOHandlerImpl

virtual ~AbstractIOHandlerImpl() = default;

virtual std::future<void> flush()
std::future<void> flush()
{
using namespace auxiliary;

Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/DummyIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ class DummyIOHandler : public AbstractIOHandler
/** No-op consistent with the IOHandler interface to enable library use
* without IO.
*/
std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;
}; // DummyIOHandler
} // namespace openPMD
2 changes: 1 addition & 1 deletion include/openPMD/IO/HDF5/HDF5IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HDF5IOHandler : public AbstractIOHandler
return "HDF5";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
std::unique_ptr<HDF5IOHandlerImpl> m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/HDF5/ParallelHDF5IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ParallelHDF5IOHandler : public AbstractIOHandler
return "MPI_HDF5";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
std::unique_ptr<ParallelHDF5IOHandlerImpl> m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/JSON/JSONIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class JSONIOHandler : public AbstractIOHandler
return "JSON";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
JSONIOHandlerImpl m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl

void listAttributes(Writable *, Parameter<Operation::LIST_ATTS> &) override;

std::future<void> flush() override;
std::future<void> flush();

private:
using FILEHANDLE = std::fstream;
Expand Down
9 changes: 5 additions & 4 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,11 @@ class Iteration : public Attributable
return *m_iterationData;
}

void flushFileBased(std::string const &, uint64_t);
void flushGroupBased(uint64_t);
void flushVariableBased(uint64_t);
void flush();
void flushFileBased(
std::string const &, uint64_t, internal::FlushParams const &);
void flushGroupBased(uint64_t, internal::FlushParams const &);
void flushVariableBased(uint64_t, internal::FlushParams const &);
void flush(internal::FlushParams const &);
void deferParseAccess(internal::DeferredParseAccess);
/*
* Control flow for read(), readFileBased(), readGroupBased() and
Expand Down
3 changes: 2 additions & 1 deletion include/openPMD/Mesh.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ class Mesh : public BaseRecord<MeshRecordComponent>
private:
Mesh();

void flush_impl(std::string const &) override;
void
flush_impl(std::string const &, internal::FlushParams const &) override;
void read() override;
}; // Mesh

Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/ParticleSpecies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ParticleSpecies : public Container<Record>
ParticleSpecies();

void read();
void flush(std::string const &) override;
void flush(std::string const &, internal::FlushParams const &) override;

/**
* @brief Check recursively whether this ParticleSpecies is dirty.
Expand Down
3 changes: 2 additions & 1 deletion include/openPMD/Record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class Record : public BaseRecord<RecordComponent>
private:
Record();

void flush_impl(std::string const &) override;
void
flush_impl(std::string const &, internal::FlushParams const &) override;
void read() override;
}; // Record

Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/RecordComponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class RecordComponent : public BaseRecordComponent
static constexpr char const *const SCALAR = "\vScalar";

private:
void flush(std::string const &);
void flush(std::string const &, internal::FlushParams const &);
virtual void read();

/**
Expand Down
16 changes: 8 additions & 8 deletions include/openPMD/RecordComponent.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer )
* Flush the openPMD hierarchy to the backend without flushing any actual
* data yet.
*/
seriesFlush( FlushLevel::SkeletonOnly );
seriesFlush({FlushLevel::SkeletonOnly});

size_t size = 1;
for( auto ext : e )
Expand All @@ -305,16 +305,16 @@ RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer )
getBufferView.offset = o;
getBufferView.extent = e;
getBufferView.dtype = getDatatype();
IOHandler()->enqueue( IOTask( this, getBufferView ) );
IOHandler()->flush();
IOHandler()->enqueue(IOTask(this, getBufferView));
IOHandler()->flush(internal::defaultFlushParams);
auto &out = *getBufferView.out;
if( !out.backendManagedBuffer )
if (!out.backendManagedBuffer)
{
auto data = std::forward< F >( createBuffer )( size );
out.ptr = static_cast< void * >( data.get() );
storeChunk( std::move( data ), std::move( o ), std::move( e ) );
auto data = std::forward<F>(createBuffer)(size);
out.ptr = static_cast<void *>(data.get());
storeChunk(std::move(data), std::move(o), std::move(e));
}
return DynamicMemoryView< T >{ std::move( getBufferView ), size, *this };
return DynamicMemoryView<T>{std::move(getBufferView), size, *this};
}

template< typename T >
Expand Down
14 changes: 10 additions & 4 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,24 +532,30 @@ OPENPMD_private
*
* @param begin Start of the range of iterations to flush.
* @param end End of the range of iterations to flush.
* @param level Flush level, as documented in AbstractIOHandler.hpp.
* @param flushParams Flush params, as documented in AbstractIOHandler.hpp.
* @param flushIOHandler Tasks will always be enqueued to the backend.
* If this flag is true, tasks will be flushed to the backend.
*/
std::future<void> flush_impl(
iterations_iterator begin,
iterations_iterator end,
FlushLevel level,
internal::FlushParams flushParams,
bool flushIOHandler = true);
void flushFileBased(iterations_iterator begin, iterations_iterator end);
void flushFileBased(
iterations_iterator begin,
iterations_iterator end,
internal::FlushParams flushParams);
/*
* Group-based and variable-based iteration layouts share a lot of logic
* (realistically, the variable-based iteration layout only throws out
* one layer in the hierarchy).
* As a convention, methods that deal with both layouts are called
* .*GorVBased, short for .*GroupOrVariableBased
*/
void flushGorVBased(iterations_iterator begin, iterations_iterator end);
void flushGorVBased(
iterations_iterator begin,
iterations_iterator end,
internal::FlushParams flushParams);
void flushMeshesPath();
void flushParticlesPath();
void readFileBased();
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/Span.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class DynamicMemoryView
// might need to update
m_recordComponent.IOHandler()->enqueue(
IOTask(&m_recordComponent, m_param));
m_recordComponent.IOHandler()->flush();
m_recordComponent.IOHandler()->flush(internal::defaultFlushParams);
}
return Span<T>{static_cast<T *>(m_param.out->ptr), m_size};
}
Expand Down
4 changes: 2 additions & 2 deletions include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ OPENPMD_protected
Iteration &containingIteration();
/** @} */

void seriesFlush(FlushLevel);
void seriesFlush(internal::FlushParams);

void flushAttributes();
void flushAttributes(internal::FlushParams const &);
enum ReadMode
{
/**
Expand Down
18 changes: 10 additions & 8 deletions include/openPMD/backend/BaseRecord.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ class BaseRecord : public Container<T_elem>
void readBase();

private:
void flush(std::string const &) final;
virtual void flush_impl(std::string const &) = 0;
void flush(std::string const &, internal::FlushParams const &) final;
virtual void
flush_impl(std::string const &, internal::FlushParams const &) = 0;
virtual void read() = 0;

/**
Expand Down Expand Up @@ -250,7 +251,7 @@ BaseRecord<T_elem>::erase(key_type const &key)
Parameter<Operation::DELETE_DATASET> dDelete;
dDelete.name = ".";
this->IOHandler()->enqueue(IOTask(&rc, dDelete));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
}
res = Container<T_elem>::erase(key);
}
Expand Down Expand Up @@ -280,7 +281,7 @@ BaseRecord<T_elem>::erase(iterator res)
Parameter<Operation::DELETE_DATASET> dDelete;
dDelete.name = ".";
this->IOHandler()->enqueue(IOTask(&rc, dDelete));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
}
ret = Container<T_elem>::erase(res);
}
Expand Down Expand Up @@ -315,7 +316,7 @@ inline void BaseRecord<T_elem>::readBase()

aRead.name = "unitDimension";
this->IOHandler()->enqueue(IOTask(this, aRead));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
if (*aRead.dtype == DT::ARR_DBL_7)
this->setAttribute(
"unitDimension",
Expand All @@ -340,7 +341,7 @@ inline void BaseRecord<T_elem>::readBase()

aRead.name = "timeOffset";
this->IOHandler()->enqueue(IOTask(this, aRead));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
if (*aRead.dtype == DT::FLOAT)
this->setAttribute(
"timeOffset", Attribute(*aRead.resource).template get<float>());
Expand All @@ -353,15 +354,16 @@ inline void BaseRecord<T_elem>::readBase()
}

template <typename T_elem>
inline void BaseRecord<T_elem>::flush(std::string const &name)
inline void BaseRecord<T_elem>::flush(
std::string const &name, internal::FlushParams const &flushParams)
{
if (!this->written() && this->empty())
throw std::runtime_error(
"A Record can not be written without any contained "
"RecordComponents: " +
name);

this->flush_impl(name);
this->flush_impl(name, flushParams);
// flush_impl must take care to correctly set the dirty() flag so this
// method doesn't do it
}
Expand Down
Loading

0 comments on commit ff8b413

Please sign in to comment.