Enable merging of LocalVocabs #1310

Merged
merged 23 commits into from
Apr 12, 2024
2cc4b58
Playing around with this and that, let's see how fast we can get this…
joka921 Mar 21, 2024
ce3afef
A first draft of merging local vocabs.
joka921 Mar 21, 2024
af376d0
Merge remote-tracking branch 'origin/master' into merge-local-vocab
Mar 21, 2024
aa8d372
A small fix for joins.
joka921 Mar 21, 2024
d2f1852
Merge remote-tracking branch 'origin/merge-local-vocab' into merge-lo…
joka921 Mar 21, 2024
c316139
Merge remote-tracking branch 'origin/master' into merge-local-vocab
Mar 22, 2024
2836a51
Minor clang-format fix
Mar 22, 2024
cbccbc8
working on this and that.
joka921 Apr 8, 2024
de10849
A Working version with improved test coverage.
joka921 Apr 10, 2024
58afc81
Merge remote-tracking branch 'origin/merge-local-vocab' into merge-lo…
joka921 Apr 10, 2024
4cedeeb
Fix the tests.
joka921 Apr 10, 2024
68a9be5
Merge branch 'master' into merge-local-vocab
joka921 Apr 10, 2024
2bc4d5d
Small round of reviews.
joka921 Apr 10, 2024
556c293
Fixed the remaining unit tests.
joka921 Apr 11, 2024
4bd0884
Fix compilation and hopefully coverage.
joka921 Apr 11, 2024
71d7a8b
Update the native macos build to clang 18 to fix std::to_address.
joka921 Apr 11, 2024
64f19df
Remove `std::to_address` because of MacOS.
joka921 Apr 11, 2024
700b518
Fix mac for good now...
joka921 Apr 11, 2024
5724a26
Update clang-16-macos
joka921 Apr 11, 2024
b3c6fe2
Finalinally fix the MacOS build.
joka921 Apr 11, 2024
53e9801
Merge remote-tracking branch 'origin/merge-local-vocab' into merge-lo…
joka921 Apr 11, 2024
a441f5e
A round of reviews.
joka921 Apr 11, 2024
814e172
The missing "d"
joka921 Apr 11, 2024
28 changes: 15 additions & 13 deletions src/engine/AddCombinedRowToTable.h
@@ -218,21 +218,23 @@ class AddCombinedRowToIdTable {
AD_CORRECTNESS_CHECK(inputLeftAndRight_.has_value());
result.resize(oldSize + nextIndex_);

// Sometimes columns are combined where one value is UNDEF and the other one
// is not. This function very efficiently returns the not-UNDEF value in
// this case.
// TODO<joka921> If we keep track of the information that one of the
// involved columns contains no UNDEF values at all, we can omit this step
// and simply copy the values from this column without looking at the other
// input.
auto mergeWithUndefined = [](const ValueId a, const ValueId b) {
static_assert(ValueId::makeUndefined().getBits() == 0u);
return ValueId::fromBits(a.getBits() | b.getBits());
// Precondition: `a` and `b` compare equal or at least one of them is UNDEF
// If exactly one of them is UNDEF, return the other one, else return any of
// them (they are equal anyway).
auto getJoinValue = [](const ValueId a, const ValueId b) {
// NOTE: For localVocabIndices we might have different pointers that
// compare equal because they point to the same word. Therefore we cannot
// use a simple bitwise operation to handle the "one of them is UNDEF"
// case as we previously did.
if (a.isUndefined()) {
return b;
}
return a;
};

// A lambda that writes the join column with the given `colIdx` to the
// `nextResultColIdx`-th column of the result.
auto writeJoinColumn = [&result, &mergeWithUndefined, oldSize, this](
auto writeJoinColumn = [&result, &getJoinValue, oldSize, this](
size_t colIdx, size_t resultColIdx) {
const auto& colLeft = inputLeft().getColumn(colIdx);
const auto& colRight = inputRight().getColumn(colIdx);
@@ -242,8 +244,8 @@ class AddCombinedRowToIdTable {

// Write the matching rows.
for (const auto& [targetIndex, sourceIndices] : indexBuffer_) {
auto resultId = mergeWithUndefined(colLeft[sourceIndices[0]],
colRight[sourceIndices[1]]);
auto resultId =
getJoinValue(colLeft[sourceIndices[0]], colRight[sourceIndices[1]]);
numUndef += static_cast<size_t>(resultId.isUndefined());
resultCol[oldSize + targetIndex] = resultId;
}
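
Note on the hunk above: the old `mergeWithUndefined` relied on equal IDs being bitwise identical, so that `a | b` was either the single non-UNDEF operand or the common value. Once local vocab IDs are pointers, two different addresses can refer to equal words, and OR-ing them produces a value that matches neither. A minimal sketch of the difference, assuming a hypothetical `ToyId` type in place of `ValueId` (0 encodes UNDEF, everything else stands for address bits):

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for `ValueId`: 0 encodes UNDEF, any other value is
// treated as the address bits of a local vocab word.
struct ToyId {
  std::uint64_t bits = 0;
  bool isUndefined() const { return bits == 0; }
};

// Old approach: only correct if IDs that compare equal are bitwise identical.
inline ToyId mergeWithBitwiseOr(ToyId a, ToyId b) {
  return ToyId{a.bits | b.bits};
}

// Approach from this diff: return the non-UNDEF operand, or `a` if both are
// defined (they are assumed to compare equal in that case).
inline ToyId getJoinValue(ToyId a, ToyId b) {
  return a.isUndefined() ? b : a;
}
```

With two distinct non-zero operands such as `0x10` and `0x20`, the bitwise version yields `0x30`, which is neither input, while `getJoinValue` returns the first operand unchanged.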
2 changes: 1 addition & 1 deletion src/engine/CartesianProductJoin.cpp
@@ -202,7 +202,7 @@ ResultTable CartesianProductJoin::computeResult() {
auto subResultsDeref = std::views::transform(
subResults, [](auto& x) -> decltype(auto) { return *x; });
return {std::move(result), resultSortedOn(),
ResultTable::getSharedLocalVocabFromNonEmptyOf(subResultsDeref)};
ResultTable::getMergedLocalVocab(subResultsDeref)};
}

// ____________________________________________________________________________
2 changes: 1 addition & 1 deletion src/engine/Join.cpp
@@ -187,7 +187,7 @@ ResultTable Join::computeResult() {
// If only one of the two operands has a non-empty local vocabulary, share
// with that one (otherwise, throws an exception).
return {std::move(idTable), resultSortedOn(),
ResultTable::getSharedLocalVocabFromNonEmptyOf(*leftRes, *rightRes)};
ResultTable::getMergedLocalVocab(*leftRes, *rightRes)};
}

// _____________________________________________________________________________
89 changes: 37 additions & 52 deletions src/engine/LocalVocab.cpp
@@ -11,56 +11,35 @@
// _____________________________________________________________________________
LocalVocab LocalVocab::clone() const {
LocalVocab localVocabClone;
// First, make a deep copy of the `absl::node_hash_map` holding the actual
// map of strings to indexes.
localVocabClone.wordsToIndexesMap_ = this->wordsToIndexesMap_;
// The next free index should be the same.
localVocabClone.nextFreeIndex_ = this->nextFreeIndex_;
// The map from local ids to strings stores pointers to strings. So we cannot
// just copy these from `this->indexesToWordsMap_` to `localVocabClone`, but
// we need to make sure to store the pointers to the strings of the new map
// `localVocabClone.wordsToIndexesMap_`.
//
// NOTE: An alternative algorithm would be to sort the word-index pairs in
// `wordsToIndexesMap_` by index and then fill `indexesToWordsMap_` in order.
// This would have better locality, but the sorting takes non-linear time plus
// the sorting has to handle pairs of `LocalVocabIndex` and `std::string`. So
// for very large local vocabularies (and only then is this operation
// performance-critical at all), the simpler approach below is probably
// better.
const size_t localVocabSize = this->size();
localVocabClone.indexesToWordsMap_.resize(localVocabSize);
for (const auto& [wordInMap, index] : localVocabClone.wordsToIndexesMap_) {
AD_CONTRACT_CHECK(index.get() < localVocabSize);
localVocabClone.indexesToWordsMap_[index.get()] = std::addressof(wordInMap);
}
localVocabClone.otherWordSets_ = otherWordSets_;
localVocabClone.otherWordSets_.push_back(primaryWordSet_);
// Return the clone.
return localVocabClone;
}

// _____________________________________________________________________________
LocalVocab LocalVocab::merge(std::span<const LocalVocab*> vocabs) {
LocalVocab res;
auto inserter = std::back_inserter(res.otherWordSets_);
for (const auto* vocab : vocabs) {
std::ranges::copy(vocab->otherWordSets_, inserter);
*inserter = vocab->primaryWordSet_;
}
return res;
}

// _____________________________________________________________________________
template <typename WordT>
LocalVocabIndex LocalVocab::getIndexAndAddIfNotContainedImpl(WordT&& word) {
// The following code contains two subtle, but important optimizations:
//
// 1. The variant of `insert` used covers the case that `word` was already
// contained in the map as well as the case that it is newly inserted. This
// avoids computing the hash for `word` twice in case we see it for the first
// time (note that hashing a string is not cheap).
//
// 2. The fact that we have a member variable `nextFreeIndex_` avoids that we
// tentatively have to compute the next free ID every time this function is
// called (even when the ID is not actually needed because the word is already
// contained in the map).
//
auto [wordInMapAndIndex, isNewWord] =
wordsToIndexesMap_.insert({std::forward<WordT>(word), nextFreeIndex_});
const auto& [wordInMap, index] = *wordInMapAndIndex;
if (isNewWord) {
indexesToWordsMap_.push_back(&wordInMap);
nextFreeIndex_ = LocalVocabIndex::make(indexesToWordsMap_.size());
}
return index;
// TODO<joka921> As soon as we store `IdOrString` in the local vocab, we
// should definitely use `insert` instead of `emplace` here for some
// transparency optimizations. We currently need `emplace` because of the
// explicit conversion from `string` to `AlignedString16`.
auto [wordIterator, isNewWord] =
primaryWordSet().emplace(std::forward<WordT>(word));
// TODO<Libc++18> Use std::to_address (more idiomatic, but currently breaks
// the MacOS build).
return &(*wordIterator);
}

// _____________________________________________________________________________
@@ -77,21 +56,27 @@ LocalVocabIndex LocalVocab::getIndexAndAddIfNotContained(std::string&& word) {
// _____________________________________________________________________________
std::optional<LocalVocabIndex> LocalVocab::getIndexOrNullopt(
const std::string& word) const {
auto localVocabIndex = wordsToIndexesMap_.find(word);
if (localVocabIndex != wordsToIndexesMap_.end()) {
return localVocabIndex->second;
auto localVocabIndex = primaryWordSet().find(StringAligned16{word});
if (localVocabIndex != primaryWordSet().end()) {
// TODO<Libc++18> Use std::to_address (more idiomatic, but currently breaks
// the MacOS build).
return &(*localVocabIndex);
} else {
return std::nullopt;
}
}

// _____________________________________________________________________________
const std::string& LocalVocab::getWord(LocalVocabIndex localVocabIndex) const {
if (localVocabIndex.get() >= indexesToWordsMap_.size()) {
throw std::runtime_error(absl::StrCat(
"LocalVocab error: request for word with local vocab index ",
localVocabIndex.get(), ", but size of local vocab is only ",
indexesToWordsMap_.size(), ", please contact the developers"));
return *localVocabIndex;
}

// _____________________________________________________________________________
std::vector<std::string> LocalVocab::getAllWordsForTesting() const {
std::vector<std::string> result;
std::ranges::copy(primaryWordSet(), std::back_inserter(result));
for (const auto& previous : otherWordSets_) {
std::ranges::copy(*previous, std::back_inserter(result));
}
return *(indexesToWordsMap_.at(localVocabIndex.get()));
return result;
}
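
Note on `clone` and `merge` above: neither copies any word. Both only collect shared pointers to the existing word sets, so every pointer that was handed out as a `LocalVocabIndex` stays valid for as long as some vocab still references its set. The following is a simplified sketch of that ownership scheme, not the code from this PR: `ToyLocalVocab` is a hypothetical name, and `std::unordered_set` stands in for `absl::node_hash_set` (both keep element addresses stable).

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical, simplified model of a local vocab with shared word sets.
class ToyLocalVocab {
  using Set = std::unordered_set<std::string>;
  // New words are only ever added to the primary set.
  std::shared_ptr<Set> primary_ = std::make_shared<Set>();
  // Word sets of merged vocabs, kept alive but never modified.
  std::vector<std::shared_ptr<const Set>> others_;

 public:
  // Insert `word` if it is new and return a stable pointer to it.
  const std::string* add(std::string word) {
    return &*primary_->insert(std::move(word)).first;
  }

  // Keep all word sets of all `vocabs` alive. The primary set of the
  // result is empty.
  static ToyLocalVocab merge(const std::vector<const ToyLocalVocab*>& vocabs) {
    ToyLocalVocab result;
    for (const ToyLocalVocab* vocab : vocabs) {
      result.others_.insert(result.others_.end(), vocab->others_.begin(),
                            vocab->others_.end());
      result.others_.push_back(vocab->primary_);
    }
    return result;
  }

  // A logical copy is a merge of a single vocab.
  ToyLocalVocab clone() const { return merge({this}); }

  // Linear in the number of word sets, not in the number of words.
  std::size_t size() const {
    std::size_t result = primary_->size();
    for (const auto& set : others_) {
      result += set->size();
    }
    return result;
  }
};
```

In this sketch a word that occurs in two merged sets is counted twice by `size()`, which mirrors how the summation in the diff behaves.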
44 changes: 30 additions & 14 deletions src/engine/LocalVocab.h
@@ -6,10 +6,11 @@

#include <cstdlib>
#include <memory>
#include <span>
#include <string>
#include <vector>

#include "absl/container/node_hash_map.h"
#include "absl/container/node_hash_set.h"
#include "global/Id.h"

// A class for maintaining a local vocabulary with contiguous (local) IDs. This is
@@ -24,18 +25,17 @@ class LocalVocab {
// A map of the words in the local vocabulary to their local IDs. This is a
// node hash map because we need the addresses of the words (which are of type
// `std::string`) to remain stable over their lifetime in the hash map because
// we refer to them in `wordsToIdsMap_` below.
absl::node_hash_map<std::string, LocalVocabIndex> wordsToIndexesMap_;
// we hand out pointers to them.
using Set = absl::node_hash_set<StringAligned16>;
std::shared_ptr<Set> primaryWordSet_ = std::make_shared<Set>();

// A map of the local IDs to the words. Since the IDs are contiguous, we can
// use a `std::vector`. We store pointers to the actual words in
// `wordsToIdsMap_` to avoid storing every word twice. This saves space, but
// costs us an indirection when looking up a word by its ID.
std::vector<const std::string*> indexesToWordsMap_;
// Local vocabularies from child operations that were merged into this
// vocabulary s.t. the pointers are kept alive. They have to be `const`
// because they are possibly shared concurrently (for example via the cache).
std::vector<std::shared_ptr<const Set>> otherWordSets_;

// The next free local ID (will be incremented by one each time we add a new
// word).
LocalVocabIndex nextFreeIndex_ = LocalVocabIndex::make(0);
auto& primaryWordSet() { return *primaryWordSet_; }
const auto& primaryWordSet() const { return *primaryWordSet_; }

public:
// Create a new, empty local vocabulary.
@@ -45,7 +45,9 @@ class LocalVocab {
LocalVocab(const LocalVocab&) = delete;
LocalVocab& operator=(const LocalVocab&) = delete;

// Make a deep copy explicitly.
// Make a logical copy. The clone will have an empty primary set so it can
// safely be modified. The contents are copied as shared pointers to const, so
// the function runs in linear time in the number of word sets.
LocalVocab clone() const;

// Moving a local vocabulary is not problematic (though the typical use case
@@ -65,14 +67,28 @@ class LocalVocab {
const std::string& word) const;

// The number of words in the vocabulary.
size_t size() const { return indexesToWordsMap_.size(); }
// Note: This is not constant time, but linear in the number of word sets.
size_t size() const {
auto result = primaryWordSet().size();
for (const auto& previous : otherWordSets_) {
result += previous->size();
}
return result;
}

// Return true if and only if the local vocabulary is empty.
bool empty() const { return indexesToWordsMap_.empty(); }
bool empty() const { return size() == 0; }

// Return a const reference to the word.
const std::string& getWord(LocalVocabIndex localVocabIndex) const;

// Create a local vocab that contains and keeps alive all the words from each
// of the `vocabs`. The primary word set of the newly created vocab is empty.
static LocalVocab merge(std::span<const LocalVocab*> vocabs);

// Return all the words from all the word sets as a vector.
std::vector<std::string> getAllWordsForTesting() const;

private:
// Common implementation for the two variants of
// `getIndexAndAddIfNotContainedImpl` above.
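
Note on the choice of a node-based set in this header: handing out raw pointers as `LocalVocabIndex` only works if the address of a stored word never changes while further words are inserted. The small check below illustrates that property with `std::unordered_set`, used here as a stand-in because the C++ standard guarantees that rehashing does not invalidate pointers or references to its elements. The function name is hypothetical.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>

// Insert one word, remember its address, then insert enough further words
// to force several rehashes. Returns true if the remembered pointer still
// refers to the very same element.
inline bool pointerSurvivesRehash() {
  std::unordered_set<std::string> words;
  const std::string* first = &*words.insert("first").first;
  for (int i = 0; i < 10000; ++i) {
    words.insert("word" + std::to_string(i));
  }
  return first == &*words.find("first") && *first == "first";
}
```

A flat (open-addressing) hash set would typically move its elements on growth, so the same check is not expected to hold there.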
3 changes: 1 addition & 2 deletions src/engine/Minus.cpp
@@ -56,8 +56,7 @@ ResultTable Minus::computeResult() {
// If only one of the two operands has a non-empty local vocabulary, share
// with that one (otherwise, throws an exception).
return {std::move(idTable), resultSortedOn(),
ResultTable::getSharedLocalVocabFromNonEmptyOf(*leftResult,
*rightResult)};
ResultTable::getMergedLocalVocab(*leftResult, *rightResult)};
}

// _____________________________________________________________________________
3 changes: 1 addition & 2 deletions src/engine/MultiColumnJoin.cpp
@@ -86,8 +86,7 @@ ResultTable MultiColumnJoin::computeResult() {
// If only one of the two operands has a non-empty local vocabulary, share
// with that one (otherwise, throws an exception).
return {std::move(idTable), resultSortedOn(),
ResultTable::getSharedLocalVocabFromNonEmptyOf(*leftResult,
*rightResult)};
ResultTable::getMergedLocalVocab(*leftResult, *rightResult)};
}

// _____________________________________________________________________________
3 changes: 1 addition & 2 deletions src/engine/OptionalJoin.cpp
@@ -115,8 +115,7 @@ ResultTable OptionalJoin::computeResult() {
// If only one of the two operands has a non-empty local vocabulary, share
// with that one (otherwise, throws an exception).
return {std::move(idTable), resultSortedOn(),
ResultTable::getSharedLocalVocabFromNonEmptyOf(*leftResult,
*rightResult)};
ResultTable::getMergedLocalVocab(*leftResult, *rightResult)};
}

// _____________________________________________________________________________
6 changes: 3 additions & 3 deletions src/engine/ResultTable.cpp
@@ -23,10 +23,10 @@ string ResultTable::asDebugString() const {
}

// _____________________________________________________________________________
auto ResultTable::getSharedLocalVocabFromNonEmptyOf(
const ResultTable& resultTable1, const ResultTable& resultTable2)
auto ResultTable::getMergedLocalVocab(const ResultTable& resultTable1,
const ResultTable& resultTable2)
-> SharedLocalVocabWrapper {
return getSharedLocalVocabFromNonEmptyOf(
return getMergedLocalVocab(
std::array{std::cref(resultTable1), std::cref(resultTable2)});
}

41 changes: 8 additions & 33 deletions src/engine/ResultTable.h
@@ -133,46 +133,21 @@ class ResultTable {
return SharedLocalVocabWrapper{localVocab_};
}

// Like `getSharedLocalVocabFrom`, but takes more than one result and assumes
// that exactly one of the local vocabularies is empty and gets the shared
// local vocab from the non-empty one (if all are empty, arbitrarily share
// with the first one).
//
// TODO: Eventually, we want to be able to merge two non-empty local
// vocabularies, but that requires more work since we have to rewrite IDs then
// (from the previous separate local vocabularies to the new merged one).
static SharedLocalVocabWrapper getSharedLocalVocabFromNonEmptyOf(
// Like `getSharedLocalVocabFrom`, but takes more than one result and merges
// all the corresponding local vocabs.
static SharedLocalVocabWrapper getMergedLocalVocab(
const ResultTable& resultTable1, const ResultTable& resultTable2);

// Overload for more than two `ResultTables`
template <std::ranges::forward_range R>
requires std::convertible_to<std::ranges::range_value_t<R>,
const ResultTable&>
static SharedLocalVocabWrapper getSharedLocalVocabFromNonEmptyOf(
R&& subResults) {
AD_CONTRACT_CHECK(!std::ranges::empty(subResults));
auto hasNonEmptyVocab = [](const ResultTable& tbl) {
return !tbl.localVocab_->empty();
};
auto numNonEmptyVocabs =
std::ranges::count_if(subResults, hasNonEmptyVocab);
if (numNonEmptyVocabs > 1) {
throw std::runtime_error(
"Merging of more than one non-empty local vocabularies is currently "
"not supported, please contact the developers");
}
// The static casts in the following are needed to make this code work for
// types that are implicitly convertible to `const ResultTable&`, in
// particular `std::reference_wrapper<const ResultTable>`.
if (numNonEmptyVocabs == 0) {
return SharedLocalVocabWrapper{
static_cast<const ResultTable&>(*subResults.begin()).localVocab_};
} else {
return SharedLocalVocabWrapper{
static_cast<const ResultTable&>(
*std::ranges::find_if(subResults, hasNonEmptyVocab))
.localVocab_};
static SharedLocalVocabWrapper getMergedLocalVocab(R&& subResults) {
std::vector<const LocalVocab*> vocabs;
for (const ResultTable& table : subResults) {
vocabs.push_back(std::to_address(table.localVocab_));
}
return SharedLocalVocabWrapper{LocalVocab::merge(vocabs)};
}

// Get a (deep) copy of the local vocabulary from the given result. Use this
2 changes: 1 addition & 1 deletion src/engine/TransitivePath.cpp
@@ -256,7 +256,7 @@
sideRes->idTable());

return {std::move(idTable), resultSortedOn(),
ResultTable::getSharedLocalVocabFromNonEmptyOf(*sideRes, *subRes)};
ResultTable::getMergedLocalVocab(*sideRes, *subRes)};

};

if (lhs_.isBoundVariable()) {
5 changes: 2 additions & 3 deletions src/engine/Union.cpp
@@ -178,9 +178,8 @@ ResultTable Union::computeResult() {
LOG(DEBUG) << "Union result computation done" << std::endl;
// If only one of the two operands has a non-empty local vocabulary, share
// with that one (otherwise, throws an exception).
return ResultTable{
std::move(idTable), resultSortedOn(),
ResultTable::getSharedLocalVocabFromNonEmptyOf(*subRes1, *subRes2)};
return ResultTable{std::move(idTable), resultSortedOn(),
ResultTable::getMergedLocalVocab(*subRes1, *subRes2)};
}

void Union::computeUnion(
Expand Down
9 changes: 8 additions & 1 deletion src/global/IndexTypes.h
@@ -13,7 +13,14 @@
// this (very intrusive) renaming doesn't interfere with too many open pull
// requests.
using VocabIndex = ad_utility::TypedIndex<uint64_t, "VocabIndex">;
using LocalVocabIndex = ad_utility::TypedIndex<uint64_t, "LocalVocabIndex">;

// A `std::string` that is aligned to 16 bytes s.t. pointers always end with 4
// bits that are zero and that are reused for payloads in the `ValueId` class.
struct alignas(16) StringAligned16 : public std::string {
using std::string::basic_string;
explicit StringAligned16(std::string s) : std::string{std::move(s)} {}
};
using LocalVocabIndex = const StringAligned16*;
using TextRecordIndex = ad_utility::TypedIndex<uint64_t, "TextRecordIndex">;
using WordVocabIndex = ad_utility::TypedIndex<uint64_t, "WordVocabIndex">;
using BlankNodeIndex = ad_utility::TypedIndex<uint64_t, "BlankNodeIndex">;
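
Note on `StringAligned16` above: `alignas(16)` makes every object of the type start at an address that is a multiple of 16, so the four lowest bits of any pointer to it are zero and can carry a payload. A small sketch that checks this, using a renamed copy of the struct (`ToyAlignedString`) and a hypothetical helper `lowFourBitsAreZero`:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Same shape as `StringAligned16` in the diff, renamed to mark it as a sketch.
struct alignas(16) ToyAlignedString : public std::string {
  using std::string::basic_string;
  explicit ToyAlignedString(std::string s) : std::string{std::move(s)} {}
};

// A 16-byte aligned address is a multiple of 16, so its low four bits are 0.
inline bool lowFourBitsAreZero(const ToyAlignedString* p) {
  return (reinterpret_cast<std::uintptr_t>(p) & 0xF) == 0;
}
```

How exactly the `ValueId` class packs its payload into those bits is not shown in this diff, so the sketch only checks the alignment property itself.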