-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP Make remote sources a regular GR block
Remove special treatment for remote sources, make it a regular GR block. TODO: - readd remote flowgraph handling - readd adding new sources from the UI (wait for messages?)
- Loading branch information
1 parent
1bb53be
commit 841eee4
Showing
8 changed files
with
111 additions
and
272 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
#ifndef OPENDIGITIZER_REMOTESOURCE_HPP | ||
#define OPENDIGITIZER_REMOTESOURCE_HPP | ||
|
||
#include <gnuradio-4.0/Block.hpp> | ||
|
||
#include <daq_api.hpp> | ||
|
||
#include <IoSerialiserYaS.hpp> | ||
#include <MdpMessage.hpp> | ||
#include <opencmw.hpp> | ||
#include <RestClient.hpp> | ||
#include <type_traits> | ||
|
||
namespace opendigitizer { | ||
|
||
template<typename T> | ||
requires std::is_same_v<T, float> | ||
struct RemoteSource : public gr::Block<RemoteSource<T>> { | ||
gr::PortOut<float> out; | ||
std::string remote_uri; | ||
std::string signal_name; | ||
opencmw::client::RestClient m_client; | ||
|
||
struct Data { | ||
opendigitizer::acq::Acquisition data; | ||
std::size_t read = 0; | ||
}; | ||
|
||
std::deque<Data> m_data; | ||
std::mutex m_mutex; | ||
|
||
void append(opendigitizer::acq::Acquisition &&data) { | ||
std::lock_guard lock(m_mutex); | ||
m_data.push_back({ std::move(data), 0 }); | ||
} | ||
|
||
auto processBulk(gr::PublishableSpan auto &output) noexcept { | ||
std::size_t written = 0; | ||
std::lock_guard lock(m_mutex); | ||
while (written < output.size() && !m_data.empty()) { | ||
auto &d = m_data.front(); | ||
auto in = std::span<const float>(d.data.channelValue.begin(), d.data.channelValue.end()); | ||
in = in.subspan(d.read, std::min(output.size() - written, in.size() - d.read)); | ||
|
||
std::copy(in.begin(), in.end(), output.begin() + written); | ||
written += in.size(); | ||
d.read += in.size(); | ||
if (d.read == d.data.channelValue.size()) { | ||
m_data.pop_front(); | ||
} | ||
} | ||
output.publish(written); | ||
return gr::work::Status::OK; | ||
} | ||
|
||
void | ||
settingsChanged(const gr::property_map &old_settings, const gr::property_map & /*new_settings*/) { | ||
const auto oldValue = old_settings.find("remote_uri"); | ||
if (oldValue != old_settings.end()) { | ||
const auto oldUri = std::get<std::string>(oldValue->second); | ||
if (!oldUri.empty()) { | ||
fmt::print("Unsubscribing from {}\n", oldUri); | ||
opencmw::client::Command command; | ||
command.command = opencmw::mdp::Command::Unsubscribe; | ||
command.topic = opencmw::URI<>(remote_uri); | ||
command.callback = [oldUri](const opencmw::mdp::Message &) { | ||
// TODO: Add cleanup once openCMW starts calling the callback | ||
// on successful unsubscribe | ||
fmt::print("Unsubscribed from {} successfully\n", oldUri); | ||
}; | ||
} | ||
} | ||
|
||
opencmw::client::Command command; | ||
command.command = opencmw::mdp::Command::Subscribe; | ||
command.topic = opencmw::URI<>(remote_uri); | ||
fmt::print("Subscribing to {}\n", remote_uri); | ||
|
||
command.callback = [this](const opencmw::mdp::Message &rep) { | ||
if (rep.data.empty()) { | ||
return; | ||
} | ||
try { | ||
auto buf = rep.data; | ||
opendigitizer::acq::Acquisition acq; | ||
opencmw::deserialise<opencmw::YaS, opencmw::ProtocolCheck::IGNORE>(buf, acq); | ||
append(std::move(acq)); | ||
} catch (opencmw::ProtocolException &e) { | ||
fmt::print(std::cerr, "{}\n", e.what()); | ||
return; | ||
} | ||
}; | ||
m_client.request(command); | ||
} | ||
}; | ||
|
||
} // namespace opendigitizer | ||
|
||
ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::RemoteSource, out, remote_uri, signal_name) | ||
|
||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.