Skip to content

Commit

Permalink
Merge pull request #2289 from eisenhauer/SstFixes
Browse files Browse the repository at this point in the history
Sst fixes
  • Loading branch information
eisenhauer authored May 27, 2020
2 parents 494dc1d + d4f2d63 commit 506b950
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 22 deletions.
52 changes: 37 additions & 15 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ static void ProcessReleaseList(SstStream Stream, ReturnMetadataInfo Metadata);
{ \
assert(Stream->Locked == 1); \
}
/*
 * STREAM_ASSERT_UNLOCKED(Stream)
 *
 * Acquires the stream mutex through STREAM_MUTEX_LOCK and immediately
 * releases it again through STREAM_MUTEX_UNLOCK.  It is placed right before
 * CMwrite() calls that are expected to run without the stream lock held.
 * The other branch of this conditional defines the macro as empty, so it
 * contributes nothing there.
 *
 * NOTE(review): presumably the intent is that any lock-state checking or
 * tracing built into the LOCK/UNLOCK macros of this branch would flag a
 * caller that still holds the mutex -- confirm against those macro bodies.
 */
#define STREAM_ASSERT_UNLOCKED(Stream) \
{ \
STREAM_MUTEX_LOCK(Stream); \
STREAM_MUTEX_UNLOCK(Stream); \
}
#else
#define STREAM_MUTEX_LOCK(Stream) \
{ \
Expand All @@ -87,6 +92,7 @@ static void ProcessReleaseList(SstStream Stream, ReturnMetadataInfo Metadata);
pthread_cond_signal(&Stream->DataCondition); \
}
#define STREAM_ASSERT_LOCKED(Stream)
#define STREAM_ASSERT_UNLOCKED(Stream)
#endif

static char *buildContactInfo(SstStream Stream, attr_list DPAttrs)
Expand Down Expand Up @@ -510,8 +516,7 @@ static void SendPeerSetupMsg(WS_ReaderInfo reader, int reversePeer, int myRank)
setup.RS_Stream = reader->Connections[reversePeer].RemoteStreamID;
setup.WriterRank = myRank;
setup.WriterCohortSize = Stream->CohortSize;
STREAM_MUTEX_LOCK(Stream);
STREAM_MUTEX_UNLOCK(Stream);
STREAM_ASSERT_UNLOCKED(Stream);
if (CMwrite(conn, Stream->CPInfo->PeerSetupFormat, &setup) != 1)
{
CP_verbose(Stream,
Expand Down Expand Up @@ -942,8 +947,7 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream)
(struct _CP_WriterInitInfo *)pointers[i]->CP_Info;
response.DP_WriterInfo[i] = pointers[i]->DP_Info;
}
STREAM_MUTEX_LOCK(Stream);
STREAM_MUTEX_UNLOCK(Stream);
STREAM_ASSERT_UNLOCKED(Stream);
if (CMwrite(conn, Stream->CPInfo->WriterResponseFormat, &response) != 1)
{
CP_verbose(
Expand Down Expand Up @@ -986,11 +990,19 @@ void sendOneToWSRCohort(WS_ReaderInfo CP_WSR_Stream, CMFormat f, void *Msg,
CP_verbose(Stream, "Sending a message to reader %d (%p)\n", peer,
*RS_StreamPtr);

if ((!conn) || (CMwrite(conn, f, Msg) != 1))
if (conn)
{
CP_verbose(Stream, "Message failed to send to reader %d (%p)\n",
peer, *RS_StreamPtr);
CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerFailed);
int res;
STREAM_MUTEX_UNLOCK(Stream);
res = CMwrite(conn, f, Msg);
STREAM_MUTEX_LOCK(Stream);
if (res != 1)
{
CP_verbose(Stream,
"Message failed to send to reader %d (%p)\n",
peer, *RS_StreamPtr);
CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerFailed);
}
}
j++;
}
Expand All @@ -1007,11 +1019,20 @@ void sendOneToWSRCohort(WS_ReaderInfo CP_WSR_Stream, CMFormat f, void *Msg,
*RS_StreamPtr = CP_WSR_Stream->Connections[peer].RemoteStreamID;
CP_verbose(Stream, "Sending a message to reader %d (%p)\n", peer,
*RS_StreamPtr);
if ((!conn) || (CMwrite(conn, f, Msg) != 1))

if (conn)
{
CP_verbose(Stream, "Message failed to send to reader %d (%p)\n",
peer, *RS_StreamPtr);
CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerFailed);
int res;
STREAM_MUTEX_UNLOCK(Stream);
res = CMwrite(conn, f, Msg);
STREAM_MUTEX_LOCK(Stream);
if (res != 1)
{
CP_verbose(Stream,
"Message failed to send to reader %d (%p)\n",
peer, *RS_StreamPtr);
CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerFailed);
}
}
}
}
Expand Down Expand Up @@ -1145,9 +1166,10 @@ static void SendTimestepEntryToSingleReader(SstStream Stream,

Entry->Msg->PreloadMode = PMode;
STREAM_MUTEX_LOCK(Stream);
sendOneToWSRCohort(CP_WSR_Stream,
Stream->CPInfo->DeliverTimestepMetadataFormat,
Entry->Msg, &Entry->Msg->RS_Stream);
if (CP_WSR_Stream->ReaderStatus == Established)
sendOneToWSRCohort(CP_WSR_Stream,
Stream->CPInfo->DeliverTimestepMetadataFormat,
Entry->Msg, &Entry->Msg->RS_Stream);
}
}

