Skip to content

Commit

Permalink
PARQUET-438: Update RLE encoding tools and add unit tests from Impala
Browse files Browse the repository at this point in the history
This also changes the compiler options to use only SSE3. We can explore benchmarking to optionally enable SSE4 on supported systems for performance gains in the future.

@nongli would it be OK if I remove the unused delta encodings (they depend on code that is not in Impala anymore, or never way)? Are these encodings used extensively, or primarily PLAIN / DICTIONARY encoding?

Author: Wes McKinney <wes@cloudera.com>

Closes apache#31 from wesm/PARQUET-438 and squashes the following commits:

df9da64 [Wes McKinney] Add back (commented-out) ZigZig encode/decode impls
65c49bc [Wes McKinney] Fix comment
73c1dc1 [Wes McKinney] Fix cpplint errors post PARQUET-496
553ca0e [Wes McKinney] Fix cpplint errors
9528c83 [Wes McKinney] Updates RLE encoding routines and dependencies from upstream changes in Apache Impala (incubating). Add rle-test and bit-util-test modules unchanged from Impala.
  • Loading branch information
wesm committed Sep 2, 2018
1 parent ce2e746 commit afc8fb0
Show file tree
Hide file tree
Showing 11 changed files with 1,417 additions and 226 deletions.
7 changes: 4 additions & 3 deletions cpp/src/parquet/encodings/encodings.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class Decoder {
#include "parquet/encodings/plain-encoding.h"
#include "parquet/encodings/dictionary-encoding.h"

#include "parquet/encodings/delta-bit-pack-encoding.h"
#include "parquet/encodings/delta-length-byte-array-encoding.h"
#include "parquet/encodings/delta-byte-array-encoding.h"
// The encoding tools changed and these are missing the ZigZag functions
// #include "parquet/encodings/delta-bit-pack-encoding.h"
// #include "parquet/encodings/delta-length-byte-array-encoding.h"
// #include "parquet/encodings/delta-byte-array-encoding.h"

#endif // PARQUET_ENCODINGS_ENCODINGS_H
6 changes: 6 additions & 0 deletions cpp/src/parquet/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ install(FILES
bit-stream-utils.h
bit-stream-utils.inline.h
bit-util.h
cpu-info.h
sse-info.h
compiler-util.h
logging.h
rle-encoding.h
Expand All @@ -29,6 +31,7 @@ install(FILES

add_library(parquet_util STATIC
input_stream.cc
cpu-info.cc
)

add_library(parquet_test_main
Expand All @@ -47,3 +50,6 @@ else()
pthread
)
endif()

ADD_PARQUET_TEST(bit-util-test)
ADD_PARQUET_TEST(rle-test)
108 changes: 61 additions & 47 deletions cpp/src/parquet/util/bit-stream-utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@
// specific language governing permissions and limitations
// under the License.

// From Apache Impala as of 2016-01-29

#ifndef PARQUET_UTIL_BIT_STREAM_UTILS_H
#define PARQUET_UTIL_BIT_STREAM_UTILS_H

#include <string.h>
#include <algorithm>
#include <cstdint>
#include <string.h>

#include "parquet/util/compiler-util.h"
#include "parquet/util/bit-util.h"
#include "parquet/util/logging.h"
#include "parquet/util/bit-util.h"

namespace parquet_cpp {

// Utility class to write bit/byte streams. This class can write data to either be
// bit packed or byte aligned (and a single stream that has a mix of both).
// This class does not allocate memory.
/// Utility class to write bit/byte streams. This class can write data to either be
/// bit packed or byte aligned (and a single stream that has a mix of both).
/// This class does not allocate memory.
class BitWriter {
public:
// buffer: buffer to write bits to. Buffer should be preallocated with
// 'buffer_len' bytes.
/// buffer: buffer to write bits to. Buffer should be preallocated with
/// 'buffer_len' bytes.
BitWriter(uint8_t* buffer, int buffer_len) :
buffer_(buffer),
max_bytes_(buffer_len) {
Expand All @@ -47,56 +49,56 @@ class BitWriter {
bit_offset_ = 0;
}

// The number of current bytes written, including the current byte (i.e. may include a
// fraction of a byte). Includes buffered values.
/// The number of current bytes written, including the current byte (i.e. may include a
/// fraction of a byte). Includes buffered values.
int bytes_written() const { return byte_offset_ + BitUtil::Ceil(bit_offset_, 8); }
uint8_t* buffer() const { return buffer_; }
int buffer_len() const { return max_bytes_; }

// Writes a value to buffered_values_, flushing to buffer_ if necessary. This is bit
// packed. Returns false if there was not enough space. num_bits must be <= 32.
/// Writes a value to buffered_values_, flushing to buffer_ if necessary. This is bit
/// packed. Returns false if there was not enough space. num_bits must be <= 32.
bool PutValue(uint64_t v, int num_bits);

// Writes v to the next aligned byte using num_bytes. If T is larger than num_bytes, the
// extra high-order bytes will be ignored. Returns false if there was not enough space.
/// Writes v to the next aligned byte using num_bytes. If T is larger than
/// num_bytes, the extra high-order bytes will be ignored. Returns false if
/// there was not enough space.
template<typename T>
bool PutAligned(T v, int num_bytes);

// Write a Vlq encoded int to the buffer. Returns false if there was not enough
// room. The value is written byte aligned.
// For more details on vlq:
// en.wikipedia.org/wiki/Variable-length_quantity
bool PutVlqInt(uint32_t v);
bool PutZigZagVlqInt(int32_t v);
/// Write a Vlq encoded int to the buffer. Returns false if there was not enough
/// room. The value is written byte aligned.
/// For more details on vlq:
/// en.wikipedia.org/wiki/Variable-length_quantity
bool PutVlqInt(int32_t v);

// Get a pointer to the next aligned byte and advance the underlying buffer
// by num_bytes.
// Returns NULL if there was not enough space.
/// Get a pointer to the next aligned byte and advance the underlying buffer
/// by num_bytes.
/// Returns NULL if there was not enough space.
uint8_t* GetNextBytePtr(int num_bytes = 1);

// Flushes all buffered values to the buffer. Call this when done writing to the buffer.
// If 'align' is true, buffered_values_ is reset and any future writes will be written
// to the next byte boundary.
/// Flushes all buffered values to the buffer. Call this when done writing to
/// the buffer. If 'align' is true, buffered_values_ is reset and any future
/// writes will be written to the next byte boundary.
void Flush(bool align = false);

private:
uint8_t* buffer_;
int max_bytes_;

// Bit-packed values are initially written to this variable before being memcpy'd to
// buffer_. This is faster than writing values byte by byte directly to buffer_.
/// Bit-packed values are initially written to this variable before being memcpy'd to
/// buffer_. This is faster than writing values byte by byte directly to buffer_.
uint64_t buffered_values_;

int byte_offset_; // Offset in buffer_
int bit_offset_; // Offset in buffered_values_
};

// Utility class to read bit/byte stream. This class can read bits or bytes
// that are either byte aligned or not. It also has utilities to read multiple
// bytes in one read (e.g. encoded int).
/// Utility class to read bit/byte stream. This class can read bits or bytes
/// that are either byte aligned or not. It also has utilities to read multiple
/// bytes in one read (e.g. encoded int).
class BitReader {
public:
// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
/// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
BitReader(const uint8_t* buffer, int buffer_len) :
buffer_(buffer),
max_bytes_(buffer_len),
Expand All @@ -108,36 +110,48 @@ class BitReader {

BitReader() : buffer_(NULL), max_bytes_(0) {}

// Gets the next value from the buffer. Returns true if 'v' could be read or false if
// there are not enough bytes left. num_bits must be <= 32.
void Reset(const uint8_t* buffer, int buffer_len) {
buffer_ = buffer;
max_bytes_ = buffer_len;
byte_offset_ = 0;
bit_offset_ = 0;
}

/// Gets the next value from the buffer. Returns true if 'v' could be read or false if
/// there are not enough bytes left. num_bits must be <= 32.
template<typename T>
bool GetValue(int num_bits, T* v);

// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
// little-endian native type and big enough to store 'num_bytes'. The value is assumed
// to be byte-aligned so the stream will be advanced to the start of the next byte
// before 'v' is read. Returns false if there are not enough bytes left.
/// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T
/// needs to be a little-endian native type and big enough to store
/// 'num_bytes'. The value is assumed to be byte-aligned so the stream will
/// be advanced to the start of the next byte before 'v' is read. Returns
/// false if there are not enough bytes left.
template<typename T>
bool GetAligned(int num_bytes, T* v);

// Reads a vlq encoded int from the stream. The encoded int must start at the
// beginning of a byte. Return false if there were not enough bytes in the buffer.
bool GetVlqInt(uint64_t* v);
bool GetZigZagVlqInt(int64_t* v);
/// Reads a vlq encoded int from the stream. The encoded int must start at
/// the beginning of a byte. Return false if there were not enough bytes in
/// the buffer.
bool GetVlqInt(int32_t* v);

// Returns the number of bytes left in the stream, not including the current byte (i.e.,
// there may be an additional fraction of a byte).
/// Returns the number of bytes left in the stream, not including the current
/// byte (i.e., there may be an additional fraction of a byte).
int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }

// Maximum byte length of a vlq encoded int
/// Maximum byte length of a vlq encoded int
static const int MAX_VLQ_BYTE_LEN = 5;

// TODO(nongli): implementations to be fixed given changes in Impala
// bool GetZigZagVlqInt(int64_t* v);
// bool PutZigZagVlqInt(int32_t v);

private:
const uint8_t* buffer_;
int max_bytes_;

// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
// faster than reading values byte by byte directly from buffer_.
/// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
/// faster than reading values byte by byte directly from buffer_.
uint64_t buffered_values_;

int byte_offset_; // Offset in buffer_
Expand All @@ -146,4 +160,4 @@ class BitReader {

} // namespace parquet_cpp

#endif
#endif // PARQUET_UTIL_BIT_STREAM_UTILS_H
32 changes: 19 additions & 13 deletions cpp/src/parquet/util/bit-stream-utils.inline.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

// From Apache Impala as of 2016-01-29

#ifndef PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H
#define PARQUET_UTIL_BIT_STREAM_UTILS_INLINE_H

Expand Down Expand Up @@ -73,7 +75,7 @@ inline bool BitWriter::PutAligned(T val, int num_bytes) {
return true;
}

inline bool BitWriter::PutVlqInt(uint32_t v) {
inline bool BitWriter::PutVlqInt(int32_t v) {
bool result = true;
while ((v & 0xFFFFFF80) != 0L) {
result &= PutAligned<uint8_t>((v & 0x7F) | 0x80, 1);
Expand All @@ -83,13 +85,9 @@ inline bool BitWriter::PutVlqInt(uint32_t v) {
return result;
}

inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
uint32_t u = (v << 1) ^ (v >> 31);
return PutVlqInt(u);
}

template<typename T>
inline bool BitReader::GetValue(int num_bits, T* v) {
DCHECK(buffer_ != NULL);
// TODO: revisit this limit if necessary
DCHECK_LE(num_bits, 32);
DCHECK_LE(num_bits, sizeof(T) * 8);
Expand Down Expand Up @@ -140,7 +138,7 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
return true;
}

inline bool BitReader::GetVlqInt(uint64_t* v) {
inline bool BitReader::GetVlqInt(int32_t* v) {
*v = 0;
int shift = 0;
int num_bytes = 0;
Expand All @@ -154,12 +152,20 @@ inline bool BitReader::GetVlqInt(uint64_t* v) {
return true;
}

inline bool BitReader::GetZigZagVlqInt(int64_t* v) {
uint64_t u;
if (!GetVlqInt(&u)) return false;
*reinterpret_cast<uint64_t*>(v) = (u >> 1) ^ -(u & 1);
return true;
}
// TODO(nongli): review/test these implementations given divergence in Impala
// functions

// inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
// uint32_t u = (v << 1) ^ (v >> 31);
// return PutVlqInt(u);
// }

// inline bool BitReader::GetZigZagVlqInt(int64_t* v) {
// uint64_t u;
// if (!GetVlqInt(&u)) return false;
// *reinterpret_cast<uint64_t*>(v) = (u >> 1) ^ -(u & 1);
// return true;
// }

} // namespace parquet_cpp

Expand Down
Loading

0 comments on commit afc8fb0

Please sign in to comment.