Skip to content

Commit

Permalink
Merge pull request #717 from eisenhauer/SSTFailureCheckOnReaderSide
Browse files Browse the repository at this point in the history
Sst failure check on reader side
  • Loading branch information
eisenhauer authored Jul 16, 2018
2 parents aca89d3 + d323fed commit 4c6e11f
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 27 deletions.
9 changes: 8 additions & 1 deletion source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,15 @@ void SstReader::EndStep()
{
if (m_WriterMarshalMethod == SstMarshalFFS)
{
SstStatusValue Result;
// this does all the deferred gets and fills in the variable array data
SstFFSPerformGets(m_Input);
Result = SstFFSPerformGets(m_Input);
if (Result != SstSuccess)
{
// tentative, until we change EndStep so that it has a return value
throw std::runtime_error(
"ERROR: Writer failed before returning data");
}
}
if (m_WriterMarshalMethod == SstMarshalBP)
{
Expand Down
40 changes: 32 additions & 8 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,18 @@ static char *readContactInfo(const char *Name, SstStream Stream)
}
}

static void ReaderConnCloseHandler(CManager cm, CMConnection closed_conn,
static void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn,
void *client_data)
{
SstStream Stream = (SstStream)client_data;
int FailedPeerRank = -1;
for (int i = 0; i < Stream->WriterCohortSize; i++)
{
if (Stream->ConnectionsToWriter[i].CMconn == ClosedConn)
{
FailedPeerRank = i;
}
}

if (Stream->Status == Established)
{
Expand All @@ -93,9 +101,21 @@ static void ReaderConnCloseHandler(CManager cm, CMConnection closed_conn,
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event during normal "
"operations, peer likely failed\n");
Stream->Status = PeerClosed;
pthread_mutex_lock(&Stream->DataLock);
Stream->Status = PeerFailed;
pthread_cond_signal(&Stream->DataCondition);
pthread_mutex_unlock(&Stream->DataLock);
CP_verbose(
Stream,
"The close was for connection to writer peer %d, notifying DP\n",
FailedPeerRank);
/* notify DP of failure. This should terminate any waits currently
* pending in the DP for that rank */
Stream->DP_Interface->notifyConnFailure(&Svcs, Stream->DP_Stream,
FailedPeerRank);
}
else if ((Stream->Status == PeerClosed) || (Stream->Status == Closed))
else if ((Stream->Status == PeerClosed) || (Stream->Status == PeerFailed) ||
(Stream->Status == Closed))
{
/* ignore this. We expect a close after the connection is marked closed
*/
Expand All @@ -106,11 +126,10 @@ static void ReaderConnCloseHandler(CManager cm, CMConnection closed_conn,
else
{
fprintf(stderr, "Got an unexpected connection close event\n");
CP_verbose(Stream, "Writer-side Rank received a "
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event in unexpected "
"state %d\n",
Stream->Status);
Stream->Status = PeerFailed;
}
}

Expand Down Expand Up @@ -738,7 +757,12 @@ extern void SstReaderClose(SstStream Stream)

extern SstStatusValue SstWaitForCompletion(SstStream Stream, void *handle)
{
// We need a way to return an error from DP */
Stream->DP_Interface->waitForCompletion(&Svcs, handle);
return SstSuccess;
if (Stream->DP_Interface->waitForCompletion(&Svcs, handle) != 1)
{
return SstFatalError;
}
else
{
return SstSuccess;
}
}
26 changes: 20 additions & 6 deletions source/adios2/toolkit/sst/cp/ffs_marshal.c
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ static void DecodeAndPrepareData(SstStream Stream, int Writer)
}
}

static void WaitForReadRequests(SstStream Stream)
static SstStatusValue WaitForReadRequests(SstStream Stream)
{
struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData;

Expand All @@ -830,10 +830,14 @@ static void WaitForReadRequests(SstStream Stream)
}
else
{
/* handle errors here */
CP_verbose(Stream, "Wait for remote read completion failed, "
"returning failure\n");
return Result;
}
}
}
CP_verbose(Stream, "All remote memory reads completed\n");
return SstSuccess;
}

static void MapLocalToGlobalIndex(size_t Dims, const size_t *LocalIndex,
Expand Down Expand Up @@ -1117,17 +1121,27 @@ static void FillReadRequests(SstStream Stream, FFSArrayRequest Reqs)
}
}

extern void SstFFSPerformGets(SstStream Stream)
extern SstStatusValue SstFFSPerformGets(SstStream Stream)
{
struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData;
SstStatusValue Ret;

IssueReadRequests(Stream, Info->PendingVarRequests);

WaitForReadRequests(Stream);

FillReadRequests(Stream, Info->PendingVarRequests);
Ret = WaitForReadRequests(Stream);

if (Ret == SstSuccess)
{
FillReadRequests(Stream, Info->PendingVarRequests);
}
else
{
CP_verbose(Stream, "Some memory read failed, not filling requests and "
"returning failure\n");
}
ClearReadRequests(Stream);

return Ret;
}

extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep)
Expand Down
122 changes: 114 additions & 8 deletions source/adios2/toolkit/sst/dp/evpath_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct _Evpath_RS_Stream
int WriterCohortSize;
CP_PeerCohort PeerCohort;
struct _EvpathWriterContactInfo *WriterContactInfo;
struct _EvpathCompletionHandle *PendingReadRequests;
} * Evpath_RS_Stream;

typedef struct _Evpath_WSR_Stream
Expand Down Expand Up @@ -290,8 +291,11 @@ typedef struct _EvpathCompletionHandle
int CMcondition;
CManager cm;
void *CPStream;
void *DPStream;
void *Buffer;
int Failed;
int Rank;
struct _EvpathCompletionHandle *Next;
} * EvpathCompletionHandle;

static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
Expand All @@ -300,9 +304,24 @@ static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
EvpathReadReplyMsg ReadReplyMsg = (EvpathReadReplyMsg)msg_v;
Evpath_RS_Stream RS_Stream = ReadReplyMsg->RS_Stream;
CP_Services Svcs = (CP_Services)client_Data;
EvpathCompletionHandle Handle =
CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition);
EvpathCompletionHandle Handle = NULL;

if (CMCondition_has_signaled(cm, ReadReplyMsg->NotifyCondition))
{
Svcs->verbose(RS_Stream->CP_Stream, "Got a reply to remote memory "
"read, but the condition is "
"already signalled, returning\n");
return;
}
Handle = CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition);

if (!Handle)
{
Svcs->verbose(
RS_Stream->CP_Stream,
"Got a reply to remote memory read, but condition not found\n");
return;
}
Svcs->verbose(
RS_Stream->CP_Stream,
"Got a reply to remote memory read from rank %d, condition is %d\n",
Expand Down Expand Up @@ -474,6 +493,61 @@ static void EvpathProvideWriterDataToReader(CP_Services Svcs,
}
}

static void AddRequestToList(CP_Services Svcs, Evpath_RS_Stream Stream,
EvpathCompletionHandle Handle)
{
Handle->Next = Stream->PendingReadRequests;
Stream->PendingReadRequests = Handle;
}

static void RemoveRequestFromList(CP_Services Svcs, Evpath_RS_Stream Stream,
EvpathCompletionHandle Handle)
{
EvpathCompletionHandle Tmp = Stream->PendingReadRequests;

if (Stream->PendingReadRequests == Handle)
{
Stream->PendingReadRequests = Handle->Next;
return;
}

while (Tmp != NULL && Tmp->Next != Handle)
{
Tmp = Tmp->Next;
}

if (Tmp == NULL)
return;

// Tmp->Next must be the handle to remove
Tmp->Next = Tmp->Next->Next;
}

static void FailRequestsToRank(CP_Services Svcs, CManager cm,
Evpath_RS_Stream Stream, int FailedRank)
{
EvpathCompletionHandle Tmp = Stream->PendingReadRequests;
Svcs->verbose(Stream->CP_Stream,
"Fail pending requests to writer rank %d\n", FailedRank);
while (Tmp != NULL)
{
if (Tmp->Rank == FailedRank)
{
Tmp->Failed = 1;
Svcs->verbose(Tmp->CPStream, "Found a pending remote memory read "
"to failed writer rank %d, marking as "
"failed and signalling condition %d\n",
Tmp->Rank, Tmp->CMcondition);
CMCondition_signal(cm, Tmp->CMcondition);
Svcs->verbose(Tmp->CPStream, "Did the signal of condition %d\n",
Tmp->Rank, Tmp->CMcondition);
}
Tmp = Tmp->Next;
}
Svcs->verbose(Stream->CP_Stream,
"Done Failing requests to writer rank %d\n", FailedRank);
}

