From 4b52b5decf13f03ec7f0d49b543f348604af099a Mon Sep 17 00:00:00 2001 From: Armando Montanez Date: Tue, 19 Apr 2022 13:47:32 -0700 Subject: [PATCH] pw_rpc: Expose integration test socket Exposes the socket file descriptor for socket-based RPC integration testing servers/clients to enable socket configuration. Change-Id: I04d0cb8674a10436da926a07347182c7c844ac59 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/91942 Pigweed-Auto-Submit: Armando Montanez Reviewed-by: Erik Gilling Commit-Queue: Auto-Submit --- pw_rpc/integration_testing.cc | 2 ++ .../pw_rpc/integration_test_socket_client.h | 2 ++ pw_rpc/public/pw_rpc/integration_testing.h | 4 ++++ .../public/pw_rpc_system_server/socket.h | 4 ++++ pw_stream/docs.rst | 5 +++++ pw_stream/public/pw_stream/socket_stream.h | 9 ++++++++- pw_stream/socket_stream.cc | 20 ++++++++++--------- targets/host/system_rpc_server.cc | 2 ++ 8 files changed, 38 insertions(+), 10 deletions(-) diff --git a/pw_rpc/integration_testing.cc b/pw_rpc/integration_testing.cc index 381185d3ed..2d715eae2e 100644 --- a/pw_rpc/integration_testing.cc +++ b/pw_rpc/integration_testing.cc @@ -31,6 +31,8 @@ unit_test::LoggingEventHandler log_test_events; Client& client() { return context.client(); } +int GetClientSocketFd() { return context.GetSocketFd(); } + void SetEgressChannelManipulator(ChannelManipulator* new_channel_manipulator) { context.SetEgressChannelManipulator(new_channel_manipulator); } diff --git a/pw_rpc/public/pw_rpc/integration_test_socket_client.h b/pw_rpc/public/pw_rpc/integration_test_socket_client.h index eb6d62c9fe..4d9e9164ef 100644 --- a/pw_rpc/public/pw_rpc/integration_test_socket_client.h +++ b/pw_rpc/public/pw_rpc/integration_test_socket_client.h @@ -47,6 +47,8 @@ class SocketClientContext { return OkStatus(); } + int GetSocketFd() { return stream_.connection_fd(); } + void SetEgressChannelManipulator( ChannelManipulator* new_channel_manipulator) { channel_output_with_manipulator_.set_channel_manipulator( diff --git a/pw_rpc/public/pw_rpc/integration_testing.h b/pw_rpc/public/pw_rpc/integration_testing.h index 475bf9efea..52c8a6a7a3 100644 --- a/pw_rpc/public/pw_rpc/integration_testing.h +++ b/pw_rpc/public/pw_rpc/integration_testing.h @@ -86,6 +86,10 @@ void SetIngressChannelManipulator(ChannelManipulator* new_channel_manipulator); // Returns the global RPC client for integration test use. Client& client(); +// The file descriptor for the socket associated with the client. This may be +// used to configure socket options. +int GetClientSocketFd(); + // Initializes logging and the global RPC client for integration testing. Starts // a background thread that processes incoming. Status InitializeClient(int argc, diff --git a/pw_rpc/system_server/public/pw_rpc_system_server/socket.h b/pw_rpc/system_server/public/pw_rpc_system_server/socket.h index 647633564c..dc34e93354 100644 --- a/pw_rpc/system_server/public/pw_rpc_system_server/socket.h +++ b/pw_rpc/system_server/public/pw_rpc_system_server/socket.h @@ -20,4 +20,8 @@ namespace pw::rpc::system_server { // Sets the port to use for pw::rpc::system_server backends that use sockets. void set_socket_port(uint16_t port); +// The file descriptor for the socket associated with the server. This may be +// used to configure socket options. +int GetServerSocketFd(); + } // namespace pw::rpc::system_server diff --git a/pw_stream/docs.rst b/pw_stream/docs.rst index 951e9d4563..f9d645c9f3 100644 --- a/pw_stream/docs.rst +++ b/pw_stream/docs.rst @@ -415,6 +415,11 @@ Implementations ``StdFileReader`` wraps an ``std::ifstream`` with the :cpp:class:`Reader` interface. +.. cpp:class:: SocketStream : public NonSeekableReaderWriter + + ``SocketStream`` wraps posix-style sockets with the :cpp:class:`Reader` and + :cpp:class:`Writer` interfaces. + ------------------ Why use pw_stream? ------------------ diff --git a/pw_stream/public/pw_stream/socket_stream.h b/pw_stream/public/pw_stream/socket_stream.h index 058b810535..99168d93d1 100644 --- a/pw_stream/public/pw_stream/socket_stream.h +++ b/pw_stream/public/pw_stream/socket_stream.h @@ -38,6 +38,13 @@ class SocketStream : public NonSeekableReaderWriter { // Close the socket stream and release all resources void Close(); + // Exposes the file descriptor for the active connection. This is exposed to + // allow configuration and introspection of this socket's current + // configuration using setsockopt() and getsockopt(). + // + // Returns -1 if there is no active connection. + int connection_fd() { return connection_fd_; } + private: static constexpr int kInvalidFd = -1; @@ -47,7 +54,7 @@ class SocketStream : public NonSeekableReaderWriter { uint16_t listen_port_ = 0; int socket_fd_ = kInvalidFd; - int conn_fd_ = kInvalidFd; + int connection_fd_ = kInvalidFd; struct sockaddr_in sockaddr_client_ = {}; }; diff --git a/pw_stream/socket_stream.cc b/pw_stream/socket_stream.cc index fb455dcb69..10e617e895 100644 --- a/pw_stream/socket_stream.cc +++ b/pw_stream/socket_stream.cc @@ -70,16 +70,16 @@ Status SocketStream::Serve(uint16_t port) { socklen_t len = sizeof(sockaddr_client_); - conn_fd_ = + connection_fd_ = accept(socket_fd_, reinterpret_cast(&sockaddr_client_), &len); - if (conn_fd_ < 0) { + if (connection_fd_ < 0) { return Status::Unknown(); } return OkStatus(); } Status SocketStream::SocketStream::Connect(const char* host, uint16_t port) { - conn_fd_ = socket(AF_INET, SOCK_STREAM, 0); + connection_fd_ = socket(AF_INET, SOCK_STREAM, 0); sockaddr_in addr; addr.sin_family = AF_INET; @@ -94,7 +94,9 @@ Status SocketStream::SocketStream::Connect(const char* host, uint16_t port) { return Status::InvalidArgument(); } - if (connect(conn_fd_, reinterpret_cast(&addr), sizeof(addr)) < 0) { + if (connect(connection_fd_, + reinterpret_cast(&addr), + sizeof(addr)) < 0) { PW_LOG_ERROR( "Failed to connect to %s:%d: %s", host, port, std::strerror(errno)); return Status::Unknown(); @@ -109,9 +111,9 @@ void SocketStream::Close() { socket_fd_ = kInvalidFd; } - if (conn_fd_ != kInvalidFd) { - close(conn_fd_); - conn_fd_ = kInvalidFd; + if (connection_fd_ != kInvalidFd) { + close(connection_fd_); + connection_fd_ = kInvalidFd; } } @@ -119,7 +121,7 @@ Status SocketStream::DoWrite(std::span data) { // Use MSG_NOSIGNAL to avoid getting a SIGPIPE signal when the remote // peer drops the connection. ssize_t bytes_sent = - send(conn_fd_, data.data(), data.size_bytes(), MSG_NOSIGNAL); + send(connection_fd_, data.data(), data.size_bytes(), MSG_NOSIGNAL); if (bytes_sent < 0 || static_cast(bytes_sent) != data.size()) { if (errno == EPIPE) { @@ -134,7 +136,7 @@ Status SocketStream::DoWrite(std::span data) { } StatusWithSize SocketStream::DoRead(ByteSpan dest) { - ssize_t bytes_rcvd = recv(conn_fd_, dest.data(), dest.size_bytes(), 0); + ssize_t bytes_rcvd = recv(connection_fd_, dest.data(), dest.size_bytes(), 0); if (bytes_rcvd < 0) { return StatusWithSize::Unknown(); } diff --git a/targets/host/system_rpc_server.cc b/targets/host/system_rpc_server.cc index 69da5387aa..413cceb964 100644 --- a/targets/host/system_rpc_server.cc +++ b/targets/host/system_rpc_server.cc @@ -43,6 +43,8 @@ void set_socket_port(uint16_t new_socket_port) { socket_port = new_socket_port; } +int GetServerSocketFd() { return socket_stream.connection_fd(); } + void Init() { log_basic::SetOutput([](std::string_view log) { std::fprintf(stderr, "%.*s\n", static_cast(log.size()), log.data());