Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming transformer #196

Merged
merged 8 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions pynumaflow/proto/sourcetransformer/transform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,35 @@ service SourceTransform {
// SourceTransformFn applies a function to each request element.
// In addition to map function, SourceTransformFn also supports assigning a new event time to response.
// SourceTransformFn can be used only at source vertex by source data transformer.
rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse);
rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/**
* SourceTransformerRequest represents a request element.
*/
message SourceTransformRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// This ID is used to uniquely identify a transform request
string id = 6;
}
Request request = 1;
optional Handshake handshake = 2;
}

/**
Expand All @@ -37,11 +51,15 @@ message SourceTransformResponse {
repeated string tags = 4;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
}
34 changes: 19 additions & 15 deletions pynumaflow/proto/sourcetransformer/transform_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 52 additions & 24 deletions pynumaflow/proto/sourcetransformer/transform_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,58 @@ from typing import (

DESCRIPTOR: _descriptor.FileDescriptor

class Handshake(_message.Message):
__slots__ = ("sot",)
SOT_FIELD_NUMBER: _ClassVar[int]
sot: bool
def __init__(self, sot: bool = ...) -> None: ...

class SourceTransformRequest(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "headers")
__slots__ = ("request", "handshake")

class Request(_message.Message):
__slots__ = ("keys", "value", "event_time", "watermark", "headers", "id")

class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
class HeadersEntry(_message.Message):
__slots__ = ("key", "value")
KEY_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
key: str
value: str
def __init__(self, key: _Optional[str] = ..., value: _Optional[str] = ...) -> None: ...
KEYS_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
headers: _containers.ScalarMap[str, str]
EVENT_TIME_FIELD_NUMBER: _ClassVar[int]
WATERMARK_FIELD_NUMBER: _ClassVar[int]
HEADERS_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
keys: _containers.RepeatedScalarFieldContainer[str]
value: bytes
event_time: _timestamp_pb2.Timestamp
watermark: _timestamp_pb2.Timestamp
headers: _containers.ScalarMap[str, str]
id: str
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
id: _Optional[str] = ...,
) -> None: ...
REQUEST_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
request: SourceTransformRequest.Request
handshake: Handshake
def __init__(
self,
keys: _Optional[_Iterable[str]] = ...,
value: _Optional[bytes] = ...,
event_time: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
watermark: _Optional[_Union[_timestamp_pb2.Timestamp, _Mapping]] = ...,
headers: _Optional[_Mapping[str, str]] = ...,
request: _Optional[_Union[SourceTransformRequest.Request, _Mapping]] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
) -> None: ...

class SourceTransformResponse(_message.Message):
__slots__ = ("results",)
__slots__ = ("results", "id", "handshake")

class Result(_message.Message):
__slots__ = ("keys", "value", "event_time", "tags")
Expand All @@ -63,9 +84,16 @@ class SourceTransformResponse(_message.Message):
tags: _Optional[_Iterable[str]] = ...,
) -> None: ...
RESULTS_FIELD_NUMBER: _ClassVar[int]
ID_FIELD_NUMBER: _ClassVar[int]
HANDSHAKE_FIELD_NUMBER: _ClassVar[int]
results: _containers.RepeatedCompositeFieldContainer[SourceTransformResponse.Result]
id: str
handshake: Handshake
def __init__(
self, results: _Optional[_Iterable[_Union[SourceTransformResponse.Result, _Mapping]]] = ...
self,
results: _Optional[_Iterable[_Union[SourceTransformResponse.Result, _Mapping]]] = ...,
id: _Optional[str] = ...,
handshake: _Optional[_Union[Handshake, _Mapping]] = ...,
) -> None: ...

class ReadyResponse(_message.Message):
Expand Down
12 changes: 6 additions & 6 deletions pynumaflow/proto/sourcetransformer/transform_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, channel):
Args:
channel: A grpc.Channel.
"""
self.SourceTransformFn = channel.unary_unary(
self.SourceTransformFn = channel.stream_stream(
"/sourcetransformer.v1.SourceTransform/SourceTransformFn",
request_serializer=transform__pb2.SourceTransformRequest.SerializeToString,
response_deserializer=transform__pb2.SourceTransformResponse.FromString,
Expand All @@ -30,7 +30,7 @@ def __init__(self, channel):
class SourceTransformServicer(object):
"""Missing associated documentation comment in .proto file."""

def SourceTransformFn(self, request, context):
def SourceTransformFn(self, request_iterator, context):
"""SourceTransformFn applies a function to each request element.
In addition to map function, SourceTransformFn also supports assigning a new event time to response.
SourceTransformFn can be used only at source vertex by source data transformer.
Expand All @@ -48,7 +48,7 @@ def IsReady(self, request, context):

def add_SourceTransformServicer_to_server(servicer, server):
rpc_method_handlers = {
"SourceTransformFn": grpc.unary_unary_rpc_method_handler(
"SourceTransformFn": grpc.stream_stream_rpc_method_handler(
servicer.SourceTransformFn,
request_deserializer=transform__pb2.SourceTransformRequest.FromString,
response_serializer=transform__pb2.SourceTransformResponse.SerializeToString,
Expand All @@ -71,7 +71,7 @@ class SourceTransform(object):

@staticmethod
def SourceTransformFn(
request,
request_iterator,
target,
options=(),
channel_credentials=None,
Expand All @@ -82,8 +82,8 @@ def SourceTransformFn(
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
return grpc.experimental.stream_stream(
request_iterator,
target,
"/sourcetransformer.v1.SourceTransform/SourceTransformFn",
transform__pb2.SourceTransformRequest.SerializeToString,
Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/sourcetransformer/multiproc_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pynumaflow.info.types import ServerInfo, MINIMUM_NUMAFLOW_VERSION, ContainerType
from pynumaflow.sourcetransformer.servicer.server import SourceTransformServicer
from pynumaflow.sourcetransformer.servicer._servicer import SourceTransformServicer

from pynumaflow.shared.server import start_multiproc_server

Expand Down
2 changes: 1 addition & 1 deletion pynumaflow/sourcetransformer/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pynumaflow.shared import NumaflowServer
from pynumaflow.shared.server import sync_server_start
from pynumaflow.sourcetransformer._dtypes import SourceTransformCallable
from pynumaflow.sourcetransformer.servicer.server import SourceTransformServicer
from pynumaflow.sourcetransformer.servicer._servicer import SourceTransformServicer


class SourceTransformServer(NumaflowServer):
Expand Down
Loading
Loading