From 873865d3275ae890fe8a9cbb9781c8030e910359 Mon Sep 17 00:00:00 2001 From: Armando Montanez Date: Tue, 19 Apr 2022 10:36:50 -0700 Subject: [PATCH] pw_transfer: Rate limiting backpressure Uses unix socket send/receive buffer limits to cause rate-limiting backpressure to affect clients. Change-Id: I2eecda3043eca058abe08c5903a8bc89604cf5f3 Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/91940 Reviewed-by: Erik Gilling Pigweed-Auto-Submit: Armando Montanez Commit-Queue: Auto-Submit --- pw_transfer/integration_test/JavaClient.java | 23 ++++++++++++- pw_transfer/integration_test/client.cc | 34 ++++++++++++++++++-- pw_transfer/integration_test/proxy.py | 25 ++++++++++++-- pw_transfer/integration_test/server.cc | 26 +++++++++++++++ 4 files changed, 102 insertions(+), 6 deletions(-) diff --git a/pw_transfer/integration_test/JavaClient.java b/pw_transfer/integration_test/JavaClient.java index 95d9ede9e7..2ba606ceae 100644 --- a/pw_transfer/integration_test/JavaClient.java +++ b/pw_transfer/integration_test/JavaClient.java @@ -28,6 +28,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; @@ -44,6 +45,21 @@ public class JavaClient { private static final long RPC_HDLC_ADDRESS = 'R'; private static final String HOSTNAME = "localhost"; + // This is the maximum size of the socket send buffers. Ideally, this is set + // to the lowest allowed value to minimize buffering between the proxy and + // clients so rate limiting causes the client to block and wait for the + // integration test proxy to drain rather than allowing OS buffers to backlog + // large quantities of data. + // + // Note that the OS may chose to not strictly follow this requested buffer + // size. Still, setting this value to be as small as possible does reduce + // bufer sizes significantly enough to better reflect typical inter-device + // communication. + // + // For this to be effective, servers should also configure their sockets to a + // smaller receive buffer size. + private static final int MAX_SOCKET_SEND_BUFFER_SIZE = 1; + private HdlcRpcChannelOutput channelOutput; private Client rpcClient; private HdlcParseThread parseThread; @@ -176,7 +192,12 @@ public static void main(String[] args) { logger.atSevere().log("Failed to connect to %s:%d", HOSTNAME, port); System.exit(1); } - + try { + socket.setSendBufferSize(MAX_SOCKET_SEND_BUFFER_SIZE); + } catch (SocketException e) { + logger.atSevere().log("Invalid socket buffer size %d", MAX_SOCKET_SEND_BUFFER_SIZE); + System.exit(1); + } InputStream reader = null; OutputStream writer = null; diff --git a/pw_transfer/integration_test/client.cc b/pw_transfer/integration_test/client.cc index 139fac0e11..d697b7d4ba 100644 --- a/pw_transfer/integration_test/client.cc +++ b/pw_transfer/integration_test/client.cc @@ -21,10 +21,13 @@ // WORK IN PROGRESS, SEE b/228516801 #include "pw_transfer/client.h" +#include + #include #include #include "google/protobuf/text_format.h" +#include "pw_assert/check.h" #include "pw_log/log.h" #include "pw_rpc/integration_testing.h" #include "pw_status/status.h" @@ -36,9 +39,23 @@ #include "pw_transfer/integration_test/config.pb.h" #include "pw_transfer/transfer_thread.h" -namespace pw::transfer { +namespace pw::transfer::integration_test { namespace { +// This is the maximum size of the socket send buffers. Ideally, this is set +// to the lowest allowed value to minimize buffering between the proxy and +// clients so rate limiting causes the client to block and wait for the +// integration test proxy to drain rather than allowing OS buffers to backlog +// large quantities of data. +// +// Note that the OS may chose to not strictly follow this requested buffer size. +// Still, setting this value to be as small as possible does reduce bufer sizes +// significantly enough to better reflect typical inter-device communication. +// +// For this to be effective, servers should also configure their sockets to a +// smaller receive buffer size. +constexpr int kMaxSocketSendBufferSize = 1; + thread::Options& TransferThreadOptions() { static thread::stl::Options options; return options; @@ -90,7 +107,7 @@ pw::Status SendData(const pw::transfer::ClientConfig& config) { } } // namespace -} // namespace pw::transfer +} // namespace pw::transfer::integration_test int main(int argc, char* argv[]) { if (argc < 2) { @@ -121,7 +138,18 @@ int main(int argc, char* argv[]) { return 1; } - if (!pw::transfer::SendData(config).ok()) { + int retval = setsockopt( + pw::rpc::integration_test::GetClientSocketFd(), + SOL_SOCKET, + SO_SNDBUF, + &pw::transfer::integration_test::kMaxSocketSendBufferSize, + sizeof(pw::transfer::integration_test::kMaxSocketSendBufferSize)); + PW_CHECK_INT_EQ(retval, + 0, + "Failed to configure socket send buffer size with errno=%d", + errno); + + if (!pw::transfer::integration_test::SendData(config).ok()) { PW_LOG_INFO("Failed to transfer!"); return 1; } diff --git a/pw_transfer/integration_test/proxy.py b/pw_transfer/integration_test/proxy.py index cae0735b8f..42b6f44f31 100644 --- a/pw_transfer/integration_test/proxy.py +++ b/pw_transfer/integration_test/proxy.py @@ -24,6 +24,7 @@ import asyncio import logging import random +import socket import sys import time from typing import (Any, Awaitable, Callable, List, Optional) @@ -35,6 +36,20 @@ _LOG = logging.getLogger('pw_transfer_intergration_test_proxy') +# This is the maximum size of the socket receive buffers. Ideally, this is set +# to the lowest allowed value to minimize buffering between the proxy and +# clients so rate limiting causes the client to block and wait for the +# integration test proxy to drain rather than allowing OS buffers to backlog +# large quantities of data. +# +# Note that the OS may chose to not strictly follow this requested buffer size. +# Still, setting this value to be relatively small does reduce bufer sizes +# significantly enough to better reflect typical inter-device communication. +# +# For this to be effective, clients should also configure their sockets to a +# smaller send buffer size. +_RECEIVE_BUFFER_SIZE = 2048 + class Filter(abc.ABC): """An abstract interface for manipulating a stream of data. @@ -301,9 +316,15 @@ async def _main(server_port: int, client_port: int) -> None: config = text_format.Parse(text_config, config_pb2.ProxyConfig()) # Instantiate the TCP server. + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, + _RECEIVE_BUFFER_SIZE) + server_socket.bind(('localhost', client_port)) server = await asyncio.start_server( - lambda reader, writer: _handle_connection( - server_port, config, reader, writer), 'localhost', client_port) + lambda reader, writer: _handle_connection(server_port, config, reader, + writer), + limit=_RECEIVE_BUFFER_SIZE, + sock=server_socket) addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets) _LOG.info(f'Listening for client connection on {addrs}') diff --git a/pw_transfer/integration_test/server.cc b/pw_transfer/integration_test/server.cc index fe01b518ff..596081ebbb 100644 --- a/pw_transfer/integration_test/server.cc +++ b/pw_transfer/integration_test/server.cc @@ -21,6 +21,8 @@ // // integration_test_server 3300 <<< "resource_id: 12 file: '/tmp/gotbytes'" +#include + #include #include #include @@ -47,6 +49,20 @@ namespace { using stream::MemoryReader; using stream::MemoryWriter; +// This is the maximum size of the socket send buffers. Ideally, this is set +// to the lowest allowed value to minimize buffering between the proxy and +// clients so rate limiting causes the client to block and wait for the +// integration test proxy to drain rather than allowing OS buffers to backlog +// large quantities of data. +// +// Note that the OS may chose to not strictly follow this requested buffer size. +// Still, setting this value to be as small as possible does reduce bufer sizes +// significantly enough to better reflect typical inter-device communication. +// +// For this to be effective, servers should also configure their sockets to a +// smaller receive buffer size. +constexpr int kMaxSocketSendBufferSize = 1; + // TODO(tpudlik): This is copy-pasted from test_rpc_server.cc, break it out into // a shared library. class FileTransferHandler final : public ReadWriteHandler { @@ -106,6 +122,16 @@ void RunServer(int socket_port, ServerConfig config) { thread::DetachedThread(thread::stl::Options(), transfer_thread); + int retval = setsockopt(rpc::system_server::GetServerSocketFd(), + SOL_SOCKET, + SO_SNDBUF, + &kMaxSocketSendBufferSize, + sizeof(kMaxSocketSendBufferSize)); + PW_CHECK_INT_EQ(retval, + 0, + "Failed to configure socket send buffer size with errno=%d", + errno); + // It's fine to allocate this on the stack since this thread doesn't return // until this process is killed. FileTransferHandler transfer_handler(