Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for ACL and ISO HCI packet queues #630

Merged
merged 2 commits into from
Jan 24, 2025
Merged

Conversation

barbibulle
Copy link
Collaborator

ISO packets for BIS and CIS are now sent to the controller via an HCI packet queue that respects the buffering requirements of the controller.
In addition, the packet queues are made visible to the API users (advanced use case), and can emit 'drain' events when packets are marked as completed by the controller (this can be used for sending data at the right rate, rather than rely on a timer).
NOTE: the change to use the drain event rather than a timer in the auracast app is not included in this PR (coming in a later PR), but the use of the drain even is add, just to print the buffer levels as the packets are being sent (and we can see that with a timer approach, packets end up being buffered, which may become a problem eventually for long running streams, but this will get fixed in the upcoming PR).

@barbibulle barbibulle requested a review from zxzxwu January 22, 2025 18:50
bumble/host.py Outdated
self.sco_links.keys(),
itertools.chain.from_iterable(self.bigs.values()),
):
connection.acl_packet_queue.on_packets_completed(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May use the new method?

Suggested change
connection.acl_packet_queue.on_packets_completed(
if queue := self.get_data_packet_queue(connection_handle):
queue.on_packets_completed(num_completed_packets, connection_handle)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


return None

def send_iso_sdu(self, connection_handle: int, sdu: bytes) -> None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides using drain events, I think maybe we can also allow tracking the completeness of each packet, or even make sending methods async, such as:

@overload
async def send_iso_sdu(self, connection_handle: int, sdu: bytes, wait_for_completed: Literal[False]) -> None:
    ...

@overload
def send_iso_sdu(self, connection_handle: int, sdu: bytes, wait_for_completed: Literal[True]) -> None:
    ...

def send_iso_sdu(self, connection_handle: int, sdu: bytes,  wait_for_completed: Literal[True, False]) -> None | Awaitable[None]:
    ...
    packets = []
    while bytes_remaining:
        packet = ...
        packets.append(packet)
        iso_link.packet_queue.enqueue(packet)
    if wait_for_completed:
        return asyncio.gather([packet.completed for packet in packets])

For ISO there is probably just a small difference, but ACL links are usually used by multiple clients so they may never drain.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually for ISO usage, users can also be benefit from per-packet tracking. Because if we only have drain events, users must wait until all packets are completed, so if they want to have an audio buffer of multiple packets, there will still be a gap without any in flight packet like:

def on_drain(queue):
  # At this moment no packet is in flight
  for _ in range(5):
    queue.host.send_iso_pdu(...)

If they can track each packet

packets = Deque()
while True:
  if len(packets) < audio_buffer_size or await packets[0].completed:
    packet = ...
    packets.pop()
    packets.append(packet)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drain even isn't emitted when the buffer is empty, it is emitted when some packets have been completed (i.e it isn't an empty event). It allows the listener to know when something has changed in the queue. Maybe I should rename it to flow to avoid the confusion?
Regarding tracking the completion of each packet, I think that could work, but may be a bit heavy. Typically, someone sending packets will want to queue a few, and wait for the queue to have space before putting more packets, but not wait for each packet individually. I was thinking of creating a helper class that would implement a stream function, with a packet threshold, which would be async, and manage sending more packets to the queue automatically. Something like: stream = Stream(packet_queue, buffer_size), then await stream.write(packet), which would block until there's less than buffer_size packets not completed. (I would add this helper class to my next PR).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up renaming the even from drain to flow (even if technically drain doesn't imply empty, it is sometimes used that way, so could be confusing, so flow makes it more clear that it's about packets going from one side of queue to the other)

@barbibulle barbibulle merged commit afee659 into main Jan 24, 2025
57 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants