From dd4c43edff1415fb25043eee700b530e0e493032 Mon Sep 17 00:00:00 2001 From: Vicente Adolfo Bolea Sanchez Date: Tue, 22 Mar 2022 11:27:15 -0400 Subject: [PATCH] DP: add MPI dataplane, mpi_dp --- CMakeLists.txt | 8 +- cmake/DetectOptions.cmake | 3 + source/adios2/toolkit/sst/CMakeLists.txt | 6 + source/adios2/toolkit/sst/cp/cp_common.c | 27 +- source/adios2/toolkit/sst/cp/cp_writer.c | 20 + source/adios2/toolkit/sst/dp/dp.c | 7 + source/adios2/toolkit/sst/dp/mpi_dp.c | 917 ++++++++++++++++++ .../engine/staging-common/CMakeLists.txt | 4 + 8 files changed, 975 insertions(+), 17 deletions(-) create mode 100644 source/adios2/toolkit/sst/dp/mpi_dp.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 1268116008..b51a7de966 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,11 +92,11 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES) set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE) endif() -# Force C++11 and C99 +# Force C++11 and C11 set(CMAKE_CXX_STANDARD 11) set(CMAKE_CXX_STANDARD_REQUIRED True) if(NOT MSVC) - set(CMAKE_C_STANDARD 99) + set(CMAKE_C_STANDARD 11) set(CMAKE_C_STANDARD_REQUIRED True) endif() @@ -116,9 +116,9 @@ else() set(ADIOS2_CXX11_FEATURES cxx_std_11) endif() if(CMAKE_C_COMPILER_ID MATCHES "^(GNU|Intel|Clang|AppleClang|MSVC)$") - set(ADIOS2_C99_FEATURES c_restrict) + set(ADIOS2_C11_FEATURES c_std_11) else() - set(ADIOS2_C99_FEATURES c_std_99) + set(ADIOS2_C11_FEATURES c_std_99) endif() include(CMakeDependentOption) diff --git a/cmake/DetectOptions.cmake b/cmake/DetectOptions.cmake index 4aae71c5ca..814af99964 100644 --- a/cmake/DetectOptions.cmake +++ b/cmake/DetectOptions.cmake @@ -362,6 +362,9 @@ if(ADIOS2_USE_SST AND NOT WIN32) set(ADIOS2_SST_HAVE_CRAY_DRC TRUE) endif() endif() + if(ADIOS2_HAVE_MPI) + set(ADIOS2_SST_HAVE_MPI TRUE) + endif() endif() # DAOS diff --git a/source/adios2/toolkit/sst/CMakeLists.txt b/source/adios2/toolkit/sst/CMakeLists.txt index 7f3911b960..be3ae8b761 100644 --- a/source/adios2/toolkit/sst/CMakeLists.txt +++ b/source/adios2/toolkit/sst/CMakeLists.txt @@ -39,6 +39,11 @@ if(ADIOS2_HAVE_ZFP) target_link_libraries(sst PRIVATE zfp::zfp) endif() +if(ADIOS2_HAVE_MPI) + target_sources(sst PRIVATE dp/mpi_dp.c) + target_link_libraries(sst PRIVATE MPI::MPI_C) +endif() + # Set library version information set_target_properties(sst PROPERTIES OUTPUT_NAME adios2${ADIOS2_LIBRARY_SUFFIX}_sst @@ -54,6 +59,7 @@ set(SST_CONFIG_OPTS FI_GNI CRAY_DRC NVStream + MPI ) include(SSTFunctions) GenerateSSTHeaderConfig(${SST_CONFIG_OPTS}) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 76f2c023a4..a4c9cf2d15 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -1124,19 +1124,6 @@ extern void SstStreamDestroy(SstStream Stream) free(Stream->Timesteps); Stream->Timesteps = Next; } - if (Stream->DP_Stream) - { - pthread_mutex_unlock(&Stream->DataLock); - if (Stream->Role == ReaderRole) - { - Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream); - } - else - { - Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream); - } - pthread_mutex_lock(&Stream->DataLock); - } if (Stream->Readers) { for (int i = 0; i < Stream->ReaderCount; i++) @@ -1170,6 +1157,20 @@ extern void SstStreamDestroy(SstStream Stream) Stream->Readers = NULL; } + if (Stream->DP_Stream) + { + pthread_mutex_unlock(&Stream->DataLock); + if (Stream->Role == ReaderRole) + { + Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream); + } + else + { + 
Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream); + } + pthread_mutex_lock(&Stream->DataLock); + } + FFSFormatList FFSList = Stream->PreviousFormats; Stream->PreviousFormats = NULL; free(Stream->ReleaseList); diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 2ec5765383..d99ccb37b9 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -1425,6 +1425,14 @@ static void CloseWSRStream(CManager cm, void *WSR_Stream_v) "Delayed task Moving Reader stream %p to status %s\n", CP_WSR_Stream, SSTStreamStatusStr[PeerClosed]); CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerClosed); + + if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) == 0 && + CP_WSR_Stream->DP_WSR_Stream) + { + CP_WSR_Stream->ParentStream->DP_Interface->destroyWriterPerReader( + &Svcs, CP_WSR_Stream->DP_WSR_Stream); + CP_WSR_Stream->DP_WSR_Stream = NULL; + } STREAM_MUTEX_UNLOCK(ParentStream); } @@ -1476,6 +1484,18 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream, CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0, CloseWSRStream, CP_WSR_Stream)); } + else + { + if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) == + 0 && + CP_WSR_Stream->DP_WSR_Stream) + { + CP_WSR_Stream->ParentStream->DP_Interface + ->destroyWriterPerReader(&Svcs, + CP_WSR_Stream->DP_WSR_Stream); + CP_WSR_Stream->DP_WSR_Stream = NULL; + } + } } CP_verbose(ParentStream, PerStepVerbose, "Moving Reader stream %p to status %s\n", CP_WSR_Stream, diff --git a/source/adios2/toolkit/sst/dp/dp.c b/source/adios2/toolkit/sst/dp/dp.c index 755c6400a2..37f6ab45ab 100644 --- a/source/adios2/toolkit/sst/dp/dp.c +++ b/source/adios2/toolkit/sst/dp/dp.c @@ -16,6 +16,9 @@ extern CP_DP_Interface LoadRdmaDP(); #ifdef SST_HAVE_DAOS extern CP_DP_Interface LoadDaosDP(); #endif /* SST_HAVE_LIBFABRIC */ +#ifdef SST_HAVE_MPI +extern CP_DP_Interface LoadMpiDP(); +#endif /* SST_HAVE_MPI*/ extern CP_DP_Interface LoadEVpathDP(); typedef struct _DPElement @@ -68,6 +71,10 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, AddDPPossibility(Svcs, CP_Stream, List, LoadDaosDP(), "daos", Params); #endif /* SST_HAVE_DAOS */ +#ifdef SST_HAVE_MPI + List = AddDPPossibility(Svcs, CP_Stream, List, LoadMpiDP(), "mpi", Params); +#endif /* SST_HAVE_MPI */ + int SelectedDP = -1; int BestPriority = -1; int BestPrioDP = -1; diff --git a/source/adios2/toolkit/sst/dp/mpi_dp.c b/source/adios2/toolkit/sst/dp/mpi_dp.c new file mode 100644 index 0000000000..3b63ddc6b1 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/mpi_dp.c @@ -0,0 +1,917 @@ +#include "dp_interface.h" +#include "sst_data.h" +#include + +#if !defined(__linux__) && !defined(__APPLE__) +#error "mpi_dp.c only supported in UNIX-like platforms." 
+#endif
+
+#include <mpi.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#define MPI_DP_CONTACT_STRING_LEN 64
+
+static pthread_mutex_t ws_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t ts_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t rs_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_once_t mpi_initialize_once = PTHREAD_ONCE_INIT;
+
+typedef struct _Mpi_RS_Stream
+{
+    CManager cm;
+    void *CP_Stream;
+    CMFormat ReadRequestFormat;
+    int Rank;
+    int PID;
+
+    /* writer info */
+    int WriterCohortSize;
+    CP_PeerCohort PeerCohort;
+    struct _MpiWriterContactInfo *WriterContactInfo;
+    SstStats Stats;
+} * Mpi_RS_Stream;
+
+typedef struct _Mpi_WSR_Stream
+{
+    struct _Mpi_WS_Stream *WS_Stream;
+    CP_PeerCohort PeerCohort;
+    int ReaderCohortSize;
+    struct _MpiReaderContactInfo *ReaderContactInfo;
+    SstStats Stats;
+    char MpiPortName[MPI_MAX_PORT_NAME];
+} * Mpi_WSR_Stream;
+
+typedef struct _TimestepEntry
+{
+    long Timestep;
+    struct _SstData *Data;
+    struct _MpiPerTimestepInfo *DP_TimestepInfo;
+    struct _TimestepEntry *Next;
+} * TimestepList;
+
+typedef struct _Mpi_WS_Stream
+{
+    CManager cm;
+    void *CP_Stream;
+    int Rank;
+    int PID;
+
+    TimestepList Timesteps;
+    CMFormat ReadReplyFormat;
+
+    int ReaderCount;
+    Mpi_WSR_Stream *Readers;
+} * Mpi_WS_Stream;
+
+typedef struct _MpiReaderContactInfo
+{
+    char *ContactString;
+    void *RS_Stream;
+    MPI_Comm MpiComm;
+} * MpiReaderContactInfo;
+
+typedef struct _MpiWriterContactInfo
+{
+    char *ContactString;
+    void *WS_Stream;
+    MPI_Comm MpiComm;
+    int PID;
+} * MpiWriterContactInfo;
+
+enum MPI_DP_COMM_TYPE
+{
+    MPI_DP_REMOTE = 0,
+    MPI_DP_LOCAL = 1,
+};
+
+typedef struct _MpiReadRequestMsg
+{
+    long Timestep;
+    size_t Offset;
+    size_t Length;
+    void *WS_Stream;
+    void *RS_Stream;
+    int RequestingRank;
+    int NotifyCondition;
+    enum MPI_DP_COMM_TYPE CommType;
+} * MpiReadRequestMsg;
+
+static FMField MpiReadRequestList[] = {
+    {"Timestep", "integer", sizeof(long),
+     FMOffset(MpiReadRequestMsg, Timestep)},
+    {"Offset", "integer", sizeof(size_t), FMOffset(MpiReadRequestMsg, Offset)},
+    {"Length", "integer", sizeof(size_t), FMOffset(MpiReadRequestMsg, Length)},
+    {"WS_Stream", "integer", sizeof(void *),
+     FMOffset(MpiReadRequestMsg, WS_Stream)},
+    {"RS_Stream", "integer", sizeof(void *),
+     FMOffset(MpiReadRequestMsg, RS_Stream)},
+    {"RequestingRank", "integer", sizeof(int),
+     FMOffset(MpiReadRequestMsg, RequestingRank)},
+    {"NotifyCondition", "integer", sizeof(int),
+     FMOffset(MpiReadRequestMsg, NotifyCondition)},
+    {"CommType", "integer", sizeof(int), FMOffset(MpiReadRequestMsg, CommType)},
+    {NULL, NULL, 0, 0}};
+
+static FMStructDescRec MpiReadRequestStructs[] = {
+    {"MpiReadRequest", MpiReadRequestList, sizeof(struct _MpiReadRequestMsg),
+     NULL},
+    {NULL, NULL, 0, NULL}};
+
+typedef struct _MpiReadReplyMsg
+{
+    long Timestep;
+    size_t DataLength;
+    void *RS_Stream;
+    int NotifyCondition;
+    char *MpiPortName;
+    char *Data;
+} * MpiReadReplyMsg;
+
+static FMField MpiReadReplyList[] = {
+    {"Timestep", "integer", sizeof(long), FMOffset(MpiReadReplyMsg, Timestep)},
+    {"RS_Stream", "integer", sizeof(void *),
+     FMOffset(MpiReadReplyMsg, RS_Stream)},
+    {"DataLength", "integer", sizeof(size_t),
+     FMOffset(MpiReadReplyMsg, DataLength)},
+    {"NotifyCondition", "integer", sizeof(int),
+     FMOffset(MpiReadReplyMsg, NotifyCondition)},
+    {"MpiPortName", "string", sizeof(char *),
+     FMOffset(MpiReadReplyMsg, MpiPortName)},
+    {"Data", "char[DataLength]", sizeof(char), FMOffset(MpiReadReplyMsg, Data)},
+    {NULL, NULL, 0, 0}};
+
+static FMStructDescRec
MpiReadReplyStructs[] = { + {"MpiReadReply", MpiReadReplyList, sizeof(struct _MpiReadReplyMsg), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField MpiReaderContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(MpiReaderContactInfo, ContactString)}, + {"reader_ID", "integer", sizeof(void *), + FMOffset(MpiReaderContactInfo, RS_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec MpiReaderContactStructs[] = { + {"MpiReaderContactInfo", MpiReaderContactList, + sizeof(struct _MpiReaderContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField MpiWriterContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(MpiWriterContactInfo, ContactString)}, + {"writer_ID", "integer", sizeof(void *), + FMOffset(MpiWriterContactInfo, WS_Stream)}, + {"MpiComm", "integer", sizeof(int), + FMOffset(MpiWriterContactInfo, MpiComm)}, + {"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec MpiWriterContactStructs[] = { + {"MpiWriterContactInfo", MpiWriterContactList, + sizeof(struct _MpiWriterContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs); + +/** + * + * + * + * + */ +static void Initialize_MPI() +{ + int IsInitialized = 0; + int provided; + + MPI_Initialized(&IsInitialized); + if (!IsInitialized) + { + MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided); + } + else + { + MPI_Query_thread(&provided); + } + + if (provided != MPI_THREAD_MULTIPLE) + { + fprintf(stderr, + "MPI init without MPI_THREAD_MULTIPLE (Externally " + "initialized:%s)\n", + IsInitialized ? "true" : "false"); + } +} + +/* + * + * InitReader. Called by the control plane collectively during the early + * stages of Open on the reader side. It should do whatever is necessary to + * initialize a new reader-side data plane. A pointer to per-reader-rank + * contact information should be placed in *ReaderContactInfoPtr. The structure + * of that information should be described by DPInterface.ReaderContactFormats. + * (This is an FFS format description. See + * https://www.cc.gatech.edu/systems/projects/FFS/.) 
+ * + */ +static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, + void **ReaderContactInfoPtr, + struct _SstParams *Params, + attr_list WriterContact, SstStats Stats) +{ + pthread_once(&mpi_initialize_once, Initialize_MPI); + + Mpi_RS_Stream Stream = calloc(sizeof(struct _Mpi_RS_Stream), 1); + MpiReaderContactInfo Contact = + calloc(sizeof(struct _MpiReaderContactInfo), 1); + CManager cm = Svcs->getCManager(CP_Stream); + char *MpiContactString = calloc(sizeof(char), MPI_DP_CONTACT_STRING_LEN); + SMPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + + Stream->CP_Stream = CP_Stream; + Stream->Stats = Stats; + Stream->PID = getpid(); + + SMPI_Comm_rank(comm, &Stream->Rank); + + snprintf(MpiContactString, MPI_DP_CONTACT_STRING_LEN, "Reader Rank %d", + Stream->Rank); + + /* + * add a handler for read reply messages + */ + Stream->ReadRequestFormat = CMregister_format(cm, MpiReadRequestStructs); + F = CMregister_format(cm, MpiReadReplyStructs); + CMregister_handler(F, MpiReadReplyHandler, Svcs); + + Contact->ContactString = MpiContactString; + Contact->RS_Stream = Stream; + + *ReaderContactInfoPtr = Contact; + + Svcs->verbose(Stream->CP_Stream, DPTraceVerbose, + "MPI dataplane reader initialized, reader rank %d", + Stream->Rank); + + return Stream; +} + +static char *FetchTimeStep(TimestepList timesteps, long timestep, long offset, + long length) +{ + TimestepList ts = timesteps; + + // Find the requested timestep + while (ts != NULL && ts->Timestep != timestep) + { + ts = ts->Next; + } + + if (ts == NULL) + { + fprintf(stderr, "Failed to read Timestep %ld, not found\n", timestep); + return NULL; + } + + char *outboundBuffer = malloc(sizeof(char) * length); + memcpy(outboundBuffer, ts->Data->block + offset, length); + + return outboundBuffer; +} + +static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs) +{ + PERFSTUBS_TIMER_START_FUNC(timer); + MpiReadRequestMsg ReadRequestMsg = (MpiReadRequestMsg)msg_v; + Mpi_WSR_Stream WSR_Stream = ReadRequestMsg->WS_Stream; + + Mpi_WS_Stream WS_Stream = WSR_Stream->WS_Stream; + CP_Services Svcs = (CP_Services)client_Data; + + Svcs->verbose(WS_Stream->CP_Stream, DPTraceVerbose, + "MpiReadRequestHandler:" + "read request from reader=%d,ts=%d,off=%d,len=%d\n", + ReadRequestMsg->RequestingRank, ReadRequestMsg->Timestep, + ReadRequestMsg->Offset, ReadRequestMsg->Length); + + char *outboundBuffer = NULL; + + pthread_mutex_lock(&ts_mutex); + if (NULL == (outboundBuffer = FetchTimeStep( + WS_Stream->Timesteps, ReadRequestMsg->Timestep, + ReadRequestMsg->Offset, ReadRequestMsg->Length))) + { + PERFSTUBS_TIMER_STOP_FUNC(timer); + pthread_mutex_unlock(&ts_mutex); + return; + } + pthread_mutex_unlock(&ts_mutex); + + struct _MpiReadReplyMsg ReadReplyMsg = { + .Timestep = ReadRequestMsg->Timestep, + .DataLength = ReadRequestMsg->Length, + .RS_Stream = ReadRequestMsg->RS_Stream, + .NotifyCondition = ReadRequestMsg->NotifyCondition, + .MpiPortName = WSR_Stream->MpiPortName, + }; + + if (MPI_DP_LOCAL == ReadRequestMsg->CommType) + { + ReadReplyMsg.Data = outboundBuffer; + } + + Svcs->verbose( + WS_Stream->CP_Stream, DPTraceVerbose, + "MpiReadRequestHandler: Replying reader=%d with MPI port name=%s\n", + ReadRequestMsg->RequestingRank, WSR_Stream->MpiPortName); + + Svcs->sendToPeer(WS_Stream->CP_Stream, WSR_Stream->PeerCohort, + ReadRequestMsg->RequestingRank, WS_Stream->ReadReplyFormat, + &ReadReplyMsg); + + if (MPI_DP_REMOTE == ReadRequestMsg->CommType) + { + // Send the actual Data using MPI 
+ MPI_Comm *comm = + &WSR_Stream->ReaderContactInfo[ReadRequestMsg->RequestingRank] + .MpiComm; + MPI_Errhandler worldErrHandler; + MPI_Comm_get_errhandler(MPI_COMM_WORLD, &worldErrHandler); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); + int ret = MPI_Send(outboundBuffer, ReadRequestMsg->Length, MPI_CHAR, 0, + ReadRequestMsg->NotifyCondition, *comm); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, worldErrHandler); + + if (ret != MPI_SUCCESS) + { + MPI_Comm_accept(WSR_Stream->MpiPortName, MPI_INFO_NULL, 0, + MPI_COMM_SELF, comm); + Svcs->verbose( + WS_Stream->CP_Stream, DPTraceVerbose, + "MpiReadRequestHandler: Accepted client, ReaderCohortSize=%d\n", + WSR_Stream->ReaderCohortSize); + MPI_Send(outboundBuffer, ReadRequestMsg->Length, MPI_CHAR, 0, + ReadRequestMsg->NotifyCondition, *comm); + } + } + + free(outboundBuffer); + + PERFSTUBS_TIMER_STOP_FUNC(timer); +} + +typedef struct _MpiCompletionHandle +{ + int CMcondition; + CManager cm; + void *CPStream; + void *Buffer; + int Rank; + enum MPI_DP_COMM_TYPE CommType; +} * MpiCompletionHandle; + +/** + * + * + */ +static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs) +{ + PERFSTUBS_TIMER_START_FUNC(timer); + MpiReadReplyMsg ReadReplyMsg = (MpiReadReplyMsg)msg_v; + Mpi_RS_Stream RS_Stream = ReadReplyMsg->RS_Stream; + CP_Services Svcs = (CP_Services)client_Data; + MpiCompletionHandle Handle = + CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); + + Svcs->verbose( + RS_Stream->CP_Stream, DPTraceVerbose, + "MpiReadReplyHandler: Read recv from rank=%d,condition=%d,size=%d\n", + Handle->Rank, ReadReplyMsg->NotifyCondition, ReadReplyMsg->DataLength); + + if (MPI_DP_LOCAL == Handle->CommType) + { + memcpy(Handle->Buffer, ReadReplyMsg->Data, ReadReplyMsg->DataLength); + } + else + { + pthread_mutex_lock(&rs_mutex); + MPI_Comm comm = RS_Stream->WriterContactInfo[Handle->Rank].MpiComm; + pthread_mutex_unlock(&rs_mutex); + + MPI_Errhandler worldErrHandler; + MPI_Comm_get_errhandler(MPI_COMM_WORLD, &worldErrHandler); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN); + int ret = + MPI_Recv(Handle->Buffer, ReadReplyMsg->DataLength, MPI_CHAR, 0, + ReadReplyMsg->NotifyCondition, comm, MPI_STATUS_IGNORE); + MPI_Comm_set_errhandler(MPI_COMM_WORLD, worldErrHandler); + + if (ret != MPI_SUCCESS) + { + MPI_Comm_connect(ReadReplyMsg->MpiPortName, MPI_INFO_NULL, 0, + MPI_COMM_SELF, &comm); + + Svcs->verbose(RS_Stream->CP_Stream, DPTraceVerbose, + "MpiReadReplyHandler: Connecting to MPI Server\n"); + MPI_Recv(Handle->Buffer, ReadReplyMsg->DataLength, MPI_CHAR, 0, + ReadReplyMsg->NotifyCondition, comm, MPI_STATUS_IGNORE); + } + + pthread_mutex_lock(&rs_mutex); + RS_Stream->WriterContactInfo[Handle->Rank].MpiComm = comm; + pthread_mutex_unlock(&rs_mutex); + } + RS_Stream->Stats->DataBytesReceived += ReadReplyMsg->DataLength; + + /* + * Signal the condition to wake the reader if they are waiting. + */ + CMCondition_signal(cm, ReadReplyMsg->NotifyCondition); + PERFSTUBS_TIMER_STOP_FUNC(timer); +} + +/* + * + * InitWriter. Called by the control plane collectively during the early + * stages of Open on the writer side. It should do whatever is necessary to + * initialize a new writer-side data plane. This does *not* include creating + * contact information per se. That can be put off until InitWriterPerReader(). 
+ * + */ +static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, + struct _SstParams *Params, attr_list DPAttrs, + SstStats Stats) +{ + pthread_once(&mpi_initialize_once, Initialize_MPI); + + Mpi_WS_Stream Stream = calloc(sizeof(struct _Mpi_WS_Stream), 1); + CManager cm = Svcs->getCManager(CP_Stream); + SMPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + + SMPI_Comm_rank(comm, &Stream->Rank); + + Stream->CP_Stream = CP_Stream; + Stream->PID = getpid(); + + /* + * add a handler for read request messages + */ + F = CMregister_format(cm, MpiReadRequestStructs); + CMregister_handler(F, MpiReadRequestHandler, Svcs); + + /* + * register read reply message structure so we can send later + */ + Stream->ReadReplyFormat = CMregister_format(cm, MpiReadReplyStructs); + + return (void *)Stream; +} + +/* + * + * InitWriterPerReader. Called by the control plane collectively when + * accepting a new reader connection. It receives the per-rank reader contact + * information (as created on the connecting peer in InitReader) and should + * create its own per-writer-rank contact information and place it in + * *writerContactInfoPtr. The structure of that information should be described + * by DPInterface.WriterContactFormats. (This is an FFS format description. + * See https://www.cc.gatech.edu/systems/projects/FFS/.) + * + */ +static DP_WSR_Stream +MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v, + int readerCohortSize, CP_PeerCohort PeerCohort, + void **providedReaderInfo_v, void **WriterContactInfoPtr) +{ + Mpi_WS_Stream WS_Stream = (Mpi_WS_Stream)WS_Stream_v; + Mpi_WSR_Stream WSR_Stream = calloc(sizeof(struct _Mpi_WSR_Stream), 1); + MpiWriterContactInfo ContactInfo; + SMPI_Comm comm = Svcs->getMPIComm(WS_Stream->CP_Stream); + MpiReaderContactInfo *providedReaderInfo = + (MpiReaderContactInfo *)providedReaderInfo_v; + char *MpiContactString = calloc(sizeof(char), MPI_DP_CONTACT_STRING_LEN); + + int Rank; + SMPI_Comm_rank(comm, &Rank); + snprintf(MpiContactString, MPI_DP_CONTACT_STRING_LEN, + "Writer Rank %d, test contact", Rank); + + MPI_Open_port(MPI_INFO_NULL, WSR_Stream->MpiPortName); + + WSR_Stream->WS_Stream = WS_Stream; /* pointer to writer struct */ + WSR_Stream->PeerCohort = PeerCohort; + WSR_Stream->ReaderCohortSize = readerCohortSize; + + Svcs->verbose(WS_Stream->CP_Stream, DPTraceVerbose, + "MPI dataplane WriterPerReader to be initialized\n"); + + /* + * make a copy of writer contact information (original will not be + * preserved) + */ + WSR_Stream->ReaderContactInfo = + calloc(sizeof(struct _MpiReaderContactInfo), readerCohortSize); + for (int i = 0; i < readerCohortSize; i++) + { + WSR_Stream->ReaderContactInfo[i].ContactString = + strdup(providedReaderInfo[i]->ContactString); + WSR_Stream->ReaderContactInfo[i].RS_Stream = + providedReaderInfo[i]->RS_Stream; + WSR_Stream->ReaderContactInfo[i].MpiComm = MPI_COMM_NULL; + Svcs->verbose( + WS_Stream->CP_Stream, DPTraceVerbose, + "Received contact info \"%s\", RD_Stream %p for Reader Rank %d\n", + WSR_Stream->ReaderContactInfo[i].ContactString, + WSR_Stream->ReaderContactInfo[i].RS_Stream, i); + } + + /* + * add this writer-side reader-specific stream to the parent writer stream + * structure + */ + pthread_mutex_lock(&ws_mutex); + WS_Stream->Readers = realloc( + WS_Stream->Readers, sizeof(*WSR_Stream) * (WS_Stream->ReaderCount + 1)); + WS_Stream->Readers[WS_Stream->ReaderCount] = WSR_Stream; + WS_Stream->ReaderCount++; + pthread_mutex_unlock(&ws_mutex); + + ContactInfo = calloc(sizeof(struct 
_MpiWriterContactInfo), 1);
+    ContactInfo->ContactString = MpiContactString;
+    ContactInfo->WS_Stream = WSR_Stream;
+    ContactInfo->PID = WS_Stream->PID;
+    *WriterContactInfoPtr = ContactInfo;
+
+    return WSR_Stream;
+}
+
+static void MpiProvideWriterDataToReader(CP_Services Svcs,
+                                         DP_RS_Stream RS_Stream_v,
+                                         int writerCohortSize,
+                                         CP_PeerCohort PeerCohort,
+                                         void **providedWriterInfo_v)
+{
+    Mpi_RS_Stream RS_Stream = (Mpi_RS_Stream)RS_Stream_v;
+    MpiWriterContactInfo *providedWriterInfo =
+        (MpiWriterContactInfo *)providedWriterInfo_v;
+
+    RS_Stream->PeerCohort = PeerCohort;
+    RS_Stream->WriterCohortSize = writerCohortSize;
+
+    /*
+     * make a copy of writer contact information (original will not be
+     * preserved)
+     */
+    struct _MpiWriterContactInfo *tmp =
+        calloc(sizeof(struct _MpiWriterContactInfo), writerCohortSize);
+    for (int i = 0; i < writerCohortSize; i++)
+    {
+        tmp[i].ContactString = strdup(providedWriterInfo[i]->ContactString);
+        tmp[i].WS_Stream = providedWriterInfo[i]->WS_Stream;
+        tmp[i].MpiComm = MPI_COMM_NULL;
+        tmp[i].PID = providedWriterInfo[i]->PID;
+
+        if (RS_Stream->WriterContactInfo &&
+            RS_Stream->WriterContactInfo[i].MpiComm != MPI_COMM_NULL)
+        {
+            MPI_Comm_disconnect(&RS_Stream->WriterContactInfo[i].MpiComm);
+        }
+
+        Svcs->verbose(RS_Stream->CP_Stream, DPTraceVerbose,
+                      "Received contact info \"%s\", WS_stream %p for WSR Rank "
+                      "%d\n",
+                      tmp[i].ContactString, tmp[i].WS_Stream, i);
+    }
+
+    RS_Stream->WriterContactInfo = tmp;
+}
+
+typedef struct _MpiPerTimestepInfo
+{
+    char *CheckString;
+} * MpiPerTimestepInfo;
+
+/*
+ *
+ * ReadRemoteMemory. Called by the control plane on the reader side to
+ * request that timestep data from the writer side, identified by Rank,
+ * Timestep, starting at a particular Offset and continuing for Length, be
+ * placed into a local Buffer. The DP_TimestepInfo value will be the per-rank
+ * per-timestep information that was created during ProvideTimestep by that
+ * writer rank for that timestep. This is an asynchronous request in that it
+ * need not be completed before this call returns. The void* return value will
+ * later be passed to a WaitForCompletion call and should represent a completion
+ * handle.
+ *
+ */
+static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
+                                 int Rank, long Timestep, size_t Offset,
+                                 size_t Length, void *Buffer,
+                                 void *DP_TimestepInfo)
+{
+    Mpi_RS_Stream Stream = (Mpi_RS_Stream)
+        Stream_v; /* DP_RS_Stream is the return from InitReader */
+    CManager cm = Svcs->getCManager(Stream->CP_Stream);
+    MpiCompletionHandle ret = calloc(sizeof(struct _MpiCompletionHandle), 1);
+
+    ret->CMcondition = CMCondition_get(cm, NULL);
+    ret->CPStream = Stream->CP_Stream;
+    ret->cm = cm;
+    ret->Buffer = Buffer;
+    ret->Rank = Rank;
+    ret->CommType = (Stream->WriterContactInfo[Rank].PID == Stream->PID)
+                        ? MPI_DP_LOCAL
+                        : MPI_DP_REMOTE;
+
+    /*
+     * set the completion handle as client Data on the condition so that
+     * handler has access to it.
+     */
+    CMCondition_set_client_data(cm, ret->CMcondition, ret);
+
+    Svcs->verbose(
+        Stream->CP_Stream, DPTraceVerbose,
+        "Reader (rank %d) requesting to read remote memory for Timestep %d "
+        "from Rank %d, WSR_Stream = %p, Offset=%d, Length=%d\n",
+        Stream->Rank, Timestep, Rank, Stream->WriterContactInfo[Rank].WS_Stream,
+        Offset, Length);
+
+    /* send request to appropriate writer */
+    struct _MpiReadRequestMsg ReadRequestMsg = {
+        .Timestep = Timestep,
+        .Offset = Offset,
+        .Length = Length,
+        .WS_Stream = Stream->WriterContactInfo[Rank].WS_Stream,
+        .RS_Stream = Stream,
+        .RequestingRank = Stream->Rank,
+        .NotifyCondition = ret->CMcondition,
+        .CommType = ret->CommType};
+
+    Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank,
+                     Stream->ReadRequestFormat, &ReadRequestMsg);
+    return ret;
+}
+
+/*
+ *
+ * WaitForCompletion. Called by the control plane on the reader side with a
+ * Handle that is the return value of a prior ReadRemoteMemory call. This call
+ * should not return until that particular remote memory read is complete and
+ * the buffer is full. A zero return means that the read failed and will result
+ * in a (hopefully orderly) shutdown of the stream.
+ *
+ */
+static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
+{
+    MpiCompletionHandle Handle = (MpiCompletionHandle)Handle_v;
+    int Ret;
+    Svcs->verbose(
+        Handle->CPStream, DPTraceVerbose,
+        "Waiting for completion of memory read to rank %d, condition %d\n",
+        Handle->Rank, Handle->CMcondition);
+    /*
+     * Wait for the CM condition to be signalled. If it has been already,
+     * this returns immediately. Copying the incoming data to the waiting
+     * buffer has been done by the reply handler.
+     */
+    Ret = CMCondition_wait(Handle->cm, Handle->CMcondition);
+    if (!Ret)
+    {
+        Svcs->verbose(Handle->CPStream, DPTraceVerbose,
+                      "Remote memory read to rank %d with "
+                      "condition %d has FAILED because of "
+                      "writer failure\n",
+                      Handle->Rank, Handle->CMcondition);
+    }
+    else
+    {
+        if (Handle->CMcondition != -1)
+            Svcs->verbose(Handle->CPStream, DPTraceVerbose,
+                          "Remote memory read to rank %d with condition %d has "
+                          "completed\n",
+                          Handle->Rank, Handle->CMcondition);
+    }
+    free(Handle);
+    return Ret;
+}
+
+/*
+ *
+ * ProvideTimestep. Called by the control plane collectively on the writer
+ * side to "give" the data plane new data that it should then serve to the
+ * readers. DP must do everything necessary here to allow future service (until
+ * ReleaseTimestep is called). The DP can create per-timestep per-rank
+ * identifying information that will be placed in the timestep metadata and
+ * provided to the reader during remote read requests. A pointer to this
+ * contact information should be placed in *TimestepInfoPtr. This structure
+ * should be described by DPInterface.TimestepInfoFormats.
+ * + */ +static void MpiProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + struct _SstData *Data, + struct _SstData *LocalMetadata, long Timestep, + void **TimestepInfoPtr) +{ + Mpi_WS_Stream Stream = (Mpi_WS_Stream)Stream_v; + TimestepList Entry = calloc(sizeof(struct _TimestepEntry), 1); + struct _MpiPerTimestepInfo *Info = + calloc(sizeof(struct _MpiPerTimestepInfo), 1); + + Info->CheckString = malloc(MPI_DP_CONTACT_STRING_LEN); + snprintf(Info->CheckString, MPI_DP_CONTACT_STRING_LEN, + "Mpi info for timestep %ld from rank %d", Timestep, Stream->Rank); + + Entry->Data = malloc(sizeof(struct _SstData)); + memcpy(Entry->Data, Data, sizeof(struct _SstData)); + Entry->Timestep = Timestep; + Entry->DP_TimestepInfo = Info; + + pthread_mutex_lock(&ts_mutex); + Entry->Next = Stream->Timesteps; + Stream->Timesteps = Entry; + pthread_mutex_unlock(&ts_mutex); + *TimestepInfoPtr = Info; +} + +/* + * + * ReleaseTimestep. Called by the control plane collectively on the writer + * side to tell the data plane that a particular timestep is no longer required + * and any resources devoted to serving it can be released. + * + */ + +static void MpiReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + long Timestep) +{ + Mpi_WS_Stream Stream = (Mpi_WS_Stream)Stream_v; + TimestepList List = Stream->Timesteps; + + Svcs->verbose(Stream->CP_Stream, DPTraceVerbose, "Releasing timestep %ld\n", + Timestep); + + pthread_mutex_lock(&ts_mutex); + if (Stream->Timesteps->Timestep == Timestep) + { + Stream->Timesteps = List->Next; + free(List->Data); + free(List); + } + else + { + TimestepList last = List; + List = List->Next; + while (List != NULL) + { + if (List->Timestep == Timestep) + { + last->Next = List->Next; + free(List->Data); + free(List); + pthread_mutex_unlock(&ts_mutex); + return; + } + last = List; + List = List->Next; + } + /* + * Shouldn't ever get here because we should never release a + * timestep that we don't have. 
+ */ + fprintf(stderr, "Failed to release Timestep %ld, not found\n", + Timestep); + } + pthread_mutex_unlock(&ts_mutex); +} + +static int MpiGetPriority(CP_Services Svcs, void *CP_Stream, + struct _SstParams *Params) +{ + int provided = 0; + + // Only enabled when MPI_THREAD_MULTIPLE + Initialize_MPI(); + MPI_Query_thread(&provided); + if (provided == MPI_THREAD_MULTIPLE) + { + return 100; + } + + return 0; +} + +static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, + int FailedPeerRank) +{ + /* DP_RS_Stream is the return from InitReader */ + Mpi_RS_Stream Stream = (Mpi_RS_Stream)Stream_v; + Svcs->verbose(Stream->CP_Stream, DPTraceVerbose, + "received notification that writer peer " + "%d has failed, failing any pending " + "requests\n", + FailedPeerRank); +} + +static void MpiDestroyWriterPerReader(CP_Services Svcs, + DP_WSR_Stream WSR_Stream_v) +{ + Mpi_WSR_Stream WSR_Stream = (Mpi_WSR_Stream)WSR_Stream_v; + Mpi_WS_Stream WS_Stream = WSR_Stream->WS_Stream; + + for (int i = 0; i < WSR_Stream->ReaderCohortSize; i++) + { + if (WSR_Stream->ReaderContactInfo[i].MpiComm != MPI_COMM_NULL) + { + MPI_Comm_disconnect(&WSR_Stream->ReaderContactInfo[i].MpiComm); + } + if (WSR_Stream->ReaderContactInfo[i].ContactString) + { + free(WSR_Stream->ReaderContactInfo[i].ContactString); + } + } + + if (WSR_Stream->ReaderContactInfo) + { + free(WSR_Stream->ReaderContactInfo); + } + + WSR_Stream->ReaderCohortSize = 0; + + MPI_Close_port(WSR_Stream->MpiPortName); + + pthread_mutex_lock(&ws_mutex); + for (int i = 0; i < WS_Stream->ReaderCount; i++) + { + if (WS_Stream->Readers[i] == WSR_Stream) + { + WS_Stream->Readers[i] = + WS_Stream->Readers[WS_Stream->ReaderCount - 1]; + break; + } + } + + WS_Stream->Readers = realloc( + WS_Stream->Readers, sizeof(*WSR_Stream) * (WS_Stream->ReaderCount - 1)); + WS_Stream->ReaderCount--; + pthread_mutex_unlock(&ws_mutex); + + free(WSR_Stream); +} + +static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) +{ + Mpi_WS_Stream WS_Stream = (Mpi_WS_Stream)WS_Stream_v; + + pthread_mutex_lock(&ws_mutex); + free(WS_Stream->Readers); + free(WS_Stream); + pthread_mutex_unlock(&ws_mutex); +} + +static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) +{ + Mpi_RS_Stream RS_Stream = (Mpi_RS_Stream)RS_Stream_v; + + for (int i = 0; i < RS_Stream->WriterCohortSize; i++) + { + if (RS_Stream->WriterContactInfo[i].MpiComm != MPI_COMM_NULL) + { + MPI_Comm_disconnect(&RS_Stream->WriterContactInfo[i].MpiComm); + } + free(RS_Stream->WriterContactInfo[i].ContactString); + } + free(RS_Stream->WriterContactInfo); + free(RS_Stream); +} + +extern CP_DP_Interface LoadMpiDP() +{ + static struct _CP_DP_Interface mpiDPInterface = { + .ReaderContactFormats = MpiReaderContactStructs, + .WriterContactFormats = MpiWriterContactStructs, + .initReader = MpiInitReader, + .initWriter = MpiInitWriter, + .initWriterPerReader = MpiInitWriterPerReader, + .provideWriterDataToReader = MpiProvideWriterDataToReader, + .readRemoteMemory = MpiReadRemoteMemory, + .waitForCompletion = MpiWaitForCompletion, + .provideTimestep = MpiProvideTimestep, + .releaseTimestep = MpiReleaseTimestep, + .getPriority = MpiGetPriority, + .destroyReader = MpiDestroyReader, + .destroyWriter = MpiDestroyWriter, + .destroyWriterPerReader = MpiDestroyWriterPerReader, + .notifyConnFailure = MpiNotifyConnFailure, + }; + + return &mpiDPInterface; +} diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index d8f972ae78..f96a34f6c3 
100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -9,9 +9,13 @@ if(ADIOS2_HAVE_SST) gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS") gtest_add_tests_helper(StagingMPMD MPI_ONLY "" Engine.Staging. ".SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP") gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.FFS" EXTRA_ARGS "SST" "--engine_params" "MarshalMethod=FFS") + set_tests_properties( ${Test.Engine.Staging.Threads-TESTS} PROPERTIES RUN_SERIAL TRUE) gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.BP" EXTRA_ARGS "SST" "--engine_params" "MarshalMethod=BP") + set_tests_properties( ${Test.Engine.Staging.Threads-TESTS} PROPERTIES RUN_SERIAL TRUE) gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".BP4_stream" EXTRA_ARGS "BP4" "--engine_params" "OpenTimeoutSecs=5") + set_tests_properties( ${Test.Engine.Staging.Threads-TESTS} PROPERTIES RUN_SERIAL TRUE) gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".FileStream" EXTRA_ARGS "FileStream") + set_tests_properties( ${Test.Engine.Staging.Threads-TESTS} PROPERTIES RUN_SERIAL TRUE) endif() foreach(helper