Skip to content

Commit

Permalink
fix(reviews): addressing reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Jan 9, 2023
1 parent 4898dc4 commit 9fc4c55
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 34 deletions.
7 changes: 3 additions & 4 deletions cpp/examples/arrow/execution_plan_documentation_examples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -784,10 +784,9 @@ arrow::Status RecordBatchReaderSourceSinkExample() {
std::shared_ptr<arrow::RecordBatchReader> reader =
std::make_shared<arrow::TableBatchReader>(table);

ARROW_ASSIGN_OR_RAISE(
cp::ExecNode * source,
cp::MakeExecNode("record_batch_reader_source", plan.get(), {},
cp::RecordBatchReaderSourceNodeOptions{table->schema(), reader}));
ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
cp::MakeExecNode("record_batch_reader_source", plan.get(), {},
cp::RecordBatchReaderSourceNodeOptions{reader}));
ARROW_RETURN_NOT_OK(cp::MakeExecNode(
"order_by_sink", plan.get(), {source},
cp::OrderBySinkNodeOptions{
Expand Down
8 changes: 2 additions & 6 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,9 @@ class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {

class ARROW_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
public:
RecordBatchReaderSourceNodeOptions(std::shared_ptr<Schema> schema,
std::shared_ptr<RecordBatchReader> reader,
RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
arrow::internal::Executor* io_executor = NULLPTR)
: schema(schema), reader(std::move(reader)), io_executor(io_executor) {}

/// \brief The schema of the record batches from the iterator
std::shared_ptr<Schema> schema;
: reader(std::move(reader)), io_executor(io_executor) {}

/// \brief The RecordBatchReader which acts as the data source
std::shared_ptr<RecordBatchReader> reader;
Expand Down
24 changes: 11 additions & 13 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,11 @@ void TestRecordBatchReaderSourceSink(

auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, to_reader(exp_batches));

ASSERT_OK(Declaration::Sequence(
{
{"record_batch_reader_source",
RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader}},
{"sink", SinkNodeOptions{&sink_gen}},
})
RecordBatchReaderSourceNodeOptions options{reader, io::internal::GetIOThreadPool()};
ASSERT_OK(Declaration::Sequence({
{"record_batch_reader_source", options},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Expand All @@ -376,12 +374,12 @@ void TestRecordBatchReaderSourceSinkError(
auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<RecordBatchReader> reader, to_reader(exp_batches));

auto null_executor_options =
RecordBatchReaderSourceNodeOptions{exp_batches.schema, reader};
auto null_executor_options = RecordBatchReaderSourceNodeOptions{reader};
ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options));

auto null_schema_options = RecordBatchReaderSourceNodeOptions{no_schema, reader};
ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_schema_options),
std::shared_ptr<RecordBatchReader> no_reader;
auto null_reader_options = RecordBatchReaderSourceNodeOptions{no_reader};
ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_reader_options),
Raises(StatusCode::Invalid, HasSubstr("not null")));
}

Expand Down Expand Up @@ -416,11 +414,11 @@ TEST(ExecPlanExecution, RecordBatchSourceSinkError) {
}

TEST(ExecPlanExecution, RecordBatchReaderSourceSink) {
TestRecordBatchReaderSourceSink(ToRecordBatcheReader);
TestRecordBatchReaderSourceSink(ToRecordBatchReader);
}

TEST(ExecPlanExecution, RecordBatchReaderSourceSinkError) {
TestRecordBatchReaderSourceSinkError(ToRecordBatcheReader);
TestRecordBatchReaderSourceSinkError(ToRecordBatchReader);
}

TEST(ExecPlanExecution, SinkNodeBackpressure) {
Expand Down
16 changes: 7 additions & 9 deletions cpp/src/arrow/compute/exec/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,27 +338,25 @@ struct RecordBatchReaderSourceNode : public SourceNode {
const auto& cast_options =
checked_cast<const RecordBatchReaderSourceNodeOptions&>(options);
auto& reader = cast_options.reader;
auto& schema = cast_options.schema;
auto io_executor = cast_options.io_executor;

if (io_executor == NULLPTR) {
io_executor = plan->query_context()->exec_context()->executor();
if (reader == NULLPTR) {
return Status::Invalid(kKindName, " requires a reader which is not null");
}

if (schema == NULLPTR) {
return Status::Invalid(kKindName, " requires schema which is not null");
}
if (io_executor == NULLPTR) {
io_executor = io::internal::GetIOThreadPool();
}

ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor, schema));
return plan->EmplaceNode<RecordBatchReaderSourceNode>(plan, schema, generator);
ARROW_ASSIGN_OR_RAISE(auto generator, MakeGenerator(reader, io_executor));
return plan->EmplaceNode<RecordBatchReaderSourceNode>(plan, reader->schema(),
generator);
}

static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
const std::shared_ptr<RecordBatchReader>& reader,
arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
arrow::internal::Executor* io_executor) {
const auto& schema = reader->schema();
auto to_exec_batch =
[schema](const std::shared_ptr<RecordBatch>& batch) -> std::optional<ExecBatch> {
if (batch == NULLPTR || *batch->schema() != *schema) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
return record_batches;
}

Result<std::shared_ptr<RecordBatchReader>> ToRecordBatcheReader(
Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
const BatchesWithSchema& batches_with_schema) {
std::vector<std::shared_ptr<RecordBatch>> record_batches;
for (auto batch : batches_with_schema.batches) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const BatchesWithSchema& batches);

ARROW_TESTING_EXPORT
Result<std::shared_ptr<RecordBatchReader>> ToRecordBatcheReader(
Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader(
const BatchesWithSchema& batches_with_schema);

ARROW_TESTING_EXPORT
Expand Down

0 comments on commit 9fc4c55

Please sign in to comment.