Expand Down
22 changes: 15 additions & 7 deletions source/adios2/toolkit/sst/dp/evpath_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,28 @@ static void EvpathDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
}

/*
 * MarkReadRequest -- writer-side routine.
 *
 * Records that rank RequestingRank of the reader cohort identified by
 * WSR_Stream_v has issued a read request against timestep TS.
 *
 * TS->ReaderRequests is a singly linked list with one tracking record per
 * reader cohort.  If a record for this reader already exists, the flag for
 * RequestingRank is set in its RequestList.  Otherwise a new record is
 * allocated with one zero-initialized flag per reader rank
 * (Reader->ReaderCohortSize entries), the requesting rank is flagged, and the
 * record is pushed onto the head of the list.
 *
 * Parameters:
 *   TS             timestep entry whose ReaderRequests list is updated
 *   WSR_Stream_v   opaque per-reader stream handle; cast to Evpath_WSR_Stream
 *   RequestingRank index of the reader rank that made the request
 *
 * The tracking data is best-effort ("just in case we need them"), so an
 * out-of-range rank or an allocation failure leaves the list unchanged
 * instead of writing out of bounds or dereferencing NULL.
 *
 * NOTE(review): no locking is done here; presumably the caller already holds
 * whatever lock protects TS -- confirm against the call sites.
 */
static void MarkReadRequest(TimestepList TS, DP_WSR_Stream WSR_Stream_v,
                            int RequestingRank)
{
    Evpath_WSR_Stream Reader = (Evpath_WSR_Stream)WSR_Stream_v;
    ReaderRequestTrackPtr ReqList = TS->ReaderRequests;

    /* Every RequestList is sized by its reader's cohort size, so a rank
     * outside [0, ReaderCohortSize) can never be recorded safely. */
    if ((RequestingRank < 0) || (RequestingRank >= Reader->ReaderCohortSize))
    {
        return;
    }

    while (ReqList != NULL)
    {
        if (ReqList->Reader == Reader)
        {
            ReqList->RequestList[RequestingRank] = 1;
            return;
        }
        ReqList = ReqList->Next;
    }

    /* Didn't find this reader. go ahead and record read patterns, just in
     * case we need them */
    ReaderRequestTrackPtr ReqTrk = calloc(1, sizeof(*ReqTrk));
    if (ReqTrk == NULL)
    {
        return;
    }
    ReqTrk->Reader = Reader;
    /* element count first, element size second: stays correct whatever the
     * element type of RequestList is */
    ReqTrk->RequestList =
        calloc(Reader->ReaderCohortSize, sizeof(*ReqTrk->RequestList));
    if (ReqTrk->RequestList == NULL)
    {
        free(ReqTrk);
        return;
    }
    ReqTrk->RequestList[RequestingRank] = 1;
    ReqTrk->Next = TS->ReaderRequests;
    TS->ReaderRequests = ReqTrk;
}

// writer side routine, called by the network handler thread
Expand Down Expand Up @@ -1098,12 +1108,6 @@ static void EvpathWSReaderRegisterTimestep(CP_Services Svcs,
pthread_mutex_unlock(&WS_Stream->DataLock);
return;
}
/* go ahead and record read patterns, just in case we need them */
ReaderRequestTrackPtr ReqTrk = calloc(1, sizeof(*ReqTrk));
ReqTrk->Reader = WSR_Stream;
ReqTrk->RequestList = calloc(1, WSR_Stream->ReaderCohortSize);
ReqTrk->Next = Entry->ReaderRequests;
Entry->ReaderRequests = ReqTrk;

Svcs->verbose(WS_Stream->CP_Stream,
"Per reader registration for timestep %ld, preload mode %d\n",
Expand Down Expand Up @@ -1173,6 +1177,10 @@ static void SendPreloadMsgs(CP_Services Svcs, Evpath_WSR_Stream WSR_Stream,
if (WSR_Stream->ReaderRequestArray[i])
{
PreloadMsg.RS_Stream = WSR_Stream->ReaderContactInfo[i].RS_Stream;
Svcs->verbose(
WS_Stream->CP_Stream,
"EVPATH Preload message for timestep %ld, going to rank %d\n",
TS->Timestep, i);
CMwrite(WSR_Stream->ReaderContactInfo[i].Conn,
WS_Stream->PreloadFormat, &PreloadMsg);
}
Expand Down

0 comments on commit 506b950

Please sign in to comment.