Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: gRPC stream connection deadline #999

Merged
merged 4 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class Config {
static final String DEFAULT_HOST = "localhost";

static final int DEFAULT_DEADLINE = 500;
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
static final long DEFAULT_KEEP_ALIVE = 0;

Expand All @@ -31,6 +32,7 @@ public final class Config {
static final String MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME = "FLAGD_MAX_EVENT_STREAM_RETRIES";
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
static final String OFFLINE_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME_OLD = "FLAGD_KEEP_ALIVE_TIME";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ public class FlagdOptions {
@Builder.Default
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);

/**
* Streaming connection deadline in milliseconds.
* Set to 0 to disable the deadline.
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
*/
@Builder.Default
private int streamDeadlineMs = fallBackToEnvOrDefault(Config.STREAM_DEADLINE_MS_ENV_VAR_NAME,
Config.DEFAULT_STREAM_DEADLINE_MS);

/**
* Selector to be used with flag sync gRPC contract.
**/
Expand All @@ -101,7 +110,7 @@ public class FlagdOptions {
/**
* gRPC client KeepAlive in milliseconds. Disabled with 0.
* Defaults to 0 (disabled).
*
*
**/
@Builder.Default
private long keepAlive = fallBackToEnvOrDefault(Config.KEEP_ALIVE_MS_ENV_VAR_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -52,12 +54,18 @@ public void onNext(EventStreamResponse value) {
}

@Override
public void onError(Throwable t) {
log.warn("event stream", t);
if (this.cache.getEnabled()) {
this.cache.clear();
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException
&& ((StatusRuntimeException) throwable).getStatus().getCode()
.equals(Code.DEADLINE_EXCEEDED)) {
log.debug(String.format("stream deadline reached; will re-establish"));
} else {
log.error(String.format("event stream error", throwable));
if (this.cache.getEnabled()) {
this.cache.clear();
}
this.onConnectionEvent.accept(false, Collections.emptyList());
}
this.onConnectionEvent.accept(false, Collections.emptyList());

// handle last call of this stream
handleEndOfStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class GrpcConnector {

private final int startEventStreamRetryBackoff;
private final long deadline;
private final long streamDeadlineMs;

private final Cache cache;
private final Consumer<ConnectionEvent> onConnectionEvent;
Expand Down Expand Up @@ -64,6 +65,7 @@ public GrpcConnector(final FlagdOptions options, final Cache cache, final Suppli
this.startEventStreamRetryBackoff = options.getRetryBackoffMs();
this.eventStreamRetryBackoff = options.getRetryBackoffMs();
this.deadline = options.getDeadline();
this.streamDeadlineMs = options.getStreamDeadlineMs();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[suggestion] using the options object instead of separate fields would reduce the overhead of adding a new field each time

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can go either way on this one. In fact it might be a good thing to do in a pure refactor/cleanup PR. There's also some naming that we can probably improve with the provider.

this.cache = cache;
this.onConnectionEvent = onConnectionEvent;
this.connectedSupplier = connectedSupplier;
Expand Down Expand Up @@ -126,7 +128,14 @@ private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
this::onConnectionEvent);
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

ServiceGrpc.ServiceStub localServiceStub = this.serviceStub;

if (this.streamDeadlineMs > 0) {
localServiceStub = localServiceStub.withDeadlineAfter(this.streamDeadlineMs, TimeUnit.MILLISECONDS);
}

localServiceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);

try {
synchronized (sync) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -43,6 +45,7 @@ public class GrpcStreamConnector implements Connector {
private final FlagSyncServiceStub serviceStub;
private final FlagSyncServiceBlockingStub serviceBlockingStub;
private final int deadline;
private final int streamDeadlineMs;
private final String selector;

/**
Expand All @@ -55,6 +58,7 @@ public GrpcStreamConnector(final FlagdOptions options) {
serviceStub = FlagSyncServiceGrpc.newStub(channel);
serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(channel);
deadline = options.getDeadline();
streamDeadlineMs = options.getStreamDeadlineMs();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[suggestion] Can't we just use the options within this class and pass them further down the call chain? Adding a new field and parameters might become tedious over time.

selector = options.getSelector();
}

Expand All @@ -64,7 +68,8 @@ public GrpcStreamConnector(final FlagdOptions options) {
public void init() {
Thread listener = new Thread(() -> {
try {
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline);
observeEventStream(blockingQueue, shutdown, serviceStub, serviceBlockingStub, selector, deadline,
streamDeadlineMs);
} catch (InterruptedException e) {
log.warn("gRPC event stream interrupted, flag configurations are stale", e);
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -114,7 +119,8 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
final FlagSyncServiceStub serviceStub,
final FlagSyncServiceBlockingStub serviceBlockingStub,
final String selector,
final int deadline)
final int deadline,
final int streamDeadlineMs)
throws InterruptedException {

final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
Expand All @@ -128,14 +134,20 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
log.debug("Initializing sync stream request");
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();

if (selector != null) {
syncRequest.setSelector(selector);
}

try (CancellableContext context = Context.current().withCancellation()) {
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
FlagSyncServiceStub localServiceStub = serviceStub;
if (streamDeadlineMs > 0) {
localServiceStub = localServiceStub.withDeadlineAfter(streamDeadlineMs, TimeUnit.MILLISECONDS);
}

localServiceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));

try {
metadataResponse = serviceBlockingStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS)
.getMetadata(metadataRequest.build());
Expand All @@ -158,14 +170,21 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
}

if (response.getError() != null || metadataException != null) {
log.error(String.format("Error from initializing stream or metadata, retrying in %dms",
retryDelay), response.getError());

if (!writeTo.offer(
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata",
metadataResponse))) {
log.error("Failed to convey ERROR status, queue is full");
if (response.getError() instanceof StatusRuntimeException
&& ((StatusRuntimeException) response.getError()).getStatus().getCode()
.equals(Code.DEADLINE_EXCEEDED)) {
log.debug(String.format("Stream deadline reached, re-establishing in %dms",
retryDelay));
} else {
log.error(String.format("Error initializing stream or metadata, retrying in %dms",
retryDelay), response.getError());
if (!writeTo.offer(
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata",
metadataResponse))) {
log.error("Failed to convey ERROR status, queue is full");
}
}

// close the context to cancel the stream in case just the metadata call failed
context.cancel(metadataException);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -24,6 +25,8 @@

import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

class EventStreamObserverTest {

Expand Down Expand Up @@ -83,6 +86,15 @@ public void reconnections() {
assertFalse(states.get(0));
}

// Feeds a DEADLINE_EXCEEDED status into onError and checks that it is not handled like a real error.
@Test
public void deadlineExceeded() {
stream.onError(new StatusRuntimeException(Status.DEADLINE_EXCEEDED));
// the cache must NOT be flushed when the stream deadline is reached
verify(cache, never()).clear();
// and nothing is recorded in states, i.e. no error is notified
assertEquals(0, states.size());
}

@Test
public void cacheBustingForKnownKeys() {
final String key1 = "myKey1";
Expand Down
Loading