Skip to content

Commit

Permalink
DP: add MPI dataplane, mpi_dp
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentebolea committed Mar 29, 2022
1 parent d0e50e2 commit dd4c43e
Show file tree
Hide file tree
Showing 8 changed files with 975 additions and 17 deletions.
8 changes: 4 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)
endif()

# Force C++11 and C99
# Force C++11 and C11
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED True)
if(NOT MSVC)
set(CMAKE_C_STANDARD 99)
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD_REQUIRED True)
endif()

Expand All @@ -116,9 +116,9 @@ else()
set(ADIOS2_CXX11_FEATURES cxx_std_11)
endif()
if(CMAKE_C_COMPILER_ID MATCHES "^(GNU|Intel|Clang|AppleClang|MSVC)$")
set(ADIOS2_C99_FEATURES c_restrict)
set(ADIOS2_C11_FEATURES c_std_11)
else()
set(ADIOS2_C99_FEATURES c_std_99)
set(ADIOS2_C11_FEATURES c_std_99)
endif()

include(CMakeDependentOption)
Expand Down
3 changes: 3 additions & 0 deletions cmake/DetectOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ if(ADIOS2_USE_SST AND NOT WIN32)
set(ADIOS2_SST_HAVE_CRAY_DRC TRUE)
endif()
endif()
if(ADIOS2_HAVE_MPI)
set(ADIOS2_SST_HAVE_MPI TRUE)
endif()
endif()

# DAOS
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/toolkit/sst/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ if(ADIOS2_HAVE_ZFP)
target_link_libraries(sst PRIVATE zfp::zfp)
endif()

if(ADIOS2_HAVE_MPI)
target_sources(sst PRIVATE dp/mpi_dp.c)
target_link_libraries(sst PRIVATE MPI::MPI_C)
endif()

# Set library version information
set_target_properties(sst PROPERTIES
OUTPUT_NAME adios2${ADIOS2_LIBRARY_SUFFIX}_sst
Expand All @@ -54,6 +59,7 @@ set(SST_CONFIG_OPTS
FI_GNI
CRAY_DRC
NVStream
MPI
)
include(SSTFunctions)
GenerateSSTHeaderConfig(${SST_CONFIG_OPTS})
Expand Down
27 changes: 14 additions & 13 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1124,19 +1124,6 @@ extern void SstStreamDestroy(SstStream Stream)
free(Stream->Timesteps);
Stream->Timesteps = Next;
}
if (Stream->DP_Stream)
{
pthread_mutex_unlock(&Stream->DataLock);
if (Stream->Role == ReaderRole)
{
Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream);
}
else
{
Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream);
}
pthread_mutex_lock(&Stream->DataLock);
}
if (Stream->Readers)
{
for (int i = 0; i < Stream->ReaderCount; i++)
Expand Down Expand Up @@ -1170,6 +1157,20 @@ extern void SstStreamDestroy(SstStream Stream)
Stream->Readers = NULL;
}

if (Stream->DP_Stream)
{
pthread_mutex_unlock(&Stream->DataLock);
if (Stream->Role == ReaderRole)
{
Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream);
}
else
{
Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream);
}
pthread_mutex_lock(&Stream->DataLock);
}

FFSFormatList FFSList = Stream->PreviousFormats;
Stream->PreviousFormats = NULL;
free(Stream->ReleaseList);
Expand Down
20 changes: 20 additions & 0 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,14 @@ static void CloseWSRStream(CManager cm, void *WSR_Stream_v)
"Delayed task Moving Reader stream %p to status %s\n",
CP_WSR_Stream, SSTStreamStatusStr[PeerClosed]);
CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerClosed);

if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) == 0 &&
CP_WSR_Stream->DP_WSR_Stream)
{
CP_WSR_Stream->ParentStream->DP_Interface->destroyWriterPerReader(
&Svcs, CP_WSR_Stream->DP_WSR_Stream);
CP_WSR_Stream->DP_WSR_Stream = NULL;
}
STREAM_MUTEX_UNLOCK(ParentStream);
}

Expand Down Expand Up @@ -1476,6 +1484,18 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0,
CloseWSRStream, CP_WSR_Stream));
}
else
{
if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) ==
0 &&
CP_WSR_Stream->DP_WSR_Stream)
{
CP_WSR_Stream->ParentStream->DP_Interface
->destroyWriterPerReader(&Svcs,
CP_WSR_Stream->DP_WSR_Stream);
CP_WSR_Stream->DP_WSR_Stream = NULL;
}
}
}
CP_verbose(ParentStream, PerStepVerbose,
"Moving Reader stream %p to status %s\n", CP_WSR_Stream,
Expand Down
7 changes: 7 additions & 0 deletions source/adios2/toolkit/sst/dp/dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ extern CP_DP_Interface LoadRdmaDP();
#ifdef SST_HAVE_DAOS
extern CP_DP_Interface LoadDaosDP();
#endif /* SST_HAVE_LIBFABRIC */
#ifdef SST_HAVE_MPI
extern CP_DP_Interface LoadMpiDP();
#endif /* SST_HAVE_MPI*/
extern CP_DP_Interface LoadEVpathDP();

typedef struct _DPElement
Expand Down Expand Up @@ -68,6 +71,10 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
AddDPPossibility(Svcs, CP_Stream, List, LoadDaosDP(), "daos", Params);
#endif /* SST_HAVE_DAOS */

#ifdef SST_HAVE_MPI
List = AddDPPossibility(Svcs, CP_Stream, List, LoadMpiDP(), "mpi", Params);
#endif /* SST_HAVE_MPI */

int SelectedDP = -1;
int BestPriority = -1;
int BestPrioDP = -1;
Expand Down
Loading

0 comments on commit dd4c43e

Please sign in to comment.