From a5f472a942a48cba0e9af0a09d00c19cc1ff7fed Mon Sep 17 00:00:00 2001 From: vibhatha Date: Tue, 3 Jan 2023 19:24:23 +0530 Subject: [PATCH 01/12] feat(initial): test idea --- cpp/src/arrow/compute/exec/options.h | 5 +++ cpp/src/arrow/compute/exec/source_node.cc | 42 +++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 325e8e514d189..104c25ee8266d 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -147,6 +147,11 @@ class ARROW_EXPORT RecordBatchSourceNodeOptions using SchemaSourceNodeOptions::SchemaSourceNodeOptions; }; +class ARROW_EXPORT RecordBatchReaderSourceNodeOptions + : public SchemaSourceNodeOptions { + using SchemaSourceNodeOptions::SchemaSourceNodeOptions; +}; + /// \brief Make a node which excludes some rows from batches passed through it /// /// filter_expression will be evaluated against each batch which is pushed to diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 3d21dece97e09..6ad7ef202ae73 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -360,6 +360,46 @@ struct RecordBatchSourceNode const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode"; + +/// RecordBatchReaderSourceNode Start + +struct RecordBatchReaderSourceNode + : public SchemaSourceNode { + using RecordBatchReaderSchemaSourceNode = + SchemaSourceNode; + + using RecordBatchReaderSchemaSourceNode::RecordBatchReaderSchemaSourceNode; + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + return RecordBatchReaderSchemaSourceNode::Make(plan, inputs, options); + } + + const char* kind_name() const override { return kKindName; } + + static Result>> MakeGenerator( + Iterator>& batch_it, + arrow::internal::Executor* io_executor, const std::shared_ptr& schema) { + auto to_exec_batch = + [schema](const std::shared_ptr& batch) -> std::optional { + if (batch == NULLPTR || *batch->schema() != *schema) { + return std::nullopt; + } + return std::optional(ExecBatch(*batch)); + }; + auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it)); + return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor); + } + + static const char kKindName[]; +}; + +const char RecordBatchReaderSourceNode::kKindName[] = "RecordBatchReaderSourceNode"; + + + +/// RecordBatchReaderSourceNode End + struct ExecBatchSourceNode : public SchemaSourceNode { using ExecBatchSchemaSourceNode = @@ -436,6 +476,8 @@ Result MakeNamedTableNode(compute::ExecPlan* plan, "converted into an exec plan or executed"); } + + } // namespace namespace internal { From 581147afe1bcd47eefd2bbeea1a7ed0206963f6c Mon Sep 17 00:00:00 2001 From: vibhatha Date: Wed, 4 Jan 2023 12:46:37 +0530 Subject: [PATCH 02/12] feat(initial): record_batch_reader_source added --- .../execution_plan_documentation_examples.cc | 42 ++++++++++++- cpp/src/arrow/compute/exec/options.h | 24 ++++++-- cpp/src/arrow/compute/exec/source_node.cc | 61 +++++++++++-------- 3 files changed, 97 insertions(+), 30 deletions(-) diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc index 4aed041be33a9..3b21ebc30b606 100644 --- a/cpp/examples/arrow/execution_plan_documentation_examples.cc +++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc @@ -761,6 +761,41 @@ arrow::Status TableSinkExample() { 
std::cout << "Results : " << output_table->ToString() << std::endl; return arrow::Status::OK(); } + +// (Doc section: Table Sink Example) + +// (Doc section: RecordBatchReaderSource Example) + +/// \brief An example showing the usage of a RecordBatchReader as the data source. +/// +/// RecordBatchReaderSourceSink Example +/// This example shows how a record_batch_reader_source can be used +/// in an execution plan. This includes the source node +/// receiving data from a TableRecordBatchReader. + +arrow::Status RecordBatchReaderSourceSinkExample() { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr plan, + cp::ExecPlan::Make(*cp::threaded_exec_context())); + + std::cout << "basic data created" << std::endl; + + arrow::AsyncGenerator> sink_gen; + ARROW_ASSIGN_OR_RAISE(auto table, GetTable()); + std::shared_ptr reader = + std::make_shared(table); + + ARROW_ASSIGN_OR_RAISE( + cp::ExecNode * source, + cp::MakeExecNode("record_batch_reader_source", plan.get(), {}, + cp::RecordBatchReaderSourceNodeOptions{table->schema(), reader})); + ARROW_RETURN_NOT_OK(cp::MakeExecNode( + "order_by_sink", plan.get(), {source}, + cp::OrderBySinkNodeOptions{ + cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen})); + + return ExecutePlanAndCollectAsTableWithCustomSink(plan, table->schema(), sink_gen); +} + // (Doc section: Table Sink Example) enum ExampleMode { @@ -777,7 +812,8 @@ enum ExampleMode { KSELECT = 10, WRITE = 11, UNION = 12, - TABLE_SOURCE_TABLE_SINK = 13 + TABLE_SOURCE_TABLE_SINK = 13, + RECORD_BATCH_READER_SOURCE = 14 }; int main(int argc, char** argv) { @@ -848,6 +884,10 @@ int main(int argc, char** argv) { PrintBlock("TableSink Example"); status = TableSinkExample(); break; + case RECORD_BATCH_READER_SOURCE: + PrintBlock("RecordBatchReaderSource Example"); + status = RecordBatchReaderSourceSinkExample(); + break; default: break; } diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 104c25ee8266d..adcd4b98a5700 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -126,6 +126,25 @@ class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { arrow::internal::Executor* io_executor; }; +class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions { + public: + RecordBatchReaderSourceNodeOptions(std::shared_ptr schema, + std::shared_ptr reader, + arrow::internal::Executor* io_executor = NULLPTR) + : schema(schema), reader(std::move(reader)), io_executor(io_executor) {} + + /// \brief The schema of the record batches from the iterator + std::shared_ptr schema; + + /// \brief A maker of an iterator which acts as the data source + std::shared_ptr reader; + + /// \brief The executor to use for scanning the iterator + /// + /// Defaults to the default I/O executor. 
+ arrow::internal::Executor* io_executor; +}; + using ArrayVectorIteratorMaker = std::function>()>; /// \brief An extended Source node which accepts a schema and array-vectors class ARROW_EXPORT ArrayVectorSourceNodeOptions @@ -147,11 +166,6 @@ class ARROW_EXPORT RecordBatchSourceNodeOptions using SchemaSourceNodeOptions::SchemaSourceNodeOptions; }; -class ARROW_EXPORT RecordBatchReaderSourceNodeOptions - : public SchemaSourceNodeOptions { - using SchemaSourceNodeOptions::SchemaSourceNodeOptions; -}; - /// \brief Make a node which excludes some rows from batches passed through it /// /// filter_expression will be evaluated against each batch which is pushed to diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 6ad7ef202ae73..2d494415b869e 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -327,22 +327,37 @@ struct SchemaSourceNode : public SourceNode { } }; -struct RecordBatchSourceNode - : public SchemaSourceNode { - using RecordBatchSchemaSourceNode = - SchemaSourceNode; - - using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode; +struct RecordBatchReaderSourceNode : public SourceNode { + RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr schema, + arrow::AsyncGenerator> generator) + : SourceNode(plan, schema, generator) {} static Result Make(ExecPlan* plan, std::vector inputs, const ExecNodeOptions& options) { - return RecordBatchSchemaSourceNode::Make(plan, inputs, options); - } + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, kKindName)); + const auto& cast_options = + checked_cast(options); + auto& reader = cast_options.reader; + auto& schema = cast_options.schema; + auto io_executor = cast_options.io_executor; - const char* kind_name() const override { return kKindName; } + if (io_executor == NULLPTR) { + io_executor = plan->query_context()->exec_context()->executor(); + } + + if (schema == NULLPTR) { + return Status::Invalid(kKindName, " requires schema which is not null"); + } + if (io_executor == NULLPTR) { + io_executor = io::internal::GetIOThreadPool(); + } + + ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor, schema)); + return plan->EmplaceNode(plan, schema, generator); + } static Result>> MakeGenerator( - Iterator>& batch_it, + const std::shared_ptr& reader, arrow::internal::Executor* io_executor, const std::shared_ptr& schema) { auto to_exec_batch = [schema](const std::shared_ptr& batch) -> std::optional { @@ -351,6 +366,7 @@ struct RecordBatchSourceNode } return std::optional(ExecBatch(*batch)); }; + Iterator> batch_it = MakeIteratorFromReader(reader); auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it)); return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor); } @@ -358,21 +374,18 @@ struct RecordBatchSourceNode static const char kKindName[]; }; -const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode"; - - -/// RecordBatchReaderSourceNode Start +const char RecordBatchReaderSourceNode::kKindName[] = "RecordBatchReaderSourceNode"; -struct RecordBatchReaderSourceNode - : public SchemaSourceNode { - using RecordBatchReaderSchemaSourceNode = - SchemaSourceNode; +struct RecordBatchSourceNode + : public SchemaSourceNode { + using RecordBatchSchemaSourceNode = + SchemaSourceNode; - using RecordBatchReaderSchemaSourceNode::RecordBatchReaderSchemaSourceNode; + using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode; static Result Make(ExecPlan* plan, std::vector inputs, 
const ExecNodeOptions& options) { - return RecordBatchReaderSchemaSourceNode::Make(plan, inputs, options); + return RecordBatchSchemaSourceNode::Make(plan, inputs, options); } const char* kind_name() const override { return kKindName; } @@ -394,9 +407,9 @@ struct RecordBatchReaderSourceNode static const char kKindName[]; }; -const char RecordBatchReaderSourceNode::kKindName[] = "RecordBatchReaderSourceNode"; - +const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode"; +/// RecordBatchReaderSourceNode Start /// RecordBatchReaderSourceNode End @@ -476,8 +489,6 @@ Result MakeNamedTableNode(compute::ExecPlan* plan, "converted into an exec plan or executed"); } - - } // namespace namespace internal { @@ -486,6 +497,8 @@ void RegisterSourceNode(ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("source", SourceNode::Make)); DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make)); DCHECK_OK(registry->AddFactory("record_batch_source", RecordBatchSourceNode::Make)); + DCHECK_OK(registry->AddFactory("record_batch_reader_source", + RecordBatchReaderSourceNode::Make)); DCHECK_OK(registry->AddFactory("exec_batch_source", ExecBatchSourceNode::Make)); DCHECK_OK(registry->AddFactory("array_vector_source", ArrayVectorSourceNode::Make)); DCHECK_OK(registry->AddFactory("named_table", MakeNamedTableNode)); From 88670a62578d743cb379d75e98a2619864984c1c Mon Sep 17 00:00:00 2001 From: vibhatha Date: Wed, 4 Jan 2023 13:45:30 +0530 Subject: [PATCH 03/12] fix(tests): adding test cases --- cpp/src/arrow/compute/exec/plan_test.cc | 49 +++++++++++++++++++++++++ cpp/src/arrow/compute/exec/test_util.cc | 12 ++++++ cpp/src/arrow/compute/exec/test_util.h | 4 ++ 3 files changed, 65 insertions(+) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 2db6c78d4c1cc..07e0853b0818c 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -344,6 +344,47 @@ void TestSourceSink( Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)))); } +void TestRecordBatchReaderSourceSink( + std::function>(const BatchesWithSchema&)> + to_reader) { + ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1)); + ExecContext exec_context(default_memory_pool(), executor.get()); + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context)); + AsyncGenerator> sink_gen; + + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, to_reader(exp_batches)); + + ASSERT_OK(Declaration::Sequence( + { + {"record_batch_reader_source", + RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) + .AddToPlan(plan.get())); + + ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), + Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)))); +} + +void TestRecordBatchReaderSourceSinkError( + std::function>(const BatchesWithSchema&)> + to_reader) { + ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); + std::shared_ptr no_schema; + auto source_factory_name = "record_batch_reader_source"; + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, to_reader(exp_batches)); + + auto null_executor_options = + RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}; + ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options)); + + auto null_schema_options = RecordBatchReaderSourceNodeOptions{no_schema, reader}; + ASSERT_THAT(MakeExecNode(source_factory_name, 
plan.get(), {}, null_schema_options), + Raises(StatusCode::Invalid, HasSubstr("not null"))); +} + TEST(ExecPlanExecution, ArrayVectorSourceSink) { TestSourceSink, ArrayVectorSourceNodeOptions>( "array_vector_source", ToArrayVectors); @@ -374,6 +415,14 @@ TEST(ExecPlanExecution, RecordBatchSourceSinkError) { "record_batch_source", ToRecordBatches); } +TEST(ExecPlanExecution, RecordBatchReaderSourceSink) { + TestRecordBatchReaderSourceSink(ToRecordBatcheReader); +} + +TEST(ExecPlanExecution, RecordBatchReaderSourceSinkError) { + TestRecordBatchReaderSourceSinkError(ToRecordBatcheReader); +} + TEST(ExecPlanExecution, SinkNodeBackpressure) { std::optional batch = ExecBatchFromJSON({int32(), boolean()}, diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 038171c8489c6..81d4e873ffffa 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -290,6 +290,18 @@ Result>> ToRecordBatches( return record_batches; } +Result> ToRecordBatcheReader( + const BatchesWithSchema& batches_with_schema) { + std::vector> record_batches; + for (auto batch : batches_with_schema.batches) { + ARROW_ASSIGN_OR_RAISE(auto record_batch, + batch.ToRecordBatch(batches_with_schema.schema)); + record_batches.push_back(record_batch); + } + ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(std::move(record_batches))); + return std::make_shared(std::move(table)); +} + Result> SortTableOnAllFields(const std::shared_ptr& tab) { std::vector sort_keys; for (int i = 0; i < tab->num_columns(); i++) { diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index a4eea798357ba..db1523bc59602 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -127,6 +127,10 @@ ARROW_TESTING_EXPORT Result>> ToRecordBatches( const BatchesWithSchema& batches); +ARROW_TESTING_EXPORT +Result> ToRecordBatcheReader( + const BatchesWithSchema& batches_with_schema); + ARROW_TESTING_EXPORT Result>> ToArrayVectors( const BatchesWithSchema& batches_with_schema); From 4898dc4d69141e91445e5d900cdee87f660c2c9d Mon Sep 17 00:00:00 2001 From: vibhatha Date: Wed, 4 Jan 2023 13:54:42 +0530 Subject: [PATCH 04/12] fix(cleanup): update comments and cleaning up code --- cpp/src/arrow/compute/exec/options.h | 4 ++-- cpp/src/arrow/compute/exec/source_node.cc | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index adcd4b98a5700..0880cfff6f2c3 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -136,10 +136,10 @@ class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions { /// \brief The schema of the record batches from the iterator std::shared_ptr schema; - /// \brief A maker of an iterator which acts as the data source + /// \brief The RecordBatchReader which acts as the data source std::shared_ptr reader; - /// \brief The executor to use for scanning the iterator + /// \brief The executor to use for the reader /// /// Defaults to the default I/O executor. 
arrow::internal::Executor* io_executor; diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 2d494415b869e..0da4aee043939 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -409,10 +409,6 @@ struct RecordBatchSourceNode const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode"; -/// RecordBatchReaderSourceNode Start - -/// RecordBatchReaderSourceNode End - struct ExecBatchSourceNode : public SchemaSourceNode { using ExecBatchSchemaSourceNode = From 9fc4c55d6203d12a70710e89b85a1d4c1d435ee7 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 5 Jan 2023 12:01:59 +0530 Subject: [PATCH 05/12] fix(reviews): addressing reviews --- .../execution_plan_documentation_examples.cc | 7 +++--- cpp/src/arrow/compute/exec/options.h | 8 ++----- cpp/src/arrow/compute/exec/plan_test.cc | 24 +++++++++---------- cpp/src/arrow/compute/exec/source_node.cc | 16 ++++++------- cpp/src/arrow/compute/exec/test_util.cc | 2 +- cpp/src/arrow/compute/exec/test_util.h | 2 +- 6 files changed, 25 insertions(+), 34 deletions(-) diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc index 3b21ebc30b606..10b0d6229ec1c 100644 --- a/cpp/examples/arrow/execution_plan_documentation_examples.cc +++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc @@ -784,10 +784,9 @@ arrow::Status RecordBatchReaderSourceSinkExample() { std::shared_ptr reader = std::make_shared(table); - ARROW_ASSIGN_OR_RAISE( - cp::ExecNode * source, - cp::MakeExecNode("record_batch_reader_source", plan.get(), {}, - cp::RecordBatchReaderSourceNodeOptions{table->schema(), reader})); + ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source, + cp::MakeExecNode("record_batch_reader_source", plan.get(), {}, + cp::RecordBatchReaderSourceNodeOptions{reader})); ARROW_RETURN_NOT_OK(cp::MakeExecNode( "order_by_sink", plan.get(), {source}, cp::OrderBySinkNodeOptions{ diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 0880cfff6f2c3..0ef75cbedcf81 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -128,13 +128,9 @@ class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions { class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions { public: - RecordBatchReaderSourceNodeOptions(std::shared_ptr schema, - std::shared_ptr reader, + RecordBatchReaderSourceNodeOptions(std::shared_ptr reader, arrow::internal::Executor* io_executor = NULLPTR) - : schema(schema), reader(std::move(reader)), io_executor(io_executor) {} - - /// \brief The schema of the record batches from the iterator - std::shared_ptr schema; + : reader(std::move(reader)), io_executor(io_executor) {} /// \brief The RecordBatchReader which acts as the data source std::shared_ptr reader; diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 07e0853b0818c..a18e1e034d823 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -354,13 +354,11 @@ void TestRecordBatchReaderSourceSink( auto exp_batches = MakeBasicBatches(); ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, to_reader(exp_batches)); - - ASSERT_OK(Declaration::Sequence( - { - {"record_batch_reader_source", - RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) + RecordBatchReaderSourceNodeOptions 
options{reader, io::internal::GetIOThreadPool()}; + ASSERT_OK(Declaration::Sequence({ + {"record_batch_reader_source", options}, + {"sink", SinkNodeOptions{&sink_gen}}, + }) .AddToPlan(plan.get())); ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), @@ -376,12 +374,12 @@ void TestRecordBatchReaderSourceSinkError( auto exp_batches = MakeBasicBatches(); ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, to_reader(exp_batches)); - auto null_executor_options = - RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}; + auto null_executor_options = RecordBatchReaderSourceNodeOptions{reader}; ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options)); - auto null_schema_options = RecordBatchReaderSourceNodeOptions{no_schema, reader}; - ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_schema_options), + std::shared_ptr no_reader; + auto null_reader_options = RecordBatchReaderSourceNodeOptions{no_reader}; + ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_reader_options), Raises(StatusCode::Invalid, HasSubstr("not null"))); } @@ -416,11 +414,11 @@ TEST(ExecPlanExecution, RecordBatchSourceSinkError) { } TEST(ExecPlanExecution, RecordBatchReaderSourceSink) { - TestRecordBatchReaderSourceSink(ToRecordBatcheReader); + TestRecordBatchReaderSourceSink(ToRecordBatchReader); } TEST(ExecPlanExecution, RecordBatchReaderSourceSinkError) { - TestRecordBatchReaderSourceSinkError(ToRecordBatcheReader); + TestRecordBatchReaderSourceSinkError(ToRecordBatchReader); } TEST(ExecPlanExecution, SinkNodeBackpressure) { diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 0da4aee043939..4b7572f8b745a 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -338,27 +338,25 @@ struct RecordBatchReaderSourceNode : public SourceNode { const auto& cast_options = checked_cast(options); auto& reader = cast_options.reader; - auto& schema = cast_options.schema; auto io_executor = cast_options.io_executor; - if (io_executor == NULLPTR) { - io_executor = plan->query_context()->exec_context()->executor(); + if (reader == NULLPTR) { + return Status::Invalid(kKindName, " requires a reader which is not null"); } - if (schema == NULLPTR) { - return Status::Invalid(kKindName, " requires schema which is not null"); - } if (io_executor == NULLPTR) { io_executor = io::internal::GetIOThreadPool(); } - ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor, schema)); - return plan->EmplaceNode(plan, schema, generator); + ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor)); + return plan->EmplaceNode(plan, reader->schema(), + generator); } static Result>> MakeGenerator( const std::shared_ptr& reader, - arrow::internal::Executor* io_executor, const std::shared_ptr& schema) { + arrow::internal::Executor* io_executor) { + const auto& schema = reader->schema(); auto to_exec_batch = [schema](const std::shared_ptr& batch) -> std::optional { if (batch == NULLPTR || *batch->schema() != *schema) { diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 81d4e873ffffa..95ae470874ac1 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -290,7 +290,7 @@ Result>> ToRecordBatches( return record_batches; } -Result> ToRecordBatcheReader( +Result> ToRecordBatchReader( const BatchesWithSchema& batches_with_schema) { std::vector> record_batches; for (auto batch : 
batches_with_schema.batches) { diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index db1523bc59602..1eb50223249cc 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -128,7 +128,7 @@ Result>> ToRecordBatches( const BatchesWithSchema& batches); ARROW_TESTING_EXPORT -Result> ToRecordBatcheReader( +Result> ToRecordBatchReader( const BatchesWithSchema& batches_with_schema); ARROW_TESTING_EXPORT From a638e41ca608596bc4904b3b53e9349be48f7fc3 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Mon, 9 Jan 2023 16:37:50 +0530 Subject: [PATCH 06/12] fix(reviews) --- cpp/src/arrow/compute/exec/plan_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index a18e1e034d823..3657bcddfdd9c 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -354,7 +354,7 @@ void TestRecordBatchReaderSourceSink( auto exp_batches = MakeBasicBatches(); ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, to_reader(exp_batches)); - RecordBatchReaderSourceNodeOptions options{reader, io::internal::GetIOThreadPool()}; + RecordBatchReaderSourceNodeOptions options{reader}; ASSERT_OK(Declaration::Sequence({ {"record_batch_reader_source", options}, {"sink", SinkNodeOptions{&sink_gen}}, From f105511a687aa2c43cbd4ea10a72245a41df58ba Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Mon, 9 Jan 2023 21:26:52 +0530 Subject: [PATCH 07/12] fix(r): adding record_batch_reader_source for R API --- r/src/compute-exec.cpp | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index feff0a21873ac..ac5b8d2febf7f 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -30,15 +30,6 @@ #include #include -// GH-15151: Best path forward to make this available without a hack like this one -namespace arrow { -namespace io { -namespace internal { -arrow::internal::ThreadPool* GetIOThreadPool(); -} -} // namespace io -} // namespace arrow - namespace compute = ::arrow::compute; std::shared_ptr make_compute_options(std::string func_name, @@ -459,12 +450,8 @@ std::shared_ptr ExecNode_Union( std::shared_ptr ExecNode_SourceNode( const std::shared_ptr& plan, const std::shared_ptr& reader) { - arrow::compute::SourceNodeOptions options{ - /*output_schema=*/reader->schema(), - /*generator=*/ValueOrStop( - compute::MakeReaderGenerator(reader, arrow::io::internal::GetIOThreadPool()))}; - - return MakeExecNodeOrStop("source", plan.get(), {}, options); + arrow::compute::RecordBatchReaderSourceNodeOptions options{reader}; + return MakeExecNodeOrStop("record_batch_reader_source", plan.get(), {}, options); } // [[arrow::export]] From dae7076fda2d7a56836373d03a9cc23c919bb238 Mon Sep 17 00:00:00 2001 From: Vibhatha Lakmal Abeykoon Date: Mon, 9 Jan 2023 21:42:35 +0530 Subject: [PATCH 08/12] fix(reviews): reformat test case to use test abstraction --- cpp/src/arrow/compute/exec/plan_test.cc | 27 ++++++++++--------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 3657bcddfdd9c..1be02b4241739 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -347,22 +347,17 @@ void TestSourceSink( void TestRecordBatchReaderSourceSink( std::function>(const BatchesWithSchema&)> to_reader) { - 
ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1)); - ExecContext exec_context(default_memory_pool(), executor.get()); - ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context)); - AsyncGenerator> sink_gen; - - auto exp_batches = MakeBasicBatches(); - ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, to_reader(exp_batches)); - RecordBatchReaderSourceNodeOptions options{reader}; - ASSERT_OK(Declaration::Sequence({ - {"record_batch_reader_source", options}, - {"sink", SinkNodeOptions{&sink_gen}}, - }) - .AddToPlan(plan.get())); - - ASSERT_THAT(StartAndCollect(plan.get(), sink_gen), - Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches)))); + for (bool parallel : {false, true}) { + SCOPED_TRACE(parallel ? "parallel/merged" : "serial"); + auto exp_batches = MakeBasicBatches(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, + to_reader(exp_batches)); + RecordBatchReaderSourceNodeOptions options{reader}; + Declaration plan("record_batch_reader_source", std::move(options)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); + AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, + exp_batches.batches); + } } void TestRecordBatchReaderSourceSinkError( From 10610ebfc5819b982e999cbc63445be8d99a1716 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Wed, 11 Jan 2023 08:09:25 +0530 Subject: [PATCH 09/12] fix(reviews) --- .../execution_plan_documentation_examples.cc | 18 +++--------------- cpp/src/arrow/compute/exec/source_node.cc | 9 ++++----- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc index 10b0d6229ec1c..170c8a2e195c5 100644 --- a/cpp/examples/arrow/execution_plan_documentation_examples.cc +++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc @@ -561,8 +561,6 @@ arrow::Status SourceOrderBySinkExample() { ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches()); - std::cout << "basic data created" << std::endl; - arrow::AsyncGenerator> sink_gen; auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()}; @@ -777,22 +775,12 @@ arrow::Status RecordBatchReaderSourceSinkExample() { ARROW_ASSIGN_OR_RAISE(std::shared_ptr plan, cp::ExecPlan::Make(*cp::threaded_exec_context())); - std::cout << "basic data created" << std::endl; - - arrow::AsyncGenerator> sink_gen; ARROW_ASSIGN_OR_RAISE(auto table, GetTable()); std::shared_ptr reader = std::make_shared(table); - - ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source, - cp::MakeExecNode("record_batch_reader_source", plan.get(), {}, - cp::RecordBatchReaderSourceNodeOptions{reader})); - ARROW_RETURN_NOT_OK(cp::MakeExecNode( - "order_by_sink", plan.get(), {source}, - cp::OrderBySinkNodeOptions{ - cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen})); - - return ExecutePlanAndCollectAsTableWithCustomSink(plan, table->schema(), sink_gen); + cp::Declaration reader_source{"record_batch_reader_source", + cp::RecordBatchReaderSourceNodeOptions{reader}}; + return ExecutePlanAndCollectAsTable(std::move(reader_source)); } // (Doc section: Table Sink Example) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 4b7572f8b745a..9a96ddf285360 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -340,11 +340,11 @@ struct RecordBatchReaderSourceNode : public SourceNode { auto& reader = 
cast_options.reader; auto io_executor = cast_options.io_executor; - if (reader == NULLPTR) { + if (reader == nullptr) { return Status::Invalid(kKindName, " requires a reader which is not null"); } - if (io_executor == NULLPTR) { + if (io_executor == nullptr) { io_executor = io::internal::GetIOThreadPool(); } @@ -356,10 +356,9 @@ struct RecordBatchReaderSourceNode : public SourceNode { static Result>> MakeGenerator( const std::shared_ptr& reader, arrow::internal::Executor* io_executor) { - const auto& schema = reader->schema(); auto to_exec_batch = - [schema](const std::shared_ptr& batch) -> std::optional { - if (batch == NULLPTR || *batch->schema() != *schema) { + [](const std::shared_ptr& batch) -> std::optional { + if (batch == NULLPTR) { return std::nullopt; } return std::optional(ExecBatch(*batch)); From 746d7c28f7a024651d00969c4fc43adfd74f5287 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Wed, 11 Jan 2023 08:14:50 +0530 Subject: [PATCH 10/12] fix(reviews) --- cpp/src/arrow/compute/exec/plan_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 1be02b4241739..60d4caeca83ef 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -354,7 +354,7 @@ void TestRecordBatchReaderSourceSink( to_reader(exp_batches)); RecordBatchReaderSourceNodeOptions options{reader}; Declaration plan("record_batch_reader_source", std::move(options)); - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false)); + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel)); AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches.batches); } From ff495b6c6c461fb357dabad74677bf5778fa1098 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 12 Jan 2023 06:22:55 +0530 Subject: [PATCH 11/12] fix(reviews): minor changes --- cpp/examples/arrow/execution_plan_documentation_examples.cc | 5 +---- cpp/src/arrow/compute/exec/plan_test.cc | 1 - cpp/src/arrow/compute/exec/test_util.cc | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc index 170c8a2e195c5..a72db97930cfb 100644 --- a/cpp/examples/arrow/execution_plan_documentation_examples.cc +++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc @@ -772,9 +772,6 @@ arrow::Status TableSinkExample() { /// receiving data from a TableRecordBatchReader. 
arrow::Status RecordBatchReaderSourceSinkExample() { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr plan, - cp::ExecPlan::Make(*cp::threaded_exec_context())); - ARROW_ASSIGN_OR_RAISE(auto table, GetTable()); std::shared_ptr reader = std::make_shared(table); @@ -783,7 +780,7 @@ arrow::Status RecordBatchReaderSourceSinkExample() { return ExecutePlanAndCollectAsTable(std::move(reader_source)); } -// (Doc section: Table Sink Example) +// (Doc section: RecordBatchReaderSource Example) enum ExampleMode { SOURCE_SINK = 0, diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc index 60d4caeca83ef..eb560da99cf21 100644 --- a/cpp/src/arrow/compute/exec/plan_test.cc +++ b/cpp/src/arrow/compute/exec/plan_test.cc @@ -364,7 +364,6 @@ void TestRecordBatchReaderSourceSinkError( std::function>(const BatchesWithSchema&)> to_reader) { ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make()); - std::shared_ptr no_schema; auto source_factory_name = "record_batch_reader_source"; auto exp_batches = MakeBasicBatches(); ASSERT_OK_AND_ASSIGN(std::shared_ptr reader, to_reader(exp_batches)); diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 95ae470874ac1..61815e2c73180 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -296,7 +296,7 @@ Result> ToRecordBatchReader( for (auto batch : batches_with_schema.batches) { ARROW_ASSIGN_OR_RAISE(auto record_batch, batch.ToRecordBatch(batches_with_schema.schema)); - record_batches.push_back(record_batch); + record_batches.push_back(std::move(record_batch)); } ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(std::move(record_batches))); return std::make_shared(std::move(table)); From 76b3674230c7f15acf9b178f8412e1cdcb5856c7 Mon Sep 17 00:00:00 2001 From: vibhatha Date: Thu, 12 Jan 2023 07:03:37 +0530 Subject: [PATCH 12/12] fix(move) --- cpp/src/arrow/compute/exec/test_util.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc index 61815e2c73180..72ddbbeb0d4db 100644 --- a/cpp/src/arrow/compute/exec/test_util.cc +++ b/cpp/src/arrow/compute/exec/test_util.cc @@ -273,8 +273,7 @@ Result>> ToExecBatches( const BatchesWithSchema& batches_with_schema) { std::vector> exec_batches; for (auto batch : batches_with_schema.batches) { - auto exec_batch = std::make_shared(batch); - exec_batches.push_back(exec_batch); + exec_batches.push_back(std::make_shared(batch)); } return exec_batches; } @@ -285,7 +284,7 @@ Result>> ToRecordBatches( for (auto batch : batches_with_schema.batches) { ARROW_ASSIGN_OR_RAISE(auto record_batch, batch.ToRecordBatch(batches_with_schema.schema)); - record_batches.push_back(record_batch); + record_batches.push_back(std::move(record_batch)); } return record_batches; }
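
Usage sketch: a minimal, self-contained way to drive the new "record_batch_reader_source" node, pieced together from the documentation example and the tests in this series. Every name below mirrors the diffs above except cp::DeclarationToTable, which is assumed to be available on this branch (the tests use the sibling DeclarationToExecBatches helper).

#include <memory>

#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/options.h>
#include <arrow/record_batch.h>
#include <arrow/table.h>

namespace cp = arrow::compute;

arrow::Result<std::shared_ptr<arrow::Table>> RoundTripThroughReaderSource(
    const std::shared_ptr<arrow::Table>& table) {
  // Any RecordBatchReader can feed the node; a TableBatchReader is the
  // simplest way to obtain one from an in-memory table.
  std::shared_ptr<arrow::RecordBatchReader> reader =
      std::make_shared<arrow::TableBatchReader>(table);
  // Only the reader is required; the io_executor argument is optional and the
  // node falls back to the I/O thread pool when it is left null.
  cp::Declaration source{"record_batch_reader_source",
                         cp::RecordBatchReaderSourceNodeOptions{reader}};
  // Collect the plan's output back into a table (DeclarationToTable is an
  // assumed helper here; any sink/collection path from the examples works).
  return cp::DeclarationToTable(std::move(source));
}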