Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-15151: [C++] Adding RecordBatchReaderSource to solve an issue in R API #15183

Merged
merged 12 commits into from
Jan 12, 2023
Prev Previous commit
Next Next commit
fix(reviews)
  • Loading branch information
vibhatha committed Jan 11, 2023
commit 10610ebfc5819b982e999cbc63445be8d99a1716
18 changes: 3 additions & 15 deletions cpp/examples/arrow/execution_plan_documentation_examples.cc
Original file line number Diff line number Diff line change
@@ -561,8 +561,6 @@ arrow::Status SourceOrderBySinkExample() {

ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

std::cout << "basic data created" << std::endl;

arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
@@ -777,22 +775,12 @@ arrow::Status RecordBatchReaderSourceSinkExample() {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
cp::ExecPlan::Make(*cp::threaded_exec_context()));

std::cout << "basic data created" << std::endl;

arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
std::shared_ptr<arrow::RecordBatchReader> reader =
std::make_shared<arrow::TableBatchReader>(table);

ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
cp::MakeExecNode("record_batch_reader_source", plan.get(), {},
cp::RecordBatchReaderSourceNodeOptions{reader}));
ARROW_RETURN_NOT_OK(cp::MakeExecNode(
"order_by_sink", plan.get(), {source},
cp::OrderBySinkNodeOptions{
cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

return ExecutePlanAndCollectAsTableWithCustomSink(plan, table->schema(), sink_gen);
cp::Declaration reader_source{"record_batch_reader_source",
cp::RecordBatchReaderSourceNodeOptions{reader}};
return ExecutePlanAndCollectAsTable(std::move(reader_source));
}

// (Doc section: Table Sink Example)
9 changes: 4 additions & 5 deletions cpp/src/arrow/compute/exec/source_node.cc
Original file line number Diff line number Diff line change
@@ -340,11 +340,11 @@ struct RecordBatchReaderSourceNode : public SourceNode {
auto& reader = cast_options.reader;
auto io_executor = cast_options.io_executor;

if (reader == NULLPTR) {
if (reader == nullptr) {
return Status::Invalid(kKindName, " requires a reader which is not null");
}

if (io_executor == NULLPTR) {
if (io_executor == nullptr) {
io_executor = io::internal::GetIOThreadPool();
}

@@ -356,10 +356,9 @@ struct RecordBatchReaderSourceNode : public SourceNode {
static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
const std::shared_ptr<RecordBatchReader>& reader,
arrow::internal::Executor* io_executor) {
const auto& schema = reader->schema();
auto to_exec_batch =
[schema](const std::shared_ptr<RecordBatch>& batch) -> std::optional<ExecBatch> {
if (batch == NULLPTR || *batch->schema() != *schema) {
[](const std::shared_ptr<RecordBatch>& batch) -> std::optional<ExecBatch> {
if (batch == NULLPTR) {
return std::nullopt;
}
return std::optional<ExecBatch>(ExecBatch(*batch));