From 11bd5f170cb12738004fd3679c2674917e2c5ec3 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 27 Dec 2022 13:44:08 -0500 Subject: [PATCH 1/2] Added the AWSSDK transport using the AWS SDK S3 API. New transport "awssdk" and transport parameters "endpoint=" and "cache=" Cache metadata files with AWSSDK transport. e.g. time ./bin/bpls -la adios-test-data/gs.bp -T "Library=awssdk,endpoint=https://projects.pawsey.org.au,cache=/tmp/cache" Other changes: Added ADIOS::GlobalServices to initialize and shut down AWS once across simultaneous ADIOS objects BP5Reader: Create n-1 extra file managers for n threads in BP5 in Open, and reuse them during stepping through all the steps instead of creating new file managers in each PerformGets(). Dependency: AWS SDK, namely the "s3;iam;sts" modules from the SDK. --- CMakeLists.txt | 5 +- cmake/DetectOptions.cmake | 10 + cmake/adios2-config-common.cmake.in | 6 + source/adios2/CMakeLists.txt | 5 + source/adios2/core/ADIOS.cpp | 83 +++- source/adios2/core/ADIOS.h | 13 + source/adios2/core/IO.cpp | 3 +- source/adios2/engine/bp3/BP3Reader.cpp | 2 +- source/adios2/engine/bp3/BP3Writer.cpp | 2 +- source/adios2/engine/bp4/BP4Reader.cpp | 6 +- source/adios2/engine/bp4/BP4Reader.tcc | 23 +- source/adios2/engine/bp4/BP4Writer.cpp | 4 +- source/adios2/engine/bp5/BP5Reader.cpp | 46 +- source/adios2/engine/bp5/BP5Reader.h | 2 + source/adios2/engine/bp5/BP5Writer.cpp | 4 +- source/adios2/engine/hdf5/HDF5ReaderP.cpp | 2 +- source/adios2/helper/adiosSystem.cpp | 4 +- source/adios2/helper/adiosSystem.h | 3 +- .../toolkit/transport/file/FileAWSSDK.cpp | 418 ++++++++++++++++++ .../toolkit/transport/file/FileAWSSDK.h | 120 +++++ .../toolkit/transportman/TransportMan.cpp | 14 +- .../toolkit/transportman/TransportMan.h | 7 +- 22 files changed, 729 insertions(+), 53 deletions(-) create mode 100644 source/adios2/toolkit/transport/file/FileAWSSDK.cpp create mode 100644 source/adios2/toolkit/transport/file/FileAWSSDK.h diff --git a/CMakeLists.txt 
b/CMakeLists.txt index 95a1d668ec..cd8d453fd8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -165,6 +165,7 @@ adios_option(Profiling "Enable support for profiling" AUTO) adios_option(Endian_Reverse "Enable support for Little/Big Endian Interoperability" AUTO) adios_option(Sodium "Enable support for Sodium for encryption" AUTO) adios_option(Catalyst "Enable support for in situ visualization plugin using ParaView Catalyst" AUTO) +adios_option(AWSSDK "Enable support for S3 compatible storage using AWS SDK's S3 module" AUTO) include(${PROJECT_SOURCE_DIR}/cmake/DetectOptions.cmake) if(ADIOS2_HAVE_CUDA) @@ -225,7 +226,9 @@ endif() set(ADIOS2_CONFIG_OPTS - BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc Blosc2 BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME O_DIRECT Sodium Catalyst SysVShMem ZeroMQ Profiling Endian_Reverse GPU_Support + BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc Blosc2 + BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME O_DIRECT Sodium Catalyst SysVShMem ZeroMQ + Profiling Endian_Reverse GPU_Support AWSSDK ) GenerateADIOSHeaderConfig(${ADIOS2_CONFIG_OPTS}) diff --git a/cmake/DetectOptions.cmake b/cmake/DetectOptions.cmake index 81f1ade031..63300ec613 100644 --- a/cmake/DetectOptions.cmake +++ b/cmake/DetectOptions.cmake @@ -470,6 +470,16 @@ if(catalyst_FOUND) set(ADIOS2_HAVE_Catalyst TRUE) endif() +# AWS S3 +if(ADIOS2_USE_AWSSDK STREQUAL AUTO) + find_package(AWSSDK QUIET COMPONENTS s3) +elseif(ADIOS2_USE_AWSSDK) + find_package(AWSSDK REQUIRED COMPONENTS s3) +endif() +if(AWSSDK_FOUND) + set(ADIOS2_HAVE_AWSSDK TRUE) +endif() + # Multithreading find_package(Threads REQUIRED) diff --git a/cmake/adios2-config-common.cmake.in b/cmake/adios2-config-common.cmake.in index 8018ce2f44..88efce6254 100644 --- a/cmake/adios2-config-common.cmake.in +++ b/cmake/adios2-config-common.cmake.in @@ -142,6 +142,12 @@ if(NOT @BUILD_SHARED_LIBS@) find_dependency(catalyst) endif() + set(ADIOS2_HAVE_AWSSDK @ADIOS2_HAVE_AWSSDK@) + 
if(ADIOS2_HAVE_AWSSDK) + find_dependency(AWSSDK) + endif() + + adios2_add_thirdparty_target(pugixml) set(ADIOS2_USE_EXTERNAL_PUGIXML @ADIOS2_USE_EXTERNAL_PUGIXML@) if(ADIOS2_USE_EXTERNAL_PUGIXML) diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 4285a6ab7a..b6e619ec35 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -138,6 +138,11 @@ if(UNIX) target_sources(adios2_core PRIVATE toolkit/transport/file/FilePOSIX.cpp) endif() +if(ADIOS2_HAVE_AWSSDK) + target_sources(adios2_core PRIVATE toolkit/transport/file/FileAWSSDK.cpp) + target_link_libraries(adios2_core PRIVATE ${AWSSDK_LINK_LIBRARIES}) +endif() + if (ADIOS2_HAVE_BP5) target_sources(adios2_core PRIVATE engine/bp5/BP5Engine.cpp diff --git a/source/adios2/core/ADIOS.cpp b/source/adios2/core/ADIOS.cpp index 744d660f4f..09be2dda5f 100644 --- a/source/adios2/core/ADIOS.cpp +++ b/source/adios2/core/ADIOS.cpp @@ -11,6 +11,7 @@ #include "ADIOS.h" #include // std::transform +#include #include #include //std::ios_base::failure #include @@ -27,18 +28,83 @@ #include "adios2/operator/callback/Signature1.h" #include "adios2/operator/callback/Signature2.h" +#ifdef ADIOS2_HAVE_AWSSDK +#include +#include +Aws::SDKOptions awdSDKOptions; +#endif + namespace adios2 { namespace core { +class ADIOS::GlobalServices +{ +public: + GlobalServices() { std::cout << "ADIOS Global() " << std::endl; } + + ~GlobalServices() { std::cout << "ADIOS ~Global()" << std::endl; } + + void CheckStatus() + { + if (wasGlobalShutdown) + { + helper::Throw( + "Core", "ADIOS::GlobalServices", "CheckStatus", + "Global Services was already shutdown. 
Make sure there is one " + "true global ADIOS object that is created first and destructed " + "last to ensure Global services are initialized only once"); + } + } + + void Finalize() + { + std::cout << "ADIOS Global Finalize() Enter" << std::endl; +#ifdef ADIOS2_HAVE_AWSSDK + if (isAWSInitialized) + { + std::cout << "ADIOS Global Finalize() call Aws::ShutdownAPI" + << std::endl; + Aws::ShutdownAPI(options); + isAWSInitialized = false; + } +#endif + wasGlobalShutdown = true; + } + +#ifdef ADIOS2_HAVE_AWSSDK + void Init_AWS_API() + { + std::cout << "ADIOS Global Init_AWS_API() Enter" << std::endl; + if (!isAWSInitialized) + { + std::cout << "ADIOS Global Init_AWS_API() call Aws::InitAPI" + << std::endl; + options.loggingOptions.logLevel = + Aws::Utils::Logging::LogLevel::Debug; + Aws::InitAPI(options); + isAWSInitialized = true; + } + } + Aws::SDKOptions options; + bool isAWSInitialized = false; +#endif + + bool wasGlobalShutdown = false; +}; + +ADIOS::GlobalServices ADIOS::m_GlobalServices; + std::mutex PerfStubsMutex; +static std::atomic_uint adios_refcount(0); ADIOS::ADIOS(const std::string configFile, helper::Comm comm, const std::string hostLanguage) : m_HostLanguage(hostLanguage), m_Comm(std::move(comm)), m_ConfigFile(configFile) { + ++adios_refcount; #ifdef PERFSTUBS_USE_TIMERS { std::lock_guard lck(PerfStubsMutex); @@ -86,7 +152,14 @@ ADIOS::ADIOS(const std::string hostLanguage) { } -ADIOS::~ADIOS() = default; +ADIOS::~ADIOS() +{ + --adios_refcount; + if (!adios_refcount) + { + m_GlobalServices.Finalize(); + } +} IO &ADIOS::DeclareIO(const std::string name, const ArrayOrdering ArrayOrder) { @@ -227,5 +300,13 @@ void ADIOS::YAMLInit(const std::string &configFileYAML) helper::ParseConfigYAML(*this, configFileYAML, m_IOs); } +void ADIOS::Global_init_AWS_API() +{ + m_GlobalServices.CheckStatus(); +#ifdef ADIOS2_HAVE_AWSSDK + m_GlobalServices.Init_AWS_API(); +#endif +} + } // end namespace core } // end namespace adios2 diff --git a/source/adios2/core/ADIOS.h 
b/source/adios2/core/ADIOS.h index 7863aa2125..1aca37472a 100644 --- a/source/adios2/core/ADIOS.h +++ b/source/adios2/core/ADIOS.h @@ -191,6 +191,19 @@ class ADIOS void XMLInit(const std::string &configFileXML); void YAMLInit(const std::string &configFileYAML); + +private: + /* Global services that we want to initialize at most once and shutdown + automatically when the ADIOS object is destructed. This only works + properly if the app creates an ADIOS object that is created before all + other ADIOS objects and is destructed after all other ADIOS objects are + destructed*/ + class GlobalServices; + static class GlobalServices m_GlobalServices; + +public: + /** Global service AWS SDK initialization */ + static void Global_init_AWS_API(); }; } // end namespace core diff --git a/source/adios2/core/IO.cpp b/source/adios2/core/IO.cpp index c52eb301f3..ec00bbbe87 100644 --- a/source/adios2/core/IO.cpp +++ b/source/adios2/core/IO.cpp @@ -580,7 +580,8 @@ Engine &IO::Open(const std::string &name, const Mode mode, helper::Comm comm) /* We need to figure out the type of file * from the file itself */ - if (helper::IsHDF5File(name, comm, m_TransportsParameters)) + if (helper::IsHDF5File(name, *this, comm, + m_TransportsParameters)) { engineTypeLC = "hdf5"; } diff --git a/source/adios2/engine/bp3/BP3Reader.cpp b/source/adios2/engine/bp3/BP3Reader.cpp index 355e7e5f22..63416a6eed 100644 --- a/source/adios2/engine/bp3/BP3Reader.cpp +++ b/source/adios2/engine/bp3/BP3Reader.cpp @@ -24,7 +24,7 @@ namespace engine BP3Reader::BP3Reader(IO &io, const std::string &name, const Mode mode, helper::Comm comm) : Engine("BP3", io, name, mode, std::move(comm)), m_BP3Deserializer(m_Comm), - m_FileManager(m_Comm), m_SubFileManager(m_Comm) + m_FileManager(io, m_Comm), m_SubFileManager(io, m_Comm) { PERFSTUBS_SCOPED_TIMER("BP3Reader::Open"); Init(); diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index 5260132e50..1f1618832e 100644 --- 
a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -28,7 +28,7 @@ namespace engine BP3Writer::BP3Writer(IO &io, const std::string &name, const Mode mode, helper::Comm comm) : Engine("BP3", io, name, mode, std::move(comm)), m_BP3Serializer(m_Comm), - m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm) + m_FileDataManager(io, m_Comm), m_FileMetadataManager(io, m_Comm) { PERFSTUBS_SCOPED_TIMER("BP3Writer::Open"); m_IO.m_ReadStreaming = false; diff --git a/source/adios2/engine/bp4/BP4Reader.cpp b/source/adios2/engine/bp4/BP4Reader.cpp index b0ffffefbe..f4e1f5c939 100644 --- a/source/adios2/engine/bp4/BP4Reader.cpp +++ b/source/adios2/engine/bp4/BP4Reader.cpp @@ -26,8 +26,9 @@ namespace engine BP4Reader::BP4Reader(IO &io, const std::string &name, const Mode mode, helper::Comm comm) : Engine("BP4Reader", io, name, mode, std::move(comm)), - m_BP4Deserializer(m_Comm), m_MDFileManager(m_Comm), m_DataFileManager(m_Comm), - m_MDIndexFileManager(m_Comm), m_ActiveFlagFileManager(m_Comm) + m_BP4Deserializer(m_Comm), m_MDFileManager(io, m_Comm), + m_DataFileManager(io, m_Comm), m_MDIndexFileManager(io, m_Comm), + m_ActiveFlagFileManager(io, m_Comm) { PERFSTUBS_SCOPED_TIMER("BP4Reader::Open"); helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); @@ -825,6 +826,7 @@ void BP4Reader::DoClose(const int transportIndex) m_DataFileManager.CloseFiles(); m_MDFileManager.CloseFiles(); + m_MDIndexFileManager.CloseFiles(); } #define declare_type(T) \ diff --git a/source/adios2/engine/bp4/BP4Reader.tcc b/source/adios2/engine/bp4/BP4Reader.tcc index 05c43f711c..99af2c8d28 100644 --- a/source/adios2/engine/bp4/BP4Reader.tcc +++ b/source/adios2/engine/bp4/BP4Reader.tcc @@ -88,26 +88,9 @@ void BP4Reader::ReadVariableBlocks(Variable &variable) m_Name, subStreamBoxInfo.SubStreamID, m_BP4Deserializer.m_Minifooter.HasSubFiles, true); - std::string library; - helper::SetParameterValue( - "Library", m_IO.m_TransportsParameters[0], library); - 
helper::SetParameterValue( - "library", m_IO.m_TransportsParameters[0], library); - if (library == "Daos" || library == "daos") - { - - m_DataFileManager.OpenFileID( - subFileName, subStreamBoxInfo.SubStreamID, - Mode::Read, - {{"transport", "File"}, {"library", "daos"}}, - profile); - } - else - { - m_DataFileManager.OpenFileID( - subFileName, subStreamBoxInfo.SubStreamID, - Mode::Read, {{"transport", "File"}}, profile); - } + m_DataFileManager.OpenFileID( + subFileName, subStreamBoxInfo.SubStreamID, Mode::Read, + m_IO.m_TransportsParameters[0], profile); } char *buffer = nullptr; diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index bcb86792b6..d33e885020 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -30,8 +30,8 @@ namespace engine BP4Writer::BP4Writer(IO &io, const std::string &name, const Mode mode, helper::Comm comm) : Engine("BP4Writer", io, name, mode, std::move(comm)), m_BP4Serializer(m_Comm), - m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm), - m_FileMetadataIndexManager(m_Comm), m_FileDrainer() + m_FileDataManager(io, m_Comm), m_FileMetadataManager(io, m_Comm), + m_FileMetadataIndexManager(io, m_Comm), m_FileDrainer() { PERFSTUBS_SCOPED_TIMER("BP4Writer::Open"); helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index eeb733bca5..f82ca482f1 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -30,9 +30,10 @@ namespace engine BP5Reader::BP5Reader(IO &io, const std::string &name, const Mode mode, helper::Comm comm) -: Engine("BP5Reader", io, name, mode, std::move(comm)), m_MDFileManager(m_Comm), - m_DataFileManager(m_Comm), m_MDIndexFileManager(m_Comm), - m_FileMetaMetadataManager(m_Comm), m_ActiveFlagFileManager(m_Comm) +: Engine("BP5Reader", io, name, mode, std::move(comm)), + m_MDFileManager(io, 
m_Comm), m_DataFileManager(io, m_Comm), + m_MDIndexFileManager(io, m_Comm), m_FileMetaMetadataManager(io, m_Comm), + m_ActiveFlagFileManager(io, m_Comm) { PERFSTUBS_SCOPED_TIMER("BP5Reader::Open"); Init(); @@ -228,7 +229,8 @@ BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager, FileManager.CloseFiles((int)m->first); } FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, - {{"transport", "File"}}, false); + m_IO.m_TransportsParameters[0], + /*{{"transport", "File"}},*/ false); } TP endSubfile = NOW(); double timeSubfile = DURATION(startSubfile, endSubfile); @@ -308,8 +310,7 @@ void BP5Reader::PerformGets() return reqidx; }; - auto lf_Reader = [&](adios2::transportman::TransportMan FileManager, - const size_t maxOpenFiles) + auto lf_Reader = [&](const int FileManagerID, const size_t maxOpenFiles) -> std::tuple { double copyTotal = 0.0; double readTotal = 0.0; @@ -329,9 +330,10 @@ void BP5Reader::PerformGets() { Req.DestinationAddr = buf.data(); } - std::pair t = ReadData( - FileManager, maxOpenFiles, Req.WriterRank, Req.Timestep, - Req.StartOffset, Req.ReadLength, Req.DestinationAddr); + std::pair t = + ReadData(fileManagers[FileManagerID], maxOpenFiles, + Req.WriterRank, Req.Timestep, Req.StartOffset, + Req.ReadLength, Req.DestinationAddr); TP startCopy = NOW(); m_BP5Deserializer->FinalizeGet(Req, false); @@ -361,19 +363,16 @@ void BP5Reader::PerformGets() std::vector>> futures(nThreads - 1); - helper::Comm singleComm; - std::vector fileManagers( - nThreads - 1, transportman::TransportMan(singleComm)); + // launch Threads-1 threads to process subsets of requests, // then main thread process the last subset for (size_t tid = 0; tid < nThreads - 1; ++tid) { - fileManagers[tid]; - futures[tid] = std::async(std::launch::async, lf_Reader, - fileManagers[tid], maxOpenFiles); + futures[tid] = std::async(std::launch::async, lf_Reader, tid + 1, + maxOpenFiles); } // main thread runs last subset of reads - /*auto tMain = */ lf_Reader(m_DataFileManager, 
maxOpenFiles); + /*auto tMain = */ lf_Reader(0, maxOpenFiles); /*{ double tSubfile = std::get<0>(tMain); double tRead = std::get<1>(tMain); @@ -504,6 +503,15 @@ void BP5Reader::InitParameters() } } + // Create m_Threads-1 extra file managers to be used by threads + // The main thread uses the DataFileManager pushed here to vector[0] + fileManagers.push_back(m_DataFileManager); + for (unsigned int i = 0; i < m_Threads - 1; ++i) + { + fileManagers.push_back(transportman::TransportMan( + transportman::TransportMan(m_IO, singleComm))); + } + size_t limit = helper::RaiseLimitNoFile(); if (m_Parameters.MaxOpenFilesAtOnce > limit - 8) { @@ -1229,6 +1237,12 @@ void BP5Reader::DoClose(const int transportIndex) } m_DataFileManager.CloseFiles(); m_MDFileManager.CloseFiles(); + m_MDIndexFileManager.CloseFiles(); + m_FileMetaMetadataManager.CloseFiles(); + for (unsigned int i = 1; i < m_Threads; ++i) + { + fileManagers[i].CloseFiles(); + } } // DoBlocksInfo will not be called because MinBlocksInfo is operative diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index c1d6c887c9..6b947764b3 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -261,7 +261,9 @@ class BP5Reader : public BP5Engine, public Engine /* Communicator connecting ranks on each Compute Node. 
Only used to calculate the number of threads available for reading */ helper::Comm m_NodeComm; + helper::Comm singleComm; unsigned int m_Threads; + std::vector fileManagers; // manager per thread }; } // end namespace engine diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 71bc982cc3..36159e8755 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -36,8 +36,8 @@ using namespace adios2::format; BP5Writer::BP5Writer(IO &io, const std::string &name, const Mode mode, helper::Comm comm) : Engine("BP5Writer", io, name, mode, std::move(comm)), m_BP5Serializer(), - m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm), - m_FileMetadataIndexManager(m_Comm), m_FileMetaMetadataManager(m_Comm), + m_FileDataManager(io, m_Comm), m_FileMetadataManager(io, m_Comm), + m_FileMetadataIndexManager(io, m_Comm), m_FileMetaMetadataManager(io, m_Comm), m_Profiler(m_Comm) { m_EngineStart = Now(); diff --git a/source/adios2/engine/hdf5/HDF5ReaderP.cpp b/source/adios2/engine/hdf5/HDF5ReaderP.cpp index db1eaf155b..17149c9ac8 100644 --- a/source/adios2/engine/hdf5/HDF5ReaderP.cpp +++ b/source/adios2/engine/hdf5/HDF5ReaderP.cpp @@ -35,7 +35,7 @@ HDF5ReaderP::HDF5ReaderP(IO &io, const std::string &name, const Mode openMode, helper::Comm comm) : Engine("HDF5Reader", io, name, openMode, std::move(comm)) { - if (!helper::IsHDF5File(name, m_Comm, {})) + if (!helper::IsHDF5File(name, io, m_Comm, {})) { helper::Throw( "Engine", "HDF5ReaderP", "HDF5ReaderP", "Invalid HDF5 file found"); diff --git a/source/adios2/helper/adiosSystem.cpp b/source/adios2/helper/adiosSystem.cpp index 58677b144b..e3e5d79a01 100644 --- a/source/adios2/helper/adiosSystem.cpp +++ b/source/adios2/helper/adiosSystem.cpp @@ -126,7 +126,7 @@ int ExceptionToError(const std::string &function) } } -bool IsHDF5File(const std::string &name, helper::Comm &comm, +bool IsHDF5File(const std::string &name, core::IO &io, helper::Comm &comm, 
const std::vector &transportsParameters) noexcept { bool isHDF5 = false; @@ -134,7 +134,7 @@ bool IsHDF5File(const std::string &name, helper::Comm &comm, { try { - transportman::TransportMan tm(comm); + transportman::TransportMan tm(io, comm); if (transportsParameters.empty()) { std::vector defaultTransportParameters(1); diff --git a/source/adios2/helper/adiosSystem.h b/source/adios2/helper/adiosSystem.h index 5f0940c3b2..cb7b221ee6 100644 --- a/source/adios2/helper/adiosSystem.h +++ b/source/adios2/helper/adiosSystem.h @@ -18,6 +18,7 @@ /// \endcond #include "adios2/common/ADIOSTypes.h" +#include "adios2/core/IO.h" #include "adios2/helper/adiosComm.h" namespace adios2 @@ -75,7 +76,7 @@ bool IsZeroIndexed(const std::string hostLanguage) noexcept; */ int ExceptionToError(const std::string &function); -bool IsHDF5File(const std::string &name, helper::Comm &comm, +bool IsHDF5File(const std::string &name, core::IO &io, helper::Comm &comm, const std::vector &transportsParameters) noexcept; char BPVersion(const std::string &name, helper::Comm &comm, const std::vector &transportsParameters) noexcept; diff --git a/source/adios2/toolkit/transport/file/FileAWSSDK.cpp b/source/adios2/toolkit/transport/file/FileAWSSDK.cpp new file mode 100644 index 0000000000..43cb271513 --- /dev/null +++ b/source/adios2/toolkit/transport/file/FileAWSSDK.cpp @@ -0,0 +1,418 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * FileDescriptor.h wrapper of AWSSDK library functions for file I/O over S3 + * protocol + * + * Created on: Dec 5, 2022 + * Author: Norbert Podhorszki, pnorbert@ornl.gov + */ +#include "FileAWSSDK.h" +#include "adios2/core/ADIOS.h" +#include "adios2/helper/adiosLog.h" +#include "adios2/helper/adiosString.h" +#include "adios2/helper/adiosSystem.h" + +#include // remove +#include // strerror +#include // errno +#include // open +#include +#include // open, fstat +#include // open +#include // write, close, ftruncate + +/// \cond EXCLUDE_FROM_DOXYGEN +#include //std::ios_base::failure +/// \endcond + +namespace adios2 +{ +namespace transport +{ + +FileAWSSDK::FileAWSSDK(helper::Comm const &comm) +: Transport("File", "AWSSDK", comm) /*, m_Impl(&m_ImplSingleton)*/ +{ +} + +FileAWSSDK::~FileAWSSDK() +{ + if (m_IsOpen) + { + Close(); + } +} + +void FileAWSSDK::SetParameters(const Params ¶ms) +{ + // Parameters are set from config parameters if present + // Otherwise, they are set from environment if present + // Otherwise, they remain at their default value + + helper::SetParameterValue("endpoint", params, m_Endpoint); + if (m_Endpoint.empty()) + { + if (const char *epEnv = std::getenv("AWS_ENDPOINT")) + { + m_Endpoint = std::string(epEnv); + } + } + + helper::SetParameterValue("cache", params, m_CachePath); + if (m_CachePath.empty()) + { + if (const char *Env = std::getenv("AWS_CACHE")) + { + m_CachePath = std::string(Env); + } + } + + core::ADIOS::Global_init_AWS_API(); + + s3ClientConfig = new Aws::S3::S3ClientConfiguration; + s3ClientConfig->endpointOverride = m_Endpoint; + s3ClientConfig->useVirtualAddressing = false; + s3ClientConfig->enableEndpointDiscovery = false; + + s3Client = new Aws::S3::S3Client(*s3ClientConfig); + std::cout << "AWS Transport created with endpoint = '" << m_Endpoint << "'" + << std::endl; +} + +void FileAWSSDK::WaitForOpen() +{ + if (m_IsOpening) + { + if (m_OpenFuture.valid()) + { + // m_FileDescriptor = m_OpenFuture.get(); + } + 
m_IsOpening = false; + CheckFile("couldn't open file " + m_Name + ", in call to AWSSDK open"); + m_IsOpen = true; + } +} + +void FileAWSSDK::SetUpCache() +{ + if (!m_CachePath.empty()) + { + if (helper::EndsWith(m_ObjectName, "md.idx") || + helper::EndsWith(m_ObjectName, "md.0") || + helper::EndsWith(m_ObjectName, "mmd.0")) + { + m_CachingThisFile = true; + } + } + + if (m_CachingThisFile) + { + std::string const ep = + std::regex_replace(m_Endpoint, std::regex("/|:"), "_"); + + m_CacheFileWrite = new FileFStream(m_Comm); + const std::string path(m_CachePath + PathSeparator + ep + + PathSeparator + m_BucketName + PathSeparator + + m_ObjectName); + m_CacheFilePath = path; + const auto lastPathSeparator(path.find_last_of(PathSeparator)); + if (lastPathSeparator != std::string::npos) + { + const std::string dirpath(path.substr(0, lastPathSeparator)); + helper::CreateDirectory(dirpath); + } + + m_CacheFileRead = new FileFStream(m_Comm); + try + { + m_CacheFileRead->Open(path, Mode::Read); + if (m_CacheFileRead->GetSize() == m_Size) + { + m_IsCached = true; + m_CachingThisFile = false; + delete m_CacheFileWrite; + std::cout << "Already cached " << path << std::endl; + } + } + catch (std::ios_base::failure &) + { + delete m_CacheFileRead; + } + + if (m_CachingThisFile) + { + m_CacheFileWrite->Open(path, Mode::Write); + std::cout << "Caching turn on for " << path << std::endl; + } + } +} + +void FileAWSSDK::Open(const std::string &name, const Mode openMode, + const bool async, const bool directio) +{ + m_Name = name; + + size_t pos = name.find(PathSeparator); + if (pos == std::string::npos) + { + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Open", + "invalid 'bucket/object' name " + name); + } + m_BucketName = name.substr(0, pos); + m_ObjectName = name.substr(pos + 1); + + m_OpenMode = openMode; + switch (m_OpenMode) + { + + case Mode::Write: + case Mode::Append: + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Open", + "does not support 
writing yet " + m_Name); + break; + + case Mode::Read: + { + ProfilerStart("open"); + errno = 0; + Aws::S3::Model::HeadObjectRequest head_object_request; + head_object_request.SetBucket(m_BucketName); + head_object_request.SetKey(m_ObjectName); + + std::cout << "S3 HeadObjectRequests bucket='" + << head_object_request.GetBucket() << "' object = '" + << head_object_request.GetKey() << "'" << std::endl; + + head_object = s3Client->HeadObject(head_object_request); + if (!head_object.IsSuccess()) + { + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Open", + "'bucket/object' " + m_Name + " does not exist "); + } + else + { + m_Size = head_object.GetResult().GetContentLength(); + + /* Cache: check if we want to cache this file (metadata files) */ + SetUpCache(); + } + + m_Errno = errno; + ProfilerStop("open"); + break; + } + default: + CheckFile("unknown open mode for file " + m_Name + + ", in call to AWSSDK open"); + } + + if (!m_IsOpening) + { + CheckFile("couldn't open file " + m_Name + ", in call to AWSSDK open"); + m_IsOpen = true; + } +} + +void FileAWSSDK::OpenChain(const std::string &name, Mode openMode, + const helper::Comm &chainComm, const bool async, + const bool directio) +{ + int token = 1; + if (chainComm.Rank() > 0) + { + chainComm.Recv(&token, 1, chainComm.Rank() - 1, 0, + "Chain token in FileAWSSDK::OpenChain"); + } + + Open(name, openMode, async, directio); + + if (chainComm.Rank() < chainComm.Size() - 1) + { + chainComm.Isend(&token, 1, chainComm.Rank() + 1, 0, + "Sending Chain token in FileAWSSDK::OpenChain"); + } +} + +void FileAWSSDK::Write(const char *buffer, size_t size, size_t start) +{ + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Write", + "does not support writing yet " + m_Name); +} + +void FileAWSSDK::Read(char *buffer, size_t size, size_t start) +{ + WaitForOpen(); + + if (start != MaxSizeT) + { + if (start >= m_Size) + { + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Read", + "couldn't move to 
start position " + std::to_string(start) + + " beyond the size of " + m_Name + " which is " + + std::to_string(m_Size)); + } + m_SeekPos = start; + errno = 0; + m_Errno = errno; + } + + if (m_SeekPos + size > m_Size) + { + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Read", + "can't read " + std::to_string(size) + " bytes from position " + + std::to_string(m_SeekPos) + " from " + m_Name + + " whose size is " + std::to_string(m_Size)); + } + + if (m_IsCached) + { + m_CacheFileRead->Read(buffer, size, m_SeekPos); + std::cout << "Read from cache " << m_CacheFileRead->m_Name + << " start = " << m_SeekPos << " size = " << size + << std::endl; + return; + } + + Aws::S3::Model::GetObjectRequest request; + request.SetBucket(m_BucketName); + request.SetKey(m_ObjectName); + std::stringstream range; + range << "bytes=" << m_SeekPos << "-" << m_SeekPos + size - 1; + request.SetRange(range.str()); + + Aws::S3::Model::GetObjectOutcome outcome = s3Client->GetObject(request); + + if (!outcome.IsSuccess()) + { + const Aws::S3::S3Error &err = outcome.GetError(); + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Read", + "'bucket/object' " + m_Name + ", range " + range.str() + + "GetObject: " + err.GetExceptionName() + ": " + + err.GetMessage()); + } + else + { + std::cout << "Successfully retrieved '" << m_ObjectName << "' from '" + << m_BucketName << "'." 
+ << "\nObject length = " + << outcome.GetResult().GetContentLength() + << "\nRange requested = " << range.str() << std::endl; + auto body = outcome.GetResult().GetBody().rdbuf(); + body->sgetn(buffer, size); + + /* Save to cache */ + if (m_CachingThisFile) + { + m_CacheFileWrite->Write(buffer, size, m_SeekPos); + std::cout << "Written to cache " << m_CacheFileWrite->m_Name + << " start = " << m_SeekPos << " size = " << size + << std::endl; + } + } +} + +size_t FileAWSSDK::GetSize() +{ + WaitForOpen(); + switch (m_OpenMode) + { + case Mode::Write: + case Mode::Append: + return 0; + break; + case Mode::Read: + return m_Size; + break; + default: + return 0; + } +} + +void FileAWSSDK::Flush() {} + +void FileAWSSDK::Close() +{ + WaitForOpen(); + std::cout << "FileAWSSDK::Close(" << m_Name << ") Enter" << std::endl; + ProfilerStart("close"); + errno = 0; + m_Errno = errno; + if (s3Client) + { + delete s3Client; + s3Client = nullptr; + } + if (s3ClientConfig) + { + delete s3ClientConfig; + s3ClientConfig = nullptr; + } + if (m_CachingThisFile) + { + m_CacheFileWrite->Close(); + } + if (m_IsCached) + { + m_CacheFileRead->Close(); + } + + m_IsOpen = false; + ProfilerStop("close"); +} + +void FileAWSSDK::Delete() +{ + WaitForOpen(); + if (m_IsOpen) + { + Close(); + } + std::remove(m_Name.c_str()); +} + +void FileAWSSDK::CheckFile(const std::string hint) const +{ + if (!head_object.IsSuccess()) + { + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "CheckFile", hint); + } +} + +void FileAWSSDK::SeekToEnd() { m_SeekPos = MaxSizeT; } + +void FileAWSSDK::SeekToBegin() { m_SeekPos = 0; } + +void FileAWSSDK::Seek(const size_t start) +{ + if (start != MaxSizeT) + { + m_SeekPos = start; + } + else + { + SeekToEnd(); + } +} + +void FileAWSSDK::Truncate(const size_t length) +{ + helper::Throw( + "Toolkit", "transport::file::FileAWSSDK", "Truncate", + "does not support truncating " + m_Name); +} + +void FileAWSSDK::MkDir(const std::string &fileName) {} + +} // end namespace 
transport +} // end namespace adios2 diff --git a/source/adios2/toolkit/transport/file/FileAWSSDK.h b/source/adios2/toolkit/transport/file/FileAWSSDK.h new file mode 100644 index 0000000000..552251738d --- /dev/null +++ b/source/adios2/toolkit/transport/file/FileAWSSDK.h @@ -0,0 +1,120 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDescriptor.h wrapper of AWSSDK library functions for file I/O over S3 + * protocol + * + * Created on: Dec 5, 2022 + * Author: Norbert Podhorszki, pnorbert@ornl.gov + */ + +#ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_AWSSDK_H_ +#define ADIOS2_TOOLKIT_TRANSPORT_FILE_AWSSDK_H_ + +#include //std::async, std::future + +#include "adios2/common/ADIOSConfig.h" +#include "adios2/toolkit/transport/Transport.h" +#include "adios2/toolkit/transport/file/FileFStream.h" + +#include +#include +#include +#include +#include + +namespace adios2 +{ +namespace helper +{ +class Comm; +} +namespace transport +{ + +/** File descriptor transport using the AWSSDK IO library */ +class FileAWSSDK : public Transport +{ + +public: + FileAWSSDK(helper::Comm const &comm); + + ~FileAWSSDK(); + + void SetParameters(const Params ¶meters); + + void Open(const std::string &name, const Mode openMode, + const bool async = false, const bool directio = false) final; + + void OpenChain(const std::string &name, Mode openMode, + const helper::Comm &chainComm, const bool async = false, + const bool directio = false) final; + + void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final; + +#ifdef REALLY_WANT_WRITEV + /* Actual writev() function, inactive for now */ + void WriteV(const core::iovec *iov, const int iovcnt, + size_t start = MaxSizeT) final; +#endif + + void Read(char *buffer, size_t size, size_t start = MaxSizeT) final; + + size_t GetSize() final; + + /** Does nothing, each write is supposed to flush */ + void Flush() final; + + void Close() final; + + void Delete() 
final; + + void SeekToEnd() final; + + void SeekToBegin() final; + + void Seek(const size_t start = MaxSizeT) final; + + void Truncate(const size_t length) final; + + void MkDir(const std::string &fileName) final; + +private: + // class Impl; + // static class Impl m_ImplSingleton; + // Impl *m_Impl; + // std::unique_ptr m_Impl; + Aws::S3::S3ClientConfiguration *s3ClientConfig = nullptr; + Aws::S3::S3Client *s3Client = nullptr; + /** AWSSDK file handle returned by Open */ + std::string m_Endpoint; + Aws::S3::Model::HeadObjectOutcome head_object; + std::string m_BucketName; + std::string m_ObjectName; + int m_Errno = 0; + bool m_IsOpening = false; + std::future m_OpenFuture; + size_t m_SeekPos = 0; + size_t m_Size = 0; + + void SetUpCache(); + std::string m_CachePath; // local cache directory + bool m_CachingThisFile = false; // save content to local cache + FileFStream *m_CacheFileWrite; + bool m_IsCached = false; // true if file is already in cache + FileFStream *m_CacheFileRead; + std::string m_CacheFilePath; // full path to file in cache + + /** + * Check if m_FileDescriptor is -1 after an operation + * @param hint exception message + */ + void CheckFile(const std::string hint) const; + void WaitForOpen(); +}; + +} // end namespace transport +} // end namespace adios2 + +#endif /* ADIOS2_TRANSPORT_FILE_AWSSDK_H_ */ diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 4c81b218fb..c2e21b6aec 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -26,6 +26,9 @@ #ifdef ADIOS2_HAVE_IME #include "adios2/toolkit/transport/file/FileIME.h" #endif +#ifdef ADIOS2_HAVE_AWSSDK +#include "adios2/toolkit/transport/file/FileAWSSDK.h" +#endif #ifdef _WIN32 #pragma warning(disable : 4503) // length of std::function inside std::async @@ -40,7 +43,10 @@ namespace adios2 namespace transportman { -TransportMan::TransportMan(helper::Comm &comm) 
: m_Comm(comm) {} +TransportMan::TransportMan(core::IO &io, helper::Comm &comm) +: m_IO(io), m_Comm(comm) +{ +} void TransportMan::MkDirsBarrier(const std::vector &fileNames, const std::vector &parametersVector, @@ -593,6 +599,12 @@ std::shared_ptr TransportMan::OpenFileTransport( { transport = std::make_shared(m_Comm); } +#endif +#ifdef ADIOS2_HAVE_AWSSDK + else if (library == "awssdk") + { + transport = std::make_shared(m_Comm); + } #endif else if (library == "null") { diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index ba60c84897..32ae4b1343 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -26,6 +26,10 @@ namespace helper { class Comm; } +namespace core +{ +class IO; +} namespace transportman { @@ -46,7 +50,7 @@ class TransportMan * Unique base constructor * @param comm */ - TransportMan(helper::Comm &comm); + TransportMan(core::IO &IO, helper::Comm &comm); virtual ~TransportMan() = default; @@ -215,6 +219,7 @@ class TransportMan const bool profile); protected: + core::IO &m_IO; helper::Comm const &m_Comm; std::shared_ptr From 7f477672218b21a952bb1431c29c02ddf1ce50e9 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 3 Jan 2023 08:48:53 -0500 Subject: [PATCH 2/2] remove debug prints from GlobalServices --- source/adios2/core/ADIOS.cpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/source/adios2/core/ADIOS.cpp b/source/adios2/core/ADIOS.cpp index 09be2dda5f..cd9b83f538 100644 --- a/source/adios2/core/ADIOS.cpp +++ b/source/adios2/core/ADIOS.cpp @@ -42,9 +42,9 @@ namespace core class ADIOS::GlobalServices { public: - GlobalServices() { std::cout << "ADIOS Global() " << std::endl; } + GlobalServices() {} - ~GlobalServices() { std::cout << "ADIOS ~Global()" << std::endl; } + ~GlobalServices() {} void CheckStatus() { @@ -60,12 +60,9 @@ class ADIOS::GlobalServices void Finalize() { - std::cout 
<< "ADIOS Global Finalize() Enter" << std::endl; #ifdef ADIOS2_HAVE_AWSSDK if (isAWSInitialized) { - std::cout << "ADIOS Global Finalize() call Aws::ShutdownAPI" - << std::endl; Aws::ShutdownAPI(options); isAWSInitialized = false; } @@ -76,11 +73,8 @@ class ADIOS::GlobalServices #ifdef ADIOS2_HAVE_AWSSDK void Init_AWS_API() { - std::cout << "ADIOS Global Init_AWS_API() Enter" << std::endl; if (!isAWSInitialized) { - std::cout << "ADIOS Global Init_AWS_API() call Aws::InitAPI" - << std::endl; options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Debug; Aws::InitAPI(options);