Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C++ client: process Barrage shifts properly #5285

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct ImmerColumnSourceImpls {
* flags. On the other hand if this pointer is null, then the caller doesn't care about null flags
* and we don't have to do any special work to determine nullness.
*/

template<typename T>
static void FillChunk(const immer::flex_vector<T> &src_data,
const immer::flex_vector<bool> *src_null_flags,
Expand All @@ -41,59 +40,81 @@ struct ImmerColumnSourceImpls {

constexpr bool kTypeIsNumeric = deephaven::dhcore::DeephavenTraits<T>::kIsNumeric;

TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_data.size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= typed_dest->Size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(optional_dest_null_flags == nullptr ||
rows.Size() <= optional_dest_null_flags->Size()));
if (!kTypeIsNumeric) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(src_null_flags != nullptr));
} else {
// avoid CLion warning about unused variable.
(void)src_null_flags;
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_null_flags->size()));
}
auto *dest_datap = typed_dest->data();
auto *dest_nullp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;

auto copy_data_inner = [&dest_datap, &dest_nullp](const T *data_begin, const T *data_end) {
for (const T *current = data_begin; current != data_end; ++current) {
auto value = *current;
*dest_datap++ = value;
if constexpr(deephaven::dhcore::DeephavenTraits<T>::kIsNumeric) {
if (dest_nullp != nullptr) {
*dest_nullp++ = value == deephaven::dhcore::DeephavenTraits<T>::kNullValue;
}
} else {
// avoid clang complaining about unused variables
(void)dest_nullp;
}
}
};
if (optional_dest_null_flags != nullptr) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= optional_dest_null_flags->Size()));
}
(void) src_null_flags; // avoid CLion warning about unused variable.

auto copy_nulls_inner = [&dest_nullp](const bool *null_begin, const bool *null_end) {
for (const bool *current = null_begin; current != null_end; ++current) {
*dest_nullp++ = *current;
auto *dest_datap = typed_dest->data();
// We have a nested loop here, represented by two lambdas. This code invokes
// RowSequence::ForEachInterval which takes contiguous ranges from 'rows' and feeds them
// to 'copy_data_outer'. Then 'copy_data_outer' turns that contiguous range into a
// pair of [begin, end) Immer iterators. But then, rather than store into that iterator range
// directly, those Immer iterators are passed to immer::for_each_chunk. This breaks down the
// Immer range into subranges of plain data, and invokes the copy_data_inner lambda. Then,
// 'copy_data_inner' just copies data in the normal C++ way.
auto copy_data_inner = [&dest_datap](const T *src_beginp, const T *src_endp) {
for (const T *current = src_beginp; current != src_endp; ++current) {
*dest_datap++ = *current;
}
};

auto copy_outer = [&src_data, src_null_flags, dest_nullp, &copy_data_inner,
&copy_nulls_inner](uint64_t src_begin, uint64_t src_end) {
auto copy_data_outer = [&src_data, &copy_data_inner](uint64_t src_begin, uint64_t src_end) {
auto src_beginp = src_data.begin() + src_begin;
auto src_endp = src_data.begin() + src_end;
immer::for_each_chunk(src_beginp, src_endp, copy_data_inner);
};

rows.ForEachInterval(copy_data_outer);

// If the caller has opted out of getting null flags, we are done.
if (optional_dest_null_flags == nullptr) {
return;
}

if constexpr(!deephaven::dhcore::DeephavenTraits<T>::kIsNumeric) {
if (dest_nullp != nullptr) {
auto nulls_begin = src_null_flags->begin() + src_begin;
auto nulls_end = src_null_flags->begin() + src_end;
immer::for_each_chunk(nulls_begin, nulls_end, copy_nulls_inner);
// Otherwise (if the caller wants null flags), we do a similar algorithm to copy null flags.
// The one complication is that the column source only stores null flags explicitly for
// non-numeric types. For numeric types, the column source uses the Deephaven convention
// for nullness. To handle this, we have two different forms of the operation,
// one which supports the numeric convention and one which supports the non-numeric convention.
auto *dest_nullp = optional_dest_null_flags->data();

if constexpr (kTypeIsNumeric) {
auto copy_nulls_inner = [&dest_nullp](const T *data_begin, const T *data_end) {
for (const T *current = data_begin; current != data_end; ++current) {
auto is_null = *current == deephaven::dhcore::DeephavenTraits<T>::kNullValue;
*dest_nullp++ = is_null;
}
} else {
// avoid clang complaining about unused variables.
(void)src_null_flags;
(void)dest_nullp;
(void)copy_nulls_inner;
}
};
rows.ForEachInterval(copy_outer);
};

auto copy_nulls_outer = [&src_data, src_null_flags, &copy_nulls_inner](uint64_t src_begin,
uint64_t src_end) {
auto src_beginp = src_data.begin() + src_begin;
auto src_endp = src_data.begin() + src_end;
immer::for_each_chunk(src_beginp, src_endp, copy_nulls_inner);
};
rows.ForEachInterval(copy_nulls_outer);
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the function is about to return right after the else anyway, can you consider short-circuiting this else by returning in the body of the if above, and then removing the else?

Looking at the actual file is going to be easier than trying to follow the logic in the diff, but in either case, I find that is easier for me to read code when you allow me to drop stacks of context earlier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed on slack, it is an if constexpr so my comment doesn't apply/work.

auto copy_nulls_inner = [&dest_nullp](const bool *null_begin, const bool *null_end) {
for (const bool *current = null_begin; current != null_end; ++current) {
*dest_nullp++ = *current;
}
};

auto copy_nulls_outer = [&src_data, src_null_flags, &copy_nulls_inner](uint64_t src_begin,
uint64_t src_end) {
auto nulls_begin = src_null_flags->begin() + src_begin;
auto nulls_end = src_null_flags->begin() + src_end;
immer::for_each_chunk(nulls_begin, nulls_end, copy_nulls_inner);
};
rows.ForEachInterval(copy_nulls_outer);
}
}

template<typename T>
Expand All @@ -109,12 +130,17 @@ struct ImmerColumnSourceImpls {
constexpr bool kTypeIsNumeric = deephaven::dhcore::DeephavenTraits<T>::kIsNumeric;

auto *typed_dest = VerboseCast<chunkType_t *>(DEEPHAVEN_LOCATION_EXPR(dest_data));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_data.size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= typed_dest->Size()));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(optional_dest_null_flags == nullptr ||
rows.Size() <= optional_dest_null_flags->Size()));
if (!kTypeIsNumeric) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(src_null_flags != nullptr));
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= src_null_flags->size()));
}
if (optional_dest_null_flags != nullptr) {
TrueOrThrow(DEEPHAVEN_LOCATION_EXPR(rows.Size() <= optional_dest_null_flags->Size()));
}
(void) src_null_flags; // avoid CLion warning about unused variable.

