Skip to content
This repository has been archived by the owner on Jan 29, 2020. It is now read-only.

Commit

Permalink
Merge branch 'fix/#223-#224-high-memory-usage-in-sapi-stream-emitter'…
Browse files Browse the repository at this point in the history
… into develop

Forward port #223
Forward port #224
  • Loading branch information
Ocramius committed Jan 17, 2017
2 parents 972b0e1 + f9f3afe commit 620dd9a
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 21 deletions.
43 changes: 25 additions & 18 deletions src/Response/SapiStreamEmitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ private function emitBody(ResponseInterface $response, $maxBufferLength)
{
$body = $response->getBody();

if (! $body->isSeekable()) {
if ($body->isSeekable()) {
$body->rewind();
}

if (! $body->isReadable()) {
echo $body;
return;
}

$body->rewind();
while (! $body->eof()) {
echo $body->read($maxBufferLength);
}
Expand All @@ -82,24 +84,29 @@ private function emitBodyRange(array $range, ResponseInterface $response, $maxBu

$body = $response->getBody();

if (! $body->isSeekable()) {
$contents = $body->getContents();
echo substr($contents, $first, $last - $first + 1);
return;
$length = $last - $first + 1;

if ($body->isSeekable()) {
$body->seek($first);

$first = 0;
}

$body = new RelativeStream($body, $first);
$body->rewind();
$pos = 0;
$length = $last - $first + 1;
while (! $body->eof() && $pos < $length) {
if (($pos + $maxBufferLength) > $length) {
echo $body->read($length - $pos);
break;
}
if (! $body->isReadable()) {
echo substr($body->getContents(), $first, $length);
}

echo $body->read($maxBufferLength);
$pos = $body->tell();
$remaining = $length;

while ($remaining >= $maxBufferLength && ! $body->eof()) {
$contents = $body->read($maxBufferLength);
$remaining -= strlen($contents);

echo $contents;
}

if ($remaining > 0 && ! $body->eof()) {
echo $body->read($remaining);
}
}

Expand Down
217 changes: 214 additions & 3 deletions test/Response/SapiStreamEmitterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

namespace ZendTest\Diactoros\Response;

use PHPUnit_Framework_TestCase as TestCase;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamInterface;
use Prophecy\Argument;
use Zend\Diactoros\CallbackStream;
use Zend\Diactoros\Response;
use Zend\Diactoros\Response\SapiStreamEmitter;
Expand Down Expand Up @@ -43,6 +41,7 @@ public function testDoesNotInjectContentLengthHeaderIfStreamSizeIsUnknown()
$stream = $this->prophesize('Psr\Http\Message\StreamInterface');
$stream->__toString()->willReturn('Content!');
$stream->isSeekable()->willReturn(false);
$stream->isReadable()->willReturn(false);
$stream->eof()->willReturn(true);
$stream->rewind()->willReturn(true);
$stream->getSize()->willReturn(null);
Expand All @@ -58,6 +57,218 @@ public function testDoesNotInjectContentLengthHeaderIfStreamSizeIsUnknown()
}
}

public function emitBodyProvider()
{
return [
[true, '01234567890123456789' , 10, 2],
[true, '012345678901234567890123', 10, 3],
[false, '01234567890123456789' , 10, 2],
[false, '012345678901234567890123', 10, 3],
];
}

/**
* @dataProvider emitBodyProvider
*/
public function testEmitBody($seekable, $contents, $maxBufferLength, $expectedReads = 0)
{
$position = 0;

$stream = $this->prophesize('Psr\Http\Message\StreamInterface');
$stream->getSize()->willReturn(strlen($contents));
$stream->isSeekable()->willReturn($seekable);
$stream->isReadable()->willReturn(true);
$stream->__toString()->willReturn($contents);
$stream->getContents()->willReturn($contents);
$stream->rewind()->willReturn(true);

$stream->eof()->will(function () use (& $contents, & $position) {
return ! isset($contents[$position]);
});

$stream->read(Argument::type('integer'))->will(function ($args) use (& $contents, & $position) {
$data = substr($contents, $position, $args[0]);
$position += strlen($data);

return $data;
});

$response = (new Response())
->withStatus(200)
->withBody($stream->reveal());

ob_start();
$this->emitter->emit($response, $maxBufferLength);
$stream->rewind()->shouldBeCalledTimes($seekable ? 1 : 0);
$stream->read(Argument::type('integer'))->shouldBeCalledTimes($expectedReads);
$this->assertEquals($contents, ob_get_clean());
}

public function emitMemoryUsageProvider()
{
return [
[true, 512, 1000, 20, null],
[true, 8192, 1000, 20, null],
[false, 512, 1000, 20, null],
[false, 8192, 1000, 20, null],
[true, 512, 1000, 20, [25, 75]],
[true, 8192, 1000, 20, [25, 75]],
[true, 512, 1000, 20, [250, 750]],
[true, 8192, 1000, 20, [250, 750]],
[false, 512, 1000, 20, [25, 75]],
[false, 8192, 1000, 20, [25, 75]],
[false, 512, 1000, 20, [250, 750]],
[false, 8192, 1000, 20, [250, 750]],
];
}

/**
* @dataProvider emitMemoryUsageProvider
*/
public function testEmitMemoryUsage($seekable, $maxBufferLength, $sizeBlocks, $maxAllowedBlocks, $rangeBlocks)
{
$sizeBytes = $maxBufferLength * $sizeBlocks;
$maxAllowedMemoryUsage = $maxBufferLength * $maxAllowedBlocks;
$peakBufferLength = 0;
$peakMemoryUsage = 0;

$position = 0;

if ($rangeBlocks) {
$first = $maxBufferLength * $rangeBlocks[0];
$last = $maxBufferLength * $rangeBlocks[1];
$position = $first;
}

$closureTrackMemoryUsage = function () use (& $peakMemoryUsage) {
$peakMemoryUsage = max($peakMemoryUsage, memory_get_usage());
};

$closureFullContents = function () use (& $sizeBytes) {
return str_repeat('0', $sizeBytes);
};

$stream = $this->prophesize('Psr\Http\Message\StreamInterface');
$stream->getSize()->willReturn($sizeBytes);
$stream->isSeekable()->willReturn($seekable);
$stream->isReadable()->willReturn(true);
$stream->__toString()->will($closureFullContents);
$stream->getContents()->will($closureFullContents);
$stream->rewind()->willReturn(true);

$stream->seek(Argument::type('integer'), Argument::any())->will(function ($args) use (& $position) {
$position = $args[0];
return true;
});

$stream->eof()->will(function () use (& $sizeBytes, & $position) {
return ($position >= $sizeBytes);
});

$stream->tell()->will(function () use (& $position) {
return $position;
});

$stream->read(Argument::type('integer'))->will(function ($args) use (& $position, & $peakBufferLength) {
if ($args[0] > $peakBufferLength) {
$peakBufferLength = $args[0];
}

$position += $args[0];

return str_repeat('0', $args[0]);
});

$response = (new Response())
->withStatus(200)
->withBody($stream->reveal());


if ($rangeBlocks) {
$response = $response->withHeader('Content-Range', 'bytes ' . $first . '-' . $last . '/*');
}

ob_start(
function () use (& $closureTrackMemoryUsage) {
$closureTrackMemoryUsage();

return '';
},
$maxBufferLength
);

gc_collect_cycles();

$this->emitter->emit($response, $maxBufferLength);

ob_end_flush();

gc_collect_cycles();

$localMemoryUsage = memory_get_usage();

$this->assertLessThanOrEqual($maxBufferLength, $peakBufferLength);
$this->assertLessThanOrEqual($maxAllowedMemoryUsage, $peakMemoryUsage - $localMemoryUsage);
}

public function emitBodyRangeProvider()
{
return [
[true, '01234567890123456789' , ['bytes', 10, 20, '*'], 10, 1],
[true, '012345678901234567890123', ['bytes', 10, 40, '*'], 10, 2],
[false, '01234567890123456789' , ['bytes', 11, 20, '*'], 10, 1],
[false, '012345678901234567890123', ['bytes', 11, 40, '*'], 10, 2],
];
}

/**
* @dataProvider emitBodyRangeProvider
*/
public function testEmitBodyRange($seekable, $contents, $range, $maxBufferLength, $expectedReads = 0)
{
list($unit, $first, $last, $length) = $range;

$position = $first;

$stream = $this->prophesize('Psr\Http\Message\StreamInterface');
$stream->getSize()->willReturn(strlen($contents));
$stream->isSeekable()->willReturn($seekable);
$stream->isReadable()->willReturn(true);
$stream->__toString()->willReturn($contents);
$stream->getContents()->willReturn($contents);
$stream->rewind()->willReturn(true);

$stream->seek(Argument::type('integer'), Argument::any())->will(function ($args) use (& $position) {
$position = $args[0];
return true;
});

$stream->eof()->will(function () use (& $contents, & $position) {
return ! isset($contents[$position]);
});

$stream->tell()->will(function () use (& $position) {
return $position;
});

$stream->read(Argument::type('integer'))->will(function ($args) use (& $contents, & $position) {
$data = substr($contents, $position, $args[0]);
$position += strlen($data);
return $data;
});

$response = (new Response())
->withStatus(200)
->withHeader('Content-Range', "$unit $first-$last/$length")
->withBody($stream->reveal());

ob_start();
$this->emitter->emit($response, $maxBufferLength);
$stream->seek(Argument::type('integer'), Argument::any())->shouldBeCalledTimes($seekable ? 1 : 0);
$stream->read(Argument::type('integer'))->shouldBeCalledTimes($expectedReads);
$this->assertEquals(substr($contents, $first, $last - $first + 1), ob_get_clean());
}

public function contentRangeProvider()
{
return [
Expand Down

0 comments on commit 620dd9a

Please sign in to comment.