Skip to content

Commit

Permalink
Fix SST FFS marshalling
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Jul 12, 2022
1 parent 7b66621 commit 5bf4a36
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
2 changes: 2 additions & 0 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
&(Reader->m_IO.DefineVariable<T>(variableName)); \
variable->SetData((T *)data); \
variable->m_AvailableStepsCount = 1; \
Reader->RegisterCreatedVariable(variable); \
return (void *)variable; \
}

Expand Down Expand Up @@ -163,6 +164,7 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
Variable<T> *variable = &(Reader->m_IO.DefineVariable<T>( \
variableName, VecShape, VecStart, VecCount)); \
variable->m_AvailableStepsCount = 1; \
Reader->RegisterCreatedVariable(variable); \
return (void *)variable; \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
Expand Down
73 changes: 69 additions & 4 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,71 @@
char *SSTStreamStatusStr[] = {"NotOpen", "Opening", "Established",
"PeerClosed", "PeerFailed", "Closed"};

#ifdef MUTEX_DEBUG
#define STREAM_MUTEX_LOCK(Stream) \
{ \
fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON Trying lock line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
pthread_mutex_lock(&Stream->DataLock); \
Stream->Locked++; \
fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON Got lock\n", \
(long)getpid(), (long)gettid()); \
}

#define STREAM_MUTEX_UNLOCK(Stream) \
{ \
fprintf(stderr, "(PID %lx, TID %lx) CP_COMMON UNlocking line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
Stream->Locked--; \
pthread_mutex_unlock(&Stream->DataLock); \
}
#define STREAM_CONDITION_WAIT(Stream) \
{ \
fprintf( \
stderr, \
"(PID %lx, TID %lx) CP_COMMON Dropping Condition Lock line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
Stream->Locked = 0; \
pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); \
fprintf( \
stderr, \
"(PID %lx, TID %lx) CP_COMMON Acquired Condition Lock line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
Stream->Locked = 1; \
}
#define STREAM_CONDITION_SIGNAL(Stream) \
{ \
assert(Stream->Locked == 1); \
fprintf(stderr, \
"(PID %lx, TID %lx) CP_COMMON Signalling Condition line %d\n", \
(long)getpid(), (long)gettid(), __LINE__); \
pthread_cond_signal(&Stream->DataCondition); \
}

#define STREAM_ASSERT_LOCKED(Stream) \
{ \
assert(Stream->Locked == 1); \
}
#else
#define STREAM_MUTEX_LOCK(Stream) \
{ \
pthread_mutex_lock(&Stream->DataLock); \
}
#define STREAM_MUTEX_UNLOCK(Stream) \
{ \
pthread_mutex_unlock(&Stream->DataLock); \
}
#define STREAM_CONDITION_WAIT(Stream) \
{ \
pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); \
}
#define STREAM_CONDITION_SIGNAL(Stream) \
{ \
pthread_cond_signal(&Stream->DataCondition); \
}
#define STREAM_ASSERT_LOCKED(Stream)
#endif

void CP_validateParams(SstStream Stream, SstParams Params, int Writer)
{
if (Params->RendezvousReaderCount >= 0)
Expand Down Expand Up @@ -1127,9 +1192,9 @@ extern void SstStreamDestroy(SstStream Stream)
* in a safe way after all streams have been destroyed
*/
struct _SstStream StackStream;
pthread_mutex_lock(&Stream->DataLock);
CP_verbose(Stream, PerStepVerbose, "Destroying stream %p, name %s\n",
Stream, Stream->Filename);
STREAM_MUTEX_LOCK(Stream);
StackStream = *Stream;
Stream->Status = Destroyed;
struct _TimestepMetadataList *Next = Stream->Timesteps;
Expand All @@ -1141,7 +1206,7 @@ extern void SstStreamDestroy(SstStream Stream)
}
if (Stream->DP_Stream)
{
pthread_mutex_unlock(&Stream->DataLock);
STREAM_MUTEX_UNLOCK(Stream);
if (Stream->Role == ReaderRole)
{
Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream);
Expand All @@ -1150,7 +1215,7 @@ extern void SstStreamDestroy(SstStream Stream)
{
Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream);
}
pthread_mutex_lock(&Stream->DataLock);
STREAM_MUTEX_LOCK(Stream);
}
if (Stream->Readers)
{
Expand Down Expand Up @@ -1276,7 +1341,7 @@ extern void SstStreamDestroy(SstStream Stream)
FreeCustomStructs(&Stream->CPInfo->CustomStructs);
free(Stream->CPInfo);

pthread_mutex_unlock(&Stream->DataLock);
STREAM_MUTEX_UNLOCK(Stream);
// Stream is free'd in LastCall

pthread_mutex_lock(&StateMutex);
Expand Down

0 comments on commit 5bf4a36

Please sign in to comment.