Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-allocate error vector in TRY #9986

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions velox/expression/EvalCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ void EvalCtx::saveAndReset(ContextSaver& saver, const SelectivityVector& rows) {
void EvalCtx::ensureErrorsVectorSize(ErrorVectorPtr& vector, vector_size_t size)
const {
if (!vector) {
// Do not allocate 'values' buffer. It uses ~20 bytes per row and it may not
// be needed.
vector = std::make_shared<ErrorVector>(
pool(),
OpaqueType::create<void>(),
allocateNulls(size, pool(), bits::kNull),
size,
AlignedBuffer::allocate<ErrorVector::value_type>(
size, pool(), ErrorVector::value_type()),
nullptr,
std::vector<BufferPtr>{});
} else if (vector->size() < size) {
const auto oldSize = vector->size();
Expand Down
18 changes: 15 additions & 3 deletions velox/expression/TryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ void TryExpr::evalSpecialForm(
// threw exceptions which this expression already handled.
ScopedVarSetter<ErrorVectorPtr> errorsSetter(context.errorsPtr(), nullptr);

// Allocate error vector to avoid repeated re-allocations for every failed
// row.
context.ensureErrorsVectorSize(rows.end());

inputs_[0]->eval(rows, context, result);

nullOutErrors(rows, context, result);
Expand Down Expand Up @@ -108,6 +112,14 @@ void TryExpr::nullOutErrors(
return;
}

const auto numErrors = errors->size();
const auto firstErrorRow = bits::findFirstBit(
errors->rawNulls(), rows.begin(), std::min(numErrors, rows.end()));
if (firstErrorRow < 0) {
// All rows are null. No errors.
return;
}

applyListenersOnError(rows, context);

if (result->isConstantEncoding()) {
Expand All @@ -124,7 +136,7 @@ void TryExpr::nullOutErrors(
auto nulls = allocateNulls(size, context.pool());
auto rawNulls = nulls->asMutable<uint64_t>();
rows.applyToSelected([&](auto row) {
if (row < errors->size() && !errors->isNullAt(row)) {
if (row < numErrors && !errors->isNullAt(row)) {
bits::setNull(rawNulls, row, true);
}
});
Expand All @@ -135,7 +147,7 @@ void TryExpr::nullOutErrors(
} else if (result.unique() && result->isNullsWritable()) {
auto* rawNulls = result->mutableRawNulls();
rows.applyToSelected([&](auto row) {
if (row < errors->size() && !errors->isNullAt(row)) {
if (row < numErrors && !errors->isNullAt(row)) {
bits::setNull(rawNulls, row, true);
}
});
Expand All @@ -147,7 +159,7 @@ void TryExpr::nullOutErrors(

rows.applyToSelected([&](auto row) {
rawIndices[row] = row;
if (row < errors->size() && !errors->isNullAt(row)) {
if (row < numErrors && !errors->isNullAt(row)) {
bits::setNull(rawNulls, row, true);
}
});
Expand Down
78 changes: 78 additions & 0 deletions velox/expression/tests/TryExprTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ class TryExprTest : public functions::test::FunctionBaseTest {
FunctionBaseTest::SetUpTestCase();
TestValue::enable();
}

VectorPtr evaluateWithCustomMemoryPool(
const std::string& sql,
const RowVectorPtr& data,
memory::MemoryPool* pool) {
auto typedExpr = makeTypedExpr(sql, asRowType(data->type()));

core::ExecCtx execCtx{pool, queryCtx_.get()};
exec::ExprSet exprSet({typedExpr}, &execCtx);
exec::EvalCtx context(&execCtx, &exprSet, data.get());

std::vector<VectorPtr> result(1);
SelectivityVector allRows(data->size());
exprSet.eval(allRows, context, result);
return result[0];
}
};

TEST_F(TryExprTest, tryExpr) {
Expand Down Expand Up @@ -489,4 +505,66 @@ DEBUG_ONLY_TEST_F(TryExprTest, errorRestoringContext) {
VELOX_ASSERT_THROW(
evaluate("try(always_throws(c0))", data), exceptionMessage);
}

// Verify memory usage increase from wrapping an expression in TRY.
//
// When wrapping non-throwing expression memory usage increase should not exceed
// the amount of memory needed to store 1 bit per row (to track whether an
// error occurred or not).
//
// When wrapping throwing expression memory usage increase should not exceed the
// amount of memory needed to store 1 bit + 1
// std::shared_ptr<std::exception_ptr> per row (16 bytes).
//
// We also account for the fact that memory is allocated in minimum chunks
// (MemoryPool::preferredSize).
TEST_F(TryExprTest, memoryUsage) {
vector_size_t size = 10'000;

auto data = makeRowVector({makeFlatVector<int64_t>(size, folly::identity)});

// Measure memory usage without TRY.
int64_t baseline;

{
auto pool = rootPool_->addLeafChild("test-memory-usage");
auto result = evaluateWithCustomMemoryPool("c0 + 1", data, pool.get());
ASSERT_FALSE(result->mayHaveNulls());

baseline = pool->peakBytes();
}

// Measure memory usage with TRY over non-throwing expression.
{
// Memory allocation is not precisise. There is some padding, rounding and
// quantizing that makes it hard to tell exactly how much memory is
// allocated. Given that we only need 1 bit per row, allowing 4 bits per row
// seems conservative enough.
const auto expectedIncrease = size / 2;

auto pool = rootPool_->addLeafChild("test-memory-usage");
auto result = evaluateWithCustomMemoryPool("try(c0 + 1)", data, pool.get());
ASSERT_FALSE(result->mayHaveNulls());

ASSERT_GE(pool->peakBytes(), baseline);
ASSERT_LE(pool->peakBytes(), baseline + expectedIncrease)
<< (pool->peakBytes() - baseline);
}

// Measure memory usage with TRY over expression that throws from every row.
{
// We need 16 bytes + 1 bit per row. Allow 20 bytes to account for extra
// padding, rounding and quantiziing.
const auto expectedIncrease = 20 * size;

auto pool = rootPool_->addLeafChild("test-memory-usage");
auto result = evaluateWithCustomMemoryPool("try(c0 / 0)", data, pool.get());
ASSERT_TRUE(result->mayHaveNulls());

ASSERT_GE(pool->peakBytes(), baseline);
ASSERT_LE(pool->peakBytes(), baseline + expectedIncrease)
<< (pool->peakBytes() - baseline);
}
}

} // namespace facebook::velox
Loading