Skip to content

Commit

Permalink
Merge pull request #3425 from pnorbert/awssdk
Browse files Browse the repository at this point in the history
Added the AWSSDK transport using the AWS SDK S3 API.
  • Loading branch information
pnorbert authored Jan 10, 2023
2 parents 770d852 + 7f47767 commit 3dd524f
Show file tree
Hide file tree
Showing 22 changed files with 723 additions and 53 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ adios_option(Profiling "Enable support for profiling" AUTO)
adios_option(Endian_Reverse "Enable support for Little/Big Endian Interoperability" AUTO)
adios_option(Sodium "Enable support for Sodium for encryption" AUTO)
adios_option(Catalyst "Enable support for in situ visualization plugin using ParaView Catalyst" AUTO)
adios_option(AWSSDK "Enable support for S3 compatible storage using AWS SDK's S3 module" AUTO)
include(${PROJECT_SOURCE_DIR}/cmake/DetectOptions.cmake)

if(ADIOS2_HAVE_CUDA)
Expand Down Expand Up @@ -225,7 +226,9 @@ endif()


set(ADIOS2_CONFIG_OPTS
BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc Blosc2 BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME O_DIRECT Sodium Catalyst SysVShMem ZeroMQ Profiling Endian_Reverse GPU_Support
BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc Blosc2
BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME O_DIRECT Sodium Catalyst SysVShMem ZeroMQ
Profiling Endian_Reverse GPU_Support AWSSDK
)

GenerateADIOSHeaderConfig(${ADIOS2_CONFIG_OPTS})
Expand Down
10 changes: 10 additions & 0 deletions cmake/DetectOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,16 @@ if(catalyst_FOUND)
set(ADIOS2_HAVE_Catalyst TRUE)
endif()

# AWS S3: optional support for S3-compatible storage through the s3 component
# of the AWS SDK.
# With ADIOS2_USE_AWSSDK=AUTO the SDK is probed quietly; any other true value
# makes a missing SDK a configuration error (REQUIRED).
if(ADIOS2_USE_AWSSDK STREQUAL AUTO)
find_package(AWSSDK QUIET COMPONENTS s3)
elseif(ADIOS2_USE_AWSSDK)
find_package(AWSSDK REQUIRED COMPONENTS s3)
endif()
# Record availability only when find_package actually located the SDK.
if(AWSSDK_FOUND)
set(ADIOS2_HAVE_AWSSDK TRUE)
endif()

# Multithreading
find_package(Threads REQUIRED)

Expand Down
6 changes: 6 additions & 0 deletions cmake/adios2-config-common.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ if(NOT @BUILD_SHARED_LIBS@)
find_dependency(catalyst)
endif()

# Re-find the AWS SDK for consumers of the installed ADIOS2 package.
# The s3 component is requested explicitly, matching the
# find_package(AWSSDK ... COMPONENTS s3) calls used at ADIOS2 configure time;
# NOTE(review): without COMPONENTS the AWSSDK package config is not expected
# to import the s3 targets that adios2_core links against - confirm against
# the installed AWSSDKConfig.cmake.
set(ADIOS2_HAVE_AWSSDK @ADIOS2_HAVE_AWSSDK@)
if(ADIOS2_HAVE_AWSSDK)
  find_dependency(AWSSDK COMPONENTS s3)
endif()


adios2_add_thirdparty_target(pugixml)
set(ADIOS2_USE_EXTERNAL_PUGIXML @ADIOS2_USE_EXTERNAL_PUGIXML@)
if(ADIOS2_USE_EXTERNAL_PUGIXML)
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ if(UNIX)
target_sources(adios2_core PRIVATE toolkit/transport/file/FilePOSIX.cpp)
endif()

# Compile the AWS SDK based file transport and link the SDK libraries into
# adios2_core only when ADIOS2_HAVE_AWSSDK is set.
if(ADIOS2_HAVE_AWSSDK)
target_sources(adios2_core PRIVATE toolkit/transport/file/FileAWSSDK.cpp)
# PRIVATE: the SDK libraries are not added to adios2_core's link interface.
target_link_libraries(adios2_core PRIVATE ${AWSSDK_LINK_LIBRARIES})
endif()

