Skip to content

Commit

Permalink
feat: add an option to disable augmented fields for Substrait processing
Browse files Browse the repository at this point in the history
  • Loading branch information
EpsilonPrime committed May 8, 2024
1 parent dddaf0b commit c9b53f3
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 27 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class ConsumingSinkNode : public ExecNode,
std::atomic<int32_t> backpressure_counter_ = 0;
std::unique_ptr<util::SerialSequencingQueue> sequencer_;
};

static Result<ExecNode*> MakeTableConsumingSinkNode(ExecPlan* plan,
std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
}
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema;
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(*schema));
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(
*schema, options_->add_augmented_fields));
SetProjection(options_.get(), std::move(projection));
ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset_->GetFragments());
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ TEST_F(TestParquetFileFormat, CachedMetadata) {
auto options = std::make_shared<ScanOptions>();
options->filter = literal(true);
ASSERT_OK_AND_ASSIGN(auto projection_descr,
ProjectionDescr::FromNames({"x"}, *test_schema));
ProjectionDescr::FromNames({"x"}, *test_schema,
options->add_augmented_fields));
options->projected_schema = projection_descr.schema;
options->projection = projection_descr.expression;
ASSERT_OK_AND_ASSIGN(auto generator, fragment->ScanBatchesAsync(options));
Expand Down
38 changes: 26 additions & 12 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// create the projected schema only if the provided expressions
// produces valid set of fields.
ARROW_ASSIGN_OR_RAISE(auto projection_descr,
ProjectionDescr::Default(*projected_schema));
ProjectionDescr::Default(
*projected_schema,
scan_options->add_augmented_fields));
scan_options->projected_schema = std::move(projection_descr.schema);
scan_options->projection = projection_descr.expression;
ARROW_ASSIGN_OR_RAISE(scan_options->projection,
Expand All @@ -220,7 +222,9 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
// if projected_fields are not found, we default to creating the projected_schema
// and projection from the dataset_schema.
ARROW_ASSIGN_OR_RAISE(auto projection_descr,
ProjectionDescr::Default(*dataset_schema));
ProjectionDescr::Default(
*dataset_schema,
scan_options->add_augmented_fields));
scan_options->projected_schema = std::move(projection_descr.schema);
scan_options->projection = projection_descr.expression;
}
Expand All @@ -231,7 +235,8 @@ Status NormalizeScanOptions(const std::shared_ptr<ScanOptions>& scan_options,
ARROW_ASSIGN_OR_RAISE(
auto projection_descr,
ProjectionDescr::FromNames(scan_options->projected_schema->field_names(),
*dataset_schema));
*dataset_schema,
scan_options->add_augmented_fields));
scan_options->projection = projection_descr.expression;
}

Expand Down Expand Up @@ -730,7 +735,8 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* executor) {
const auto options = std::make_shared<ScanOptions>(*scan_options_);
ARROW_ASSIGN_OR_RAISE(auto empty_projection,
ProjectionDescr::FromNames(std::vector<std::string>(),
*scan_options_->dataset_schema));
*scan_options_->dataset_schema,
scan_options_->add_augmented_fields));
SetProjection(options.get(), empty_projection);

auto total = std::make_shared<std::atomic<int64_t>>(0);
Expand Down Expand Up @@ -828,7 +834,8 @@ Result<ProjectionDescr> ProjectionDescr::FromExpressions(
}

Result<ProjectionDescr> ProjectionDescr::FromNames(std::vector<std::string> names,
const Schema& dataset_schema) {
const Schema& dataset_schema,
bool add_augmented_fields) {
std::vector<compute::Expression> exprs(names.size());
for (size_t i = 0; i < exprs.size(); ++i) {
// If name isn't in schema, try finding it by dotted path.
Expand All @@ -846,15 +853,19 @@ Result<ProjectionDescr> ProjectionDescr::FromNames(std::vector<std::string> name
}
}
auto fields = dataset_schema.fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
if (add_augmented_fields) {
for (const auto &aug_field: kAugmentedFields) {
fields.push_back(aug_field);
}
}
return ProjectionDescr::FromExpressions(std::move(exprs), std::move(names),
Schema(fields, dataset_schema.metadata()));
}

Result<ProjectionDescr> ProjectionDescr::Default(const Schema& dataset_schema) {
return ProjectionDescr::FromNames(dataset_schema.field_names(), dataset_schema);
Result<ProjectionDescr> ProjectionDescr::Default(const Schema& dataset_schema,
bool add_augmented_fields) {
return ProjectionDescr::FromNames(dataset_schema.field_names(), dataset_schema,
add_augmented_fields);
}

void SetProjection(ScanOptions* options, ProjectionDescr projection) {
Expand Down Expand Up @@ -899,7 +910,8 @@ const std::shared_ptr<Schema>& ScannerBuilder::projected_schema() const {
Status ScannerBuilder::Project(std::vector<std::string> columns) {
ARROW_ASSIGN_OR_RAISE(
auto projection,
ProjectionDescr::FromNames(std::move(columns), *scan_options_->dataset_schema));
ProjectionDescr::FromNames(std::move(columns), *scan_options_->dataset_schema,
scan_options_->add_augmented_fields));
SetProjection(scan_options_.get(), std::move(projection));
return Status::OK();
}
Expand Down Expand Up @@ -1052,8 +1064,10 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
});

auto fields = scan_options->dataset_schema->fields();
for (const auto& aug_field : kAugmentedFields) {
fields.push_back(aug_field);
if (scan_options->add_augmented_fields) {
for (const auto &aug_field: kAugmentedFields) {
fields.push_back(aug_field);
}
}

return acero::MakeExecNode(
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Note: This must be true in order for any readahead to happen
bool use_threads = false;

/// If true the scanner will add augmented fields to the output schema.
bool add_augmented_fields = true;

/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;

Expand Down Expand Up @@ -287,10 +290,12 @@ struct ARROW_DS_EXPORT ProjectionDescr {

/// \brief Create a default projection referencing fields in the dataset schema
static Result<ProjectionDescr> FromNames(std::vector<std::string> names,
const Schema& dataset_schema);
const Schema& dataset_schema,
bool add_augmented_fields);

/// \brief Make a projection that projects every field in the dataset schema
static Result<ProjectionDescr> Default(const Schema& dataset_schema);
static Result<ProjectionDescr> Default(const Schema& dataset_schema,
bool add_augmented_fields);
};

/// \brief Utility method to set the projection expression and schema
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,8 @@ TEST_P(TestScanner, ProjectionDefaults) {
}
// If we only specify a projection expression then infer the projected schema
// from the projection expression
auto projection_desc = ProjectionDescr::FromNames({"i32"}, *schema_);
auto projection_desc = ProjectionDescr::FromNames(
{"i32"}, *schema_, /*add_augmented_fields=*/true);
{
ARROW_SCOPED_TRACE("User only specifies projection");
options_->projection = projection_desc->expression;
Expand Down Expand Up @@ -1148,7 +1149,8 @@ TEST_P(TestScanner, ProjectedScanNestedFromNames) {
});
ASSERT_OK_AND_ASSIGN(auto descr,
ProjectionDescr::FromNames({".struct.i32", "nested.right.f64"},
*options_->dataset_schema))
*options_->dataset_schema,
options_->add_augmented_fields))
SetProjection(options_.get(), std::move(descr));
auto batch_in = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_);
auto batch_out = ConstantArrayGenerator::Zeroes(
Expand Down Expand Up @@ -2106,7 +2108,8 @@ TEST(ScanOptions, TestMaterializedFields) {

auto set_projection_from_names = [&opts](std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames(
std::move(names), *opts->dataset_schema));
std::move(names), *opts->dataset_schema,
opts->add_augmented_fields));
SetProjection(opts.get(), std::move(projection));
};

