Skip to content

Commit

Permalink
Add initialized bit for aggregates in RowContainer (facebookincubator…
Browse files Browse the repository at this point in the history
…#9067)

Summary:
Pull Request resolved: facebookincubator#9067

The way aggregates are used in RowContainer, the typical flow is that a group of rows are allocated in the RowContainer, then
accumulators are allocated column wise across all rows via initializeNewGroups.

This leaves a window of time where the accumulators are not null but also not non-null they simply aren't initialized while new
rows are being allocated before  initializeNewGroups has been called.  During this time if an exception occurs (e.g. an OOM
allocating a row), when the RowContainer is destructed as part of stack unwinding, it may call freeAggregates which will
attempt to destroy the accumulators that were never created leading to crashes.

We can't simply set the null bit for the accumulators because
1) Some aggregates initialize an accumulator even if the column is null, and therefore still need to destroy it.
2) Aggregates maintain a nullCount_ internally, if this is 0 null bits are ignored.  Incrementing this for each aggregate on each
row creation would have a performance cost, much like allocating the accumulator on each row would.

In order to handle this, I've added an initialized bit along side the null bit for every aggregation.  This is initially set to 0 by
RowContainer's initializeRow function.  When initializeNewGroups is called Aggregate flips this bit to 1.  It also flips it back to
0 when destroy is called.  Destroy checks this bit before destroying an accumulator to determine if it has been initialized.

To minimize the changes needed in Aggregate implementations, I've made initializeNewGroups and destroy non-virtual in
Aggregate, they call new virtual methods initializeNewGroupsInternal and destroyInternal which are drop in replacements for
the original functions in the implementations.  This way Aggregate handles flipping the initialized bit in one place.

