Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(channel): introduce new channel component #283

Merged
merged 1 commit into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
* introduced a new `Psl\Network` component.
* introduced a new `Psl\TCP` component.
* introduced a new `Psl\Unix` component.
* introduced a new `Psl\Channel` component.
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

- [Psl](./component/psl.md)
- [Psl\Async](./component/async.md)
- [Psl\Channel](./component/channel.md)
- [Psl\Class](./component/class.md)
- [Psl\Collection](./component/collection.md)
- [Psl\DataStructure](./component/data-structure.md)
Expand Down
23 changes: 23 additions & 0 deletions docs/component/channel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<!--
This markdown file was generated using `docs/documenter.php`.

Any edits to it will likely be lost.
-->

[*index](./../README.md)

---

### `Psl\Channel` Component

#### `Functions`

- [bounded](./../../src/Psl/Channel/bounded.php#L18)
- [unbounded](./../../src/Psl/Channel/unbounded.php#L16)

#### `Interfaces`

- [ReceiverInterface](./../../src/Psl/Channel/ReceiverInterface.php#L12)
- [SenderInterface](./../../src/Psl/Channel/SenderInterface.php#L12)


4 changes: 2 additions & 2 deletions docs/component/data-structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#### `Classes`

- [PriorityQueue](./../../src/Psl/DataStructure/PriorityQueue.php#L18)
- [Queue](./../../src/Psl/DataStructure/Queue.php#L19)
- [PriorityQueue](./../../src/Psl/DataStructure/PriorityQueue.php#L20)
- [Queue](./../../src/Psl/DataStructure/Queue.php#L21)
- [Stack](./../../src/Psl/DataStructure/Stack.php#L19)


1 change: 1 addition & 0 deletions docs/documenter.php
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ function get_all_components(): array
$components = [
'Psl',
'Psl\\Async',
'Psl\\Channel',
'Psl\\Class',
'Psl\\Collection',
'Psl\\DataStructure',
Expand Down
25 changes: 25 additions & 0 deletions examples/channel/main.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Psl\Example\IO;

use Psl\Async;
use Psl\Channel;
use Psl\IO;

require __DIR__ . '/../../vendor/autoload.php';

/**
* @var Channel\ReceiverInterface<string> $receiver
* @var Channel\SenderInterface<string> $sender
*/
[$receiver, $sender] = Channel\unbounded();

Async\Scheduler::delay(1, static function () use ($sender) {
$sender->send('Hello, World!');
});

$message = $receiver->receive();

IO\output_handle()->writeAll($message);
59 changes: 59 additions & 0 deletions src/Psl/Channel/ChannelInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace Psl\Channel;

use Countable;

/**
* @template T
*/
interface ChannelInterface extends Countable
{
/**
* Returns the channel capacity if it’s bounded.
*
* @mutation-free
*/
public function getCapacity(): ?int;

/**
* Closes the channel.
*
* The remaining messages can still be received.
*/
public function close(): void;

/**
* Returns true if the channel is closed.
*
* @mutation-free
*/
public function isClosed(): bool;

/**
* Returns the number of messages in the channel.
*
* @return positive-int|0
*
* @mutation-free
*/
public function count(): int;

/**
* Returns true if the channel is full.
*
* Unbounded channels are never full.
*
* @mutation-free
*/
public function isFull(): bool;

/**
* Returns true if the channel is empty.
*
* @mutation-free
*/
public function isEmpty(): bool;
}
34 changes: 34 additions & 0 deletions src/Psl/Channel/Exception/ClosedChannelException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use Psl\Channel;
use Psl\Exception\RuntimeException;

/**
* This exception is thrown when attempting to send or receive a message on a closed channel.
*
* @see Channel\SenderInterface::send()
* @see Channel\SenderInterface::trySend()
* @see Channel\ReceiverInterface::receive()
* @see Channel\ReceiverInterface::tryReceive()
*/
final class ClosedChannelException extends RuntimeException implements ExceptionInterface
{
public function __construct(string $message = 'Channel has been closed')
{
parent::__construct($message);
}

public static function forSending(): ClosedChannelException
{
return new self('Attempted to send a message to a closed channel.');
}

public static function forReceiving(): ClosedChannelException
{
return new self('Attempted to receive a message from a closed empty channel.');
}
}
19 changes: 19 additions & 0 deletions src/Psl/Channel/Exception/EmptyChannelException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use OutOfBoundsException;
use Psl\Channel;

/**
* This exception is throw when calling {@see Channel\ReceiverInterface::tryReceive()} on an empty channel.
*/
final class EmptyChannelException extends OutOfBoundsException implements ExceptionInterface
{
public static function create(): EmptyChannelException
{
return new self('Attempted to receiver from an empty channel.');
}
}
11 changes: 11 additions & 0 deletions src/Psl/Channel/Exception/ExceptionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use Psl\Exception;

interface ExceptionInterface extends Exception\ExceptionInterface
{
}
20 changes: 20 additions & 0 deletions src/Psl/Channel/Exception/FullChannelException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Exception;

use OutOfBoundsException;
use Psl\Channel;
use Psl\Str;

/**
* This exception is throw when calling {@see Channel\SenderInterface::trySend()} on a full channel.
*/
final class FullChannelException extends OutOfBoundsException implements ExceptionInterface
{
public static function ofCapacity(int $capacity): FullChannelException
{
return new self(Str\format('Channel has reached its full capacity of %d.', $capacity));
}
}
128 changes: 128 additions & 0 deletions src/Psl/Channel/Internal/ChannelState.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php

declare(strict_types=1);

namespace Psl\Channel\Internal;

use Psl\Channel\ChannelInterface;
use Psl\Channel\Exception;
use Psl\DataStructure\Queue;
use Psl\DataStructure\QueueInterface;

/**
* @template T
*
* @implements ChannelInterface<T>
*
* @internal
*/
final class ChannelState implements ChannelInterface
{
/**
* @var QueueInterface<T>
*/
private QueueInterface $messages;

private bool $closed = false;

public function __construct(
private ?int $capacity = null,
) {
$this->messages = new Queue();
}

/**
* {@inheritDoc}
*/
public function getCapacity(): ?int
{
return $this->capacity;
}

/**
* {@inheritDoc}
*/
public function close(): void
{
$this->closed = true;
}

/**
* {@inheritDoc}
*/
public function isClosed(): bool
{
return $this->closed;
}

/**
* {@inheritDoc}
*/
public function count(): int
{
return $this->messages->count();
}

/**
* {@inheritDoc}
*/
public function isFull(): bool
{
if (null === $this->capacity) {
return false;
}

return $this->capacity === $this->count();
}

/**
* {@inheritDoc}
*/
public function isEmpty(): bool
{
return 0 === $this->messages->count();
}

/**
* @param T $message
*
* @throws Exception\ClosedChannelException If the channel is closed.
* @throws Exception\FullChannelException If the channel is full.
*/
public function send(mixed $message): void
{
if ($this->closed) {
throw Exception\ClosedChannelException::forSending();
}

if (null === $this->capacity || $this->capacity > $this->count()) {
$this->messages->enqueue($message);

return;
}

throw Exception\FullChannelException::ofCapacity($this->capacity);
}

/**
* @throws Exception\ClosedChannelException If the channel is closed, and there's no more messages to receive.
* @throws Exception\EmptyChannelException If the channel is empty.
*
* @return T
*/
public function receive(): mixed
{
$empty = 0 === $this->count();
$closed = $this->closed;
if ($closed && $empty) {
throw Exception\ClosedChannelException::forReceiving();
}

if ($empty) {
throw Exception\EmptyChannelException::create();
}

/** @psalm-suppress MissingThrowsDocblock */
return $this->messages->dequeue();
}
}
Loading