Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement reduce stream sdk #91

Merged
merged 11 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,23 @@
</to>
</configuration>
</execution>
<execution>
<id>reduce-stream-sum</id>
<phase>package</phase>
<goals>
<goal>dockerBuild</goal>
</goals>
<configuration>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory
</mainClass>
</container>
<to>
<image>numaflow-java-examples/reduce-stream-sum</image>
</to>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.Server;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory;
import lombok.extern.slf4j.Slf4j;

/**
* SumFactory extends ReduceStreamerFactory to support creating instances of SumFunction.
* It also provides a main function to start a server for handling the reduce stream.
*/
@Slf4j
public class SumFactory extends ReduceStreamerFactory<SumFunction> {

public static void main(String[] args) throws Exception {
log.info("sum udf was invoked");
new Server(new SumFactory()).start();
}

@Override
public SumFunction createReduceStreamer() {
return new SumFunction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.numaproj.numaflow.examples.reducestreamer.sum;

import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer;
import lombok.extern.slf4j.Slf4j;

/**
* SumFunction is a User Defined Reduce Stream Function example which sums up the values for the given keys
* and outputs the sum when the sum is greater than 100.
* When the input stream closes, the function outputs the sum no matter what value it holds.
*/
@Slf4j
public class SumFunction extends ReduceStreamer {

private int sum = 0;

@Override
public void processMessage(
String[] keys,
io.numaproj.numaflow.reducestreamer.model.Datum datum,
OutputStreamObserver outputStreamObserver,
io.numaproj.numaflow.reducestreamer.model.Metadata md) {
try {
sum += Integer.parseInt(new String(datum.getValue()));
} catch (NumberFormatException e) {
log.info("error while parsing integer - {}", e.getMessage());
}
if (sum >= 100) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
sum = 0;
}
}

@Override
public void handleEndOfStream(
String[] keys,
OutputStreamObserver outputStreamObserver,
Metadata md) {
outputStreamObserver.send(new Message(String.valueOf(sum).getBytes()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* ActorRequest is a wrapper of the gRpc input request.
* It is constructed by the service when service receives an input request and then sent to
* the supervisor actor, to be distributed to reduce streamer actors.
*/
@Getter
@AllArgsConstructor
class ActorRequest {
ReduceOuterClass.ReduceRequest request;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getRequest().getPayload().getKeysList().toArray(new String[0]));
}

public String[] getKeySet() {
return this.getRequest().getPayload().getKeysList().toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

/**
* The actor response holds the final EOF response for a particular key set.
* <p>
* The isLast attribute indicates whether the response is globally the last one to be sent to
* the output gRPC stream, if set to true, it means the response is the very last response among
* all key sets. When output stream actor receives an isLast response, it sends the response and immediately
* closes the output stream.
*/
@Getter
@Setter
@AllArgsConstructor
class ActorResponse {
ReduceOuterClass.ReduceResponse response;
boolean isLast;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
// but at the same time, would like to be consistent with GO SDK implementation.
// we will revisit this one later.
public String getActorUniqueIdentifier() {
return String.join(
Constants.DELIMITER,
this.getResponse().getResult().getKeysList().toArray(new String[0]));
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Constants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.numaproj.numaflow.reducestreamer;

class Constants {
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;

public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reducestream.sock";

public static final String EOF = "EOF";

public static final String SUCCESS = "SUCCESS";

public static final String DELIMITER = ":";
}
26 changes: 26 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.info.ServerInfoAccessor;
import lombok.Builder;
import lombok.Getter;

/**
* GRPCConfig is used to provide configurations for gRPC server.
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
private String socketPath;
private int maxMessageSize;
private String infoFilePath;

/**
* Static method to create default GRPCConfig.
*/
static GRPCConfig defaultGrpcConfig() {
return GRPCConfig.newBuilder()
.infoFilePath(ServerInfoAccessor.DEFAULT_SERVER_INFO_FILE_PATH)
.maxMessageSize(Constants.DEFAULT_MESSAGE_SIZE)
.socketPath(Constants.DEFAULT_SOCKET_PATH).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.Datum;
import lombok.AllArgsConstructor;

import java.time.Instant;

@AllArgsConstructor
class HandlerDatum implements Datum {
private byte[] value;
private Instant watermark;
private Instant eventTime;

@Override
public Instant getWatermark() {
return this.watermark;
}

@Override
public byte[] getValue() {
return this.value;
}

@Override
public Instant getEventTime() {
return this.eventTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
import lombok.AllArgsConstructor;

import java.time.Instant;

@AllArgsConstructor
class IntervalWindowImpl implements IntervalWindow {
private final Instant startTime;
private final Instant endTime;

@Override
public Instant getStartTime() {
return this.startTime;
}

@Override
public Instant getEndTime() {
return this.endTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.numaproj.numaflow.reducestreamer;

import io.numaproj.numaflow.reducestreamer.model.IntervalWindow;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import lombok.AllArgsConstructor;

@AllArgsConstructor
class MetadataImpl implements Metadata {
private final IntervalWindow intervalWindow;

@Override
public IntervalWindow getIntervalWindow() {
return intervalWindow;
}
}
48 changes: 48 additions & 0 deletions src/main/java/io/numaproj/numaflow/reducestreamer/OutputActor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.AbstractActor;
import akka.actor.Props;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* Output actor is a wrapper around the gRPC output stream.
* It ensures synchronized calls to the responseObserver onNext() and invokes onComplete at the end of the stream.
* ALL reduce responses are sent to the response stream actor before getting forwarded to the output gRPC stream.
* <p>
* More details about gRPC StreamObserver concurrency: https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html
*/
@Slf4j
@AllArgsConstructor
class OutputActor extends AbstractActor {
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver;

public static Props props(
StreamObserver<ReduceOuterClass.ReduceResponse> responseObserver) {
return Props.create(OutputActor.class, responseObserver);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(ActorResponse.class, this::handleResponse)
.build();
}

private void handleResponse(ActorResponse actorResponse) {
if (actorResponse.isLast()) {
// send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
responseObserver.onCompleted();
// stop the AKKA system right after we close the output stream.
// note: could make more sense if the supervisor actor stops the system,
// but it requires an extra tell.
getContext().getSystem().stop(getSender());
} else {
responseObserver.onNext(actorResponse.getResponse());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.numaproj.numaflow.reducestreamer;

import akka.actor.ActorRef;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
import io.numaproj.numaflow.reducestreamer.model.Message;
import io.numaproj.numaflow.reducestreamer.model.Metadata;
import io.numaproj.numaflow.reducestreamer.model.OutputStreamObserver;
import lombok.AllArgsConstructor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@AllArgsConstructor
class OutputStreamObserverImpl implements OutputStreamObserver {
private final Metadata md;
private final ActorRef responseStreamActor;

@Override
public void send(Message message) {
this.responseStreamActor.tell(buildResponse(message), ActorRef.noSender());
}

private ActorResponse buildResponse(Message message) {
ReduceOuterClass.ReduceResponse.Builder responseBuilder = ReduceOuterClass.ReduceResponse.newBuilder();
// set the window using the actor metadata.
responseBuilder.setWindow(ReduceOuterClass.Window.newBuilder()
.setStart(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getStartTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getStartTime().getNano()))
.setEnd(Timestamp.newBuilder()
.setSeconds(this.md.getIntervalWindow().getEndTime().getEpochSecond())
.setNanos(this.md.getIntervalWindow().getEndTime().getNano()))
.setSlot("slot-0").build());
responseBuilder.setEOF(false);
// set the result.
responseBuilder.setResult(ReduceOuterClass.ReduceResponse.Result
.newBuilder()
.setValue(ByteString.copyFrom(message.getValue()))
.addAllKeys(message.getKeys()
== null ? new ArrayList<>():Arrays.asList(message.getKeys()))
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build(), false);
}
}
Loading
Loading