Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine SSC buffer resize #2671

Merged
merged 5 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 6 additions & 20 deletions source/adios2/engine/ssc/SscHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,7 @@ void SerializeVariables(const BlockVec &input, Buffer &output, const int rank)
for (const auto &b : input)
{
uint64_t pos = output.value<uint64_t>();
if (pos + 1024 > output.capacity())
{
output.reserve((pos + 1024) * 2);
}
if (pos == 0)
{
pos += 8;
}
output.resize(pos + 1024);

output.value<uint8_t>(pos) = static_cast<uint8_t>(b.shapeId);
++pos;
Expand Down Expand Up @@ -189,14 +182,7 @@ void SerializeAttributes(IO &input, Buffer &output)
for (const auto &attributePair : attributeMap)
{
uint64_t pos = output.value<uint64_t>();
if (pos + 1024 > output.capacity())
{
output.reserve((pos + 1024) * 2);
}
if (pos == 0)
{
pos += 8;
}
output.resize(pos + 1024);

if (attributePair.second->m_Type == DataType::String)
{
Expand Down Expand Up @@ -432,7 +418,7 @@ void AggregateMetadata(const Buffer &localBuffer, Buffer &globalBuffer,
std::vector<int> localSizes(mpiSize);
MPI_Gather(&localSize, 1, MPI_INT, localSizes.data(), 1, MPI_INT, 0, comm);
int globalSize = std::accumulate(localSizes.begin(), localSizes.end(), 0);
globalBuffer.reserve(globalSize + 10);
globalBuffer.resize(globalSize + 10);

std::vector<int> displs(mpiSize);
for (size_t i = 1; i < mpiSize; ++i)
Expand All @@ -450,11 +436,11 @@ void AggregateMetadata(const Buffer &localBuffer, Buffer &globalBuffer,

void BroadcastMetadata(Buffer &globalBuffer, const int root, MPI_Comm comm)
{
int globalBufferSize = static_cast<int>(globalBuffer.capacity());
int globalBufferSize = static_cast<int>(globalBuffer.size());
MPI_Bcast(&globalBufferSize, 1, MPI_INT, root, comm);
if (globalBuffer.capacity() < globalBufferSize)
if (globalBuffer.size() < globalBufferSize)
{
globalBuffer.reserve(globalBufferSize);
globalBuffer.resize(globalBufferSize);
}
MPI_Bcast(globalBuffer.data(), globalBufferSize, MPI_CHAR, root, comm);
}
Expand Down
29 changes: 17 additions & 12 deletions source/adios2/engine/ssc/SscHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,38 @@ namespace ssc
class Buffer
{
public:
Buffer()
{
m_Buffer = reinterpret_cast<uint8_t *>(malloc(1));
m_Capacity = 1;
}
Buffer(size_t capacity)
Buffer(const size_t capacity = 1)
{
m_Buffer = reinterpret_cast<uint8_t *>(malloc(capacity));
m_Capacity = capacity;
m_Size = 0;
}
~Buffer()
{
if (m_Buffer)
{
free(m_Buffer);
m_Capacity = 0;
m_Buffer = nullptr;
}
}
void clear()
{
m_Buffer = reinterpret_cast<uint8_t *>(realloc(m_Buffer, 1));
m_Capacity = 1;
m_Size = 0;
}
void reserve(size_t capacity)
void resize(const size_t size)
{
m_Buffer = reinterpret_cast<uint8_t *>(realloc(m_Buffer, capacity));
m_Capacity = capacity;
m_Size = size;
if (size > m_Capacity)
{
m_Capacity = size * 2;
m_Buffer =
reinterpret_cast<uint8_t *>(realloc(m_Buffer, m_Capacity));
}
if (m_Buffer == nullptr)
{
throw("ssc buffer realloc failed");
}
}
template <typename T>
T &value(const size_t pos = 0)
Expand Down Expand Up @@ -94,7 +98,7 @@ class Buffer
{
return reinterpret_cast<const uint8_t *>(m_Buffer + pos);
}
size_t capacity() const { return m_Capacity; }
size_t size() const { return m_Size; }
uint8_t &operator[](const size_t pos) { return *(m_Buffer + pos); }
const uint8_t &operator[](const size_t pos) const
{
Expand All @@ -103,6 +107,7 @@ class Buffer

private:
size_t m_Capacity = 0;
size_t m_Size = 0;
uint8_t *m_Buffer = nullptr;
};

Expand Down
10 changes: 5 additions & 5 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void SscReader::BeginStepConsequentFixed()
void SscReader::BeginStepFlexible(StepStatus &status)
{
m_AllReceivingWriterRanks.clear();
m_Buffer.reserve(1);
m_Buffer.resize(1);
m_Buffer[0] = 0;
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
Expand Down Expand Up @@ -200,7 +200,7 @@ void SscReader::PerformGets()
{
totalDataSize += i.second.second;
}
m_Buffer.reserve(totalDataSize);
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
Expand Down Expand Up @@ -278,7 +278,7 @@ void SscReader::EndStepFixed()
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.capacity(), 1, MPI_INFO_NULL,
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
if (m_MpiMode == "twosided")
Expand Down Expand Up @@ -461,7 +461,7 @@ void SscReader::SyncReadPattern()
}

ssc::Buffer localBuffer(8);
localBuffer.value<uint64_t>() = 0;
localBuffer.value<uint64_t>() = 8;

ssc::SerializeVariables(m_LocalReadPattern, localBuffer, m_StreamRank);

Expand All @@ -485,7 +485,7 @@ void SscReader::SyncReadPattern()
{
totalDataSize += i.second.second;
}
m_Buffer.reserve(totalDataSize);
m_Buffer.resize(totalDataSize);

if (m_Verbosity >= 20)
{
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ void SscWriter::SyncWritePattern(bool finalStep)
}

ssc::Buffer localBuffer(8);
localBuffer.value<uint64_t>() = 0;
localBuffer.value<uint64_t>() = 8;

ssc::SerializeVariables(m_GlobalWritePattern[m_StreamRank], localBuffer,
m_StreamRank);
Expand Down