From e85884825ea35e9b3b6f86ab0d12d1a1c5e3518d Mon Sep 17 00:00:00 2001 From: Austin Foxley Date: Fri, 6 Dec 2024 20:09:17 +0000 Subject: [PATCH] pw_bluetooth_proxy: Add write flow control mechanism MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Notify on L2capWriteChannel queue space available if a previous write failed because of no space. Expose this for RfcommChannel and optionally for L2capCoc and BasicL2capChannel's Bug: 380299794 Change-Id: I413fade05375dcbd4ef7ca1865ac809288f2cefd Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/251435 Pigweed-Auto-Submit: Austin Foxley Reviewed-by: David Rees Docs-Not-Needed: Austin Foxley Lint: Lint 🤖 Commit-Queue: Auto-Submit --- pw_bluetooth_proxy/basic_l2cap_channel.cc | 13 +- pw_bluetooth_proxy/gatt_notify_channel.cc | 3 +- pw_bluetooth_proxy/l2cap_channel.cc | 29 +- pw_bluetooth_proxy/l2cap_coc.cc | 16 +- pw_bluetooth_proxy/l2cap_signaling_channel.cc | 3 +- pw_bluetooth_proxy/proxy_host.cc | 21 +- pw_bluetooth_proxy/proxy_host_test.cc | 564 +++++++++--------- .../pw_bluetooth_proxy/basic_l2cap_channel.h | 21 +- .../internal/l2cap_channel.h | 22 +- .../internal/l2cap_coc_internal.h | 8 +- .../public/pw_bluetooth_proxy/l2cap_coc.h | 25 +- .../public/pw_bluetooth_proxy/proxy_host.h | 24 +- .../pw_bluetooth_proxy/rfcomm_channel.h | 16 +- pw_bluetooth_proxy/rfcomm_channel.cc | 24 +- 14 files changed, 423 insertions(+), 366 deletions(-) diff --git a/pw_bluetooth_proxy/basic_l2cap_channel.cc b/pw_bluetooth_proxy/basic_l2cap_channel.cc index a0611dd5a..a5c00be50 100644 --- a/pw_bluetooth_proxy/basic_l2cap_channel.cc +++ b/pw_bluetooth_proxy/basic_l2cap_channel.cc @@ -27,8 +27,8 @@ pw::Result BasicL2capChannel::Create( uint16_t connection_handle, uint16_t local_cid, uint16_t remote_cid, - pw::Function payload)>&& - payload_from_controller_fn) { + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn) { if (!AreValidParameters(/*connection_handle=*/connection_handle, /*local_cid=*/local_cid, /*remote_cid=*/remote_cid)) { @@ -40,7 +40,8 @@ pw::Result BasicL2capChannel::Create( /*connection_handle=*/connection_handle, /*local_cid=*/local_cid, /*remote_cid=*/remote_cid, - /*payload_from_controller_fn=*/std::move(payload_from_controller_fn)); + /*payload_from_controller_fn=*/std::move(payload_from_controller_fn), + /*queue_space_available_fn=*/std::move(queue_space_available_fn)); } pw::Status BasicL2capChannel::Write(pw::span payload) { @@ -74,14 +75,16 @@ BasicL2capChannel::BasicL2capChannel( uint16_t connection_handle, uint16_t local_cid, uint16_t remote_cid, - pw::Function payload)>&& payload_from_controller_fn) + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn) : L2capChannel(/*l2cap_channel_manager=*/l2cap_channel_manager, /*connection_handle=*/connection_handle, /*transport=*/AclTransportType::kLe, /*local_cid=*/local_cid, /*remote_cid=*/remote_cid, /*payload_from_controller_fn=*/ - std::move(payload_from_controller_fn)) {} + std::move(payload_from_controller_fn), + std::move(queue_space_available_fn)) {} bool BasicL2capChannel::HandlePduFromController(pw::span bframe) { Result bframe_view = diff --git a/pw_bluetooth_proxy/gatt_notify_channel.cc b/pw_bluetooth_proxy/gatt_notify_channel.cc index 865d2507a..944b3f10f 100644 --- a/pw_bluetooth_proxy/gatt_notify_channel.cc +++ b/pw_bluetooth_proxy/gatt_notify_channel.cc @@ -91,7 +91,8 @@ GattNotifyChannel::GattNotifyChannel(L2capChannelManager& l2cap_channel_manager, /*transport=*/AclTransportType::kLe, /*local_cid=*/kAttributeProtocolCID, /*remote_cid=*/kAttributeProtocolCID, - /*payload_from_controller_fn=*/nullptr), + /*payload_from_controller_fn=*/nullptr, + /*queue_space_available_fn=*/nullptr), attribute_handle_(attribute_handle) {} } // namespace pw::bluetooth::proxy diff --git a/pw_bluetooth_proxy/l2cap_channel.cc b/pw_bluetooth_proxy/l2cap_channel.cc index 5d82b968e..ca8207062 100644 --- a/pw_bluetooth_proxy/l2cap_channel.cc +++ b/pw_bluetooth_proxy/l2cap_channel.cc @@ -33,6 +33,7 @@ L2capChannel::L2capChannel(L2capChannel&& other) transport_(other.transport()), local_cid_(other.local_cid()), remote_cid_(other.remote_cid()), + queue_space_available_fn_(std::move(other.queue_space_available_fn_)), payload_from_controller_fn_( std::move(other.payload_from_controller_fn_)) { // All L2capChannels share a static mutex, so only one lock needs to be @@ -41,6 +42,7 @@ L2capChannel::L2capChannel(L2capChannel&& other) // static, elide this operator or acquire a lock on both channels' mutexes. std::lock_guard lock(global_send_queue_mutex_); send_queue_ = std::move(other.send_queue_); + notify_on_dequeue_ = other.notify_on_dequeue_; l2cap_channel_manager_.ReleaseChannel(other); l2cap_channel_manager_.RegisterChannel(*this); } @@ -55,12 +57,14 @@ L2capChannel& L2capChannel::operator=(L2capChannel&& other) { local_cid_ = other.local_cid(); remote_cid_ = other.remote_cid(); payload_from_controller_fn_ = std::move(other.payload_from_controller_fn_); + queue_space_available_fn_ = std::move(other.queue_space_available_fn_); // All L2capWriteChannels share a static mutex, so only one lock needs to be // acquired here. // TODO: https://pwbug.dev/369849508 - Once mutex is no longer static, // elide this operator or acquire a lock on both channels' mutexes. std::lock_guard lock(global_send_queue_mutex_); send_queue_ = std::move(other.send_queue_); + notify_on_dequeue_ = other.notify_on_dequeue_; l2cap_channel_manager_.ReleaseChannel(other); l2cap_channel_manager_.RegisterChannel(*this); return *this; @@ -77,6 +81,7 @@ Status L2capChannel::QueuePacket(H4PacketWithH4&& packet) { std::lock_guard lock(global_send_queue_mutex_); if (send_queue_.full()) { status = Status::Unavailable(); + notify_on_dequeue_ = true; } else { send_queue_.push(std::move(packet)); status = OkStatus(); @@ -87,12 +92,22 @@ Status L2capChannel::QueuePacket(H4PacketWithH4&& packet) { } std::optional L2capChannel::DequeuePacket() { - std::lock_guard lock(global_send_queue_mutex_); - if (send_queue_.empty()) { - return std::nullopt; + std::optional packet; + bool should_notify = false; + { + std::lock_guard lock(global_send_queue_mutex_); + if (!send_queue_.empty()) { + packet.emplace(std::move(send_queue_.front())); + send_queue_.pop(); + should_notify = notify_on_dequeue_; + notify_on_dequeue_ = false; + } } - H4PacketWithH4 packet = std::move(send_queue_.front()); - send_queue_.pop(); + + if (queue_space_available_fn_ && should_notify) { + queue_space_available_fn_(); + } + return packet; } @@ -108,12 +123,14 @@ L2capChannel::L2capChannel( AclTransportType transport, uint16_t local_cid, uint16_t remote_cid, - pw::Function payload)>&& payload_from_controller_fn) + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn) : l2cap_channel_manager_(l2cap_channel_manager), connection_handle_(connection_handle), transport_(transport), local_cid_(local_cid), remote_cid_(remote_cid), + queue_space_available_fn_(std::move(queue_space_available_fn)), payload_from_controller_fn_(std::move(payload_from_controller_fn)) { l2cap_channel_manager_.RegisterChannel(*this); } diff --git a/pw_bluetooth_proxy/l2cap_coc.cc b/pw_bluetooth_proxy/l2cap_coc.cc index 178b0127b..1ccbcaad0 100644 --- a/pw_bluetooth_proxy/l2cap_coc.cc +++ b/pw_bluetooth_proxy/l2cap_coc.cc @@ -102,8 +102,9 @@ pw::Result L2capCoc::Create( uint16_t connection_handle, CocConfig rx_config, CocConfig tx_config, - pw::Function payload)>&& payload_from_controller_fn, - pw::Function&& event_fn) { + Function payload)>&& payload_from_controller_fn, + Function&& event_fn, + Function&& queue_space_available_fn) { if (!AreValidParameters(/*connection_handle=*/connection_handle, /*local_cid=*/rx_config.cid, /*remote_cid=*/tx_config.cid)) { @@ -125,7 +126,8 @@ pw::Result L2capCoc::Create( /*rx_config=*/rx_config, /*tx_config=*/tx_config, /*payload_from_controller_fn=*/std::move(payload_from_controller_fn), - /*event_fn=*/std::move(event_fn)); + /*event_fn=*/std::move(event_fn), + /*queue_space_available_fn=*/std::move(queue_space_available_fn)); } bool L2capCoc::HandlePduFromController(pw::span kframe) { @@ -232,14 +234,16 @@ L2capCoc::L2capCoc( uint16_t connection_handle, CocConfig rx_config, CocConfig tx_config, - pw::Function payload)>&& payload_from_controller_fn, - pw::Function&& event_fn) + Function payload)>&& payload_from_controller_fn, + Function&& event_fn, + Function&& queue_space_available_fn) : L2capChannel(l2cap_channel_manager, connection_handle, AclTransportType::kLe, rx_config.cid, tx_config.cid, - std::move(payload_from_controller_fn)), + std::move(payload_from_controller_fn), + std::move(queue_space_available_fn)), state_(CocState::kRunning), rx_mtu_(rx_config.mtu), rx_mps_(rx_config.mps), diff --git a/pw_bluetooth_proxy/l2cap_signaling_channel.cc b/pw_bluetooth_proxy/l2cap_signaling_channel.cc index 58340c33b..45765aec6 100644 --- a/pw_bluetooth_proxy/l2cap_signaling_channel.cc +++ b/pw_bluetooth_proxy/l2cap_signaling_channel.cc @@ -36,7 +36,8 @@ L2capSignalingChannel::L2capSignalingChannel( /*connection_handle=*/connection_handle, /*local_cid=*/fixed_cid, /*remote_cid=*/fixed_cid, - /*payload_from_controller_fn=*/nullptr), + /*payload_from_controller_fn=*/nullptr, + /*queue_space_available_fn=*/nullptr), l2cap_channel_manager_(l2cap_channel_manager) {} L2capSignalingChannel& L2capSignalingChannel::operator=( diff --git a/pw_bluetooth_proxy/proxy_host.cc b/pw_bluetooth_proxy/proxy_host.cc index 847836fb9..f6670a46e 100644 --- a/pw_bluetooth_proxy/proxy_host.cc +++ b/pw_bluetooth_proxy/proxy_host.cc @@ -397,8 +397,9 @@ pw::Result ProxyHost::AcquireL2capCoc( uint16_t connection_handle, L2capCoc::CocConfig rx_config, L2capCoc::CocConfig tx_config, - pw::Function payload)>&& receive_fn, - pw::Function&& event_fn, + Function payload)>&& receive_fn, + Function&& event_fn, + Function&& queue_space_available_fn, uint16_t rx_additional_credits) { Status status = acl_data_channel_.CreateAclConnection(connection_handle, AclTransportType::kLe); @@ -423,7 +424,8 @@ pw::Result ProxyHost::AcquireL2capCoc( rx_config, tx_config, std::move(receive_fn), - std::move(event_fn)); + std::move(event_fn), + std::move(queue_space_available_fn)); } pw::Result ProxyHost::AcquireBasicL2capChannel( @@ -431,8 +433,8 @@ pw::Result ProxyHost::AcquireBasicL2capChannel( uint16_t local_cid, uint16_t remote_cid, AclTransportType transport, - pw::Function payload)>&& - payload_from_controller_fn) { + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn) { Status status = acl_data_channel_.CreateAclConnection(connection_handle, transport); if (status.IsResourceExhausted()) { @@ -444,7 +446,8 @@ pw::Result ProxyHost::AcquireBasicL2capChannel( /*connection_handle=*/connection_handle, /*local_cid=*/local_cid, /*remote_cid=*/remote_cid, - /*payload_from_controller_fn=*/std::move(payload_from_controller_fn)); + /*payload_from_controller_fn=*/std::move(payload_from_controller_fn), + /*queue_space_available_fn=*/std::move(queue_space_available_fn)); } pw::Status ProxyHost::SendGattNotify(uint16_t connection_handle, @@ -470,7 +473,8 @@ pw::Result ProxyHost::AcquireRfcommChannel( RfcommChannel::Config rx_config, RfcommChannel::Config tx_config, uint8_t channel_number, - pw::Function payload)>&& receive_fn) { + Function payload)>&& receive_fn, + Function&& queue_space_available_fn) { Status status = acl_data_channel_.CreateAclConnection( connection_handle, AclTransportType::kBrEdr); if (status != OkStatus() && status != Status::AlreadyExists()) { @@ -481,7 +485,8 @@ pw::Result ProxyHost::AcquireRfcommChannel( rx_config, tx_config, channel_number, - std::move(receive_fn)); + std::move(receive_fn), + std::move(queue_space_available_fn)); } bool ProxyHost::HasSendLeAclCapability() const { diff --git a/pw_bluetooth_proxy/proxy_host_test.cc b/pw_bluetooth_proxy/proxy_host_test.cc index 35b13c4b9..761aa42e3 100644 --- a/pw_bluetooth_proxy/proxy_host_test.cc +++ b/pw_bluetooth_proxy/proxy_host_test.cc @@ -47,6 +47,83 @@ static constexpr size_t kMaxProxyActiveConnections = 10; // ########## Util functions +struct AclFrameWithStorage { + std::vector storage; + emboss::AclDataFrameWriter writer; + + static constexpr size_t kH4HeaderSize = 1; + pw::span h4_span() { return storage; } + pw::span hci_span() { + return pw::span(storage).subspan(kH4HeaderSize); + } +}; + +// Allocate storage and populate an ACL packet header with the given length. +Result SetupAcl(uint16_t handle, uint16_t l2cap_length) { + AclFrameWithStorage frame; + + frame.storage.resize(l2cap_length + emboss::AclDataFrame::MinSizeInBytes() + + AclFrameWithStorage::kH4HeaderSize); + std::fill(frame.storage.begin(), frame.storage.end(), 0); + PW_TRY_ASSIGN(frame.writer, + MakeEmbossWriter(frame.hci_span())); + frame.writer.header().handle().Write(handle); + frame.writer.data_total_length().Write(l2cap_length); + EXPECT_EQ(l2cap_length, + frame.writer.payload().BackingStorage().SizeInBytes()); + return frame; +} + +struct CFrameWithStorage { + AclFrameWithStorage acl; + emboss::CFrameWriter writer; +}; + +Result SetupCFrame(uint16_t handle, + uint16_t channel_id, + uint16_t cframe_len) { + CFrameWithStorage frame; + PW_TRY_ASSIGN( + frame.acl, + SetupAcl(handle, + cframe_len + emboss::BasicL2capHeader::IntrinsicSizeInBytes())); + + PW_TRY_ASSIGN(frame.writer, + MakeEmbossWriter( + frame.acl.writer.payload().BackingStorage().data(), + frame.acl.writer.payload().SizeInBytes())); + frame.writer.pdu_length().Write(cframe_len); + frame.writer.channel_id().Write(channel_id); + EXPECT_TRUE(frame.writer.Ok()); + EXPECT_EQ(frame.writer.payload().SizeInBytes(), cframe_len); + return frame; +} + +struct BFrameWithStorage { + AclFrameWithStorage acl; + emboss::BFrameWriter writer; +}; + +Result SetupBFrame(uint16_t handle, + uint16_t channel_id, + uint16_t bframe_len) { + BFrameWithStorage frame; + PW_TRY_ASSIGN( + frame.acl, + SetupAcl(handle, + bframe_len + emboss::BasicL2capHeader::IntrinsicSizeInBytes())); + + PW_TRY_ASSIGN(frame.writer, + MakeEmbossWriter( + frame.acl.writer.payload().BackingStorage().data(), + frame.acl.writer.payload().SizeInBytes())); + frame.writer.pdu_length().Write(bframe_len); + frame.writer.channel_id().Write(channel_id); + EXPECT_TRUE(frame.writer.Ok()); + EXPECT_EQ(frame.writer.payload().SizeInBytes(), bframe_len); + return frame; +} + // Populate passed H4 command buffer and return Emboss view on it. template Result CreateAndPopulateToControllerView(H4PacketWithH4& h4_packet, @@ -256,6 +333,7 @@ L2capCoc BuildCoc(ProxyHost& proxy, CocParameters params) { .credits = params.tx_credits}, std::move(params.receive_fn), std::move(params.event_fn), + /*queue_space_available_fn=*/nullptr, params.rx_additional_credits); return std::move(channel.value()); } @@ -269,13 +347,18 @@ struct RfcommParameters { uint8_t rfcomm_channel = 3; }; -RfcommChannel BuildRfcomm(ProxyHost& proxy, RfcommParameters params) { +RfcommChannel BuildRfcomm( + ProxyHost& proxy, + RfcommParameters params = {}, + pw::Function payload)>&& receive_fn = nullptr, + pw::Function&& queue_space_available_fn = nullptr) { pw::Result channel = proxy.AcquireRfcommChannel(params.handle, params.rx_config, params.tx_config, params.rfcomm_channel, - nullptr); + std::move(receive_fn), + std::move(queue_space_available_fn)); PW_TEST_EXPECT_OK(channel); return std::move((channel.value())); } @@ -4355,9 +4438,61 @@ TEST(AcluSignalingChannelTest, InvalidPacketForwarded) { // ########## RfcommWriteTest -constexpr uint8_t kBFrameOverAclMinSize = - emboss::AclDataFrameHeader::IntrinsicSizeInBytes() + - emboss::BFrame::MinSizeInBytes(); +// Construct and send an RFCOMM frame from controller->host. +Status SendRfcommFromController(ProxyHost& proxy, + RfcommParameters params, + uint8_t fcs, + std::optional credits, + pw::span payload) { + constexpr size_t kMaxShortLength = 0x7f; + const size_t credits_field_size = credits.has_value() ? 1 : 0; + const bool uses_extended_length = payload.size() > kMaxShortLength; + const size_t length_extended_size = uses_extended_length ? 1 : 0; + const size_t frame_size = emboss::RfcommFrame::MinSizeInBytes() + + length_extended_size + credits_field_size + + payload.size(); + + PW_TRY_ASSIGN(BFrameWithStorage bframe, + SetupBFrame(params.handle, params.rx_config.cid, frame_size)); + + auto rfcomm = emboss::MakeRfcommFrameView( + bframe.writer.payload().BackingStorage().data(), + bframe.writer.payload().SizeInBytes()); + rfcomm.extended_address().Write(true); + rfcomm.command_response_direction().Write( + emboss::RfcommCommandResponseAndDirection::COMMAND_FROM_INITIATOR); + rfcomm.channel().Write(params.rfcomm_channel); + + if (!uses_extended_length) { + rfcomm.length_extended_flag().Write(emboss::RfcommLengthExtended::NORMAL); + rfcomm.length().Write(payload.size()); + } else { + rfcomm.length_extended_flag().Write(emboss::RfcommLengthExtended::EXTENDED); + rfcomm.length_extended().Write(payload.size()); + } + + if (credits.has_value()) { + rfcomm.control().Write( + emboss::RfcommFrameType:: + UNNUMBERED_INFORMATION_WITH_HEADER_CHECK_AND_POLL_FINAL); + rfcomm.credits().Write(*credits); + } else { + rfcomm.control().Write( + emboss::RfcommFrameType::UNNUMBERED_INFORMATION_WITH_HEADER_CHECK); + } + + EXPECT_EQ(rfcomm.information().SizeInBytes(), payload.size()); + std::memcpy(rfcomm.information().BackingStorage().data(), + payload.data(), + payload.size()); + rfcomm.fcs().Write(fcs); + auto hci_span = bframe.acl.hci_span(); + H4PacketWithHci packet{emboss::H4PacketType::ACL_DATA, hci_span}; + + proxy.HandleH4HciFromController(std::move(packet)); + + return OkStatus(); +} TEST(RfcommWriteTest, BasicWrite) { struct { @@ -4450,24 +4585,13 @@ TEST(RfcommWriteTest, BasicWrite) { // Allow proxy to reserve 1 credit. PW_TEST_EXPECT_OK(SendReadBufferResponseFromController(proxy, 1)); - constexpr uint8_t kMaxCredits = 10; - constexpr uint16_t kMaxFrameSize = 900; - constexpr uint8_t kRfcommChannel = 0x03; - - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto channel, - proxy.AcquireRfcommChannel( - capture.handle, - /*rx_config=*/ - RfcommChannel::Config{.cid = capture.channel_id, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - /*tx_config*/ - RfcommChannel::Config{.cid = capture.channel_id, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - kRfcommChannel, - nullptr)); + RfcommParameters params = {.handle = capture.handle, + .tx_config = { + .cid = capture.channel_id, + .max_information_length = 900, + .credits = 10, + }}; + RfcommChannel channel = BuildRfcomm(proxy, params); EXPECT_EQ(channel.Write(capture.payload), PW_STATUS_OK); EXPECT_EQ(capture.sends_called, 1); } @@ -4573,26 +4697,81 @@ TEST(RfcommWriteTest, ExtendedWrite) { // Allow proxy to reserve 1 credit. PW_TEST_EXPECT_OK(SendReadBufferResponseFromController(proxy, 1)); - constexpr uint8_t kMaxCredits = 10; - constexpr uint16_t kMaxFrameSize = 900; - constexpr uint8_t kRfcommChannel = 0x03; + RfcommParameters params = {.handle = capture.handle, + .tx_config = { + .cid = capture.channel_id, + .max_information_length = 900, + .credits = 10, + }}; + RfcommChannel channel = BuildRfcomm(proxy, params); + EXPECT_EQ(channel.Write(capture.payload), PW_STATUS_OK); + EXPECT_EQ(capture.sends_called, 1); +} + +TEST(RfcommWriteTest, WriteFlowControl) { + struct { + int sends_called = 0; + int queue_unblocked = 0; + // RFCOMM information payload + std::array payload = {0xAB, 0xCD, 0xEF}; + } capture; - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto channel, - proxy.AcquireRfcommChannel( - capture.handle, - /*rx_config=*/ - RfcommChannel::Config{.cid = capture.channel_id, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - /*tx_config*/ - RfcommChannel::Config{.cid = capture.channel_id, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - kRfcommChannel, - nullptr)); + pw::Function&& send_to_host_fn( + [](H4PacketWithHci&&) {}); + pw::Function&& send_to_controller_fn( + [&capture](H4PacketWithH4&&) { ++capture.sends_called; }); + pw::Function queue_space_available_fn( + [&capture]() { ++capture.queue_unblocked; }); + + ProxyHost proxy = ProxyHost(std::move(send_to_host_fn), + std::move(send_to_controller_fn), + /*le_acl_credits_to_reserve=*/0, + /*br_edr_acl_credits_to_reserve=*/10); + // Start with plenty of ACL credits to test RFCOMM logic. + PW_TEST_EXPECT_OK(SendReadBufferResponseFromController(proxy, 10)); + + RfcommParameters params = {.tx_config = { + .cid = 123, + .max_information_length = 900, + .credits = 0, + }}; + RfcommChannel channel = BuildRfcomm( + proxy, + params, + /*receive_fn=*/nullptr, + /*queue_space_available_fn=*/std::move(queue_space_available_fn)); + + // Writes while queue has space will return Ok. No RFCOMM credits yet though + // so no sends complete. EXPECT_EQ(channel.Write(capture.payload), PW_STATUS_OK); + EXPECT_EQ(capture.sends_called, 0); + EXPECT_EQ(capture.queue_unblocked, 0); + + // Provide a credit + constexpr uint8_t kExpectedFcs = 0xE6; + PW_TEST_EXPECT_OK(SendRfcommFromController( + proxy, params, kExpectedFcs, /*credits=*/1, /*payload=*/{})); + EXPECT_EQ(capture.queue_unblocked, 0); EXPECT_EQ(capture.sends_called, 1); + + // Now fill up queue + uint16_t queued = 0; + while (true) { + if (const auto status = channel.Write(capture.payload); + status == Status::Unavailable()) { + break; + } + ++queued; + } + + // Unblock queue with ACL and RFCOMM credits + PW_TEST_EXPECT_OK(SendNumberOfCompletedPackets( + proxy, FlatMap({{{params.handle, queued}}}))); + PW_TEST_EXPECT_OK(SendRfcommFromController( + proxy, params, kExpectedFcs, /*credits=*/queued, /*payload=*/{})); + + EXPECT_EQ(capture.sends_called, queued + 1); + EXPECT_EQ(capture.queue_unblocked, 1); } // ########## RfcommReadTest @@ -4611,71 +4790,24 @@ TEST(RfcommReadTest, BasicRead) { } capture; constexpr uint8_t kExpectedFcs = 0xFA; - constexpr uint16_t kHandle = 123; - constexpr uint16_t kCid = 234; - constexpr uint8_t kMaxCredits = 10; - constexpr uint16_t kMaxFrameSize = 900; - constexpr uint8_t kRfcommChannel = 0x03; - const size_t frame_size = emboss::RfcommFrame::MinSizeInBytes() + - /*payload*/ capture.expected_payload.size(); - const size_t acl_data_length = - emboss::BasicL2capHeader::IntrinsicSizeInBytes() + frame_size; - - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto channel, - proxy.AcquireRfcommChannel( - kHandle, - /*rx_config=*/ - RfcommChannel::Config{.cid = kCid, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - /*tx_config*/ - RfcommChannel::Config{.cid = kCid, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - kRfcommChannel, - [&capture](pw::span payload) { - ++capture.rx_called; - EXPECT_TRUE(std::equal(payload.begin(), - payload.end(), - capture.expected_payload.begin(), - capture.expected_payload.end())); - })); - - std::array hci_arr{}; - H4PacketWithHci h4_packet{emboss::H4PacketType::ACL_DATA, hci_arr}; - - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto acl, MakeEmbossWriter(hci_arr)); - acl.header().handle().Write(kHandle); - acl.data_total_length().Write(acl_data_length); - - auto bframe = emboss::MakeBFrameView(acl.payload().BackingStorage().data(), - acl.data_total_length().Read()); - bframe.pdu_length().Write(frame_size); - bframe.channel_id().Write(kCid); - - auto rfcomm = emboss::MakeRfcommFrameView( - bframe.payload().BackingStorage().data(), bframe.payload().SizeInBytes()); - rfcomm.extended_address().Write(true); - rfcomm.command_response_direction().Write( - emboss::RfcommCommandResponseAndDirection::COMMAND_FROM_INITIATOR); - rfcomm.channel().Write(kRfcommChannel); - - rfcomm.control().Write( - emboss::RfcommFrameType::UNNUMBERED_INFORMATION_WITH_HEADER_CHECK); - - rfcomm.length_extended_flag().Write(emboss::RfcommLengthExtended::NORMAL); - rfcomm.length().Write(capture.expected_payload.size()); - - std::memcpy(rfcomm.information().BackingStorage().data(), - capture.expected_payload.data(), - capture.expected_payload.size()); - rfcomm.fcs().Write(kExpectedFcs); - - // Send ACL data packet destined for the channel we registered. - proxy.HandleH4HciFromController(std::move(h4_packet)); + RfcommParameters params = {}; + RfcommChannel channel = + BuildRfcomm(proxy, + params, + /*receive_fn=*/[&capture](pw::span payload) { + ++capture.rx_called; + EXPECT_TRUE(std::equal(payload.begin(), + payload.end(), + capture.expected_payload.begin(), + capture.expected_payload.end())); + }); + + PW_TEST_EXPECT_OK(SendRfcommFromController(proxy, + params, + kExpectedFcs, + /*credits=*/std::nullopt, + capture.expected_payload)); EXPECT_EQ(capture.rx_called, 1); } @@ -4694,72 +4826,23 @@ TEST(RfcommReadTest, ExtendedRead) { } capture; constexpr uint8_t kExpectedFcs = 0xFA; - constexpr uint16_t kHandle = 123; - constexpr uint16_t kCid = 234; - constexpr uint8_t kMaxCredits = 10; - constexpr uint16_t kMaxFrameSize = 900; - constexpr uint8_t kRfcommChannel = 0x03; - const size_t frame_size = emboss::RfcommFrame::MinSizeInBytes() + - /*length_extended*/ 1 + - /*payload*/ capture.expected_payload.size(); - const size_t acl_data_length = - emboss::BasicL2capHeader::IntrinsicSizeInBytes() + frame_size; - - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto channel, - proxy.AcquireRfcommChannel( - kHandle, - /*rx_config=*/ - RfcommChannel::Config{.cid = kCid, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - /*tx_config*/ - RfcommChannel::Config{.cid = kCid, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - kRfcommChannel, - [&capture](pw::span payload) { - ++capture.rx_called; - EXPECT_TRUE(std::equal(payload.begin(), - payload.end(), - capture.expected_payload.begin(), - capture.expected_payload.end())); - })); - - std::array hci_arr{}; - H4PacketWithHci h4_packet{emboss::H4PacketType::ACL_DATA, hci_arr}; - - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto acl, MakeEmbossWriter(hci_arr)); - acl.header().handle().Write(kHandle); - acl.data_total_length().Write(acl_data_length); - - auto bframe = emboss::MakeBFrameView(acl.payload().BackingStorage().data(), - acl.data_total_length().Read()); - bframe.pdu_length().Write(frame_size); - bframe.channel_id().Write(kCid); - - auto rfcomm = emboss::MakeRfcommFrameView( - bframe.payload().BackingStorage().data(), bframe.payload().SizeInBytes()); - rfcomm.extended_address().Write(true); - rfcomm.command_response_direction().Write( - emboss::RfcommCommandResponseAndDirection::COMMAND_FROM_INITIATOR); - rfcomm.channel().Write(kRfcommChannel); - - rfcomm.control().Write( - emboss::RfcommFrameType::UNNUMBERED_INFORMATION_WITH_HEADER_CHECK); - - rfcomm.length_extended_flag().Write(emboss::RfcommLengthExtended::EXTENDED); - rfcomm.length_extended().Write(capture.expected_payload.size()); - - std::memcpy(rfcomm.information().BackingStorage().data(), - capture.expected_payload.data(), - capture.expected_payload.size()); - rfcomm.fcs().Write(kExpectedFcs); - - // Send ACL data packet destined for the channel we registered. - proxy.HandleH4HciFromController(std::move(h4_packet)); + RfcommParameters params = {}; + RfcommChannel channel = + BuildRfcomm(proxy, + params, /*receive_fn=*/ + [&capture](pw::span payload) { + ++capture.rx_called; + EXPECT_TRUE(std::equal(payload.begin(), + payload.end(), + capture.expected_payload.begin(), + capture.expected_payload.end())); + }); + PW_TEST_EXPECT_OK(SendRfcommFromController(proxy, + params, + kExpectedFcs, + /*credits=*/std::nullopt, + capture.expected_payload)); EXPECT_EQ(capture.rx_called, 1); } @@ -4781,65 +4864,20 @@ TEST(RfcommReadTest, InvalidReads) { constexpr uint8_t kExpectedFcs = 0xFA; constexpr uint8_t kInvalidFcs = 0xFF; - constexpr uint16_t kHandle = 123; - constexpr uint16_t kCid = 234; - constexpr uint8_t kMaxCredits = 10; - constexpr uint16_t kMaxFrameSize = 900; - constexpr uint8_t kRfcommChannel = 0x03; - const size_t frame_size = emboss::RfcommFrame::MinSizeInBytes() + - /*payload*/ 0; - const size_t acl_data_length = - emboss::BasicL2capHeader::IntrinsicSizeInBytes() + frame_size; - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto channel, - proxy.AcquireRfcommChannel( - kHandle, - /*rx_config=*/ - RfcommChannel::Config{.cid = kCid, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - /*tx_config*/ - RfcommChannel::Config{.cid = kCid, - .max_information_length = kMaxFrameSize, - .credits = kMaxCredits}, - kRfcommChannel, - [&capture](pw::span) { ++capture.rx_called; })); - - std::array hci_arr{}; + RfcommParameters params = {}; + RfcommChannel channel = + BuildRfcomm(proxy, + params, /*receive_fn=*/ + [&capture](pw::span) { ++capture.rx_called; }); // Construct valid packet but put invalid checksum on the end. Test that we // don't get it sent on to us. - { - H4PacketWithHci h4_packet{emboss::H4PacketType::ACL_DATA, hci_arr}; - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto acl, MakeEmbossWriter(hci_arr)); - acl.header().handle().Write(kHandle); - acl.data_total_length().Write(acl_data_length); - - auto bframe = emboss::MakeBFrameView(acl.payload().BackingStorage().data(), - acl.data_total_length().Read()); - bframe.pdu_length().Write(frame_size); - bframe.channel_id().Write(kCid); - - auto rfcomm = - emboss::MakeRfcommFrameView(bframe.payload().BackingStorage().data(), - bframe.payload().SizeInBytes()); - rfcomm.extended_address().Write(true); - rfcomm.command_response_direction().Write( - emboss::RfcommCommandResponseAndDirection::COMMAND_FROM_INITIATOR); - rfcomm.channel().Write(kRfcommChannel); - - rfcomm.control().Write( - emboss::RfcommFrameType::UNNUMBERED_INFORMATION_WITH_HEADER_CHECK); - - rfcomm.length_extended_flag().Write(emboss::RfcommLengthExtended::NORMAL); - rfcomm.length().Write(0); - rfcomm.fcs().Write(kInvalidFcs); - - proxy.HandleH4HciFromController(std::move(h4_packet)); - } - + PW_TEST_EXPECT_OK(SendRfcommFromController(proxy, + params, + kInvalidFcs, + /*credits=*/std::nullopt, + /*payload=*/{})); EXPECT_EQ(capture.rx_called, 0); EXPECT_EQ(capture.host_called, 1); @@ -4847,25 +4885,19 @@ TEST(RfcommReadTest, InvalidReads) { // size. Ensure it doesn't end up being sent to our channel, but does get // forwarded to host. { - H4PacketWithHci h4_packet{emboss::H4PacketType::ACL_DATA, hci_arr}; - PW_TEST_ASSERT_OK_AND_ASSIGN( - auto acl, MakeEmbossWriter(hci_arr)); - acl.header().handle().Write(kHandle); - acl.data_total_length().Write(acl_data_length); - - auto bframe = emboss::MakeBFrameView(acl.payload().BackingStorage().data(), - acl.data_total_length().Read()); - bframe.pdu_length().Write(frame_size); - bframe.channel_id().Write(kCid); - - auto rfcomm = - emboss::MakeRfcommFrameView(bframe.payload().BackingStorage().data(), - bframe.payload().SizeInBytes()); + BFrameWithStorage bframe, + SetupBFrame(params.handle, + params.rx_config.cid, + emboss::RfcommFrame::MinSizeInBytes())); + + auto rfcomm = emboss::MakeRfcommFrameView( + bframe.writer.payload().BackingStorage().data(), + bframe.writer.payload().SizeInBytes()); rfcomm.extended_address().Write(true); rfcomm.command_response_direction().Write( emboss::RfcommCommandResponseAndDirection::COMMAND_FROM_INITIATOR); - rfcomm.channel().Write(kRfcommChannel); + rfcomm.channel().Write(params.rfcomm_channel); rfcomm.control().Write( emboss::RfcommFrameType::UNNUMBERED_INFORMATION_WITH_HEADER_CHECK); @@ -4875,9 +4907,11 @@ TEST(RfcommReadTest, InvalidReads) { rfcomm.length().Write(1); // Can't Write FCS as emboss will assert because of invalid length. Place // manually. - hci_arr[hci_arr.size() - 1] = kExpectedFcs; + pw::span hci_span = bframe.acl.hci_span(); + hci_span[hci_span.size() - 1] = kExpectedFcs; - proxy.HandleH4HciFromController(std::move(h4_packet)); + H4PacketWithHci packet{emboss::H4PacketType::ACL_DATA, hci_span}; + proxy.HandleH4HciFromController(std::move(packet)); } EXPECT_EQ(capture.rx_called, 0); @@ -4947,54 +4981,6 @@ TEST(ProxyHostConnectionEventTest, LeConnectionCompletePassthroughOk) { EXPECT_EQ(host_called, 1U); } -struct AclFrameWithStorage { - std::vector storage; - emboss::AclDataFrameWriter writer; - - pw::span h4_span() { return storage; } - pw::span hci_span() { return pw::span(storage).subspan(1); } -}; - -Result SetupAcl(uint16_t handle, uint16_t l2cap_length) { - AclFrameWithStorage frame; - frame.storage.resize(l2cap_length + emboss::AclDataFrame::MinSizeInBytes() + - 1); - std::fill(frame.storage.begin(), frame.storage.end(), 0); - PW_TRY_ASSIGN(frame.writer, - MakeEmbossWriter( - pw::span(frame.storage).subspan(1))); - frame.writer.header().handle().Write(handle); - frame.writer.data_total_length().Write(l2cap_length); - EXPECT_EQ(l2cap_length, - frame.writer.payload().BackingStorage().SizeInBytes()); - return frame; -} - -struct CFrameWithStorage { - AclFrameWithStorage acl; - emboss::CFrameWriter writer; -}; - -Result SetupCFrame(uint16_t handle, - uint16_t channel_id, - uint16_t cframe_len) { - CFrameWithStorage frame; - PW_TRY_ASSIGN( - frame.acl, - SetupAcl(handle, - cframe_len + emboss::BasicL2capHeader::IntrinsicSizeInBytes())); - - PW_TRY_ASSIGN(frame.writer, - MakeEmbossWriter( - frame.acl.writer.payload().BackingStorage().data(), - frame.acl.writer.payload().SizeInBytes())); - frame.writer.pdu_length().Write(cframe_len); - frame.writer.channel_id().Write(channel_id); - EXPECT_TRUE(frame.writer.Ok()); - EXPECT_EQ(frame.writer.payload().SizeInBytes(), cframe_len); - return frame; -} - Status SendConnectionReq(ProxyHost& proxy, uint16_t handle, uint16_t source_cid, diff --git a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/basic_l2cap_channel.h b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/basic_l2cap_channel.h index c8c1e0e92..b9438a8b2 100644 --- a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/basic_l2cap_channel.h +++ b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/basic_l2cap_channel.h @@ -27,8 +27,8 @@ class BasicL2capChannel : public L2capChannel { uint16_t connection_handle, uint16_t local_cid, uint16_t remote_cid, - pw::Function payload)>&& - payload_from_controller_fn); + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn); BasicL2capChannel(const BasicL2capChannel& other) = delete; BasicL2capChannel& operator=(const BasicL2capChannel& other) = delete; @@ -46,18 +46,21 @@ class BasicL2capChannel : public L2capChannel { /// .. pw-status-codes:: /// OK: If packet was successfully queued for send. /// UNAVAILABLE: If channel could not acquire the resources to queue - /// the send at this time (transient error). + /// the send at this time (transient error). If a + /// `queue_space_available_fn` has been provided it will + /// be called when there is queue space available again. /// INVALID_ARGUMENT: If payload is too large. /// @endrst pw::Status Write(pw::span payload); protected: - explicit BasicL2capChannel(L2capChannelManager& l2cap_channel_manager, - uint16_t connection_handle, - uint16_t local_cid, - uint16_t remote_cid, - pw::Function payload)>&& - payload_from_controller_fn); + explicit BasicL2capChannel( + L2capChannelManager& l2cap_channel_manager, + uint16_t connection_handle, + uint16_t local_cid, + uint16_t remote_cid, + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn); protected: bool HandlePduFromController(pw::span bframe) override; diff --git a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_channel.h b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_channel.h index 5a09551d5..ef49087d6 100644 --- a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_channel.h +++ b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_channel.h @@ -99,13 +99,14 @@ class L2capChannel : public IntrusiveForwardList::Item { AclTransportType transport() const { return transport_; } protected: - explicit L2capChannel(L2capChannelManager& l2cap_channel_manager, - uint16_t connection_handle, - AclTransportType transport, - uint16_t local_cid, - uint16_t remote_cid, - pw::Function payload)>&& - payload_from_controller_fn); + explicit L2capChannel( + L2capChannelManager& l2cap_channel_manager, + uint16_t connection_handle, + AclTransportType transport, + uint16_t local_cid, + uint16_t remote_cid, + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn); // Returns whether or not ACL connection handle & L2CAP channel identifiers // are valid parameters for a packet. @@ -187,6 +188,13 @@ class L2capChannel : public IntrusiveForwardList::Item { InlineQueue send_queue_ PW_GUARDED_BY(global_send_queue_mutex_); + // Callback to notify writers after queue space opens up. + Function queue_space_available_fn_; + + // True if the last queue attempt didn't have space. Will be cleared on + // successful dequeue. + bool notify_on_dequeue_ PW_GUARDED_BY(global_send_queue_mutex_) = false; + //------- // Rx: //------- diff --git a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_coc_internal.h b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_coc_internal.h index c1cb3aa6d..106a09985 100644 --- a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_coc_internal.h +++ b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/internal/l2cap_coc_internal.h @@ -26,14 +26,16 @@ class L2capCocInternal final : public L2capCoc { uint16_t connection_handle, CocConfig rx_config, CocConfig tx_config, - pw::Function payload)>&& receive_fn, - pw::Function&& event_fn) { + Function payload)>&& receive_fn, + Function&& event_fn, + Function&& queue_space_available_fn) { return L2capCoc::Create(l2cap_channel_manager, connection_handle, rx_config, tx_config, std::move(receive_fn), - std::move(event_fn)); + std::move(event_fn), + std::move(queue_space_available_fn)); } // Increment L2CAP credits. This should be called by signaling channels in diff --git a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/l2cap_coc.h b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/l2cap_coc.h index 782a170dd..8b04497cb 100644 --- a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/l2cap_coc.h +++ b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/l2cap_coc.h @@ -99,7 +99,9 @@ class L2capCoc : public L2capChannel { /// .. pw-status-codes:: /// OK: If packet was successfully queued for send. /// UNAVAILABLE: If channel could not acquire the resources to queue - /// the send at this time (transient error). + /// the send at this time (transient error). If a + /// `queue_space_available_fn` has been provided it will + /// be called when there is queue space available again. /// INVALID_ARGUMENT: If payload is too large. /// FAILED_PRECONDITION: If channel is `kStopped`. /// @endrst @@ -111,9 +113,9 @@ class L2capCoc : public L2capChannel { uint16_t connection_handle, CocConfig rx_config, CocConfig tx_config, - pw::Function payload)>&& - payload_from_controller_fn, - pw::Function&& event_fn); + Function payload)>&& payload_from_controller_fn, + Function&& event_fn, + Function&& queue_space_available_fn); // `SendPayloadFromControllerToClient` with the information payload contained // in `kframe`. As packet desegmentation is not supported, segmented SDUs are @@ -133,13 +135,14 @@ class L2capCoc : public L2capChannel { kStopped, }; - explicit L2capCoc(L2capChannelManager& l2cap_channel_manager, - uint16_t connection_handle, - CocConfig rx_config, - CocConfig tx_config, - pw::Function payload)>&& - payload_from_controller_fn, - pw::Function&& event_fn); + explicit L2capCoc( + L2capChannelManager& l2cap_channel_manager, + uint16_t connection_handle, + CocConfig rx_config, + CocConfig tx_config, + Function payload)>&& payload_from_controller_fn, + Function&& event_fn, + Function&& queue_space_available_fn); // Stop channel & notify client. void OnFragmentedPduReceived() override; diff --git a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/proxy_host.h b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/proxy_host.h index 75343ce01..f902b789a 100644 --- a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/proxy_host.h +++ b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/proxy_host.h @@ -132,6 +132,10 @@ class ProxyHost { /// dispense remote peer additional credits /// for this channel. /// + /// @param[in] queue_space_available_fn + /// Callback to be invoked after resources become + /// available after an UNAVAILABLE Write. + /// /// @returns @rst /// /// .. pw-status-codes:: @@ -143,8 +147,9 @@ class ProxyHost { uint16_t connection_handle, L2capCoc::CocConfig rx_config, L2capCoc::CocConfig tx_config, - pw::Function payload)>&& receive_fn, - pw::Function&& event_fn, + Function payload)>&& receive_fn, + Function&& event_fn, + Function&& queue_space_available_fn = nullptr, uint16_t rx_additional_credits = 0); /// Returns an L2CAP channel operating in basic mode that supports writing to @@ -164,6 +169,10 @@ class ProxyHost { /// @param[in] payload_from_controller_fn Read callback to be invoked on Rx /// SDUs. /// + /// @param[in] queue_space_available_fn Callback to be invoked after + /// resources become available after an + /// UNAVAILABLE Write. + /// /// @returns @rst /// /// .. pw-status-codes:: @@ -176,8 +185,8 @@ class ProxyHost { uint16_t local_cid, uint16_t remote_cid, AclTransportType transport, - pw::Function payload)>&& - payload_from_controller_fn); + Function payload)>&& payload_from_controller_fn, + Function&& queue_space_available_fn = nullptr); /// Send a GATT Notify to the indicated connection. /// @@ -216,6 +225,10 @@ class ProxyHost { /// /// @param[in] receive_fn Read callback to be invoked on Rx frames. /// + /// @param[in] queue_space_available_fn + /// Callback to be invoked after resources become + /// available after an UNAVAILABLE Write. + /// /// @returns @rst /// /// .. pw-status-codes:: @@ -227,7 +240,8 @@ class ProxyHost { RfcommChannel::Config rx_config, RfcommChannel::Config tx_config, uint8_t channel_number, - pw::Function payload)>&& receive_fn); + Function payload)>&& receive_fn, + Function&& queue_space_available_fn); /// Indicates whether the proxy has the capability of sending LE ACL packets. /// Note that this indicates intention, so it can be true even if the proxy diff --git a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/rfcomm_channel.h b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/rfcomm_channel.h index 9baa5aed6..db6932614 100644 --- a/pw_bluetooth_proxy/public/pw_bluetooth_proxy/rfcomm_channel.h +++ b/pw_bluetooth_proxy/public/pw_bluetooth_proxy/rfcomm_channel.h @@ -71,6 +71,10 @@ class RfcommChannel final : public L2capChannel { /// /// @param[in] receive_fn Read callback to be invoked on Rx frames. /// + /// @param[in] queue_space_available_fn + /// Callback to be invoked after resources become + /// available after an UNAVAILABLE Write. + /// /// @returns @rst /// /// .. pw-status-codes:: @@ -83,7 +87,8 @@ class RfcommChannel final : public L2capChannel { Config rx_config, Config tx_config, uint8_t channel_number, - Function payload)>&& receive_fn); + Function payload)>&& receive_fn, + Function&& queue_space_available_fn); /// Send an RFCOMM payload to the remote peer. /// @@ -95,9 +100,9 @@ class RfcommChannel final : public L2capChannel { /// .. pw-status-codes:: /// OK: If packet was successfully queued for send. /// UNAVAILABLE: If channel could not acquire the resources to queue - /// the send at this time (transient error). - /// TODO: https://pwbug.dev/380299794 - Add more robust - /// flow control solution. + /// the send at this time (transient error). If a + /// `queue_space_available_fn` has been provided it will + /// be called when there is queue space available again. /// INVALID_ARGUMENT: If payload is too large. /// FAILED_PRECONDITION: If channel is `kStopped`. /// @endrst @@ -114,7 +119,8 @@ class RfcommChannel final : public L2capChannel { Config rx_config, Config tx_config, uint8_t channel_number, - Function payload)>&& receive_fn); + Function payload)>&& receive_fn, + Function&& queue_space_available_fn); // Parses out RFCOMM payload from `l2cap_pdu` and calls // `SendPayloadFromControllerToClient`. diff --git a/pw_bluetooth_proxy/rfcomm_channel.cc b/pw_bluetooth_proxy/rfcomm_channel.cc index bea4fe429..b1d5d870f 100644 --- a/pw_bluetooth_proxy/rfcomm_channel.cc +++ b/pw_bluetooth_proxy/rfcomm_channel.cc @@ -123,7 +123,6 @@ pw::Status RfcommChannel::Write(pw::span payload) { // TODO: https://pwbug.dev/379184978 - Support legacy non-credit based flow // control. - return QueuePacket(std::move(h4_packet)); } @@ -146,7 +145,8 @@ Result RfcommChannel::Create( Config rx_config, Config tx_config, uint8_t channel_number, - pw::Function payload)>&& receive_fn) { + Function payload)>&& receive_fn, + Function&& queue_space_available_fn) { if (!AreValidParameters(/*connection_handle=*/connection_handle, /*local_cid=*/rx_config.cid, /*remote_cid=*/tx_config.cid)) { @@ -158,7 +158,8 @@ Result RfcommChannel::Create( rx_config, tx_config, channel_number, - std::move(receive_fn)); + std::move(receive_fn), + std::move(queue_space_available_fn)); } bool RfcommChannel::HandlePduFromController(pw::span l2cap_pdu) { @@ -257,13 +258,16 @@ RfcommChannel::RfcommChannel( Config rx_config, Config tx_config, uint8_t channel_number, - pw::Function payload)>&& receive_fn) - : L2capChannel(/*l2cap_channel_manager=*/l2cap_channel_manager, - /*connection_handle=*/connection_handle, - /*transport=*/AclTransportType::kBrEdr, - /*local_cid=*/rx_config.cid, - /*remote_cid=*/tx_config.cid, - /*payload_from_controller_fn=*/std::move(receive_fn)), + Function payload)>&& receive_fn, + Function&& queue_space_available_fn) + : L2capChannel( + /*l2cap_channel_manager=*/l2cap_channel_manager, + /*connection_handle=*/connection_handle, + /*transport=*/AclTransportType::kBrEdr, + /*local_cid=*/rx_config.cid, + /*remote_cid=*/tx_config.cid, + /*payload_from_controller_fn=*/std::move(receive_fn), + /*queue_space_available_fn=*/std::move(queue_space_available_fn)), rx_config_(rx_config), tx_config_(tx_config), channel_number_(channel_number),