Skip to content

Commit

Permalink
pw_rpc_transport: Unblock sockets when stopping
Browse files Browse the repository at this point in the history
When stopping the transport, close the sockets. Change blocking socket
operations to block in poll and use a separate pipe to unblock poll when
closing the socket.

Also, when closing sockets, disconnect the underlying sockets by using
the socket shutdown API.

Bug: 309680612
Test: Verified socket unit tests pass.
Test: Verified sample-project-default-build-windows and
 pigweed-mac-x86-bazel-test-host-clang succeed.
Test: See details in testing done comment in the code review.
Change-Id: I40b124f2abdd0243517bdba31a504a98f5f7aebf
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/181308
Presubmit-Verified: CQ Bot Account <pigweed-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Erik Gilling <konkers@google.com>
Commit-Queue: Erik Staats <estaats@google.com>
Reviewed-by: Carlos Chinchilla <cachinchilla@google.com>
  • Loading branch information
estaats-google authored and CQ Bot Account committed Nov 22, 2023
1 parent 5d6c877 commit 4f55cd8
Show file tree
Hide file tree
Showing 17 changed files with 419 additions and 149 deletions.
18 changes: 0 additions & 18 deletions pw_rpc/client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// License for the specific language governing permissions and limitations under
// the License.

#include <sys/socket.h>

#include <algorithm>
#include <array>
#include <cstring>
Expand All @@ -30,10 +28,6 @@ namespace {

constexpr int kIterations = 3;

// This client configures a socket read timeout to allow the RPC dispatch thread
// to exit gracefully.
constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};

using namespace std::chrono_literals;
using pw::ByteSpan;
using pw::ConstByteSpan;
Expand Down Expand Up @@ -145,18 +139,6 @@ int main(int argc, char* argv[]) {
return 1;
}

// Set read timout on socket to allow
// pw::rpc::integration_test::TerminateClient() to complete.
int retval = setsockopt(pw::rpc::integration_test::GetClientSocketFd(),
SOL_SOCKET,
SO_RCVTIMEO,
&rpc_test::kSocketReadTimeout,
sizeof(rpc_test::kSocketReadTimeout));
PW_CHECK_INT_EQ(retval,
0,
"Failed to configure socket receive timeout with errno=%d",
errno);

int test_retval = RUN_ALL_TESTS();

pw::rpc::integration_test::TerminateClient();
Expand Down
19 changes: 0 additions & 19 deletions pw_rpc/fuzz/client_fuzzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#include "pw_rpc/internal/log_config.h" // PW_LOG_* macros must be first.
// clang-format on

#include <sys/socket.h>

#include <cstring>

