# Patch summary: report writer-peer failure to SST readers.
#  * SstReader.cpp: EndStep() throws std::runtime_error when
#    SstFFSPerformGets() returns anything other than SstSuccess.
#  * cp_reader.c: ReaderConnCloseHandler() looks up which writer rank owned the
#    closed connection, sets Stream->Status to PeerFailed while holding
#    DataLock, signals DataCondition and calls DP_Interface->notifyConnFailure().
#    SstWaitForCompletion() returns SstFatalError when the data plane's
#    waitForCompletion() result is not 1.
#  * ffs_marshal.c: WaitForReadRequests() and SstFFSPerformGets() return an
#    SstStatusValue; FillReadRequests() runs only when the wait returned
#    SstSuccess, ClearReadRequests() runs in both cases.
#  * evpath_dp.c: completion handles are linked into
#    Evpath_RS_Stream->PendingReadRequests so FailRequestsToRank() can set
#    Failed and signal the CM condition of every handle for a given rank;
#    EvpathWaitForCompletion() returns 0 for a handle marked Failed, else 1.
#  * rdma_dp.c: RdmaNotifyConnFailure() only logs (the FailRequestsToRank()
#    call is commented out); RdmaWaitForCompletion() returns Ret, which is
#    initialised to 1 and not reassigned by this patch.
#  * dp_interface.h, sst.h: waitForCompletion now returns int, a
#    notifyConnFailure entry is added to struct _CP_DP_Interface, and
#    SstFFSPerformGets() is declared as returning SstStatusValue.
# Review fix: in FailRequestsToRank() the "Did the signal of condition %d"
# message has a single %d, so it is given only Tmp->CMcondition; passing
# Tmp->Rank first made the message print the rank instead of the condition.
diff --git a/source/adios2/engine/sst/SstReader.cpp b/source/adios2/engine/sst/SstReader.cpp index f5772415de..c8bc3ef0e3 100644 --- a/source/adios2/engine/sst/SstReader.cpp +++ b/source/adios2/engine/sst/SstReader.cpp @@ -205,8 +205,15 @@ void SstReader::EndStep() { if (m_WriterMarshalMethod == SstMarshalFFS) { + SstStatusValue Result; // this does all the deferred gets and fills in the variable array data - SstFFSPerformGets(m_Input); + Result = SstFFSPerformGets(m_Input); + if (Result != SstSuccess) + { + // tentative, until we change EndStep so that it has a return value + throw std::runtime_error( + "ERROR: Writer failed before returning data"); + } } if (m_WriterMarshalMethod == SstMarshalBP) { diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index f0e9d8c1dd..40b07e3e7e 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -78,10 +78,18 @@ static char *readContactInfo(const char *Name, SstStream Stream) } } -static void ReaderConnCloseHandler(CManager cm, CMConnection closed_conn, +static void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, void *client_data) { SstStream Stream = (SstStream)client_data; + int FailedPeerRank = -1; + for (int i = 0; i < Stream->WriterCohortSize; i++) + { + if (Stream->ConnectionsToWriter[i].CMconn == ClosedConn) + { + FailedPeerRank = i; + } + } if (Stream->Status == Established) { @@ -93,9 +101,21 @@ static void ReaderConnCloseHandler(CManager cm, CMConnection closed_conn, CP_verbose(Stream, "Reader-side Rank received a " "connection-close event during normal " "operations, peer likely failed\n"); - Stream->Status = PeerClosed; + pthread_mutex_lock(&Stream->DataLock); + Stream->Status = PeerFailed; + pthread_cond_signal(&Stream->DataCondition); + pthread_mutex_unlock(&Stream->DataLock); + CP_verbose( + Stream, + "The close was for connection to writer peer %d, notifying DP\n", + FailedPeerRank); + /* notify DP of 
failure. This should terminate any waits currently + * pending in the DP for that rank */ + Stream->DP_Interface->notifyConnFailure(&Svcs, Stream->DP_Stream, + FailedPeerRank); } - else if ((Stream->Status == PeerClosed) || (Stream->Status == Closed)) + else if ((Stream->Status == PeerClosed) || (Stream->Status == PeerFailed) || + (Stream->Status == Closed)) { /* ignore this. We expect a close after the connection is marked closed */ @@ -106,11 +126,10 @@ static void ReaderConnCloseHandler(CManager cm, CMConnection closed_conn, else { fprintf(stderr, "Got an unexpected connection close event\n"); - CP_verbose(Stream, "Writer-side Rank received a " + CP_verbose(Stream, "Reader-side Rank received a " "connection-close event in unexpected " "state %d\n", Stream->Status); - Stream->Status = PeerFailed; } } @@ -738,7 +757,12 @@ extern void SstReaderClose(SstStream Stream) extern SstStatusValue SstWaitForCompletion(SstStream Stream, void *handle) { - // We need a way to return an error from DP */ - Stream->DP_Interface->waitForCompletion(&Svcs, handle); - return SstSuccess; + if (Stream->DP_Interface->waitForCompletion(&Svcs, handle) != 1) + { + return SstFatalError; + } + else + { + return SstSuccess; + } } diff --git a/source/adios2/toolkit/sst/cp/ffs_marshal.c b/source/adios2/toolkit/sst/cp/ffs_marshal.c index 1c1dfeca0e..2d9bc5c4cf 100644 --- a/source/adios2/toolkit/sst/cp/ffs_marshal.c +++ b/source/adios2/toolkit/sst/cp/ffs_marshal.c @@ -813,7 +813,7 @@ static void DecodeAndPrepareData(SstStream Stream, int Writer) } } -static void WaitForReadRequests(SstStream Stream) +static SstStatusValue WaitForReadRequests(SstStream Stream) { struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; @@ -830,10 +830,14 @@ static void WaitForReadRequests(SstStream Stream) } else { - /* handle errors here */ + CP_verbose(Stream, "Wait for remote read completion failed, " + "returning failure\n"); + return Result; } } } + CP_verbose(Stream, "All remote memory reads 
completed\n"); + return SstSuccess; } static void MapLocalToGlobalIndex(size_t Dims, const size_t *LocalIndex, @@ -1117,17 +1121,27 @@ static void FillReadRequests(SstStream Stream, FFSArrayRequest Reqs) } } -extern void SstFFSPerformGets(SstStream Stream) +extern SstStatusValue SstFFSPerformGets(SstStream Stream) { struct FFSReaderMarshalBase *Info = Stream->ReaderMarshalData; + SstStatusValue Ret; IssueReadRequests(Stream, Info->PendingVarRequests); - WaitForReadRequests(Stream); - - FillReadRequests(Stream, Info->PendingVarRequests); + Ret = WaitForReadRequests(Stream); + if (Ret == SstSuccess) + { + FillReadRequests(Stream, Info->PendingVarRequests); + } + else + { + CP_verbose(Stream, "Some memory read failed, not filling requests and " + "returning failure\n"); + } ClearReadRequests(Stream); + + return Ret; } extern void SstFFSWriterEndStep(SstStream Stream, size_t Timestep) diff --git a/source/adios2/toolkit/sst/dp/evpath_dp.c b/source/adios2/toolkit/sst/dp/evpath_dp.c index f330813750..416c1fdd7d 100644 --- a/source/adios2/toolkit/sst/dp/evpath_dp.c +++ b/source/adios2/toolkit/sst/dp/evpath_dp.c @@ -57,6 +57,7 @@ typedef struct _Evpath_RS_Stream int WriterCohortSize; CP_PeerCohort PeerCohort; struct _EvpathWriterContactInfo *WriterContactInfo; + struct _EvpathCompletionHandle *PendingReadRequests; } * Evpath_RS_Stream; typedef struct _Evpath_WSR_Stream @@ -290,8 +291,11 @@ typedef struct _EvpathCompletionHandle int CMcondition; CManager cm; void *CPStream; + void *DPStream; void *Buffer; + int Failed; int Rank; + struct _EvpathCompletionHandle *Next; } * EvpathCompletionHandle; static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, @@ -300,9 +304,24 @@ static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, EvpathReadReplyMsg ReadReplyMsg = (EvpathReadReplyMsg)msg_v; Evpath_RS_Stream RS_Stream = ReadReplyMsg->RS_Stream; CP_Services Svcs = (CP_Services)client_Data; - EvpathCompletionHandle Handle = - 
CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); + EvpathCompletionHandle Handle = NULL; + if (CMCondition_has_signaled(cm, ReadReplyMsg->NotifyCondition)) + { + Svcs->verbose(RS_Stream->CP_Stream, "Got a reply to remote memory " + "read, but the condition is " + "already signalled, returning\n"); + return; + } + Handle = CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); + + if (!Handle) + { + Svcs->verbose( + RS_Stream->CP_Stream, + "Got a reply to remote memory read, but condition not found\n"); + return; + } Svcs->verbose( RS_Stream->CP_Stream, "Got a reply to remote memory read from rank %d, condition is %d\n", @@ -474,6 +493,61 @@ static void EvpathProvideWriterDataToReader(CP_Services Svcs, } } +static void AddRequestToList(CP_Services Svcs, Evpath_RS_Stream Stream, + EvpathCompletionHandle Handle) +{ + Handle->Next = Stream->PendingReadRequests; + Stream->PendingReadRequests = Handle; +} + +static void RemoveRequestFromList(CP_Services Svcs, Evpath_RS_Stream Stream, + EvpathCompletionHandle Handle) +{ + EvpathCompletionHandle Tmp = Stream->PendingReadRequests; + + if (Stream->PendingReadRequests == Handle) + { + Stream->PendingReadRequests = Handle->Next; + return; + } + + while (Tmp != NULL && Tmp->Next != Handle) + { + Tmp = Tmp->Next; + } + + if (Tmp == NULL) + return; + + // Tmp->Next must be the handle to remove + Tmp->Next = Tmp->Next->Next; +} + +static void FailRequestsToRank(CP_Services Svcs, CManager cm, + Evpath_RS_Stream Stream, int FailedRank) +{ + EvpathCompletionHandle Tmp = Stream->PendingReadRequests; + Svcs->verbose(Stream->CP_Stream, + "Fail pending requests to writer rank %d\n", FailedRank); + while (Tmp != NULL) + { + if (Tmp->Rank == FailedRank) + { + Tmp->Failed = 1; + Svcs->verbose(Tmp->CPStream, "Found a pending remote memory read " + "to failed writer rank %d, marking as " + "failed and signalling condition %d\n", + Tmp->Rank, Tmp->CMcondition); + CMCondition_signal(cm, Tmp->CMcondition); + 
Svcs->verbose(Tmp->CPStream, "Did the signal of condition %d\n", + Tmp->CMcondition); + } + Tmp = Tmp->Next; + } + Svcs->verbose(Stream->CP_Stream, + "Done Failing requests to writer rank %d\n", FailedRank); +} + typedef struct _EvpathPerTimestepInfo { char *CheckString; @@ -494,6 +568,8 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, ret->CMcondition = CMCondition_get(cm, NULL); ret->CPStream = Stream->CP_Stream; + ret->DPStream = Stream; + ret->Failed = 0; ret->cm = cm; ret->Buffer = Buffer; ret->Rank = Rank; @@ -501,6 +577,7 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, * set the completion handle as client Data on the condition so that * handler has access to it. */ + AddRequestToList(Svcs, Stream, ret); CMCondition_set_client_data(cm, ret->CMcondition, ret); Svcs->verbose(Stream->CP_Stream, @@ -525,9 +602,10 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, return ret; } -static void EvpathWaitForCompletion(CP_Services Svcs, void *Handle_v) +static int EvpathWaitForCompletion(CP_Services Svcs, void *Handle_v) { EvpathCompletionHandle Handle = (EvpathCompletionHandle)Handle_v; + int Ret = 1; Svcs->verbose( Handle->CPStream, "Waiting for completion of memory read to rank %d, condition %d\n", @@ -537,12 +615,39 @@ static void EvpathWaitForCompletion(CP_Services Svcs, void *Handle_v) * this returns immediately. Copying the incoming data to the waiting * buffer has been done by the reply handler. 
*/ - CMCondition_wait(Handle->cm, Handle->CMcondition); - Svcs->verbose( - Handle->CPStream, - "Remote memory read to rank %d with condition %d has completed\n", - Handle->Rank, Handle->CMcondition); + if (Handle->CMcondition != -1) + CMCondition_wait(Handle->cm, Handle->CMcondition); + if (Handle->Failed) + { + Svcs->verbose(Handle->CPStream, "Remote memory read to rank %d with " + "condition %d has FAILED because of " + "writer failure\n", + Handle->Rank, Handle->CMcondition); + Ret = 0; + } + else + { + Svcs->verbose( + Handle->CPStream, + "Remote memory read to rank %d with condition %d has completed\n", + Handle->Rank, Handle->CMcondition); + } + RemoveRequestFromList(Svcs, Handle->DPStream, Handle); free(Handle); + return Ret; +} + +static void EvpathNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, + int FailedPeerRank) +{ + Evpath_RS_Stream Stream = (Evpath_RS_Stream) + Stream_v; /* DP_RS_Stream is the return from InitReader */ + CManager cm = Svcs->getCManager(Stream->CP_Stream); + Svcs->verbose(Stream->CP_Stream, "received notification that writer peer " + "%d has failed, failing any pending " + "requests\n", + FailedPeerRank); + FailRequestsToRank(Svcs, cm, Stream, FailedPeerRank); } static void EvpathProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, @@ -660,6 +765,7 @@ extern CP_DP_Interface LoadEVpathDP() EvpathProvideWriterDataToReader; evpathDPInterface.readRemoteMemory = EvpathReadRemoteMemory; evpathDPInterface.waitForCompletion = EvpathWaitForCompletion; + evpathDPInterface.notifyConnFailure = EvpathNotifyConnFailure; evpathDPInterface.provideTimestep = EvpathProvideTimestep; evpathDPInterface.releaseTimestep = EvpathReleaseTimestep; evpathDPInterface.destroyReader = EvpathDestroyReader; diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index 8d5957b2c8..2f2587381f 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -737,9 +737,28 @@ static 
void *RdmaReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, return ret; } -static void RdmaWaitForCompletion(CP_Services Svcs, void *Handle_v) +static void RdmaNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, + int FailedPeerRank) +{ + Rdma_RS_Stream Stream = (Rdma_RS_Stream) + Stream_v; /* DP_RS_Stream is the return from InitReader */ + CManager cm = Svcs->getCManager(Stream->CP_Stream); + Svcs->verbose(Stream->CP_Stream, "received notification that writer peer " + "%d has failed, failing any pending " + "requests\n", + FailedPeerRank); + // This is what EVPath does... + // FailRequestsToRank(Svcs, cm, Stream, FailedPeerRank); +} + +/* + * RdmaWaitForCompletion should return 1 if successful, but 0 if the reads + * failed for some reason or were aborted by RdmaNotifyConnFailure() + */ +static int RdmaWaitForCompletion(CP_Services Svcs, void *Handle_v) { RdmaCompletionHandle Handle = (RdmaCompletionHandle)Handle_v; + int Ret = 1; Svcs->verbose( Handle->CPStream, "Waiting for completion of memory read to rank %d, condition %d\n", @@ -756,6 +775,7 @@ static void RdmaWaitForCompletion(CP_Services Svcs, void *Handle_v) Handle->Rank, Handle->CMcondition); free(Handle); + return Ret; } static void RdmaProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, @@ -975,6 +995,7 @@ extern CP_DP_Interface LoadRdmaDP() RdmaDPInterface.provideWriterDataToReader = RdmaProvideWriterDataToReader; RdmaDPInterface.readRemoteMemory = RdmaReadRemoteMemory; RdmaDPInterface.waitForCompletion = RdmaWaitForCompletion; + RdmaDPInterface.notifyConnFailure = RdmaNotifyConnFailure; RdmaDPInterface.provideTimestep = RdmaProvideTimestep; RdmaDPInterface.releaseTimestep = RdmaReleaseTimestep; RdmaDPInterface.destroyReader = RdmaDestroyReader; diff --git a/source/adios2/toolkit/sst/dp_interface.h b/source/adios2/toolkit/sst/dp_interface.h index 2c6faee351..dcf99e55fb 100644 --- a/source/adios2/toolkit/sst/dp_interface.h +++ b/source/adios2/toolkit/sst/dp_interface.h @@ -180,9 
+180,22 @@ typedef DP_CompletionHandle (*CP_DP_ReadRemoteMemoryFunc)( * CP_DP_WaitForCompletionFunc is the type of a dataplane function that * suspends the execution of the current thread until the asynchronous * CP_DP_ReadRemoteMemory call that returned its `handle` parameter. + * the return value is 0 in the event that the wait failed, 1 on success. */ -typedef void (*CP_DP_WaitForCompletionFunc)(CP_Services Svcs, - DP_CompletionHandle Handle); +typedef int (*CP_DP_WaitForCompletionFunc)(CP_Services Svcs, + DP_CompletionHandle Handle); + +/*! + * CP_DP_NotifyConnFailureFunc is the type of a dataplane function which the + * control plane uses to notify the data plane that a CP-level connection + * has failed (and therefore the remote host is likely dead). This is an + * informational function by the CP, but as a side effect it should cause + * any pending Wait operations (for ReadRemoteMemory) to complete and return + * an error. + */ +typedef void (*CP_DP_NotifyConnFailureFunc)(CP_Services Svcs, + DP_RS_Stream RS_Stream, + int FailedPeerRank); /*! * CP_DP_ProvideTimestepFunc is the type of a dataplane function that @@ -226,6 +239,7 @@ struct _CP_DP_Interface CP_DP_ReadRemoteMemoryFunc readRemoteMemory; CP_DP_WaitForCompletionFunc waitForCompletion; + CP_DP_NotifyConnFailureFunc notifyConnFailure; CP_DP_ProvideTimestepFunc provideTimestep; CP_DP_ReleaseTimestepFunc releaseTimestep; diff --git a/source/adios2/toolkit/sst/sst.h b/source/adios2/toolkit/sst/sst.h index ba7878ecf7..f18c359ce1 100644 --- a/source/adios2/toolkit/sst/sst.h +++ b/source/adios2/toolkit/sst/sst.h @@ -104,7 +104,7 @@ extern void SstFFSGetDeferred(SstStream Stream, void *Variable, const size_t *Start, const size_t *Count, void *Data); -extern void SstFFSPerformGets(SstStream Stream); +extern SstStatusValue SstFFSPerformGets(SstStream Stream); extern int SstFFSWriterBeginStep(SstStream Stream, int mode, const float timeout_sec);