Skip to content

Commit

Permalink
Merge pull request #3272 from eisenhauer/MoreDestruction
Browse files Browse the repository at this point in the history
Destroy only created vars on BeginStep and again on Reader close
  • Loading branch information
eisenhauer authored Jul 12, 2022
2 parents a1649b6 + 5bf4a36 commit 15eb09e
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 11 deletions.
14 changes: 14 additions & 0 deletions source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,20 @@ void Engine::DoPutStructDeferred(VariableStruct &, const void *)
return nullptr; \
}

void Engine::RegisterCreatedVariable(const VariableBase *var)
{
m_CreatedVars.insert(var);
}

void Engine::RemoveCreatedVars()
{
for (auto &VarRec : m_CreatedVars)
{
m_IO.RemoveVariable(VarRec->m_Name);
}
m_CreatedVars.clear();
}

void Engine::DoGetAbsoluteSteps(const VariableBase &variable,
std::vector<size_t> &keys) const
{
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory> //std::shared_ptr
#include <set>
#include <string>
#include <unordered_set>
#include <vector>
/// \endcond

Expand Down Expand Up @@ -511,6 +512,9 @@ class Engine
/** Inform about computation block through User->ADIOS->IO */
virtual void ExitComputationBlock() noexcept;

void RegisterCreatedVariable(const VariableBase *var);
void RemoveCreatedVars();

protected:
/** from ADIOS class passed to Engine created with Open
* if no communicator is passed */
Expand Down Expand Up @@ -637,6 +641,8 @@ class Engine
bool m_BetweenStepPairs = false;

private:
std::unordered_set<const VariableBase *> m_CreatedVars;

/** Throw exception by Engine virtual functions not implemented/supported by
* a derived class */
void ThrowUp(const std::string function) const;
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/bp3/BP3Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ void BP3Reader::DoClose(const int transportIndex)
{
PERFSTUBS_SCOPED_TIMER("BP3Reader::Close");
PerformGets();
RemoveCreatedVars();
m_SubFileManager.CloseFiles();
m_FileManager.CloseFiles();
}
Expand Down
8 changes: 5 additions & 3 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,8 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
}
void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize)
{
/* Remove all existing variables from previous steps
It seems easier than trying to update them */
m_IO.RemoveAllVariables();
/* Remove all variables we created in the last step */
RemoveCreatedVars();

/* Parse metadata index table (without header) */
/* We need to skew the index table pointers with the
Expand Down Expand Up @@ -819,6 +818,9 @@ void BP4Reader::DoClose(const int transportIndex)
helper::Log("Engine", "BP4Reader", "Close", m_Name, 0, m_Comm.Rank(), 5,
m_Verbosity, helper::LogMode::INFO);
PerformGets();
/* Remove all variables we created in the last step */
RemoveCreatedVars();

m_DataFileManager.CloseFiles();
m_MDFileManager.CloseFiles();
}
Expand Down
8 changes: 5 additions & 3 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
&(Reader->m_IO.DefineVariable<T>(variableName)); \
variable->SetData((T *)data); \
variable->m_AvailableStepsCount = 1; \
Reader->RegisterCreatedVariable(variable); \
return (void *)variable; \
}

Expand Down Expand Up @@ -163,6 +164,7 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
Variable<T> *variable = &(Reader->m_IO.DefineVariable<T>( \
variableName, VecShape, VecStart, VecCount)); \
variable->m_AvailableStepsCount = 1; \
Reader->RegisterCreatedVariable(variable); \
return (void *)variable; \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
Expand Down Expand Up @@ -323,7 +325,7 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec)
case adios2::StepMode::Read:
break;
}
m_IO.RemoveAllVariables();
RemoveCreatedVars();
result = SstAdvanceStep(m_Input, timeout_sec);
if (result == SstEndOfStream)
{
Expand Down Expand Up @@ -379,7 +381,7 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec)
i++;
}

m_IO.RemoveAllVariables();
RemoveCreatedVars();
m_BP5Deserializer->SetupForStep(
SstCurrentStep(m_Input),
static_cast<size_t>(m_CurrentStepMetaData->WriterCohortSize));
Expand Down Expand Up @@ -444,7 +446,7 @@ StepStatus SstReader::BeginStep(StepMode Mode, const float timeout_sec)
(*m_CurrentStepMetaData->WriterMetadata)->block,
(*m_CurrentStepMetaData->WriterMetadata)->DataSize);

