
Commit

fix(clean): adding move and cleaning up PR
vibhatha committed Jan 12, 2023
1 parent f8a536f · commit f2f8937
Showing 1 changed file with 120 additions and 138 deletions.
258 changes: 120 additions & 138 deletions cpp/src/arrow/compute/exec/plan_test.cc
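
The change applied throughout the diff below is a single pattern: DeclarationToExecBatches and DeclarationToTable now consume the Declaration by value, so each call site hands the plan over with std::move instead of copying it, and the leftover parallel/slow loops around runs that are single-threaded anyway are dropped, along with a stray developer comment. A minimal sketch of the new call shape follows; it assumes the Arrow C++ exec-plan headers this test file builds against, and the header paths, the cp namespace alias, and the RunPlanOnce helper are illustrative rather than part of the commit.

// Minimal sketch (not part of the commit): move a fully built Declaration
// into the helper; the caller must not reuse `plan` after this call.
#include <utility>

#include "arrow/compute/exec/exec_plan.h"  // Declaration, DeclarationToExecBatches
#include "arrow/result.h"                  // arrow::Result, ARROW_ASSIGN_OR_RAISE

namespace cp = arrow::compute;

arrow::Result<int> RunPlanOnce(cp::Declaration plan, bool use_threads) {
  ARROW_ASSIGN_OR_RAISE(auto result,
                        cp::DeclarationToExecBatches(std::move(plan), use_threads));
  // result.schema and result.batches are what the tests below compare
  // against their expected batches.
  return static_cast<int>(result.batches.size());
}

At the call sites in the diff, the same move is paired with ASSERT_OK_AND_ASSIGN so a failure aborts the test instead of propagating a Status.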
@@ -317,7 +317,8 @@ void TestSourceSink(
};
Declaration plan(source_factory_name,
OptionsType{exp_batches.schema, element_it_maker});
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel));
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), parallel));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches,
exp_batches.batches);
}
@@ -739,7 +740,8 @@ TEST(ExecPlanExecution, StressSourceSink) {
schema({field("a", int32()), field("b", boolean())}), num_batches);
Declaration plan("source", SourceNodeOptions{random_data.schema,
random_data.gen(parallel, slow)});
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel));
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), parallel));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches,
random_data.batches);
}
@@ -856,41 +858,35 @@ TEST(ExecPlanExecution, StressSourceSinkStopped) {
}

TEST(ExecPlanExecution, SourceFilterSink) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel" : "single threaded");
auto basic_data = MakeBasicBatches();
Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false,
/*slow=*/false)}},
{"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}});
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel));
auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"),
ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")};
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}
auto basic_data = MakeBasicBatches();
Declaration plan = Declaration::Sequence(
{{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false,
/*slow=*/false)}},
{"filter", FilterNodeOptions{equal(field_ref("i32"), literal(6))}}});
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
auto exp_batches = {ExecBatchFromJSON({int32(), boolean()}, "[]"),
ExecBatchFromJSON({int32(), boolean()}, "[[6, false]]")};
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}

TEST(ExecPlanExecution, SourceProjectSink) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel" : "single threaded");
auto basic_data = MakeBasicBatches();
Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false,
/*slow=*/false)}},
{"project", ProjectNodeOptions{{
not_(field_ref("bool")),
call("add", {field_ref("i32"), literal(1)}),
},
{"!bool", "i32 + 1"}}}});

auto exp_batches = {
ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"),
ExecBatchFromJSON({boolean(), int32()}, "[[null, 6], [true, 7], [true, 8]]")};
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}
auto basic_data = MakeBasicBatches();
Declaration plan = Declaration::Sequence(
{{"source", SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false,
/*slow=*/false)}},
{"project", ProjectNodeOptions{{
not_(field_ref("bool")),
call("add", {field_ref("i32"), literal(1)}),
},
{"!bool", "i32 + 1"}}}});

auto exp_batches = {
ExecBatchFromJSON({boolean(), int32()}, "[[false, null], [true, 5]]"),
ExecBatchFromJSON({boolean(), int32()}, "[[null, 6], [true, 7], [true, 8]]")};
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}

namespace {
@@ -975,11 +971,12 @@ TEST(ExecPlanExecution, SourceMinMaxScalar) {
AggregateNodeOptions{
/*aggregates=*/{{"min_max", std::move(minmax_opts), "i32", "min_max"}},
/*keys=*/{}}}});
ASSERT_OK_AND_ASSIGN(auto result_table, DeclarationToTable(plan, parallel));
ASSERT_OK_AND_ASSIGN(auto result_table,
DeclarationToTable(std::move(plan), parallel));
AssertTablesEqual(*result_table, *expected_table);
}
}
/// VIBHATHA WORKING

TEST(ExecPlanExecution, NestedSourceFilter) {
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
@@ -996,7 +993,8 @@ TEST(ExecPlanExecution, NestedSourceFilter) {
{{"source", SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
{"filter", FilterNodeOptions{greater_equal(field_ref(FieldRef("struct", "i32")),
literal(5))}}});
ASSERT_OK_AND_ASSIGN(auto result_table, DeclarationToTable(plan, parallel));
ASSERT_OK_AND_ASSIGN(auto result_table,
DeclarationToTable(std::move(plan), parallel));
AssertTablesEqual(*result_table, *expected_table);
}
}
@@ -1134,27 +1132,22 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) {
}

TEST(ExecPlanExecution, SourceScalarAggSink) {
for (bool slow : {false, true}) {
SCOPED_TRACE(slow ? "slowed" : "unslowed");
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
auto basic_data = MakeBasicBatches();
auto basic_data = MakeBasicBatches();

Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{basic_data.schema, basic_data.gen(parallel, slow)}},
{"aggregate", AggregateNodeOptions{
/*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"},
{"any", nullptr, "bool", "any(bool)"}},
}}});
auto exp_batches = {
ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR},
"[[22, true]]"),
};
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}
}
Declaration plan = Declaration::Sequence(
{{"source", SourceNodeOptions{basic_data.schema,
basic_data.gen(/*parallel=*/false, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{
/*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"},
{"any", nullptr, "bool", "any(bool)"}},
}}});
auto exp_batches = {
ExecBatchFromJSON({int64(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR},
"[[22, true]]"),
};
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}

TEST(ExecPlanExecution, AggregationPreservesOptions) {
@@ -1207,46 +1200,41 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
// ARROW-9056: scalar aggregation can be done over scalars, taking
// into account batch.length > 1 (e.g. a partition column)
for (bool slow : {false, true}) {
SCOPED_TRACE(slow ? "slowed" : "unslowed");
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
BatchesWithSchema scalar_data;
scalar_data.batches = {
ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR},
"[[5, false], [5, false], [5, false]]"),
ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")};
scalar_data.schema = schema({field("a", int32()), field("b", boolean())});

// index can't be tested as it's order-dependent
// mode/quantile can't be tested as they're technically vector kernels
Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{scalar_data.schema, scalar_data.gen(parallel, slow)}},
{"aggregate", AggregateNodeOptions{/*aggregates=*/{
{"all", nullptr, "b", "all(b)"},
{"any", nullptr, "b", "any(b)"},
{"count", nullptr, "a", "count(a)"},
{"mean", nullptr, "a", "mean(a)"},
{"product", nullptr, "a", "product(a)"},
{"stddev", nullptr, "a", "stddev(a)"},
{"sum", nullptr, "a", "sum(a)"},
{"tdigest", nullptr, "a", "tdigest(a)"},
{"variance", nullptr, "a", "variance(a)"}}}}});

auto exp_batches = {
ExecBatchFromJSON(
{boolean(), boolean(), int64(), float64(), int64(), float64(), int64(),
float64(), float64()},
{ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY,
ArgShape::SCALAR},
R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"),
};
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}
}
BatchesWithSchema scalar_data;
scalar_data.batches = {
ExecBatchFromJSON({int32(), boolean()}, {ArgShape::SCALAR, ArgShape::SCALAR},
"[[5, false], [5, false], [5, false]]"),
ExecBatchFromJSON({int32(), boolean()}, "[[5, true], [6, false], [7, true]]")};
scalar_data.schema = schema({field("a", int32()), field("b", boolean())});

// index can't be tested as it's order-dependent
// mode/quantile can't be tested as they're technically vector kernels
Declaration plan = Declaration::Sequence(
{{"source", SourceNodeOptions{scalar_data.schema,
scalar_data.gen(/*parallel=*/false, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{
/*aggregates=*/{{"all", nullptr, "b", "all(b)"},
{"any", nullptr, "b", "any(b)"},
{"count", nullptr, "a", "count(a)"},
{"mean", nullptr, "a", "mean(a)"},
{"product", nullptr, "a", "product(a)"},
{"stddev", nullptr, "a", "stddev(a)"},
{"sum", nullptr, "a", "sum(a)"},
{"tdigest", nullptr, "a", "tdigest(a)"},
{"variance", nullptr, "a", "variance(a)"}}}}});

auto exp_batches = {
ExecBatchFromJSON(
{boolean(), boolean(), int64(), float64(), int64(), float64(), int64(),
float64(), float64()},
{ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR,
ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY,
ArgShape::SCALAR},
R"([[false, true, 6, 5.5, 26250, 0.7637626158259734, 33, 5.0, 0.5833333333333334]])"),
};
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, exp_batches);
}

TEST(ExecPlanExecution, ScalarSourceGroupedSum) {
@@ -1308,7 +1296,8 @@ TEST(ExecPlanExecution, SelfInnerHashJoinSink) {

auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts));

ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel));
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), parallel));

std::vector<ExecBatch> expected = {
ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
@@ -1345,7 +1334,8 @@ TEST(ExecPlanExecution, SelfOuterHashJoinSink) {

auto plan = Declaration("hashjoin", {left, right}, std::move(join_opts));

ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, parallel));
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), parallel));

std::vector<ExecBatch> expected = {
ExecBatchFromJSON({int32(), utf8(), int32(), utf8()}, R"([
@@ -1361,52 +1351,44 @@

TEST(ExecPlan, RecordBatchReaderSourceSink) {
// set up a RecordBatchReader:
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
auto input = MakeBasicBatches();
auto input = MakeBasicBatches();

RecordBatchVector batches;
for (const ExecBatch& exec_batch : input.batches) {
ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema));
batches.push_back(std::move(batch));
}
RecordBatchVector batches;
for (const ExecBatch& exec_batch : input.batches) {
ASSERT_OK_AND_ASSIGN(auto batch, exec_batch.ToRecordBatch(input.schema));
batches.push_back(std::move(batch));
}

ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
std::shared_ptr<RecordBatchReader> reader =
std::make_shared<TableBatchReader>(*table);
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(*table);

// Map the RecordBatchReader to a SourceNode
ASSERT_OK_AND_ASSIGN(
auto batch_gen,
MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool()));
// Map the RecordBatchReader to a SourceNode
ASSERT_OK_AND_ASSIGN(
auto batch_gen,
MakeReaderGenerator(std::move(reader), arrow::io::internal::GetIOThreadPool()));

Declaration plan = Declaration::Sequence(
{{"source", SourceNodeOptions{table->schema(), batch_gen}}});
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, input.batches);
}
Declaration plan =
Declaration::Sequence({{"source", SourceNodeOptions{table->schema(), batch_gen}}});
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, input.batches);
}

TEST(ExecPlan, SourceEnforcesBatchLimit) {
for (bool slow : {false, true}) {
SCOPED_TRACE(slow ? "slowed" : "unslowed");
for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
auto random_data = MakeRandomBatches(
schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3,
/*batch_size=*/static_cast<int32_t>(std::floor(ExecPlan::kMaxBatchSize * 3.5)));

// AsyncGenerator<std::optional<ExecBatch>> sink_gen;
Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{random_data.schema, random_data.gen(parallel, slow)}}});
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(plan, true));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches,
random_data.batches);
for (const auto& batch : result.batches) {
ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize);
}
}
auto random_data = MakeRandomBatches(
schema({field("a", int32()), field("b", boolean())}), /*num_batches=*/3,
/*batch_size=*/static_cast<int32_t>(std::floor(ExecPlan::kMaxBatchSize * 3.5)));

// AsyncGenerator<std::optional<ExecBatch>> sink_gen;
Declaration plan = Declaration::Sequence(
{{"source",
SourceNodeOptions{random_data.schema,
random_data.gen(/*parallel=*/false, /*slow=*/false)}}});
ASSERT_OK_AND_ASSIGN(auto result,
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
AssertExecBatchesEqualIgnoringOrder(result.schema, result.batches, random_data.batches);
for (const auto& batch : result.batches) {
ASSERT_LE(batch.length, ExecPlan::kMaxBatchSize);
}
}

