Skip to content

Commit

Permalink
Merge pull request #139 from neo4j-php/open-streams-counter
Browse files Browse the repository at this point in the history
added track of open streams in transaction. updated server state hand…
  • Loading branch information
stefanak-michal authored Apr 3, 2024
2 parents f6d1ad8 + f442bcd commit 219999c
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 12 deletions.
14 changes: 12 additions & 2 deletions src/protocol/AProtocol.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ abstract class AProtocol

public ServerState $serverState;

/**
* Multiple RUN statements in transaction generates "streams" which are pulled or discarded
* We are keeping track of open streams to keep valid Server State
* @link https://www.neo4j.com/docs/bolt/current/bolt/message/#transaction
* @var int
*/
protected int $openStreams = 0;

/**
* @throws UnpackException
* @throws PackException
Expand Down Expand Up @@ -141,8 +149,10 @@ public function getResponse(): Response
foreach (($this->serverStateTransition ?? []) as $transition) {
if ($transition[0] === $serverState && $transition[1] === $response->message && $transition[2] === $response->signature) {
$this->serverState = $transition[3];
if ($response->signature === Signature::SUCCESS && ($response->content['has_more'] ?? false))
$this->serverState = ($serverState === ServerState::TX_READY || $serverState === ServerState::TX_STREAMING) ? ServerState::TX_STREAMING : ServerState::STREAMING;
if (in_array($response->message, [Message::PULL, Message::DISCARD], true)
&& $response->signature === Signature::SUCCESS
&& (($response->content['has_more'] ?? false) || $this->openStreams))
$this->serverState = $this->serverState === ServerState::TX_READY ? ServerState::TX_STREAMING : ServerState::STREAMING;
if ($transition[3] === ServerState::DEFUNCT)
$this->connection->disconnect();
break;
Expand Down
14 changes: 13 additions & 1 deletion src/protocol/v3/CommitMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Bolt\protocol\v3;

use Bolt\enum\Message;
use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\error\BoltException;

trait CommitMessage
Expand All @@ -21,4 +21,16 @@ public function commit(): V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4
$this->pipelinedMessages[] = Message::COMMIT;
return $this;
}

/**
* Read COMMIT response
* @return iterable
* @throws BoltException
*/
protected function _commit(): iterable
{
$this->openStreams = 0;
$content = $this->read($signature);
yield new Response(Message::COMMIT, $signature, $content);
}
}
14 changes: 13 additions & 1 deletion src/protocol/v3/RollbackMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Bolt\protocol\v3;

use Bolt\enum\Message;
use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\error\BoltException;

trait RollbackMessage
Expand All @@ -21,4 +21,16 @@ public function rollback(): V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4
$this->pipelinedMessages[] = Message::ROLLBACK;
return $this;
}

/**
* Read ROLLBACK response
* @return iterable
* @throws BoltException
*/
protected function _rollback(): iterable
{
$this->openStreams = 0;
$content = $this->read($signature);
yield new Response(Message::ROLLBACK, $signature, $content);
}
}
15 changes: 14 additions & 1 deletion src/protocol/v3/RunMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Bolt\protocol\v3;

use Bolt\enum\Message;
use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\error\BoltException;

trait RunMessage
Expand All @@ -26,4 +26,17 @@ public function run(string $query, array $parameters = [], array $extra = []): V
$this->pipelinedMessages[] = Message::RUN;
return $this;
}

/**
* Read RUN response
* @return iterable
* @throws BoltException
*/
protected function _run(): iterable
{
$content = $this->read($signature);
if (array_key_exists('qid', $content))
$this->openStreams++;
yield new Response(Message::RUN, $signature, $content);
}
}
16 changes: 15 additions & 1 deletion src/protocol/v4/DiscardMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
namespace Bolt\protocol\v4;

use Bolt\enum\Message;
use Bolt\protocol\{V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\enum\Signature;
use Bolt\protocol\{Response, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
use Bolt\error\BoltException;

trait DiscardMessage
Expand All @@ -24,4 +25,17 @@ public function discard(array $extra = []): V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|
$this->pipelinedMessages[] = Message::DISCARD;
return $this;
}

/**
* Read DISCARD response
* @return iterable
* @throws BoltException
*/
protected function _discard(): iterable
{
$content = $this->read($signature);
if (!($content['has_more'] ?? false) && $this->openStreams)
$this->openStreams = $signature === Signature::SUCCESS ? $this->openStreams - 1 : 0;
yield new Response(Message::DISCARD, $signature, $content);
}
}
10 changes: 8 additions & 2 deletions src/protocol/v4/PullMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@ public function pull(array $extra = []): V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_
}

/**
* Read PULL response
* @return array
* Read PULL responses
* @return iterable
* @throws BoltException
*/
protected function _pull(): iterable
{
do {
$content = $this->read($signature);
if (!($content['has_more'] ?? false) && $this->openStreams) {
if ($signature === Signature::SUCCESS)
$this->openStreams--;
elseif ($signature === Signature::FAILURE)
$this->openStreams = 0;
}
yield new Response(Message::PULL, $signature, $content);
} while ($signature == Signature::RECORD);
}
Expand Down
4 changes: 0 additions & 4 deletions src/protocol/v5_1/ServerStateTransition.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ trait ServerStateTransition
[ServerState::READY, Message::ROUTE, Signature::SUCCESS, ServerState::READY],
[ServerState::READY, Message::RESET, Signature::SUCCESS, ServerState::READY],
[ServerState::READY, Message::RESET, Signature::FAILURE, ServerState::DEFUNCT],
[ServerState::STREAMING, Message::PULL, Signature::SUCCESS, ServerState::STREAMING],
[ServerState::STREAMING, Message::PULL, Signature::SUCCESS, ServerState::READY],
[ServerState::STREAMING, Message::PULL, Signature::FAILURE, ServerState::FAILED],
[ServerState::STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::STREAMING],
[ServerState::STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::READY],
[ServerState::STREAMING, Message::DISCARD, Signature::FAILURE, ServerState::FAILED],
[ServerState::STREAMING, Message::RESET, Signature::SUCCESS, ServerState::READY],
Expand All @@ -40,10 +38,8 @@ trait ServerStateTransition
[ServerState::TX_READY, Message::RESET, Signature::FAILURE, ServerState::DEFUNCT],
[ServerState::TX_STREAMING, Message::RUN, Signature::SUCCESS, ServerState::TX_STREAMING],
[ServerState::TX_STREAMING, Message::RUN, Signature::FAILURE, ServerState::FAILED],
[ServerState::TX_STREAMING, Message::PULL, Signature::SUCCESS, ServerState::TX_STREAMING],
[ServerState::TX_STREAMING, Message::PULL, Signature::SUCCESS, ServerState::TX_READY],
[ServerState::TX_STREAMING, Message::PULL, Signature::FAILURE, ServerState::FAILED],
[ServerState::TX_STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::TX_STREAMING],
[ServerState::TX_STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::TX_READY],
[ServerState::TX_STREAMING, Message::DISCARD, Signature::FAILURE, ServerState::FAILED],
[ServerState::TX_STREAMING, Message::RESET, Signature::SUCCESS, ServerState::READY],
Expand Down

0 comments on commit 219999c

Please sign in to comment.