Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Buffer::slice API #11001

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions velox/buffer/Buffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/buffer/Buffer.h"

namespace facebook::velox {

namespace {
struct BufferReleaser {
explicit BufferReleaser(const BufferPtr& parent) : parent_(parent) {}
void addRef() const {}
void release() const {}

private:
BufferPtr parent_;
};
} // namespace

BufferPtr Buffer::sliceBufferZeroCopy(
size_t typeSize,
bool podType,
const BufferPtr& buffer,
size_t offset,
size_t length) {
auto bytesOffset = checkedMultiply(offset, typeSize);
auto bytesLength = checkedMultiply(length, typeSize);
VELOX_CHECK_LE(
bytesOffset,
buffer->size(),
"Offset must be less than or equal to {}.",
buffer->size() / typeSize);
VELOX_CHECK_LE(
bytesLength,
buffer->size() - bytesOffset,
"Length must be less than or equal to {}.",
buffer->size() / typeSize - offset);
// Cannot use `Buffer::as<uint8_t>()` here because Buffer::podType_ is false
// when type is OPAQUE.
auto data =
reinterpret_cast<const uint8_t*>(buffer->as<void>()) + bytesOffset;
return BufferView<BufferReleaser>::create(
data, bytesLength, BufferReleaser(buffer), podType);
}

template <>
BufferPtr Buffer::slice<bool>(
const BufferPtr& buffer,
size_t offset,
size_t length,
memory::MemoryPool* pool) {
VELOX_CHECK_NOT_NULL(buffer, "Buffer must not be null.");

if (offset % 8 == 0) {
return sliceBufferZeroCopy(
1, true, buffer, bits::nbytes(offset), bits::nbytes(length));
}
VELOX_CHECK_NOT_NULL(pool, "Pool must not be null.");
auto ans = AlignedBuffer::allocate<bool>(length, pool);
bits::copyBits(
buffer->as<uint64_t>(), offset, ans->asMutable<uint64_t>(), 0, length);
return ans;
}
} // namespace facebook::velox
42 changes: 41 additions & 1 deletion velox/buffer/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
namespace facebook {
namespace velox {

class Buffer;
class AlignedBuffer;

using BufferPtr = boost::intrusive_ptr<Buffer>;

// Represents vector payloads, like arrays of numbers or strings or
// associated null flags. Buffers are reference counted and must be
// held by BufferPtr. Buffers can either own their memory or can be
Expand Down Expand Up @@ -172,6 +175,30 @@ class Buffer {
return os;
}

/// Slice a buffer with specific type T.
/// For boolean type and if the 'offset' is not multiple of 8, return a
/// shifted copy, new buffer is allocated from 'pool'.
/// Otherwise return a BufferView into the original buffer (with shared
/// ownership of original buffer).
///
/// @param buffer A pointer to the buffer to be sliced. Must not be null.
/// @param offset The element position in the buffer where the slice begins.
/// Must be less or equal than the buffer size.
/// @param length The number of elements to include in the slice. Must be
/// less or equal than the buffer size - 'offset'.
/// @param pool A pointer to a memory pool for allocating new buffers,
/// required if a new buffer needs to be created.
template <typename T>
static BufferPtr slice(
const BufferPtr& buffer,
size_t offset,
size_t length,
memory::MemoryPool* pool) {
VELOX_CHECK_NOT_NULL(buffer, "Buffer must not be null.");
return sliceBufferZeroCopy(
sizeof(T), is_pod_like_v<T>, buffer, offset, length);
}

protected:
// Writes a magic word at 'capacity_'. No-op for a BufferView. The actual
// logic is inside a separate virtual function, allowing override by derived
Expand Down Expand Up @@ -246,6 +273,14 @@ class Buffer {
uint64_t padding_[2] = {static_cast<uint64_t>(-1), static_cast<uint64_t>(-1)};
// Needs to use setCapacity() from static method reallocate().
friend class AlignedBuffer;

private:
static BufferPtr sliceBufferZeroCopy(
size_t typeSize,
bool podType,
const BufferPtr& buffer,
size_t offset,
size_t length);
};

static_assert(
Expand All @@ -262,7 +297,12 @@ inline MutableRange<bool> Buffer::asMutableRange<bool>() {
return MutableRange<bool>(asMutable<uint64_t>(), 0, size() * 8);
}

using BufferPtr = boost::intrusive_ptr<Buffer>;
template <>
BufferPtr Buffer::slice<bool>(
const BufferPtr& buffer,
size_t offset,
size_t length,
memory::MemoryPool* pool);

static inline void intrusive_ptr_add_ref(Buffer* buffer) {
buffer->addRef();
Expand Down
2 changes: 1 addition & 1 deletion velox/buffer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_buffer StringViewBufferHolder.cpp)
velox_add_library(velox_buffer Buffer.cpp StringViewBufferHolder.cpp)

velox_link_libraries(velox_buffer velox_memory velox_common_base Folly::folly)

Expand Down
38 changes: 38 additions & 0 deletions velox/buffer/tests/BufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,5 +453,43 @@ TEST_F(BufferTest, testAllocateSizeOverflow) {
AlignedBuffer::reallocate<int64_t>(&buf, 1ull << 62), VeloxException);
}

TEST_F(BufferTest, sliceBigintBuffer) {
auto bufferPtr = AlignedBuffer::allocate<int64_t>(10, pool_.get());
auto sliceBufferPtr = Buffer::slice<int64_t>(bufferPtr, 1, 5, pool_.get());
ASSERT_TRUE(sliceBufferPtr->isView());
ASSERT_EQ(sliceBufferPtr->size(), 40); // 5 * type size of int64_t.
ASSERT_EQ(sliceBufferPtr->as<int64_t>(), bufferPtr->as<int64_t>() + 1);

VELOX_ASSERT_THROW(
Buffer::slice<int64_t>(bufferPtr, 11, 1, pool_.get()),
"Offset must be less than or equal to 10.");
VELOX_ASSERT_THROW(
Buffer::slice<int64_t>(bufferPtr, 5, 6, pool_.get()),
"Length must be less than or equal to 5.");
VELOX_ASSERT_THROW(
Buffer::slice<int64_t>(nullptr, 5, 6, pool_.get()),
"Buffer must not be null.");
}

TEST_F(BufferTest, sliceBooleanBuffer) {
auto bufferPtr = AlignedBuffer::allocate<bool>(16, pool_.get());
auto data = bufferPtr->asMutableRange<bool>();
for (int i = 0; i < 16; ++i) {
data[i] = (i % 2 != 0);
}
auto sliceBufferPtr = Buffer::slice<bool>(bufferPtr, 8, 8, pool_.get());
ASSERT_TRUE(sliceBufferPtr->isView());
ASSERT_EQ(sliceBufferPtr->as<bool>(), bufferPtr->as<bool>() + 1);

sliceBufferPtr = Buffer::slice<bool>(bufferPtr, 5, 5, pool_.get());
ASSERT_FALSE(sliceBufferPtr->isView());
auto sliceData = sliceBufferPtr->asRange<bool>();
for (int i = 0; i < 5; ++i) {
ASSERT_EQ(sliceData[i], i % 2 == 0);
}
VELOX_ASSERT_THROW(
Buffer::slice<bool>(bufferPtr, 5, 6, nullptr), "Pool must not be null.");
}

} // namespace velox
} // namespace facebook
64 changes: 0 additions & 64 deletions velox/vector/BaseVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -917,70 +917,6 @@ void BaseVector::validate(const VectorValidateOptions& options) const {
}
}

