Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-15151: [C++] ]Adding RecordBatchReaderSource to solve an issue in R API #15183

Merged
merged 12 commits into from
Jan 12, 2023

Conversation

vibhatha
Copy link
Collaborator

@vibhatha vibhatha commented Jan 4, 2023

This PR includes the factory record_batch_reader_source for the Acero. This is a source node which takes in a RecordBatchReader as the data source along an executor which gives the freedom to choose the threadpool required for the execution. Also an example shows how this can be used in Acero.

  • Self-review

@github-actions
Copy link

github-actions bot commented Jan 4, 2023

@github-actions
Copy link

github-actions bot commented Jan 4, 2023

⚠️ GitHub issue #15151 has no components, please add labels for components.

@vibhatha vibhatha marked this pull request as ready for review January 4, 2023 12:29
@github-actions
Copy link

github-actions bot commented Jan 4, 2023

⚠️ GitHub issue #15151 has no components, please add labels for components.

@vibhatha
Copy link
Collaborator Author

vibhatha commented Jan 4, 2023

cc @westonpace @paleolimbot could you please take a look.

Copy link
Member

@paleolimbot paleolimbot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome! I think this will fix my problem, but could you also update the R package code? That will tell you in a hurry whether or not this works. I tried:

// [[arrow::export]]
std::shared_ptr<compute::ExecNode> ExecNode_SourceNode(
    const std::shared_ptr<compute::ExecPlan>& plan,
    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
  arrow::compute::RecordBatchReaderSourceNodeOptions options{reader->schema(), reader};
  return MakeExecNodeOrStop("record_batch_reader_source", plan.get(), {}, options);
}

Replacing

std::shared_ptr<compute::ExecNode> ExecNode_SourceNode(
const std::shared_ptr<compute::ExecPlan>& plan,
const std::shared_ptr<arrow::RecordBatchReader>& reader) {
arrow::compute::SourceNodeOptions options{
/*output_schema=*/reader->schema(),
/*generator=*/ValueOrStop(
compute::MakeReaderGenerator(reader, arrow::io::internal::GetIOThreadPool()))};
return MakeExecNodeOrStop("source", plan.get(), {}, options);
}

...and that seemed to work. Maybe also remove

// GH-15151: Best path forward to make this available without a hack like this one
namespace arrow {
namespace io {
namespace internal {
arrow::internal::ThreadPool* GetIOThreadPool();
}
} // namespace io
} // namespace arrow

while you're there?

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this. I have a few suggestions.

cpp/src/arrow/compute/exec/options.h Outdated Show resolved Hide resolved
Comment on lines 347 to 366
void TestRecordBatchReaderSourceSink(
std::function<Result<std::shared_ptr<RecordBatchReader>>(const BatchesWithSchema&)>
to_reader) {
ASSERT_OK_AND_ASSIGN(auto executor, arrow::internal::ThreadPool::Make(1));
ExecContext exec_context(default_memory_pool(), executor.get());
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_context));
AsyncGenerator<std::optional<ExecBatch>> sink_gen;

auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, to_reader(exp_batches));