auto *destp = typed_dest->data();
auto *dest_nullp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;

Expand Down
9 changes: 4 additions & 5 deletions cpp-client/deephaven/dhcore/src/container/row_sequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ RowSequenceIterator RowSequence::GetRowSequenceIterator() const {

std::ostream &operator<<(std::ostream &s, const RowSequence &o) {
s << '[';
auto iter = o.GetRowSequenceIterator();
const char *sep = "";
uint64_t item;
while (iter.TryGetNext(&item)) {
s << sep << item;
o.ForEachInterval([&](uint64_t start, uint64_t end) {
s << sep;
sep = ", ";
}
s << '[' << start << ',' << end << ')';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an opening brace and a close parens... is that intentional?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is because is an open interval, nevermind...

});
s << ']';
return s;
}
Expand Down
47 changes: 44 additions & 3 deletions cpp-client/deephaven/dhcore/src/ticking/space_mapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

using deephaven::dhcore::container::RowSequence;
using deephaven::dhcore::container::RowSequenceBuilder;
using deephaven::dhcore::utility::MakeReservedVector;
using deephaven::dhcore::utility::separatedList;

namespace deephaven::dhcore::ticking {
Expand All @@ -34,9 +35,47 @@ uint64_t SpaceMapper::EraseRange(uint64_t begin_key, uint64_t end_key) {
}

void SpaceMapper::ApplyShift(uint64_t begin_key, uint64_t end_key, uint64_t dest_key) {
auto size = end_key - begin_key;
// Shifts do not change the size of the set. So, note the original size as a sanity check.
auto original_size = set_.cardinality();

// Note that [begin_key, end_key) is potentially a superset of the keys we have.
// We need to remove all our keys in the range [begin_key, end_key),
// and then, for each key k that we removed, add a new key (k - begin_key + dest_key).

// As we scan the keys in our set, we build this vector which contains contiguous ranges.
std::vector<std::pair<uint64_t, uint64_t>> new_ranges;
auto it = set_.begin();
if (!it.move(begin_key)) {
// begin_key is bigger than any key in our set, so the shift request has no effect.
return;
}

while (it != set_.end() && *it < end_key) {
auto offset = *it - begin_key;
auto new_key = dest_key + offset;
if (!new_ranges.empty() && new_ranges.back().second == new_key) {
// This key is contiguous with the last range, so extend it by one.
++new_ranges.back().second;
} else {
// This key is not contiguous with the last range (or there is no last range), so
// start a new range here having size 1.
new_ranges.emplace_back(new_key, new_key + 1);
}
++it;
}

set_.removeRange(begin_key, end_key);
set_.addRange(dest_key, dest_key + size);
for (const auto &range : new_ranges) {
set_.addRange(range.first, range.second);
}

// Sanity check.
auto final_size = set_.cardinality();
if (original_size != final_size) {
auto message = fmt::format("Unexpected rowkey size change: from {} to {}", original_size,
final_size);
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
}

std::shared_ptr<RowSequence> SpaceMapper::AddKeys(const RowSequence &keys) {
Expand All @@ -59,7 +98,7 @@ std::shared_ptr<RowSequence> SpaceMapper::ConvertKeysToIndices(const RowSequence
auto convert_interval = [this, &builder](uint64_t begin_key, uint64_t end_key) {
auto beginp = set_.begin();
if (!beginp.move(begin_key)) {
auto message = fmt::format("begin key {} is not in the src map", begin_key);
auto message = fmt::format("begin key {} is too large for the src map", begin_key);
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
auto next_rank = ZeroBasedRank(begin_key);
Expand All @@ -72,6 +111,8 @@ std::shared_ptr<RowSequence> SpaceMapper::ConvertKeysToIndices(const RowSequence
}
++currentp;
}
// It is ok to add a chunk like this because rowkeys [begin_key, end_key) are contiguous;
// therefore their corresponding index space indices are also contiguous.
auto size = end_key - begin_key;
builder.AddInterval(next_rank, next_rank + size);
};
Expand Down
Loading