diff --git a/include/openPMD/Dataset.hpp b/include/openPMD/Dataset.hpp
index 89ff082f76..cef34a244a 100644
--- a/include/openPMD/Dataset.hpp
+++ b/include/openPMD/Dataset.hpp
@@ -40,6 +40,14 @@ class Dataset
 public:
     Dataset(Datatype, Extent, std::string options = "{}");
 
+    /**
+     * @brief Constructor that sets the datatype to undefined.
+     *
+     * Helpful for resizing datasets, since datatypes need not be given twice.
+     *
+     */
+    Dataset( Extent );
+
     Dataset& extend(Extent newExtent);
     Dataset& setChunkSize(Extent const&);
     Dataset& setCompression(std::string const&, uint8_t const);
diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp
index da83b4303b..3e05f31a5c 100644
--- a/include/openPMD/IO/IOTask.hpp
+++ b/include/openPMD/IO/IOTask.hpp
@@ -281,8 +281,7 @@ template<>
 struct OPENPMDAPI_EXPORT Parameter< Operation::EXTEND_DATASET > : public AbstractParameter
 {
     Parameter() = default;
-    Parameter(Parameter const & p) : AbstractParameter(),
-        name(p.name), extent(p.extent) {}
+    Parameter(Parameter const & p) : AbstractParameter(), extent(p.extent) {}
 
     std::unique_ptr< AbstractParameter >
     clone() const override
@@ -291,7 +290,6 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::EXTEND_DATASET > : public Abstrac
             new Parameter< Operation::EXTEND_DATASET >(*this));
     }
 
-    std::string name = "";
     Extent extent = {};
 };
 
diff --git a/include/openPMD/RecordComponent.hpp b/include/openPMD/RecordComponent.hpp
index c3432ca55a..e5c33237b8 100644
--- a/include/openPMD/RecordComponent.hpp
+++ b/include/openPMD/RecordComponent.hpp
@@ -100,7 +100,26 @@ class RecordComponent : public BaseRecordComponent
 
     RecordComponent& setUnitSI(double);
 
-    RecordComponent& resetDataset(Dataset);
+    /**
+     * @brief Declare the dataset's type and extent.
+     *
+     * Calling this again after flushing will require resizing the dataset.
+     * Support for this depends on the backend.
+     * The following operations are unsupported:
+     * * Changing the datatype.
+     * * Shrinking any dimension's extent.
+     * * Changing the number of dimensions.
+     *
+     * Backend support for resizing datasets:
+     * * JSON: Supported
+     * * ADIOS1: Unsupported
+     * * ADIOS2: Supported as of ADIOS2 2.7.0
+     * * HDF5: (Currently) unsupported.
+     *   Will probably be supported as soon as chunking is supported in HDF5.
+     *
+     * @return RecordComponent&
+     */
+    RecordComponent & resetDataset( Dataset );
 
     uint8_t getDimensionality() const;
     Extent getExtent() const;
@@ -196,6 +215,10 @@ class RecordComponent : public BaseRecordComponent
     std::shared_ptr< std::queue< IOTask > > m_chunks;
     std::shared_ptr< Attribute > m_constantValue;
     std::shared_ptr< bool > m_isEmpty = std::make_shared< bool >( false );
+    // User has extended the dataset, but the EXTEND task has not yet been
+    // flushed to the backend
+    std::shared_ptr< bool > m_hasBeenExtended =
+        std::make_shared< bool >( false );
 
 private:
     void flush(std::string const&);
diff --git a/src/Dataset.cpp b/src/Dataset.cpp
index 9f834d1ea9..21be4fa3f2 100644
--- a/src/Dataset.cpp
+++ b/src/Dataset.cpp
@@ -34,8 +34,12 @@ Dataset::Dataset(Datatype d, Extent e, std::string options_in)
       options{std::move(options_in)}
 { }
 
-Dataset&
-Dataset::extend(Extent newExtents)
+Dataset::Dataset( Extent e ) : Dataset( Datatype::UNDEFINED, std::move( e ) )
+{
+}
+
+Dataset &
+Dataset::extend( Extent newExtents )
 {
     if( newExtents.size() != rank )
         throw std::runtime_error("Dimensionality of extended Dataset must match the original dimensionality");
diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp
index 64509a65ec..df46ab5ee5 100644
--- a/src/IO/ADIOS/ADIOS2IOHandler.cpp
+++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp
@@ -403,15 +403,58 @@ void ADIOS2IOHandlerImpl::createDataset(
     }
 }
 
-void ADIOS2IOHandlerImpl::extendDataset(
-    Writable *, const Parameter< Operation::EXTEND_DATASET > & )
+namespace detail
+{
+    struct DatasetExtender
+    {
+        template< typename T, typename... Args >
+        void
+        operator()(
+            adios2::IO & IO,
+            std::string const & variable,
+            Extent const & newShape )
+        {
+            auto var = IO.InquireVariable< T >( variable );
+            if( !var )
+            {
+                throw std::runtime_error(
+                    "[ADIOS2] Unable to retrieve variable for resizing: '" +
+                    variable + "'." );
+            }
+            adios2::Dims dims;
+            dims.reserve( newShape.size() );
+            for( auto ext : newShape )
+            {
+                dims.push_back( ext );
+            }
+            var.SetShape( dims );
+        }
+
+        std::string errorMsg = "ADIOS2: extendDataset()";
+    };
+} // namespace detail
+
+void
+ADIOS2IOHandlerImpl::extendDataset(
+    Writable * writable,
+    const Parameter< Operation::EXTEND_DATASET > & parameters )
 {
-    throw std::runtime_error(
-        "[ADIOS2] Dataset extension not implemented in ADIOS backend" );
+    VERIFY_ALWAYS(
+        m_handler->m_backendAccess != Access::READ_ONLY,
+        "[ADIOS2] Cannot extend datasets in read-only mode." );
+    setAndGetFilePosition( writable );
+    auto file = refreshFileFromParent( writable );
+    std::string name = nameOfVariable( writable );
+    auto & filedata = getFileData( file );
+    static detail::DatasetExtender de;
+    Datatype dt = detail::fromADIOS2Type( filedata.m_IO.VariableType( name ) );
+    switchAdios2VariableType( dt, de, filedata.m_IO, name, parameters.extent );
 }
 
-void ADIOS2IOHandlerImpl::openFile(
-    Writable * writable, const Parameter< Operation::OPEN_FILE > & parameters )
+void
+ADIOS2IOHandlerImpl::openFile(
+    Writable * writable,
+    const Parameter< Operation::OPEN_FILE > & parameters )
 {
     if ( !auxiliary::directory_exists( m_handler->directory ) )
     {
diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp
index 2ac8b1a327..3aa32453b8 100644
--- a/src/IO/HDF5/HDF5IOHandler.cpp
+++ b/src/IO/HDF5/HDF5IOHandler.cpp
@@ -346,25 +346,39 @@ HDF5IOHandlerImpl::extendDataset(Writable* writable,
     if( !writable->written )
         throw std::runtime_error("[HDF5] Extending an unwritten Dataset is not possible.");
 
-    auto file = getFile(writable->parent).get();
-    hid_t node_id, dataset_id;
-    node_id = H5Gopen(file.id,
-                      concrete_h5_file_position(writable->parent).c_str(),
-                      H5P_DEFAULT);
-    VERIFY(node_id >= 0, "[HDF5] Internal error: Failed to open HDF5 group during dataset extension");
-
-    /* Sanitize name */
-    std::string name = parameters.name;
-    if( auxiliary::starts_with(name, '/') )
-        name = auxiliary::replace_first(name, "/", "");
-    if( !auxiliary::ends_with(name, '/') )
-        name += '/';
-
-    dataset_id = H5Dopen(node_id,
-                         name.c_str(),
+    auto res = getFile( writable );
+    if( !res )
+        res = getFile( writable->parent );
+    hid_t dataset_id = H5Dopen(res.get().id,
+                         concrete_h5_file_position(writable).c_str(),
                          H5P_DEFAULT);
     VERIFY(dataset_id >= 0, "[HDF5] Internal error: Failed to open HDF5 dataset during dataset extension");
 
+    // Datasets may only be extended if they have chunked layout, so let's see
+    // whether this one does
+    {
+        hid_t dataset_space = H5Dget_space( dataset_id );
+        int ndims = H5Sget_simple_extent_ndims( dataset_space );
+        VERIFY(
+            ndims >= 0,
+            "[HDF5] Internal error: Failed to retrieve dimensionality of "
+            "dataset "
+            "during dataset extension." );
+        hid_t propertyList = H5Dget_create_plist( dataset_id );
+        std::vector< hsize_t > chunkExtent( ndims, 0 );
+        int chunkDimensionality =
+            H5Pget_chunk( propertyList, ndims, chunkExtent.data() );
+        // release the handles opened for this check
+        H5Pclose( propertyList );
+        H5Sclose( dataset_space );
+        if( chunkDimensionality < 0 )
+        {
+            throw std::runtime_error(
+                "[HDF5] Cannot extend datasets unless written with chunked "
+                "layout (currently unsupported)." );
+        }
+    }
+
     std::vector< hsize_t > size;
     for( auto const& val : parameters.extent )
         size.push_back(static_cast< hsize_t >(val));
@@ -375,8 +389,6 @@ HDF5IOHandlerImpl::extendDataset(Writable* writable,
 
     status = H5Dclose(dataset_id);
     VERIFY(status == 0, "[HDF5] Internal error: Failed to close HDF5 dataset during dataset extension");
-    status = H5Gclose(node_id);
-    VERIFY(status == 0, "[HDF5] Internal error: Failed to close HDF5 group during dataset extension");
 }
 
 void
diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp
index d9bf154d3c..dc9298ce3d 100644
--- a/src/IO/JSON/JSONIOHandlerImpl.cpp
+++ b/src/IO/JSON/JSONIOHandlerImpl.cpp
@@ -248,18 +248,42 @@ namespace openPMD
         }
     }
 
+    namespace
+    {
+        void
+        mergeInto( nlohmann::json & into, nlohmann::json & from );
+
+        void
+        mergeInto( nlohmann::json & into, nlohmann::json & from )
+        {
+            if( !from.is_array() )
+            {
+                into = from; // copy
+            }
+            else
+            {
+                size_t size = from.size();
+                for( size_t i = 0; i < size; ++i )
+                {
+                    if( !from[ i ].is_null() )
+                    {
+                        mergeInto( into[ i ], from[ i ] );
+                    }
+                }
+            }
+        }
+    } // namespace
 
-    void JSONIOHandlerImpl::extendDataset(
+    void
+    JSONIOHandlerImpl::extendDataset(
         Writable * writable,
-        Parameter< Operation::EXTEND_DATASET > const & parameters
-    )
+        Parameter< Operation::EXTEND_DATASET > const & parameters )
     {
-        VERIFY_ALWAYS(m_handler->m_backendAccess != Access::READ_ONLY,
+        VERIFY_ALWAYS(
+            m_handler->m_backendAccess != Access::READ_ONLY,
             "[JSON] Cannot extend a dataset in read-only mode." )
-        refreshFileFromParent( writable );
         setAndGetFilePosition( writable );
-        auto name = removeSlashes( parameters.name );
-        auto & j = obtainJsonContents( writable )[name];
+        refreshFileFromParent( writable );
+        auto & j = obtainJsonContents( writable );
 
         try
         {
@@ -280,7 +304,8 @@ namespace openPMD
             }
         } catch( json::basic_json::type_error & )
         {
-            throw std::runtime_error( "[JSON] The specified location contains no valid dataset" );
+            throw std::runtime_error(
+                "[JSON] The specified location contains no valid dataset" );
         }
         switch( stringToDatatype( j[ "datatype" ].get< std::string >() ) )
         {
@@ -288,17 +313,25 @@ namespace openPMD
            case Datatype::CFLOAT:
            case Datatype::CDOUBLE:
            case Datatype::CLONG_DOUBLE:
            {
+                // @todo test complex resizing
                auto complexExtent = parameters.extent;
                complexExtent.push_back( 2 );
-                j["data"] = initializeNDArray( complexExtent );
+                nlohmann::json newData = initializeNDArray( complexExtent );
+                nlohmann::json & oldData = j[ "data" ];
+                mergeInto( newData, oldData );
+                j[ "data" ] = newData;
                break;
            }
            default:
-                j["data"] = initializeNDArray( parameters.extent );
+            {
+                nlohmann::json newData = initializeNDArray( parameters.extent );
+                nlohmann::json & oldData = j[ "data" ];
+                mergeInto( newData, oldData );
+                j[ "data" ] = newData;
                break;
+            }
         }
         writable->written = true;
-
     }
 
diff --git a/src/RecordComponent.cpp b/src/RecordComponent.cpp
index 1909233f9f..61fddca4d9 100644
--- a/src/RecordComponent.cpp
+++ b/src/RecordComponent.cpp
@@ -52,17 +52,36 @@ RecordComponent &
 RecordComponent::resetDataset( Dataset d )
 {
     if( written() )
-        throw std::runtime_error( "A record's Dataset cannot (yet) be changed "
-                                  "after it has been written." );
-    //if( d.extent.empty() )
+    {
+        if( d.dtype == Datatype::UNDEFINED )
+        {
+            d.dtype = m_dataset->dtype;
+        }
+        else if( d.dtype != m_dataset->dtype )
+        {
+            throw std::runtime_error(
+                "Cannot change the datatype of a dataset." );
+        }
+        *m_hasBeenExtended = true;
+    }
+    // if( d.extent.empty() )
     //     throw std::runtime_error("Dataset extent must be at least 1D.");
     if( std::any_of(
             d.extent.begin(),
             d.extent.end(),
             []( Extent::value_type const & i ) { return i == 0u; } ) )
-        return makeEmpty( std::move(d) );
+        return makeEmpty( std::move( d ) );
+
+    *m_isEmpty = false;
+    if( written() )
+    {
+        m_dataset->extend( std::move( d.extent ) );
+    }
+    else
+    {
+        *m_dataset = std::move( d );
+    }
 
-    *m_dataset = std::move(d);
     dirty() = true;
     return *this;
 }
@@ -109,20 +128,41 @@ RecordComponent&
 RecordComponent::makeEmpty( Dataset d )
 {
     if( written() )
-        throw std::runtime_error(
-            "A RecordComponent cannot (yet) be made"
-            " empty after it has been written.");
-    if( d.extent.size() == 0 )
-        throw std::runtime_error("Dataset extent must be at least 1D.");
+    {
+        if( !constant() )
+        {
+            throw std::runtime_error(
+                "An empty record component's extent can only be changed"
+                " in case it has been initialized as an empty or constant"
+                " record component." );
+        }
+        if( d.dtype == Datatype::UNDEFINED )
+        {
+            d.dtype = m_dataset->dtype;
+        }
+        else if( d.dtype != m_dataset->dtype )
+        {
+            throw std::runtime_error(
+                "Cannot change the datatype of a dataset." );
+        }
+        m_dataset->extend( std::move( d.extent ) );
+        *m_hasBeenExtended = true;
+    }
+    else
+    {
+        *m_dataset = std::move( d );
+    }
+
+    if( m_dataset->extent.size() == 0 )
+        throw std::runtime_error( "Dataset extent must be at least 1D." );
 
     *m_isEmpty = true;
-    *m_dataset = std::move(d);
     dirty() = true;
-    static detail::DefaultValue< RecordComponent > dv;
-    switchType(
-        m_dataset->dtype,
-        dv,
-        *this );
+    if( !written() )
+    {
+        static detail::DefaultValue< RecordComponent > dv;
+        switchType( m_dataset->dtype, dv, *this );
+    }
     return *this;
 }
@@ -175,6 +215,26 @@ RecordComponent::flush(std::string const& name)
         }
     }
 
+    if( *m_hasBeenExtended )
+    {
+        if( constant() )
+        {
+            Parameter< Operation::WRITE_ATT > aWrite;
+            aWrite.name = "shape";
+            Attribute a( getExtent() );
+            aWrite.dtype = a.dtype;
+            aWrite.resource = a.getResource();
+            IOHandler()->enqueue( IOTask( this, aWrite ) );
+        }
+        else
+        {
+            Parameter< Operation::EXTEND_DATASET > pExtend;
+            pExtend.extent = m_dataset->extent;
+            IOHandler()->enqueue( IOTask( this, std::move( pExtend ) ) );
+        }
+        *m_hasBeenExtended = false;
+    }
+
     while( !m_chunks->empty() )
     {
         IOHandler()->enqueue(m_chunks->front());
diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp
index f62a8ad2c3..adbc894f1c 100644
--- a/test/ParallelIOTest.cpp
+++ b/test/ParallelIOTest.cpp
@@ -371,6 +371,63 @@ TEST_CASE( "available_chunks_test", "[parallel][adios]" )
     available_chunks_test( "bp" );
 }
 
+void
+extendDataset( std::string const & ext )
+{
+    std::string filename = "../samples/parallelExtendDataset." + ext;
+    int r_mpi_rank{ -1 }, r_mpi_size{ -1 };
+    MPI_Comm_rank( MPI_COMM_WORLD, &r_mpi_rank );
+    MPI_Comm_size( MPI_COMM_WORLD, &r_mpi_size );
+    unsigned mpi_rank{ static_cast< unsigned >( r_mpi_rank ) },
+        mpi_size{ static_cast< unsigned >( r_mpi_size ) };
+    std::vector< int > data1( 25 );
+    std::vector< int > data2( 25 );
+    std::iota( data1.begin(), data1.end(), 0 );
+    std::iota( data2.begin(), data2.end(), 25 );
+    {
+        Series write( filename, Access::CREATE, MPI_COMM_WORLD );
+        if( ext == "bp" && write.backend() != "ADIOS2" )
+        {
+            // dataset resizing unsupported in ADIOS1
+            return;
+        }
+        Dataset ds1{ Datatype::INT, { mpi_size, 25 } };
+        Dataset ds2{ { mpi_size, 50 } };
+
+        // array record component -> array record component
+        // should work
+        auto E_x = write.iterations[ 0 ].meshes[ "E" ][ "x" ];
+        E_x.resetDataset( ds1 );
+        E_x.storeChunk( data1, { mpi_rank, 0 }, { 1, 25 } );
+        write.flush();
+
+        E_x.resetDataset( ds2 );
+        E_x.storeChunk( data2, { mpi_rank, 25 }, { 1, 25 } );
+        write.flush();
+    }
+
+    MPI_Barrier( MPI_COMM_WORLD );
+
+    {
+        Series read( filename, Access::READ_ONLY );
+        auto E_x = read.iterations[ 0 ].meshes[ "E" ][ "x" ];
+        REQUIRE( E_x.getExtent() == Extent{ mpi_size, 50 } );
+        auto chunk = E_x.loadChunk< int >( { 0, 0 }, { mpi_size, 50 } );
+        read.flush();
+        for( size_t rank = 0; rank < mpi_size; ++rank )
+        {
+            for( size_t i = 0; i < 50; ++i )
+            {
+                REQUIRE( chunk.get()[ rank * 50 + i ] == int( i ) );
+            }
+        }
+    }
+}
+
+TEST_CASE( "extend_dataset", "[parallel]" )
+{
+    extendDataset( "bp" );
+}
 #endif
 
 #if openPMD_HAVE_ADIOS1 && openPMD_HAVE_MPI
diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp
index decd7cf8fb..f2593c4890 100644
--- a/test/SerialIOTest.cpp
+++ b/test/SerialIOTest.cpp
@@ -3520,3 +3520,139 @@ TEST_CASE( "iterate_nonstreaming_series", "[serial][adios2]" )
             "../samples/iterate_nonstreaming_series_groupbased." + t );
     }
 }
+
+void
+extendDataset( std::string const & ext )
+{
+    std::string filename = "../samples/extendDataset." + ext;
+    std::vector< int > data1( 25 );
+    std::vector< int > data2( 25 );
+    std::iota( data1.begin(), data1.end(), 0 );
+    std::iota( data2.begin(), data2.end(), 25 );
+    {
+        Series write( filename, Access::CREATE );
+        if( ext == "bp" && write.backend() != "ADIOS2" )
+        {
+            // dataset resizing unsupported in ADIOS1
+            return;
+        }
+        Dataset ds1{ Datatype::INT, { 5, 5 } };
+        Dataset ds2{ Datatype::INT, { 10, 5 } };
+
+        // array record component -> array record component
+        // should work
+        auto E_x = write.iterations[ 0 ].meshes[ "E" ][ "x" ];
+        E_x.resetDataset( ds1 );
+        E_x.storeChunk( data1, { 0, 0 }, { 5, 5 } );
+        write.flush();
+
+        E_x.resetDataset( ds2 );
+        E_x.storeChunk( data2, { 5, 0 }, { 5, 5 } );
+
+        // constant record component -> constant record component
+        // should work
+        auto E_y = write.iterations[ 0 ].meshes[ "E" ][ "y" ];
+        E_y.resetDataset( ds1 );
+        E_y.makeConstant( 10 );
+        write.flush();
+
+        E_y.resetDataset( ds2 );
+        write.flush();
+
+        // empty record component -> empty record component
+        // should work
+        // this does not make a lot of sense since we don't allow shrinking,
+        // but let's just reset it to itself
+        auto E_z = write.iterations[ 0 ].meshes[ "E" ][ "z" ];
+        E_z.makeEmpty< int >( 3 );
+        write.flush();
+
+        E_z.makeEmpty< int >( 3 );
+        write.flush();
+
+        // empty record component -> empty record component
+        // (created by resetDataset)
+        // should work
+        auto E_a = write.iterations[ 0 ].meshes[ "E" ][ "a" ];
+        E_a.makeEmpty< int >( 3 );
+        write.flush();
+
+        E_a.resetDataset( Dataset( Datatype::UNDEFINED, { 0, 1, 2 } ) );
+        write.flush();
+
+        // constant record component -> empty record component
+        // should fail, since this implies shrinking
+        auto E_b = write.iterations[ 0 ].meshes[ "E" ][ "b" ];
+        E_b.resetDataset( ds1 );
+        E_b.makeConstant( 10 );
+        write.flush();
+
+        REQUIRE_THROWS( E_b.makeEmpty< int >( 2 ) );
+
+        // empty record component -> constant record component
+        // should work
+        auto E_c = write.iterations[ 0 ].meshes[ "E" ][ "c" ];
+        E_c.makeEmpty< int >( 3 );
+        write.flush();
+
+        E_c.resetDataset( Dataset( { 1, 1, 2 } ) );
+        write.flush();
+
+        // array record component -> constant record component
+        // should fail
+        auto E_d = write.iterations[ 0 ].meshes[ "E" ][ "d" ];
+        E_d.resetDataset( ds1 );
+        E_d.storeChunk( data1, { 0, 0 }, { 5, 5 } );
+        write.flush();
+
+        REQUIRE_THROWS( E_d.makeConstant( 5 ) );
+
+        // array record component -> empty record component
+        // should fail
+        auto E_e = write.iterations[ 0 ].meshes[ "E" ][ "e" ];
+        E_e.resetDataset( ds1 );
+        E_e.storeChunk( data1, { 0, 0 }, { 5, 5 } );
+        write.flush();
+
+        REQUIRE_THROWS( E_e.makeEmpty< int >( 5 ) );
+    }
+
+    {
+        Series read( filename, Access::READ_ONLY );
+        auto E_x = read.iterations[ 0 ].meshes[ "E" ][ "x" ];
+        REQUIRE( E_x.getExtent() == Extent{ 10, 5 } );
+        auto chunk = E_x.loadChunk< int >( { 0, 0 }, { 10, 5 } );
+        read.flush();
+
+        for( size_t i = 0; i < 50; ++i )
+        {
+            REQUIRE( chunk.get()[ i ] == int( i ) );
+        }
+
+        auto E_y = read.iterations[ 0 ].meshes[ "E" ][ "y" ];
+        REQUIRE( E_y.getExtent() == Extent{ 10, 5 } );
+
+        auto E_z = read.iterations[ 0 ].meshes[ "E" ][ "z" ];
+        REQUIRE( E_z.getExtent() == Extent{ 0, 0, 0 } );
+
+        auto E_a = read.iterations[ 0 ].meshes[ "E" ][ "a" ];
+        REQUIRE( E_a.getExtent() == Extent{ 0, 1, 2 } );
+
+        // E_b could not be changed
+
+        auto E_c = read.iterations[ 0 ].meshes[ "E" ][ "c" ];
+        REQUIRE( E_c.getExtent() == Extent{ 1, 1, 2 } );
+        REQUIRE( !E_c.empty() );
+    }
+}
+
+TEST_CASE( "extend_dataset", "[serial]" )
+{
+    extendDataset( "json" );
+#if openPMD_HAVE_ADIOS2
+    extendDataset( "bp" );
+#endif
+#if openPMD_HAVE_HDF5
+    // extendDataset( "h5" ); // resizing needs chunked datasets, which the HDF5 backend does not write yet
+#endif
+}
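
Not part of the patch itself: the following minimal sketch illustrates the user-facing resizing workflow that this diff enables, distilled from the serial test above. The file name is illustrative, and the JSON backend is chosen because it already supports resizing.

```cpp
#include <openPMD/openPMD.hpp>

#include <numeric>
#include <vector>

int main()
{
    using namespace openPMD;

    Series series( "../samples/resizeExample.json", Access::CREATE );
    auto E_x = series.iterations[ 0 ].meshes[ "E" ][ "x" ];

    // declare datatype and initial extent, then write a first block
    E_x.resetDataset( Dataset{ Datatype::INT, { 5, 5 } } );
    std::vector< int > data( 25 );
    std::iota( data.begin(), data.end(), 0 );
    E_x.storeChunk( data, { 0, 0 }, { 5, 5 } );
    series.flush();

    // grow the dataset and write into the new region; the datatype-less
    // Dataset( Extent ) constructor added above avoids restating
    // Datatype::INT (shrinking, changing the datatype or changing the
    // dimensionality would throw instead)
    E_x.resetDataset( Dataset{ { 10, 5 } } );
    std::iota( data.begin(), data.end(), 25 );
    E_x.storeChunk( data, { 5, 0 }, { 5, 5 } );
    series.flush();
}
```

Note that the extension only reaches the backend on the next `flush()`: until then, the pending `EXTEND_DATASET` task is tracked via the new `m_hasBeenExtended` flag.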