diff --git a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc index 535e0a00cc..2e43e28c5a 100644 --- a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc +++ b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc @@ -261,11 +261,17 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName, { input_data += j.position; } - - if (j.shape.size() > 0 and j.shape[0] > 1 and j.start.size() > 0 and - j.start.size() == j.count.size() and - j.start.size() == varStart.size() and - j.start.size() == varCount.size()) + /* single values */ + if (j.shape.empty() or + std::all_of(j.shape.begin(), j.shape.end(), + [&](size_t i) { return i == 1; })) + { + std::memcpy(reinterpret_cast(outputData), input_data, + sizeof(T)); + } + else if (j.start.size() > 0 and j.start.size() == j.count.size() and + j.start.size() == varStart.size() and + j.start.size() == varCount.size()) { if (m_ContiguousMajor) { @@ -284,10 +290,12 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName, sizeof(T), j.start, j.count, varMemStart, varMemCount); } } - if (j.shape.empty() or (j.shape.size() == 1 and j.shape[0] == 1)) + else { - std::memcpy(reinterpret_cast(outputData), input_data, - sizeof(T)); + throw std::runtime_error( + "DataManSerializer::GeData end with Step \" + " + "std::to_string(step) +\n" + " \" Var \" + varName failed"); } } } diff --git a/testing/adios2/engine/dataman/CMakeLists.txt b/testing/adios2/engine/dataman/CMakeLists.txt index f40d6b60a6..812649cc21 100644 --- a/testing/adios2/engine/dataman/CMakeLists.txt +++ b/testing/adios2/engine/dataman/CMakeLists.txt @@ -17,6 +17,12 @@ foreach(tst IN ITEMS ) endforeach() +if (ADIOS2_HAVE_Python) + python_add_test(NAME Test.Engine.DataMan1D.Serial SCRIPT TestDataMan1D.py) + python_add_test(NAME Test.Engine.DataMan1xN.Serial SCRIPT TestDataMan1xN.py) + python_add_test(NAME Test.Engine.DataManSingleValues SCRIPT TestDataManSingleValues.py) +endif() + if(ADIOS2_HAVE_ZFP) gtest_add_tests_helper(2DZfp MPI_NONE DataMan Engine.DataMan. "") set_tests_properties(${Test.Engine.DataMan.2DZfp-TESTS} diff --git a/testing/adios2/engine/dataman/TestDataMan1D.py b/testing/adios2/engine/dataman/TestDataMan1D.py new file mode 100644 index 0000000000..8b8aac928f --- /dev/null +++ b/testing/adios2/engine/dataman/TestDataMan1D.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +# +# TestDataMan1D.py: test for 1D data transfer by reading in Python +# Created on: March 3, 2023 +# Author: Dmitry Ganyushin ganyushindi@ornl.gov +from multiprocessing import Process +import unittest +import numpy as np +import adios2 + + +class TestDataMan1D(unittest.TestCase): + + def setUp(self): + self.conf = { + "IPAddress": "127.0.0.1", + "Port": "12306", + "Timeout": "5", + "TransportMode": "reliable", + "RendezvousReaderCount": "1", + } + self.Nx = 10 + self.fill_value = 1.0 + self.shape = [self.Nx] + + def test_run(self): + + s = Process(target=self.thread_send) + r = Process(target=self.thread_receive) + + s.start() + r.start() + + r.join() + s.join() + + def thread_send(self): + data = np.full(shape=self.shape, fill_value=self.fill_value) + shape = data.shape + count = shape + start = (0,) * len(shape) + + adios_io = adios2.ADIOS() + wan = adios_io.DeclareIO("Server") + wan.SetEngine("Dataman") + + wan.SetParameters(self.conf) + writer = wan.Open("testdata", adios2.Mode.Write) + sendbuffer = wan.DefineVariable("np_data", data, shape, + start, count, adios2.ConstantDims) + self.assertIsNotNone(sendbuffer) + if sendbuffer: + writer.BeginStep() + writer.Put(sendbuffer, data, adios2.Mode.Deferred) + writer.EndStep() + else: + raise ValueError("DefineVariable failed") + + writer.Close() + + def thread_receive(self): + data = np.zeros(shape=self.shape) + adios_io = adios2.ADIOS() + wan = adios_io.DeclareIO("Client") + wan.SetEngine("Dataman") + wan.SetParameters(self.conf) + reader = wan.Open("testdata", adios2.Mode.Read) + while True: + stepStatus = reader.BeginStep() + if stepStatus == adios2.StepStatus.OK: + recvar = wan.InquireVariable("np_data") + self.assertIsNotNone(recvar) + bufshape = recvar.Shape() + self.assertTrue(bufshape[0] == self.Nx) + reader.Get(recvar, data, adios2.Mode.Sync) + + elif stepStatus == adios2.StepStatus.EndOfStream: + break + else: + raise StopIteration() + reader.EndStep() + reader.Close() + self.assertTrue(all([data[i] == self.fill_value for i + in range(len(data))])) + + +if __name__ == '__main__': + unittest.main() diff --git a/testing/adios2/engine/dataman/TestDataMan1xN.py b/testing/adios2/engine/dataman/TestDataMan1xN.py new file mode 100644 index 0000000000..7097c798c7 --- /dev/null +++ b/testing/adios2/engine/dataman/TestDataMan1xN.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +# +# TestDataMan1D.py: test for 1D data transfer by reading in Python +# Created on: March 3, 2023 +# Author: Dmitry Ganyushin ganyushindi@ornl.gov +from multiprocessing import Process +import unittest +import numpy as np +import adios2 + + +class TestDataMan1D(unittest.TestCase): + + def setUp(self): + self.conf = { + "IPAddress": "127.0.0.1", + "Port": "12306", + "Timeout": "5", + "TransportMode": "reliable", + "RendezvousReaderCount": "1", + } + self.Nx = 10 + self.fill_value = 1.0 + self.shape = [1, self.Nx] + + def test_run(self): + + s = Process(target=self.thread_send) + r = Process(target=self.thread_receive) + + s.start() + r.start() + + r.join() + s.join() + + def thread_send(self): + data = np.full(shape=self.shape, fill_value=self.fill_value) + shape = data.shape + count = shape + start = (0,) * len(shape) + + adios_io = adios2.ADIOS() + wan = adios_io.DeclareIO("Server") + wan.SetEngine("Dataman") + + wan.SetParameters(self.conf) + writer = wan.Open("testdata", adios2.Mode.Write) + sendbuffer = wan.DefineVariable("np_data", data, shape, + start, count, adios2.ConstantDims) + self.assertIsNotNone(sendbuffer) + if sendbuffer: + writer.BeginStep() + writer.Put(sendbuffer, data, adios2.Mode.Deferred) + writer.EndStep() + else: + raise ValueError("DefineVariable failed") + + writer.Close() + + def thread_receive(self): + data = np.zeros(shape=self.shape) + adios_io = adios2.ADIOS() + wan = adios_io.DeclareIO("Client") + wan.SetEngine("Dataman") + wan.SetParameters(self.conf) + reader = wan.Open("testdata", adios2.Mode.Read) + while True: + stepStatus = reader.BeginStep() + if stepStatus == adios2.StepStatus.OK: + recvar = wan.InquireVariable("np_data") + self.assertIsNotNone(recvar) + bufshape = recvar.Shape() + self.assertTrue(bufshape[0] == 1) + self.assertTrue(bufshape[1] == self.Nx) + reader.Get(recvar, data, adios2.Mode.Sync) + + elif stepStatus == adios2.StepStatus.EndOfStream: + break + else: + raise StopIteration() + reader.EndStep() + reader.Close() + self.assertTrue(all([data[0][i] == self.fill_value for i + in range(len(data))])) + + +if __name__ == '__main__': + unittest.main() diff --git a/testing/adios2/engine/dataman/TestDataManSingleValues.py b/testing/adios2/engine/dataman/TestDataManSingleValues.py new file mode 100644 index 0000000000..72e116f2b5 --- /dev/null +++ b/testing/adios2/engine/dataman/TestDataManSingleValues.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +# +# TestDataMan1D.py: test for 1D data transfer by reading in Python +# Created on: March 3, 2023 +# Author: Dmitry Ganyushin ganyushindi@ornl.gov +from multiprocessing import Process +import unittest +import numpy as np +import adios2 + + +class TestDataMan1D(unittest.TestCase): + + def setUp(self): + self.conf = { + "IPAddress": "127.0.0.1", + "Port": "12306", + "Timeout": "5", + "TransportMode": "reliable", + "RendezvousReaderCount": "1", + } + self.Nx = 1 + self.fill_value = 1.0 + self.shape = [self.Nx] + + def test_run(self): + + s = Process(target=self.thread_send) + r = Process(target=self.thread_receive) + + s.start() + r.start() + + r.join() + s.join() + + def thread_send(self): + data = np.full(shape=self.shape, fill_value=self.fill_value) + shape = data.shape + count = shape + start = (0,) * len(shape) + + adios_io = adios2.ADIOS() + wan = adios_io.DeclareIO("Server") + wan.SetEngine("Dataman") + + wan.SetParameters(self.conf) + writer = wan.Open("testdata", adios2.Mode.Write) + sendbuffer = wan.DefineVariable("np_data", data, shape, + start, count, adios2.ConstantDims) + self.assertIsNotNone(sendbuffer) + if sendbuffer: + writer.BeginStep() + writer.Put(sendbuffer, data, adios2.Mode.Deferred) + writer.EndStep() + else: + raise ValueError("DefineVariable failed") + + writer.Close() + + def thread_receive(self): + data = np.zeros(shape=self.shape) + adios_io = adios2.ADIOS() + wan = adios_io.DeclareIO("Client") + wan.SetEngine("Dataman") + wan.SetParameters(self.conf) + reader = wan.Open("testdata", adios2.Mode.Read) + while True: + stepStatus = reader.BeginStep() + if stepStatus == adios2.StepStatus.OK: + recvar = wan.InquireVariable("np_data") + self.assertIsNotNone(recvar) + reader.Get(recvar, data, adios2.Mode.Sync) + + elif stepStatus == adios2.StepStatus.EndOfStream: + break + else: + raise StopIteration() + reader.EndStep() + reader.Close() + self.assertTrue(all([data[i] == self.fill_value for i + in range(len(data))])) + + +if __name__ == '__main__': + unittest.main()