diff --git a/CHANGELOG.md b/CHANGELOG.md index a9b8dd70..42584c25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,3 +23,4 @@ * introduced a new `Psl\Network` component. * introduced a new `Psl\TCP` component. * introduced a new `Psl\Unix` component. +* introduced a new `Psl\Channel` component. diff --git a/docs/README.md b/docs/README.md index 1181f47f..58b7d1db 100644 --- a/docs/README.md +++ b/docs/README.md @@ -8,6 +8,7 @@ - [Psl](./component/psl.md) - [Psl\Async](./component/async.md) +- [Psl\Channel](./component/channel.md) - [Psl\Class](./component/class.md) - [Psl\Collection](./component/collection.md) - [Psl\DataStructure](./component/data-structure.md) diff --git a/docs/component/channel.md b/docs/component/channel.md new file mode 100644 index 00000000..4b81c2df --- /dev/null +++ b/docs/component/channel.md @@ -0,0 +1,23 @@ + + +[*index](./../README.md) + +--- + +### `Psl\Channel` Component + +#### `Functions` + +- [bounded](./../../src/Psl/Channel/bounded.php#L18) +- [unbounded](./../../src/Psl/Channel/unbounded.php#L16) + +#### `Interfaces` + +- [ReceiverInterface](./../../src/Psl/Channel/ReceiverInterface.php#L12) +- [SenderInterface](./../../src/Psl/Channel/SenderInterface.php#L12) + + diff --git a/docs/component/data-structure.md b/docs/component/data-structure.md index 193e9a3a..ac688a5a 100644 --- a/docs/component/data-structure.md +++ b/docs/component/data-structure.md @@ -18,8 +18,8 @@ #### `Classes` -- [PriorityQueue](./../../src/Psl/DataStructure/PriorityQueue.php#L18) -- [Queue](./../../src/Psl/DataStructure/Queue.php#L19) +- [PriorityQueue](./../../src/Psl/DataStructure/PriorityQueue.php#L20) +- [Queue](./../../src/Psl/DataStructure/Queue.php#L21) - [Stack](./../../src/Psl/DataStructure/Stack.php#L19) diff --git a/docs/documenter.php b/docs/documenter.php index dd2beb46..55a9ca76 100644 --- a/docs/documenter.php +++ b/docs/documenter.php @@ -184,6 +184,7 @@ function get_all_components(): array $components = [ 'Psl', 'Psl\\Async', + 'Psl\\Channel', 'Psl\\Class', 'Psl\\Collection', 'Psl\\DataStructure', diff --git a/examples/channel/main.php b/examples/channel/main.php new file mode 100644 index 00000000..31da59d1 --- /dev/null +++ b/examples/channel/main.php @@ -0,0 +1,25 @@ + $receiver + * @var Channel\SenderInterface $sender + */ +[$receiver, $sender] = Channel\unbounded(); + +Async\Scheduler::delay(1, static function () use ($sender) { + $sender->send('Hello, World!'); +}); + +$message = $receiver->receive(); + +IO\output_handle()->writeAll($message); diff --git a/src/Psl/Channel/ChannelInterface.php b/src/Psl/Channel/ChannelInterface.php new file mode 100644 index 00000000..21e09a32 --- /dev/null +++ b/src/Psl/Channel/ChannelInterface.php @@ -0,0 +1,59 @@ + + * + * @internal + */ +final class ChannelState implements ChannelInterface +{ + /** + * @var QueueInterface + */ + private QueueInterface $messages; + + private bool $closed = false; + + public function __construct( + private ?int $capacity = null, + ) { + $this->messages = new Queue(); + } + + /** + * {@inheritDoc} + */ + public function getCapacity(): ?int + { + return $this->capacity; + } + + /** + * {@inheritDoc} + */ + public function close(): void + { + $this->closed = true; + } + + /** + * {@inheritDoc} + */ + public function isClosed(): bool + { + return $this->closed; + } + + /** + * {@inheritDoc} + */ + public function count(): int + { + return $this->messages->count(); + } + + /** + * {@inheritDoc} + */ + public function isFull(): bool + { + if (null === $this->capacity) { + return false; + } + + return $this->capacity === $this->count(); + } + + /** + * {@inheritDoc} + */ + public function isEmpty(): bool + { + return 0 === $this->messages->count(); + } + + /** + * @param T $message + * + * @throws Exception\ClosedChannelException If the channel is closed. + * @throws Exception\FullChannelException If the channel is full. + */ + public function send(mixed $message): void + { + if ($this->closed) { + throw Exception\ClosedChannelException::forSending(); + } + + if (null === $this->capacity || $this->capacity > $this->count()) { + $this->messages->enqueue($message); + + return; + } + + throw Exception\FullChannelException::ofCapacity($this->capacity); + } + + /** + * @throws Exception\ClosedChannelException If the channel is closed, and there's no more messages to receive. + * @throws Exception\EmptyChannelException If the channel is empty. + * + * @return T + */ + public function receive(): mixed + { + $empty = 0 === $this->count(); + $closed = $this->closed; + if ($closed && $empty) { + throw Exception\ClosedChannelException::forReceiving(); + } + + if ($empty) { + throw Exception\EmptyChannelException::create(); + } + + /** @psalm-suppress MissingThrowsDocblock */ + return $this->messages->dequeue(); + } +} diff --git a/src/Psl/Channel/Internal/Receiver.php b/src/Psl/Channel/Internal/Receiver.php new file mode 100644 index 00000000..d10cf462 --- /dev/null +++ b/src/Psl/Channel/Internal/Receiver.php @@ -0,0 +1,133 @@ + + */ +final class Receiver implements ReceiverInterface +{ + /** + * @var null|Async\Deferred + */ + private ?Async\Deferred $deferred = null; + + /** + * @param ChannelState $state + */ + public function __construct( + private ChannelState $state + ) { + } + + /** + * {@inheritDoc} + */ + public function receive(): mixed + { + // there's a pending operation? wait for it. + $this->deferred?->getAwaitable()->then(static fn() => null, static fn() => null)->ignore()->await(); + + if ($this->state->isEmpty()) { + $this->deferred = new Async\Deferred(); + + Async\Scheduler::repeat(0.0001, function (string $identifier): void { + if (null === $this->deferred) { + Async\Scheduler::cancel($identifier); + } + + if ($this->state->isClosed()) { + /** + * Channel has been closed from the receiving side. + * + * @psalm-suppress PossiblyNullReference + */ + if (!$this->deferred->isComplete()) { + /** @psalm-suppress PossiblyNullReference */ + $this->deferred->error(Exception\ClosedChannelException::forSending()); + } + + return; + } + + if (!$this->state->isEmpty()) { + /** @psalm-suppress PossiblyNullReference */ + $this->deferred->complete(null); + } + }); + + /** @psalm-suppress PossiblyNullReference */ + $this->deferred->getAwaitable()->await(); + $this->deferred = null; + } + + /** @psalm-suppress MissingThrowsDocblock */ + return $this->state->receive(); + } + + /** + * {@inheritDoc} + */ + public function tryReceive(): mixed + { + return $this->state->receive(); + } + + /** + * {@inheritDoc} + */ + public function getCapacity(): ?int + { + return $this->state->getCapacity(); + } + + /** + * {@inheritDoc} + */ + public function close(): void + { + $this->deferred?->error(Exception\ClosedChannelException::forSending()); + + $this->state->close(); + } + + /** + * {@inheritDoc} + */ + public function isClosed(): bool + { + return $this->state->isClosed(); + } + + /** + * {@inheritDoc} + */ + public function count(): int + { + return $this->state->count(); + } + + /** + * {@inheritDoc} + */ + public function isFull(): bool + { + return $this->state->isFull(); + } + + /** + * {@inheritDoc} + */ + public function isEmpty(): bool + { + return $this->state->isEmpty(); + } +} diff --git a/src/Psl/Channel/Internal/Sender.php b/src/Psl/Channel/Internal/Sender.php new file mode 100644 index 00000000..6dd2f5bb --- /dev/null +++ b/src/Psl/Channel/Internal/Sender.php @@ -0,0 +1,133 @@ + + */ +final class Sender implements SenderInterface +{ + /** + * @var null|Async\Deferred + */ + private ?Async\Deferred $deferred = null; + + /** + * @param ChannelState $state + */ + public function __construct( + private ChannelState $state + ) { + } + + /** + * {@inheritDoc} + */ + public function send(mixed $message): void + { + // there's a pending operation? wait for it. + $this->deferred?->getAwaitable()->then(static fn() => null, static fn() => null)->ignore()->await(); + + if ($this->state->isFull()) { + $this->deferred = new Async\Deferred(); + + Async\Scheduler::repeat(0.0001, function (string $identifier): void { + if (null === $this->deferred) { + Async\Scheduler::cancel($identifier); + } + + if ($this->state->isClosed()) { + /** + * Channel has been closed from the receiving side. + * + * @psalm-suppress PossiblyNullReference + */ + if (!$this->deferred->isComplete()) { + /** @psalm-suppress PossiblyNullReference */ + $this->deferred->error(Exception\ClosedChannelException::forSending()); + } + + return; + } + + if (!$this->state->isFull()) { + /** @psalm-suppress PossiblyNullReference */ + $this->deferred->complete(null); + } + }); + + /** @psalm-suppress PossiblyNullReference */ + $this->deferred->getAwaitable()->await(); + $this->deferred = null; + } + + /** @psalm-suppress MissingThrowsDocblock */ + $this->state->send($message); + } + + /** + * {@inheritDoc} + */ + public function trySend(mixed $message): void + { + $this->state->send($message); + } + + /** + * {@inheritDoc} + */ + public function getCapacity(): ?int + { + return $this->state->getCapacity(); + } + + /** + * {@inheritDoc} + */ + public function close(): void + { + $this->deferred?->error(Exception\ClosedChannelException::forSending()); + + $this->state->close(); + } + + /** + * {@inheritDoc} + */ + public function isClosed(): bool + { + return $this->state->isClosed(); + } + + /** + * {@inheritDoc} + */ + public function count(): int + { + return $this->state->count(); + } + + /** + * {@inheritDoc} + */ + public function isFull(): bool + { + return $this->state->isFull(); + } + + /** + * {@inheritDoc} + */ + public function isEmpty(): bool + { + return $this->state->isEmpty(); + } +} diff --git a/src/Psl/Channel/ReceiverInterface.php b/src/Psl/Channel/ReceiverInterface.php new file mode 100644 index 00000000..6614ab9b --- /dev/null +++ b/src/Psl/Channel/ReceiverInterface.php @@ -0,0 +1,40 @@ + + */ +interface ReceiverInterface extends ChannelInterface +{ + /** + * Receives a message from the channel. + * + * If the channel is empty, this method waits until there is a message. + * + * If the channel is closed, this method receives a message or throws if there are no more messages. + * + * @throws Exception\ClosedChannelException If the channel is closed, and there's no more messages to receive. + * + * @return T + */ + public function receive(): mixed; + + /** + * Receives a message from the channel immediately. + * + * If the channel is empty, this method will throw an exception. + * + * If the channel is closed, this method receives a message or throws if there are no more messages. + * + * @throws Exception\ClosedChannelException If the channel is closed, and there's no more messages to receive. + * @throws Exception\EmptyChannelException If the channel is empty. + * + * @return T + */ + public function tryReceive(): mixed; +} diff --git a/src/Psl/Channel/SenderInterface.php b/src/Psl/Channel/SenderInterface.php new file mode 100644 index 00000000..8b26811b --- /dev/null +++ b/src/Psl/Channel/SenderInterface.php @@ -0,0 +1,40 @@ + + */ +interface SenderInterface extends ChannelInterface +{ + /** + * Send a message to the channel. + * + * If the channel is full, this method waits until there is space for a message. + * + * If the channel is closed, this method throws. + * + * @param T $message + * + * @throws Exception\ClosedChannelException If the channel is closed. + */ + public function send(mixed $message): void; + + /** + * Receives a message from the channel immediately. + * + * If the channel is full, this method will throw an exception. + * + * If the channel is closed, this method throws. + * + * @param T $message + * + * @throws Exception\ClosedChannelException If the channel is closed. + * @throws Exception\FullChannelException If the channel is full. + */ + public function trySend(mixed $message): void; +} diff --git a/src/Psl/Channel/bounded.php b/src/Psl/Channel/bounded.php new file mode 100644 index 00000000..379ea293 --- /dev/null +++ b/src/Psl/Channel/bounded.php @@ -0,0 +1,26 @@ +, 1: SenderInterface} + */ +function bounded(int $capacity): array +{ + $channel = new Internal\ChannelState($capacity); + + return [ + new Internal\Receiver($channel), + new Internal\Sender($channel), + ]; +} diff --git a/src/Psl/Channel/unbounded.php b/src/Psl/Channel/unbounded.php new file mode 100644 index 00000000..f6e1ecfb --- /dev/null +++ b/src/Psl/Channel/unbounded.php @@ -0,0 +1,24 @@ +, 1: SenderInterface} + */ +function unbounded(): array +{ + $channel = new Internal\ChannelState(); + + return [ + new Internal\Receiver($channel), + new Internal\Sender($channel), + ]; +} diff --git a/src/Psl/DataStructure/PriorityQueue.php b/src/Psl/DataStructure/PriorityQueue.php index ec799df0..6daebc78 100644 --- a/src/Psl/DataStructure/PriorityQueue.php +++ b/src/Psl/DataStructure/PriorityQueue.php @@ -10,6 +10,8 @@ use Psl\Math; use Psl\Vec; +use function count; + /** * @template T * @@ -128,14 +130,17 @@ private function drop(): void /** * Count the nodes in the queue. + * + * @return positive-int|0 */ public function count(): int { $count = 0; foreach ($this->queue as $_priority => $list) { - $count += Iter\count($list); + $count += count($list); } + /** @var positive-int|0 */ return $count; } } diff --git a/src/Psl/DataStructure/Queue.php b/src/Psl/DataStructure/Queue.php index b663a999..b61c4714 100644 --- a/src/Psl/DataStructure/Queue.php +++ b/src/Psl/DataStructure/Queue.php @@ -9,6 +9,8 @@ use Psl\Iter; use Psl\Vec; +use function count; + /** * A basic implementation of a queue data structure ( FIFO ). * @@ -79,9 +81,11 @@ public function dequeue(): mixed /** * Count the nodes in the queue. + * + * @return positive-int|0 */ public function count(): int { - return Iter\count($this->queue); + return count($this->queue); } } diff --git a/src/Psl/DataStructure/QueueInterface.php b/src/Psl/DataStructure/QueueInterface.php index b56631af..47801697 100644 --- a/src/Psl/DataStructure/QueueInterface.php +++ b/src/Psl/DataStructure/QueueInterface.php @@ -50,6 +50,8 @@ public function dequeue(): mixed; /** * Count the nodes in the queue. + * + * @return positive-int|0 */ public function count(): int; } diff --git a/src/Psl/Internal/Loader.php b/src/Psl/Internal/Loader.php index ee12e851..480520ae 100644 --- a/src/Psl/Internal/Loader.php +++ b/src/Psl/Internal/Loader.php @@ -476,6 +476,8 @@ final class Loader 'Psl\Network\Internal\server_listen', 'Psl\TCP\connect', 'Psl\Unix\connect', + 'Psl\Channel\bounded', + 'Psl\Channel\unbounded', ]; public const INTERFACES = [ @@ -533,6 +535,9 @@ final class Loader 'Psl\Network\ServerInterface', 'Psl\TCP\SocketInterface', 'Psl\Unix\SocketInterface', + 'Psl\Channel\SenderInterface', + 'Psl\Channel\ReceiverInterface', + 'Psl\Channel\Exception\ExceptionInterface', ]; public const TRAITS = [ @@ -665,6 +670,12 @@ final class Loader 'Psl\TCP\Internal\Socket', 'Psl\Unix\Server', 'Psl\Unix\Internal\Socket', + 'Psl\Channel\Internal\ChannelState', + 'Psl\Channel\Internal\Sender', + 'Psl\Channel\Internal\Receiver', + 'Psl\Channel\Exception\ClosedChannelException', + 'Psl\Channel\Exception\EmptyChannelException', + 'Psl\Channel\Exception\FullChannelException', ]; public const ENUMS = [ diff --git a/tests/unit/Channel/BoundedChannelTest.php b/tests/unit/Channel/BoundedChannelTest.php new file mode 100644 index 00000000..e36c951e --- /dev/null +++ b/tests/unit/Channel/BoundedChannelTest.php @@ -0,0 +1,268 @@ + $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + static::assertSame(1, $receiver->getCapacity()); + static::assertSame(1, $sender->getCapacity()); + } + + public function testCloseSender(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + static::assertFalse($receiver->isClosed()); + static::assertFalse($sender->isClosed()); + + $sender->close(); + + static::assertTrue($receiver->isClosed()); + static::assertTrue($sender->isClosed()); + } + + public function testCloseReceiver(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + static::assertFalse($receiver->isClosed()); + static::assertFalse($sender->isClosed()); + + $receiver->close(); + + static::assertTrue($receiver->isClosed()); + static::assertTrue($sender->isClosed()); + } + + public function testCount(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(2); + + static::assertSame(0, $receiver->count()); + static::assertSame(0, $sender->count()); + + $sender->send('foo'); + $sender->send('bar'); + + static::assertSame(2, $receiver->count()); + static::assertSame(2, $sender->count()); + + static::assertSame('foo', $receiver->receive()); + + static::assertSame(1, $receiver->count()); + static::assertSame(1, $sender->count()); + + static::assertSame('bar', $receiver->tryReceive()); + + static::assertSame(0, $receiver->count()); + static::assertSame(0, $sender->count()); + + $sender->trySend('baz'); + + static::assertSame(1, $receiver->count()); + static::assertSame(1, $sender->count()); + } + + public function testIsFull(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + static::assertFalse($receiver->isFull()); + static::assertFalse($sender->isFull()); + + $sender->send('foo'); + + static::assertTrue($receiver->isFull()); + static::assertTrue($sender->isFull()); + } + + public function testTrySendThrowsOnFullChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $sender->send('hello'); + + $this->expectException(Channel\Exception\FullChannelException::class); + + $sender->trySend('world'); + } + + public function testSendWaitsForFullChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $sender->send('hello'); + + Async\Scheduler::delay(0.001, static function () use ($receiver) { + $receiver->receive(); + }); + + static::assertTrue($receiver->isFull()); + + $sender->send('world'); + } + + public function testSendThrowsForClosedChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $receiver->close(); + + $this->expectException(Channel\Exception\ClosedChannelException::class); + + $sender->send('world'); + } + + public function testSendThrowsForLateClosedChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $sender->send('hello'); + + Async\Scheduler::delay(0.001, static function () use ($receiver): void { + $receiver->close(); + }); + + $this->expectException(Channel\Exception\ClosedChannelException::class); + + $sender->send('world'); + } + + public function testTrySendThrowsForClosedChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $receiver->close(); + + $this->expectException(Channel\Exception\ClosedChannelException::class); + + $sender->trySend('world'); + } + + public function testReceiveThrowsForClosedChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $sender->close(); + + $this->expectException(Channel\Exception\ClosedChannelException::class); + + $receiver->receive(); + } + + public function testReceiveThrowsForLateClosedChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + Async\Scheduler::delay(0.0001, static function () use ($sender): void { + $sender->close(); + }); + + $this->expectException(Channel\Exception\ClosedChannelException::class); + + $receiver->receive(); + } + + public function testTryReceiveThrowsForClosedChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $sender->close(); + + $this->expectException(Channel\Exception\ClosedChannelException::class); + + $receiver->tryReceive(); + } + + public function testReceiveWaitsWhenChannelIsEmpty(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + Async\Scheduler::delay(0.001, static function () use ($sender) { + $sender->send('hello'); + }); + + static::assertTrue($receiver->isEmpty()); + + static::assertSame('hello', $receiver->receive()); + } + + public function testTryReceiveThrowsForEmptyChannel(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\bounded(1); + + $this->expectException(Channel\Exception\EmptyChannelException::class); + + $receiver->tryReceive(); + } +} diff --git a/tests/unit/Channel/UnboundedChannelTest.php b/tests/unit/Channel/UnboundedChannelTest.php new file mode 100644 index 00000000..f0026519 --- /dev/null +++ b/tests/unit/Channel/UnboundedChannelTest.php @@ -0,0 +1,124 @@ + $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\unbounded(); + + static::assertNull($receiver->getCapacity()); + static::assertNull($sender->getCapacity()); + } + + public function testCloseSender(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\unbounded(); + + static::assertFalse($receiver->isClosed()); + static::assertFalse($sender->isClosed()); + + $sender->close(); + + static::assertTrue($receiver->isClosed()); + static::assertTrue($sender->isClosed()); + } + + public function testCloseReceiver(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\unbounded(); + + static::assertFalse($receiver->isClosed()); + static::assertFalse($sender->isClosed()); + + $receiver->close(); + + static::assertTrue($receiver->isClosed()); + static::assertTrue($sender->isClosed()); + } + + public function testCount(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\unbounded(); + + static::assertSame(0, $receiver->count()); + static::assertSame(0, $sender->count()); + + $sender->send('foo'); + $sender->send('bar'); + + static::assertSame(2, $receiver->count()); + static::assertSame(2, $sender->count()); + + static::assertSame('foo', $receiver->receive()); + + static::assertSame(1, $receiver->count()); + static::assertSame(1, $sender->count()); + + static::assertSame('bar', $receiver->tryReceive()); + + static::assertSame(0, $receiver->count()); + static::assertSame(0, $sender->count()); + + $sender->trySend('baz'); + + static::assertSame(1, $receiver->count()); + static::assertSame(1, $sender->count()); + } + + public function testIsFull(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\unbounded(); + + static::assertFalse($receiver->isFull()); + static::assertFalse($sender->isFull()); + + $sender->send('foo'); + + static::assertFalse($receiver->isFull()); + static::assertFalse($sender->isFull()); + } + + public function testIsEmpty(): void + { + /** + * @var Channel\ReceiverInterface $receiver + * @var Channel\SenderInterface $sender + */ + [$receiver, $sender] = Channel\unbounded(); + + static::assertTrue($receiver->isEmpty()); + static::assertTrue($sender->isEmpty()); + + $sender->send('foo'); + + static::assertFalse($receiver->isEmpty()); + static::assertFalse($sender->isEmpty()); + } +}