Skip to content

Commit

Permalink
Do not fclose socket or null thread ref; restore GC test
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 27, 2017
1 parent e6a5de6 commit 0d40e6d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 51 deletions.
30 changes: 5 additions & 25 deletions lib/Context/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,33 +143,18 @@ public function start() {

list($channel, $this->socket) = $sockets;

$this->thread = new Internal\Thread($this->socket, $this->function, $this->args);
$thread = $this->thread = new Internal\Thread($this->socket, $this->function, $this->args);

if (!$this->thread->start(\PTHREADS_INHERIT_INI)) {
throw new ContextException('Failed to start the thread.');
}

$this->channel = new ChannelledSocket($channel, $channel);
$channel = $this->channel = new ChannelledSocket($channel, $channel);

$thread = &$this->thread;
$channel = &$this->channel;
$socket = &$this->socket;

$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function ($watcher) use (&$thread, &$channel, &$socket) {
if ($thread === null || !$thread->isRunning()) {
$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function ($watcher) use ($thread, $channel) {
if (!$thread->isRunning()) {
// Delay closing to avoid race condition between thread exiting and data becoming available.
Loop::delay(self::EXIT_CHECK_FREQUENCY, function () use (&$thread, &$channel, &$socket) {
if ($channel !== null) {
$channel->close();
}

if (\is_resource($socket)) {
@\fclose($socket);
}

$channel = null;
$thread = null;
});
Loop::delay(self::EXIT_CHECK_FREQUENCY, [$channel, "close"]);
Loop::cancel($watcher);
}
});
Expand Down Expand Up @@ -202,11 +187,6 @@ private function close() {
$this->channel->close();
}

if (\is_resource($this->socket)) {
@\fclose($this->socket);
}

$this->thread = null;
$this->channel = null;
Loop::cancel($this->watcher);
}
Expand Down
20 changes: 20 additions & 0 deletions test/Worker/AbstractPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,24 @@ public function testBusyPool() {
yield $pool->shutdown();
});
}

public function testCleanGarbageCollection() {
// See https://github.com/amphp/parallel-functions/issues/5
Loop::run(function () {
for ($i = 0; $i < 15; $i++) {
$pool = $this->createPool(32);

$values = \range(1, 50);
$tasks = \array_map(function (int $value): Task {
return new TestTask($value);
}, $values);

$promises = \array_map(function (Task $task) use ($pool): Promise {
return $pool->enqueue($task);
}, $tasks);

$this->assertSame($values, yield $promises);
}
});
}
}
26 changes: 0 additions & 26 deletions test/Worker/ProcessPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

namespace Amp\Parallel\Test\Worker;

use Amp\Loop;
use Amp\Parallel\Worker\DefaultPool;
use Amp\Parallel\Worker\Pool;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\WorkerFactory;
use Amp\Parallel\Worker\WorkerProcess;
use Amp\Promise;

/**
* @group process
Expand All @@ -22,27 +19,4 @@ protected function createPool($max = Pool::DEFAULT_MAX_SIZE): Pool {

return new DefaultPool($max, $factory);
}

/**
* @FIXME This test should be moved to AbstractPoolTest once the GC issues with pthreads are resolved.
*/
public function testCleanGarbageCollection() {
// See https://github.com/amphp/parallel-functions/issues/5
Loop::run(function () {
for ($i = 0; $i < 15; $i++) {
$pool = $this->createPool(32);

$values = \range(1, 50);
$tasks = \array_map(function (int $value): Task {
return new TestTask($value);
}, $values);

$promises = \array_map(function (Task $task) use ($pool): Promise {
return $pool->enqueue($task);
}, $tasks);

$this->assertSame($values, yield $promises);
}
});
}
}

0 comments on commit 0d40e6d

Please sign in to comment.