From 4db4fe2dbf27a29c478bcdbcae69893b22f28972 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Fri, 30 Jul 2021 20:10:00 -0400 Subject: [PATCH] BP5 Data Flush --- source/adios2/engine/bp5/BP5Engine.h | 17 ++- source/adios2/engine/bp5/BP5Reader.cpp | 56 +++++++-- source/adios2/engine/bp5/BP5Writer.cpp | 114 ++++++++++++------ source/adios2/engine/bp5/BP5Writer.h | 10 +- .../toolkit/format/bp5/BP5Serializer.cpp | 22 +++- .../adios2/toolkit/format/bp5/BP5Serializer.h | 17 +++ source/utils/bp5dbg/adios2/bp5dbg/idxtable.py | 58 ++++----- .../engine/staging-common/CMakeLists.txt | 8 +- .../adios2/engine/staging-common/ParseArgs.h | 5 + .../adios2/engine/staging-common/TestData.h | 44 ++++--- .../staging-common/TestDefSyncWrite.cpp | 15 ++- .../engine/staging-common/TestSupp.cmake | 1 + 12 files changed, 265 insertions(+), 102 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 6bc187675d..38a2f5b7db 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -147,11 +147,18 @@ class BP5Engine * BP5 header for "Index Table" (64 bytes) * for each Writer, what aggregator writes its data * uint16_t [ WriterCount] - * for each timestep: - * uint64_t 0 : CombinedMetaDataPos - * uint64_t 1 : CombinedMetaDataSize - * for each Writer - * uint64_t DataPos (in the file above) + * for each timestep: (size (WriterCount + 2 ) 64-bit ints + * uint64_t 0 : CombinedMetaDataPos + * uint64_t 1 : CombinedMetaDataSize + * uint64_t 2 : FlushCount + * for each Writer + * for each flush before the last: + * uint64_t DataPos (in the file above) + * uint64_t DataSize + * for the final flush: + * uint64_t DataPos (in the file above) + * So, each timestep takes sizeof(uint64_t)* (3 + ((FlushCount-1)*2 + + *1) * WriterCount) bytes * * MetaMetadata file (mmd.0) contains FFS format information * for each meta metadata item: diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 7bd94e3bde..982da776ac 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -138,11 +138,10 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep, const size_t StartOffset, const size_t Length, char *Destination) { - size_t DataStartPos = m_MetadataIndexTable[Timestep][2]; + size_t FlushCount = m_MetadataIndexTable[Timestep][2]; + size_t DataPosPos = m_MetadataIndexTable[Timestep][3]; size_t SubfileNum = m_WriterToFileMap[WriterRank]; - DataStartPos += WriterRank * sizeof(uint64_t); - size_t DataStart = helper::ReadValue( - m_MetadataIndex.m_Buffer, DataStartPos, m_Minifooter.IsLittleEndian); + // check if subfile is already opened if (m_DataFileManager.m_Transports.count(SubfileNum) == 0) { @@ -152,7 +151,35 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep, m_DataFileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, {{"transport", "File"}}, false); } - m_DataFileManager.ReadFile(Destination, Length, DataStart + StartOffset, + + size_t InfoStartPos = + DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t)); + size_t ThisFlushInfo = InfoStartPos; + size_t RemainingLength = Length; + size_t ThisDataPos; + size_t Offset = StartOffset; + for (int flush = 0; flush < FlushCount; flush++) + { + + ThisDataPos = + helper::ReadValue(m_MetadataIndex.m_Buffer, ThisFlushInfo, + m_Minifooter.IsLittleEndian); + size_t ThisDataSize = + helper::ReadValue(m_MetadataIndex.m_Buffer, ThisFlushInfo, + m_Minifooter.IsLittleEndian); + if (ThisDataSize > RemainingLength) + ThisDataSize = RemainingLength; + m_DataFileManager.ReadFile(Destination, ThisDataSize, + ThisDataPos + Offset, SubfileNum); + Destination += ThisDataSize; + RemainingLength -= ThisDataSize; + Offset = 0; + if (RemainingLength == 0) + return; + } + ThisDataPos = helper::ReadValue( + m_MetadataIndex.m_Buffer, ThisFlushInfo, m_Minifooter.IsLittleEndian); + m_DataFileManager.ReadFile(Destination, RemainingLength, ThisDataPos, SubfileNum); } @@ -601,19 +628,34 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, buffer, position, m_Minifooter.IsLittleEndian); const uint64_t MetadataSize = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); + const uint64_t FlushCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); ptrs.push_back(MetadataPos); ptrs.push_back(MetadataSize); + ptrs.push_back(FlushCount); ptrs.push_back(position); m_MetadataIndexTable[currentStep] = ptrs; +#ifdef DUMPDATALOCINFO for (uint64_t i = 0; i < m_WriterCount; i++) { - size_t DataPosPos = ptrs[2] + sizeof(uint64_t) * i; + size_t DataPosPos = ptrs[3]; + std::cout << "Writer " << i << " data at "; + for (uint64_t j = 0; j < FlushCount; j++) + { + const uint64_t DataPos = helper::ReadValue( + buffer, DataPosPos, m_Minifooter.IsLittleEndian); + const uint64_t DataSize = helper::ReadValue( + buffer, DataPosPos, m_Minifooter.IsLittleEndian); + std::cout << "loc:" << DataPos << " siz:" << DataSize << "; "; + } const uint64_t DataPos = helper::ReadValue( buffer, DataPosPos, m_Minifooter.IsLittleEndian); + std::cout << "loc:" << DataPos << std::endl; } +#endif - position += sizeof(uint64_t) * m_WriterCount; + position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1); m_StepsCount++; currentStep++; } while (!oneStepOnly && position < buffer.size()); diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 1ac35bf30d..6a68d29e6c 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -57,6 +57,8 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds) false /* always copy */, m_Parameters.BufferChunkSize)); } + m_ThisTimestepDataSize = 0; + return StepStatus::OK; } @@ -242,23 +244,50 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos, m_FileMetadataManager.FlushFiles(); - uint64_t buf[2]; + std::vector buf; + buf.resize(3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()); buf[0] = MetaDataPos; buf[1] = MetaDataSize; - m_FileMetadataIndexManager.WriteFiles((char *)buf, sizeof(buf)); - m_FileMetadataIndexManager.WriteFiles((char *)m_WriterDataPos.data(), - m_WriterDataPos.size() * - sizeof(uint64_t)); - /*std::cout << "Write Index positions = {"; - for (size_t i = 0; i < m_WriterDataPos.size(); ++i) - { - std::cout << m_WriterDataPos[i]; - if (i < m_WriterDataPos.size() - 1) + buf[2] = FlushPosSizeInfo.size(); + + uint64_t pos = 3; + + for (int writer = 0; writer < m_Comm.Size(); writer++) + { + for (int flushNum = 0; flushNum < FlushPosSizeInfo.size(); flushNum++) { - std::cout << ", "; + buf[pos + (flushNum * 2)] = FlushPosSizeInfo[flushNum][2 * writer]; + buf[pos + (flushNum * 2) + 1] = + FlushPosSizeInfo[flushNum][2 * writer + 1]; } + buf[pos + FlushPosSizeInfo.size() * 2] = m_WriterDataPos[writer]; + pos += (FlushPosSizeInfo.size() * 2) + 1; } - std::cout << "}" << std::endl;*/ + + m_FileMetadataIndexManager.WriteFiles((char *)buf.data(), + buf.size() * sizeof(uint64_t)); + +#ifdef DUMPDATALOCINFO + std::cout << "Flush count is :" << FlushPosSizeInfo.size() << std::endl; + std::cout << "Write Index positions = {" << std::endl; + + for (size_t i = 0; i < m_Comm.Size(); ++i) + { + std::cout << "Writer " << i << " has data at: " << std::endl; + uint64_t eachWriterSize = FlushPosSizeInfo.size() * 2 + 1; + for (size_t j = 0; j < FlushPosSizeInfo.size(); ++j) + { + std::cout << "loc:" << buf[3 + eachWriterSize * i + j * 2] + << " siz:" << buf[3 + eachWriterSize * i + j * 2 + 1] + << std::endl; + } + std::cout << "loc:" << buf[3 + eachWriterSize * (i + 1) - 1] + << std::endl; + } + std::cout << "}" << std::endl; +#endif + /* reset for next timestep */ + FlushPosSizeInfo.clear(); } void BP5Writer::MarshalAttributes() @@ -332,10 +361,11 @@ void BP5Writer::EndStep() WriteData(TSInfo.DataBuffer); + m_ThisTimestepDataSize += TSInfo.DataBuffer->Size(); + std::vector MetaBuffer = m_BP5Serializer.CopyMetadataToContiguous( TSInfo.NewMetaMetaBlocks, TSInfo.MetaEncodeBuffer, - TSInfo.AttributeEncodeBuffer, TSInfo.DataBuffer->Size(), - m_StartDataPos); + TSInfo.AttributeEncodeBuffer, m_ThisTimestepDataSize, m_StartDataPos); size_t LocalSize = MetaBuffer.size(); std::vector RecvCounts = m_Comm.GatherValues(LocalSize, 0); @@ -765,7 +795,7 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, (helper::IsRowMajor(m_IO.m_HostLanguage) == false) ? 'y' : 'n'; helper::CopyToBuffer(buffer, position, &columnMajor); - // byte 45-63: unused + // byte 49-63: unused position += 15; absolutePosition = position; } @@ -807,41 +837,53 @@ void BP5Writer::InitBPBuffer() } } -void BP5Writer::DoFlush(const bool isFinal, const int transportIndex) +void BP5Writer::FlushData(const bool isFinal) { - m_FileMetadataManager.FlushFiles(); - m_FileMetaMetadataManager.FlushFiles(); - m_FileDataManager.FlushFiles(); - - // true: advances step - // BufferV *DataBuf; + BufferV *DataBuf; if (m_Parameters.BufferVType == (int)BufferVType::MallocVType) { - // DataBuf = m_BP5Serializer.ReinitStepData(new - // MallocV("BP5Writer", false, - // m_Parameters.InitialBufferSize, - // m_Parameters.GrowthFactor)); + DataBuf = m_BP5Serializer.ReinitStepData( + new MallocV("BP5Writer", false, m_Parameters.InitialBufferSize, + m_Parameters.GrowthFactor)); } else { - // DataBuf = m_BP5Serializer.ReinitStepData(new - // ChunkV("BP5Writer", - // false /* always - // copy - //*/, - // m_Parameters.BufferChunkSize)); + DataBuf = m_BP5Serializer.ReinitStepData( + new ChunkV("BP5Writer", false /* always copy */, + m_Parameters.BufferChunkSize)); } - // WriteData(DataBuf); - // delete DataBuf; + WriteData(DataBuf); + + m_ThisTimestepDataSize += DataBuf->Size(); + + if (!isFinal) + { + size_t tmp[2]; + // aggregate start pos and data size to rank 0 + tmp[0] = m_StartDataPos; + tmp[1] = DataBuf->Size(); + + std::vector RecvBuffer; + if (m_Comm.Rank() == 0) + { + RecvBuffer.resize(m_Comm.Size() * 2); + } + m_Comm.GatherArrays(tmp, 2, RecvBuffer.data(), 0); + if (m_Comm.Rank() == 0) + { + FlushPosSizeInfo.push_back(RecvBuffer); + } + } + delete DataBuf; } +void BP5Writer::Flush(const int transportIndex) { FlushData(false); } + void BP5Writer::DoClose(const int transportIndex) { PERFSTUBS_SCOPED_TIMER("BP5Writer::Close"); - DoFlush(true, transportIndex); - m_FileDataManager.CloseFiles(transportIndex); // Delete files from temporary storage if draining was on diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index d04187c416..c259f32661 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -48,6 +48,7 @@ class BP5Writer : public BP5Engine, public core::Engine size_t CurrentStep() const final; void PerformPuts() final; void EndStep() final; + void Flush(const int transportIndex = -1) final; private: /** Single object controlling BP buffering */ @@ -131,7 +132,7 @@ class BP5Writer : public BP5Engine, public core::Engine ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type) #undef declare_type - void DoFlush(const bool isFinal = false, const int transportIndex = -1); + void FlushData(const bool isFinal = false); void DoClose(const int transportIndex = -1) final; @@ -199,6 +200,11 @@ class BP5Writer : public BP5Engine, public core::Engine */ uint64_t m_DataPos = 0; + /* + * Total data written this timestep + */ + uint64_t m_ThisTimestepDataSize = 0; + /** rank 0 collects m_StartDataPos in this vector for writing it * to the index file */ @@ -210,6 +216,8 @@ class BP5Writer : public BP5Engine, public core::Engine // where each writer rank writes its data, init in InitBPBuffer; std::vector m_Assignment; + std::vector> FlushPosSizeInfo; + void MakeHeader(format::BufferSTL &b, const std::string fileType, const bool isActive); }; diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp index 286995ddd6..04deae3fc8 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.cpp @@ -482,13 +482,14 @@ void BP5Serializer::Marshal(void *Variable, const char *Name, if (span == nullptr) { - DataOffset = CurDataBuffer->AddToVec(ElemCount * ElemSize, Data, + DataOffset = m_PriorDataBufferSizeTotal + + CurDataBuffer->AddToVec(ElemCount * ElemSize, Data, ElemSize, Sync); } else { *span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize); - DataOffset = span->globalPos; + DataOffset = m_PriorDataBufferSizeTotal + span->globalPos; } if (!AlreadyWritten) @@ -636,6 +637,21 @@ void BP5Serializer::InitStep(BufferV *DataBuffer) throw std::logic_error("BP5Serializer:: InitStep without prior close"); } CurDataBuffer = DataBuffer; + m_PriorDataBufferSizeTotal = 0; +} + +BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer) +{ + if (CurDataBuffer == NULL) + { + throw std::logic_error("BP5Serializer:: ReinitStep without prior Init"); + } + m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec( + 0, NULL, sizeof(max_align_t), true); // output block size aligned + + BufferV *tmp = CurDataBuffer; + CurDataBuffer = DataBuffer; + return tmp; } BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep) @@ -696,6 +712,8 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep) MBase->DataBlockSize = CurDataBuffer->AddToVec( 0, NULL, sizeof(max_align_t), true); // output block size aligned + MBase->DataBlockSize += m_PriorDataBufferSizeTotal; + void *MetaDataBlock = FFSencode(MetaEncodeBuffer, Info.MetaFormat, MetadataBuf, &MetaDataSize); BufferFFS *Metadata = diff --git a/source/adios2/toolkit/format/bp5/BP5Serializer.h b/source/adios2/toolkit/format/bp5/BP5Serializer.h index 99e7625c9f..97d242643a 100644 --- a/source/adios2/toolkit/format/bp5/BP5Serializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Serializer.h @@ -66,7 +66,22 @@ class BP5Serializer : virtual public BP5Base void MarshalAttribute(const char *Name, const DataType Type, size_t ElemSize, size_t ElemCount, const void *Data); + /* + * InitStep must be called with an appropriate BufferV subtype before a + * step can begin + */ void InitStep(BufferV *DataBuffer); + + /* + * ReinitStepData can be called to "flush" out already written + * data it returns a BufferV representing already-written data and + * provides the serializer with a new, empty BufferV This call + * does *not* reset the data offsets generated with Marshal, so + * those offsets are relative to the entire sequence of data + * produced by a writer rank. + */ + BufferV *ReinitStepData(BufferV *DataBuffer); + TimestepInfo CloseTimestep(int timestep); void PerformPuts(); @@ -128,6 +143,8 @@ class BP5Serializer : virtual public BP5Base BufferV *CurDataBuffer = NULL; std::vector PreviousMetaMetaInfoBlocks; + size_t m_PriorDataBufferSizeTotal = 0; + BP5WriterRec LookupWriterRec(void *Key); BP5WriterRec CreateWriterRec(void *Variable, const char *Name, DataType Type, size_t ElemSize, diff --git a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py index 3b62a595f5..0d68354bf7 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py @@ -16,7 +16,7 @@ def ReadWriterArray(f, fileSize, WriterCount): data = np.frombuffer(array, dtype=np.uint64, count=1, offset=pos) rank = str(r).rjust(7) sub = str(data[0]).rjust(9) - print("|" + rank + " |" + sub + " |") + print("|" + rank + " | FlushCount = " + sub + " |") print("=====================") return True @@ -24,38 +24,38 @@ def ReadIndex(f, fileSize, WriterCount): nBytes = fileSize - f.tell() if nBytes <= 0: return True - nRows = int(nBytes / (8 * (2 + WriterCount))) table = f.read(nBytes) - print(" timestep count is " + str(nRows)) - for r in range(0, nRows): - pos = r * 8 * (2 + WriterCount) - data = np.frombuffer(table, dtype=np.uint64, count=2 + WriterCount, + pos = 0 + step = 0 + while pos < nBytes: + print("-----------------------------------------------" + + "---------------------------------------------------") + data = np.frombuffer(table, dtype=np.uint64, count=3, offset=pos) - step = str(r).ljust(6) + stepstr = str(step).ljust(6) mdatapos = str(data[0]).ljust(10) mdatasize = str(data[1]).ljust(10) - print("-----------------------------------------------" + - "---------------------------------------------------") - print("| Step = " + step + "| MetadataPos = " + mdatapos + - " | MetadataSize = " + mdatasize + " |") - covered = 0 - for s in range(0, int(WriterCount / 5)): - for t in range(0, 5): - start = "" - start = start + str(data[covered + t + 2]).rjust(10) - print("Data Pos") - print("| Ranks " + str(covered) + "-" + str(covered + 4) + - " " + start) - covered = covered + 5 - covered = int(WriterCount / 5) * 5 - remainder = WriterCount - covered - for t in range(0, remainder): - start = "" - start = start + str(data[covered + t + 2]).rjust(10) - print(" Ranks " + str(covered) + "-" + str(covered + remainder - 1) + - " " + start) - print("---------------------------------------------------" + - "-----------------------------------------------") + flushcount = str(data[2]).ljust(3) + FlushCount = data[2] + print("| Step = " + stepstr + "| MetadataPos = " + mdatapos + + " | MetadataSize = " + mdatasize + " |" + flushcount + "|") + + pos = pos + 3 * 8 + for Writer in range(0, WriterCount): + start = " Writer " + str(Writer) + " data " + thiswriter = np.frombuffer(table, dtype=np.uint64, + count=int(FlushCount * 2 + 1), + offset=pos) + + for i in range(0, FlushCount): # for flushes + start += ("loc:" + str(thiswriter[int(i * 2)]) + " siz:" + + str(thiswriter[i * 2 + 1]) + "; ") + start += "loc:" + str(thiswriter[int(FlushCount * 2)]) + pos = int(pos + (FlushCount * 2 + 1) * 8) + print(start) + print("---------------------------------------------------" + + "-----------------------------------------------") + step = step + 1 if fileSize - f.tell() > 1: print("ERROR: There are {0} bytes at the end of file" diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index 19fad56d5a..e21beecc4e 100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -104,7 +104,7 @@ if(ADIOS2_HAVE_MPI AND MPIEXEC_EXECUTABLE) endforeach() endif() -set (SIMPLE_TESTS "1x1;1x1DefSync") +set (SIMPLE_TESTS "1x1;1x1DefSync;1x1Flush") set (SIMPLE_FORTRAN_TESTS "") if(ADIOS2_HAVE_Fortran) @@ -147,7 +147,7 @@ endif() SET (BASIC_SST_TESTS "") if(ADIOS2_HAVE_SST) list (APPEND BASIC_SST_TESTS ${ALL_SIMPLE_TESTS} ${SPECIAL_TESTS} ${SST_SPECIFIC_TESTS}) - list (REMOVE_ITEM BASIC_SST_TESTS 1x1DefSync) + list (REMOVE_ITEM BASIC_SST_TESTS 1x1DefSync 1x1Flush) endif() @@ -225,6 +225,8 @@ if(NOT WIN32) # not on windows list (FILTER BP_TESTS EXCLUDE REGEX ".*SharedVar$") # The nobody-writes-data-in-a-timestep tests don't work for any BP file engine list (FILTER BP_TESTS EXCLUDE REGEX ".*NoData$") + # BP3 and BP4 semantics on flush differ from BP5 + list (FILTER BP_TESTS EXCLUDE REGEX ".*Flush.*") foreach(test ${BP_TESTS}) add_common_test(${test} BP4) endforeach() @@ -282,6 +284,7 @@ if(NOT MSVC) # not on windows # The nobody-writes-data-in-a-timestep tests don't work for any BP file engine list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*NoData.BPS$") list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*1x1DefSync.*") + list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*Flush.*") foreach(test ${BP4_STREAM_TESTS}) add_common_test(${test} BP4_stream) @@ -315,6 +318,7 @@ if(NOT MSVC) # not on windows list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*TimeoutOnOpen.FS$") list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoReaderNoWait.FS$") list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*1x1DefSync.*") + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*Flush.*") foreach(test ${FILESTREAM_TESTS}) add_common_test(${test} FileStream) diff --git a/testing/adios2/engine/staging-common/ParseArgs.h b/testing/adios2/engine/staging-common/ParseArgs.h index cf350f0a54..c67eef1932 100644 --- a/testing/adios2/engine/staging-common/ParseArgs.h +++ b/testing/adios2/engine/staging-common/ParseArgs.h @@ -36,6 +36,7 @@ bool VaryingDataSize = false; bool AdvancingAttrs = false; int NoData = 0; int NoDataNode = -1; +int Flush = 0; int EarlyExit = 0; int LocalCount = 1; int DataSize = 5 * 1024 * 1024 / 8; /* DefaultMinDeferredSize is 4*1024*1024 @@ -300,6 +301,10 @@ static void ParseArgs(int argc, char **argv) { EarlyExit++; } + else if (std::string(argv[1]) == "--flush") + { + Flush++; + } else if (std::string(argv[1]) == "--disable_mpmd") { // someone else should have eaten this arg, but if it gets here, diff --git a/testing/adios2/engine/staging-common/TestData.h b/testing/adios2/engine/staging-common/TestData.h index 11a423a19c..68e1b29fa0 100644 --- a/testing/adios2/engine/staging-common/TestData.h +++ b/testing/adios2/engine/staging-common/TestData.h @@ -193,8 +193,9 @@ int validateCommonTestData(int start, int length, size_t step, { std::cout << "Expected 0x" << std::hex << (int16_t)((i + start) * 10 + step) << ", got 0x" - << std::hex << (int16_t)in_I8[i] << " for in_I8[" << i - << "](global[" << i + start << "])" << std::endl; + << std::hex << (int16_t)in_I8[i] << std::dec + << " for in_I8[" << i << "](global[" << i + start + << "]), timestep " << step << std::endl; failures++; } } @@ -203,7 +204,8 @@ int validateCommonTestData(int start, int length, size_t step, std::cout << "Expected 0x" << std::hex << (int16_t)((i + start) * 10 + step) << ", got 0x" << std::hex << in_I16[i] << " for in_I16[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } if (in_I32[i] != (int32_t)((i + start) * 10 + step)) @@ -211,7 +213,8 @@ int validateCommonTestData(int start, int length, size_t step, std::cout << "Expected 0x" << std::hex << (int32_t)((i + start) * 10 + step) << ", got 0x" << std::hex << in_I32[i] << " for in_I32[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } if (in_I64[i] != (int64_t)((i + start) * 10 + step)) @@ -219,7 +222,8 @@ int validateCommonTestData(int start, int length, size_t step, std::cout << "Expected 0x" << std::hex << (int64_t)((i + start) * 10 + step) << ", got 0x" << std::hex << in_I64[i] << " for in_I64[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } @@ -229,7 +233,8 @@ int validateCommonTestData(int start, int length, size_t step, { std::cout << "Expected " << (float)((i + start) * 10 + step) << ", got " << in_R32[i] << " for in_R32[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } } @@ -244,7 +249,8 @@ int validateCommonTestData(int start, int length, size_t step, << (float)((i + start) * 10 + step + 1000.0 * j) << ", got " << in_R32_blocks[j][i] << " for in_R32[" << i << "][" << j << "(global[" - << i + start << "])" << std::endl; + << i + start << "]), timestep " << step + << std::endl; failures++; } } @@ -254,7 +260,8 @@ int validateCommonTestData(int start, int length, size_t step, { std::cout << "Expected " << (double)((i + start) * 10 + step) << ", got " << in_R64[i] << " for in_R64[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } if (!missing_end_data) @@ -265,7 +272,8 @@ int validateCommonTestData(int start, int length, size_t step, std::cout << "Expected [" << (float)((i + start) * 10 + step) << ", " << -(float)((i + start) * 10 + step) << "], got " << in_C32[i] << " for in_C32[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } if ((in_C64[i].imag() != (double)((i + start) * 10 + step)) || @@ -274,15 +282,16 @@ int validateCommonTestData(int start, int length, size_t step, std::cout << "Expected [" << (double)((i + start) * 10 + step) << ", " << -(double)((i + start) * 10 + step) << "], got " << in_C64[i] << " for in_C64[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } if (in_R64_2d[2 * i] != (double)((i + start) * 10 + step)) { std::cout << "Expected " << (double)((i + start) * 10 + step) << ", got " << in_R64_2d[i] << " for in_R64_2d[" << i - << "][0](global[" << i + start << "][0])" - << std::endl; + << "][0](global[" << i + start << "][0]), timestep " + << step << std::endl; failures++; } if (in_R64_2d[2 * i + 1] != @@ -291,8 +300,8 @@ int validateCommonTestData(int start, int length, size_t step, std::cout << "Expected " << (double)(10000 + (i + start) * 10 + step) << ", got " << in_R64_2d[i] << " for in_R64_2d[" << i - << "][1](global[" << i + start << "][1])" - << std::endl; + << "][1](global[" << i + start << "][1]), timestep " + << step << std::endl; failures++; } if (in_R64_2d_rev[i] != (double)((i + start) * 10 + step)) @@ -300,7 +309,7 @@ int validateCommonTestData(int start, int length, size_t step, std::cout << "Expected " << (double)((i + start) * 10 + step) << ", got " << in_R64_2d_rev[i] << " for in_R64_2d_rev[0][" << i << "](global[0][" - << i + start << "])" << std::endl; + << i + start << "]), timestep " << step << std::endl; failures++; } if (in_R64_2d_rev[i + length] != @@ -310,7 +319,7 @@ int validateCommonTestData(int start, int length, size_t step, << (double)(10000 + (i + start) * 10 + step) << ", got " << in_R64_2d_rev[i + length] << " for in_R64_2d_rev[1][" << i << "](global[1][" - << i + start << "])" << std::endl; + << i + start << "]), timestep " << step << std::endl; failures++; } } @@ -329,7 +338,8 @@ int validateCommonTestDataR64(int start, int length, size_t step, { std::cout << "Expected " << (double)((i + start) * 10 + step) << ", got " << in_R64[i] << " for in_R64[" << i - << "](global[" << i + start << "])" << std::endl; + << "](global[" << i + start << "]), timestep " << step + << std::endl; failures++; } } diff --git a/testing/adios2/engine/staging-common/TestDefSyncWrite.cpp b/testing/adios2/engine/staging-common/TestDefSyncWrite.cpp index 578ecc52bd..9cedcd8251 100644 --- a/testing/adios2/engine/staging-common/TestDefSyncWrite.cpp +++ b/testing/adios2/engine/staging-common/TestDefSyncWrite.cpp @@ -71,7 +71,8 @@ TEST(CommonWriteTest, ADIOS2CommonWrite) * Don't write * Sync - always destroy data afterwards * Deferred - * Deferred with immediate PerformPuts() - Destroy all prior data + * Deferred with immediate PerformPuts() or Flush() - Destroy all + *prior data * */ for (int step = StartStep; step < EndStep; ++step) @@ -112,8 +113,16 @@ TEST(CommonWriteTest, ADIOS2CommonWrite) } else if (this_var_mask == 3) { - std::cout << "P "; - engine.PerformPuts(); + if (Flush) + { + std::cout << "F "; + engine.Flush(); + } + else + { + std::cout << "P "; + engine.PerformPuts(); + } for (int k = 0; k <= j; k++) std::fill(data[k].begin(), data[k].end(), -100.0); } diff --git a/testing/adios2/engine/staging-common/TestSupp.cmake b/testing/adios2/engine/staging-common/TestSupp.cmake index 29f6cf3f8f..183224e845 100644 --- a/testing/adios2/engine/staging-common/TestSupp.cmake +++ b/testing/adios2/engine/staging-common/TestSupp.cmake @@ -65,6 +65,7 @@ set (STAGING_COMMON_TEST_SUPP_VERBOSE OFF) set (1x1_CMD "run_test.py.$ -nw 1 -nr 1") set (1x1GetSync_CMD "run_test.py.$ -nw 1 -nr 1 --rarg=--read_mode --rarg=sync") set (1x1DefSync_CMD "TestDefSyncWrite --data_size 200 --engine_params ChunkSize=500,MinDeferredSize=150") +set (1x1Flush_CMD "TestDefSyncWrite --flush --data_size 200 --engine_params ChunkSize=500,MinDeferredSize=150") set (1x1.NoPreload_CMD "run_test.py.$ -nw 1 -nr 1 --rarg=PreloadMode=SstPreloadNone,RENGINE_PARAMS") set (1x1.SstRUDP_CMD "run_test.py.$ -nw 1 -nr 1 --rarg=DataTransport=WAN,WANDataTransport=enet,RENGINE_PARAMS --warg=DataTransport=WAN,WANDataTransport=enet,WENGINE_PARAMS") set (1x1.NoData_CMD "run_test.py.$ -nw 1 -nr 1 --warg=--no_data --rarg=--no_data")