The only other change needed was in the few existing Aggregates that destroy their accumulator on their own (rather than
through Aggregate's destroyAccumulator function) I added a check for the initialized bit.

This diff is unfortunately large because of the need to update every Aggregate function to rename the functions, and the
need to update the signature of setOffsets to add the initialized bit offset.  The key changes are in
* RowContainer.h/.cpp
* Aggregate.h/.cpp

Reviewed By: xiaoxmeng

Differential Revision: D54862289

fbshipit-source-id: 7549a374530976ba24c9d1308e3db616f5925b36
  • Loading branch information
Kevin Wilfong authored and Joe-Abraham committed Apr 4, 2024
1 parent 5dc6108 commit 748e4c2
Show file tree
Hide file tree
Showing 52 changed files with 890 additions and 506 deletions.
33 changes: 29 additions & 4 deletions velox/docs/develop/aggregate-functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,21 @@ location of the accumulator.
// @param offset Offset in bytes from the start of the row of the accumulator
// @param nullByte Offset in bytes from the start of the row of the null flag
// @param nullMask The specific bit in the nullByte that stores the null flag
void setOffsets(int32_t offset, int32_t nullByte, uint8_t nullMask)
// @param initializedByte Offset in bytes from the start of the row of the
// initialized flag
// @param initializedMask The specific bit in the initializedByte that stores
// the initialized flag
// @param rowSizeOffset The offset of a uint32_t row size from the start of
// the row. Only applies to accumulators that store variable size data out of
// line. Fixed length accumulators do not use this. 0 if the row does not have
// a size field.
void setOffsets(
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t initializedByte,
int8_t initializedMask,
int32_t rowSizeOffset)

The base class implements the setOffsets method by storing the offsets in member variables.

Expand All @@ -480,9 +494,20 @@ The base class implements the setOffsets method by storing the offsets in member
// Byte position of null flag in group row.
int32_t nullByte_;
uint8_t nullMask_;
// Byte position of the initialized flag in group row.
int32_t initializedByte_;
uint8_t initializedMask_;
// Offset of fixed length accumulator state in group row.
int32_t offset_;

// Offset of uint32_t row byte size of row. 0 if there are no
// variable width fields or accumulators on the row. The size is
// capped at 4G and will stay at 4G and not wrap around if growing
// past this. This serves to track the batch size when extracting
// rows. A size in excess of 4G would finish the batch in any case,
// so larger values need not be represented.
int32_t rowSizeOffset_ = 0;

Typically, an aggregate function doesn’t use the offsets directly. Instead, it uses helper methods from the base class.

To access the accumulator:
Expand All @@ -509,14 +534,14 @@ To manipulate the null flags:
Initialization
^^^^^^^^^^^^^^

Once you have accumulatorFixedWidthSize(), the next method to implement is initializeNewGroups().
Once you have accumulatorFixedWidthSize(), the next method to implement is initializeNewGroupsInternal().

.. code-block:: c++

// Initializes null flags and accumulators for newly encountered groups.
// @param groups Pointers to the start of the new group rows.
// @param indices Indices into 'groups' of the new entries.
virtual void initializeNewGroups(
virtual void initializeNewGroupsInternal(
char** groups,
folly::Range<const vector_size_t*> indices) = 0;

Expand All @@ -525,7 +550,7 @@ This method is called by the HashAggregation operator every time it encounters n
GroupBy aggregation
^^^^^^^^^^^^^^^^^^^

At this point you have accumulatorFixedWidthSize() and initializeNewGroups() methods implemented. Now, we can proceed to implementing the end-to-end group-by aggregation. We need the following pieces:
At this point you have accumulatorFixedWidthSize() and initializeNewGroupsInternal() methods implemented. Now, we can proceed to implementing the end-to-end group-by aggregation. We need the following pieces:

* Logic for adding raw input to the accumulator:
* addRawInput() method.
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/Aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,13 @@ void Aggregate::setOffsetsInternal(
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t initializedByte,
uint8_t initializedMask,
int32_t rowSizeOffset) {
nullByte_ = nullByte;
nullMask_ = nullMask;
initializedByte_ = initializedByte;
initializedMask_ = initializedMask;
offset_ = offset;
rowSizeOffset_ = rowSizeOffset;
}
Expand Down
62 changes: 55 additions & 7 deletions velox/exec/Aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ class Aggregate {
// @param offset Offset in bytes from the start of the row of the accumulator
// @param nullByte Offset in bytes from the start of the row of the null flag
// @param nullMask The specific bit in the nullByte that stores the null flag
// @param initializedByte Offset in bytes from the start of the row of the
// initialized flag
// @param initializedMask The specific bit in the initializedByte that stores
// the initialized flag
// @param rowSizeOffset The offset of a uint32_t row size from the start of
// the row. Only applies to accumulators that store variable size data out of
// line. Fixed length accumulators do not use this. 0 if the row does not have
Expand All @@ -113,8 +117,16 @@ class Aggregate {
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t initializedByte,
int8_t initializedMask,
int32_t rowSizeOffset) {
setOffsetsInternal(offset, nullByte, nullMask, rowSizeOffset);
setOffsetsInternal(
offset,
nullByte,
nullMask,
initializedByte,
initializedMask,
rowSizeOffset);
}

// Initializes null flags and accumulators for newly encountered groups. This
Expand All @@ -124,7 +136,13 @@ class Aggregate {
// @param indices Indices into 'groups' of the new entries.
virtual void initializeNewGroups(
char** groups,
folly::Range<const vector_size_t*> indices) = 0;
folly::Range<const vector_size_t*> indices) {
initializeNewGroupsInternal(groups, indices);

for (auto index : indices) {
groups[index][initializedByte_] |= initializedMask_;
}
}

// Single Aggregate instance is able to take both raw data and
// intermediate result as input based on the assumption that Partial
Expand Down Expand Up @@ -242,8 +260,14 @@ class Aggregate {
}

// Frees any out of line storage for the accumulator in
// 'groups'. No-op for fixed length accumulators.
virtual void destroy(folly::Range<char**> /*groups*/) {}
// 'groups' and marks the aggregate as uninitialized.
virtual void destroy(folly::Range<char**> groups) {
destroyInternal(groups);

for (auto* group : groups) {
group[initializedByte_] &= ~initializedMask_;
}
}

// Clears state between reuses, e.g. this is called before reusing
// the aggregation operator's state after flushing a partial
Expand Down Expand Up @@ -294,10 +318,25 @@ class Aggregate {
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t initializedByte,
uint8_t initializedMask,
int32_t rowSizeOffset);

virtual void clearInternal();

// Initializes null flags and accumulators for newly encountered groups. This
// function should be called only once for each group.
//
// @param groups Pointers to the start of the new group rows.
// @param indices Indices into 'groups' of the new entries.
virtual void initializeNewGroupsInternal(
char** groups,
folly::Range<const vector_size_t*> indices) = 0;

// Frees any out of line storage for the accumulator in
// 'groups'. No-op for fixed length accumulators.
virtual void destroyInternal(folly::Range<char**> groups) {}

// Shorthand for maintaining accumulator variable length size in
// accumulator update methods. Use like: { auto tracker =
// trackRowSize(group); update(group); }
Expand All @@ -310,6 +349,10 @@ class Aggregate {
return numNulls_ && (group[nullByte_] & nullMask_);
}

bool isInitialized(char* group) const {
return group[initializedByte_] & initializedMask_;
}

// Sets null flag for all specified groups to true.
// For any given group, this method can be called at most once.
void setAllNulls(char** groups, folly::Range<const vector_size_t*> indices) {
Expand Down Expand Up @@ -351,9 +394,11 @@ class Aggregate {

template <typename T>
void destroyAccumulator(char* group) const {
auto accumulator = value<T>(group);
std::destroy_at(accumulator);
memset(accumulator, 0, sizeof(T));
if (isInitialized(group)) {
auto accumulator = value<T>(group);
std::destroy_at(accumulator);
::memset(accumulator, 0, sizeof(T));
}
}

template <typename T>
Expand Down Expand Up @@ -384,6 +429,9 @@ class Aggregate {
// Byte position of null flag in group row.
int32_t nullByte_;
uint8_t nullMask_;
// Byte position of the initialized flag in group row.
int32_t initializedByte_;
uint8_t initializedMask_;
// Offset of fixed length accumulator state in group row.
int32_t offset_;

Expand Down
23 changes: 22 additions & 1 deletion velox/exec/AggregateCompanionAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ void AggregateCompanionFunctionBase::setOffsetsInternal(
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t initializedByte,
uint8_t initializedMask,
int32_t rowSizeOffset) {
fn_->setOffsets(offset, nullByte, nullMask, rowSizeOffset);
fn_->setOffsets(
offset,
nullByte,
nullMask,
initializedByte,
initializedMask,
rowSizeOffset);
}

int32_t AggregateCompanionFunctionBase::accumulatorFixedWidthSize() const {
Expand Down Expand Up @@ -56,6 +64,11 @@ void AggregateCompanionFunctionBase::destroy(folly::Range<char**> groups) {
fn_->destroy(groups);
}

void AggregateCompanionFunctionBase::destroyInternal(
folly::Range<char**> groups) {
fn_->destroy(groups);
}

void AggregateCompanionFunctionBase::clearInternal() {
fn_->clear();
}
Expand All @@ -66,6 +79,12 @@ void AggregateCompanionFunctionBase::initializeNewGroups(
fn_->initializeNewGroups(groups, indices);
}

void AggregateCompanionFunctionBase::initializeNewGroupsInternal(
char** groups,
folly::Range<const vector_size_t*> indices) {
fn_->initializeNewGroups(groups, indices);
}

void AggregateCompanionFunctionBase::addRawInput(
char** groups,
const SelectivityVector& rows,
Expand Down Expand Up @@ -154,6 +173,8 @@ int32_t AggregateCompanionAdapter::ExtractFunction::setOffset() const {
offset,
RowContainer::nullByte(0),
RowContainer::nullMask(0),
RowContainer::initializedByte(0),
RowContainer::initializedMask(0),
rowSizeOffset);
return offset;
}
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/AggregateCompanionAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,20 @@ class AggregateCompanionFunctionBase : public Aggregate {
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t initializedByte,
uint8_t initializedMask,
int32_t rowSizeOffset) override final;

void setAllocatorInternal(HashStringAllocator* allocator) override final;

void clearInternal() override final;

void initializeNewGroupsInternal(
char** groups,
folly::Range<const vector_size_t*> indices) override final;

void destroyInternal(folly::Range<char**> groups) override final;

std::unique_ptr<Aggregate> fn_;
};

Expand Down
10 changes: 6 additions & 4 deletions velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,19 @@ class AggregateWindowFunction : public exec::WindowFunction {
//
// Here we always make space for a row size since we only have one
// row and no RowContainer. We also have a single aggregate here, so there
// is only one null bit.
static const int32_t kNullOffset = 0;
// is only one null bit and one initialized bit.
static const int32_t kAccumulatorFlagsOffset = 0;
static const int32_t kRowSizeOffset = bits::nbytes(1);
singleGroupRowSize_ = kRowSizeOffset + sizeof(int32_t);
// Accumulator offset must be aligned by their alignment size.
singleGroupRowSize_ = bits::roundUp(
singleGroupRowSize_, aggregate_->accumulatorAlignmentSize());
aggregate_->setOffsets(
singleGroupRowSize_,
exec::RowContainer::nullByte(kNullOffset),
exec::RowContainer::nullMask(kNullOffset),
exec::RowContainer::nullByte(kAccumulatorFlagsOffset),
exec::RowContainer::nullMask(kAccumulatorFlagsOffset),
exec::RowContainer::initializedByte(kAccumulatorFlagsOffset),
exec::RowContainer::initializedMask(kAccumulatorFlagsOffset),
/* needed for out of line allocations */ kRowSizeOffset);
singleGroupRowSize_ += aggregate_->accumulatorFixedWidthSize();

Expand Down
29 changes: 15 additions & 14 deletions velox/exec/DistinctAggregations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,6 @@ class TypedDistinctAggregations : public DistinctAggregations {
}};
}

void initializeNewGroups(
char** groups,
folly::Range<const vector_size_t*> indices) override {
for (auto i : indices) {
groups[i][nullByte_] |= nullMask_;
new (groups[i] + offset_) AccumulatorType(inputType_, allocator_);
}

for (auto i = 0; i < aggregates_.size(); ++i) {
const auto& aggregate = *aggregates_[i];
aggregate.function->initializeNewGroups(groups, indices);
}
}

void addInput(
char** groups,
const RowVectorPtr& input,
Expand Down Expand Up @@ -148,6 +134,21 @@ class TypedDistinctAggregations : public DistinctAggregations {
}
}

protected:
void initializeNewGroupsInternal(
char** groups,
folly::Range<const vector_size_t*> indices) override {
for (auto i : indices) {
groups[i][nullByte_] |= nullMask_;
new (groups[i] + offset_) AccumulatorType(inputType_, allocator_);
}

for (auto i = 0; i < aggregates_.size(); ++i) {
const auto& aggregate = *aggregates_[i];
aggregate.function->initializeNewGroups(groups, indices);
}
}

private:
bool isSingleInputAggregate() const {
return aggregates_[0]->inputs.size() == 1;
Expand Down
Loading

0 comments on commit 748e4c2

Please sign in to comment.