diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 4aed041be33a9..a72db97930cfb 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -561,8 +561,6 @@ arrow::Status SourceOrderBySinkExample() {
   ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
 
-  std::cout << "basic data created" << std::endl;
-
   arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
 
   auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
@@ -761,8 +759,29 @@ arrow::Status TableSinkExample() {
   std::cout << "Results : " << output_table->ToString() << std::endl;
   return arrow::Status::OK();
 }
+
 // (Doc section: Table Sink Example)
+// (Doc section: RecordBatchReaderSource Example)
+
+/// \brief An example showing the usage of a RecordBatchReader as the data source.
+///
+/// RecordBatchReaderSourceSink Example
+/// This example shows how a record_batch_reader_source can be used
+/// in an execution plan. This includes the source node
+/// receiving data from a TableRecordBatchReader.
+
+arrow::Status RecordBatchReaderSourceSinkExample() {
+  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
+  std::shared_ptr<arrow::RecordBatchReader> reader =
+      std::make_shared<arrow::TableBatchReader>(table);
+  cp::Declaration reader_source{"record_batch_reader_source",
+                                cp::RecordBatchReaderSourceNodeOptions{reader}};
+  return ExecutePlanAndCollectAsTable(std::move(reader_source));
+}
+
+// (Doc section: RecordBatchReaderSource Example)
+
 enum ExampleMode {
   SOURCE_SINK = 0,
   TABLE_SOURCE_SINK = 1,
@@ -777,7 +796,8 @@ enum ExampleMode {
   KSELECT = 10,
   WRITE = 11,
   UNION = 12,
-  TABLE_SOURCE_TABLE_SINK = 13
+  TABLE_SOURCE_TABLE_SINK = 13,
+  RECORD_BATCH_READER_SOURCE = 14
 };
 
 int main(int argc, char** argv) {
@@ -848,6 +868,10 @@ int main(int argc, char** argv) {
       PrintBlock("TableSink Example");
       status = TableSinkExample();
       break;
+    case RECORD_BATCH_READER_SOURCE:
+      PrintBlock("RecordBatchReaderSource Example");
+      status = RecordBatchReaderSourceSinkExample();
+      break;
     default:
       break;
   }
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 325e8e514d189..0ef75cbedcf81 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -126,6 +126,21 @@ class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
   arrow::internal::Executor* io_executor;
 };
 
+class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
+ public:
+  RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
+                                     arrow::internal::Executor* io_executor = NULLPTR)
+      : reader(std::move(reader)), io_executor(io_executor) {}
+
+  /// \brief The RecordBatchReader which acts as the data source
+  std::shared_ptr<RecordBatchReader> reader;
+
+  /// \brief The executor to use for the reader
+  ///
+  /// Defaults to the default I/O executor.
+  arrow::internal::Executor* io_executor;
+};
+
 using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
 /// \brief An extended Source node which accepts a schema and array-vectors
 class ARROW_EXPORT ArrayVectorSourceNodeOptions
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 2db6c78d4c1cc..eb560da99cf21 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -344,6 +344,39 @@ void TestSourceSink(
       Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
 }
 
