diff --git a/src/Ipc/SocketIpcHub.php b/src/Ipc/SocketIpcHub.php index 19d1575..69f621f 100644 --- a/src/Ipc/SocketIpcHub.php +++ b/src/Ipc/SocketIpcHub.php @@ -34,6 +34,8 @@ final class SocketIpcHub implements IpcHub /** @var \Closure(): void */ private readonly \Closure $accept; + private bool $queued = false; + /** * @param float $keyReceiveTimeout Timeout to receive the key on accepted connections. * @param positive-int $keyLength Length of the random key exchanged on the IPC channel when connecting. @@ -49,10 +51,28 @@ public function __construct( SocketAddressType::Internet => 'tcp://' . $address->toString(), }; + $queued = &$this->queued; $keys = &$this->keys; $pending = &$this->pending; - $this->accept = static function () use (&$keys, &$pending, $server, $keyReceiveTimeout, $keyLength): void { - while ($pending && $client = $server->accept()) { + $this->accept = static function () use ( + &$queued, + &$keys, + &$pending, + $server, + $keyReceiveTimeout, + $keyLength, + ): void { + while ($pending) { + $client = $server->accept(); + if (!$client) { + $queued = false; + $exception = new Socket\SocketException('IPC socket closed before the client connected'); + foreach ($pending as $deferred) { + $deferred->error($exception); + } + return; + } + try { $received = readKey($client, new TimeoutCancellation($keyReceiveTimeout), $keyLength); } catch (\Throwable) { @@ -77,6 +97,8 @@ public function __construct( $deferred->complete($client); } + + $queued = false; }; } @@ -124,6 +146,10 @@ public function generateKey(): string */ public function accept(string $key, ?Cancellation $cancellation = null): ResourceSocket { + if ($this->server->isClosed()) { + throw new Socket\SocketException('The IPC server has been closed'); + } + if (\strlen($key) !== $this->keyLength) { throw new \ValueError(\sprintf( "Key provided is of length %d, expected %d", @@ -138,12 +164,13 @@ public function accept(string $key, ?Cancellation $cancellation = null): Resourc $id = $this->nextId++; - if (!$this->pending) { + if (!$this->queued) { EventLoop::queue($this->accept); + $this->queued = true; } $this->keys[$key] = $id; - $this->pending[$id] = $deferred = new DeferredFuture; + $this->pending[$id] = $deferred = new DeferredFuture(); try { $client = $deferred->getFuture()->await($cancellation); diff --git a/test/Ipc/SocketIpcHubTest.php b/test/Ipc/SocketIpcHubTest.php new file mode 100644 index 0000000..e6bcdd2 --- /dev/null +++ b/test/Ipc/SocketIpcHubTest.php @@ -0,0 +1,53 @@ +server = Socket\listen('127.0.0.1:0'); + $this->ipcHub = new SocketIpcHub($this->server); + } + + public function testAcceptAfterCancel(): void + { + $key = $this->ipcHub->generateKey(); + + $deferredCancellation = new DeferredCancellation(); + EventLoop::delay(0.1, static fn () => $deferredCancellation->cancel()); + + try { + $this->ipcHub->accept($key, $deferredCancellation->getCancellation()); + self::fail('Expecting accept to have been cancelled'); + } catch (CancelledException) { + // Expected accept to be cancelled. + } + + $key = $this->ipcHub->generateKey(); + + async(function () use ($key): void { + $client = Socket\connect($this->server->getAddress()); + $client->write($key); + }); + + $client = $this->ipcHub->accept($key, new TimeoutCancellation(1)); + + self::assertSame($this->server->getAddress()->toString(), $client->getLocalAddress()->toString()); + } +}