Iteration::open() (openPMD#862)
* Test: non-collective, parallel read
* Iteration::open()
* Enforce strict opening of files in ADIOS2
* use general flush implementation
* file_based_write_read: Skip ADIOS1
* Skip ADIOS1 coverage for now, it's blocking the release.

Co-authored-by: Franz Pöschel <franz.poeschel@gmail.com>
ax3l and franzpoeschel committed Jan 29, 2021
1 parent d655e99 commit 29c1e14
Showing 6 changed files with 137 additions and 1 deletion.
3 changes: 3 additions & 0 deletions docs/source/details/mpi.rst
@@ -19,6 +19,7 @@ Functionality Behavior Description
``Series`` **collective** open and close
``::flush()`` **collective** read and write
``Iteration`` [1]_ independent declare and open
``::open()`` [3]_ **collective** explicit open
``Mesh`` [1]_ independent declare, open, write
``ParticleSpecies`` [1]_ independent declare, open, write
``::setAttribute`` [2]_ *backend-specific* declare, write
@@ -33,6 +34,8 @@ Functionality Behavior Description
.. [2] :ref:`HDF5 <backends-hdf5>` only supports collective attribute definitions/writes; :ref:`ADIOS1 <backends-adios1>` and :ref:`ADIOS2 <backends-adios2>` attributes can be written independently.
If you want to support all backends equally, treat it as a collective operation.
.. [3] Iterations are normally opened lazily, on first access. That first access is usually the ``flush()`` call after a ``storeChunk``/``loadChunk`` operation. If the first access is non-collective, an explicit, collective ``Iteration::open()`` can be used to open the files up front.
.. tip::

Just because an operation is independent does not mean it is allowed to be inconsistent.
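The collective pattern described in footnote [3] is exercised by the new file_based_write_read test further below. As a minimal, hedged sketch (the file name, mesh name, and record component are placeholders, not part of this commit), a parallel reader that only loads data on rank 0 would look roughly like this:

    #include <openPMD/openPMD.hpp>
    #include <mpi.h>

    using namespace openPMD;

    int main( int argc, char ** argv )
    {
        MPI_Init( &argc, &argv );
        int mpi_rank;
        MPI_Comm_rank( MPI_COMM_WORLD, &mpi_rank );

        // collective: all ranks open the series and the iteration
        Series series( "data_%05T.h5", Access::READ_ONLY, MPI_COMM_WORLD );
        Iteration it = series.iterations[ 30 ];
        it.open( ); // collective; files are opened here, not on first flush()

        // non-collective: only rank 0 actually reads data
        if( mpi_rank == 0 )
        {
            auto E_x = it.meshes[ "E" ][ "x" ];
            auto data = E_x.loadChunk< double >( );
            series.flush( );
        }

        MPI_Finalize( );
        return 0;
    }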
13 changes: 13 additions & 0 deletions include/openPMD/Iteration.hpp
@@ -110,6 +110,19 @@ class Iteration : public Attributable
Iteration &
close( bool flush = true );

/** Open an iteration
*
* Explicitly open an iteration.
* Usually, file-open operations are delayed until the first load/storeChunk
* operation is flushed. In parallel contexts where it is known that such a
* first access needs to run non-collectively, one can explicitly open
* an iteration through this collective call.
*
* @return Reference to iteration.
*/
Iteration &
open();

/**
* @brief Has the iteration been closed?
* A closed iteration may not (yet) be reopened.
4 changes: 4 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
@@ -417,6 +417,10 @@ void ADIOS2IOHandlerImpl::openFile(

writable->written = true;
writable->abstractFilePosition = std::make_shared< ADIOS2FilePosition >( );

// enforce opening the file
// lazy opening is deadly in parallel situations
getFileData( file );
}

void
18 changes: 18 additions & 0 deletions src/Iteration.cpp
@@ -23,6 +23,7 @@
#include "openPMD/Datatype.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
#include "openPMD/auxiliary/Filesystem.hpp"
#include "openPMD/auxiliary/StringManip.hpp"
#include "openPMD/backend/Writable.hpp"

@@ -174,6 +175,23 @@ Iteration::close( bool _flush )
return *this;
}

Iteration &
Iteration::open()
{
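// navigate up from this Iteration, through the iterations container, to the owning Series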
Series * s = &auxiliary::deref_dynamic_cast< Series >(
parent->attributable->parent->attributable );
// figure out my iteration number
auto begin = s->indexOf( *this );
auto end = begin;
++end;
// set dirty, so Series::flush will open the file
this->dirty() = true;
s->flush_impl( begin, end );
this->dirty() = false;

return *this;
}

bool
Iteration::closed() const
{
1 change: 1 addition & 0 deletions src/binding/python/Iteration.cpp
@@ -46,6 +46,7 @@ void init_Iteration(py::module &m) {
.def_property("dt", &Iteration::dt<double>, &Iteration::setDt<double>)
.def_property("dt", &Iteration::dt<long double>, &Iteration::setDt<long double>)
.def_property("time_unit_SI", &Iteration::timeUnitSI, &Iteration::setTimeUnitSI)
.def("open", &Iteration::open)
.def("close", &Iteration::close, py::arg("flush") = true)

// TODO remove in future versions (deprecated)
99 changes: 98 additions & 1 deletion test/ParallelIOTest.cpp
@@ -547,6 +547,103 @@ TEST_CASE( "close_iteration_test", "[parallel]" )
}
}

void
file_based_write_read( std::string file_ending )
{
namespace io = openPMD;

// the iterations we want to write
std::vector< int > iterations = { 10, 30, 50, 70 };

// MPI communicator meta-data and file name
int i_mpi_rank{ -1 }, i_mpi_size{ -1 };
MPI_Comm_rank( MPI_COMM_WORLD, &i_mpi_rank );
MPI_Comm_size( MPI_COMM_WORLD, &i_mpi_size );
unsigned mpi_rank{ static_cast< unsigned >( i_mpi_rank ) },
mpi_size{ static_cast< unsigned >( i_mpi_size ) };
std::string name = "../samples/file_based_write_read_%05T." + file_ending;

// data (we just use the same data for each step for demonstration)
// we assign 10 longitudinal cells & 300 transversal cells per rank here
unsigned const local_Nz = 10u;
unsigned const global_Nz = local_Nz * mpi_size;
unsigned const global_Nx = 300u;
using precision = double;
std::vector< precision > E_x_data( global_Nx * local_Nz );
// filling some values: 0, 1, ...
std::iota( E_x_data.begin(), E_x_data.end(), local_Nz * mpi_rank);
std::transform(E_x_data.begin(), E_x_data.end(), E_x_data.begin(),
[](precision d) -> precision { return std::sin( d * 2.0 * 3.1415 / 20. ); });

{
// open a parallel series
Series series(name, Access::CREATE, MPI_COMM_WORLD);
series.setIterationEncoding(IterationEncoding::fileBased);

int const last_step = 100;
for (int step = 0; step < last_step; ++step) {
MPI_Barrier(MPI_COMM_WORLD);

// is this an output step?
bool const rank_in_output_step =
std::find(iterations.begin(), iterations.end(), step) != iterations.end();
if (!rank_in_output_step) continue;

// now we write (parallel, independent I/O)
auto it = series.iterations[step];
auto E = it.meshes["E"]; // record
auto E_x = E["x"]; // record component

// some meta-data
E.setAxisLabels({"z", "x"});
E.setGridSpacing<double>({1.0, 1.0});
E.setGridGlobalOffset({0.0, 0.0});
E_x.setPosition<double>({0.0, 0.0});

// update values
std::iota(E_x_data.begin(), E_x_data.end(), local_Nz * mpi_rank);
std::transform(E_x_data.begin(), E_x_data.end(), E_x_data.begin(),
[&step](precision d) -> precision {
return std::sin(d * 2.0 * 3.1415 / 100. + step);
});

auto dataset = io::Dataset(
io::determineDatatype<precision>(),
{global_Nx, global_Nz});
E_x.resetDataset(dataset);

Offset chunk_offset = {0, local_Nz * mpi_rank};
Extent chunk_extent = {global_Nx, local_Nz};
E_x.storeChunk(
io::shareRaw(E_x_data),
chunk_offset, chunk_extent);
series.flush();
}
}

// check non-collective, parallel read
{
Series read( name, Access::READ_ONLY, MPI_COMM_WORLD );
Iteration it = read.iterations[ 30 ];
it.open(); // collective
bool isAdios1 = read.backend() == "MPI_ADIOS1"; // FIXME: this is an ADIOS1 backend bug
if( mpi_rank == 0 || isAdios1 ) // non-collective branch (unless ADIOS1)
{
auto E_x = it.meshes["E"]["x"];
auto data = E_x.loadChunk< double >();
read.flush();
}
}
}

TEST_CASE( "file_based_write_read", "[parallel]" )
{
for( auto const & t : getBackends() )
{
file_based_write_read( t );
}
}

void
hipace_like_write( std::string file_ending )
{
@@ -772,4 +869,4 @@ TEST_CASE( "adios2_streaming", "[pseudoserial][adios2]" )
{
adios2_streaming();
}
#endif
#endif