m_IO.RemoveAllVariables();
RemoveCreatedVars();
m_BP3Deserializer->ParseMetadata(m_BP3Deserializer->m_Metadata, *this);
m_IO.ResetVariablesStepSelection(true,
"in call to SST Reader BeginStep");
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/toolkit/format/bp/bp3/BP3Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ inline void BP3Deserializer::DefineVariableInEngineIO<std::string>(
{
std::lock_guard<std::mutex> lock(m_Mutex);
variable = &engine.m_IO.DefineVariable<std::string>(variableName);
engine.RegisterCreatedVariable(variable);
variable->m_Value =
characteristics.Statistics.Value; // assigning first step

Expand Down Expand Up @@ -846,6 +847,8 @@ void BP3Deserializer::DefineVariableInEngineIO(const ElementIndexHeader &header,
variableName + ", in call to Open");
} // end switch

engine.RegisterCreatedVariable(variable);

if (characteristics.Statistics.IsValue)
{
variable->m_Value = characteristics.Statistics.Value;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ inline void BP4Deserializer::DefineVariableInEngineIOPerStep<std::string>(
{
std::lock_guard<std::mutex> lock(m_Mutex);
variable = &engine.m_IO.DefineVariable<std::string>(variableName);
engine.RegisterCreatedVariable(variable);
variable->m_Value =
characteristics.Statistics.Value; // assigning first step

Expand Down Expand Up @@ -1001,6 +1002,7 @@ void BP4Deserializer::DefineVariableInEngineIOPerStep(
variableName + ", in call to Open");
} // end switch

engine.RegisterCreatedVariable(variable);
if (characteristics.Statistics.IsValue)
{
variable->m_Value = characteristics.Statistics.Value;
Expand Down
73 changes: 69 additions & 4 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,71 @@
char *SSTStreamStatusStr[] = {"NotOpen", "Opening", "Established",
"PeerClosed", "PeerFailed", "Closed"};

#ifdef MUTEX_DEBUG
#define STREAM_MUTEX_LOCK(Stream) \
{ \
fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON Trying lock line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
pthread_mutex_lock(&Stream->DataLock); \
Stream->Locked++; \
fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON Got lock\n", \
(long)getpid(), (long)gettid()); \
}

#define STREAM_MUTEX_UNLOCK(Stream) \
{ \
fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON UNlocking line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
Stream->Locked--; \
pthread_mutex_unlock(&Stream->DataLock); \
}
#define STREAM_CONDITION_WAIT(Stream) \
{ \
fprintf( \
stderr, \
"(PID %lx, TID %lx) CP_COMMON Dropping Condition Lock line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
Stream->Locked = 0; \
pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); \
fprintf( \
stderr, \
"(PID %lx, TID %lx) CP_COMMON Acquired Condition Lock line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
Stream->Locked = 1; \
}
#define STREAM_CONDITION_SIGNAL(Stream) \
{ \
assert(Stream->Locked == 1); \
fprintf(stderr, \
"(PID %lx, TID %lx) CP_COMMON Signalling Condition line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
pthread_cond_signal(&Stream->DataCondition); \
}

#define STREAM_ASSERT_LOCKED(Stream) \
{ \
assert(Stream->Locked == 1); \
}
#else
#define STREAM_MUTEX_LOCK(Stream) \
{ \
pthread_mutex_lock(&Stream->DataLock); \
}
#define STREAM_MUTEX_UNLOCK(Stream) \
{ \
pthread_mutex_unlock(&Stream->DataLock); \
}
#define STREAM_CONDITION_WAIT(Stream) \
{ \
pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); \
}
#define STREAM_CONDITION_SIGNAL(Stream) \
{ \
pthread_cond_signal(&Stream->DataCondition); \
}
#define STREAM_ASSERT_LOCKED(Stream)
#endif

void CP_validateParams(SstStream Stream, SstParams Params, int Writer)
{
if (Params->RendezvousReaderCount >= 0)
Expand Down Expand Up @@ -1127,9 +1192,9 @@ extern void SstStreamDestroy(SstStream Stream)
* in a safe way after all streams have been destroyed
*/
struct _SstStream StackStream;
pthread_mutex_lock(&Stream->DataLock);
CP_verbose(Stream, PerStepVerbose, "Destroying stream %p, name %s\n",
Stream, Stream->Filename);
STREAM_MUTEX_LOCK(Stream);
StackStream = *Stream;
Stream->Status = Destroyed;
struct _TimestepMetadataList *Next = Stream->Timesteps;
Expand All @@ -1141,7 +1206,7 @@ extern void SstStreamDestroy(SstStream Stream)
}
if (Stream->DP_Stream)
{
pthread_mutex_unlock(&Stream->DataLock);
STREAM_MUTEX_UNLOCK(Stream);
if (Stream->Role == ReaderRole)
{
Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream);
Expand All @@ -1150,7 +1215,7 @@ extern void SstStreamDestroy(SstStream Stream)
{
Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream);
}
pthread_mutex_lock(&Stream->DataLock);
STREAM_MUTEX_LOCK(Stream);
}
if (Stream->Readers)
{
Expand Down Expand Up @@ -1276,7 +1341,7 @@ extern void SstStreamDestroy(SstStream Stream)
FreeCustomStructs(&Stream->CPInfo->CustomStructs);
free(Stream->CPInfo);

pthread_mutex_unlock(&Stream->DataLock);
STREAM_MUTEX_UNLOCK(Stream);
// Stream is free'd in LastCall

pthread_mutex_lock(&StateMutex);
Expand Down
4 changes: 3 additions & 1 deletion testing/adios2/interface/TestADIOSInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,10 @@ TYPED_TEST(ADIOS2_CXX11_API_MultiBlock, Put2File)
writer.Put(var, &myData[b][0], TypeParam::PutMode);
}

reader.Close();
// Close the writer before the reader because the var goes away when the
// reader does
writer.Close();
reader.Close();
this->CheckOutput(filename);
}

Expand Down

0 comments on commit 15eb09e

Please sign in to comment.