typedef struct _EvpathPerTimestepInfo
{
char *CheckString;
Expand All @@ -494,13 +568,16 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,

ret->CMcondition = CMCondition_get(cm, NULL);
ret->CPStream = Stream->CP_Stream;
ret->DPStream = Stream;
ret->Failed = 0;
ret->cm = cm;
ret->Buffer = Buffer;
ret->Rank = Rank;
/*
* set the completion handle as client Data on the condition so that
* handler has access to it.
*/
AddRequestToList(Svcs, Stream, ret);
CMCondition_set_client_data(cm, ret->CMcondition, ret);

Svcs->verbose(Stream->CP_Stream,
Expand All @@ -525,9 +602,10 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
return ret;
}

static void EvpathWaitForCompletion(CP_Services Svcs, void *Handle_v)
static int EvpathWaitForCompletion(CP_Services Svcs, void *Handle_v)
{
EvpathCompletionHandle Handle = (EvpathCompletionHandle)Handle_v;
int Ret = 1;
Svcs->verbose(
Handle->CPStream,
"Waiting for completion of memory read to rank %d, condition %d\n",
Expand All @@ -537,12 +615,39 @@ static void EvpathWaitForCompletion(CP_Services Svcs, void *Handle_v)
* this returns immediately. Copying the incoming data to the waiting
* buffer has been done by the reply handler.
*/
CMCondition_wait(Handle->cm, Handle->CMcondition);
Svcs->verbose(
Handle->CPStream,
"Remote memory read to rank %d with condition %d has completed\n",
Handle->Rank, Handle->CMcondition);
if (Handle->CMcondition != -1)
CMCondition_wait(Handle->cm, Handle->CMcondition);
if (Handle->Failed)
{
Svcs->verbose(Handle->CPStream, "Remote memory read to rank %d with "
"condition %d has FAILED because of "
"writer failure\n",
Handle->Rank, Handle->CMcondition);
Ret = 0;
}
else
{
Svcs->verbose(
Handle->CPStream,
"Remote memory read to rank %d with condition %d has completed\n",
Handle->Rank, Handle->CMcondition);
}
RemoveRequestFromList(Svcs, Handle->DPStream, Handle);
free(Handle);
return Ret;
}

static void EvpathNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v,
int FailedPeerRank)
{
Evpath_RS_Stream Stream = (Evpath_RS_Stream)
Stream_v; /* DP_RS_Stream is the return from InitReader */
CManager cm = Svcs->getCManager(Stream->CP_Stream);
Svcs->verbose(Stream->CP_Stream, "received notification that writer peer "
"%d has failed, failing any pending "
"requests\n",
FailedPeerRank);
FailRequestsToRank(Svcs, cm, Stream, FailedPeerRank);
}

static void EvpathProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v,
Expand Down Expand Up @@ -660,6 +765,7 @@ extern CP_DP_Interface LoadEVpathDP()
EvpathProvideWriterDataToReader;
evpathDPInterface.readRemoteMemory = EvpathReadRemoteMemory;
evpathDPInterface.waitForCompletion = EvpathWaitForCompletion;
evpathDPInterface.notifyConnFailure = EvpathNotifyConnFailure;
evpathDPInterface.provideTimestep = EvpathProvideTimestep;
evpathDPInterface.releaseTimestep = EvpathReleaseTimestep;
evpathDPInterface.destroyReader = EvpathDestroyReader;
Expand Down
23 changes: 22 additions & 1 deletion source/adios2/toolkit/sst/dp/rdma_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,28 @@ static void *RdmaReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
return ret;
}

static void RdmaWaitForCompletion(CP_Services Svcs, void *Handle_v)
static void RdmaNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v,
int FailedPeerRank)
{
Rdma_RS_Stream Stream = (Rdma_RS_Stream)
Stream_v; /* DP_RS_Stream is the return from InitReader */
CManager cm = Svcs->getCManager(Stream->CP_Stream);
Svcs->verbose(Stream->CP_Stream, "received notification that writer peer "
"%d has failed, failing any pending "
"requests\n",
FailedPeerRank);
// This is what EVPath does...
// FailRequestsToRank(Svcs, cm, Stream, FailedPeerRank);
}

/*
* RdmaWaitForCompletion should return 1 if successful, but 0 if the reads
* failed for some reason or were aborted by RdmaNotifyConnFailure()
*/
static int RdmaWaitForCompletion(CP_Services Svcs, void *Handle_v)
{
RdmaCompletionHandle Handle = (RdmaCompletionHandle)Handle_v;
int Ret = 1;
Svcs->verbose(
Handle->CPStream,
"Waiting for completion of memory read to rank %d, condition %d\n",
Expand All @@ -756,6 +775,7 @@ static void RdmaWaitForCompletion(CP_Services Svcs, void *Handle_v)
Handle->Rank, Handle->CMcondition);

free(Handle);
return Ret;
}

static void RdmaProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v,
Expand Down Expand Up @@ -975,6 +995,7 @@ extern CP_DP_Interface LoadRdmaDP()
RdmaDPInterface.provideWriterDataToReader = RdmaProvideWriterDataToReader;
RdmaDPInterface.readRemoteMemory = RdmaReadRemoteMemory;
RdmaDPInterface.waitForCompletion = RdmaWaitForCompletion;
RdmaDPInterface.notifyConnFailure = RdmaNotifyConnFailure;
RdmaDPInterface.provideTimestep = RdmaProvideTimestep;
RdmaDPInterface.releaseTimestep = RdmaReleaseTimestep;
RdmaDPInterface.destroyReader = RdmaDestroyReader;
Expand Down
Loading

0 comments on commit 4c6e11f

Please sign in to comment.