Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix particle patches flush api #1626

Merged
merged 4 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion examples/12_span_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,28 @@ void span_write(std::string const &filename)
for (size_t i = 0; i < 10; ++i)
{
Iteration iteration = iterations[i];
Record electronPositions = iteration.particles["e"]["position"];
auto patches = iteration.particles["e"].particlePatches;

for (auto record : {"offset", "extent"})
{
for (auto component : {"x", "y", "z"})
{
patches[record][component].resetDataset(
{Datatype::DOUBLE, {1}});
*patches[record][component]
.storeChunk<double>({0}, {1})
.currentBuffer()
.data() = 4.2;
}
}
for (auto record : {"numParticlesOffset", "numParticles"})
{
patches[record].resetDataset({Datatype::INT, {1}});
*patches[record].storeChunk<int>({0}, {1}).currentBuffer().data() =
42;
}

Record electronPositions = iteration.particles["e"]["position"];
size_t j = 0;
for (auto const &dim : {"x", "y", "z"})
{
Expand Down
10 changes: 10 additions & 0 deletions include/openPMD/IO/AbstractIOHandlerImplCommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "openPMD/auxiliary/StringManip.hpp"
#include "openPMD/backend/Writable.hpp"

#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>

Expand Down Expand Up @@ -200,6 +202,14 @@ AbstractIOHandlerImplCommon<FilePositionType>::refreshFileFromParent(
Writable *writable, bool preferParentFile)
{
auto getFileFromParent = [writable, this]() {
auto file_it = m_files.find(writable->parent);
if (file_it == m_files.end())
{
std::stringstream s;
s << "Parent Writable " << writable->parent << " of Writable "
<< writable << " has no associated file.";
throw std::runtime_error(s.str());
}
auto file = m_files.find(writable->parent)->second;
associateWithFile(writable, file);
return file;
Expand Down
5 changes: 0 additions & 5 deletions include/openPMD/ParticleSpecies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ namespace traits
/** Container hook run when a ParticleSpecies element is created:
 *  attach its particlePatches container to the object hierarchy.
 *
 *  Deliberately does NOT pre-register any patch records
 *  (numParticles / numParticlesOffset). Eagerly creating them made
 *  particlePatches non-empty for every species, which forced the
 *  particle-patches flush logic to emit default patch datasets even
 *  when the user never touched patches. Patch records are now only
 *  flushed once the user explicitly creates them.
 */
void operator()(T &ret)
{
    ret.particlePatches.linkHierarchy(ret.writable());
}
};
} // namespace traits
Expand Down
5 changes: 1 addition & 4 deletions src/ParticleSpecies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,7 @@ namespace
{
/** Decide whether a species' particlePatches container needs flushing.
 *
 *  A container is flushed as soon as it holds any patch record at all;
 *  an untouched (empty) container is skipped entirely, so no
 *  default-initialized patch datasets are written to the output.
 *  (The previous heuristic — requiring numParticles,
 *  numParticlesOffset and at least three entries — is gone; the
 *  flattened diff's leftover unreachable return is removed here.)
 *
 *  @param particlePatches The patches container to inspect.
 *  @return true if at least one patch record exists.
 */
bool flushParticlePatches(ParticlePatches const &particlePatches)
{
    return !particlePatches.empty();
}
} // namespace

Expand Down
33 changes: 0 additions & 33 deletions src/binding/python/openpmd_api/pipe/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,28 +166,6 @@ def __init__(self, source, dynamicView, offset, extent):
self.extent = extent


class particle_patch_load:
    """
    A deferred store operation for one particle patch.

    The particle-patch API requires a concrete value at .store() time,
    even though the actual write happens later at series.flush().  So,
    unlike ordinary record components, we cannot hand over an unfilled
    buffer via .store_chunk(); the .store() calls must be postponed
    until the data has actually been read from the source.  This class
    simply remembers the arguments needed to perform those calls.
    """

    def __init__(self, data, dest):
        # Sequence of patch values, one per patch index.
        self.data = data
        # Target patch record component receiving the values.
        self.dest = dest

    def run(self):
        """Replay the deferred stores: one .store(i, value) per entry."""
        for i, value in enumerate(self.data):
            self.dest.store(i, value)


class pipe:
"""
Represents the configuration of one "pipe" pass.
Expand Down Expand Up @@ -292,7 +270,6 @@ def __copy(self, src, dest, current_path="/data/"):
print("\t {0}".format(r))
out_iteration = write_iterations[in_iteration.iteration_index]
sys.stdout.flush()
self.__particle_patches = []
self.__copy(
in_iteration, out_iteration,
current_path + str(in_iteration.iteration_index) + "/")
Expand All @@ -301,10 +278,6 @@ def __copy(self, src, dest, current_path="/data/"):
deferred.dynamicView.current_buffer(), deferred.offset,
deferred.extent)
in_iteration.close()
for patch_load in self.__particle_patches:
patch_load.run()
out_iteration.close()
self.__particle_patches.clear()
self.loads.clear()
sys.stdout.flush()
elif isinstance(src, io.Record_Component) and (not is_container
Expand Down Expand Up @@ -333,12 +306,6 @@ def __copy(self, src, dest, current_path="/data/"):
self.loads.append(
deferred_load(src, span, local_chunk.offset,
local_chunk.extent))
elif isinstance(src, io.Patch_Record_Component) and (not is_container
or src.scalar):
dest.reset_dataset(io.Dataset(src.dtype, src.shape))
if self.comm.rank == 0:
self.__particle_patches.append(
particle_patch_load(src.load(), dest))
elif isinstance(src, io.Iteration):
self.__copy(src.meshes, dest.meshes, current_path + "meshes/")
self.__copy(src.particles, dest.particles,
Expand Down
6 changes: 3 additions & 3 deletions test/CoreTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ TEST_CASE("particleSpecies_modification_test", "[core]")
species["positionOffset"][RecordComponent::SCALAR].resetDataset(dset);
REQUIRE(1 == species.count("positionOffset"));
auto &patches = species.particlePatches;
REQUIRE(2 == patches.size());
REQUIRE(0 == patches.size());
REQUIRE(0 == patches.numAttributes());
auto &offset = patches["offset"];
REQUIRE(0 == offset.size());
Expand Down Expand Up @@ -720,10 +720,10 @@ TEST_CASE("structure_test", "[core]")
.parent() == getWritable(&o.iterations[1].particles["P"]));

REQUIRE(
1 ==
0 ==
o.iterations[1].particles["P"].particlePatches.count("numParticles"));
REQUIRE(
1 ==
0 ==
o.iterations[1].particles["P"].particlePatches.count(
"numParticlesOffset"));

Expand Down
Loading