Skip to content

Commit

Permalink
Change Block to handle SyncInput-AsyncOutput ports case.
Browse files Browse the repository at this point in the history
Signed-off-by: drslebedev <dr.s.lebedev@gmail.com>
  • Loading branch information
drslebedev committed Jun 21, 2024
1 parent 93800f3 commit a4b7ce1
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 233 deletions.
50 changes: 31 additions & 19 deletions core/include/gnuradio-4.0/Block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1226,23 +1226,30 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen

auto computeResampling(std::size_t minSyncIn, std::size_t maxSyncIn, std::size_t minSyncOut, std::size_t maxSyncOut) {
struct ResamplingResult {
std::size_t decimatedIn;
std::size_t decimatedOut;
std::size_t resampledIn;
std::size_t resampledOut;
work::Status status = work::Status::OK;
};

if constexpr (!Resampling::kEnabled) { // no resampling
std::size_t n = std::min(maxSyncIn, maxSyncOut);
const std::size_t n = std::min(maxSyncIn, maxSyncOut);
if (n < minSyncIn) {
return ResamplingResult{.decimatedIn = 0UZ, .decimatedOut = 0UZ};
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_INPUT_ITEMS};
}
return ResamplingResult{.decimatedIn = n, .decimatedOut = n};
if (n < minSyncOut) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_OUTPUT_ITEMS};
}
return ResamplingResult{.resampledIn = n, .resampledOut = n};
}
if (denominator == 1UL && numerator == 1UL) { // no resampling
std::size_t n = std::min(maxSyncIn, maxSyncOut);
const std::size_t n = std::min(maxSyncIn, maxSyncOut);
if (n < minSyncIn) {
return ResamplingResult{.decimatedIn = 0UZ, .decimatedOut = 0UZ};
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_INPUT_ITEMS};
}
if (n < minSyncOut) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_OUTPUT_ITEMS};
}
return ResamplingResult{.decimatedIn = n, .decimatedOut = n};
return ResamplingResult{.resampledIn = n, .resampledOut = n};
}
std::size_t nResamplingChunks;
if constexpr (StrideControl::kEnabled) { // with stride, we cannot process more than one chunk
Expand All @@ -1254,10 +1261,13 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
} else {
nResamplingChunks = std::min(maxSyncIn / denominator, maxSyncOut / numerator);
}
if (nResamplingChunks * denominator < minSyncIn || nResamplingChunks * numerator < minSyncOut) {
return ResamplingResult{.decimatedIn = 0UZ, .decimatedOut = 0UZ};

if (nResamplingChunks * denominator < minSyncIn) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_INPUT_ITEMS};
} else if (nResamplingChunks * numerator < minSyncOut) {
return ResamplingResult{.resampledIn = 0UZ, .resampledOut = 0UZ, .status = work::Status::INSUFFICIENT_OUTPUT_ITEMS};
} else {
return ResamplingResult{static_cast<std::size_t>(nResamplingChunks * denominator), static_cast<std::size_t>(nResamplingChunks * numerator)};
return ResamplingResult{.resampledIn = static_cast<std::size_t>(nResamplingChunks * denominator), .resampledOut = static_cast<std::size_t>(nResamplingChunks * numerator)};
}
}

Expand Down Expand Up @@ -1478,22 +1488,24 @@ class Block : public lifecycle::StateMachine<Derived>, public std::tuple<Argumen
const auto ensureMinimalDecimation = nextTagLimit >= denominator ? nextTagLimit : static_cast<long unsigned int>(denominator); // ensure to process at least one denominator (may shift tags)
const auto availableToProcess = std::min({maxSyncIn, maxChunk, (maxSyncAvailableIn - inputSkipBefore), ensureMinimalDecimation, (nextEosTag - inputSkipBefore)});
const auto availableToPublish = std::min({maxSyncOut, maxSyncAvailableOut});
const auto [resampledIn, resampledOut] = computeResampling(std::min(minSyncIn, nextEosTag), availableToProcess, minSyncOut, availableToPublish);
const auto [resampledIn, resampledOut, resampledStatus] = computeResampling(std::min(minSyncIn, nextEosTag), availableToProcess, minSyncOut, availableToPublish);
const auto nextEosTagSkipBefore = nextEosTag - inputSkipBefore;
const bool isEosTagPresent = nextEosTag <= 0 || nextEosTagSkipBefore < minSyncIn || nextEosTagSkipBefore < denominator || numerator * (nextEosTagSkipBefore / denominator) < minSyncOut;

if (inputSkipBefore > 0) { // consume samples on sync ports that need to be consumed due to the stride
updateInputAndOutputTags(inputSkipBefore); // apply all tags in the skipped data range
const auto inputSpans = prepareStreams(inputPorts<PortType::STREAM>(&self()), inputSkipBefore); // only way to consume is via the ConsumableSpan now
consumeReaders(inputSkipBefore, inputSpans);
}
// return if there is no work to be performed // todo: add eos policy
if (isEosTagPresent || lifecycle::isShuttingDown(this->state()) || asyncEoS) {
emitErrorMessageIfAny("workInternal(): EOS tag arrived -> REQUESTED_STOP", this->changeStateTo(lifecycle::State::REQUESTED_STOP));
publishEoS();
this->setAndNotifyState(lifecycle::State::STOPPED);
return {requested_work, 0UZ, work::Status::DONE};
}
if (asyncEoS || (resampledIn == 0 && resampledOut == 0 && !hasAsyncIn && !hasAsyncOut)) {
if (nextEosTag <= 0 || lifecycle::isShuttingDown(this->state()) || asyncEoS || (nextEosTag - inputSkipBefore <= minSyncIn) || (nextEosTag - inputSkipBefore <= denominator) || (nextEosTag - inputSkipBefore) / denominator <= minSyncOut / numerator) {
emitErrorMessageIfAny("workInternal(): EOS tag arrived -> REQUESTED_STOP", this->changeStateTo(lifecycle::State::REQUESTED_STOP));
publishEoS();
this->setAndNotifyState(lifecycle::State::STOPPED);
return {requested_work, 0UZ, work::Status::DONE};
}
return {requested_work, 0UZ, resampledOut == 0 ? INSUFFICIENT_OUTPUT_ITEMS : INSUFFICIENT_INPUT_ITEMS};
return {requested_work, 0UZ, resampledStatus};
}

// for non-bulk processing, the processed span has to be limited to the first sample if it contains a tag s.t. the tag is not applied to every sample
Expand Down
Loading

0 comments on commit a4b7ce1

Please sign in to comment.