Skip to content

Commit

Permalink
Merge branch 'apacheGH-35579-parquet-dataset-field-refs' into apacheG…
Browse files Browse the repository at this point in the history
  • Loading branch information
davisusanibar committed May 30, 2023
2 parents 4099ca6 + cf618ae commit d74c116
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 1 deletion.
42 changes: 41 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,41 @@ Status ResolveOneFieldRef(
return Status::OK();
}

// Returns true iff `ref` is position-independent, i.e. it is a single name or
// a nested chain consisting solely of names (no indices anywhere).
bool IsValidFieldRef(const FieldRef& ref) {
  if (ref.IsName()) return true;

  const auto* children = ref.nested_refs();
  if (children == nullptr) return false;

  // A nested ref qualifies only when every component is itself a name.
  for (const auto& child : *children) {
    if (!child.IsName()) return false;
  }
  return true;
}

// Rewrites `ref` into a position-independent ref (a pure chain of field names)
// resolved against `dataset_schema`, storing the result in `*out`.
// Returns `false` -- and leaves `*out` untouched -- when `ref` already
// contains only names; raises if `ref` cannot be resolved against the schema.
Result<bool> ValidateFieldRef(const FieldRef& ref, const Schema& dataset_schema,
                              FieldRef* out) {
  // Fast path: refs that are already name-based need no conversion.
  if (ARROW_PREDICT_TRUE(IsValidFieldRef(ref))) {
    return false;
  }

  // Resolve the (possibly positional) ref to a concrete path of indices.
  ARROW_ASSIGN_OR_RAISE(auto path, ref.FindOne(dataset_schema));

  // Walk the schema along that path, recording the field name at each level.
  std::vector<FieldRef> name_chain;
  name_chain.reserve(path.indices().size());
  const FieldVector* current_fields = &dataset_schema.fields();
  for (auto index : path) {
    const auto& child = *(*current_fields)[index];
    name_chain.emplace_back(child.name());
    current_fields = &child.type()->fields();
  }

  // Collapse a single-step chain to a plain name ref; otherwise nest the chain.
  if (name_chain.size() == 1) {
    *out = std::move(name_chain[0]);
  } else {
    *out = FieldRef(std::move(name_chain));
  }
  return true;
}

// Compute the column projection based on the scan options
Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader& reader,
const ScanOptions& options) {
Expand All @@ -248,7 +283,12 @@ Result<std::vector<int>> InferColumnProjection(const parquet::arrow::FileReader&
}

std::vector<int> columns_selection;
for (const auto& ref : field_refs) {
for (auto& ref : field_refs) {
// In the (unlikely) absence of a known dataset schema, we require that all
// materialized refs are named.
if (options.dataset_schema) {
ARROW_RETURN_NOT_OK(ValidateFieldRef(ref, *options.dataset_schema, &ref));
}
RETURN_NOT_OK(ResolveOneFieldRef(manifest, ref, field_lookup, duplicate_fields,
&columns_selection));
}
Expand Down
50 changes: 50 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -694,5 +694,55 @@ TEST(TestParquetStatistics, NullMax) {
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
}

// Round-trips a nested table through a Parquet dataset and verifies that
// several equivalent FieldRefs -- spelled with names, with indices, or with a
// mix of nested combinations -- all project down to the same single column.
// https://github.com/apache/arrow/issues/35579
TEST(TestRoundTrip, ProjectedFieldRefs) {
  // Two levels of nesting so both flat and nested refs are exercised.
  auto dataset_schema = schema(
      {field("id", uint32()),
       field("info", struct_({field("name", utf8()),
                              field("data", struct_({field("amount", float64()),
                                                     field("percent", float32())}))}))});
  auto source_table = TableFromJSON(dataset_schema, {R"([
{"id": 1, "info": {"name": "a", "data": {"amount": 10.3, "percent": 0.1}}},
{"id": 2, "info": {"name": "b", "data": {"amount": 11.6, "percent": 0.2}}},
{"id": 3, "info": {"name": "c", "data": {"amount": 12.9, "percent": 0.3}}},
{"id": 4, "info": {"name": "d", "data": {"amount": 14.2, "percent": 0.4}}},
{"id": 5, "info": {"name": "e", "data": {"amount": 15.5, "percent": 0.5}}},
{"id": 6, "info": {"name": "f", "data": {"amount": 16.8, "percent": 0.6}}}])"});
  ASSERT_OK(source_table->ValidateFull());

  // Write the table into an in-memory filesystem, then discover it as a dataset.
  ASSERT_OK_AND_ASSIGN(auto mock_fs,
                       fs::internal::MockFileSystem::Make(fs::kNoTime, {}));
  ASSERT_OK_AND_ASSIGN(auto out_stream, mock_fs->OpenOutputStream("test.parquet"));
  ASSERT_OK(parquet::arrow::WriteTable(*source_table, arrow::default_memory_pool(),
                                       out_stream));
  ASSERT_OK(out_stream->Close());

  auto parquet_format = std::make_shared<ParquetFileFormat>();
  ASSERT_OK_AND_ASSIGN(auto dataset_factory,
                       FileSystemDatasetFactory::Make(mock_fs, fs::FileSelector(),
                                                      parquet_format,
                                                      FileSystemFactoryOptions{}));
  ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
  AssertSchemaEqual(dataset_schema, dataset->schema());

  // Every projection below should yield exactly this single-column table.
  auto projected_schema = schema({field("value", float32())});
  auto projected_table = TableFromJSON(projected_schema, {R"([
{"value": 0.1},{"value": 0.2},{"value": 0.3},
{"value": 0.4},{"value": 0.5},{"value": 0.6}])"});

  // Different spellings of info.data.percent: pure names, pure indices, and
  // mixed/nested combinations of both.
  std::vector<FieldRef> equivalent_refs = {
      FieldRef("info", "data", "percent"), FieldRef("info", 1, 1),
      FieldRef(1, 1, "percent"),           FieldRef(1, 1, 1),
      FieldRef(1, FieldRef("data", 1)),    FieldRef(FieldRef(1), FieldRef(1, 1)),
  };
  for (const auto& test_ref : equivalent_refs) {
    ARROW_SCOPED_TRACE("ref = ", test_ref.ToString());

    ScannerBuilder scanner_builder(dataset);
    ASSERT_OK(scanner_builder.Project({field_ref(test_ref)}, {"value"}));
    ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder.Finish());
    ASSERT_OK_AND_ASSIGN(auto scanned_table, scanner->ToTable());

    AssertTablesEqual(*projected_table, *scanned_table);
  }
}

} // namespace dataset
} // namespace arrow

0 comments on commit d74c116

Please sign in to comment.