Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable PerformPuts for BP5 ChunkV #2784

Merged
merged 5 commits into from
Jul 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)
}
else
{
m_BP5Serializer.InitStep(new ChunkV("BP5Writer", true /* always copy */,
m_BP5Serializer.InitStep(new ChunkV("BP5Writer",
false /* always copy */,
m_Parameters.BufferChunkSize));
}
return StepStatus::OK;
Expand All @@ -64,6 +65,7 @@ size_t BP5Writer::CurrentStep() const { return m_WriterStep; }
void BP5Writer::PerformPuts()
{
PERFSTUBS_SCOPED_TIMER("BP5Writer::PerformPuts");
m_BP5Serializer.PerformPuts();
return;
}

Expand Down Expand Up @@ -692,7 +694,6 @@ void BP5Writer::DoFlush(const bool isFinal, const int transportIndex)
void BP5Writer::DoClose(const int transportIndex)
{
PERFSTUBS_SCOPED_TIMER("BP5Writer::Close");
PerformPuts();

DoFlush(true, transportIndex);

Expand Down
2 changes: 0 additions & 2 deletions source/adios2/engine/bp5/BP5Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* BP5Writer.tcc implementation of template functions with known type
*
* Created on: Aug 1, 2018
* Author: Lipeng Wan wanl@ornl.gov
*/
#ifndef ADIOS2_ENGINE_BP5_BP5WRITER_TCC_
#define ADIOS2_ENGINE_BP5_BP5WRITER_TCC_
Expand Down
5 changes: 4 additions & 1 deletion source/adios2/toolkit/format/bp5/BP5Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "adios2/core/IO.h"
#include "adios2/helper/adiosMemory.h"
#include "adios2/toolkit/format/buffer/ffs/BufferFFS.h"
#include <stddef.h> // max_align_t

#include <cstring>

Expand Down Expand Up @@ -429,6 +430,8 @@ size_t BP5Serializer::CalcSize(const size_t Count, const size_t *Vals)
return Elems;
}

void BP5Serializer::PerformPuts() { CurDataBuffer->CopyExternalToInternal(); }

void BP5Serializer::Marshal(void *Variable, const char *Name,
const DataType Type, size_t ElemSize,
size_t DimCount, const size_t *Shape,
Expand Down Expand Up @@ -681,7 +684,7 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
"BP5Serializer:: CloseTimestep without Prior Init");
}
MBase->DataBlockSize = CurDataBuffer->AddToVec(
0, NULL, 8, true); // output block size multiple of 8, offset is size
0, NULL, sizeof(max_align_t), true); // output block size aligned

void *MetaDataBlock = FFSencode(MetaEncodeBuffer, Info.MetaFormat,
MetadataBuf, &MetaDataSize);
Expand Down
1 change: 1 addition & 0 deletions source/adios2/toolkit/format/bp5/BP5Serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class BP5Serializer : virtual public BP5Base

void InitStep(BufferV *DataBuffer);
TimestepInfo CloseTimestep(int timestep);
void PerformPuts();

core::Engine *m_Engine = NULL;

Expand Down
7 changes: 7 additions & 0 deletions source/adios2/toolkit/format/buffer/BufferV.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "adios2/common/ADIOSConfig.h"
#include "adios2/common/ADIOSTypes.h"
#include <iostream>

namespace adios2
{
Expand All @@ -34,6 +35,12 @@ class BufferV

virtual BufferV_iovec DataVec() noexcept = 0;

/*
* This is used in PerformPuts() to copy externally referenced data so that
* it can be modified by the application
*/
virtual void CopyExternalToInternal() = 0;

/**
* Reset the buffer to initial state (without freeing internal buffers)
*/
Expand Down
53 changes: 52 additions & 1 deletion source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

#include "ChunkV.h"
#include "adios2/toolkit/format/buffer/BufferV.h"
#include <assert.h>
#include <iostream>
#include <stddef.h> // max_align_t
#include <string.h>

namespace adios2
Expand All @@ -30,14 +32,62 @@ ChunkV::~ChunkV()
}
}

void ChunkV::CopyExternalToInternal()
{
for (std::size_t i = 0; i < DataV.size(); ++i)
{
if (DataV[i].External)
{
size_t size = DataV[i].Size;
// we can possibly append this entry to the tail if the tail entry
// is internal
bool AppendPossible = DataV.size() && !DataV.back().External;

if (AppendPossible && (m_TailChunkPos + size > m_ChunkSize))
{
// No room in current chunk, close it out
// realloc down to used size (helpful?) and set size in array
m_Chunks.back() =
(char *)realloc(m_Chunks.back(), m_TailChunkPos);

m_TailChunkPos = 0;
m_TailChunk = NULL;
AppendPossible = false;
}
if (AppendPossible)
{
// We can use current chunk, just append the data and modify the
// DataV entry
memcpy(m_TailChunk + m_TailChunkPos, DataV[i].Base, size);
DataV[i].External = false;
DataV[i].Base = m_TailChunk + m_TailChunkPos;
m_TailChunkPos += size;
}
else
{
// We need a new chunk, get the larger of size or m_ChunkSize
size_t NewSize = m_ChunkSize;
if (size > m_ChunkSize)
NewSize = size;
m_TailChunk = (char *)malloc(NewSize);
m_Chunks.push_back(m_TailChunk);
memcpy(m_TailChunk, DataV[i].Base, size);
m_TailChunkPos = size;
DataV[i] = {false, m_TailChunk, 0, size};
}
}
}
}

