Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add handshake for readFn and ackFn #88

Merged
merged 6 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions examples/simple-source/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ update:

.PHONY: image
image: update
cd ../../ && docker build \
cd ../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
-t ${IMAGE_REGISTRY} . --platform linux/amd64,linux/arm64 --push

.PHONY: clean
clean:
Expand Down
15 changes: 8 additions & 7 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let source_handle = simple_source::SimpleSource::new("Hello World!".to_string());
let source_handle = simple_source::SimpleSource::new();
numaflow::source::Server::new(source_handle).start().await
}

pub(crate) mod simple_source {
use std::{collections::HashSet, sync::RwLock};

use chrono::Utc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::HashSet, sync::RwLock};
use tokio::sync::mpsc::Sender;

use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
Expand All @@ -18,15 +18,15 @@ pub(crate) mod simple_source {
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
/// does not provide a mutable reference as explained in [`Sourcer`]
pub(crate) struct SimpleSource {
payload: String,
yet_to_ack: RwLock<HashSet<String>>,
counter: AtomicUsize,
}

impl SimpleSource {
pub(crate) fn new(payload: String) -> Self {
pub(crate) fn new() -> Self {
Self {
payload,
yet_to_ack: RwLock::new(HashSet::new()),
counter: AtomicUsize::new(0),
}
}
}
Expand All @@ -42,9 +42,10 @@ pub(crate) mod simple_source {
let mut message_offsets = Vec::with_capacity(request.count);
for i in 0..request.count {
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
let payload = self.counter.fetch_add(1, Ordering::SeqCst).to_string();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not relaxed?

transmitter
.send(Message {
value: format!("{}-{}", self.payload, event_time).into_bytes(),
value: payload.into_bytes(),
event_time,
offset: Offset {
offset: offset.clone().into_bytes(),
Expand Down
27 changes: 21 additions & 6 deletions proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ package source.v1;

service Source {
// Read returns a stream of datum responses.
// The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list).
// The size of the returned responses is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned responses will contain all the datum that have been read (which could be an empty list).
// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream.
// Once it has sent all the datum, the server will send a ReadResponse with the end of transmission flag set to true.
rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

// AckFn acknowledges a stream of datum offsets.
// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex.
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request,
// then it is best to crash because there are no other retry mechanisms possible.
rpc AckFn(stream AckRequest) returns (AckResponse);
// Clients sends n requests and expects n responses.
rpc AckFn(stream AckRequest) returns (stream AckResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);
Expand All @@ -29,6 +31,14 @@ service Source {
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}

/*
* ReadRequest is the request for reading datum stream from user defined source.
*/
Expand All @@ -43,6 +53,7 @@ message ReadRequest {
}
// Required field indicating the request.
Request request = 1;
optional Handshake handshake = 2;
}

/*
Expand Down Expand Up @@ -82,14 +93,15 @@ message ReadResponse {
// End of transmission flag.
bool eot = 1;
Code code = 2;
Error error = 3;
optional Error error = 3;
optional string msg = 4;
}
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
//
Status status = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

/*
Expand All @@ -103,6 +115,7 @@ message AckRequest {
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
optional Handshake handshake = 2;
}

/*
Expand All @@ -122,6 +135,8 @@ message AckResponse {
}
// Required field holding the result.
Result result = 1;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 2;
}

/*
Expand Down Expand Up @@ -170,4 +185,4 @@ message Offset {
// It is useful for sources that have multiple partitions. e.g. Kafka.
// If the partition_id is not specified, it is assumed that the source has a single partition.
int32 partition_id = 2;
}
}
Loading
Loading