Expand Down Expand Up @@ -2160,7 +2163,8 @@ TEST(ScanOptions, TestMaterializedFields) {
// project top-level field, filter nothing
opts->filter = literal(true);
ASSERT_OK_AND_ASSIGN(projection,
ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema));
ProjectionDescr::FromNames({"nested"}, *opts->dataset_schema,
opts->add_augmented_fields));
SetProjection(opts.get(), std::move(projection));
EXPECT_THAT(opts->MaterializedFields(), ElementsAre(FieldRef("nested")));

Expand Down
18 changes: 12 additions & 6 deletions cpp/src/arrow/dataset/test_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ class DatasetFixtureMixin : public ::testing::Test {
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema_;
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::FromNames(schema_->field_names(), *schema_));
ProjectionDescr::FromNames(schema_->field_names(), *schema_,
options_->add_augmented_fields));
SetProjection(options_.get(), std::move(projection));
SetFilter(literal(true));
}
Expand All @@ -398,7 +399,8 @@ class DatasetFixtureMixin : public ::testing::Test {
void SetProjectedColumns(std::vector<std::string> column_names) {
ASSERT_OK_AND_ASSIGN(
auto projection,
ProjectionDescr::FromNames(std::move(column_names), *options_->dataset_schema));
ProjectionDescr::FromNames(std::move(column_names), *options_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(options_.get(), std::move(projection));
}

Expand Down Expand Up @@ -502,7 +504,8 @@ class FileFormatFixtureMixin : public ::testing::Test {
void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
opts_->dataset_schema = schema(std::move(fields));
ASSERT_OK_AND_ASSIGN(auto projection,
ProjectionDescr::Default(*opts_->dataset_schema));
ProjectionDescr::Default(*opts_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(opts_.get(), std::move(projection));
}

Expand All @@ -512,7 +515,8 @@ class FileFormatFixtureMixin : public ::testing::Test {

void Project(std::vector<std::string> names) {
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames(
std::move(names), *opts_->dataset_schema));
std::move(names), *opts_->dataset_schema,
/*add_augmented_fields=*/true));
SetProjection(opts_.get(), std::move(projection));
}

Expand Down Expand Up @@ -993,7 +997,8 @@ class FileFormatScanMixin : public FileFormatFixtureMixin<FormatHelper>,
auto i64 = field("i64", int64());
this->opts_->dataset_schema = schema({i32, i32, i64});
ASSERT_RAISES(Invalid,
ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema));
ProjectionDescr::FromNames({"i32"}, *this->opts_->dataset_schema,
/*add_augmented_fields=*/true));
}
void TestScanWithPushdownNulls() {
// Regression test for ARROW-15312
Expand Down Expand Up @@ -1933,7 +1938,8 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
scan_options_->dataset_schema = dataset_->schema();
ASSERT_OK_AND_ASSIGN(
auto projection,
ProjectionDescr::FromNames(source_schema_->field_names(), *dataset_->schema()));
ProjectionDescr::FromNames(source_schema_->field_names(), *dataset_->schema(),
scan_options_->add_augmented_fields));
SetProjection(scan_options_.get(), std::move(projection));
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/engine/substrait/relation_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&

auto scan_options = std::make_shared<dataset::ScanOptions>();
scan_options->use_threads = true;
scan_options->add_augmented_fields = false;

if (read.has_filter()) {
ARROW_ASSIGN_OR_RAISE(scan_options->filter,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/engine/substrait/serde_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ NamedTableProvider AlwaysProvideSameTable(std::shared_ptr<Table> table) {
};
}

TEST(Substrait, ReadRelWithRoot) {
TEST(Substrait, ExecReadRelWithLocalFiles) {
ASSERT_OK_AND_ASSIGN(std::string dir_string,
arrow::internal::GetEnvVar("PARQUET_TEST_DATA"));

Expand Down

0 comments on commit c9b53f3

Please sign in to comment.