Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MaxShmSize parameter for BP5 to control the segment size by the user.… #2814

Merged
merged 1 commit into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions source/adios2/common/ADIOSTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ constexpr size_t DefaultMinDeferredSize = 4 * 1024 * 1024;
* 2 GB - 100 KB (tolerance) */
constexpr size_t DefaultMaxFileBatchSize = 2147381248;

/** Default upper limit for the size of a shared memory segment:
 * 128 MB (128 * 1024 * 1024 bytes) */
constexpr uint64_t DefaultMaxShmSize = uint64_t{128} << 20;

constexpr char PathSeparator =
#ifdef _WIN32
'\\';
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class BP5Engine
MACRO(InitialBufferSize, SizeBytes, size_t, DefaultInitialBufferSize) \
MACRO(MinDeferredSize, SizeBytes, size_t, DefaultMinDeferredSize) \
MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize) \
MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \
MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

Expand Down
18 changes: 16 additions & 2 deletions source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#include "adios2/common/ADIOSMacros.h"
#include "adios2/core/IO.h"
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange, PaddingToAlignOffset
#include "adios2/toolkit/format/buffer/chunk/ChunkV.h"
#include "adios2/toolkit/format/buffer/malloc/MallocV.h"
#include "adios2/toolkit/transport/file/FileFStream.h"
Expand All @@ -31,7 +31,7 @@ using namespace adios2::format;

void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data)
{
const aggregator::MPIShmChain *a =
aggregator::MPIShmChain *a =
dynamic_cast<aggregator::MPIShmChain *>(m_Aggregator);

format::BufferV::BufferV_iovec DataVec = Data->DataVec();
Expand Down Expand Up @@ -64,9 +64,19 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data)
// This calculation is valid on aggregators only
std::vector<uint64_t> mySizes = a->m_Comm.GatherValues(Data->Size());
uint64_t myTotalSize = 0;
uint64_t maxSize = 0;
for (auto s : mySizes)
{
myTotalSize += s;
if (s > maxSize)
{
maxSize = s;
}
}

if (a->m_Comm.Size() > 1)
{
a->CreateShm(static_cast<size_t>(maxSize), m_Parameters.MaxShmSize);
}

if (a->m_IsAggregator)
Expand Down Expand Up @@ -154,6 +164,10 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data)
}
}

if (a->m_Comm.Size() > 1)
{
a->DestroyShm();
}
delete[] DataVec;
}

Expand Down
62 changes: 45 additions & 17 deletions source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
#include "MPIShmChain.h"

#include "adios2/helper/adiosMemory.h" // PaddingToAlignOffset

#include <iostream>

namespace adios2
Expand Down Expand Up @@ -161,7 +163,7 @@ void MPIShmChain::Init(const size_t numAggregators, const size_t subStreams,
HandshakeLinks_Start(m_Comm, hs);

/* Create the shared memory segment */
CreateShm();
// CreateShm();

HandshakeLinks_Complete(hs);
}
Expand Down Expand Up @@ -212,30 +214,56 @@ void MPIShmChain::HandshakeLinks_Complete(HandshakeStruct &hs)
"aggregator, at Open");
}

void MPIShmChain::CreateShm()
void MPIShmChain::CreateShm(size_t blocksize, const size_t maxsegmentsize)
{
void *ptr;
if (!m_Comm.IsMPI())
{
throw std::runtime_error("Coding Error: MPIShmChain::CreateShm was "
"called with a non-MPI communicator");
}
char *ptr;
size_t structsize = sizeof(ShmSegment);
structsize += helper::PaddingToAlignOffset(structsize, sizeof(max_align_t));
if (!m_Rank)
{
m_Win = m_Comm.Win_allocate_shared(sizeof(ShmSegment), 1, &ptr);
blocksize +=
helper::PaddingToAlignOffset(blocksize, sizeof(max_align_t));
size_t totalsize = structsize + 2 * blocksize;
if (totalsize > maxsegmentsize)
{
// roll back and calculate sizes from maxsegmentsize
totalsize = maxsegmentsize - sizeof(max_align_t) + 1;
totalsize +=
helper::PaddingToAlignOffset(totalsize, sizeof(max_align_t));
blocksize = (totalsize - structsize) / 2 - sizeof(max_align_t) + 1;
blocksize +=
helper::PaddingToAlignOffset(blocksize, sizeof(max_align_t));
totalsize = structsize + 2 * blocksize;
}
m_Win = m_Comm.Win_allocate_shared(totalsize, 1, &ptr);
}

if (m_Rank)
else
{
m_Win = m_Comm.Win_allocate_shared(0, 1, &ptr);
size_t shmsize;
int disp_unit;
m_Comm.Win_shared_query(m_Win, 0, &shmsize, &disp_unit, &ptr);
blocksize = (shmsize - structsize) / 2;
}
m_Shm = reinterpret_cast<ShmSegment *>(ptr);
m_Shm->producerBuffer = LastBufferUsed::None;
m_Shm->consumerBuffer = LastBufferUsed::None;
m_Shm->NumBuffersFull = 0;
m_Shm->sdbA.buf = nullptr;
m_Shm->sdbA.max_size = SHM_BUF_SIZE;
m_Shm->sdbB.buf = nullptr;
m_Shm->sdbB.max_size = SHM_BUF_SIZE;
m_ShmBufA = ptr + structsize;
m_ShmBufB = m_ShmBufA + blocksize;

if (!m_Rank)
{
m_Shm->producerBuffer = LastBufferUsed::None;
m_Shm->consumerBuffer = LastBufferUsed::None;
m_Shm->NumBuffersFull = 0;
m_Shm->sdbA.buf = nullptr;
m_Shm->sdbA.max_size = blocksize;
m_Shm->sdbB.buf = nullptr;
m_Shm->sdbB.max_size = blocksize;
}
/*std::cout << "Rank " << m_Rank << " shm = " << ptr
<< " bufA = " << static_cast<void *>(m_Shm->bufA)
<< " bufB = " << static_cast<void *>(m_Shm->bufB) << std::endl;*/
Expand Down Expand Up @@ -293,14 +321,14 @@ MPIShmChain::ShmDataBuffer *MPIShmChain::LockProducerBuffer()
m_Shm->producerBuffer = LastBufferUsed::B;
sdb = &m_Shm->sdbB;
// point to shm data buffer (in local process memory)
sdb->buf = m_Shm->bufB;
sdb->buf = m_ShmBufB;
}
else // None or B
{
m_Shm->producerBuffer = LastBufferUsed::A;
sdb = &m_Shm->sdbA;
// point to shm data buffer (in local process memory)
sdb->buf = m_Shm->bufA;
sdb->buf = m_ShmBufA;
}
m_Shm->lockSegment.unlock();

Expand Down Expand Up @@ -353,14 +381,14 @@ MPIShmChain::ShmDataBuffer *MPIShmChain::LockConsumerBuffer()
m_Shm->consumerBuffer = LastBufferUsed::B;
sdb = &m_Shm->sdbB;
// point to shm data buffer (in local process memory)
sdb->buf = m_Shm->bufB;
sdb->buf = m_ShmBufB;
}
else // None or B
{
m_Shm->consumerBuffer = LastBufferUsed::A;
sdb = &m_Shm->sdbA;
// point to shm data buffer (in local process memory)
sdb->buf = m_Shm->bufA;
sdb->buf = m_ShmBufA;
}
m_Shm->lockSegment.unlock();

Expand Down
14 changes: 9 additions & 5 deletions source/adios2/toolkit/aggregator/mpi/MPIShmChain.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Spinlock
std::atomic_flag flag_; //{ATOMIC_FLAG_INIT};
};

constexpr size_t SHM_BUF_SIZE = 4194304; // 4MB
// constexpr size_t SHM_BUF_SIZE = 4194304; // 4MB
// we allocate 2x this size + a bit for shared memory segment

/** A one- or two-layer aggregator chain for using Shared memory within a
Expand Down Expand Up @@ -129,6 +129,10 @@ class MPIShmChain : public MPIAggregator
void UnlockConsumerBuffer();
void ResetBuffers() noexcept;

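// Allocates the shared memory segment: the ShmSegment header plus two data
// buffers of blocksize bytes each (sizes padded for alignment). If that
// total would exceed maxsegmentsize, the buffers are shrunk to fit within it.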
void CreateShm(size_t blocksize, const size_t maxsegmentsize);
void DestroyShm();

private:
struct HandshakeStruct
{
Expand All @@ -142,8 +146,6 @@ class MPIShmChain : public MPIAggregator
void HandshakeLinks_Complete(HandshakeStruct &hs);

helper::Comm::Win m_Win;
void CreateShm();
void DestroyShm();

enum class LastBufferUsed
{
Expand All @@ -165,10 +167,12 @@ class MPIShmChain : public MPIAggregator
aggregator::Spinlock lockA;
aggregator::Spinlock lockB;
// the actual data buffers
char bufA[SHM_BUF_SIZE];
char bufB[SHM_BUF_SIZE];
// char bufA[SHM_BUF_SIZE];
// char bufB[SHM_BUF_SIZE];
};
ShmSegment *m_Shm;
char *m_ShmBufA;
char *m_ShmBufB;
};

} // end namespace aggregator
Expand Down