Skip to content

Commit

Permalink
SST: add MPI dataplane, MPI_DP
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentebolea committed Jun 10, 2022
1 parent 9e836af commit 95c3627
Show file tree
Hide file tree
Showing 9 changed files with 1,104 additions and 15 deletions.
3 changes: 3 additions & 0 deletions cmake/DetectOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,9 @@ if(ADIOS2_USE_SST AND NOT WIN32)
set(ADIOS2_SST_HAVE_CRAY_DRC TRUE)
endif()
endif()
if(ADIOS2_HAVE_MPI)
set(ADIOS2_SST_HAVE_MPI TRUE)
endif()
endif()

# DAOS
Expand Down
59 changes: 59 additions & 0 deletions docs/user_guide/source/advanced/ecp_hardware.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
######################
ADIOS2 in ECP hardware
######################

ADIOS2 is widely used on ECP (Exascale Computing Project) HPC (high-performance
computing) systems. Some ADIOS2 features need specific workarounds to run
successfully on this hardware.

OLCF CRUSHER
============

SST MPI Data Transport
----------------------

MPI Data Transport relies on client-server features of MPI which are currently
supported in Cray-MPI implementations with some caveats. Here are some of the
observed issues and their workarounds (if any):

**MPI_Finalize** will block the system process in the "Writer/Producer" ADIOS2
instance. The reason is that the Producer ADIOS2 instance internally calls
`MPI_Open_port`, and even after `MPI_Close_port` has been called, `MPI_Finalize`
still considers the port to be in use, hence blocking the process. The
workaround is to call `MPI_Barrier(MPI_COMM_WORLD)` instead of
`MPI_Finalize()`.

**srun does not understand MPMD instructions** Simply disable the MPMD tests
with the CMake flag `-DADIOS2_RUN_MPI_MPMD_TESTS=OFF`.

**Tests timeout** Since every test is launched with srun, the scheduling time
can exceed the default test timeout. Use a large timeout (5 minutes) for running
your tests.

Examples of launching ADIOS2 SST unit tests using MPI DP:

