diff --git a/README.md b/README.md index 2ab41a1..cce1408 100644 --- a/README.md +++ b/README.md @@ -353,7 +353,7 @@ By default, this will call `end()` on the destination stream once the source stream emits an `end` event. This can be disabled like this: ```php -$source->pipe($dest, array('end' => false)); +$source->pipe($dest, ['end' => false]); ``` Note that this only applies to the `end` event. @@ -1126,7 +1126,7 @@ $through = new ThroughStream(function ($data) { }); $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); -$through->write(array(2, true)); +$through->write([2, true]); ``` The callback function is allowed to throw an `Exception`. In this case, diff --git a/src/CompositeStream.php b/src/CompositeStream.php index dde091d..d0b934b 100644 --- a/src/CompositeStream.php +++ b/src/CompositeStream.php @@ -20,11 +20,11 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt return; } - Util::forwardEvents($this->readable, $this, array('data', 'end', 'error')); - Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe')); + Util::forwardEvents($this->readable, $this, ['data', 'end', 'error']); + Util::forwardEvents($this->writable, $this, ['drain', 'error', 'pipe']); - $this->readable->on('close', array($this, 'close')); - $this->writable->on('close', array($this, 'close')); + $this->readable->on('close', [$this, 'close']); + $this->writable->on('close', [$this, 'close']); } public function isReadable() @@ -46,7 +46,7 @@ public function resume() $this->readable->resume(); } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []) { return Util::pipe($this, $dest, $options); } diff --git a/src/DuplexResourceStream.php b/src/DuplexResourceStream.php index 4da2139..ed4dbe0 100644 --- a/src/DuplexResourceStream.php +++ b/src/DuplexResourceStream.php @@ -59,14 +59,9 @@ public function __construct($stream, LoopInterface $loop = null, $readChunkSize // Use unbuffered read operations on the underlying stream resource. // Reading chunks from the stream may otherwise leave unread bytes in // PHP's stream buffers which some event loop implementations do not - // trigger events on (edge triggered). - // This does not affect the default event loop implementation (level - // triggered), so we can ignore platforms not supporting this (HHVM). - // Pipe streams (such as STDIN) do not seem to require this and legacy - // PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this. - if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) { - \stream_set_read_buffer($stream, 0); - } + // trigger events on (edge triggered). This does not affect the default + // event loop implementation (level triggered). + \stream_set_read_buffer($stream, 0); if ($buffer === null) { $buffer = new WritableResourceStream($stream, $loop); @@ -77,16 +72,14 @@ public function __construct($stream, LoopInterface $loop = null, $readChunkSize $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize; $this->buffer = $buffer; - $that = $this; - - $this->buffer->on('error', function ($error) use ($that) { - $that->emit('error', array($error)); + $this->buffer->on('error', function ($error) { + $this->emit('error', [$error]); }); - $this->buffer->on('close', array($this, 'close')); + $this->buffer->on('close', [$this, 'close']); - $this->buffer->on('drain', function () use ($that) { - $that->emit('drain'); + $this->buffer->on('drain', function () { + $this->emit('drain'); }); $this->resume(); @@ -113,7 +106,9 @@ public function pause() public function resume() { if (!$this->listening && $this->readable) { - $this->loop->addReadStream($this->stream, array($this, 'handleData')); + $this->loop->addReadStream($this->stream, function () { + $this->handleData(); + }); $this->listening = true; } } @@ -163,13 +158,12 @@ public function end($data = null) $this->buffer->end($data); } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []) { return Util::pipe($this, $dest, $options); } - /** @internal */ - public function handleData($stream) + private function handleData() { $error = null; \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) { @@ -182,46 +176,22 @@ public function handleData($stream) ); }); - $data = \stream_get_contents($stream, $this->bufferSize); + $data = \stream_get_contents($this->stream, $this->bufferSize); \restore_error_handler(); if ($error !== null) { - $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error))); + $this->emit('error', [new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)]); $this->close(); return; } if ($data !== '') { - $this->emit('data', array($data)); + $this->emit('data', [$data]); } elseif (\feof($this->stream)) { // no data read => we reached the end and close the stream $this->emit('end'); $this->close(); } } - - /** - * Returns whether this is a pipe resource in a legacy environment - * - * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+ - * and PHP 5.5.12+ and newer. - * - * @param resource $resource - * @return bool - * @link https://github.com/reactphp/child-process/issues/40 - * - * @codeCoverageIgnore - */ - private function isLegacyPipe($resource) - { - if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) { - $meta = \stream_get_meta_data($resource); - - if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') { - return true; - } - } - return false; - } } diff --git a/src/ReadableResourceStream.php b/src/ReadableResourceStream.php index 1b0b08c..8b66aab 100644 --- a/src/ReadableResourceStream.php +++ b/src/ReadableResourceStream.php @@ -61,14 +61,9 @@ public function __construct($stream, LoopInterface $loop = null, $readChunkSize // Use unbuffered read operations on the underlying stream resource. // Reading chunks from the stream may otherwise leave unread bytes in // PHP's stream buffers which some event loop implementations do not - // trigger events on (edge triggered). - // This does not affect the default event loop implementation (level - // triggered), so we can ignore platforms not supporting this (HHVM). - // Pipe streams (such as STDIN) do not seem to require this and legacy - // PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this. - if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) { - \stream_set_read_buffer($stream, 0); - } + // trigger events on (edge triggered). This does not affect the default + // event loop implementation (level triggered). + \stream_set_read_buffer($stream, 0); $this->stream = $stream; $this->loop = $loop ?: Loop::get(); @@ -93,12 +88,14 @@ public function pause() public function resume() { if (!$this->listening && !$this->closed) { - $this->loop->addReadStream($this->stream, array($this, 'handleData')); + $this->loop->addReadStream($this->stream, function () { + $this->handleData(); + }); $this->listening = true; } } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []) { return Util::pipe($this, $dest, $options); } @@ -120,8 +117,7 @@ public function close() } } - /** @internal */ - public function handleData() + private function handleData() { $error = null; \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) { @@ -139,41 +135,17 @@ public function handleData() \restore_error_handler(); if ($error !== null) { - $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error))); + $this->emit('error', [new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)]); $this->close(); return; } if ($data !== '') { - $this->emit('data', array($data)); + $this->emit('data', [$data]); } elseif (\feof($this->stream)) { // no data read => we reached the end and close the stream $this->emit('end'); $this->close(); } } - - /** - * Returns whether this is a pipe resource in a legacy environment - * - * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+ - * and PHP 5.5.12+ and newer. - * - * @param resource $resource - * @return bool - * @link https://github.com/reactphp/child-process/issues/40 - * - * @codeCoverageIgnore - */ - private function isLegacyPipe($resource) - { - if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) { - $meta = \stream_get_meta_data($resource); - - if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') { - return true; - } - } - return false; - } } diff --git a/src/ReadableStreamInterface.php b/src/ReadableStreamInterface.php index fa3d59c..ecc5267 100644 --- a/src/ReadableStreamInterface.php +++ b/src/ReadableStreamInterface.php @@ -278,7 +278,7 @@ public function resume(); * source stream emits an `end` event. This can be disabled like this: * * ```php - * $source->pipe($dest, array('end' => false)); + * $source->pipe($dest, ['end' => false]); * ``` * * Note that this only applies to the `end` event. @@ -322,7 +322,7 @@ public function resume(); * @param array $options * @return WritableStreamInterface $dest stream as-is */ - public function pipe(WritableStreamInterface $dest, array $options = array()); + public function pipe(WritableStreamInterface $dest, array $options = []); /** * Closes the stream (forcefully). diff --git a/src/ThroughStream.php b/src/ThroughStream.php index 3b4fbb7..c49ebfc 100644 --- a/src/ThroughStream.php +++ b/src/ThroughStream.php @@ -48,7 +48,7 @@ * }); * $through->on('data', $this->expectCallableOnceWith("[2, true]\n")); * - * $through->write(array(2, true)); + * $through->write([2, true]); * ``` * * The callback function is allowed to throw an `Exception`. In this case, @@ -108,7 +108,7 @@ public function resume() } } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []) { return Util::pipe($this, $dest, $options); } @@ -133,14 +133,14 @@ public function write($data) try { $data = \call_user_func($this->callback, $data); } catch (\Exception $e) { - $this->emit('error', array($e)); + $this->emit('error', [$e]); $this->close(); return false; } } - $this->emit('data', array($data)); + $this->emit('data', [$data]); // emit drain event on next resume if currently paused (throttled) if ($this->paused) { diff --git a/src/Util.php b/src/Util.php index 056b037..114ccb8 100644 --- a/src/Util.php +++ b/src/Util.php @@ -13,7 +13,7 @@ final class Util * @return WritableStreamInterface $dest stream as-is * @see ReadableStreamInterface::pipe() for more details */ - public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array()) + public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = []) { // source not readable => NO-OP if (!$source->isReadable()) { @@ -27,7 +27,7 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter return $dest; } - $dest->emit('pipe', array($source)); + $dest->emit('pipe', [$source]); // forward all source data events as $dest->write() $source->on('data', $dataer = function ($data) use ($source, $dest) { diff --git a/src/WritableResourceStream.php b/src/WritableResourceStream.php index 1af16b1..a091e29 100644 --- a/src/WritableResourceStream.php +++ b/src/WritableResourceStream.php @@ -68,7 +68,9 @@ public function write($data) if (!$this->listening && $this->data !== '') { $this->listening = true; - $this->loop->addWriteStream($this->stream, array($this, 'handleWrite')); + $this->loop->addWriteStream($this->stream, function () { + $this->handleWrite(); + }); } return !isset($this->data[$this->softLimit - 1]); @@ -112,7 +114,6 @@ public function close() } } - /** @internal */ public function handleWrite() { $error = null; @@ -137,7 +138,7 @@ public function handleWrite() // Should this turn out to be a permanent error later, it will eventually // send *nothing* and we can detect this. if (($sent === 0 || $sent === false) && $error !== null) { - $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error))); + $this->emit('error', [new \RuntimeException('Unable to write to stream: ' . $error)]); $this->close(); return; diff --git a/tests/CompositeStreamTest.php b/tests/CompositeStreamTest.php index df89c3e..c858e6b 100644 --- a/tests/CompositeStreamTest.php +++ b/tests/CompositeStreamTest.php @@ -217,7 +217,7 @@ public function itShouldReceiveForwardedEvents() $composite->on('data', $this->expectCallableOnce()); $composite->on('drain', $this->expectCallableOnce()); - $readable->emit('data', array('foo')); + $readable->emit('data', ['foo']); $writable->emit('drain'); } @@ -241,7 +241,7 @@ public function itShouldHandlePipingCorrectly() $input = new ThroughStream(); $input->pipe($composite); - $input->emit('data', array('foo')); + $input->emit('data', ['foo']); } /** @test */ @@ -262,6 +262,6 @@ public function itShouldForwardPipeCallsToReadableStream() ->with('foo'); $composite->pipe($output); - $readable->emit('data', array('foo')); + $readable->emit('data', ['foo']); } } diff --git a/tests/DuplexResourceStreamTest.php b/tests/DuplexResourceStreamTest.php index e61f14b..f427f36 100644 --- a/tests/DuplexResourceStreamTest.php +++ b/tests/DuplexResourceStreamTest.php @@ -197,7 +197,10 @@ public function testDataEvent() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertSame("foobar\n", $capturedData); } @@ -220,7 +223,10 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize() fwrite($stream, str_repeat("a", 100000)); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertTrue($conn->isReadable()); $this->assertEquals(4321, strlen($capturedData)); @@ -246,7 +252,10 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi fwrite($stream, str_repeat("a", 100000)); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertTrue($conn->isReadable()); $this->assertEquals(100000, strlen($capturedData)); @@ -263,7 +272,10 @@ public function testEmptyStreamShouldNotEmitData() $conn = new DuplexResourceStream($stream, $loop); $conn->on('data', $this->expectCallableNever()); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + } /** @@ -427,7 +439,7 @@ public function testBufferEventsShouldBubbleUp() $conn->on('error', $this->expectCallableOnce()); $buffer->emit('drain'); - $buffer->emit('error', array(new \RuntimeException('Whoops'))); + $buffer->emit('error', [new \RuntimeException('Whoops')]); } /** @@ -447,7 +459,10 @@ public function testClosingStreamInDataEventShouldNotTriggerError() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + } /** @@ -474,7 +489,10 @@ public function testDataFiltered() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertSame("foobr\n", $capturedData); } @@ -503,7 +521,10 @@ public function testDataErrorShouldEmitErrorAndClose() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + } private function createWriteableLoopMock() diff --git a/tests/ReadableResourceStreamTest.php b/tests/ReadableResourceStreamTest.php index f534488..7979910 100644 --- a/tests/ReadableResourceStreamTest.php +++ b/tests/ReadableResourceStreamTest.php @@ -150,7 +150,10 @@ public function testDataEvent() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertSame("foobar\n", $capturedData); } @@ -173,7 +176,10 @@ public function testDataEventDoesEmitOneChunkMatchingBufferSize() fwrite($stream, str_repeat("a", 100000)); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertTrue($conn->isReadable()); $this->assertEquals(4321, strlen($capturedData)); @@ -199,7 +205,10 @@ public function testDataEventDoesEmitOneChunkUntilStreamEndsWhenBufferSizeIsInfi fwrite($stream, str_repeat("a", 100000)); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertTrue($conn->isReadable()); $this->assertEquals(100000, strlen($capturedData)); @@ -216,7 +225,10 @@ public function testEmptyStreamShouldNotEmitData() $conn = new ReadableResourceStream($stream, $loop); $conn->on('data', $this->expectCallableNever()); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + } public function testPipeShouldReturnDestination() @@ -247,7 +259,10 @@ public function testClosingStreamInDataEventShouldNotTriggerError() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + } /** @@ -346,7 +361,10 @@ public function testDataFiltered() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + $this->assertSame("foobr\n", $capturedData); } @@ -375,7 +393,10 @@ public function testDataErrorShouldEmitErrorAndClose() fwrite($stream, "foobar\n"); rewind($stream); - $conn->handleData($stream); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); + } /** @@ -391,7 +412,9 @@ public function testEmptyReadShouldntFcloseStream() $conn->on('data', $this->expectCallableNever()); $conn->on('end', $this->expectCallableNever()); - $conn->handleData(); + $ref = new \ReflectionMethod($conn, 'handleData'); + $ref->setAccessible(true); + $ref->invoke($conn, 'handleData'); fclose($stream); fclose($_); diff --git a/tests/Stub/ReadableStreamStub.php b/tests/Stub/ReadableStreamStub.php index 6984f24..3fa56a8 100644 --- a/tests/Stub/ReadableStreamStub.php +++ b/tests/Stub/ReadableStreamStub.php @@ -20,19 +20,19 @@ public function isReadable() // trigger data event public function write($data) { - $this->emit('data', array($data)); + $this->emit('data', [$data]); } // trigger error event public function error($error) { - $this->emit('error', array($error)); + $this->emit('error', [$error]); } // trigger end event public function end() { - $this->emit('end', array()); + $this->emit('end', []); } public function pause() @@ -52,7 +52,7 @@ public function close() $this->emit('close'); } - public function pipe(WritableStreamInterface $dest, array $options = array()) + public function pipe(WritableStreamInterface $dest, array $options = []) { Util::pipe($this, $dest, $options); diff --git a/tests/ThroughStreamTest.php b/tests/ThroughStreamTest.php index 42c251a..3093136 100644 --- a/tests/ThroughStreamTest.php +++ b/tests/ThroughStreamTest.php @@ -185,7 +185,7 @@ public function pipingStuffIntoItShouldWork() $through->on('data', $this->expectCallableOnceWith('foo')); $readable->pipe($through); - $readable->emit('data', array('foo')); + $readable->emit('data', ['foo']); } /** @test */ @@ -243,7 +243,7 @@ public function writeAfterEndShouldReturnFalse() public function writeDataWillCloseStreamShouldReturnFalse() { $through = new ThroughStream(); - $through->on('data', array($through, 'close')); + $through->on('data', [$through, 'close']); $this->assertFalse($through->write('foo')); } diff --git a/tests/UtilTest.php b/tests/UtilTest.php index f12baad..47d3646 100644 --- a/tests/UtilTest.php +++ b/tests/UtilTest.php @@ -114,7 +114,7 @@ public function testPipeWithoutEnd() ->expects($this->never()) ->method('end'); - Util::pipe($readable, $writable, array('end' => false)); + Util::pipe($readable, $writable, ['end' => false]); $readable->end(); } @@ -253,12 +253,12 @@ public function forwardEventsShouldSetupForwards() $source = new ThroughStream(); $target = new ThroughStream(); - Util::forwardEvents($source, $target, array('data')); + Util::forwardEvents($source, $target, ['data']); $target->on('data', $this->expectCallableOnce()); $target->on('foo', $this->expectCallableNever()); - $source->emit('data', array('hello')); - $source->emit('foo', array('bar')); + $source->emit('data', ['hello']); + $source->emit('foo', ['bar']); } private function createLoopMock() diff --git a/tests/WritableResourceStreamTest.php b/tests/WritableResourceStreamTest.php index 678db98..6c1e658 100644 --- a/tests/WritableResourceStreamTest.php +++ b/tests/WritableResourceStreamTest.php @@ -411,7 +411,7 @@ public function testClose() $buffer->close(); $this->assertFalse($buffer->isWritable()); - $this->assertEquals(array(), $buffer->listeners('close')); + $this->assertEquals([], $buffer->listeners('close')); } /**