namespace {

size_t typeSize(const Type& type) {
switch (type.kind()) {
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
return sizeof(StringView);
case TypeKind::OPAQUE:
return sizeof(std::shared_ptr<void>);
default:
VELOX_DCHECK(type.isPrimitiveType(), type.toString());
return type.cppSizeInBytes();
}
}

struct BufferReleaser {
explicit BufferReleaser(const BufferPtr& parent) : parent_(parent) {}
void addRef() const {}
void release() const {}

private:
BufferPtr parent_;
};

BufferPtr sliceBufferZeroCopy(
size_t typeSize,
bool podType,
const BufferPtr& buf,
vector_size_t offset,
vector_size_t length) {
// Cannot use `Buffer::as<uint8_t>()` here because Buffer::podType_ is false
// when type is OPAQUE.
auto data =
reinterpret_cast<const uint8_t*>(buf->as<void>()) + offset * typeSize;
return BufferView<BufferReleaser>::create(
data, length * typeSize, BufferReleaser(buf), podType);
}

} // namespace

// static
BufferPtr BaseVector::sliceBuffer(
const Type& type,
const BufferPtr& buf,
vector_size_t offset,
vector_size_t length,
memory::MemoryPool* pool) {
if (!buf) {
return nullptr;
}
if (type.kind() != TypeKind::BOOLEAN) {
return sliceBufferZeroCopy(
typeSize(type), type.isPrimitiveType(), buf, offset, length);
}
if (offset % 8 == 0) {
return sliceBufferZeroCopy(1, true, buf, offset / 8, (length + 7) / 8);
}
VELOX_DCHECK_NOT_NULL(pool);
auto ans = AlignedBuffer::allocate<bool>(length, pool);
bits::copyBits(
buf->as<uint64_t>(), offset, ans->asMutable<uint64_t>(), 0, length);
return ans;
}

