Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added bulk mode UDP #75

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions xtransmit/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace xtransmit {

#define LOG_SC_CONN "CONN "

#ifdef __linux__
extern bool g_udp_mode_bulk;
#endif

shared_sock_t create_connection(const vector<UriParser>& parsed_urls, shared_sock_t& listening_sock)
{
Expand Down Expand Up @@ -55,6 +58,10 @@ shared_sock_t create_connection(const vector<UriParser>& parsed_urls, shared_soc

if (uri.type() == UriParser::UDP)
{
#ifdef __linux__
if (g_udp_mode_bulk)
return make_shared<socket::mudp>(uri);
#endif
return make_shared<socket::udp>(uri);
}

Expand Down
152 changes: 150 additions & 2 deletions xtransmit/udp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ using shared_udp = shared_ptr<socket::udp>;

#define LOG_SOCK_UDP "SOCKET::UDP "

socket::udp::udp(const UriParser &src_uri)
socket::udp_base::udp_base(const UriParser &src_uri)
: m_host(src_uri.host())
, m_port(src_uri.portno())
, m_options(src_uri.parameters())
Expand Down Expand Up @@ -102,7 +102,7 @@ socket::udp::udp(const UriParser &src_uri)
}
}

socket::udp::~udp() { closesocket(m_bind_socket); }
socket::udp_base::~udp_base() { closesocket(m_bind_socket); }

size_t socket::udp::read(const mutable_buffer &buffer, int timeout_ms)
{
Expand Down Expand Up @@ -188,3 +188,151 @@ int socket::udp::write(const const_buffer &buffer, int timeout_ms)

return static_cast<size_t>(res);
}

#ifdef __linux__

socket::mudp::mudp(const UriParser& u)
: udp_base(u)
{
for (int i = 0; i < MAX_SINGLE_READ; ++i)
{
iovec_array[i][0].iov_base = bufspace[i];
iovec_array[i][0].iov_len = SRT_LIVE_MAX_PLSIZE;
mm_array[i].msg_hdr = msghdr {
addresses[i].get(),
sizeof(sockaddr_storage),
iovec_array[i],
1, // We use one block for one packet
nullptr, 0, // CMSG - use later for PKTINFO here
0 // flax
};

// Shortcut
bufsizes[i] = &mm_array[i].msg_len;

// Just in case, although this should be set back on return
mm_array[i].msg_len = 0;
}
}

size_t socket::mudp::read(const mutable_buffer &buffer, int timeout_ms)
{
while (!m_blocking_mode)
{
fd_set set;
timeval tv;
FD_ZERO(&set);
FD_SET(m_bind_socket, &set);
tv.tv_sec = 0;
tv.tv_usec = 10000;
const int select_ret = ::select((int)m_bind_socket + 1, &set, NULL, &set, &tv);

if (select_ret != 0) // ready
break;

if (timeout_ms >= 0) // timeout
return 0;
}
Comment on lines +220 to +235
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to block if there is a cached buffer to read.


// This condition is satisfied if:
// 1. We have initial situation when both are 0
// = No data read yet
// 2. `nbuffers` was previously set to a nonzero value,
// and `cbuffer` after increasing reached that value
// = All previously read data have been already extracted
if (cbuffer == nbuffers)
{
// Call recvmmsg to refill the cache.
// In case of failure, report the failure.
const int res = ::recvmmsg(m_bind_socket, mm_array, MAX_SINGLE_READ, 0, 0);

if (res == -1)
{
#define NET_ERROR errno
const int err = NET_ERROR;
if (err != EAGAIN && err != EINTR && err != ECONNREFUSED)
throw socket::exception("udp::read::recv");

spdlog::info("UDP reading failed: error {0}. Again.", err);
return 0;
}

// Theoretically impossible, but JIC
if (res == 0)
{
spdlog::info("UDP recvmmsg returned 0 ???");
return 0;
}

/* TRACE - enable if necessary for development
for (int i = 0; i < res; ++i)
{
std::cout << "[" << (*bufsizes[i]) << "]";
}
std::cout << std::endl;
*/
Comment on lines +267 to +273

Check notice

Code scanning / CodeQL

Commented-out code

This comment appears to contain commented-out code.

// Reset conditions to "freshly filled"
cbuffer = 0;
nbuffers = res;
}
// If this condition was't satisfied, it means that we still
// have data from previous refilling, so simply supply a single
// buffer by copying from the cache.

if (buffer.size() < *bufsizes[cbuffer])
throw socket::exception("mudp::read: too small buffer for extracting");

// Copy the buffer to the destination and update the cache read position
size_t datasize = *bufsizes[cbuffer];
memcpy(buffer.data(), bufspace[cbuffer], datasize);
++cbuffer;
return datasize;
}

int socket::mudp::write(const const_buffer &buffer, int timeout_ms)
{
while (!m_blocking_mode)
{
fd_set set;
timeval tv;
FD_ZERO(&set);
FD_SET(m_bind_socket, &set);
tv.tv_sec = 0;
tv.tv_usec = 10000;
const int select_ret = ::select((int)m_bind_socket + 1, nullptr, &set, &set, &tv);

if (select_ret != 0) // ready
break;

if (timeout_ms >= 0) // timeout
return 0;
}

const int res = ::sendto(m_bind_socket,
static_cast<const char *>(buffer.data()),
(int)buffer.size(),
0,
(sockaddr *)&m_dst_addr,
sizeof m_dst_addr);
if (res == -1)
{
#ifndef _WIN32
#define NET_ERROR errno
#else
#define NET_ERROR WSAGetLastError()
#endif
const int err = NET_ERROR;
if (err != EAGAIN && err != EINTR && err != ECONNREFUSED)
{
spdlog::info("udp::write::sendto: error {0}.", err);
throw socket::exception("udp::write::sendto error");
}

spdlog::info("udp::sendto failed: error {0}. Again.", err);
return 0;
}

return static_cast<size_t>(res);
}
#endif // __linux__
71 changes: 58 additions & 13 deletions xtransmit/udp_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@

// OpenSRT
#include "uriparser.hpp"
#include "netinet_any.h"
#include "srt.h"

namespace xtransmit
{
namespace socket
{

class udp
: public std::enable_shared_from_this<udp>
, public isocket
class udp_base

Check warning

Code scanning / CodeQL

Non-virtual destructor in base class

A base class with a virtual function should define a virtual destructor.
: public isocket
{
using shared_udp = std::shared_ptr<udp>;
using string = std::string;

public:
udp(const UriParser &src_uri);
~udp();
udp_base(const UriParser &src_uri);
~udp_base();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
~udp_base();
virtual ~udp_base();


public:
void listen();
Expand All @@ -34,6 +34,25 @@ class udp

SOCKET id() const final { return m_bind_socket; }

protected:
SOCKET m_bind_socket = -1; // INVALID_SOCK;
sockaddr_in m_dst_addr = {};

bool m_blocking_mode = false;
string m_host;
int m_port;
std::map<string, string> m_options; // All other options, as provided in the URI
};

class udp
: public std::enable_shared_from_this<udp>
, public udp_base
{
using shared_udp = std::shared_ptr<udp>;

public:
using udp_base::udp_base;

public:
/**
* @returns The number of bytes received.
Expand All @@ -43,15 +62,41 @@ class udp
size_t read(const mutable_buffer &buffer, int timeout_ms = -1) final;
int write(const const_buffer &buffer, int timeout_ms = -1) final;

private:
SOCKET m_bind_socket = -1; // INVALID_SOCK;
sockaddr_in m_dst_addr = {};
};

#ifdef __linux__

const auto MAX_SINGLE_READ = 10;

class mudp
: public std::enable_shared_from_this<mudp>
, public udp_base
{
using shared_udp = std::shared_ptr<mudp>;
char bufspace[MAX_SINGLE_READ][SRT_LIVE_MAX_PLSIZE];
::srt::sockaddr_any addresses[MAX_SINGLE_READ];
mmsghdr mm_array[MAX_SINGLE_READ];
iovec iovec_array[MAX_SINGLE_READ][1];

unsigned int* bufsizes[MAX_SINGLE_READ];
size_t nbuffers = 0;
size_t cbuffer = 0;


public:
mudp(const UriParser& u);

public:
/**
* @returns The number of bytes received.
*
* @throws socket_exception Thrown on failure.
*/
size_t read(const mutable_buffer &buffer, int timeout_ms = -1) override final;
int write(const const_buffer &buffer, int timeout_ms = -1) override final;

bool m_blocking_mode = false;
string m_host;
int m_port;
std::map<string, string> m_options; // All other options, as provided in the URI
};
#endif

} // namespace socket
} // namespace xtransmit
17 changes: 17 additions & 0 deletions xtransmit/xtransmit-app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ using namespace std;

atomic_bool force_break(false);

namespace xtransmit {
#ifdef __linux__
bool g_udp_mode_bulk = false;
#endif
}

void OnINT_ForceExit(int)
{
cerr << "\n-------- REQUESTED INTERRUPT!\n";
Expand Down Expand Up @@ -177,6 +183,17 @@ int main(int argc, char** argv)
},
"log level [debug, error, note, info, fatal]");

#ifdef __linux__
app.add_option(
"--udp-mode",
[](CLI::results_t val) {
if (val[0] == "bulk")
::xtransmit::g_udp_mode_bulk = true;
return true;
},
"UDP mode: simple or bulk [Linux only]");
Comment on lines +190 to +194
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To detect possible typos (if returning false results in an error):

else if (val[0] != "simple")
    return false;

#endif

const string logfa_desc = create_srt_logfa_description();
app.add_option(
"--logfa",
Expand Down