Skip to content

Commit

Permalink
Pre-allocate error vector in TRY (facebookincubator#9986)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: facebookincubator#9986

TRY(CAST(...)) is up to 4x slower than TRY_CAST when many rows fail.
The profile reveals that significant percentage of cpu time goes to
EvalCtx::ensureErrorsVectorSize. For every row that fails, we call
EvalCtx::ensureErrorsVectorSize to resize the error vector to accommodate that
row. When many rows fail we end up resizing a lot: resize(1), resize(2), resize
(3),....resize(n). Fix this by pre-allocating error vector in TryExpr.

An earlier attempt at fixing this facebookincubator#9911 caused 2x memory regression in one of the streaming pipelines. The change
was reverted: facebookincubator#9971

The regression was due to TRY starting to allocate 'nulls' buffer in results
unconditionally. Even if there were no errors, TRY would still allocate 'nulls'
buffer. When result is a boolean vector, allocating unnecessary 'nulls' buffer
increases memory usage for 'result' by 2x. This fix makes sure not to do that
and adds a test.

Also, this change creates ErrorVector with only nulls buffer allocated.
The 'values' buffer that requires ~20 bytes per row is allocated only if an
error occurs.

Before:

```
============================================================================
[...]hmarks/ExpressionBenchmarkBuilder.cpp     relative  time/iter   iters/s
============================================================================
cast##try_cast_invalid_empty_input                          2.27ms    440.97
cast##tryexpr_cast_invalid_empty_input                      8.96ms    111.56
cast##try_cast_invalid_nan                                  5.49ms    182.26
cast##tryexpr_cast_invalid_nan                             12.96ms     77.17
```

After:

```
cast##try_cast_invalid_empty_input                          2.22ms    451.34
cast##tryexpr_cast_invalid_empty_input                      4.52ms    221.06
cast##try_cast_invalid_nan                                  5.79ms    172.69
cast##tryexpr_cast_invalid_nan                              8.16ms    122.48
```

Reviewed By: xiaoxmeng, bikramSingh91

Differential Revision: D57968341

fbshipit-source-id: d9f44aeda56596d9efb035ff9fada5eae22bea1d
  • Loading branch information
mbasmanova authored and Joe-Abraham committed Jun 7, 2024
1 parent 57c8944 commit b27cca6
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
5 changes: 3 additions & 2 deletions velox/expression/EvalCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,14 @@ void EvalCtx::saveAndReset(ContextSaver& saver, const SelectivityVector& rows) {
void EvalCtx::ensureErrorsVectorSize(ErrorVectorPtr& vector, vector_size_t size)
const {
if (!vector) {
// Do not allocate 'values' buffer. It uses ~20 bytes per row and it may not
// be needed.
vector = std::make_shared<ErrorVector>(
pool(),
OpaqueType::create<void>(),
allocateNulls(size, pool(), bits::kNull),
size,
AlignedBuffer::allocate<ErrorVector::value_type>(
size, pool(), ErrorVector::value_type()),
nullptr,
std::vector<BufferPtr>{});
} else if (vector->size() < size) {
const auto oldSize = vector->size();
Expand Down
18 changes: 15 additions & 3 deletions velox/expression/TryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ void TryExpr::evalSpecialForm(
// threw exceptions which this expression already handled.
ScopedVarSetter<ErrorVectorPtr> errorsSetter(context.errorsPtr(), nullptr);

// Allocate error vector to avoid repeated re-allocations for every failed
// row.
context.ensureErrorsVectorSize(rows.end());

inputs_[0]->eval(rows, context, result);

nullOutErrors(rows, context, result);
Expand Down Expand Up @@ -108,6 +112,14 @@ void TryExpr::nullOutErrors(
return;
}

const auto numErrors = errors->size();
const auto firstErrorRow = bits::findFirstBit(
errors->rawNulls(), rows.begin(), std::min(numErrors, rows.end()));
if (firstErrorRow < 0) {
// All rows are null. No errors.
return;
}

applyListenersOnError(rows, context);

if (result->isConstantEncoding()) {
Expand All @@ -124,7 +136,7 @@ void TryExpr::nullOutErrors(
auto nulls = allocateNulls(size, context.pool());
auto rawNulls = nulls->asMutable<uint64_t>();
rows.applyToSelected([&](auto row) {
if (row < errors->size() && !errors->isNullAt(row)) {
if (row < numErrors && !errors->isNullAt(row)) {
bits::setNull(rawNulls, row, true);
}
});
Expand All @@ -135,7 +147,7 @@ void TryExpr::nullOutErrors(
} else if (result.unique() && result->isNullsWritable()) {
auto* rawNulls = result->mutableRawNulls();
rows.applyToSelected([&](auto row) {
if (row < errors->size() && !errors->isNullAt(row)) {
if (row < numErrors && !errors->isNullAt(row)) {
bits::setNull(rawNulls, row, true);
}
});
Expand All @@ -147,7 +159,7 @@ void TryExpr::nullOutErrors(

rows.applyToSelected([&](auto row) {
rawIndices[row] = row;
if (row < errors->size() && !errors->isNullAt(row)) {
if (row < numErrors && !errors->isNullAt(row)) {
bits::setNull(rawNulls, row, true);
}
});
Expand Down
78 changes: 78 additions & 0 deletions velox/expression/tests/TryExprTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ class TryExprTest : public functions::test::FunctionBaseTest {
FunctionBaseTest::SetUpTestCase();
TestValue::enable();
}

VectorPtr evaluateWithCustomMemoryPool(
const std::string& sql,
const RowVectorPtr& data,
memory::MemoryPool* pool) {
auto typedExpr = makeTypedExpr(sql, asRowType(data->type()));

core::ExecCtx execCtx{pool, queryCtx_.get()};
exec::ExprSet exprSet({typedExpr}, &execCtx);
exec::EvalCtx context(&execCtx, &exprSet, data.get());

std::vector<VectorPtr> result(1);
SelectivityVector allRows(data->size());
exprSet.eval(allRows, context, result);
return result[0];
}
};

TEST_F(TryExprTest, tryExpr) {
Expand Down Expand Up @@ -489,4 +505,66 @@ DEBUG_ONLY_TEST_F(TryExprTest, errorRestoringContext) {
VELOX_ASSERT_THROW(
evaluate("try(always_throws(c0))", data), exceptionMessage);
}

// Verify memory usage increase from wrapping an expression in TRY.
//
// When wrapping non-throwing expression memory usage increase should not exceed
// the amount of memory needed to store 1 bit per row (to track whether an
// error occurred or not).
//
// When wrapping throwing expression memory usage increase should not exceed the
// amount of memory needed to store 1 bit + 1
// std::shared_ptr<std::exception_ptr> per row (16 bytes).
//
// We also account for the fact that memory is allocated in minimum chunks
// (MemoryPool::preferredSize).
TEST_F(TryExprTest, memoryUsage) {
vector_size_t size = 10'000;

auto data = makeRowVector({makeFlatVector<int64_t>(size, folly::identity)});

// Measure memory usage without TRY.
int64_t baseline;

{
auto pool = rootPool_->addLeafChild("test-memory-usage");
auto result = evaluateWithCustomMemoryPool("c0 + 1", data, pool.get());
ASSERT_FALSE(result->mayHaveNulls());

baseline = pool->peakBytes();
}

// Measure memory usage with TRY over non-throwing expression.
{
// Memory allocation is not precisise. There is some padding, rounding and
// quantizing that makes it hard to tell exactly how much memory is
// allocated. Given that we only need 1 bit per row, allowing 4 bits per row
// seems conservative enough.
const auto expectedIncrease = size / 2;

auto pool = rootPool_->addLeafChild("test-memory-usage");
auto result = evaluateWithCustomMemoryPool("try(c0 + 1)", data, pool.get());
ASSERT_FALSE(result->mayHaveNulls());

ASSERT_GE(pool->peakBytes(), baseline);
ASSERT_LE(pool->peakBytes(), baseline + expectedIncrease)
<< (pool->peakBytes() - baseline);
}

// Measure memory usage with TRY over expression that throws from every row.
{
// We need 16 bytes + 1 bit per row. Allow 20 bytes to account for extra
// padding, rounding and quantiziing.
const auto expectedIncrease = 20 * size;

auto pool = rootPool_->addLeafChild("test-memory-usage");
auto result = evaluateWithCustomMemoryPool("try(c0 / 0)", data, pool.get());
ASSERT_TRUE(result->mayHaveNulls());

ASSERT_GE(pool->peakBytes(), baseline);
ASSERT_LE(pool->peakBytes(), baseline + expectedIncrease)
<< (pool->peakBytes() - baseline);
}
}

} // namespace facebook::velox

0 comments on commit b27cca6

Please sign in to comment.