Introduce protobuf serialization/deserialization support for transport requests/responses #9737

VachaShah
Collaborator

@VachaShah VachaShah commented Sep 4, 2023

Description

This is the first PR adding support for protobuf serialization/deserialization, derived from the POC and draft PR #9097. The idea is to add incremental (bottom-up) changes toward using protobuf for the cat nodes API. This PR includes the following changes:

  • ProtobufWriteable - A version of Writeable in which the reader reads from a byte array and the writer writes to a Java OutputStream. This is needed to serialize and deserialize to and from proto messages. Further changes in the OutboundHandler and InboundHandler will transfer the bytes of proto messages over the wire.
  • TransportRequest and TransportResponse support ProtobufWriteable.
  • TaskId is converted into a proto message.
  • TaskIdTests are moved to libs/core module since TaskId has been moved there.
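
As a rough illustration of the first bullet, the sketch below shows one possible shape for such an interface: a writer that targets a plain `OutputStream` and a reader that consumes a `byte[]`. The names `ProtobufWriteableSketch`, `Reader` and `NodeName` are hypothetical and are not taken from the PR. UTF-8 bytes stand in for a generated proto message so that the example runs with the JDK alone.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical shape of the interface described in the list above.
interface ProtobufWriteableSketch {
    // Writer side: serialize this object to a plain java.io.OutputStream.
    void writeTo(OutputStream out) throws IOException;

    // Reader side: rebuild an object from the raw bytes of one message.
    @FunctionalInterface
    interface Reader<V> {
        V read(byte[] in) throws IOException;
    }
}

// Toy implementer. A real one would presumably delegate to a generated proto
// class (toByteArray() / parseFrom(byte[])); UTF-8 stands in for that here.
final class NodeName implements ProtobufWriteableSketch {
    final String value;

    NodeName(String value) {
        this.value = value;
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
        out.write(value.getBytes(StandardCharsets.UTF_8));
    }

    static NodeName readFromBytes(byte[] in) {
        return new NodeName(new String(in, StandardCharsets.UTF_8));
    }

    // Writes the object to a buffer and reads it back through the Reader.
    static NodeName roundTrip(NodeName original) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            original.writeTo(out);
            ProtobufWriteableSketch.Reader<NodeName> reader = NodeName::readFromBytes;
            return reader.read(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new NodeName("node-1")).value);
    }
}
```

Note that the reader receives exactly the bytes of one message, so something outside this interface has to decide where those bytes begin and end in a larger stream.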

Related Issues

#6844

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff
  • Commit changes are listed out in CHANGELOG.md file (See: Changelog)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Contributor

github-actions bot commented Sep 4, 2023

Compatibility status:

Checks if related components are compatible with change e6885cb

Incompatible components

Incompatible components: [https://github.com/opensearch-project/asynchronous-search.git]

Skipped components

Compatible components

Compatible components: [https://github.com/opensearch-project/security.git, https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/k-nn.git, https://github.com/opensearch-project/reporting.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/security-analytics.git, https://github.com/opensearch-project/custom-codecs.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer-rca.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git, https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/neural-search.git]

@dblock
Member

dblock commented Sep 5, 2023

I like it. A couple of ideas/comments.

  1. I think it's worth starting to document the new protobuf-based protocol from the start. This PR could contain some README content on how to enable protobuf, what parts of the protocol it replaces (e.g. some extension support is protobuf only).
  2. I quickly read the code. How do I exercise it in a build of OpenSearch? I think it's important to find a path in which we can have something "useful" early.
  3. Where can one read about the north star of completely replacing the custom-rolled OpenSearch protocol by Protobuf?

@dblock
Member

dblock commented Sep 5, 2023

@dbwiddis will have lots of opinions here ;)

Member

@dbwiddis dbwiddis left a comment

Great work so far. Some comments/questions in-line.

/**
* Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown
* across the wire" using OpenSearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by
* serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged.
Member

You missed swapping StreamOut(In)put to Out(In)putStream.

Also, I'm really confused about the equals/hashCode bit (copied from Writeable) in the context of a protobuf byte stream (see next comment).

Collaborator Author

That is my bad! I will fix the comments.

@@ -70,6 +76,7 @@ public TaskId(String nodeId, long id) {
    private TaskId() {
        nodeId = "";
        id = -1;
        taskIdProto = TaskIdProto.TaskId.newBuilder().setNodeId(nodeId).setId(id).build();
Member

It makes little sense to build a whole protobuf object to contain what we currently encode as a single 0x00 byte. We're assigning this to the EMPTY_TASK_ID constant; can this object just be left null? Then if we encounter a 0 byte when we get to this object, we don't need to deserialize a proto object.

Related: TaskId is one of several informational bits OpenSearch communicates that are separate from the writeable content generated by TransportRequest implementations. We also have thread context request and response headers, features, and the action name. It seems we lose a lot by serializing all of these individually vs. combining all "header" content in a single protobuf byte stream. If we are going to keep them as separate pieces, we should probably keep the 1-byte representations they currently have when they are empty (e.g., an empty map, empty list, and empty string each "write" a single VInt byte, 0).
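
For context on the 1-byte representations mentioned here, the following sketch imitates a variable-length int (VInt) encoding with 7 payload bits per byte and the high bit used as a continuation flag. It is a stand-alone approximation written for this discussion, not OpenSearch's StreamOutput code, and the class and method names are made up.

```java
import java.io.ByteArrayOutputStream;

final class VIntSketch {
    // Encodes a non-negative int, least-significant 7-bit group first.
    // The high bit of each byte signals that another byte follows.
    static byte[] encodeVInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // An empty string, list or map is modeled here as "size 0 and nothing else",
    // which is an assumption matching the description in the comment above.
    static int emptyCollectionSize() {
        return encodeVInt(0).length;
    }

    public static void main(String[] args) {
        System.out.println("empty collection: " + emptyCollectionSize() + " byte(s)");
        System.out.println("length 22:        " + encodeVInt(22).length + " byte(s)");
        System.out.println("length 300:       " + encodeVInt(300).length + " byte(s)");
    }
}
```

Any size below 128 fits in one byte, so an "empty" marker costs exactly one 0x00 byte under this scheme.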

Collaborator Author

See my comment above on how the outbound message will also be a proto message that contains the header, thread context request and response headers, features and action name within a single proto object.

} else {
    nodeId = "";
    id = -1L;
    taskIdProto = TaskIdProto.TaskId.newBuilder().setNodeId(nodeId).setId(id).build();
Member

See above on serializing "empty" information vs. never even looking at this object.

 * {@linkplain byte[]} so we can return the {@link #EMPTY_TASK_ID} without allocating.
 */
public static TaskId readFromBytes(byte[] in) throws IOException {
    TaskIdProto.TaskId taskIdProto = TaskIdProto.TaskId.parseFrom(in);
Member

This approach forces us to always parse a bytestream for the "most common" empty task ID.

Can we have a single byte boolean (basically the "OptionalWriteable" implementation) here which directly shortcuts to EMPTY_TASK_ID instead of always deserializing some number of bytes > 1?

@@ -116,6 +142,11 @@ public void writeTo(StreamOutput out) throws IOException {
        out.writeLong(id);
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
        out.write(this.taskIdProto.toByteArray());
Member

May want to consider special-casing the empty-node-id = 0 case here if you implement it per my earlier comments.


message TaskId {
    string nodeId = 1;
    int64 id = 2;
Member

If you choose to serialize the task ID instead of using the "0" byte, consider making these optional parts of the message with the defaults from the empty string/-1, to accomplish the same behavior but permit serializing "nothing" rather than a string and int64 (I'm curious what a "nothing" byte stream looks like.)

Collaborator Author

That makes sense! We can make the nodeId optional in the proto message.

Member

We can make the nodeId optional in the proto message.

Note this is an alternative to the "0" shortcut which I think is a better option. Is it allowed to have all the values optional? What is the byte size of a protobuffer with a missing nodeId and a single -1 long, which would be the constant we'd store and use as an alternative to "0"?
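
The byte-size question can be checked by hand against the protobuf wire format. The sketch below is a minimal hand-rolled encoder, not the generated TaskIdProto class: it assumes both fields are declared `optional` (so an unset field is simply omitted), uses the field numbers from the message above, and needs no protobuf library. The class name `TaskIdWireSize` is made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class TaskIdWireSize {
    // Protobuf base-128 varint. A negative int64 is written as its 64-bit
    // two's-complement value, which always takes 10 bytes.
    static void writeVarint64(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Encodes: message TaskId { optional string nodeId = 1; optional int64 id = 2; }
    // A null argument means "field not set" and contributes no bytes.
    static byte[] encode(String nodeId, Long id) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (nodeId != null) {
            byte[] utf8 = nodeId.getBytes(StandardCharsets.UTF_8);
            out.write((1 << 3) | 2);            // tag: field 1, wire type 2 (length-delimited)
            writeVarint64(out, utf8.length);
            out.write(utf8, 0, utf8.length);
        }
        if (id != null) {
            out.write((2 << 3) | 0);            // tag: field 2, wire type 0 (varint)
            writeVarint64(out, id);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println("nothing set:           " + encode(null, null).length);
        System.out.println("only id = -1:          " + encode(null, -1L).length);
        System.out.println("nodeId = \"\", id = -1:  " + encode("", -1L).length);
    }
}
```

Under these assumptions a message with nothing set serializes to zero bytes, while one carrying only id = -1 takes 11 bytes (1 tag byte plus a 10-byte varint). Zero bytes is smaller than the current single 0 byte, but it gives the reader no marker of its own to detect.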

@VachaShah
Collaborator Author

Great work so far. Some comments/questions in-line.

Thank you @dbwiddis! I have added a few responses, LMK what you think. I will keep refining the PR :)

@VachaShah
Collaborator Author

VachaShah commented Sep 6, 2023

I like it. A couple of ideas/comments.

  1. I think it's worth starting to document the new protobuf-based protocol from the start. This PR could contain some README content on how to enable protobuf, what parts of the protocol it replaces (e.g. some extension support is protobuf only).
  2. I quickly read the code. How do I exercise it in a build of OpenSearch? I think it's important to find a path in which we can have something "useful" early.
  3. Where can one read about the north star of completely replacing the custom-rolled OpenSearch protocol by Protobuf?

Thank you @dblock for your comments!

  1. Definitely! More PRs will follow to add support for the first API in OpenSearch (cat nodes). I will start a README for it.
  2. It will still take a few PRs to reach a point where this code can be exercised. I am adding the changes incrementally, but all the changes for the POC are in Cat Nodes API with Protobuf #9097. The POC of course has some duplicate files, since I created a parallel API to cat nodes with protobuf de/serialization for comparison.
  3. I will write up an issue soon with all the work done, the next steps and what we are aiming for.

LMK what you think :)

@VachaShah VachaShah force-pushed the introduce-protobuf-transport-req/res branch from 7f6a1b6 to fb1edf8 Compare September 6, 2023 06:06
@github-actions
Contributor

github-actions bot commented Sep 6, 2023

Compatibility status:

Checks if related components are compatible with change 7f6a1b6

Incompatible components

Incompatible components: [https://github.com/opensearch-project/security.git, https://github.com/opensearch-project/security-analytics.git, https://github.com/opensearch-project/asynchronous-search.git]

Skipped components

Compatible components

Compatible components: [https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/neural-search.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git, https://github.com/opensearch-project/k-nn.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/performance-analyzer-rca.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/reporting.git]

@github-actions
Contributor

github-actions bot commented Sep 6, 2023

Compatibility status:

Checks if related components are compatible with change fb1edf8

Incompatible components

Incompatible components: [https://github.com/opensearch-project/security.git, https://github.com/opensearch-project/asynchronous-search.git, https://github.com/opensearch-project/security-analytics.git]

Skipped components

Compatible components

Compatible components: [https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/k-nn.git, https://github.com/opensearch-project/reporting.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer-rca.git, https://github.com/opensearch-project/neural-search.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git]

Member

@dbwiddis dbwiddis left a comment

Thanks for the clarifications. Down to some summary comments.


@dbwiddis dbwiddis self-requested a review September 6, 2023 15:57
@wbeckler wbeckler changed the title Introduce protobuf serialization/deserilization support for transport requests/responses Introduce protobuf serialization/deserialization support for transport requests/responses Sep 6, 2023
Signed-off-by: Vacha Shah <vachshah@amazon.com>
Signed-off-by: Vacha Shah <vachshah@amazon.com>
…ments

Signed-off-by: Vacha Shah <vachshah@amazon.com>
Signed-off-by: Vacha Shah <vachshah@amazon.com>
Signed-off-by: Vacha Shah <vachshah@amazon.com>
@VachaShah VachaShah force-pushed the introduce-protobuf-transport-req/res branch from fb1edf8 to e6885cb Compare September 14, 2023 20:42
@VachaShah
Collaborator Author

@dbwiddis I updated the code to represent the empty taskId using an empty byte array for protobuf taskId as discussed. Let me know what you think.

@codecov

codecov bot commented Sep 14, 2023

Codecov Report

Merging #9737 (e6885cb) into main (71f6136) will increase coverage by 0.06%.
The diff coverage is 41.93%.

@@             Coverage Diff              @@
##               main    #9737      +/-   ##
============================================
+ Coverage     71.10%   71.16%   +0.06%     
- Complexity    58070    58136      +66     
============================================
  Files          4824     4824              
  Lines        273918   273949      +31     
  Branches      39918    39919       +1     
============================================
+ Hits         194768   194963     +195     
+ Misses        62802    62660     -142     
+ Partials      16348    16326      -22     
Files Changed Coverage Δ
...rg/opensearch/core/transport/TransportMessage.java 83.33% <0.00%> (-16.67%) ⬇️
...g/opensearch/core/transport/TransportResponse.java 54.54% <0.00%> (-31.17%) ⬇️
...ava/org/opensearch/transport/TransportRequest.java 66.66% <0.00%> (-33.34%) ⬇️
...rc/main/java/org/opensearch/core/tasks/TaskId.java 71.83% <68.42%> (-1.25%) ⬇️

... and 459 files with indirect coverage changes

 */
public static TaskId readFromBytes(byte[] in) throws IOException {
    TaskIdProto.TaskId taskIdProto = TaskIdProto.TaskId.parseFrom(in);
    if (!taskIdProto.hasNodeId()) {
Member

Can you help me understand this line of code as I don't see where it's defined in Java, so it may be a generated class.

My thought was that we would read/write a byte array with a VInt length before it, and the array itself gets parsed with protobuf. If the length is 0 then we skip reading anything and return empty.

If the entire TaskId was optional (see optionalWriteable) this would essentially be the same thing.

But this looks like we are still reading some number of bytes and just determining whether the nodeId index is present; we'd still have 2 bytes for the int even if 0 bytes for the string.

Collaborator Author

The hasNodeId method comes from the class generated by protobuf; it checks whether the optional field nodeId is set. For line 132, I kept the same logic as readFromStream: if the nodeId is present, it creates a TaskId from it; otherwise it just returns an empty taskId.

Collaborator Author

From what I understand, we could achieve the same by making both nodeId and id optional in the proto message, similar to readFromStream, which reads the string first as the nodeId and uses it to determine whether the empty taskId should be returned. What do you think?

Member

Seems reasonable. Sorry to be so into the details here, but my main concern has been replacing a single-byte solution with anything more complex. If making both nodeId and id optional results in a 1-byte buffer that works great.

I'm still not clear on how protobuf communicates the total bytes, though. Looking at #6960 I see the pattern parseFrom(in.readByteArray()) which implies a length VInt followed by that number of bytes.

I don't see any equivalent here, or how if everything is optional it provides any signal to the reader that there's nothing to read. Does protobuf assume reading to end-of-stream? In that case, reading TaskId would spill over into the TransportMessage. Or is there an end-of-stream marker?

TLDR: the savings from fewer bytes on the variable length long need to outweigh the extra bytes for field indices and length bytes and any other overhead to make serializing TaskId worth it, and I'm not sure of those numbers here.
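
To put rough numbers on that trade-off, here is a small size calculator. It is a sketch under stated assumptions rather than a measurement: the current layout is modeled as a VInt length, the node id bytes, and a fixed 8-byte long; the protobuf layout is modeled as a two-field message wrapped in a VInt-length-prefixed byte array; node ids are assumed to be ASCII; and the 22-character node id in `main` is only an example value. All names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class TaskIdSizeComparison {
    // Number of bytes a base-128 varint needs for a non-negative value.
    static int varintSize(long value) {
        int size = 1;
        while ((value & ~0x7FL) != 0) {
            size++;
            value >>>= 7;
        }
        return size;
    }

    // Modeled current layout: VInt length + node id bytes + fixed 8-byte long,
    // with the empty task id collapsing to the single 0 length byte.
    static int currentSize(String nodeId) {
        if (nodeId.isEmpty()) {
            return 1;
        }
        int len = nodeId.getBytes(StandardCharsets.US_ASCII).length;
        return varintSize(len) + len + 8;
    }

    // Modeled protobuf layout: (tag + length + bytes) for nodeId, (tag + varint)
    // for id, all wrapped in a length-prefixed byte array.
    static int protobufSize(String nodeId, long id) {
        if (nodeId.isEmpty()) {
            return 1; // empty byte array: just the length prefix 0
        }
        int len = nodeId.getBytes(StandardCharsets.US_ASCII).length;
        int message = 1 + varintSize(len) + len + 1 + varintSize(id);
        return varintSize(message) + message;
    }

    public static void main(String[] args) {
        String nodeId = "abcdefghijklmnopqrstuv"; // 22 characters, example only
        System.out.println("current:            " + currentSize(nodeId));
        System.out.println("protobuf, id=12345: " + protobufSize(nodeId, 12345L));
        System.out.println("protobuf, id=max:   " + protobufSize(nodeId, Long.MAX_VALUE));
    }
}
```

With these assumptions, a small id saves a few bytes (28 versus 31 for the example above), while a very large id costs more than the fixed-width encoding (35 versus 31), so the result depends on how large task ids tend to be in practice.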

Collaborator Author

@VachaShah VachaShah Sep 15, 2023

Totally makes sense! I was thinking about this and we are only writing the bytes when the nodeId is present (https://github.com/opensearch-project/OpenSearch/pull/9737/files#diff-440ec1fd7eb50816eb2d1ec1eb201229b810efcf1939a9953afb0025ffca2f32R154) so I am thinking when we read the bytes, the byte array will have both nodeId and id or nothing at all. Would that not be similar to the current implementation? I might be totally misunderstanding it so would love to know your thoughts.

Protobuf can deserialize a proto message structure without any problem. I get your concern that currently only TaskId is a proto message here while TransportMessage is not, but that implementation will come in upcoming PRs, where ClusterStateRequest (for example), which is a type of TransportMessage, will also be a proto, and all of them will be part of a proto message, the OutboundMessage, which is sent over the wire. So TaskId currently looks like a separate proto in the middle of a stream, but ultimately it will be part of the OutboundMessageProto with all the bytes, and protobuf will deserialize it into its various parts.

Member

Totally makes sense! I was thinking about this and we are only writing the bytes when the nodeId is present (https://github.com/opensearch-project/OpenSearch/pull/9737/files#diff-440ec1fd7eb50816eb2d1ec1eb201229b810efcf1939a9953afb0025ffca2f32R154) so I am thinking when we read the bytes, the byte array will have both nodeId and id or nothing at all. Would that not be similar to the current implementation? I might be totally misunderstanding it so would love to know your thoughts.

The issue we're facing here is that the TaskId is in the middle of the byte stream. StreamInput needs to know where the TaskId starts and ends so it can move on to read the bytes for the TransportRequest.

Take for example one of the simplest TransportRequests, the initial TCP Handshake Request:

static final class HandshakeRequest extends TransportRequest {
    private final Version version;

    HandshakeRequest(Version version) {
        this.version = version;
    }

Here's what the byte stream looks like (non-protobuf):

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 45 53 00 00 00 31 00 00 00 00 00 00 00 08 08 08 |ES...1..........|
|00000010| 20 0b 83 00 00 00 1a 00 00 00 16 69 6e 74 65 72 | ..........inter|
|00000020| 6e 61 6c 3a 74 63 70 2f 68 61 6e 64 73 68 61 6b |nal:tcp/handshak|
|00000030| 65 00 04 a3 8e b7 41                            |e.....A         |
+--------+-------------------------------------------------+----------------+

The first 49 bytes (all the way up to the 65 (e) starting the last row) are all part of the OutboundMessageRequest header. So we are receiving the last six bytes that we need to parse into a TransportMessage.

The last 5 bytes (04 a3 8e b7 41) are the Writeable payload for that HandshakeRequest class linked above (see readBytesReference(), it's a length byte followed by the 4 bytes encoding the version).

That single 00 byte in between the headers and payload is the Task ID. It's the length of the ID string (0), which shortcuts the 8-byte id. This is good: when writing, if it's "Empty" we just write a 0 byte; when reading, if we see a 0 byte we just skip it.
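
The split described above can be reproduced directly from the dump. The sketch below copies the 55 bytes and slices them at the boundaries given in this comment (49 header bytes, 1 task id byte, 5 payload bytes). Reading offset 26 as the action-name length is an interpretation of the dump made for this example, not something confirmed against the OpenSearch source; the class name is made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class HandshakeDumpSketch {
    // The 55 bytes from the hex dump, row by row.
    static final String DUMP =
        "45 53 00 00 00 31 00 00 00 00 00 00 00 08 08 08 "
      + "20 0b 83 00 00 00 1a 00 00 00 16 69 6e 74 65 72 "
      + "6e 61 6c 3a 74 63 70 2f 68 61 6e 64 73 68 61 6b "
      + "65 00 04 a3 8e b7 41";

    // Header length as stated in the review comment.
    static final int HEADER_LENGTH = 49;

    static byte[] message() {
        String[] parts = DUMP.trim().split("\\s+");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return bytes;
    }

    // Assumption: the byte at offset 26 (0x16 = 22) is the action-name length,
    // and the name occupies the last 22 bytes of the header.
    static String actionName() {
        byte[] m = message();
        int length = m[26];
        return new String(m, HEADER_LENGTH - length, length, StandardCharsets.US_ASCII);
    }

    // The single byte between header and payload: the empty Task ID marker.
    static int taskIdByte() {
        return message()[HEADER_LENGTH];
    }

    // Everything after the Task ID byte: a length byte plus the version bytes.
    static byte[] payload() {
        byte[] m = message();
        return Arrays.copyOfRange(m, HEADER_LENGTH + 1, m.length);
    }

    public static void main(String[] args) {
        System.out.println("total bytes:  " + message().length);
        System.out.println("action name:  " + actionName());
        System.out.println("task id byte: " + taskIdByte());
        System.out.println("payload size: " + payload().length);
    }
}
```

The reader can only find the payload because the Task ID occupies a known number of bytes, here exactly one.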

If you "write nothing", how will Protobuf know not to parse it? There has to be at least some way to indicate where the TaskId protobuf bytes start and end before we go on to read those last bytes encoding the message.

By the time you get to where hasNodeId() works, you've already tried to parse the message into a protobuf, but you don't know where it ends and the "version" in this handshake begins.

I think you need to do something similar to this implementation:

public RegisterRestActionsRequest(StreamInput in) throws IOException {
    super(in);
    request = RegisterRestActions.parseFrom(in.readByteArray());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeByteArray(request.toByteArray());
}

This reads/writes the protobuf bytes wrapped inside a ByteArray. You can read the byte array and if it is StreamInput.EMPTY_BYTE_ARRAY then you can just return the EMPTY_TASK_ID. Otherwise, you take the byte array and call parseFrom() on it.

This preserves the 1-byte "empty" behavior we currently have and gives us a slightly shorter (due to variable length task id long) set of bytes (plus the length byte) in the case that we actually have a Task ID.
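
The framing idea can be tried out with the JDK alone. The sketch below is an approximation of a length-prefixed byte array (a VInt length followed by that many bytes) built on `ByteArrayOutputStream` and `ByteArrayInputStream`. It does not use OpenSearch's StreamInput/StreamOutput, the opaque `taskIdBytes` stand in for serialized protobuf bytes, and all names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

final class FramedTaskIdSketch {
    static final byte[] EMPTY = new byte[0];

    static void writeVInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readVInt(ByteArrayInputStream in) {
        int value = 0;
        int shift = 0;
        while (true) {
            int b = in.read();
            if (b < 0) {
                throw new IllegalStateException("unexpected end of stream");
            }
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
        }
    }

    // Length prefix first, then the bytes; an empty array is just the byte 0.
    static void writeByteArray(ByteArrayOutputStream out, byte[] bytes) {
        writeVInt(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // A zero length means "empty task id": nothing is handed to a parser.
    static byte[] readByteArray(ByteArrayInputStream in) {
        int length = readVInt(in);
        if (length == 0) {
            return EMPTY;
        }
        byte[] bytes = new byte[length];
        if (in.read(bytes, 0, length) != length) {
            throw new IllegalStateException("truncated stream");
        }
        return bytes;
    }

    static int framedSize(byte[] taskIdBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeByteArray(out, taskIdBytes);
        return out.size();
    }

    // Frames taskIdBytes, appends a 5-byte payload, reads the frame back and
    // reports how many bytes are left for the next reader.
    static int bytesLeftAfterTaskId(byte[] taskIdBytes) {
        byte[] payload = {0x04, (byte) 0xa3, (byte) 0x8e, (byte) 0xb7, 0x41};
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeByteArray(out, taskIdBytes);
        out.write(payload, 0, payload.length);
        ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        byte[] frame = readByteArray(in);
        if (frame.length != taskIdBytes.length) {
            throw new IllegalStateException("frame mismatch");
        }
        return in.available();
    }

    public static void main(String[] args) {
        System.out.println("empty frame size:       " + framedSize(EMPTY));
        System.out.println("left after empty id:    " + bytesLeftAfterTaskId(EMPTY));
        System.out.println("left after 27-byte id:  " + bytesLeftAfterTaskId(new byte[27]));
    }
}
```

In both cases exactly the 5 payload bytes remain, which is the property the length prefix buys: the reader stops at the right offset whether or not a task id is present.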

*
* @opensearch.internal
*/
public interface ProtobufWriteable {
Collaborator

Can we have this writeable extend Writeable interface?

Contributor

I agree with @Bukhtawar. These changes should be least intrusive and should use existing interfaces. It would enable us to experiment with other protocols as well.


/**
* Task id that consists of node id and id of the task on the node
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
- public final class TaskId implements Writeable {
+ public final class TaskId implements Writeable, ProtobufWriteable {
Collaborator

Looks confusing, why do we need two interfaces?

@VachaShah
Collaborator Author

I discussed this with Dan, and I am closing this PR in favor of a new one that starts top-down from the POC #9097, as that will make better sense as a PR and give proper context. We also discussed that we can leave TaskId alone for now and focus on the larger request/response classes to get immediate benefits.

@VachaShah VachaShah closed this Sep 18, 2023
5 participants