ASSERT_OK(Declaration::Sequence(
{
{"record_batch_reader_source",
RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are specifying an executor but you aren't verifying that it is being used. Also, there is no test where we do not specify an executor.

Testing that the executor gets used would be a little tricky (maybe someday we can add a TotalTasksRun method to the executor). You would probably need to wrap the batch generator with something that verifies it is on the I/O executor (e.g. using OwnsThisThread or something similar).

I am probably being a bit harsh here as I suspect we test some of the other sources in a similar fashion. If we're only going to have one simple test I would prefer a test using the default instead of one specifying a custom executor as that is the more common case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@westonpace I am working on a wrapper to address this. Let me evaluate options and update the PR. Also, I wouldn't say this is harsh but thorough as I see other sources are not properly tested like this. I think it would be better to test them too. I will create a ticket to cover this. Thanks for thinking in this angel.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way TotalTasksRun sounds fun, could you elaborate a bit what you expect here, I am happy to work on that task.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably external to this PR. Appreciate your thoughts.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're only going to have one simple test I would prefer a test using the default instead of one specifying a custom executor as that is the more common case.

Are you referring to IOThreadPool being used as the default instead of NULLPTR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to IOThreadPool being used as the default instead of NULLPTR?

What you had before might actually have been correct (I thought you were using executor in the options but on closer examination it was only being used in the exec_ctx). I was saying the most common case is RecordBatchReaderSourceNodeOptions options(reader) where the executor is not specified and is left to be the default. We should have our test verify that this case works correctly. At the very least we should make sure we can run such a plan without crashing. Long term we should check, after we ran the plan, to ensure that there were tasks submitted to the I/O thread pool.

This kind of test should not need to create a thread pool at all. We should only need to create a thread pool when testing a custom executor in the node.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code, is it okay?

cpp/src/arrow/compute/exec/test_util.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/test_util.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/source_node.cc Outdated Show resolved Hide resolved
Comment on lines 366 to 365
ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use DeclarationToExecBatches for new tests (feel free to fix existing tests too).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind, if I do this in another PR for the rest of the items?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind if you fix the other tests in a separate PR. However, any newly added tests should follow the new format. TEST(ExecPlanExecution, SourceGroupedSum) is a good example from this file that you can follow. This test, for example, would become something like:

  auto exp_batches = MakeBasicBatches();
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, to_reader(exp_batches));
  Declaration plan("record_batch_reader_source", RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader});
  ASSERT_OK_AND_ASSIGN(auto out_batches, DeclarationToExecBatches(std::move(plan)));
  AssertExecBatchesEqualIgnoringOrder(exp_batches.schema, exp_batches.batches, out_batches);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The follow up PR is WIP here: #15288

@vibhatha vibhatha requested review from westonpace and paleolimbot and removed request for westonpace and paleolimbot January 9, 2023 16:13
Copy link
Member

@paleolimbot paleolimbot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the R side of things...looks great! Thank you!

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more suggestions

cpp/src/arrow/compute/exec/plan_test.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/source_node.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/source_node.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/source_node.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/source_node.cc Outdated Show resolved Hide resolved
@vibhatha vibhatha requested a review from westonpace January 11, 2023 04:06
Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A final few cleanup nits and then we can merge.

cpp/src/arrow/compute/exec/plan_test.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/test_util.cc Outdated Show resolved Hide resolved
@vibhatha
Copy link
Collaborator Author

vibhatha commented Jan 12, 2023

@westonpace btw, I just searched for places where I could have missed move and found out the following in other sections of the code base. I haven't included it in here yet, but here is the diff (I tested locally and works fine).

diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc
index 038171c84..912428acf 100644
--- a/cpp/src/arrow/compute/exec/test_util.cc
+++ b/cpp/src/arrow/compute/exec/test_util.cc
@@ -273,8 +273,8 @@ Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
     const BatchesWithSchema& batches_with_schema) {
   std::vector<std::shared_ptr<ExecBatch>> exec_batches;
   for (auto batch : batches_with_schema.batches) {
-    auto exec_batch = std::make_shared<ExecBatch>(batch);
-    exec_batches.push_back(exec_batch);
+    exec_batches.push_back(std::make_shared<ExecBatch>(batch));
   }
   return exec_batches;
 }
@@ -285,7 +285,7 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
   for (auto batch : batches_with_schema.batches) {
     ARROW_ASSIGN_OR_RAISE(auto record_batch,
                           batch.ToRecordBatch(batches_with_schema.schema));
-    record_batches.push_back(record_batch);
+    record_batches.push_back(std::move(record_batch));
   }
   return record_batches;
 }

Is this acceptable?

@vibhatha vibhatha requested a review from westonpace January 12, 2023 01:11
@westonpace
Copy link
Member

Is this acceptable?

Yes, both of those changes look correct. Do you want to apply them real quick?

@vibhatha
Copy link
Collaborator Author

Is this acceptable?

Yes, both of those changes look correct. Do you want to apply them real quick?

Yes I will. One minute.

@vibhatha
Copy link
Collaborator Author

@westonpace Done. Ready to review.

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this. I'll merge when CI passes.

@vibhatha
Copy link
Collaborator Author

@westonpace CIs passed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Should GetIOThreadPool() be accessible from installed headers?
3 participants