-
-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC Implementation using multiple Relays to enable async communication #24
Conversation
…pportunities to lose a relay
…ting in lost relays (actually improves performance as well)
…more test coverage
…ll the sockets created
WalkthroughThe updates introduce interfaces and classes enhancing connection management, asynchronous RPC calls, and multi-relay handling in the Spiral Goridge library. Key changes include a new interface for connected relays, utility classes for managing multiple relays, and improvements in RPC functionality. Additionally, the library now better supports asynchronous operations and testing, with added tests for new functionalities and corrected minor issues, ensuring robust communication capabilities for RoadRunner services. Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 12
Configuration used: CodeRabbit UI
Files ignored due to path filters (1)
composer.json
is excluded by:!**/*.json
Files selected for processing (15)
- src/ConnectedRelayInterface.php (1 hunks)
- src/MultiRelayHelper.php (1 hunks)
- src/RPC/AbstractRPC.php (1 hunks)
- src/RPC/AsyncRPCInterface.php (1 hunks)
- src/RPC/MultiRPC.php (1 hunks)
- src/RPC/RPC.php (4 hunks)
- src/RPC/RPCInterface.php (1 hunks)
- src/SocketRelay.php (4 hunks)
- src/StreamRelay.php (1 hunks)
- tests/Goridge/MsgPackMultiRPCTest.php (1 hunks)
- tests/Goridge/MultiRPC.php (1 hunks)
- tests/Goridge/MultiRelayHelperTest.php (1 hunks)
- tests/Goridge/TCPMultiRPCTest.php (1 hunks)
- tests/Goridge/TCPRPCTest.php (1 hunks)
- tests/test-server/server.go (2 hunks)
Files skipped from review due to trivial changes (3)
- src/RPC/RPCInterface.php
- tests/Goridge/TCPMultiRPCTest.php
- tests/Goridge/TCPRPCTest.php
Additional comments: 34
tests/Goridge/MultiRelayHelperTest.php (2)
- 11-16: The test
testSupportsStreamRelay
correctly checks the behavior ofMultiRelayHelper::findRelayWithMessage
withStreamRelay
instances. It verifies that the method returnsfalse
when no message is available, which aligns with the expected behavior.- 18-26: The test
testSupportsReadingFromStreamRelay
effectively tests reading from aStreamRelay
. It writes a message to a temporary stream, seeks back to the beginning, and then checks ifMultiRelayHelper::findRelayWithMessage
can detect the message. This test is well-constructed and covers an important functionality.src/ConnectedRelayInterface.php (1)
- 7-36: The
ConnectedRelayInterface
is well-defined, with clear method signatures for managing connections (isConnected
,connect
,close
) and enforcing a clone method (__clone
). This interface is crucial for implementing relays that require explicit connection management. The documentation is clear and provides a good understanding of each method's purpose.tests/Goridge/MsgPackMultiRPCTest.php (1)
- 35-39: The
makeRPC
method is overridden to set the codec toMsgpackCodec
, ensuring that all tests use the correct codec. This setup is crucial for the tests to run as intended.src/RPC/RPC.php (4)
- 13-21: The constructor of the
RPC
class now directly accepts aCodecInterface
instance, defaulting toJsonCodec
. This change simplifies codec management and ensures that the codec is explicitly defined upon instantiation. The use ofprivate readonly
for the$relay
property enhances immutability and clarity.- 28-34: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [31-41]
The
call
method has been updated to include logic for handling options, though the detailed implementation of options handling is not visible. Ensure that the handling of options is consistent with the rest of the system and that any necessary validation or sanitation of options is performed.Verification successful
The verification process, supported by the grep results, indicates that the handling of
$options
is consistent across the system, particularly within theRPC
andCodec
classes. The presence of$options
in method signatures and its usage in various implementations suggest a systematic approach to options handling. This consistency aligns with the expectations set in the original review comment regarding the update to thecall
method to include logic for handling options. Therefore, the review comment is verified as accurate in the context of ensuring consistent options handling across the system.* 41-41: The removal of `decodeResponse` and `packFrame` methods from the `RPC` class and their inclusion in `AbstractRPC` is a good practice, promoting code reuse and reducing duplication. However, ensure that this change does not affect the behavior of existing RPC calls. * 41-41: The static `create` method simplifies the instantiation of the `RPC` class with a specified connection and codec. This factory method enhances usability and provides a clear entry point for creating `RPC` instances.Scripts Executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Verify that options handling is consistent across the system grep -r "options handling" src/Length of output: 33
Script:
#!/bin/bash # Attempt to find methods that accept an 'options' parameter, indicating potential options handling ast-grep --lang php --pattern '$__->__($_, $options) { $$$ }'Length of output: 170
Script:
#!/bin/bash # Search for explicit handling of 'options' in method calls or parameter assignments grep -r --include=*.php "\$options" src/Length of output: 3462
tests/test-server/server.go (1)
- 65-70: The
SleepEcho
function correctly implements a simulated delay usingtime.Sleep
before returning the incoming message. This addition is useful for testing the async RPC functionality by simulating real-world delays. The implementation follows Go best practices for sleep and response handling.src/RPC/AbstractRPC.php (1)
- 16-90: The
AbstractRPC
class is well-structured, providing essential functionality for RPC communication. The methodswithServicePrefix
andwithCodec
allow for fluent configuration of RPC instances, which is a good practice for flexible and readable code. ThedecodeResponse
andpackFrame
methods are crucial for handling RPC communication and are correctly implemented. The use of static sequence numbers ($seq
) for tracking requests is appropriate, but ensure thread safety if applicable.src/RPC/AsyncRPCInterface.php (1)
- 10-69: The
AsyncRPCInterface
clearly defines the necessary methods for asynchronous RPC communication, including methods for making non-blocking calls, checking for responses, and retrieving responses. The method signatures are clear, and the documentation provides sufficient detail on the expected behavior. This interface is a crucial part of enabling efficient asynchronous communication in the Roadrunner server environment.src/MultiRelayHelper.php (1)
- 10-89: The
MultiRelayHelper
class provides essential functionality for working with multiple relays, particularly in asynchronous environments. ThefindRelayWithMessage
method correctly handles bothSocketRelay
andStreamRelay
instances, usingsocket_select
andstream_select
respectively. ThecheckConnected
method is a useful utility for ensuring all provided relays are connected. The implementation is efficient and adheres to best practices for handling multiple relays. Ensure that error handling is robust, especially in edge cases where socket or stream operations might fail.src/StreamRelay.php (1)
- 25-29: Changing the visibility of the
$in
property from private to public in theStreamRelay
class is justified by the need to access the underlying stream for operations likestream_select
across multipleStreamRelay
instances. While making properties public can generally be considered bad practice due to encapsulation concerns, the provided rationale and the internal documentation explaining the necessity of this change make it acceptable in this context.src/SocketRelay.php (1)
- 38-49: > 📝 NOTE
This review was outside the diff hunks and was mapped to the diff hunk with the greatest overlap. Original lines [27-46]
The
SocketRelay
class's implementation of theConnectedRelayInterface
and the change to make thesocket
property public are necessary for the async RPC functionality. The public visibility of thesocket
property, similar to theStreamRelay
class, is justified for operations likesocket_select
. The addition of the__clone
method to nullify thesocket
reference on cloning is a thoughtful detail that prevents potential issues with shared socket resources. These changes are well-justified and adhere to the requirements of the async RPC implementation.src/RPC/MultiRPC.php (16)
- 17-17: The class
MultiRPC
correctly extendsAbstractRPC
and implementsAsyncRPCInterface
, aligning with the PR objectives to introduce asynchronous RPC capabilities.- 23-23: The use of a constant for the default buffer threshold is a good practice for maintainability and configurability.
- 60-84: The constructor's parameter validation ensures that at least one relay is provided and that all relays implement the
ConnectedRelayInterface
. This is crucial for the correct operation of theMultiRPC
class.- 91-106: The static
create
method provides a convenient way to instantiateMultiRPC
with a specified number of relays. The use of assertions to ensure a positive count is a good practice.- 113-118: The
preConnectRelays
method ensures all relays are connected before use. This preemptive connection strategy can improve performance by reducing connection delays during RPC calls.- 121-133: The
call
method correctly implements synchronous RPC calls by ensuring a free relay is available, sending the request, and waiting for the response. This method maintains compatibility with synchronous operations.- 136-147: The
callIgnoreResponse
method allows for fire-and-forget RPC calls, which can be useful for operations where the response is not needed. This method correctly manages relay occupancy.- 150-172: The
callAsync
method introduces the core asynchronous functionality by sending requests without immediately waiting for responses. The method's logic for managing relay occupancy and response buffering is sound.- 175-187: The
hasResponse
method provides a way to check if a response is available for a given sequence number, either in the buffer or in the relay's buffer. This method is essential for asynchronous operations.- 190-217: The
hasResponses
method extends the functionality ofhasResponse
to handle multiple sequence numbers. The use ofMultiRelayHelper::findRelayWithMessage
to efficiently check multiple relays is a good practice.- 220-238: The
getResponse
method retrieves a response for a given sequence number, either from the buffer or directly from the relay. The method's error handling and relay management logic are well-implemented.- 241-298: The
getResponses
method for retrieving multiple responses demonstrates complex logic for handling asynchronous operations. The method correctly manages relay occupancy and error handling.- 305-358: The
ensureFreeRelayAvailable
method's logic for managing relay availability is complex but necessary for the asynchronous functionality. The method's approach to handling occupied relays and ensuring a free relay is available is well thought out.- 365-395: The
getResponseFromRelay
method includes detailed error handling for sequence mismatches and relay connection issues. The method's approach to response buffering is crucial for the asynchronous functionality.- 404-410: The
getResponseFromBuffer
method provides a straightforward way to retrieve a response from the buffer, if available. This method is a key component of the response handling logic.- 413-425: The
checkAllOccupiedRelaysStillConnected
method ensures the reliability of the relay connections, which is essential for maintaining the integrity of asynchronous operations.tests/Goridge/MultiRPC.php (4)
- 58-75: The
testReconnect
method correctly tests the reconnection logic. However, it's important to ensure that theclose
method on the relay indeed simulates a realistic disconnection scenario. Additionally, consider adding assertions to verify that the relay is not connected after callingclose
and before making the nextcall
.Please verify that the
close
method on the relay accurately simulates a realistic disconnection scenario.
- 730-740: The
testAllowsOnlySockets
method correctly tests that only socket relays are allowed. This is an important constraint to enforce, and the test effectively verifies it by expecting anRPCException
. The use ofsprintf
for the exception message is a good practice for dynamic message construction.- 743-747: The
testNeedsAtLeastOne
method effectively tests that at least one relay is required forMultiRPC
to function. This is a critical validation to prevent runtime errors due to misconfiguration. The test is concise and directly addresses the requirement.- 795-804: The
testGetResponsesReturnsWhenNoRelaysAvailableToAvoidInfiniteLoop
method tests an important edge case to ensure the system behaves correctly when no relays are available. This test prevents potential infinite loops, which is crucial for system stability. The explicit setting offreeRelays
andoccupiedRelays
to empty arrays is a good way to simulate this scenario.
use Spiral\Goridge\SocketType; | ||
use Spiral\Goridge\StreamRelay; | ||
|
||
abstract class MultiRPC extends TestCase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class MultiRPC
is declared as abstract, which means it cannot be instantiated directly. This is unusual for a test class, as PHPUnit typically instantiates each test class to run its test methods. Please verify if this is intentional and consider removing the abstract
keyword if the class is meant to be instantiated by PHPUnit.
- abstract class MultiRPC extends TestCase
+ class MultiRPC extends TestCase
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
abstract class MultiRPC extends TestCase | |
class MultiRPC extends TestCase |
public function testManualConnect(): void | ||
{ | ||
$relays = []; | ||
for ($i = 0; $i < 10; $i++) { | ||
$relays[] = $this->makeRelay(); | ||
} | ||
/** @var SocketRelay $relay */ | ||
$relay = $relays[0]; | ||
$this->rpc = new GoridgeMultiRPC($relays); | ||
$this->expectedNumberOfRelays = 10; | ||
|
||
$this->assertFalse($relay->isConnected()); | ||
|
||
$relay->connect(); | ||
$this->assertTrue($relay->isConnected()); | ||
|
||
$this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); | ||
$this->assertTrue($relay->isConnected()); | ||
|
||
$this->rpc->preConnectRelays(); | ||
foreach ($relays as $relay) { | ||
$this->assertTrue($relay->isConnected()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The testManualConnect
method effectively tests the manual connection of relays and the basic functionality of the call
method. However, it's recommended to split this test into smaller, more focused tests. For example, one test could specifically verify the connection status of relays, while another could test the call
method's functionality. This approach enhances test readability and maintainability.
Consider splitting this test into multiple focused tests.
public function testPingPongAsync(): void | ||
{ | ||
$id = $this->rpc->callAsync('Service.Ping', 'ping'); | ||
$this->assertSame('pong', $this->rpc->getResponse($id)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The testPingPongAsync
method correctly tests asynchronous ping-pong functionality. It's good practice to include a timeout mechanism or a way to prevent the test from hanging indefinitely if the response is not received as expected.
Consider adding a timeout mechanism to prevent the test from hanging indefinitely.
$payload = base64_encode(random_bytes(65000 * 5)); | ||
|
||
$resp = $this->rpc->call('Service.Echo', $payload); | ||
|
||
$this->assertSame(strlen($payload), strlen($resp)); | ||
$this->assertSame(md5($payload), md5($resp)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The testLongEcho
method tests the echo functionality with a large payload. While the test correctly checks the length and MD5 hash of the response, it's also important to consider the performance implications of generating and sending such a large payload. If this test is significantly slower than others, it might be worth marking it as a separate group or optimizing the payload size.
Consider optimizing the payload size or marking this test as a separate group if it significantly impacts test suite performance.
{ | ||
$this->expectException(ServiceException::class); | ||
$this->expectExceptionMessage('unknown Raw payload type'); | ||
|
||
$payload = base64_encode(random_bytes(65000 * 5)); | ||
|
||
$resp = $this->rpc->withCodec(new RawCodec())->call( | ||
'Service.Echo', | ||
$payload | ||
); | ||
|
||
$this->assertSame(strlen($payload), strlen($resp)); | ||
$this->assertSame(md5($payload), md5($resp)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The testConvertException
method is designed to test the conversion exception scenario. However, the test expects an exception before asserting the response length and MD5 hash, which means these assertions will never be reached. This seems to be a logical error in the test design.
- $this->assertSame(strlen($payload), strlen($resp));
- $this->assertSame(md5($payload), md5($resp));
Remove the unreachable assertions.
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
{ | |
$this->expectException(ServiceException::class); | |
$this->expectExceptionMessage('unknown Raw payload type'); | |
$payload = base64_encode(random_bytes(65000 * 5)); | |
$resp = $this->rpc->withCodec(new RawCodec())->call( | |
'Service.Echo', | |
$payload | |
); | |
$this->assertSame(strlen($payload), strlen($resp)); | |
$this->assertSame(md5($payload), md5($resp)); | |
{ | |
$this->expectException(ServiceException::class); | |
$this->expectExceptionMessage('unknown Raw payload type'); | |
$payload = base64_encode(random_bytes(65000 * 5)); | |
$resp = $this->rpc->withCodec(new RawCodec())->call( | |
'Service.Echo', | |
$payload | |
); |
$time = hrtime(true) / 1e9; | ||
$this->assertSame('Hello', $this->rpc->call('Service.SleepEcho', 'Hello')); | ||
// sleep is 100ms, so we check if we are further along than 100ms | ||
$this->assertGreaterThanOrEqual($time + 0.1, hrtime(true) / 1e9); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The testSleepEcho
method tests the delay introduced by the Service.SleepEcho
method. The use of hrtime(true) / 1e9
for timing is accurate, but it's important to ensure that the environment running these tests has a reliable and precise high-resolution timer. Additionally, consider making the expected delay a constant or a configurable parameter to easily adjust the test for different environments.
Consider making the expected delay a constant or a configurable parameter.
Implements an "Async" RPC Interface and Implementation using multiple relays to effectively offer non-blocking IO in regards to the Roadrunner communication.
As discussed in the linked discussion, there's a (small but signifcant) delay in the RPC communication to Roadrunner that can have an impact on a service. In my case the service had a nominal response time of ~2ms, went up to 3ms with metric collection and also had a 2ms
kernel.terminate
listener to collect further metrics, so all in all was busy for ~5ms with only ~2ms spent on the actual request->response cycle.As a fix I propose an "async" implementation of the RPC interface. I don't really like calling it async, because it doesn't offer the same versatility as a proper async implementation, but it changes the control flow and thus qualifies for that name, I guess.
The interface has 3 significant new methods as well as a few supporting changes in other code:
hasResponse
andhasAnyResponse
just check if a response has been receivedResults
I've measured the impact in a testcase using the same repro used in the discussion. The results are the following:
As you can see the "good" implementation (this change) is within the normal deviation of the always-ignore implementation and offers an improvement ~10x performance. I've been using a (slightly modified) implementation of this in the aforementioned production service and could cut down the time spent collecting metrics from a total of 3ms to just 0,2ms, with a min/max of respectively 0.1ms and 0.3ms.
I've also added a change (PR following) in roadrunner-kv that uses the
callAsync
method to implement asetAsync
function. The results so far are promising. With a follow-up check on whether thekv.Set
call was successful, the async implementation is significantly faster:Full results for the Metrics calls:
Considerations
socket_select
andstream_select
. I've commented it as well as marked them with@internal
but it's definitely not ideal.The overflow response buffer currently has no upper limit and thus it's somewhat easy to get a memory leak here. I could add a check inImplemented a flush at 1000 entries (realistically the most a normal application should ever see is maybe ~50) -- I've upped this to 10_000 entries because in testing in particular Doctrine caches get hammered extremely hard and 1_000 is not enough.callAsync
to see if the response buffer is above a certain limit (1000?) and if so flush it. I'll probably do so when adding the tests.TODO
I have glanced at the tests in this repo and will try to implement a few for this change if you've given me the go-ahead with the current implementation.Added a number of testsSummary by CodeRabbit
New Features
Bug Fixes
Refactor
SocketRelay
andStreamRelay
classes for better cloning support and visibility adjustments.AbstractRPC
, refining its structure and functionality.Tests