Skip to content

Commit

Permalink
Merge pull request #1202 from chuckatkins/sync-release-into-master
Browse files Browse the repository at this point in the history
Sync release into master
  • Loading branch information
Chuck Atkins authored Feb 18, 2019
2 parents 8d5e22a + 499f7e3 commit faf81e0
Show file tree
Hide file tree
Showing 17 changed files with 218 additions and 79 deletions.
22 changes: 18 additions & 4 deletions bindings/C/c/adios2_c_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,17 @@ adios2_error adios2_inquire_all_variables(adios2_variable ***variables,
*size = dataMap.size();
adios2_variable **list =
(adios2_variable **)calloc(*size, sizeof(adios2_variable *));

// Sort the names so that we return the same order as the
// C++, python APIs
std::set<std::string> names;
for (auto &it : dataMap)
names.insert(it.first);

size_t n = 0;
for (auto it = dataMap.begin(); it != dataMap.end(); ++it)
for (auto &name : names)
{
const std::string name(it->first);
auto it = dataMap.find(name);
const std::string type(it->second.first);
adios2::core::VariableBase *variable = nullptr;

Expand Down Expand Up @@ -487,10 +494,17 @@ adios2_error adios2_inquire_all_attributes(adios2_attribute ***attributes,
*size = dataMap.size();
adios2_attribute **list =
(adios2_attribute **)calloc(*size, sizeof(adios2_attribute *));

// Sort the names so that we return the same order as the
// C++, python APIs
std::set<std::string> names;
for (auto &it : dataMap)
names.insert(it.first);

size_t n = 0;
for (auto it = dataMap.begin(); it != dataMap.end(); ++it)
for (auto &name : names)
{
const std::string name(it->first);
auto it = dataMap.find(name);
const std::string type(it->second.first);
adios2::core::AttributeBase *attribute = nullptr;

Expand Down
15 changes: 14 additions & 1 deletion docs/user_guide/source/engines/sst.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,19 @@ applications running on different interconnects, the Wide Area Network
(WAN) option should be chosen. This value is interpreted by both SST
Writer and Reader engines.

5. **NetworkInterface**: Default **NULL**. In situations in which
6. **DataTransport**: Default **tcp**. This string value specifies
the underlying network communication mechanism to use for performing
control operations in SST. SST can be configured to standard TCP
sockets, which are very reliable and efficient, but which are limited
in their scalability. Alternatively, SST can use a reliable UDP
protocol, that is more scalable, but as of ADIOS2 Release 2.3.1 still
suffers from some reliability problems. (**sockets** is accepted as
equivalent to **tcp** and **udp**, **rudp**, and **enet** are
equivalent to **scalable**. Generally both the reader and writer
should be using the same control transport. This value is interpreted
by both SST Writer and Reader engines.

7. **NetworkInterface**: Default **NULL**. In situations in which
there are multiple possible network interfaces available to SST, this
string value specifies which should be used to generate SST's contact
information for writers. Generally this should *NOT* be specified
Expand All @@ -173,5 +185,6 @@ This value is interpreted by only by the SST Writer engine.
QueueLimit integer **0** (no queue limits)
QueueFullPolicy string **Block**, Discard
DataTransport string **default varies by platform**, RDMA, WAN
ControlTransport string **TCP**, Scalable
NetworkInterface string **NULL**
======================= ===================== =========================================================
26 changes: 17 additions & 9 deletions source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,26 @@ size_t CompressMGARD::Compress(const void *dataIn, const Dims &dimensions,
}

// Parameters
bool hasTolerance = false;
double tolerance;
auto itAccuracy = parameters.find("accuracy");
if (itAccuracy != parameters.end())
{
tolerance = std::stod(itAccuracy->second);
hasTolerance = true;
}
auto itTolerance = parameters.find("tolerance");
if (m_DebugMode)
if (itTolerance != parameters.end())
{
if (itTolerance == parameters.end())
{
throw std::invalid_argument("ERROR: missing mandatory parameter "
"tolerance for MGARD compression "
"operator, in call to Put\n");
}
tolerance = std::stod(itTolerance->second);
hasTolerance = true;
}
if (!hasTolerance)
{
throw std::invalid_argument("ERROR: missing mandatory parameter "
"tolerance for MGARD compression "
"operator\n");
}

double tolerance = std::stod(itTolerance->second);

int sizeOut = 0;
unsigned char *dataOutPtr =
Expand Down
32 changes: 28 additions & 4 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ void CP_validateParams(SstStream Stream, SstParams Params, int Writer)
}
Stream->QueueFullPolicy = Params->QueueFullPolicy;
Stream->RegistrationMethod = Params->RegistrationMethod;
char *SelectedTransport = NULL;
if (Params->DataTransport != NULL)
{
int i;
SelectedTransport = malloc(strlen(Params->DataTransport) + 1);
char *SelectedTransport = malloc(strlen(Params->DataTransport) + 1);
for (i = 0; Params->DataTransport[i] != 0; i++)
{
SelectedTransport[i] = tolower(Params->DataTransport[i]);
Expand All @@ -69,8 +68,33 @@ void CP_validateParams(SstStream Stream, SstParams Params, int Writer)
}
if (Params->ControlTransport == NULL)
{
/* determine reasonable default, now "enet" */
Params->ControlTransport = strdup("enet");
/* determine reasonable default, now "sockets" */
Params->ControlTransport = strdup("sockets");
}
else
{
int i;
char *SelectedTransport = malloc(strlen(Params->ControlTransport) + 1);
for (i = 0; Params->ControlTransport[i] != 0; i++)
{
SelectedTransport[i] = tolower(Params->ControlTransport[i]);
}
SelectedTransport[i] = 0;

/* canonicalize SelectedTransport */
if ((strcmp(SelectedTransport, "sockets") == 0) ||
(strcmp(SelectedTransport, "tcp") == 0))
{
Params->ControlTransport = strdup("sockets");
}
else if ((strcmp(SelectedTransport, "udp") == 0) ||
(strcmp(SelectedTransport, "rudp") == 0) ||
(strcmp(SelectedTransport, "scalable") == 0) ||
(strcmp(SelectedTransport, "enet") == 0))
{
Params->ControlTransport = strdup("enet");
}
free(SelectedTransport);
}
Stream->ConnectionUsleepMultiplier = 50;
if ((strcmp(Params->ControlTransport, "enet") == 0) &&
Expand Down
62 changes: 54 additions & 8 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1154,9 +1154,14 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
ArrivingReader = ArrivingReader->Next;
}
if (Stream->QueueLimit &&
(Stream->QueuedTimestepCount > Stream->QueueLimit))
((Stream->QueuedTimestepCount + 1) > Stream->QueueLimit))
{
SendBlock[1] = 1; /* this rank is over stream limit */
CP_verbose(Stream,
"Writer will be over queue limit, count %d, limit %d\n",
Stream->QueuedTimestepCount, Stream->QueueLimit);

SendBlock[1] =
1; /* this rank will be over queue limit with new timestep */
}
else
{
Expand Down Expand Up @@ -1207,10 +1212,6 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
ActiveReaderCount++;
}

/*
* Then handle possible incoming connection requests. (Only rank 0 has
* valid info.)
*/
int OverLimit = 0;
for (int i = 0; i < Stream->CohortSize; i++)
{
Expand All @@ -1220,6 +1221,47 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
/* we've got all the state we need to know, release data lock */
PTHREAD_MUTEX_UNLOCK(&Stream->DataLock);

/*
* before we add any new readers, see if we are going to need to
* discard anything in the queue to stay in the queue limit
*/

if (OverLimit)
{
if (Stream->QueueFullPolicy == SstQueueFullDiscard)
{
/* discard things */
if (ActiveReaderCount == 0)
{
PTHREAD_MUTEX_LOCK(&Stream->DataLock);
/*
* Have to double-check here. While in general, if everyone
* had zero readers at the start, everyone should have the
* same set of timesteps queued and everyone should be doing
* a dequeue here. However, we might have just gone to zero
* active (because of failed or exiting readers), and some
* timesteps might just have been released. (Note: Assuming
* that if having gone to zero readers will have dequeued
* timesteps that had reference counts. So generally we
* must be dequeueing things with a zero reference count
* here.) That is an assumption that should not be
* violated.
*/
if ((Stream->QueuedTimestepCount + 1) >= Stream->QueueLimit)
{
CP_verbose(Stream, "Writer doing discard for overlimit\n");
DoStreamDiscard(Stream);
OverLimit = 0; /* handled */
}
PTHREAD_MUTEX_UNLOCK(&Stream->DataLock);
}
}
}

/*
* Then handle possible incoming connection requests. (Only rank 0 has
* valid info, so only look to RecvBlock[0].)
*/
for (int i = 0; i < RecvBlock[0]; i++)
{
WS_ReaderInfo reader;
Expand All @@ -1235,7 +1277,9 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
}

/*
* Lastly, we'll decide what to do with the current provided timestep.
* Lastly, we'll decide what to do with the current provided timestep,
* (if it was not discarded before we added new readers).
*
* If any writer rank is over the queuelimit, we must discard or block
* decision points:
If mode is block on queue limit:
Expand Down Expand Up @@ -1307,7 +1351,9 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
/* discard things */
if (ActiveReaderCount == 0)
{
DoStreamDiscard(Stream);
/* this should have been handled above */
CP_verbose(Stream, "Finding a late need to discard when Active "
"Readers is zero, shouldn't happen!!\n\n");
}
else
{
Expand Down
27 changes: 25 additions & 2 deletions source/adios2/toolkit/sst/dp/dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
int BestPriority = -1;
int BestPrioDP = -1;
int i = 0;
int FoundPreferred = 0;
if (Params->DataTransport)
{
Svcs->verbose(CP_Stream, "Prefered dataplane name is \"%s\"\n",
Expand All @@ -79,8 +80,18 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
{
if (strcasecmp(List[i].Name, Params->DataTransport) == 0)
{
SelectedDP = i;
break;
FoundPreferred = 1;
if (List[i].Priority >= 0)
{
SelectedDP = i;
break;
}
else
{
fprintf(stderr, "Warning: Perferred DataPlane \"%s\" is "
"not available.",
List[i].Name);
}
}
}
if (List[i].Priority > BestPriority)
Expand All @@ -90,6 +101,11 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
}
i++;
}
if (Params->DataTransport && (FoundPreferred == 0))
{
fprintf(stderr, "Warning: Preferred DataPlane \"%s\" not found.",
Params->DataTransport);
}
if (SelectedDP != -1)
{
Svcs->verbose(CP_Stream,
Expand All @@ -115,6 +131,13 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
}
i++;
}

if (Params->DataTransport)
{
free(Params->DataTransport);
}
Params->DataTransport = strdup(List[SelectedDP].Name);

Ret = List[SelectedDP].Interface;
free(List);
return Ret;
Expand Down
2 changes: 1 addition & 1 deletion testing/adios2/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ endif()
if(ADIOS2_HAVE_MPI)
add_subdirectory(common)
add_subdirectory(insitumpi)
add_subdirectory(staging-common)
endif()

add_subdirectory(staging-common)
37 changes: 18 additions & 19 deletions testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,24 @@

find_package(Threads REQUIRED)

set(extra_test_args EXEC_WRAPPER ${MPIEXEC_COMMAND})

add_executable(TestStagingMPMD TestStagingMPMD.cpp)
target_link_libraries(TestStagingMPMD adios2 gtest ${CMAKE_THREAD_LIBS_INIT})
if(ADIOS2_HAVE_MPI)
target_link_libraries(TestStagingMPMD adios2 gtest_interface MPI::MPI_C)
endif()
set(extra_test_args EXEC_WRAPPER ${MPIEXEC_COMMAND})

if(ADIOS2_HAVE_SST)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:FFS"
TEST_SUFFIX _SST_FFS)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:BP"
TEST_SUFFIX _SST_BP)
endif()
add_executable(TestStagingMPMD TestStagingMPMD.cpp)
target_link_libraries(TestStagingMPMD adios2 gtest ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(TestStagingMPMD adios2 gtest_interface MPI::MPI_C)
if(ADIOS2_HAVE_SST)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:FFS"
TEST_SUFFIX _SST_FFS)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:BP"
TEST_SUFFIX _SST_BP)
endif()

if(ADIOS2_HAVE_MPI)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "InSituMPI"
TEST_SUFFIX _InSituMPI)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "InSituMPI"
TEST_SUFFIX _InSituMPI)
endif()

add_executable(TestCommonWrite TestCommonWrite.cpp)
Expand Down Expand Up @@ -96,7 +93,8 @@ set (FORTRAN_TESTS "")
if(ADIOS2_HAVE_Fortran)
set (FORTRAN_TESTS "FtoC.1x1;CtoF.1x1.FFS;CtoF.1x1.BP;FtoF.1x1")
endif()
set (SPECIAL_TESTS "KillReadersSerialized;KillReaders3Max;TimeoutReader;LatestReader.FFS;LatestReader.BP;DiscardWriter")
#set (SPECIAL_TESTS "KillReadersSerialized;KillReaders3Max;TimeoutReader;LatestReader.FFS;LatestReader.BP;DiscardWriter")
set (SPECIAL_TESTS "TimeoutReader;LatestReader.FFS;LatestReader.BP;DiscardWriter")

set (MPI_TESTS "")
set (MPI_FORTRAN_TESTS "")
Expand Down Expand Up @@ -139,6 +137,7 @@ set (KillReadersSerialized_TIMEOUT "300")
set (KillReaders3Max_CMD "run_multi_test -test_protocol kill_readers -verbose -nw 3 -nr 2 -max_readers 3 -warg RendezvousReaderCount:0,ControlTransport:sockets -rarg --ignore_time_gap")
set (KillReaders3Max_TIMEOUT "300")
set (TimeoutReader_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -rarg --non_blocking -warg --ms_delay -warg 2000")
set (TimeoutReaders_TIMEOUT "60")
set (LatestReader.FFS_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -warg --ms_delay -warg 250 -warg --engine_params -warg MarshalMethod:FFS -rarg --latest -rarg --long_first_delay")
set (LatestReader.BP_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -warg --ms_delay -warg 250 -warg --engine_params -warg MarshalMethod:BP -rarg --latest -rarg --long_first_delay")
set (LatestReader_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -warg --ms_delay -warg 250 -warg --engine_params -rarg --latest -rarg --long_first_delay")
Expand Down
Loading

0 comments on commit faf81e0

Please sign in to comment.