size_t ChunkV::AddToVec(const size_t size, const void *buf, int align,
bool CopyReqd)
{
int badAlign = CurOffset % align;
if (badAlign)
{
int addAlign = align - badAlign;
char zero[16] = {0};
assert(addAlign < sizeof(max_align_t));
static char zero[sizeof(max_align_t)] = {0};
AddToVec(addAlign, zero, 1, true);
}
size_t retOffset = CurOffset;
Expand Down Expand Up @@ -81,6 +131,7 @@ size_t ChunkV::AddToVec(const size_t size, const void *buf, int align,
if (size > m_ChunkSize)
NewSize = size;
m_TailChunk = (char *)malloc(NewSize);
m_Chunks.push_back(m_TailChunk);
memcpy(m_TailChunk, buf, size);
m_TailChunkPos = size;
VecEntry entry = {false, m_TailChunk, 0, size};
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/buffer/chunk/ChunkV.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class ChunkV : public BufferV
virtual size_t AddToVec(const size_t size, const void *buf, int align,
bool CopyReqd);

void CopyExternalToInternal();

private:
std::vector<char *> m_Chunks;
size_t m_TailChunkPos = 0;
Expand Down
50 changes: 49 additions & 1 deletion source/adios2/toolkit/format/buffer/malloc/MallocV.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "MallocV.h"
#include "adios2/toolkit/format/buffer/BufferV.h"
#include <assert.h>
#include <stddef.h> // max_align_t
#include <string.h>

namespace adios2
Expand Down Expand Up @@ -35,14 +37,60 @@ void MallocV::Reset()
DataV.clear();
}

/*
* This is used in PerformPuts() to copy externally referenced data
* so that it can be modified by the application. It does *not*
* change the metadata offset that was originally returned by
* AddToVec. That is, it relocates the data from application memory
* into the internal buffer, but it does not change the position of
* that data in the write order, which may result in non-contiguous
* writes from the internal buffer.
*/
void MallocV::CopyExternalToInternal()
{
for (std::size_t i = 0; i < DataV.size(); ++i)
{
if (DataV[i].External)
{
size_t size = DataV[i].Size;

/* force internal buffer alignment */
(void)AddToVec(0, NULL, sizeof(max_align_t), true);

if (m_internalPos + size > m_AllocatedSize)
{
// need to resize
size_t NewSize;
if (m_internalPos + size > m_AllocatedSize * m_GrowthFactor)
{
// just grow as needed (more than GrowthFactor)
NewSize = m_internalPos + size;
}
else
{
NewSize = (size_t)(m_AllocatedSize * m_GrowthFactor);
}
m_InternalBlock = (char *)realloc(m_InternalBlock, NewSize);
m_AllocatedSize = NewSize;
}
memcpy(m_InternalBlock + m_internalPos, DataV[i].Base, size);
DataV[i].External = false;
DataV[i].Base = NULL;
DataV[i].Offset = m_internalPos;
m_internalPos += size;
}
}
}

size_t MallocV::AddToVec(const size_t size, const void *buf, int align,
bool CopyReqd)
{
int badAlign = CurOffset % align;
if (badAlign)
{
int addAlign = align - badAlign;
char zero[16] = {0};
assert(addAlign < sizeof(max_align_t));
static char zero[sizeof(max_align_t)] = {0};
AddToVec(addAlign, zero, 1, true);
}
size_t retOffset = CurOffset;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/buffer/malloc/MallocV.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class MallocV : public BufferV
virtual size_t AddToVec(const size_t size, const void *buf, int align,
bool CopyReqd);

void CopyExternalToInternal();

private:
char *m_InternalBlock = NULL;
size_t m_AllocatedSize = 0;
Expand Down
5 changes: 3 additions & 2 deletions testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ foreach(helper
TestCommonWriteAttrs
TestCommonWriteLocal
TestCommonWriteShared
TestDefSyncWrite
TestCommonRead
TestCommonReadR64
TestCommonReadLocal
Expand Down Expand Up @@ -149,9 +150,9 @@ if(ADIOS2_HAVE_SST)
endif()


# For the moment, only test the default comm pattern (Peer)
# For the moment, only test the default comm pattern (Min)
MutateTestSet( COMM_MIN_SST_TESTS "CommMin" writer "CPCommPattern=Min" "${BASIC_SST_TESTS}" )
MutateTestSet( COMM_PEER_SST_TESTS "CommPeer" writer "CPCommPattern=Peer" "${BASIC_SST_TESTS}" )
#MutateTestSet( COMM_PEER_SST_TESTS "CommPeer" writer "CPCommPattern=Peer" "${BASIC_SST_TESTS}" )

# temporarily remove PreciousTimestep CommPeer tests
list (REMOVE_ITEM COMM_PEER_SST_TESTS "PreciousTimestep")
Expand Down
2 changes: 2 additions & 0 deletions testing/adios2/engine/staging-common/ParseArgs.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ int NoData = 0;
int NoDataNode = -1;
int EarlyExit = 0;
int LocalCount = 1;
int DataSize = 4 * 1024 * 1024 / 8; /* DefaultMinDeferredSize is 4*1024*1024
This should be more than that. */

std::string shutdown_name = "DieTest";
adios2::Mode GlobalWriteMode = adios2::Mode::Deferred;
Expand Down
Loading