
demo clock and source handling BlockingIO #140

Merged · 5 commits merged into main on Aug 11, 2023
Conversation

RalphSteinhagen (Member)

Implements a BlockingIO clock source that generates bursts of samples at a constant average sampling rate.

The meaty part for the end-user, and the targeted reference implementation for blocking IO, is:

template<typename T, typename ClockSourceType = std::chrono::system_clock, bool basicPeriodAlgorithm = true>
class ClockSource : public node<ClockSource<T, ClockSourceType>, BlockingIO, Doc<R""(
ClockSource Documentation -- add here
)"">> {
    std::chrono::time_point<ClockSourceType> nextTimePoint = ClockSourceType::now();

public:
    OUT<T>             out;
    //
    A<std::uint64_t, "n_samples_max", Visible, Doc<"0: unlimited">>              n_samples_max = 1024;
    std::uint64_t                                                                n_samples_produced{ 0 };
    A<float, "avg. sample rate", Visible>                                        sample_rate = 1000.f;
    A<std::uint64_t, "chunk_size", Visible, Doc<"number of samples per update">> chunk_size  = 100;

    void
    settings_changed(const property_map & /*old_settings*/, const property_map & /*new_settings*/) {
        nextTimePoint = ClockSourceType::now();
    }

    work_return_status_t
    process_bulk(PublishableSpan auto &output) noexcept { // <- called in a separate thread from the ioThreadPool
        if (n_samples_max > 0 && n_samples_produced >= n_samples_max) {
            output.publish(0_UZ); // <- N.B. PublishableSpan allows publishing an arbitrary number of samples -> publishing must be explicit (asserted)
            return work_return_status_t::DONE;
        }

        std::this_thread::sleep_until(nextTimePoint); // <- blocking here
        const std::size_t writableSamples = output.size();
        if (writableSamples < chunk_size) {
            output.publish(0_UZ);
            return work_return_status_t::INSUFFICIENT_OUTPUT_ITEMS;
        }

        const std::size_t samples_remaining  = (n_samples_max == 0) ? writableSamples // n_samples_max == 0 -> unlimited
                                                                    : static_cast<std::size_t>(n_samples_max - n_samples_produced);
        const std::size_t limit              = std::min(writableSamples, samples_remaining);
        const std::size_t samples_to_produce = std::min(limit, static_cast<std::size_t>(chunk_size));
        // [...] fill output[0 .. samples_to_produce - 1] here
        output.publish(samples_to_produce);
        n_samples_produced += samples_to_produce;

        const auto updatePeriod = std::chrono::microseconds(static_cast<long>(1e6f * static_cast<float>(chunk_size) / sample_rate));
        nextTimePoint += updatePeriod;
        return work_return_status_t::OK;
    }
};

Added a graph-specific atomic 'progress' counter that is updated whenever a blocking block unblocks and processes data, and that can be listened to. The scheduler can use it, for example, to sleep when no other block can process data and to be woken once one of the blocking blocks has produced data.

Added node::available_[input,output]_samples(Container&) and DONE workaround

... re-checking that there are no pending unprocessed samples for blocks implementing the BlockingIO policy. The issue is that the present 'DONE' concept assumes a global state that does not hold for all multithreading corner cases.

To be discussed: a better solution could be to propagate the DONE state to dependent blocks (e.g. via tags) and to declare DONE only once all blocks, ordered from source to sink, have communicated completion. ... to be further evaluated.

@RalphSteinhagen RalphSteinhagen temporarily deployed to configure coverage July 23, 2023 09:35 — with GitHub Actions Inactive
RalphSteinhagen (Member, Author) commented Jul 23, 2023

The Emscripten build blocks on the CI ... needs further investigation.

@pr-explainer-bot

Pull Request Review Markdown

Hey there! 👋 Here's a summary of the previous results and suggestions for the pull request:

Changes

  1. Line 1-4: Updated cmake_minimum_required version from 3.22 to 3.25.
  2. Line 99: Modified target_link_directories function to include ${FFTW_PREFIX}/install/lib64.
  3. Line 10: Set CMAKE_BUILD_WITH_INSTALL_RPATH variable to ON.
  4. Line 175-177: Added two additional parameters to the init function in the node_model class.
  5. Line 305-329: Added new member functions to the node_wrapper class.
  6. Line 427-466: Added two new member variables to the graph struct.
  7. Line 591-634: Added two new member functions to the graph struct.

Suggestions

  1. Line 99: Consider dynamically adding directories to target_link_directories using a loop.
  2. Line 175-177: Store std::shared_ptr<gr::Sequence> progress and std::shared_ptr<fair::thread_pool::BasicThreadPool> ioThreadPool as member variables in the node_model class.
  3. Line 305-329: Create a separate interface or base class for the new member functions in the node_wrapper class.
  4. Line 427-466: Pass std::shared_ptr<gr::Sequence> progress and std::shared_ptr<fair::thread_pool::BasicThreadPool> ioThreadPool as parameters to the add_node function and store them as member variables in the node_model class.
  5. Line 591-634: Create a separate interface or base class for the new member functions in the graph struct.

Bugs

No potential bugs found. Good job! 🐛

Improvements

  1. Added new member functions and includes to specific files.
  2. Consider using a more descriptive name for the WorkCounter class.
  3. Use consistent indentation throughout the code.
  4. Avoid using std::ignore without assigning a value to it.
  5. Consider using a more descriptive name for the init function.
  6. Add comments to explain the purpose of each function.
  7. Refactor the stop function for better readability.


drslebedev (Contributor) commented Jul 24, 2023

The idea is that, basically, only the source node "knows" that no more data should be expected.
What if we consider a variant where only the source(s) can send a DONE tag? This tag would be propagated through all nodes, and when a node receives it, that node returns DONE. One would not need workarounds for blocking nodes or for other potential edge cases. What are the potential disadvantages of such an approach?
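To make the proposal concrete, here is a toy sketch (not the framework API; `Tag` and `Node` are hypothetical) of the forwarding rule: a node reports DONE, and forwards the tag downstream, only once all of its input ports have delivered the DONE tag:

```cpp
#include <cstddef>

// Hypothetical illustration of DONE-tag propagation, not the actual API.
enum class Tag { NONE, DONE };

struct Node {
    std::size_t n_inputs;        // number of upstream ports feeding this node
    std::size_t done_inputs = 0; // upstream ports that have signalled DONE

    // Called once per received tag; returns the tag to forward downstream.
    // DONE is forwarded (and the node itself is done) only after every
    // upstream port has delivered the DONE tag.
    Tag process(Tag incoming) {
        if (incoming == Tag::DONE) { ++done_inputs; }
        return (done_inputs >= n_inputs) ? Tag::DONE : Tag::NONE;
    }
};
```

Under this rule, a sink with two upstream sources stays active after the first source finishes and only declares DONE after the second one does, which sidesteps the global-state assumption criticized above.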

@RalphSteinhagen RalphSteinhagen temporarily deployed to configure coverage July 24, 2023 19:15 — with GitHub Actions Inactive
Constrain the argument of the perfect-forwarding constructor to be less greedy in competition with the wrapped class' copy constructor.
see: https://sonarcloud.io/project/issues?resolved=false&sinceLeakPeriod=true&types=BUG&pullRequest=140&id=fair-acc_graph-prototype&open=AYmCNbrn5GyAuvTGfMP3
@wirew0rm wirew0rm temporarily deployed to configure coverage August 11, 2023 14:07 — with GitHub Actions Inactive
sonarcloud bot commented Aug 11, 2023

SonarCloud Quality Gate failed.

Bugs: 0 (A)
Vulnerabilities: 0 (A)
Security Hotspots: 0 (A)
Code Smells: 45 (A)

Coverage: 59.6%
Duplication: 1.9%

warning The version of Java (11.0.17) you have used to run this analysis is deprecated and we will stop accepting it soon. Please update to at least Java 17.


@wirew0rm wirew0rm temporarily deployed to configure coverage August 11, 2023 14:55 — with GitHub Actions Inactive
@wirew0rm wirew0rm merged commit 85e02f4 into main Aug 11, 2023
15 of 19 checks passed
@wirew0rm wirew0rm deleted the demo_clock_and_source branch August 11, 2023 14:55