Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync release into master #1202

Merged
merged 22 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
873666b
Support option 'accuracy' in MGARD, as a synonym for tolerance
pnorbert Feb 13, 2019
be48fd8
Restore SST no-mpi testing by letting staging-common activate on a no…
eisenhauer Feb 14, 2019
2f5476f
Remove KillReader tests until stabilized. Try to fix VC++ compilation.
eisenhauer Feb 15, 2019
24fb740
Try to fix VC++ compilation.
eisenhauer Feb 15, 2019
5e1d63c
Remove usleep for MSVC
eisenhauer Feb 15, 2019
ef5ac95
Remove usleep for MSVC
eisenhauer Feb 15, 2019
03aa0db
add cast
eisenhauer Feb 15, 2019
296f785
add cast
eisenhauer Feb 15, 2019
1e16f9f
add cast
eisenhauer Feb 15, 2019
ec42817
add cast
eisenhauer Feb 15, 2019
5ec7411
Merge pull request #1186 from eisenhauer/SSTNoMPITesting
eisenhauer Feb 15, 2019
a2952b5
Tweak SST Data transport so that if a transport is unavailable, we do…
eisenhauer Feb 14, 2019
6b3e87e
Merge pull request #1183 from eisenhauer/SSTTransportTweaks
eisenhauer Feb 15, 2019
2035c69
Merge pull request #1196 from pnorbert/mgard-option
pnorbert Feb 15, 2019
75f08ba
Sort variable and attributes names to return the same list as the C++…
pnorbert Feb 15, 2019
8c0f8ea
clang-format
pnorbert Feb 13, 2019
a61e6a4
Merge pull request #1197 from pnorbert/c-api-ordered-names
pnorbert Feb 16, 2019
5db447d
Clean up queue handling in GlobalOp to produce closer to expected res…
eisenhauer Feb 17, 2019
1e9ce2c
Bump timeout on TimeoutReader test because it's close
eisenhauer Feb 17, 2019
13720a2
Bump timeout on TimeoutReader test because it's close
eisenhauer Feb 18, 2019
e7f8b4f
Merge pull request #1201 from eisenhauer/QueueLimitsFix
eisenhauer Feb 18, 2019
499f7e3
Merge branch 'release' into sync-release-into-master
Feb 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions bindings/C/c/adios2_c_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,17 @@ adios2_error adios2_inquire_all_variables(adios2_variable ***variables,
*size = dataMap.size();
adios2_variable **list =
(adios2_variable **)calloc(*size, sizeof(adios2_variable *));

// Sort the names so that we return the same order as the
// C++, python APIs
std::set<std::string> names;
for (auto &it : dataMap)
names.insert(it.first);

size_t n = 0;
for (auto it = dataMap.begin(); it != dataMap.end(); ++it)
for (auto &name : names)
{
const std::string name(it->first);
auto it = dataMap.find(name);
const std::string type(it->second.first);
adios2::core::VariableBase *variable = nullptr;

Expand Down Expand Up @@ -487,10 +494,17 @@ adios2_error adios2_inquire_all_attributes(adios2_attribute ***attributes,
*size = dataMap.size();
adios2_attribute **list =
(adios2_attribute **)calloc(*size, sizeof(adios2_attribute *));

// Sort the names so that we return the same order as the
// C++, python APIs
std::set<std::string> names;
for (auto &it : dataMap)
names.insert(it.first);

size_t n = 0;
for (auto it = dataMap.begin(); it != dataMap.end(); ++it)
for (auto &name : names)
{
const std::string name(it->first);
auto it = dataMap.find(name);
const std::string type(it->second.first);
adios2::core::AttributeBase *attribute = nullptr;

Expand Down
15 changes: 14 additions & 1 deletion docs/user_guide/source/engines/sst.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,19 @@ applications running on different interconnects, the Wide Area Network
(WAN) option should be chosen. This value is interpreted by both SST
Writer and Reader engines.

5. **NetworkInterface**: Default **NULL**. In situations in which
6. **DataTransport**: Default **tcp**. This string value specifies
the underlying network communication mechanism to use for performing
control operations in SST. SST can be configured to standard TCP
sockets, which are very reliable and efficient, but which are limited
in their scalability. Alternatively, SST can use a reliable UDP
protocol, that is more scalable, but as of ADIOS2 Release 2.3.1 still
suffers from some reliability problems. (**sockets** is accepted as
equivalent to **tcp** and **udp**, **rudp**, and **enet** are
equivalent to **scalable**. Generally both the reader and writer
should be using the same control transport. This value is interpreted
by both SST Writer and Reader engines.

7. **NetworkInterface**: Default **NULL**. In situations in which
there are multiple possible network interfaces available to SST, this
string value specifies which should be used to generate SST's contact
information for writers. Generally this should *NOT* be specified
Expand All @@ -173,5 +185,6 @@ This value is interpreted by only by the SST Writer engine.
QueueLimit integer **0** (no queue limits)
QueueFullPolicy string **Block**, Discard
DataTransport string **default varies by platform**, RDMA, WAN
ControlTransport string **TCP**, Scalable
NetworkInterface string **NULL**
======================= ===================== =========================================================
26 changes: 17 additions & 9 deletions source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,26 @@ size_t CompressMGARD::Compress(const void *dataIn, const Dims &dimensions,
}

// Parameters
bool hasTolerance = false;
double tolerance;
auto itAccuracy = parameters.find("accuracy");
if (itAccuracy != parameters.end())
{
tolerance = std::stod(itAccuracy->second);
hasTolerance = true;
}
auto itTolerance = parameters.find("tolerance");
if (m_DebugMode)
if (itTolerance != parameters.end())
{
if (itTolerance == parameters.end())
{
throw std::invalid_argument("ERROR: missing mandatory parameter "
"tolerance for MGARD compression "
"operator, in call to Put\n");
}
tolerance = std::stod(itTolerance->second);
hasTolerance = true;
}
if (!hasTolerance)
{
throw std::invalid_argument("ERROR: missing mandatory parameter "
"tolerance for MGARD compression "
"operator\n");
}

double tolerance = std::stod(itTolerance->second);

int sizeOut = 0;
unsigned char *dataOutPtr =
Expand Down
32 changes: 28 additions & 4 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ void CP_validateParams(SstStream Stream, SstParams Params, int Writer)
}
Stream->QueueFullPolicy = Params->QueueFullPolicy;
Stream->RegistrationMethod = Params->RegistrationMethod;
char *SelectedTransport = NULL;
if (Params->DataTransport != NULL)
{
int i;
SelectedTransport = malloc(strlen(Params->DataTransport) + 1);
char *SelectedTransport = malloc(strlen(Params->DataTransport) + 1);
for (i = 0; Params->DataTransport[i] != 0; i++)
{
SelectedTransport[i] = tolower(Params->DataTransport[i]);
Expand All @@ -69,8 +68,33 @@ void CP_validateParams(SstStream Stream, SstParams Params, int Writer)
}
if (Params->ControlTransport == NULL)
{
/* determine reasonable default, now "enet" */
Params->ControlTransport = strdup("enet");
/* determine reasonable default, now "sockets" */
Params->ControlTransport = strdup("sockets");
}
else
{
int i;
char *SelectedTransport = malloc(strlen(Params->ControlTransport) + 1);
for (i = 0; Params->ControlTransport[i] != 0; i++)
{
SelectedTransport[i] = tolower(Params->ControlTransport[i]);
}
SelectedTransport[i] = 0;

/* canonicalize SelectedTransport */
if ((strcmp(SelectedTransport, "sockets") == 0) ||
(strcmp(SelectedTransport, "tcp") == 0))
{
Params->ControlTransport = strdup("sockets");
}
else if ((strcmp(SelectedTransport, "udp") == 0) ||
(strcmp(SelectedTransport, "rudp") == 0) ||
(strcmp(SelectedTransport, "scalable") == 0) ||
(strcmp(SelectedTransport, "enet") == 0))
{
Params->ControlTransport = strdup("enet");
}
free(SelectedTransport);
}
Stream->ConnectionUsleepMultiplier = 50;
if ((strcmp(Params->ControlTransport, "enet") == 0) &&
Expand Down
62 changes: 54 additions & 8 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1154,9 +1154,14 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
ArrivingReader = ArrivingReader->Next;
}
if (Stream->QueueLimit &&
(Stream->QueuedTimestepCount > Stream->QueueLimit))
((Stream->QueuedTimestepCount + 1) > Stream->QueueLimit))
{
SendBlock[1] = 1; /* this rank is over stream limit */
CP_verbose(Stream,
"Writer will be over queue limit, count %d, limit %d\n",
Stream->QueuedTimestepCount, Stream->QueueLimit);

SendBlock[1] =
1; /* this rank will be over queue limit with new timestep */
}
else
{
Expand Down Expand Up @@ -1207,10 +1212,6 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
ActiveReaderCount++;
}

/*
* Then handle possible incoming connection requests. (Only rank 0 has
* valid info.)
*/
int OverLimit = 0;
for (int i = 0; i < Stream->CohortSize; i++)
{
Expand All @@ -1220,6 +1221,47 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
/* we've got all the state we need to know, release data lock */
PTHREAD_MUTEX_UNLOCK(&Stream->DataLock);

/*
* before we add any new readers, see if we are going to need to
* discard anything in the queue to stay in the queue limit
*/

if (OverLimit)
{
if (Stream->QueueFullPolicy == SstQueueFullDiscard)
{
/* discard things */
if (ActiveReaderCount == 0)
{
PTHREAD_MUTEX_LOCK(&Stream->DataLock);
/*
* Have to double-check here. While in general, if everyone
* had zero readers at the start, everyone should have the
* same set of timesteps queued and everyone should be doing
* a dequeue here. However, we might have just gone to zero
* active (because of failed or exiting readers), and some
* timesteps might just have been released. (Note: Assuming
* that if having gone to zero readers will have dequeued
* timesteps that had reference counts. So generally we
* must be dequeueing things with a zero reference count
* here.) That is an assumption that should not be
* violated.
*/
if ((Stream->QueuedTimestepCount + 1) >= Stream->QueueLimit)
{
CP_verbose(Stream, "Writer doing discard for overlimit\n");
DoStreamDiscard(Stream);
OverLimit = 0; /* handled */
}
PTHREAD_MUTEX_UNLOCK(&Stream->DataLock);
}
}
}

/*
* Then handle possible incoming connection requests. (Only rank 0 has
* valid info, so only look to RecvBlock[0].)
*/
for (int i = 0; i < RecvBlock[0]; i++)
{
WS_ReaderInfo reader;
Expand All @@ -1235,7 +1277,9 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
}

/*
* Lastly, we'll decide what to do with the current provided timestep.
* Lastly, we'll decide what to do with the current provided timestep,
* (if it was not discarded before we added new readers).
*
* If any writer rank is over the queuelimit, we must discard or block
* decision points:
If mode is block on queue limit:
Expand Down Expand Up @@ -1307,7 +1351,9 @@ static void DoWriterSideGlobalOp(SstStream Stream, int *DiscardIncomingTimestep)
/* discard things */
if (ActiveReaderCount == 0)
{
DoStreamDiscard(Stream);
/* this should have been handled above */
CP_verbose(Stream, "Finding a late need to discard when Active "
"Readers is zero, shouldn't happen!!\n\n");
}
else
{
Expand Down
27 changes: 25 additions & 2 deletions source/adios2/toolkit/sst/dp/dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
int BestPriority = -1;
int BestPrioDP = -1;
int i = 0;
int FoundPreferred = 0;
if (Params->DataTransport)
{
Svcs->verbose(CP_Stream, "Prefered dataplane name is \"%s\"\n",
Expand All @@ -79,8 +80,18 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
{
if (strcasecmp(List[i].Name, Params->DataTransport) == 0)
{
SelectedDP = i;
break;
FoundPreferred = 1;
if (List[i].Priority >= 0)
{
SelectedDP = i;
break;
}
else
{
fprintf(stderr, "Warning: Perferred DataPlane \"%s\" is "
"not available.",
List[i].Name);
}
}
}
if (List[i].Priority > BestPriority)
Expand All @@ -90,6 +101,11 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
}
i++;
}
if (Params->DataTransport && (FoundPreferred == 0))
{
fprintf(stderr, "Warning: Preferred DataPlane \"%s\" not found.",
Params->DataTransport);
}
if (SelectedDP != -1)
{
Svcs->verbose(CP_Stream,
Expand All @@ -115,6 +131,13 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream,
}
i++;
}

if (Params->DataTransport)
{
free(Params->DataTransport);
}
Params->DataTransport = strdup(List[SelectedDP].Name);

Ret = List[SelectedDP].Interface;
free(List);
return Ret;
Expand Down
2 changes: 1 addition & 1 deletion testing/adios2/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ endif()
if(ADIOS2_HAVE_MPI)
add_subdirectory(common)
add_subdirectory(insitumpi)
add_subdirectory(staging-common)
endif()

add_subdirectory(staging-common)
37 changes: 18 additions & 19 deletions testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,24 @@

find_package(Threads REQUIRED)

set(extra_test_args EXEC_WRAPPER ${MPIEXEC_COMMAND})

add_executable(TestStagingMPMD TestStagingMPMD.cpp)
target_link_libraries(TestStagingMPMD adios2 gtest ${CMAKE_THREAD_LIBS_INIT})
if(ADIOS2_HAVE_MPI)
target_link_libraries(TestStagingMPMD adios2 gtest_interface MPI::MPI_C)
endif()
set(extra_test_args EXEC_WRAPPER ${MPIEXEC_COMMAND})

if(ADIOS2_HAVE_SST)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:FFS"
TEST_SUFFIX _SST_FFS)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:BP"
TEST_SUFFIX _SST_BP)
endif()
add_executable(TestStagingMPMD TestStagingMPMD.cpp)
target_link_libraries(TestStagingMPMD adios2 gtest ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(TestStagingMPMD adios2 gtest_interface MPI::MPI_C)
if(ADIOS2_HAVE_SST)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:FFS"
TEST_SUFFIX _SST_FFS)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "SST" "MarshalMethod:BP"
TEST_SUFFIX _SST_BP)
endif()

if(ADIOS2_HAVE_MPI)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "InSituMPI"
TEST_SUFFIX _InSituMPI)
gtest_add_tests(TARGET TestStagingMPMD ${extra_test_args}
EXTRA_ARGS "InSituMPI"
TEST_SUFFIX _InSituMPI)
endif()

add_executable(TestCommonWrite TestCommonWrite.cpp)
Expand Down Expand Up @@ -96,7 +93,8 @@ set (FORTRAN_TESTS "")
if(ADIOS2_HAVE_Fortran)
set (FORTRAN_TESTS "FtoC.1x1;CtoF.1x1.FFS;CtoF.1x1.BP;FtoF.1x1")
endif()
set (SPECIAL_TESTS "KillReadersSerialized;KillReaders3Max;TimeoutReader;LatestReader.FFS;LatestReader.BP;DiscardWriter")
#set (SPECIAL_TESTS "KillReadersSerialized;KillReaders3Max;TimeoutReader;LatestReader.FFS;LatestReader.BP;DiscardWriter")
set (SPECIAL_TESTS "TimeoutReader;LatestReader.FFS;LatestReader.BP;DiscardWriter")

set (MPI_TESTS "")
set (MPI_FORTRAN_TESTS "")
Expand Down Expand Up @@ -139,6 +137,7 @@ set (KillReadersSerialized_TIMEOUT "300")
set (KillReaders3Max_CMD "run_multi_test -test_protocol kill_readers -verbose -nw 3 -nr 2 -max_readers 3 -warg RendezvousReaderCount:0,ControlTransport:sockets -rarg --ignore_time_gap")
set (KillReaders3Max_TIMEOUT "300")
set (TimeoutReader_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -rarg --non_blocking -warg --ms_delay -warg 2000")
set (TimeoutReaders_TIMEOUT "60")
set (LatestReader.FFS_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -warg --ms_delay -warg 250 -warg --engine_params -warg MarshalMethod:FFS -rarg --latest -rarg --long_first_delay")
set (LatestReader.BP_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -warg --ms_delay -warg 250 -warg --engine_params -warg MarshalMethod:BP -rarg --latest -rarg --long_first_delay")
set (LatestReader_CMD "run_multi_test -test_protocol one_to_one -verbose -nw 1 -nr 1 -max_readers 1 -warg --ms_delay -warg 250 -warg --engine_params -rarg --latest -rarg --long_first_delay")
Expand Down
Loading