Skip to content

Commit

Permalink
Fix the issue of ExecBatchBuilder when appending consecutive tail r…
Browse files Browse the repository at this point in the history
…ows with the same id may exceed buffer boundary
  • Loading branch information
zanmato1984 committed Dec 14, 2023
1 parent 4653918 commit 8edc1d1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
6 changes: 5 additions & 1 deletion cpp/src/arrow/compute/light_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,12 @@ int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
--num_rows_left;
int row_id_removed = row_ids[num_rows_left];
const uint32_t* offsets =
reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
// Skip consecutive rows with the same id
while (num_rows_left > 0 && row_id_removed == row_ids[num_rows_left - 1]) {
--num_rows_left;
}
}
}

Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/compute/light_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,31 @@ TEST(ExecBatchBuilder, AppendBatchesSomeRows) {
ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(ExecBatchBuilder, AppendBatchDupRows) {
std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
MemoryPool* pool = owned_pool.get();
// Case of cross-word copying for the last row, which may exceed the buffer boundary.
{
// 64-byte data fully occupying one minimal 64-byte aligned memory region.
ExecBatch batch_string = JSONToExecBatch({binary()}, R"([["123456789ABCDEF0"],
["123456789ABCDEF0"],
["123456789ABCDEF0"],
["ABCDEF0"],
["123456789"]])"); // 9-byte tail row, larger than a word.
ASSERT_EQ(batch_string[0].array()->buffers[1]->capacity(), 64);
ASSERT_EQ(batch_string[0].array()->buffers[2]->capacity(), 64);
ExecBatchBuilder builder;
uint16_t row_ids[2] = {4, 4};
ASSERT_OK(builder.AppendSelected(pool, batch_string, 2, row_ids, /*num_cols=*/1));
ExecBatch built = builder.Flush();
ExecBatch batch_string_appended =
JSONToExecBatch({binary()}, R"([["123456789"], ["123456789"]])");
ASSERT_EQ(batch_string_appended, built);
ASSERT_NE(0, pool->bytes_allocated());
}
ASSERT_EQ(0, pool->bytes_allocated());
}

TEST(ExecBatchBuilder, AppendBatchesSomeCols) {
std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
MemoryPool* pool = owned_pool.get();
Expand Down

0 comments on commit 8edc1d1

Please sign in to comment.