Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix exception on re-create server nodes #87

Merged
merged 24 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* fixed exception on re-create server nodes
* fixed key name in createTable function
* added simple std looger

Expand Down
2 changes: 2 additions & 0 deletions src/Discovery.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class Discovery
*/
public function __construct(Ydb $ydb, LoggerInterface $logger = null)
{
$this->ydb = $ydb;

$this->client = new ServiceClient($ydb->endpoint(), [
'credentials' => $ydb->iam()->getCredentials(),
]);
Expand Down
26 changes: 18 additions & 8 deletions src/Logger/SimpleStdLogger.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ class SimpleStdLogger implements \Psr\Log\LoggerInterface
self::EMERGENCY => 'EMERGENCY',
];

protected static function getLevelName(int $level): string
{
if (!isset(static::$levels[$level])) {
throw new InvalidArgumentException('Level "'.$level.'" is not defined, use one of: '.implode(', ', array_keys(static::$levels)));
}

return static::$levels[$level];
}

protected $level;

public function __construct(int $level)
{
$this->level = $level;
Expand All @@ -37,44 +47,44 @@ public function emergency($message, array $context = []): void

public function alert($message, array $context = []): void
{
$this->log(self::EMERGENCY, $message, $context);
$this->log(self::ALERT, $message, $context);
}

public function critical($message, array $context = []): void
{
$this->log(self::EMERGENCY, $message, $context);
$this->log(self::CRITICAL, $message, $context);
}

public function error($message, array $context = []): void
{
$this->log(self::EMERGENCY, $message, $context);
$this->log(self::ERROR, $message, $context);
}

public function warning($message, array $context = []): void
{
$this->log(self::EMERGENCY, $message, $context);
$this->log(self::WARNING, $message, $context);
}

public function notice($message, array $context = []): void
{
$this->log(self::EMERGENCY, $message, $context);
$this->log(self::NOTICE, $message, $context);
}

public function info($message, array $context = []): void
{
$this->log(self::EMERGENCY, $message, $context);
$this->log(self::INFO, $message, $context);
}

public function debug($message, array $context = []): void
{
$this->log(self::EMERGENCY, $message, $context);
$this->log(self::DEBUG, $message, $context);
}

public function log($level, $message, array $context = []): void
{
if ($level>$this->level) return;
fwrite(STDERR,
date("d/m/y H:i")." ".$level. " ".$message." ".json_encode($context)
date("d/m/y H:i:s")." ".self::getLevelName($level). " ".$message." ".json_encode($context)."\n"
);
}
}
2 changes: 2 additions & 0 deletions src/Operations.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class Operations
*/
public function __construct(Ydb $ydb, LoggerInterface $logger = null)
{
$this->ydb = $ydb;

$this->client = new ServiceClient($ydb->endpoint(), [
'credentials' => $ydb->iam()->getCredentials(),
]);
Expand Down
33 changes: 19 additions & 14 deletions src/Retry/Retry.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,8 @@

use Closure;
use YdbPlatform\Ydb\Exception;
use YdbPlatform\Ydb\Exceptions\Grpc\DeadlineExceededException;
use YdbPlatform\Ydb\Exceptions\NonRetryableException;
use YdbPlatform\Ydb\Exceptions\RetryableException;
use YdbPlatform\Ydb\Exceptions\Ydb\AbortedException;
use YdbPlatform\Ydb\Exceptions\Ydb\BadSessionException;
use YdbPlatform\Ydb\Exceptions\Ydb\SessionBusyException;
use YdbPlatform\Ydb\Exceptions\Ydb\UnavailableException;
use YdbPlatform\Ydb\Exceptions\Ydb\UndeterminedException;
use Psr\Log\LoggerInterface;

class Retry
{
Expand All @@ -22,9 +16,14 @@ class Retry

protected $fastBackOff;
protected $noBackOff;
/**
* @var LoggerInterface
*/
protected $logger;

public function __construct()
public function __construct(LoggerInterface $logger)
{
$this->logger = $logger;
$this->timeoutMs = 2000;
$this->fastBackOff = new Backoff(6, 5);
$this->slowBackOff = new Backoff(6, 1000);
Expand Down Expand Up @@ -79,15 +78,19 @@ public function retry(Closure $closure, bool $idempotent)
$retryCount = 0;
$lastException = null;
while (microtime(true) < $startTime + $this->timeoutMs / 1000) {
$this->logger->debug("YDB: Run user function. Retry count: $retryCount. Ms: ".(microtime(true) - $startTime));
try {
return $closure();
} catch (Exception $e) {
$this->logger->warning("YDB: Received exception: ".$e->getMessage());
if (!$this->canRetry($e, $idempotent)){
throw $e;
$lastException = $e;
break;
}
$retryCount++;
$this->retryDelay($retryCount, $this->backoffType($e));
$lastException = $e;
$delay = $this->retryDelay($retryCount, $this->backoffType($e))*1000;
usleep($delay);
}
}
throw $lastException;
Expand All @@ -99,18 +102,20 @@ public function retry(Closure $closure, bool $idempotent)
*/
protected function backoffType(string $e): Backoff
{
return in_array($e, self::$immediatelyBackoff)?$this->noBackOff:
(in_array($e, self::$fastBackoff)?$this->fastBackOff:$this->slowBackOff);
return in_array($e, self::$immediatelyBackoff) ? $this->noBackOff :
(in_array($e, self::$fastBackoff) ? $this->fastBackOff : $this->slowBackOff);
}

protected function alwaysRetry(string $exception){
protected function alwaysRetry(string $exception)
{
return in_array($exception, self::$alwaysRetry);
}

protected function canRetry(Exception $e, bool $idempotent)
{
return is_a($e, RetryableException::class)&&($this->alwaysRetry(get_class($e)) || $idempotent);
return is_a($e, RetryableException::class) && ($this->alwaysRetry(get_class($e)) || $idempotent);
}

private static $immediatelyBackoff = [
\YdbPlatform\Ydb\Exceptions\Grpc\AbortedException::class,
\YdbPlatform\Ydb\Exceptions\Ydb\BadSessionException::class,
Expand Down
2 changes: 2 additions & 0 deletions src/Scheme.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class Scheme
*/
public function __construct(Ydb $ydb, LoggerInterface $logger = null)
{
$this->ydb = $ydb;

$this->client = new ServiceClient($ydb->endpoint(), [
'credentials' => $ydb->iam()->getCredentials(),
]);
Expand Down
2 changes: 2 additions & 0 deletions src/Scripting.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class Scripting
*/
public function __construct(Ydb $ydb, LoggerInterface $logger = null)
{
$this->ydb = $ydb;

$this->client = new ServiceClient($ydb->endpoint(), [
'credentials' => $ydb->iam()->getCredentials(),
]);
Expand Down
2 changes: 2 additions & 0 deletions src/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class Session
*/
public function __construct(Table $table, $session_id)
{
$this->ydb = $table->ydb();

$this->table = $table;

$this->session_id = $session_id;
Expand Down
19 changes: 16 additions & 3 deletions src/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,19 @@ class Table
*/
private $retry;

/**
* @var Ydb
*/
protected $ydb;

/**
* @param Ydb $ydb
* @param LoggerInterface|null $logger
*/
public function __construct(Ydb $ydb, LoggerInterface $logger = null, Retry &$retry)
{
$this->ydb = $ydb;

$this->client = new ServiceClient($ydb->endpoint(), [
'credentials' => $ydb->iam()->getCredentials(),
]);
Expand Down Expand Up @@ -142,6 +149,14 @@ public function session()
return $session;
}

/**
* @return Ydb
*/
public function ydb()
{
return $this->ydb;
}

/**
* @return Session|null
*/
Expand Down Expand Up @@ -468,8 +483,7 @@ public function retrySession(Closure $userFunc, bool $idempotent = false, RetryP

public function retryTransaction(Closure $userFunc, bool $idempotent = false, RetryParams $params = null){

return $this->retry->withParams($params)->retry(function () use ($params, $idempotent, $userFunc){
$this->retrySession(function (Session $session) use ($userFunc) {
return $this->retrySession(function (Session $session) use ($userFunc) {
try{
$session->beginTransaction();
$result = $userFunc($session);
Expand All @@ -482,7 +496,6 @@ public function retryTransaction(Closure $userFunc, bool $idempotent = false, Re
throw $exception;
}
}, $idempotent, $params);
}, $idempotent);

}

Expand Down
67 changes: 64 additions & 3 deletions src/Traits/RequestTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use YdbPlatform\Ydb\Issue;
use YdbPlatform\Ydb\Exception;
use YdbPlatform\Ydb\QueryResult;
use YdbPlatform\Ydb\Ydb;

trait RequestTrait
{
Expand All @@ -30,6 +31,16 @@ trait RequestTrait
*/
protected $last_request_try_count = 0;

/**
* @var Ydb
*/
protected $ydb;

/**
* @var int
*/
protected $lastDiscovery = 0;

/**
* Make a request to the service with the given method.
*
Expand All @@ -41,6 +52,8 @@ trait RequestTrait
*/
protected function doRequest($service, $method, array $data = [])
{
$this->checkDiscovery();

$this->meta['x-ydb-auth-ticket'] = [$this->credentials->token()];

$this->saveLastRequest($service, $method, $data);
Expand Down Expand Up @@ -82,7 +95,7 @@ protected function doRequest($service, $method, array $data = [])
if (method_exists($call, 'wait')) {
list($response, $status) = $call->wait();

$this->checkGrpcStatus($service, $method, $status);
$this->handleGrpcStatus($service, $method, $status);

return $this->processResponse($service, $method, $response, $resultClass);
}
Expand All @@ -101,6 +114,10 @@ protected function doRequest($service, $method, array $data = [])
*/
protected function doStreamRequest($service, $method, $data = [])
{
$this->checkDiscovery();

$this->meta['x-ydb-auth-ticket'] = [$this->credentials->token()];

if (method_exists($this, 'take')) {
$this->take();
}
Expand Down Expand Up @@ -149,10 +166,25 @@ protected function doStreamRequest($service, $method, $data = [])
* @param object $status
* @throws Exception
*/
protected function checkGrpcStatus($service, $method, $status)
protected function handleGrpcStatus($service, $method, $status)
{
if (isset($status->code) && $status->code !== 0) {
$message = 'YDB ' . $service . ' ' . $method . ' (status code GRPC_' . $status->code . '): ' . ($status->details ?? 'no details');
$message = 'YDB ' . $service . ' ' . $method . ' (status code GRPC_'.
(isset(self::$grpcExceptions[$status->code])?self::$grpcNames[$status->code]:$status->code)
.' ' . $status->code . '): ' . ($status->details ?? 'no details');
$this->logger->error($message);
if ($this->ydb->needDiscovery()){
try{
$this->ydb->discover();
}catch (\Exception $e){}
}
$endpoint = $this->ydb->endpoint();
if ($this->ydb->needDiscovery()){
$endpoint = $this->ydb->cluster()->all()[array_rand($this->ydb->cluster()->all())]->endpoint();
}
$this->client = new $this->client($endpoint,[
'credentials' => $this->ydb->iam()->getCredentials()
]);
if (isset(self::$grpcExceptions[$status->code])) {
throw new self::$grpcExceptions[$status->code]($message);
} else {
Expand Down Expand Up @@ -272,6 +304,17 @@ protected function resetLastRequest()
$this->last_request_try_count = 0;
}

protected function checkDiscovery(){
if ($this->ydb->needDiscovery() && time()-$this->lastDiscovery>$this->ydb->discoveryInterval()){
try{
$this->lastDiscovery = time();
$this->ydb->discover();
} catch (\Exception $e){

}
}
}

private static $ydbExceptions = [
StatusCode::STATUS_CODE_UNSPECIFIED => \YdbPlatform\Ydb\Exceptions\Ydb\StatusCodeUnspecified::class,
StatusCode::BAD_REQUEST => \YdbPlatform\Ydb\Exceptions\Ydb\BadRequestException::class,
Expand Down Expand Up @@ -313,4 +356,22 @@ protected function resetLastRequest()
16 => \YdbPlatform\Ydb\Exceptions\Grpc\UnauthenticatedException::class
];

private static $grpcNames = [
1 => "CANCELLED",
2 => "UNKNOWN",
3 => "INVALID_ARGUMENT",
4 => "DEADLINE_EXCEEDED",
5 => "NOT_FOUND",
6 => "ALREADY_EXISTS",
7 => "PERMISSION_DENIED",
8 => "RESOURCE_EXHAUSTED",
9 => "FAILED_PRECONDITION",
10 => "ABORTED",
11 => "OUT_OF_RANGE",
12 => "UNIMPLEMENTED",
13 => "INTERNAL",
14 => "UNAVAILABLE",
15 => "DATA_LOSS",
16 => "UNAUTHENTICATED"
];
}
Loading