diff --git a/source/adios2/core/Engine.cpp b/source/adios2/core/Engine.cpp index bca5e5445f..37875395b9 100644 --- a/source/adios2/core/Engine.cpp +++ b/source/adios2/core/Engine.cpp @@ -195,6 +195,20 @@ void Engine::DoPutStructDeferred(VariableStruct &, const void *) return nullptr; \ } +void Engine::RegisterCreatedVariable(const VariableBase *var) +{ + m_CreatedVars.insert(var); +} + +void Engine::RemoveCreatedVars() +{ + for (auto &VarRec : m_CreatedVars) + { + m_IO.RemoveVariable(VarRec->m_Name); + } + m_CreatedVars.clear(); +} + void Engine::DoGetAbsoluteSteps(const VariableBase &variable, std::vector &keys) const { diff --git a/source/adios2/core/Engine.h b/source/adios2/core/Engine.h index 815cc15de8..a066050c21 100644 --- a/source/adios2/core/Engine.h +++ b/source/adios2/core/Engine.h @@ -22,6 +22,7 @@ #include //std::shared_ptr #include #include +#include #include /// \endcond @@ -511,6 +512,9 @@ class Engine /** Inform about computation block through User->ADIOS->IO */ virtual void ExitComputationBlock() noexcept; + void RegisterCreatedVariable(const VariableBase *var); + void RemoveCreatedVars(); + protected: /** from ADIOS class passed to Engine created with Open * if no communicator is passed */ @@ -623,6 +627,8 @@ class Engine bool m_BetweenStepPairs = false; private: + std::unordered_set m_CreatedVars; + /** Throw exception by Engine virtual functions not implemented/supported by * a derived class */ void ThrowUp(const std::string function) const; diff --git a/source/adios2/engine/bp3/BP3Reader.cpp b/source/adios2/engine/bp3/BP3Reader.cpp index 502c6b338f..c6b7032ae5 100644 --- a/source/adios2/engine/bp3/BP3Reader.cpp +++ b/source/adios2/engine/bp3/BP3Reader.cpp @@ -251,6 +251,7 @@ void BP3Reader::DoClose(const int transportIndex) { PERFSTUBS_SCOPED_TIMER("BP3Reader::Close"); PerformGets(); + RemoveCreatedVars(); m_SubFileManager.CloseFiles(); m_FileManager.CloseFiles(); } diff --git a/source/adios2/engine/bp4/BP4Reader.cpp b/source/adios2/engine/bp4/BP4Reader.cpp index cd3b9170cf..7618052b82 100644 --- a/source/adios2/engine/bp4/BP4Reader.cpp +++ b/source/adios2/engine/bp4/BP4Reader.cpp @@ -655,9 +655,8 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, } void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize) { - /* Remove all existing variables from previous steps - It seems easier than trying to update them */ - m_IO.RemoveAllVariables(); + /* Remove all variables we created in the last step */ + RemoveCreatedVars(); /* Parse metadata index table (without header) */ /* We need to skew the index table pointers with the @@ -809,6 +808,9 @@ void BP4Reader::DoClose(const int transportIndex) helper::Log("Engine", "BP4Reader", "Close", m_Name, 0, m_Comm.Rank(), 5, m_Verbosity, helper::LogMode::INFO); PerformGets(); + /* Remove all variables we created in the last step */ + RemoveCreatedVars(); + m_DataFileManager.CloseFiles(); m_MDFileManager.CloseFiles(); } diff --git a/source/adios2/engine/sst/SstReader.cpp b/source/adios2/engine/sst/SstReader.cpp index f34dcb2ca1..f658d7ca51 100644 --- a/source/adios2/engine/sst/SstReader.cpp +++ b/source/adios2/engine/sst/SstReader.cpp @@ -68,6 +68,7 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode, &(Reader->m_IO.DefineVariable(variableName)); \ variable->SetData((T *)data); \ variable->m_AvailableStepsCount = 1; \ + Reader->RegisterCreatedVariable(variable); \ return (void *)variable; \ } @@ -163,6 +164,7 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode, Variable *variable = &(Reader->m_IO.DefineVariable( \ variableName, VecShape, VecStart, VecCount)); \ variable->m_AvailableStepsCount = 1; \ + Reader->RegisterCreatedVariable(variable); \ return (void *)variable; \ } ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) @@ -300,7 +302,7 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec) case adios2::StepMode::Read: break; } - m_IO.RemoveAllVariables(); + RemoveCreatedVars(); result = SstAdvanceStep(m_Input, timeout_sec); if (result == SstEndOfStream) { @@ -356,7 +358,7 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec) i++; } - m_IO.RemoveAllVariables(); + RemoveCreatedVars(); m_BP5Deserializer->SetupForStep( SstCurrentStep(m_Input), static_cast(m_CurrentStepMetaData->WriterCohortSize)); @@ -421,7 +423,7 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec) (*m_CurrentStepMetaData->WriterMetadata)->block, (*m_CurrentStepMetaData->WriterMetadata)->DataSize); - m_IO.RemoveAllVariables(); + RemoveCreatedVars(); m_BP3Deserializer->ParseMetadata(m_BP3Deserializer->m_Metadata, *this); m_IO.ResetVariablesStepSelection(true, "in call to SST Reader BeginStep"); diff --git a/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc b/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc index d6bfe0c1e2..25b1e86b8f 100644 --- a/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc +++ b/source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc @@ -695,6 +695,7 @@ inline void BP3Deserializer::DefineVariableInEngineIO( { std::lock_guard lock(m_Mutex); variable = &engine.m_IO.DefineVariable(variableName); + engine.RegisterCreatedVariable(variable); variable->m_Value = characteristics.Statistics.Value; // assigning first step @@ -846,6 +847,8 @@ void BP3Deserializer::DefineVariableInEngineIO(const ElementIndexHeader &header, variableName + ", in call to Open"); } // end switch + engine.RegisterCreatedVariable(variable); + if (characteristics.Statistics.IsValue) { variable->m_Value = characteristics.Statistics.Value; diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc index b09ee68eaf..5408f32ec2 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc +++ b/source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc @@ -701,6 +701,7 @@ inline void BP4Deserializer::DefineVariableInEngineIOPerStep( { std::lock_guard lock(m_Mutex); variable = &engine.m_IO.DefineVariable(variableName); + engine.RegisterCreatedVariable(variable); variable->m_Value = characteristics.Statistics.Value; // assigning first step @@ -928,6 +929,7 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep( variableName + ", in call to Open"); } // end switch + engine.RegisterCreatedVariable(variable); if (characteristics.Statistics.IsValue) { variable->m_Value = characteristics.Statistics.Value; diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 197ebb6a63..8c9c6835fb 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -16,6 +16,71 @@ char *SSTStreamStatusStr[] = {"NotOpen", "Opening", "Established", "PeerClosed", "PeerFailed", "Closed"}; +#ifdef MUTEX_DEBUG +#define STREAM_MUTEX_LOCK(Stream) \ + { \ + fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON Trying lock line %d\n", \ + (long)getpid(), (long)gettid(), __LINE__); \ + pthread_mutex_lock(&Stream->DataLock); \ + Stream->Locked++; \ + fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON Got lock\n", \ + (long)getpid(), (long)gettid()); \ + } + +#define STREAM_MUTEX_UNLOCK(Stream) \ + { \ + fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON UNlocking line %d\n", \ + (long)getpid(), (long)gettid(), __LINE__); \ + Stream->Locked--; \ + pthread_mutex_unlock(&Stream->DataLock); \ + } +#define STREAM_CONDITION_WAIT(Stream) \ + { \ + fprintf( \ + stderr, \ + "(PID %lx, TID %lx) CP_COMMON Dropping Condition Lock line %d\n", \ + (long)getpid(), (long)gettid(), __LINE__); \ + Stream->Locked = 0; \ + pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); \ + fprintf( \ + stderr, \ + "(PID %lx, TID %lx) CP_COMMON Acquired Condition Lock line %d\n", \ + (long)getpid(), (long)gettid(), __LINE__); \ + Stream->Locked = 1; \ + } +#define STREAM_CONDITION_SIGNAL(Stream) \ + { \ + assert(Stream->Locked == 1); \ + fprintf(stderr, \ + "(PID %lx, TID %lx) CP_COMMON Signalling Condition line %d\n", \ + (long)getpid(), (long)gettid(), __LINE__); \ + pthread_cond_signal(&Stream->DataCondition); \ + } + +#define STREAM_ASSERT_LOCKED(Stream) \ + { \ + assert(Stream->Locked == 1); \ + } +#else +#define STREAM_MUTEX_LOCK(Stream) \ + { \ + pthread_mutex_lock(&Stream->DataLock); \ + } +#define STREAM_MUTEX_UNLOCK(Stream) \ + { \ + pthread_mutex_unlock(&Stream->DataLock); \ + } +#define STREAM_CONDITION_WAIT(Stream) \ + { \ + pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); \ + } +#define STREAM_CONDITION_SIGNAL(Stream) \ + { \ + pthread_cond_signal(&Stream->DataCondition); \ + } +#define STREAM_ASSERT_LOCKED(Stream) +#endif + void CP_validateParams(SstStream Stream, SstParams Params, int Writer) { if (Params->RendezvousReaderCount >= 0) @@ -1127,9 +1192,9 @@ extern void SstStreamDestroy(SstStream Stream) * in a safe way after all streams have been destroyed */ struct _SstStream StackStream; - pthread_mutex_lock(&Stream->DataLock); CP_verbose(Stream, PerStepVerbose, "Destroying stream %p, name %s\n", Stream, Stream->Filename); + STREAM_MUTEX_LOCK(Stream); StackStream = *Stream; Stream->Status = Destroyed; struct _TimestepMetadataList *Next = Stream->Timesteps; @@ -1141,7 +1206,7 @@ extern void SstStreamDestroy(SstStream Stream) } if (Stream->DP_Stream) { - pthread_mutex_unlock(&Stream->DataLock); + STREAM_MUTEX_UNLOCK(Stream); if (Stream->Role == ReaderRole) { Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream); @@ -1150,7 +1215,7 @@ extern void SstStreamDestroy(SstStream Stream) { Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream); } - pthread_mutex_lock(&Stream->DataLock); + STREAM_MUTEX_LOCK(Stream); } if (Stream->Readers) { @@ -1276,7 +1341,7 @@ extern void SstStreamDestroy(SstStream Stream) FreeCustomStructs(&Stream->CPInfo->CustomStructs); free(Stream->CPInfo); - pthread_mutex_unlock(&Stream->DataLock); + STREAM_MUTEX_UNLOCK(Stream); // Stream is free'd in LastCall pthread_mutex_lock(&StateMutex); diff --git a/testing/adios2/engine/staging-common/TestCommonRead.cpp b/testing/adios2/engine/staging-common/TestCommonRead.cpp index 30bb3e2042..3efccc5588 100644 --- a/testing/adios2/engine/staging-common/TestCommonRead.cpp +++ b/testing/adios2/engine/staging-common/TestCommonRead.cpp @@ -461,6 +461,8 @@ TEST_F(CommonReadTest, ADIOS2CommonRead1D8) engine.Close(); if (TestVarDestruction) { + // Engine close should delete was was created by the reader, not other + // vars EXPECT_FALSE(io.InquireVariable>("c32")); } } diff --git a/testing/adios2/interface/TestADIOSInterface.cpp b/testing/adios2/interface/TestADIOSInterface.cpp index 8569ae66b0..73245ba047 100644 --- a/testing/adios2/interface/TestADIOSInterface.cpp +++ b/testing/adios2/interface/TestADIOSInterface.cpp @@ -462,8 +462,10 @@ TYPED_TEST(ADIOS2_CXX11_API_MultiBlock, Put2File) writer.Put(var, &myData[b][0], TypeParam::PutMode); } - reader.Close(); + // Close the writer before the reader because the var goes away when the + // reader does writer.Close(); + reader.Close(); this->CheckOutput(filename); }