From 8edc1d1621e725042886f0a78253b4bda64b9137 Mon Sep 17 00:00:00 2001 From: zanmato Date: Thu, 14 Dec 2023 10:58:50 -0800 Subject: [PATCH] Fix the issue of `ExecBatchBuilder` when appending consecutive tail rows with the same id may exceed buffer boundary --- cpp/src/arrow/compute/light_array.cc | 6 +++++- cpp/src/arrow/compute/light_array_test.cc | 25 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc index 37d4421fd79f7..7aaf01ab9180d 100644 --- a/cpp/src/arrow/compute/light_array.cc +++ b/cpp/src/arrow/compute/light_array.cc @@ -395,8 +395,12 @@ int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr& column, --num_rows_left; int row_id_removed = row_ids[num_rows_left]; const uint32_t* offsets = - reinterpret_cast(column->buffers[1]->data()); + reinterpret_cast(column->buffers[1]->data()) + column->offset; num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed]; + // Skip consecutive rows with the same id + while (num_rows_left > 0 && row_id_removed == row_ids[num_rows_left - 1]) { + --num_rows_left; + } } } diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index 71d228bf3f614..827537c2514d0 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -411,6 +411,31 @@ TEST(ExecBatchBuilder, AppendBatchesSomeRows) { ASSERT_EQ(0, pool->bytes_allocated()); } +TEST(ExecBatchBuilder, AppendBatchDupRows) { + std::unique_ptr owned_pool = MemoryPool::CreateDefault(); + MemoryPool* pool = owned_pool.get(); + // Case of cross-word copying for the last row, which may exceed the buffer boundary. + { + // 64-byte data fully occupying one minimal 64-byte aligned memory region. + ExecBatch batch_string = JSONToExecBatch({binary()}, R"([["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["123456789ABCDEF0"], + ["ABCDEF0"], + ["123456789"]])"); // 9-byte tail row, larger than a word. + ASSERT_EQ(batch_string[0].array()->buffers[1]->capacity(), 64); + ASSERT_EQ(batch_string[0].array()->buffers[2]->capacity(), 64); + ExecBatchBuilder builder; + uint16_t row_ids[2] = {4, 4}; + ASSERT_OK(builder.AppendSelected(pool, batch_string, 2, row_ids, /*num_cols=*/1)); + ExecBatch built = builder.Flush(); + ExecBatch batch_string_appended = + JSONToExecBatch({binary()}, R"([["123456789"], ["123456789"]])"); + ASSERT_EQ(batch_string_appended, built); + ASSERT_NE(0, pool->bytes_allocated()); + } + ASSERT_EQ(0, pool->bytes_allocated()); +} + TEST(ExecBatchBuilder, AppendBatchesSomeCols) { std::unique_ptr owned_pool = MemoryPool::CreateDefault(); MemoryPool* pool = owned_pool.get();