Skip to content

Commit

Permalink
[EXP][C++] Deduplicate schemas when scanning Dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jan 27, 2025
1 parent ac1e7ec commit 11b7ad8
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 5 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ set(ARROW_UTIL_SRCS
util/crc32.cc
util/debug.cc
util/decimal.cc
util/deduplicate_internal.cc
util/delimiting.cc
util/dict_util.cc
util/fixed_width_internal.cc
Expand Down
21 changes: 17 additions & 4 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/config.h"
#include "arrow/util/deduplicate_internal.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
Expand Down Expand Up @@ -291,6 +292,16 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this<AsyncSc
std::shared_ptr<Dataset> dataset_;
};

void DeduplicateSchema(std::shared_ptr<RecordBatch>* batch) {
const auto& schema = (*batch)->schema();
auto deduplicated_schema = ::arrow::util::Deduplicate(schema);
if (deduplicated_schema != schema) {
// TODO ReplaceSchema creates a new RecordBatch object,
// should have an in-place RecordBatch::DeduplicateSchema
*batch = (*batch)->ReplaceSchema(deduplicated_schema).ValueOrDie();
}
}

Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
const Enumerated<std::shared_ptr<Fragment>>& fragment,
const std::shared_ptr<ScanOptions>& options) {
Expand Down Expand Up @@ -318,10 +329,11 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns)));
auto enumerated_batch_gen = MakeEnumeratedGenerator(std::move(batch_gen));

auto combine_fn =
[fragment](const Enumerated<std::shared_ptr<RecordBatch>>& record_batch) {
return EnumeratedRecordBatch{record_batch, fragment};
};
auto combine_fn = [fragment](Enumerated<std::shared_ptr<RecordBatch>> record_batch) {
EnumeratedRecordBatch out{record_batch, fragment};
DeduplicateSchema(&out.record_batch.value);
return out;
};

return MakeMappedGenerator(enumerated_batch_gen, std::move(combine_fn));
}
Expand Down Expand Up @@ -423,6 +435,7 @@ Result<EnumeratedRecordBatch> ToEnumeratedRecordBatch(
out.record_batch.last = batch->values[num_fields + 2].scalar_as<BooleanScalar>().value;
ARROW_ASSIGN_OR_RAISE(out.record_batch.value,
batch->ToRecordBatch(options.projected_schema, options.pool));
DeduplicateSchema(&out.record_batch.value);
return out;
}

Expand Down
91 changes: 91 additions & 0 deletions cpp/src/arrow/util/deduplicate_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/util/deduplicate_internal.h"

#include <mutex>
#include <type_traits>
#include <unordered_map>

#include "arrow/type.h"
#include "arrow/util/logging.h"

namespace arrow::util {
namespace {

template <typename T>
struct Deduplicator {
static_assert(std::is_base_of_v<::arrow::detail::Fingerprintable, T>);

std::shared_ptr<T> Deduplicate(std::shared_ptr<T> obj) {
auto lock = std::lock_guard{mutex_};
if (ARROW_PREDICT_FALSE(++lookups_since_last_pruning_ >= kPruneEvery)) {
Prune();
lookups_since_last_pruning_ = 0;
}

const std::string& fingerprint = obj->fingerprint();
if (fingerprint.empty()) {
// Fingerprinting failure
return obj;
}
auto [it, inserted] = cache_.try_emplace(fingerprint, obj);
if (inserted) {
return obj;
}
auto cached = it->second.lock();
if (cached) {
return cached;
}
it->second = obj;
return obj;
}

private:
void Prune() {
auto it = cache_.begin();
while (it != cache_.end()) {
auto cur = it++; // cur will be invalidated on erasure, so increment now
if (cur->second.expired()) {
cache_.erase(cur);
}
}
}

static constexpr int kPruneEvery = 100;

std::mutex mutex_;
// TODO fingerprints can be large, we should use a fast cryptographic hash instead,
// such as Blake3
std::unordered_map<std::string, std::weak_ptr<T>> cache_;
int lookups_since_last_pruning_ = 0;
};

Deduplicator<Field> g_field_deduplicator;
Deduplicator<Schema> g_schema_deduplicator;

} // namespace

std::shared_ptr<Field> Deduplicate(std::shared_ptr<Field> field) {
return g_field_deduplicator.Deduplicate(std::move(field));
}

std::shared_ptr<Schema> Deduplicate(std::shared_ptr<Schema> schema) {
return g_schema_deduplicator.Deduplicate(std::move(schema));
}

} // namespace arrow::util
35 changes: 35 additions & 0 deletions cpp/src/arrow/util/deduplicate_internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <type_traits>
#include <utility>

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"

namespace arrow::util {

ARROW_EXPORT
std::shared_ptr<Field> Deduplicate(std::shared_ptr<Field> field);

ARROW_EXPORT
std::shared_ptr<Schema> Deduplicate(std::shared_ptr<Schema> schema);

} // namespace arrow::util
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
return column;
};
auto make_table = [result_schema, row_groups, self,
this](const ::arrow::ChunkedArrayVector& columns)
this](const ::arrow::ChunkedArrayVector& columns) mutable
-> ::arrow::Result<std::shared_ptr<Table>> {
int64_t num_rows = 0;
if (!columns.empty()) {
Expand Down

0 comments on commit 11b7ad8

Please sign in to comment.