if (ADIOS2_HAVE_BP5)
target_sources(adios2_core PRIVATE
engine/bp5/BP5Engine.cpp
Expand Down
77 changes: 76 additions & 1 deletion source/adios2/core/ADIOS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "ADIOS.h"

#include <algorithm> // std::transform
#include <atomic>
#include <fstream>
#include <ios> //std::ios_base::failure
#include <mutex>
Expand All @@ -27,18 +28,77 @@
#include "adios2/operator/callback/Signature1.h"
#include "adios2/operator/callback/Signature2.h"

#ifdef ADIOS2_HAVE_AWSSDK
#include <aws/core/Aws.h>
#include <aws/core/utils/logging/LogLevel.h>
Aws::SDKOptions awdSDKOptions;
#endif

namespace adios2
{
namespace core
{

/**
 * Holder for process-wide services shared by all ADIOS objects.
 *
 * Currently the only managed service is the AWS SDK (compiled in when
 * ADIOS2_HAVE_AWSSDK is defined). The object tracks whether the SDK has been
 * initialized and whether the services have already been finalized.
 * None of the members perform any locking.
 */
class ADIOS::GlobalServices
{
public:
    GlobalServices() {}

    ~GlobalServices() {}

    /**
     * Verifies that the global services have not been finalized yet.
     * Reports a std::logic_error through helper::Throw when Finalize() has
     * already been called on this object.
     */
    void CheckStatus()
    {
        if (!wasGlobalShutdown)
        {
            return;
        }
        helper::Throw<std::logic_error>(
            "Core", "ADIOS::GlobalServices", "CheckStatus",
            "Global Services was already shutdown. Make sure there is one "
            "true global ADIOS object that is created first and destructed "
            "last to ensure Global services are initialized only once");
    }

    /**
     * Shuts down every service that was started and marks the services as
     * finalized, so that later CheckStatus() calls report an error.
     */
    void Finalize()
    {
#ifdef ADIOS2_HAVE_AWSSDK
        // Only shut the SDK down if Init_AWS_API() actually started it.
        if (isAWSInitialized)
        {
            Aws::ShutdownAPI(options);
            isAWSInitialized = false;
        }
#endif
        wasGlobalShutdown = true;
    }

#ifdef ADIOS2_HAVE_AWSSDK
    /**
     * Initializes the AWS SDK on the first call; later calls are no-ops
     * while the SDK remains initialized. The SDK log level is set to Debug
     * before initialization.
     */
    void Init_AWS_API()
    {
        if (isAWSInitialized)
        {
            return;
        }
        options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug;
        Aws::InitAPI(options);
        isAWSInitialized = true;
    }

    /** Options handed to both Aws::InitAPI and Aws::ShutdownAPI. */
    Aws::SDKOptions options;
    /** True between a successful Init_AWS_API() and Finalize(). */
    bool isAWSInitialized = false;
#endif

    /** Set by Finalize(); never reset. */
    bool wasGlobalShutdown = false;
};

ADIOS::GlobalServices ADIOS::m_GlobalServices;

std::mutex PerfStubsMutex;
static std::atomic_uint adios_refcount(0);

ADIOS::ADIOS(const std::string configFile, helper::Comm comm,
const std::string hostLanguage)
: m_HostLanguage(hostLanguage), m_Comm(std::move(comm)),
m_ConfigFile(configFile)
{
++adios_refcount;
#ifdef PERFSTUBS_USE_TIMERS
{
std::lock_guard<std::mutex> lck(PerfStubsMutex);
Expand Down Expand Up @@ -86,7 +146,14 @@ ADIOS::ADIOS(const std::string hostLanguage)
{
}

ADIOS::~ADIOS() = default;
/**
 * Releases this object's share of the global services.
 * Drops the ADIOS object reference count and, when this was the last live
 * object counted by adios_refcount, finalizes the global services.
 */
ADIOS::~ADIOS()
{
    // Decrement and test in ONE atomic read-modify-write. Decrementing first
    // and loading the counter afterwards would let two concurrently
    // destructing threads both observe zero and both call Finalize().
    if (--adios_refcount == 0)
    {
        m_GlobalServices.Finalize();
    }
}

IO &ADIOS::DeclareIO(const std::string name, const ArrayOrdering ArrayOrder)
{
Expand Down Expand Up @@ -227,5 +294,13 @@ void ADIOS::YAMLInit(const std::string &configFileYAML)
helper::ParseConfigYAML(*this, configFileYAML, m_IOs);
}

/**
 * Requests initialization of the AWS SDK through the shared GlobalServices
 * object.
 * CheckStatus() runs first in every build configuration, so a call made
 * after the global services were finalized is reported even when
 * ADIOS2_HAVE_AWSSDK is not defined; in that configuration nothing else
 * happens.
 */
void ADIOS::Global_init_AWS_API()
{
m_GlobalServices.CheckStatus();
#ifdef ADIOS2_HAVE_AWSSDK
m_GlobalServices.Init_AWS_API();
#endif
}

} // end namespace core
} // end namespace adios2
13 changes: 13 additions & 0 deletions source/adios2/core/ADIOS.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,19 @@ class ADIOS
void XMLInit(const std::string &configFileXML);

void YAMLInit(const std::string &configFileYAML);

private:
/* Global services that are initialized at most once and shut down
automatically when the last live ADIOS object is destructed. This only
works properly if the application creates one ADIOS object before all
other ADIOS objects and destructs it after all other ADIOS objects have
been destructed. */
class GlobalServices;
static class GlobalServices m_GlobalServices;

public:
/** Global service AWS SDK initialization */
static void Global_init_AWS_API();
};

} // end namespace core
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/core/IO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,8 @@ Engine &IO::Open(const std::string &name, const Mode mode, helper::Comm comm)
/* We need to figure out the type of file
* from the file itself
*/
if (helper::IsHDF5File(name, comm, m_TransportsParameters))
if (helper::IsHDF5File(name, *this, comm,
m_TransportsParameters))
{
engineTypeLC = "hdf5";
}
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/bp3/BP3Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace engine
BP3Reader::BP3Reader(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP3", io, name, mode, std::move(comm)), m_BP3Deserializer(m_Comm),
m_FileManager(m_Comm), m_SubFileManager(m_Comm)
m_FileManager(io, m_Comm), m_SubFileManager(io, m_Comm)
{
PERFSTUBS_SCOPED_TIMER("BP3Reader::Open");
Init();
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace engine
BP3Writer::BP3Writer(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP3", io, name, mode, std::move(comm)), m_BP3Serializer(m_Comm),
m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm)
m_FileDataManager(io, m_Comm), m_FileMetadataManager(io, m_Comm)
{
PERFSTUBS_SCOPED_TIMER("BP3Writer::Open");
m_IO.m_ReadStreaming = false;
Expand Down
6 changes: 4 additions & 2 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ namespace engine
BP4Reader::BP4Reader(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP4Reader", io, name, mode, std::move(comm)),
m_BP4Deserializer(m_Comm), m_MDFileManager(m_Comm), m_DataFileManager(m_Comm),
m_MDIndexFileManager(m_Comm), m_ActiveFlagFileManager(m_Comm)
m_BP4Deserializer(m_Comm), m_MDFileManager(io, m_Comm),
m_DataFileManager(io, m_Comm), m_MDIndexFileManager(io, m_Comm),
m_ActiveFlagFileManager(io, m_Comm)
{
PERFSTUBS_SCOPED_TIMER("BP4Reader::Open");
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
Expand Down Expand Up @@ -825,6 +826,7 @@ void BP4Reader::DoClose(const int transportIndex)

m_DataFileManager.CloseFiles();
m_MDFileManager.CloseFiles();
m_MDIndexFileManager.CloseFiles();
}

#define declare_type(T) \
Expand Down
23 changes: 3 additions & 20 deletions source/adios2/engine/bp4/BP4Reader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -88,26 +88,9 @@ void BP4Reader::ReadVariableBlocks(Variable<T> &variable)
m_Name, subStreamBoxInfo.SubStreamID,
m_BP4Deserializer.m_Minifooter.HasSubFiles, true);

std::string library;
helper::SetParameterValue(
"Library", m_IO.m_TransportsParameters[0], library);
helper::SetParameterValue(
"library", m_IO.m_TransportsParameters[0], library);
if (library == "Daos" || library == "daos")
{

m_DataFileManager.OpenFileID(
subFileName, subStreamBoxInfo.SubStreamID,
Mode::Read,
{{"transport", "File"}, {"library", "daos"}},
profile);
}
else
{
m_DataFileManager.OpenFileID(
subFileName, subStreamBoxInfo.SubStreamID,
Mode::Read, {{"transport", "File"}}, profile);
}
m_DataFileManager.OpenFileID(
subFileName, subStreamBoxInfo.SubStreamID, Mode::Read,
m_IO.m_TransportsParameters[0], profile);
}

char *buffer = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ namespace engine
BP4Writer::BP4Writer(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP4Writer", io, name, mode, std::move(comm)), m_BP4Serializer(m_Comm),
m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm),
m_FileMetadataIndexManager(m_Comm), m_FileDrainer()
m_FileDataManager(io, m_Comm), m_FileMetadataManager(io, m_Comm),
m_FileMetadataIndexManager(io, m_Comm), m_FileDrainer()
{
PERFSTUBS_SCOPED_TIMER("BP4Writer::Open");
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
Expand Down
46 changes: 30 additions & 16 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ namespace engine

BP5Reader::BP5Reader(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP5Reader", io, name, mode, std::move(comm)), m_MDFileManager(m_Comm),
m_DataFileManager(m_Comm), m_MDIndexFileManager(m_Comm),
m_FileMetaMetadataManager(m_Comm), m_ActiveFlagFileManager(m_Comm)
: Engine("BP5Reader", io, name, mode, std::move(comm)),
m_MDFileManager(io, m_Comm), m_DataFileManager(io, m_Comm),
m_MDIndexFileManager(io, m_Comm), m_FileMetaMetadataManager(io, m_Comm),
m_ActiveFlagFileManager(io, m_Comm)
{
PERFSTUBS_SCOPED_TIMER("BP5Reader::Open");
Init();
Expand Down Expand Up @@ -228,7 +229,8 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
FileManager.CloseFiles((int)m->first);
}
FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read,
{{"transport", "File"}}, false);
m_IO.m_TransportsParameters[0],
/*{{"transport", "File"}},*/ false);
}
TP endSubfile = NOW();
double timeSubfile = DURATION(startSubfile, endSubfile);
Expand Down Expand Up @@ -308,8 +310,7 @@ void BP5Reader::PerformGets()
return reqidx;
};

auto lf_Reader = [&](adios2::transportman::TransportMan FileManager,
const size_t maxOpenFiles)
auto lf_Reader = [&](const int FileManagerID, const size_t maxOpenFiles)
-> std::tuple<double, double, double, size_t> {
double copyTotal = 0.0;
double readTotal = 0.0;
Expand All @@ -329,9 +330,10 @@ void BP5Reader::PerformGets()
{
Req.DestinationAddr = buf.data();
}
std::pair<double, double> t = ReadData(
FileManager, maxOpenFiles, Req.WriterRank, Req.Timestep,
Req.StartOffset, Req.ReadLength, Req.DestinationAddr);
std::pair<double, double> t =
ReadData(fileManagers[FileManagerID], maxOpenFiles,
Req.WriterRank, Req.Timestep, Req.StartOffset,
Req.ReadLength, Req.DestinationAddr);

TP startCopy = NOW();
m_BP5Deserializer->FinalizeGet(Req, false);
Expand Down Expand Up @@ -361,19 +363,16 @@ void BP5Reader::PerformGets()

std::vector<std::future<std::tuple<double, double, double, size_t>>>
futures(nThreads - 1);
helper::Comm singleComm;
std::vector<transportman::TransportMan> fileManagers(
nThreads - 1, transportman::TransportMan(singleComm));

// launch Threads-1 threads to process subsets of requests,
// then main thread process the last subset
for (size_t tid = 0; tid < nThreads - 1; ++tid)
{
fileManagers[tid];
futures[tid] = std::async(std::launch::async, lf_Reader,
fileManagers[tid], maxOpenFiles);
futures[tid] = std::async(std::launch::async, lf_Reader, tid + 1,
maxOpenFiles);
}
// main thread runs last subset of reads
/*auto tMain = */ lf_Reader(m_DataFileManager, maxOpenFiles);
/*auto tMain = */ lf_Reader(0, maxOpenFiles);
/*{
double tSubfile = std::get<0>(tMain);
double tRead = std::get<1>(tMain);
Expand Down Expand Up @@ -504,6 +503,15 @@ void BP5Reader::InitParameters()
}
}

// Create m_Threads-1 extra file managers to be used by threads
// The main thread uses the DataFileManager pushed here to vector[0]
fileManagers.push_back(m_DataFileManager);
for (unsigned int i = 0; i < m_Threads - 1; ++i)
{
fileManagers.push_back(transportman::TransportMan(
transportman::TransportMan(m_IO, singleComm)));
}

size_t limit = helper::RaiseLimitNoFile();
if (m_Parameters.MaxOpenFilesAtOnce > limit - 8)
{
Expand Down Expand Up @@ -1229,6 +1237,12 @@ void BP5Reader::DoClose(const int transportIndex)
}
m_DataFileManager.CloseFiles();
m_MDFileManager.CloseFiles();
m_MDIndexFileManager.CloseFiles();
m_FileMetaMetadataManager.CloseFiles();
for (unsigned int i = 1; i < m_Threads; ++i)
{
fileManagers[i].CloseFiles();
}
}

// DoBlocksInfo will not be called because MinBlocksInfo is operative
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ class BP5Reader : public BP5Engine, public Engine
/* Communicator connecting ranks on each Compute Node.
Only used to calculate the number of threads available for reading */
helper::Comm m_NodeComm;
helper::Comm singleComm;
unsigned int m_Threads;
std::vector<transportman::TransportMan> fileManagers; // manager per thread
};

} // end namespace engine
Expand Down
Loading

0 comments on commit 3dd524f

Please sign in to comment.