Refactor workInternal #297
Restructure the workInternal function to be easier to read and modify. This also subtly changes the meaning of the resampling and stride annotations.
@@ -955,7 +956,7 @@ template<typename T>
concept TagPredicate = requires(const T &t, const Tag &tag, Tag::signed_index_type readPosition) {
    { t(tag, readPosition) } -> std::convertible_to<bool>;
};
inline constexpr TagPredicate auto defaultTagMatcher = [](const Tag &tag, Tag::signed_index_type readPosition) noexcept { return tag.index >= readPosition || tag.index < 0; };
Why drop the tag.index < 0 condition? This used to indicate that a tag should be applied immediately regardless of other index constraints.
This predicate was and is used in nSamplesUntilNextTag(port, offset=0) to get the next matching tag and the matching tag after that, where offset is 0 (the default) for getting the first tag and 1 for getting the tag after it. But since offset only ends up in the readPosition argument of the predicate, as soon as there is a tag with index < 0, it will also be returned when looking for the second tag... I did not investigate in more detail how this worked previously; re-adding it makes qa_Block spin indefinitely.
Tag propagation and setting _cachedTag are not influenced by this, since there the -1 condition is checked separately in Port::getTags(untilOffset).
I'm not completely sure whether this is the correct change; this is just the rationale for removing it. I tried to keep the changes outside of workInternal (and its subfunctions) small, as I've sometimes been bitten by making extensive changes throughout the codebase in too many places at once.
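To illustrate the rationale above, here is a minimal sketch of why a predicate that always accepts negative indices returns the same tag for offset 0 and offset 1. The `Tag` struct and the `firstMatch` helper are simplified, hypothetical stand-ins and not the actual `gr::Tag` or `nSamplesUntilNextTag` implementation; only the two predicates mirror the old and new `defaultTagMatcher` from the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for gr::Tag: only the index matters for this sketch.
struct Tag {
    std::int64_t index;
};

// Old predicate: tags with a negative index match at every read position.
inline constexpr auto oldMatcher = [](const Tag &tag, std::int64_t readPosition) noexcept { return tag.index >= readPosition || tag.index < 0; };

// New predicate from this PR: plain position comparison.
inline constexpr auto newMatcher = [](const Tag &tag, std::int64_t readPosition) noexcept { return tag.index >= readPosition; };

// Simplified lookup (assumption): the offset is only added to the read position
// before the predicate is evaluated; returns the position of the first match.
template<typename Predicate>
std::optional<std::size_t> firstMatch(const std::vector<Tag> &tags, std::int64_t readPosition, std::int64_t offset, Predicate matches) {
    for (std::size_t i = 0; i < tags.size(); ++i) {
        if (matches(tags[i], readPosition + offset)) {
            return i;
        }
    }
    return std::nullopt;
}
```

With tags at indices -1, 0 and 5, the old predicate finds the tag with index -1 for both offsets, so a caller waiting for the "second" tag never advances. The new predicate finds the tag at index 0 for offset 0 and the tag at index 5 for offset 1.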
@wirew0rm kudos, this was a tremendous amount of work to get it done this neatly and cleanly. 👍
The changes make the code much more readable and the now linear flow of processing w/o the earlier jumps easier to follow, to test and -- I believe -- also to maintain.
I added some small comments and suggestions for improvements. These are minor and the PR could from my point-of-view be merged as is, but I think these are quality items that would be beneficial and/or at least should be discussed/documented for a follow-up.
Main things are:
- use of structs and named fields for function returns rather than std::tuple. This would complement and mirror the already used structured bindings.
- use of emitErrorMessage(..) rather than throwing exceptions in core-lib code
The other things and missing features you mentioned in the PR description could be done in a later follow-up.
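As a sketch of the first suggestion, a small struct with named fields can replace a std::tuple return while still working with structured bindings at the call site. The names `PortLimits` and `intersect` are hypothetical and only serve as illustration; they are not taken from Block.hpp.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical result type: named fields document what each value means,
// which a std::tuple<std::size_t, std::size_t> cannot do.
struct PortLimits {
    std::size_t minSamples;
    std::size_t maxSamples;
};

// Combine the limits of two ports: the stricter bound wins on both sides.
constexpr PortLimits intersect(PortLimits a, PortLimits b) {
    return { std::max(a.minSamples, b.minSamples), std::min(a.maxSamples, b.maxSamples) };
}
```

A caller can write `const auto [minSamples, maxSamples] = intersect(a, b);` exactly as with a tuple, or access `result.maxSamples` by name where only one field is needed.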
// assuming buffer size is approx 65k
stride_test( {.n_samples = 1000000, .stride = 250000, .in_port_max = 100, .exp_in = 100, .exp_out = 100, .exp_counter = 4, .exp_total_in = 400, .exp_total_out = 400 }, thread_pool);
stride_test( {.n_samples = 1000000, .stride = 249900, .in_port_max = 100, .exp_in = 100, .exp_out = 100, .exp_counter = 5, .exp_total_in = 500, .exp_total_out = 500 }, thread_pool);
stride_test( {.n_samples = 1000000, .numerator = 100, .denominator = 100, .stride = 250000, .exp_in = 100, .exp_out = 100, .exp_counter = 4, .exp_total_in = 400, .exp_total_out = 400 }, thread_pool);
TODO: as discussed, naming is hard: we need to find more descriptive names for 'numerator' and 'denominator'.
This number pair describes not only the resampling ratio of the up- or down-conversion but also the default number of 'M' input samples that are converted into 'N' output samples ... Maybe our other SYS colleagues have some suggestions, since they are also the ones who should recognise/understand this complex ... the more intuitive the name, the less we have to document the up-/down-conversion.
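The double role of the number pair can be sketched as follows, assuming that one chunk of 'M' input samples always yields one chunk of 'N' output samples. The names `inputChunkSize`, `outputChunkSize`, `ChunkPlan` and `planChunks` are hypothetical naming candidates for illustration only; the actual logic in workInternal may differ.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical result: how many samples to consume and to produce in one call.
struct ChunkPlan {
    std::size_t nIn;
    std::size_t nOut;
};

// Assumption: inputChunkSize input samples are converted into outputChunkSize
// output samples, and only whole chunks are processed.
constexpr ChunkPlan planChunks(std::size_t availableIn, std::size_t availableOut, std::size_t inputChunkSize, std::size_t outputChunkSize) {
    if (inputChunkSize == 0 || outputChunkSize == 0) {
        return { 0, 0 }; // invalid configuration: process nothing
    }
    // the number of whole chunks is limited by both the input and the output side
    const std::size_t nChunks = std::min(availableIn / inputChunkSize, availableOut / outputChunkSize);
    return { nChunks * inputChunkSize, nChunks * outputChunkSize };
}
```

In this reading the pair fixes both the ratio (4:1 is a decimation by four) and the granularity (100:100 has a ratio of one but still processes in blocks of 100), which is why a plain "ratio" name does not tell the whole story.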
core/include/gnuradio-4.0/Block.hpp
Outdated
 if constexpr (Out::isMultiThreadedStrategy()) {
     if (!out.isFullyPublished()) {
-        fmt::print(stderr, "Block::write_to_outputs - did not publish all samples for MultiThreadedStrategy\n");
+        fmt::print(stderr, "Block::publishWriters - did not publish all samples for MultiThreadedStrategy\n");
         std::abort();
     }
 }
Perhaps this should be part of the policy definition inside the buffer itself and handled in the ~ProducableSpan() destructor.
I know we had some discussion about this with @drslebedev, but I don't remember what the outcome was.
core/include/gnuradio-4.0/Block.hpp
Outdated
     if constexpr (Out::spanReleasePolicy() == SpanReleasePolicy::Terminate) {
-        fmt::print(stderr, "Block::write_to_outputs - did not publish samples, default SpanReleasePolicy is {}\n", magic_enum::enum_name(SpanReleasePolicy::Terminate));
+        fmt::print(stderr, "Block::publishWriters - samples were not published, default SpanReleasePolicy is {}\n", magic_enum::enum_name(SpanReleasePolicy::Terminate));
         std::abort();
     } else if constexpr (Out::spanReleasePolicy() == SpanReleasePolicy::ProcessAll) {
-        out.publish(available_values_count);
+        out.publish(nSamples);
     } else if constexpr (Out::spanReleasePolicy() == SpanReleasePolicy::ProcessNone) {
         out.publish(0U);
     }
 }
As above: functionally correct, but should perhaps be pushed to the ~ProducableSpan() destructor.
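A minimal sketch of what handling the release policy in the span destructor could look like is given below. `PublishableSpanSketch` and `CountingWriter` are hypothetical types for illustration; only the `SpanReleasePolicy` enumerator names are taken from the diff above, and the real buffer classes have a different interface.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstdlib>

enum class SpanReleasePolicy { Terminate, ProcessAll, ProcessNone };

// Hypothetical writer that only records how many samples were published.
struct CountingWriter {
    std::size_t published = 0;
};

// Hypothetical RAII span: if the user never calls publish(), the destructor
// applies the compile-time release policy instead of the calling Block code.
template<SpanReleasePolicy policy>
class PublishableSpanSketch {
    CountingWriter *_writer;
    std::size_t     _size;
    bool            _published = false;

public:
    PublishableSpanSketch(CountingWriter &writer, std::size_t size) : _writer(&writer), _size(size) {}
    PublishableSpanSketch(const PublishableSpanSketch &)            = delete;
    PublishableSpanSketch &operator=(const PublishableSpanSketch &) = delete;

    void publish(std::size_t nSamples) {
        _writer->published += nSamples;
        _published = true;
    }

    ~PublishableSpanSketch() {
        if (_published) {
            return; // the user published explicitly, nothing to do
        }
        if constexpr (policy == SpanReleasePolicy::Terminate) {
            std::fputs("span destroyed without publish(), SpanReleasePolicy is Terminate\n", stderr);
            std::abort();
        } else if constexpr (policy == SpanReleasePolicy::ProcessAll) {
            publish(_size);
        } else {
            publish(0U);
        }
    }
};
```

The benefit of this placement would be that every code path that lets a span go out of scope gets the same policy handling, without a separate check in the block's work function.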
core/include/gnuradio-4.0/Block.hpp
Outdated
work::Status
invokeProcessBulk(auto &inputSpans, auto &writersTuple) {
    // cannot use std::apply because it requires tuple_cat(inputSpans, writersTuple). The latter doesn't work because writersTuple isn't copyable.
    return [&]<std::size_t... InIdx, std::size_t... OutIdx>(std::index_sequence<InIdx...>, std::index_sequence<OutIdx...>) {
        return self().processBulk(std::get<InIdx>(inputSpans)..., std::get<OutIdx>(writersTuple)...);
    }(std::make_index_sequence<std::tuple_size_v<std::remove_cvref_t<decltype(inputSpans)>>>(), std::make_index_sequence<std::tuple_size_v<std::remove_cvref_t<decltype(writersTuple)>>>());
}
Not (yet) fully tested, but the snippet below could convert the present std::vector<std::span> or std::vector<"ConsumableSpan"> to std::span<std::span..> or std::span<"ConsumableSpan">:
work::Status invokeProcessBulk(auto &inputSpans, auto &writersTuple) {
    auto convertIfNeeded = [](auto &&arg) -> decltype(auto) {
        using T = std::decay_t<decltype(arg)>;
        // nested requirement: types without a value_type simply do not match instead of failing to compile
        if constexpr (requires { requires std::is_same_v<T, std::vector<typename T::value_type>>; }) {
            // convert std::vector to std::span of [Consumable,Producable]Spans
            return std::span<typename T::value_type>{ arg.data(), arg.size() };
        } else { // pass-through for other types; a similar branch could be added for std::array<...>
            return std::forward<decltype(arg)>(arg);
        }
    };
    return [&]<std::size_t... InIdx, std::size_t... OutIdx>(std::index_sequence<InIdx...>, std::index_sequence<OutIdx...>) {
        return self().processBulk(convertIfNeeded(std::get<InIdx>(inputSpans))..., convertIfNeeded(std::get<OutIdx>(writersTuple))...);
    }(std::make_index_sequence<std::tuple_size_v<std::remove_cvref_t<decltype(inputSpans)>>>(), std::make_index_sequence<std::tuple_size_v<std::remove_cvref_t<decltype(writersTuple)>>>());
}
@drslebedev your view?
N.B. this way we do not expose whether the dynamic input ports/spans are stored as a vector or an array, which IMO makes for a more maintainable and future-proof API. Feedback is welcome.
Using std::span for an array of ports as the signature for processBulk() looks good to me. std::span<std::span..> is better than std::vector<std::span..>.
However, I recommend postponing this to the next pull request, as it requires further modifications in the unit tests.
Thanks for the PR!
Really great work to simplify the code and make it clear and readable!
Given our previous private discussions about your changes, I've added a few minor comments. Please consider implementing them at your convenience.
@@ -618,11 +523,10 @@ class Block : public lifecycle::StateMachine<Derived>, //
        , numerator(std::move(other.numerator))
We've previously had a private discussion regarding potential new names for numerator and denominator, given their altered meaning following your changes. Essentially, they now align more closely with the limits on the number of samples or chunk sizes for input and output. Are we still considering this adjustment, or do you prefer to retain the original names?
core/include/gnuradio-4.0/Block.hpp
Outdated
auto writersTuple = prepareStreams(outputPorts<PortType::STREAM>(&self()), limitByFirstTag ? 1 : decimatedOut);
work::Status ret;
std::size_t processed = limitByFirstTag ? 1 : decimatedIn;
std::size_t processedOut = limitByFirstTag ? 1 : decimatedOut;
-std::size_t processedOut = limitByFirstTag ? 1 : decimatedOut;
+std::size_t nSamplesToProcessOut = limitByFirstTag ? 1 : decimatedOut;
These are the only two variables that I didn't manage to make const; otherwise I'd agree with the rename, but these are also updated with the result of the processing...
What I'm missing here is the ability to define an uninitialized const variable and then have the compiler check that it is assigned exactly once in every branch... Java does that, but in C++ the only way to get something like that is to wrap it in a lambda and return from there, and that would make the code a lot less straightforward to read...
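For reference, the lambda workaround mentioned above could look roughly like the following sketch. The `SampleCounts` struct and the `samplesToProcess` function are hypothetical; only the names `limitByFirstTag`, `decimatedIn` and `decimatedOut` are borrowed from the diff.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical pair of counters that should be initialised exactly once.
struct SampleCounts {
    std::size_t in;
    std::size_t out;
};

constexpr SampleCounts samplesToProcess(bool limitByFirstTag, std::size_t decimatedIn, std::size_t decimatedOut) {
    // Immediately invoked lambda: the compiler requires every branch to return
    // a value, so the result can be const without a mutable placeholder.
    const SampleCounts counts = [&]() -> SampleCounts {
        if (limitByFirstTag) {
            return { 1, 1 };
        }
        return { decimatedIn, decimatedOut };
    }();
    return counts;
}
```

This gives the "assigned exactly once in every branch" guarantee, at the cost of an extra nesting level, which is the readability trade-off described in the comment.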
Addressed most of the review comments and some sonarcloud issues and compiler warnings, open points:
At least the second and third points should be addressed in the next PRs.
Quality Gate failed
Failed conditions
-throw std::runtime_error(fmt::format("Block is not defined as `ResamplingRatio<>`, but numerator = {}, denominator = {}, they both must equal to 1.", numerator, denominator));
+auto e = gr::Error(fmt::format("Block is not defined as `ResamplingRatio<>`, but numerator = {}, denominator = {}, they both must equal to 1.", numerator, denominator));
+emitErrorMessage("Block::checkParametersAndThrowIfNeeded:", e);
+requestStop();
Doesn't this return DONE where it should return ERROR?
// if the block state changed to DONE, publish EOS tag on the next sample
if (ret == work::Status::DONE) {
    this->setAndNotifyState(lifecycle::State::STOPPED);
    publishTag({ { gr::tag::END_OF_STREAM, true } }, 1);
Why index 1 here, and not 0 (like in other places)?
The done tag always gets the index of the next sample (which will never be published). Otherwise a block could not publish EoS without publishing more samples. But you are right, that already seems to be considered in publishTag: it does not publish to the last written index but to the next written index. It probably doesn't matter, because in the next iteration it sees the state and publishes another tag at the correct position... so maybe it should just be removed, so that all EoS tag handling is done at the beginning of the function...
if (ret == work::Status::DONE) {
    this->setAndNotifyState(lifecycle::State::STOPPED);
    publishTag({ { gr::tag::END_OF_STREAM, true } }, 1);
    ret = work::Status::DONE;
This line should not be necessary (ret is already DONE).
There probably was a second condition in the if before... It can be removed when fixing the EoS behaviour for a processBulk that doesn't consume anything.
    static_assert(gr::meta::always_false<gr::traits::block::stream_input_port_types_tuple<Derived>>, "neither processBulk(...) nor processOne(...) implemented");
}
forwardTags();
if (lifecycle::isShuttingDown(this->state()) || nextEosTag <= processedIn + 1) {
Here something is going wrong for my Delay block (processBulk(ConsumableSpan, PublishableSpan)): the block produces/consumes 0, but both processedIn and processedOut are 100, which is the amount of data available at the input port. This check then considers the EOS tag in range and finishes.
Yes, see also this comment: https://github.com/fair-acc/graph-prototype/pull/297/files#diff-d0a6abe85a332778390dc74e1c498e4c580677fe46db76395cdf2afefc41fb1eR1507
Until now, this has not been a problem, because there was no block that relied on being able to not consume all input samples in processBulk. Probably the easiest and most generic way to fix it would be to remove the eosTag condition and only shut down the block in the next call to processBulk... I have to check whether this breaks something else.
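The effect described for the Delay block can be reproduced with the bare comparison from the diff. In this sketch `eosInRange` is a hypothetical helper that isolates the `nextEosTag <= processedIn + 1` condition, and the EOS tag position of 100 is an assumption chosen to match the 100 available samples mentioned above.

```cpp
#include <cassert>
#include <cstddef>

// The end-of-stream part of the condition from the diff, without the lifecycle check.
constexpr bool eosInRange(std::size_t nextEosTag, std::size_t processedIn) {
    return nextEosTag <= processedIn + 1;
}

// Assumed scenario: 100 samples are available and the EOS tag sits right after them.
inline constexpr std::size_t kAvailable = 100;
inline constexpr std::size_t kEosTag    = 100;

// If processedIn holds the offered amount, the block finishes although the
// user code consumed nothing; with the consumed amount it keeps running.
inline constexpr bool finishesWithOffered  = eosInRange(kEosTag, kAvailable);
inline constexpr bool finishesWithConsumed = eosInRange(kEosTag, 0);
```

Here `finishesWithOffered` is true and `finishesWithConsumed` is false, which matches the observation that the check only misfires when a block leaves input samples unconsumed.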
This PR aims to restructure the core work function dispatch mechanism implemented in Block::workInternal.
Noteworthy changes:
Open issues: