From 0d40e6da980d56de8bbd292668becef6ed45a2a7 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Wed, 27 Dec 2017 12:36:28 -0600 Subject: [PATCH] Do not fclose socket or null thread ref; restore GC test --- lib/Context/Thread.php | 30 +++++------------------------- test/Worker/AbstractPoolTest.php | 20 ++++++++++++++++++++ test/Worker/ProcessPoolTest.php | 26 -------------------------- 3 files changed, 25 insertions(+), 51 deletions(-) diff --git a/lib/Context/Thread.php b/lib/Context/Thread.php index f04a6ba4..86a95b46 100644 --- a/lib/Context/Thread.php +++ b/lib/Context/Thread.php @@ -143,33 +143,18 @@ public function start() { list($channel, $this->socket) = $sockets; - $this->thread = new Internal\Thread($this->socket, $this->function, $this->args); + $thread = $this->thread = new Internal\Thread($this->socket, $this->function, $this->args); if (!$this->thread->start(\PTHREADS_INHERIT_INI)) { throw new ContextException('Failed to start the thread.'); } - $this->channel = new ChannelledSocket($channel, $channel); + $channel = $this->channel = new ChannelledSocket($channel, $channel); - $thread = &$this->thread; - $channel = &$this->channel; - $socket = &$this->socket; - - $this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function ($watcher) use (&$thread, &$channel, &$socket) { - if ($thread === null || !$thread->isRunning()) { + $this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function ($watcher) use ($thread, $channel) { + if (!$thread->isRunning()) { // Delay closing to avoid race condition between thread exiting and data becoming available. - Loop::delay(self::EXIT_CHECK_FREQUENCY, function () use (&$thread, &$channel, &$socket) { - if ($channel !== null) { - $channel->close(); - } - - if (\is_resource($socket)) { - @\fclose($socket); - } - - $channel = null; - $thread = null; - }); + Loop::delay(self::EXIT_CHECK_FREQUENCY, [$channel, "close"]); Loop::cancel($watcher); } }); @@ -202,11 +187,6 @@ private function close() { $this->channel->close(); } - if (\is_resource($this->socket)) { - @\fclose($this->socket); - } - - $this->thread = null; $this->channel = null; Loop::cancel($this->watcher); } diff --git a/test/Worker/AbstractPoolTest.php b/test/Worker/AbstractPoolTest.php index 0845d741..c53df2eb 100644 --- a/test/Worker/AbstractPoolTest.php +++ b/test/Worker/AbstractPoolTest.php @@ -130,4 +130,24 @@ public function testBusyPool() { yield $pool->shutdown(); }); } + + public function testCleanGarbageCollection() { + // See https://github.com/amphp/parallel-functions/issues/5 + Loop::run(function () { + for ($i = 0; $i < 15; $i++) { + $pool = $this->createPool(32); + + $values = \range(1, 50); + $tasks = \array_map(function (int $value): Task { + return new TestTask($value); + }, $values); + + $promises = \array_map(function (Task $task) use ($pool): Promise { + return $pool->enqueue($task); + }, $tasks); + + $this->assertSame($values, yield $promises); + } + }); + } } diff --git a/test/Worker/ProcessPoolTest.php b/test/Worker/ProcessPoolTest.php index f7379526..b21c2684 100644 --- a/test/Worker/ProcessPoolTest.php +++ b/test/Worker/ProcessPoolTest.php @@ -2,13 +2,10 @@ namespace Amp\Parallel\Test\Worker; -use Amp\Loop; use Amp\Parallel\Worker\DefaultPool; use Amp\Parallel\Worker\Pool; -use Amp\Parallel\Worker\Task; use Amp\Parallel\Worker\WorkerFactory; use Amp\Parallel\Worker\WorkerProcess; -use Amp\Promise; /** * @group process @@ -22,27 +19,4 @@ protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool { return new DefaultPool($max, $factory); } - - /** - * @FIXME This test should be moved to AbstractPoolTest once the GC issues with pthreads are resolved. - */ - public function testCleanGarbageCollection() { - // See https://github.com/amphp/parallel-functions/issues/5 - Loop::run(function () { - for ($i = 0; $i < 15; $i++) { - $pool = $this->createPool(32); - - $values = \range(1, 50); - $tasks = \array_map(function (int $value): Task { - return new TestTask($value); - }, $values); - - $promises = \array_map(function (Task $task) use ($pool): Promise { - return $pool->enqueue($task); - }, $tasks); - - $this->assertSame($values, yield $promises); - } - }); - } }