+void TestRecordBatchReaderSourceSink(
+    std::function<Result<std::shared_ptr<RecordBatchReader>>(const BatchesWithSchema&)>
+        to_reader) {
+  for (bool parallel : {false, true}) {
+    SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
+    auto exp_batches = MakeBasicBatches();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader,
+                         to_reader(exp_batches));
+    RecordBatchReaderSourceNodeOptions options{reader};
+    Declaration plan("record_batch_reader_source", std::move(options));
+    ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel));
+    AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches,
+                                        exp_batches.batches);
+  }
+}
+
+void TestRecordBatchReaderSourceSinkError(
+    std::function<Result<std::shared_ptr<RecordBatchReader>>(const BatchesWithSchema&)>
+        to_reader) {
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+  auto source_factory_name = "record_batch_reader_source";
+  auto exp_batches = MakeBasicBatches();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, to_reader(exp_batches));
+
+  auto null_executor_options = RecordBatchReaderSourceNodeOptions{reader};
+  ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options));
+
+  std::shared_ptr<RecordBatchReader> no_reader;
+  auto null_reader_options = RecordBatchReaderSourceNodeOptions{no_reader};
+  ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_reader_options),
+              Raises(StatusCode::Invalid, HasSubstr("not null")));
+}
+
 TEST(ExecPlanExecution, ArrayVectorSourceSink) {
   TestSourceSink<std::shared_ptr<ArrayVector>,
ArrayVectorSourceNodeOptions>(
      "array_vector_source", ToArrayVectors);
 }
@@ -374,6 +407,14 @@ TEST(ExecPlanExecution, RecordBatchSourceSinkError) {
       "record_batch_source", ToRecordBatches);
 }
 
+TEST(ExecPlanExecution, RecordBatchReaderSourceSink) {
+  TestRecordBatchReaderSourceSink(ToRecordBatchReader);
+}
+
+TEST(ExecPlanExecution, RecordBatchReaderSourceSinkError) {
+  TestRecordBatchReaderSourceSinkError(ToRecordBatchReader);
+}
+
 TEST(ExecPlanExecution, SinkNodeBackpressure) {
   std::optional<ExecBatch> batch =
       ExecBatchFromJSON({int32(), boolean()},
diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc
index 3d21dece97e09..9a96ddf285360 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -327,6 +327,52 @@ struct SchemaSourceNode : public SourceNode {
   }
 };
 
+struct RecordBatchReaderSourceNode : public SourceNode {
+  RecordBatchReaderSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
+                              arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
+      : SourceNode(plan, schema, generator) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, kKindName));
+    const auto& cast_options =
+        checked_cast<const RecordBatchReaderSourceNodeOptions&>(options);
+    auto& reader = cast_options.reader;
+    auto io_executor = cast_options.io_executor;
+
+    if (reader == nullptr) {
+      return Status::Invalid(kKindName, " requires a reader which is not null");
+    }
+
+    if (io_executor == nullptr) {
+      io_executor = io::internal::GetIOThreadPool();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor));
+    return plan->EmplaceNode<RecordBatchReaderSourceNode>(plan, reader->schema(),
+                                                          generator);
+  }
+
+  static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
+      const std::shared_ptr<RecordBatchReader>& reader,
+      arrow::internal::Executor* io_executor) {
+    auto to_exec_batch =
+        [](const std::shared_ptr<RecordBatch>& batch) -> std::optional<ExecBatch> {
+      if (batch == NULLPTR) {
+        return std::nullopt;
+      }
+      return std::optional<ExecBatch>(ExecBatch(*batch));
+    };
+    Iterator<std::shared_ptr<RecordBatch>> batch_it = MakeIteratorFromReader(reader);
+    auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it));
+    return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
+  }
+
+  static const char kKindName[];
+};
+
+const char RecordBatchReaderSourceNode::kKindName[] = "RecordBatchReaderSourceNode";
+
 struct RecordBatchSourceNode : public SchemaSourceNode {
   using RecordBatchSchemaSourceNode =
@@ -444,6 +490,8 @@ void RegisterSourceNode(ExecFactoryRegistry* registry) {
   DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
   DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make));
   DCHECK_OK(registry->AddFactory("record_batch_source", RecordBatchSourceNode::Make));
+  DCHECK_OK(registry->AddFactory("record_batch_reader_source",
+                                 RecordBatchReaderSourceNode::Make));
   DCHECK_OK(registry->AddFactory("exec_batch_source", ExecBatchSourceNode::Make));
   DCHECK_OK(registry->AddFactory("array_vector_source", ArrayVectorSourceNode::Make));
   DCHECK_OK(registry->AddFactory("named_table", MakeNamedTableNode));
diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc
index 038171c8489c6..72ddbbeb0d4db 100644
--- a/cpp/src/arrow/compute/exec/test_util.cc
+++ b/cpp/src/arrow/compute/exec/test_util.cc
@@ -273,8 +273,7 @@ Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
     const BatchesWithSchema& batches_with_schema) {
   std::vector<std::shared_ptr<ExecBatch>> exec_batches;
   for (auto batch : batches_with_schema.batches) {
-    auto exec_batch = std::make_shared<ExecBatch>(batch);
-    exec_batches.push_back(exec_batch);
+    exec_batches.push_back(std::make_shared<ExecBatch>(batch));
   }
   return exec_batches;
 }
@@ -285,11 +284,23 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
   for (auto batch : batches_with_schema.batches) {
     ARROW_ASSIGN_OR_RAISE(auto record_batch,
                           batch.ToRecordBatch(batches_with_schema.schema));
-    record_batches.push_back(record_batch);
+    record_batches.push_back(std::move(record_batch));
   }
   return record_batches;
 }
 
+Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
+    const BatchesWithSchema& batches_with_schema) {
+  std::vector<std::shared_ptr<RecordBatch>> record_batches;
+  for (auto batch : batches_with_schema.batches) {
+    ARROW_ASSIGN_OR_RAISE(auto record_batch,
+                          batch.ToRecordBatch(batches_with_schema.schema));
+    record_batches.push_back(std::move(record_batch));
+  }
+  ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(std::move(record_batches)));
+  return std::make_shared<arrow::TableBatchReader>(std::move(table));
+}
+
 Result<std::shared_ptr<Table>> SortTableOnAllFields(const std::shared_ptr<Table>& tab) {
   std::vector<SortKey> sort_keys;
   for (int i = 0; i < tab->num_columns(); i++) {
diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h
index a4eea798357ba..1eb50223249cc 100644
--- a/cpp/src/arrow/compute/exec/test_util.h
+++ b/cpp/src/arrow/compute/exec/test_util.h
@@ -127,6 +127,10 @@ ARROW_TESTING_EXPORT
 Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
     const BatchesWithSchema& batches);
 
+ARROW_TESTING_EXPORT
+Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
+    const BatchesWithSchema& batches_with_schema);
+
 ARROW_TESTING_EXPORT
 Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
     const BatchesWithSchema& batches_with_schema);
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index feff0a21873ac..ac5b8d2febf7f 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -30,15 +30,6 @@
 #include <arrow/table.h>
 #include <arrow/util/async_generator.h>
 
-// GH-15151: Best path forward to make this available without a hack like this one
-namespace arrow {
-namespace io {
-namespace internal {
-arrow::internal::ThreadPool* GetIOThreadPool();
-}
-}  // namespace io
-}  // namespace arrow
-
 namespace compute = ::arrow::compute;
 
 std::shared_ptr<arrow::compute::FunctionOptions> make_compute_options(std::string func_name,
@@ -459,12 +450,8 @@ std::shared_ptr<compute::ExecNode> ExecNode_Union(
 std::shared_ptr<compute::ExecNode> ExecNode_SourceNode(
     const std::shared_ptr<compute::ExecPlan>& plan,
     const std::shared_ptr<arrow::RecordBatchReader>& reader) {
-  arrow::compute::SourceNodeOptions options{
-      /*output_schema=*/reader->schema(),
-      /*generator=*/ValueOrStop(
-          compute::MakeReaderGenerator(reader, arrow::io::internal::GetIOThreadPool()))};
-
-  return MakeExecNodeOrStop("source", plan.get(), {}, options);
+  arrow::compute::RecordBatchReaderSourceNodeOptions options{reader};
+  return MakeExecNodeOrStop("record_batch_reader_source", plan.get(), {}, options);
 }
 
 // [[arrow::export]]