Skip to content

Commit

Permalink
Merge pull request #9 from sartor/master
Browse files Browse the repository at this point in the history
Fix infinitive loop on broken pipe
Fix socket unlock
  • Loading branch information
sartor authored Apr 24, 2019
2 parents c6653e6 + 650bea1 commit 5829b3c
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 3 deletions.
23 changes: 21 additions & 2 deletions Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ protected function queryInternal($method, $fetchMode = null)
{
$rawSql = $this->getRawSql();

// Add LIMIT 1 for single result SELECT queries to save transmission bandwidth and properly reuse socket
if (in_array($fetchMode, ['fetch', 'fetchColumn']) && !preg_match('/LIMIT\s+\d+$/i', $rawSql)) {
Yii::trace('LIMIT 1 added for single result query explicitly! Try to add LIMIT 1 manually', 'bashkarev\clickhouse\Command::query');
$rawSql = $rawSql.' LIMIT 1';
}

Yii::info($rawSql, 'bashkarev\clickhouse\Command::query');
if ($method !== '') {
$info = $this->db->getQueryCacheInfo($this->queryCacheDuration, $this->queryCacheDependency);
Expand Down Expand Up @@ -199,7 +205,10 @@ protected function fetchColumn(\Generator $generator, $mode)
if (!$generator->valid()) {
return false;
}
return current($generator->current());
$result = current($generator->current());
$this->readRest($generator);

return $result;
}

/**
Expand All @@ -212,7 +221,17 @@ protected function fetch(\Generator $generator, $mode)
if (!$generator->valid()) {
return false;
}
return $generator->current();
$result = $generator->current();
$this->readRest($generator);

return $result;
}

private function readRest(\Generator $generator)
{
while ($generator->valid()) {
$generator->next();
}
}

}
2 changes: 1 addition & 1 deletion Socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public function write($string, $length = null)

while (true) {
$bytes = @fwrite($this->socket, $string);
if ($bytes === false) {
if ($bytes === false || $bytes === 0) {
$message = "Failed to write to socket";
if ($error = error_get_last()) {
$message .= sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
Expand Down
97 changes: 97 additions & 0 deletions tests/SocketTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php
namespace bashkarev\clickhouse\tests;

use bashkarev\clickhouse\Configuration;
use bashkarev\clickhouse\Socket;
use bashkarev\clickhouse\SocketException;

/**
* @author Sartor <sartorua@gmail.com>
*/
class SocketTest extends TestCase
{
private $server;
private $port;

protected function setUp()
{
parent::setUp();

$this->prepareDummyServer();
}

protected function tearDown()
{
$this->shutdownDummyServer();

parent::tearDown();
}

private function prepareDummyServer()
{
$this->server = stream_socket_server("tcp://0.0.0.0:0");
$name = stream_socket_get_name($this->server, 0);

$this->port = substr($name, strpos($name, ':') + 1);
}

private function shutdownDummyServer()
{
fclose($this->server);
}

private function socket(): Socket
{
$config = new Configuration('host=0.0.0.0;port='.$this->port, '', '');

return new Socket($config);
}

public function testOpen()
{
$socket = $this->socket();

$socket->close();
}

public function testWrite()
{
$socket = $this->socket();

$client = stream_socket_accept($this->server);

$socket->write("Test string");

$socket->close();

$result = fgets($client);

$this->assertEquals("Test string", $result);
}

public function testBrokenPipe()
{
$this->expectException(SocketException::class);
$socket = $this->socket();

$socket->write("Write to valid pipe");

$socket->close();

$socket->write("Write to broken pipe");
}

public function testRead()
{
$socket = $this->socket();

$client = stream_socket_accept($this->server);

fwrite($client, "Test read string");
fclose($client);

$result = fgets($socket->getNative());

$this->assertEquals("Test read string", $result);
}
}

0 comments on commit 5829b3c

Please sign in to comment.