From e1c2a0d22153841d2294c09b6afbb502e7bc80e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 14 Mar 2023 11:47:17 +0100 Subject: [PATCH 1/2] Added bulk mode UDP --- xtransmit/misc.cpp | 7 ++ xtransmit/udp_socket.cpp | 149 +++++++++++++++++++++++++++++++++++- xtransmit/udp_socket.hpp | 71 +++++++++++++---- xtransmit/xtransmit-app.cpp | 17 ++++ 4 files changed, 229 insertions(+), 15 deletions(-) diff --git a/xtransmit/misc.cpp b/xtransmit/misc.cpp index 62aaf2e..720fd06 100644 --- a/xtransmit/misc.cpp +++ b/xtransmit/misc.cpp @@ -13,6 +13,9 @@ namespace xtransmit { #define LOG_SC_CONN "CONN " +#ifdef __linux__ +extern bool g_udp_mode_bulk; +#endif shared_sock_t create_connection(const vector& parsed_urls, shared_sock_t& listening_sock) { @@ -55,6 +58,10 @@ shared_sock_t create_connection(const vector& parsed_urls, shared_soc if (uri.type() == UriParser::UDP) { +#ifdef __linux__ + if (g_udp_mode_bulk) + return make_shared(uri); +#endif return make_shared(uri); } diff --git a/xtransmit/udp_socket.cpp b/xtransmit/udp_socket.cpp index 5b6d7c7..a3c7d8f 100644 --- a/xtransmit/udp_socket.cpp +++ b/xtransmit/udp_socket.cpp @@ -11,7 +11,7 @@ using shared_udp = shared_ptr; #define LOG_SOCK_UDP "SOCKET::UDP " -socket::udp::udp(const UriParser &src_uri) +socket::udp_base::udp_base(const UriParser &src_uri) : m_host(src_uri.host()) , m_port(src_uri.portno()) , m_options(src_uri.parameters()) @@ -102,7 +102,7 @@ socket::udp::udp(const UriParser &src_uri) } } -socket::udp::~udp() { closesocket(m_bind_socket); } +socket::udp_base::~udp_base() { closesocket(m_bind_socket); } size_t socket::udp::read(const mutable_buffer &buffer, int timeout_ms) { @@ -188,3 +188,148 @@ int socket::udp::write(const const_buffer &buffer, int timeout_ms) return static_cast(res); } + +socket::mudp::mudp(const UriParser& u) + : udp_base(u) +{ + for (int i = 0; i < MAX_SINGLE_READ; ++i) + { + iovec_array[i][0].iov_base = bufspace[i]; + iovec_array[i][0].iov_len = SRT_LIVE_MAX_PLSIZE; + mm_array[i].msg_hdr = msghdr { + addresses[i].get(), + sizeof(sockaddr_storage), + iovec_array[i], + 1, // We use one block for one packet + nullptr, 0, // CMSG - use later for PKTINFO here + 0 // flax + }; + + // Shortcut + bufsizes[i] = &mm_array[i].msg_len; + + // Just in case, although this should be set back on return + mm_array[i].msg_len = 0; + } +} + +size_t socket::mudp::read(const mutable_buffer &buffer, int timeout_ms) +{ + while (!m_blocking_mode) + { + fd_set set; + timeval tv; + FD_ZERO(&set); + FD_SET(m_bind_socket, &set); + tv.tv_sec = 0; + tv.tv_usec = 10000; + const int select_ret = ::select((int)m_bind_socket + 1, &set, NULL, &set, &tv); + + if (select_ret != 0) // ready + break; + + if (timeout_ms >= 0) // timeout + return 0; + } + + // This condition is satisfied if: + // 1. We have initial situation when both are 0 + // = No data read yet + // 2. `nbuffers` was previously set to a nonzero value, + // and `cbuffer` after increasing reached that value + // = All previously read data have been already extracted + if (cbuffer == nbuffers) + { + // Call recvmmsg to refill the cache. + // In case of failure, report the failure. + const int res = ::recvmmsg(m_bind_socket, mm_array, MAX_SINGLE_READ, 0, 0); + + if (res == -1) + { +#define NET_ERROR errno + const int err = NET_ERROR; + if (err != EAGAIN && err != EINTR && err != ECONNREFUSED) + throw socket::exception("udp::read::recv"); + + spdlog::info("UDP reading failed: error {0}. Again.", err); + return 0; + } + + // Theoretically impossible, but JIC + if (res == 0) + { + spdlog::info("UDP recvmmsg returned 0 ???"); + return 0; + } + + /* TRACE - enable if necessary for development + for (int i = 0; i < res; ++i) + { + std::cout << "[" << (*bufsizes[i]) << "]"; + } + std::cout << std::endl; + */ + + // Reset conditions to "freshly filled" + cbuffer = 0; + nbuffers = res; + } + // If this condition was't satisfied, it means that we still + // have data from previous refilling, so simply supply a single + // buffer by copying from the cache. + + if (buffer.size() < *bufsizes[cbuffer]) + throw socket::exception("mudp::read: too small buffer for extracting"); + + // Copy the buffer to the destination and update the cache read position + size_t datasize = *bufsizes[cbuffer]; + memcpy(buffer.data(), bufspace[cbuffer], datasize); + ++cbuffer; + return datasize; +} + +int socket::mudp::write(const const_buffer &buffer, int timeout_ms) +{ + while (!m_blocking_mode) + { + fd_set set; + timeval tv; + FD_ZERO(&set); + FD_SET(m_bind_socket, &set); + tv.tv_sec = 0; + tv.tv_usec = 10000; + const int select_ret = ::select((int)m_bind_socket + 1, nullptr, &set, &set, &tv); + + if (select_ret != 0) // ready + break; + + if (timeout_ms >= 0) // timeout + return 0; + } + + const int res = ::sendto(m_bind_socket, + static_cast(buffer.data()), + (int)buffer.size(), + 0, + (sockaddr *)&m_dst_addr, + sizeof m_dst_addr); + if (res == -1) + { +#ifndef _WIN32 +#define NET_ERROR errno +#else +#define NET_ERROR WSAGetLastError() +#endif + const int err = NET_ERROR; + if (err != EAGAIN && err != EINTR && err != ECONNREFUSED) + { + spdlog::info("udp::write::sendto: error {0}.", err); + throw socket::exception("udp::write::sendto error"); + } + + spdlog::info("udp::sendto failed: error {0}. Again.", err); + return 0; + } + + return static_cast(res); +} diff --git a/xtransmit/udp_socket.hpp b/xtransmit/udp_socket.hpp index 4c47065..3fc85f0 100644 --- a/xtransmit/udp_socket.hpp +++ b/xtransmit/udp_socket.hpp @@ -9,22 +9,22 @@ // OpenSRT #include "uriparser.hpp" +#include "netinet_any.h" +#include "srt.h" namespace xtransmit { namespace socket { -class udp - : public std::enable_shared_from_this - , public isocket +class udp_base + : public isocket { - using shared_udp = std::shared_ptr; using string = std::string; public: - udp(const UriParser &src_uri); - ~udp(); + udp_base(const UriParser &src_uri); + ~udp_base(); public: void listen(); @@ -34,6 +34,25 @@ class udp SOCKET id() const final { return m_bind_socket; } +protected: + SOCKET m_bind_socket = -1; // INVALID_SOCK; + sockaddr_in m_dst_addr = {}; + + bool m_blocking_mode = false; + string m_host; + int m_port; + std::map m_options; // All other options, as provided in the URI +}; + +class udp + : public std::enable_shared_from_this + , public udp_base +{ + using shared_udp = std::shared_ptr; + +public: + using udp_base::udp_base; + public: /** * @returns The number of bytes received. @@ -43,15 +62,41 @@ class udp size_t read(const mutable_buffer &buffer, int timeout_ms = -1) final; int write(const const_buffer &buffer, int timeout_ms = -1) final; -private: - SOCKET m_bind_socket = -1; // INVALID_SOCK; - sockaddr_in m_dst_addr = {}; +}; + +#ifdef __linux__ + +const auto MAX_SINGLE_READ = 10; + +class mudp + : public std::enable_shared_from_this + , public udp_base +{ + using shared_udp = std::shared_ptr; + char bufspace[MAX_SINGLE_READ][SRT_LIVE_MAX_PLSIZE]; + ::srt::sockaddr_any addresses[MAX_SINGLE_READ]; + mmsghdr mm_array[MAX_SINGLE_READ]; + iovec iovec_array[MAX_SINGLE_READ][1]; + + unsigned int* bufsizes[MAX_SINGLE_READ]; + size_t nbuffers = 0; + size_t cbuffer = 0; + + +public: + mudp(const UriParser& u); + +public: + /** + * @returns The number of bytes received. + * + * @throws socket_exception Thrown on failure. + */ + size_t read(const mutable_buffer &buffer, int timeout_ms = -1) override final; + int write(const const_buffer &buffer, int timeout_ms = -1) override final; - bool m_blocking_mode = false; - string m_host; - int m_port; - std::map m_options; // All other options, as provided in the URI }; +#endif } // namespace socket } // namespace xtransmit diff --git a/xtransmit/xtransmit-app.cpp b/xtransmit/xtransmit-app.cpp index 4b0a473..21f8c56 100644 --- a/xtransmit/xtransmit-app.cpp +++ b/xtransmit/xtransmit-app.cpp @@ -30,6 +30,12 @@ using namespace std; atomic_bool force_break(false); +namespace xtransmit { +#ifdef __linux__ +bool g_udp_mode_bulk = false; +#endif +} + void OnINT_ForceExit(int) { cerr << "\n-------- REQUESTED INTERRUPT!\n"; @@ -177,6 +183,17 @@ int main(int argc, char** argv) }, "log level [debug, error, note, info, fatal]"); +#ifdef __linux__ + app.add_option( + "--udp-mode", + [](CLI::results_t val) { + if (val[0] == "bulk") + ::xtransmit::g_udp_mode_bulk = true; + return true; + }, + "UDP mode: simple or bulk [Linux only]"); +#endif + const string logfa_desc = create_srt_logfa_description(); app.add_option( "--logfa", From 7835f8a560eb60cb91276fc754ae0910a60dd43c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Tue, 14 Mar 2023 11:58:43 +0100 Subject: [PATCH 2/2] Fixed blocking mudp for non-linux systems --- xtransmit/udp_socket.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/xtransmit/udp_socket.cpp b/xtransmit/udp_socket.cpp index a3c7d8f..f1ae1dc 100644 --- a/xtransmit/udp_socket.cpp +++ b/xtransmit/udp_socket.cpp @@ -189,6 +189,8 @@ int socket::udp::write(const const_buffer &buffer, int timeout_ms) return static_cast(res); } +#ifdef __linux__ + socket::mudp::mudp(const UriParser& u) : udp_base(u) { @@ -333,3 +335,4 @@ int socket::mudp::write(const const_buffer &buffer, int timeout_ms) return static_cast(res); } +#endif // __linux__