std::optional<vector_size_t> BaseVector::findDuplicateValue(
vector_size_t start,
vector_size_t size,
Expand Down
14 changes: 1 addition & 13 deletions velox/vector/BaseVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -882,20 +882,8 @@ class BaseVector {
ensureNullsCapacity(length_, true);
}

// Slice a buffer with specific type.
//
// For boolean type and if the offset is not multiple of 8, return a shifted
// copy; otherwise return a BufferView into the original buffer (with shared
// ownership of original buffer).
static BufferPtr sliceBuffer(
const Type&,
const BufferPtr&,
vector_size_t offset,
vector_size_t length,
memory::MemoryPool*);

BufferPtr sliceNulls(vector_size_t offset, vector_size_t length) const {
return sliceBuffer(*BOOLEAN(), nulls_, offset, length, pool_);
return nulls_ ? Buffer::slice<bool>(nulls_, offset, length, pool_) : nulls_;
}

TypePtr type_;
Expand Down
12 changes: 8 additions & 4 deletions velox/vector/ComplexVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1186,8 +1186,10 @@ VectorPtr ArrayVector::slice(vector_size_t offset, vector_size_t length) const {
type_,
sliceNulls(offset, length),
length,
sliceBuffer(*INTEGER(), offsets_, offset, length, pool_),
sliceBuffer(*INTEGER(), sizes_, offset, length, pool_),
offsets_ ? Buffer::slice<vector_size_t>(offsets_, offset, length, pool_)
: offsets_,
sizes_ ? Buffer::slice<vector_size_t>(sizes_, offset, length, pool_)
: sizes_,
elements_);
}

Expand Down Expand Up @@ -1485,8 +1487,10 @@ VectorPtr MapVector::slice(vector_size_t offset, vector_size_t length) const {
type_,
sliceNulls(offset, length),
length,
sliceBuffer(*INTEGER(), offsets_, offset, length, pool_),
sliceBuffer(*INTEGER(), sizes_, offset, length, pool_),
offsets_ ? Buffer::slice<vector_size_t>(offsets_, offset, length, pool_)
: offsets_,
sizes_ ? Buffer::slice<vector_size_t>(sizes_, offset, length, pool_)
: sizes_,
keys_,
values_);
}
Expand Down
5 changes: 3 additions & 2 deletions velox/vector/DictionaryVector-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ VectorPtr DictionaryVector<T>::slice(vector_size_t offset, vector_size_t length)
this->sliceNulls(offset, length),
length,
valueVector(),
BaseVector::sliceBuffer(
*INTEGER(), indices_, offset, length, this->pool_));
indices_
? Buffer::slice<vector_size_t>(indices_, offset, length, this->pool_)
: indices_);
}

template <typename T>
Expand Down
4 changes: 2 additions & 2 deletions velox/vector/FlatVector-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ VectorPtr FlatVector<T>::slice(vector_size_t offset, vector_size_t length)
this->type_,
this->sliceNulls(offset, length),
length,
BaseVector::sliceBuffer(
*this->type_, values_, offset, length, this->pool_),
values_ ? Buffer::slice<T>(values_, offset, length, this->pool_)
: values_,
std::vector<BufferPtr>(stringBuffers_));
}

Expand Down
Loading