#include "pw_log/log.h"
Expand All @@ -28,10 +26,6 @@
namespace pw::rpc::fuzz {
namespace {

// This client configures a socket read timeout to allow the RPC dispatch thread
// to exit gracefully.
constexpr timeval kSocketReadTimeout = {.tv_sec = 1, .tv_usec = 0};

int FuzzClient(int argc, char** argv) {
// TODO(aarongreen): Incorporate descriptions into usage message.
Vector<ArgParserVariant, 5> parsers{
Expand Down Expand Up @@ -78,19 +72,6 @@ int FuzzClient(int argc, char** argv) {
return 1;
}

// Set read timout on socket to allow
// pw::rpc::integration_test::TerminateClient() to complete.
int fd = integration_test::GetClientSocketFd();
if (setsockopt(fd,
SOL_SOCKET,
SO_RCVTIMEO,
&kSocketReadTimeout,
sizeof(kSocketReadTimeout)) != 0) {
PW_LOG_ERROR("Failed to configure socket receive timeout with errno=%d",
errno);
return 1;
}

if (num_actions == 0) {
num_actions = std::numeric_limits<size_t>::max();
}
Expand Down
7 changes: 6 additions & 1 deletion pw_rpc/integration_testing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ unit_test::LoggingEventHandler log_test_events;

Client& client() { return context.client(); }

int GetClientSocketFd() { return context.GetSocketFd(); }
int SetClientSockOpt(int level,
int optname,
const void* optval,
unsigned int optlen) {
return context.SetSockOpt(level, optname, optval, optlen);
}

void SetEgressChannelManipulator(ChannelManipulator* new_channel_manipulator) {
context.SetEgressChannelManipulator(new_channel_manipulator);
Expand Down
14 changes: 9 additions & 5 deletions pw_rpc/public/pw_rpc/integration_test_socket_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,21 @@ class SocketClientContext {
}

// Terminates the client, joining the RPC dispatch thread.
//
// WARNING: This may block forever if the socket is configured to block
// indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
// nonzero timeout will allow the dispatch thread to always return.
void Terminate() {
PW_ASSERT(rpc_dispatch_thread_handle_.has_value());
should_terminate_.test_and_set();
// Close the stream to avoid blocking forever on a socket read.
stream_.Close();
rpc_dispatch_thread_handle_->join();
}

int GetSocketFd() { return stream_.connection_fd(); }
// Configure options for the socket associated with the client.
int SetSockOpt(int level,
int optname,
const void* optval,
unsigned int optlen) {
return stream_.SetSockOpt(level, optname, optval, optlen);
}

void SetEgressChannelManipulator(
ChannelManipulator* new_channel_manipulator) {
Expand Down
12 changes: 5 additions & 7 deletions pw_rpc/public/pw_rpc/integration_testing.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ void SetIngressChannelManipulator(ChannelManipulator* new_channel_manipulator);
// Returns the global RPC client for integration test use.
Client& client();

// The file descriptor for the socket associated with the client. This may be
// used to configure socket options.
int GetClientSocketFd();
// Configure options for the socket associated with the client.
int SetClientSockOpt(int level,
int optname,
const void* optval,
unsigned int optlen);

// Initializes logging and the global RPC client for integration testing. Starts
// a background thread that processes incoming.
Expand All @@ -98,10 +100,6 @@ Status InitializeClient(int argc,
Status InitializeClient(int port);

// Terminates the client, joining the RPC dispatch thread.
//
// WARNING: This may block forever if the socket is configured to block
// indefinitely on reads. Configuring the client socket's `SO_RCVTIMEO` to a
// nonzero timeout will allow the dispatch thread to always return.
void TerminateClient();

} // namespace pw::rpc::integration_test
8 changes: 5 additions & 3 deletions pw_rpc/system_server/public/pw_rpc_system_server/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ namespace pw::rpc::system_server {
// Sets the port to use for pw::rpc::system_server backends that use sockets.
void set_socket_port(uint16_t port);

// The file descriptor for the socket associated with the server. This may be
// used to configure socket options.
int GetServerSocketFd();
// Configure options for the socket associated with the server.
int SetServerSockOpt(int level,
int optname,
const void* optval,
unsigned int optlen);

} // namespace pw::rpc::system_server
15 changes: 14 additions & 1 deletion pw_rpc_transport/public/pw_rpc_transport/socket_rpc_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ class SocketRpcTransport : public RpcFrameSender, public thread::ThreadCore {

while (!stopped_) {
const auto read_status = ReadData();
// Break if ReadData was cancelled after the transport was stopped.
if (stopped_) {
break;
}
if (!read_status.ok()) {
internal::LogSocketReadError(read_status);
}
Expand All @@ -122,7 +126,11 @@ class SocketRpcTransport : public RpcFrameSender, public thread::ThreadCore {
}
}

void Stop() { stopped_ = true; }
void Stop() {
stopped_ = true;
socket_stream_.Close();
server_socket_.Close();
}

private:
enum class ClientServerRole { kClient, kServer };
Expand Down Expand Up @@ -156,6 +164,11 @@ class SocketRpcTransport : public RpcFrameSender, public thread::ThreadCore {
NotifyReady();

Result<stream::SocketStream> stream = server_socket_.Accept();
// If Accept was cancelled due to stopping the transport, return without
// error.
if (stopped_) {
return OkStatus();
}
if (!stream.ok()) {
internal::LogSocketAcceptError(stream.status());
return stream.status();
Expand Down
6 changes: 0 additions & 6 deletions pw_rpc_transport/rpc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ TEST(RpcIntegrationTest, SocketTransport) {
a.transport.Stop();
b.transport.Stop();

// Unblock socket transports by sending terminator packets.
const std::array<std::byte, 1> terminator_bytes{std::byte{0x42}};
RpcFrame terminator{.header = {}, .payload = terminator_bytes};
EXPECT_EQ(a.transport.Send(terminator), OkStatus());
EXPECT_EQ(b.transport.Send(terminator), OkStatus());

a_local_egress_thread.join();
b_local_egress_thread.join();
a_transport_thread.join();
Expand Down
16 changes: 0 additions & 16 deletions pw_rpc_transport/socket_rpc_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,12 @@ class SocketSender {
}
}

// stream::SocketStream doesn't support read timeouts so we have to
// unblock socket reads by sending more data after the transport is stopped.
pw::Status Terminate() { return transport_.Send(terminator_); }

private:
SocketRpcTransport<kReadBufferSize>& transport_;
std::vector<std::byte> sent_;
std::array<std::byte, 256> data_{};
std::uniform_int_distribution<size_t> offset_dist_{0, 255};
std::uniform_int_distribution<size_t> size_dist_{1, kMaxWriteSize};
std::array<std::byte, 1> terminator_bytes_{std::byte{0x42}};
RpcFrame terminator_{.header = {}, .payload = terminator_bytes_};
};

class SocketSenderThreadCore : public SocketSender, public thread::ThreadCore {
Expand Down Expand Up @@ -182,10 +176,6 @@ TEST(SocketRpcTransportTest, SendAndReceiveFramesOverSocketConnection) {
server.Stop();
client.Stop();

// Unblock socket reads to propagate the stop signal.
EXPECT_EQ(server_sender.Terminate(), OkStatus());
EXPECT_EQ(client_sender.Terminate(), OkStatus());

server_thread.join();
client_thread.join();

Expand Down Expand Up @@ -242,7 +232,6 @@ TEST(SocketRpcTransportTest, ServerReconnects) {
// Stop the client but not the server: we're re-using the same server
// with a new client below.
client.Stop();
EXPECT_EQ(server_sender.Terminate(), OkStatus());
client_thread.join();
}

Expand All @@ -267,13 +256,11 @@ TEST(SocketRpcTransportTest, ServerReconnects) {
std::back_inserter(received));

client.Stop();
EXPECT_EQ(server_sender.Terminate(), OkStatus());
client_thread.join();

// This time stop the server as well.
SocketSender client_sender(client);
server.Stop();
EXPECT_EQ(client_sender.Terminate(), OkStatus());
server_thread.join();
}

Expand Down Expand Up @@ -322,7 +309,6 @@ TEST(SocketRpcTransportTest, ClientReconnects) {
server1_sent.end(),
std::back_inserter(sent_by_server));

EXPECT_EQ(client_sender.Terminate(), OkStatus());
server_thread.join();
server = nullptr;

Expand All @@ -345,11 +331,9 @@ TEST(SocketRpcTransportTest, ClientReconnects) {
server2_sent.end(),
std::back_inserter(sent_by_server));

EXPECT_EQ(client_sender.Terminate(), OkStatus());
server_thread.join();

client.Stop();
EXPECT_EQ(server2_sender.Terminate(), OkStatus());
client_thread.join();
server = nullptr;

Expand Down
1 change: 1 addition & 0 deletions pw_stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pw_cc_library(
":pw_stream",
"//pw_log",
"//pw_string",
"//pw_sync:mutex",
"//pw_sys_io",
],
)
Expand Down
5 changes: 4 additions & 1 deletion pw_stream/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ pw_source_set("pw_stream") {

pw_source_set("socket_stream") {
public_configs = [ ":public_include_path" ]
public_deps = [ ":pw_stream" ]
public_deps = [
":pw_stream",
"$dir_pw_sync:mutex",
]
deps = [
dir_pw_assert,
dir_pw_log,
Expand Down
1 change: 1 addition & 0 deletions pw_stream/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pw_add_library(pw_stream.socket_stream STATIC
public
PUBLIC_DEPS
pw_stream
pw_sync.mutex
SOURCES
socket_stream.cc
PRIVATE_DEPS
Expand Down
Loading

0 comments on commit 4f55cd8

Please sign in to comment.