Skip to content

Commit

Permalink
make zfp buffer self-contained and backward-compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonRuonanWang committed Sep 27, 2021
1 parent 047cf9f commit 80abbc9
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 20 deletions.
42 changes: 42 additions & 0 deletions source/adios2/core/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/// \cond EXCLUDE_FROM_DOXYGEN
#include <cstring>
#include <functional>
#include <iostream>
#include <string>
#include <vector>
/// \endcond
Expand Down Expand Up @@ -125,6 +126,47 @@ class Operator
return ret;
}

template <typename U>
void PutParameters(char *buffer, U &pos, const Params &parameters)
{
uint8_t size = static_cast<uint8_t>(parameters.size());
PutParameter(buffer, pos, size);
for (const auto &p : parameters)
{
size = static_cast<uint8_t>(p.first.size());
PutParameter(buffer, pos, size);

std::memcpy(buffer + pos, p.first.data(), size);
pos += size;

size = static_cast<uint8_t>(p.second.size());
PutParameter(buffer, pos, size);

std::memcpy(buffer + pos, p.second.data(), size);
pos += size;
}
}

template <typename U>
Params GetParameters(const char *buffer, U &pos)
{
Params ret;
uint8_t params = GetParameter<uint8_t>(buffer, pos);
for (uint8_t i = 0; i < params; ++i)
{
uint8_t size = GetParameter<uint8_t>(buffer, pos);
std::string key =
std::string(reinterpret_cast<const char *>(buffer + pos), size);
pos += size;
size = GetParameter<uint8_t>(buffer, pos);
std::string value =
std::string(reinterpret_cast<const char *>(buffer + pos), size);
pos += size;
ret[key] = value;
}
return ret;
}

private:
void CheckCallbackType(const std::string type) const;
};
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/operator/compress/CompressBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ size_t CompressBZIP2::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 4; // skip the first four bytes

size_t sizeOut = GetParameter<size_t>(bufferIn, bufferInOffset);
size_t batches = GetParameter<size_t>(bufferIn, bufferInOffset);
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
size_t batches = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);

int small = 0;
int verbosity = 0;
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
// DecompressV2 and keep this function for decompressing lagacy data.

size_t bufferInOffset = 0;
size_t sizeOut = GetParameter<size_t>(bufferIn, bufferInOffset);
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
if (sizeIn - bufferInOffset < sizeof(DataHeader))
{
throw("corrupted blosc buffer header");
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;

const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset);
const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);

Expand Down
4 changes: 2 additions & 2 deletions source/adios2/operator/compress/CompressSZ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,11 @@ size_t CompressSZ::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;

const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset);
const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);

Expand Down
6 changes: 3 additions & 3 deletions source/adios2/operator/compress/CompressSirius.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ size_t CompressSirius::DecompressV1(const char *bufferIn, const size_t sizeIn,
// DecompressV2 and keep this function for decompressing lagacy data.

size_t bufferInOffset = 0;
const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset);
const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockStart(ndims);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockStart[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockStart[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);

Expand Down
85 changes: 75 additions & 10 deletions source/adios2/operator/compress/CompressZFP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,33 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart,
Params &info)
{

Dims convertedDims = ConvertDims(blockCount, type, 3);
const uint8_t bufferVersion = 1;
size_t bufferOutOffset = 0;

// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::Sz);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
// Universal operator metadata end

const size_t ndims = blockCount.size();

// zfp V1 metadata
PutParameter(bufferOut, bufferOutOffset, ndims);
for (const auto &d : blockCount)
{
PutParameter(bufferOut, bufferOutOffset, d);
}
PutParameter(bufferOut, bufferOutOffset, type);
PutParameters(bufferOut, bufferOutOffset, parameters);
// zfp V1 metadata end

Dims convertedDims = ConvertDims(blockCount, type, 3);
zfp_field *field = GetZFPField(dataIn, convertedDims, type);
zfp_stream *stream = GetZFPStream(convertedDims, type, parameters);
size_t maxSize = zfp_stream_maximum_size(stream, field);
// associate bitstream
bitstream *bitstream = stream_open(bufferOut, maxSize);
bitstream *bitstream = stream_open(bufferOut + bufferOutOffset, maxSize);
zfp_stream_set_bit_stream(stream, bitstream);
zfp_stream_rewind(stream);

Expand All @@ -46,24 +66,41 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart,
"size is 0, in call to Compress");
}

bufferOutOffset += sizeOut;

zfp_field_free(field);
zfp_stream_close(stream);
stream_close(bitstream);
return sizeOut;
return bufferOutOffset;
}

size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn,
char *dataOut, const DataType type,
const Dims &blockStart, const Dims &blockCount,
const Params &parameters, Params &info)
size_t CompressZFP::DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut)
{
// Do NOT remove even if the buffer version is updated. Data might be still
// in lagacy formats. This function must be kept for backward compatibility.
// If a newer buffer format is implemented, create another function, e.g.
// DecompressV2 and keep this function for decompressing lagacy data.

size_t bufferInOffset = 0;

const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);
const Params parameters = GetParameters(bufferIn, bufferInOffset);

Dims convertedDims = ConvertDims(blockCount, type, 3);

zfp_field *field = GetZFPField(dataOut, convertedDims, type);
zfp_stream *stream = GetZFPStream(convertedDims, type, parameters);

// associate bitstream
bitstream *bitstream = stream_open(const_cast<char *>(bufferIn), sizeIn);
bitstream *bitstream = stream_open(
const_cast<char *>(bufferIn + bufferInOffset), sizeIn - bufferInOffset);
zfp_stream_set_bit_stream(stream, bitstream);
zfp_stream_rewind(stream);

Expand All @@ -80,13 +117,41 @@ size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn,
zfp_stream_close(stream);
stream_close(bitstream);

const size_t typeSizeBytes = helper::GetDataTypeSize(type);
const size_t dataSizeBytes =
helper::GetTotalSize(convertedDims) * typeSizeBytes;
helper::GetTotalSize(convertedDims, helper::GetDataTypeSize(type));

return dataSizeBytes;
}

size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn,
char *dataOut, const DataType /*type*/,
const Dims & /*blockStart*/,
const Dims & /*blockCount*/,
const Params & /*parameters*/, Params &info)
{
size_t bufferInOffset = 1; // skip operator type
const uint8_t bufferVersion =
GetParameter<uint8_t>(bufferIn, bufferInOffset);
bufferInOffset += 2; // skip two reserved bytes

if (bufferVersion == 1)
{
return DecompressV1(bufferIn + bufferInOffset, sizeIn - bufferInOffset,
dataOut);
}
else if (bufferVersion == 2)
{
// TODO: if a Version 2 zfp buffer is being implemented, put it here
// and keep the DecompressV1 routine for backward compatibility
}
else
{
throw("unknown zfp buffer version");
}

return 0;
}

bool CompressZFP::IsDataTypeValid(const DataType type) const
{
#define declare_type(T) \
Expand Down
12 changes: 12 additions & 0 deletions source/adios2/operator/compress/CompressZFP.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ class CompressZFP : public Operator
* @param hint extra exception information
*/
void CheckStatus(const int status, const std::string hint) const;

/**
* Decompress function for V1 buffer. Do NOT remove even if the buffer
* version is updated. Data might be still in lagacy formats. This function
* must be kept for backward compatibility
* @param bufferIn : compressed data buffer (V1 only)
* @param sizeIn : number of bytes in bufferIn
* @param dataOut : decompressed data buffer
* @return : number of bytes in dataOut
*/
size_t DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut);
};

} // end namespace compress
Expand Down

0 comments on commit 80abbc9

Please sign in to comment.