.. code-block:: bash
# We omit some of the srun (SLURM) arguments which are specific to the project
# you are working on. Note that you can avoid calling srun directly by
# setting the CMake variable `MPIEXEC_EXECUTABLE`.
# Launch simple writer test instance
srun {PROJFLAGS} -N 1 /gpfs/alpine/proj-shared/csc331/vbolea/ADIOS2-build/bin/TestCommonWrite SST mpi_dp_test CPCommPattern=Min,MarshalMethod=BP5
# On another terminal launch multiple instances of the Reader test
srun {PROJFLAGS} -N 2 /gpfs/alpine/proj-shared/csc331/vbolea/ADIOS2-build/bin/TestCommonRead SST mpi_dp_test
Alternatively, you can configure your CMake build to use srun directly:
.. code-block:: bash
cmake . -DMPIEXEC_EXECUTABLE:FILEPATH="/usr/bin/srun" \
-DMPIEXEC_EXTRA_FLAGS:STRING="-A{YourProject} -pbatch -t10" \
-DMPIEXEC_NUMPROC_FLAG:STRING="-N" \
-DMPIEXEC_MAX_NUMPROCS:STRING="8" \
-DADIOS2_RUN_MPI_MPMD_TESTS=OFF
cmake --build .
ctest
# monitor your jobs
watch -n1 squeue -l -u $USER
4 changes: 2 additions & 2 deletions docs/user_guide/source/engines/sst.rst
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ the underlying network communication mechanism to use for exchanging
data in SST. Generally this is chosen by SST based upon what is
available on the current platform. However, specifying this engine
parameter allows overriding SST's choice. Current allowed values are
**"RDMA"** and **"WAN"**. (**ib** and **fabric** are accepted as
**"MPI"**, **"RDMA"**, and **"WAN"**. (**ib** and **fabric** are accepted as
equivalent to **RDMA** and **evpath** is equivalent to **WAN**.)
Generally both the reader and writer should be using the same network
transport, and the network transport chosen may be dictated by the
Expand Down Expand Up @@ -288,7 +288,7 @@ BeginStep timeouts) and writer-side rules (like queue limit behavior) apply.
QueueLimit integer **0** (no queue limits)
QueueFullPolicy string **Block**, Discard
ReserveQueueLimit integer **0** (no queue limits)
DataTransport string **default varies by platform**, RDMA, WAN
DataTransport string **default varies by platform**, MPI, RDMA, WAN
WANDataTransport string **sockets**, enet, ib
ControlTransport string **TCP**, Scalable
NetworkInterface string **NULL**
Expand Down
1 change: 1 addition & 0 deletions docs/user_guide/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Funded by the `Exascale Computing Project (ECP) <https://www.exascaleproject.org
advanced/memory_management
advanced/gpu_aware
advanced/plugins
advanced/ecp_hardware

.. toctree::
:caption: Ecosystem Tools
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/toolkit/sst/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ if(ADIOS2_HAVE_ZFP)
target_link_libraries(sst PRIVATE zfp::zfp)
endif()

# When ADIOS2 is configured with MPI, compile the MPI data plane source
# (dp/mpi_dp.c) into the sst target and link it against the MPI C library.
if(ADIOS2_HAVE_MPI)
target_sources(sst PRIVATE dp/mpi_dp.c)
target_link_libraries(sst PRIVATE MPI::MPI_C)
endif()

# Set library version information
set_target_properties(sst PROPERTIES
OUTPUT_NAME adios2${ADIOS2_LIBRARY_SUFFIX}_sst
Expand All @@ -54,6 +59,7 @@ set(SST_CONFIG_OPTS
FI_GNI
CRAY_DRC
NVStream
MPI
)
include(SSTFunctions)
GenerateSSTHeaderConfig(${SST_CONFIG_OPTS})
Expand Down
27 changes: 14 additions & 13 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1143,19 +1143,6 @@ extern void SstStreamDestroy(SstStream Stream)
free(Stream->Timesteps);
Stream->Timesteps = Next;
}
if (Stream->DP_Stream)
{
pthread_mutex_unlock(&Stream->DataLock);
if (Stream->Role == ReaderRole)
{
Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream);
}
else
{
Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream);
}
pthread_mutex_lock(&Stream->DataLock);
}
if (Stream->Readers)
{
for (int i = 0; i < Stream->ReaderCount; i++)
Expand Down Expand Up @@ -1189,6 +1176,20 @@ extern void SstStreamDestroy(SstStream Stream)
Stream->Readers = NULL;
}

if (Stream->DP_Stream)
{
pthread_mutex_unlock(&Stream->DataLock);
if (Stream->Role == ReaderRole)
{
Stream->DP_Interface->destroyReader(&Svcs, Stream->DP_Stream);
}
else
{
Stream->DP_Interface->destroyWriter(&Svcs, Stream->DP_Stream);
}
pthread_mutex_lock(&Stream->DataLock);
}

FFSFormatList FFSList = Stream->PreviousFormats;
Stream->PreviousFormats = NULL;
free(Stream->ReleaseList);
Expand Down
20 changes: 20 additions & 0 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,14 @@ static void CloseWSRStream(CManager cm, void *WSR_Stream_v)
"Delayed task Moving Reader stream %p to status %s\n",
CP_WSR_Stream, SSTStreamStatusStr[PeerClosed]);
CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerClosed);

if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) == 0 &&
CP_WSR_Stream->DP_WSR_Stream)
{
CP_WSR_Stream->ParentStream->DP_Interface->destroyWriterPerReader(
&Svcs, CP_WSR_Stream->DP_WSR_Stream);
CP_WSR_Stream->DP_WSR_Stream = NULL;
}
STREAM_MUTEX_UNLOCK(ParentStream);
}

Expand Down Expand Up @@ -1526,6 +1534,18 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0,
CloseWSRStream, CP_WSR_Stream));
}
else
{
if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) ==
0 &&
CP_WSR_Stream->DP_WSR_Stream)
{
CP_WSR_Stream->ParentStream->DP_Interface
->destroyWriterPerReader(&Svcs,
CP_WSR_Stream->DP_WSR_Stream);
CP_WSR_Stream->DP_WSR_Stream = NULL;
}
}
}
CP_verbose(ParentStream, PerStepVerbose,
"Moving Reader stream %p to status %s\n", CP_WSR_Stream,
Expand Down
7 changes: 7 additions & 0 deletions source/adios2/toolkit/sst/dp/dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ extern CP_DP_Interface LoadRdmaDP();
#ifdef SST_HAVE_DAOS
extern CP_DP_Interface LoadDaosDP();
#endif /* SST_HAVE_DAOS */
#ifdef SST_HAVE_MPI
extern CP_DP_Interface LoadMpiDP();
#endif /* SST_HAVE_MPI */
extern CP_DP_Interface LoadEVpathDP();

typedef struct _DPElement
Expand Down Expand Up @@ -68,6 +71,10 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
AddDPPossibility(Svcs, CP_Stream, List, LoadDaosDP(), "daos", Params);
#endif /* SST_HAVE_DAOS */

#ifdef SST_HAVE_MPI
List = AddDPPossibility(Svcs, CP_Stream, List, LoadMpiDP(), "mpi", Params);
#endif /* SST_HAVE_MPI */

int SelectedDP = -1;
int BestPriority = -1;
int BestPrioDP = -1;
Expand Down
Loading

0 comments on commit 95c3627

Please sign in to comment.