Skip to content

Commit

Permalink
refactor retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
  • Loading branch information
Kavindu-Dodan committed Aug 15, 2023
1 parent 9bb68ae commit 597e3b2
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
private final Consumer<ProviderState> stateConsumer;
private final Runnable reconnectEventStream;
private final Object sync;
private final Cache cache;

private static final String CONFIGURATION_CHANGE = "configuration_change";
Expand All @@ -32,10 +32,10 @@ class EventStreamObserver implements StreamObserver<EventStreamResponse> {
* @param stateConsumer lambda to call for setting the state
* @param reconnectEventStream callback for trying to recreate the stream
*/
EventStreamObserver(Cache cache, Consumer<ProviderState> stateConsumer, Runnable reconnectEventStream) {
EventStreamObserver(Object sync ,Cache cache, Consumer<ProviderState> stateConsumer) {
this.sync = sync;
this.cache = cache;
this.stateConsumer = stateConsumer;
this.reconnectEventStream = reconnectEventStream;
}

@Override
Expand All @@ -59,7 +59,9 @@ public void onError(Throwable t) {
this.cache.clear();
}
this.stateConsumer.accept(ProviderState.ERROR);
this.reconnectEventStream.run();
synchronized (this.sync) {
this.sync.notify();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
public class GrpcConnector {

private final Object sync = new Object();
private final ServiceGrpc.ServiceBlockingStub serviceBlockingStub;
private final ServiceGrpc.ServiceStub serviceStub;
private final ManagedChannel channel;
Expand All @@ -45,8 +46,9 @@ public class GrpcConnector {

/**
* GrpcConnector creates an abstraction over gRPC communication.
* @param options options to build the gRPC channel.
* @param cache cache to use.
*
* @param options options to build the gRPC channel.
* @param cache cache to use.
* @param stateConsumer lambda to call for setting the state.
*/
public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer<ProviderState> stateConsumer) {
Expand All @@ -60,9 +62,10 @@ public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer<Pro
this.cache = cache;
this.stateConsumer = stateConsumer;
}

/**
* Initialize the gRPC stream.
*
* @throws RuntimeException if the connection cannot be established.
*/
public void initialize() throws RuntimeException {
Expand All @@ -79,12 +82,13 @@ public void initialize() throws RuntimeException {
}
} finally {
// try in background to open the event stream
this.startEventStream();
new Thread(this::observeEventStream).start();
}
}

/**
* Shuts down all gRPC resources.
*
* @throws Exception is something goes wrong while terminating the communication.
*/
public void shutdown() throws Exception {
Expand All @@ -103,48 +107,48 @@ public void shutdown() throws Exception {

/**
* Provide the object that can be used to resolve Feature Flag values.
*
* @return a {@link ServiceGrpc.ServiceBlockingStub} for running FF resolution.
*/
public ServiceGrpc.ServiceBlockingStub getResolver() {
return serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS);
}

/**
* Resets internal state for retrying establishing the stream.
*/
private void resetRetryConnection() {
this.eventStreamAttempt = 1;
this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff;
}
private void observeEventStream() {
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
StreamObserver<Schema.EventStreamResponse> responseObserver =
new EventStreamObserver(sync, this.cache, this::grpcStateConsumer);
this.serviceStub.eventStream(Schema.EventStreamRequest.getDefaultInstance(), responseObserver);

private void startEventStream() {
StreamObserver<Schema.EventStreamResponse> responseObserver =
new EventStreamObserver(this.cache, this::grpcStateConsumer, this::restartEventStream);
this.serviceStub
.eventStream(Schema.EventStreamRequest.getDefaultInstance(), responseObserver);
}
try {
synchronized (sync) {
sync.wait();
}
} catch (InterruptedException e) {
log.warn("interruption while waiting for condition", e);
return;
}

private void restartEventStream() {
this.eventStreamAttempt++;
if (this.eventStreamAttempt > this.maxEventStreamRetries) {
log.error("failed to connect to event stream, exhausted retries");
this.stateConsumer.accept(ProviderState.ERROR);
return;
}
this.eventStreamRetryBackoff = 2 * this.eventStreamRetryBackoff;
try {
Thread.sleep(this.eventStreamRetryBackoff);
} catch (InterruptedException e) {
log.debug("Failed to sleep while restarting gRPC Event Stream");
this.eventStreamAttempt++;
this.eventStreamRetryBackoff = 2 * this.eventStreamRetryBackoff;

try {
Thread.sleep(this.eventStreamRetryBackoff);
} catch (InterruptedException e) {
log.debug("failed to block while restarting gRPC Event Stream");
return;
}
}
this.startEventStream();
}

log.error("failed to connect to event stream, exhausted retries");
this.grpcStateConsumer(ProviderState.ERROR);
}

private void grpcStateConsumer(final ProviderState state){
private void grpcStateConsumer(final ProviderState state) {
// check for readiness
if (ProviderState.READY.equals(state)) {
resetRetryConnection();
this.eventStreamAttempt = 1;
this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff;
}

// chain to initiator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.cache.Cache;
import dev.openfeature.contrib.providers.flagd.cache.CacheType;
import dev.openfeature.contrib.providers.flagd.grpc.GrpcResolution;
import dev.openfeature.contrib.providers.flagd.grpc.GrpcConnector;
import dev.openfeature.flagd.grpc.Schema.EventStreamRequest;
import dev.openfeature.contrib.providers.flagd.grpc.GrpcResolution;
import dev.openfeature.flagd.grpc.Schema.EventStreamResponse;
import dev.openfeature.flagd.grpc.Schema.ResolveBooleanRequest;
import dev.openfeature.flagd.grpc.Schema.ResolveBooleanResponse;
Expand All @@ -27,10 +26,8 @@
import dev.openfeature.sdk.Value;
import io.grpc.Channel;
import io.grpc.Deadline;
import io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;

import java.lang.reflect.Field;
Expand All @@ -51,7 +48,6 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class FlagdProviderTest {
Expand Down Expand Up @@ -491,9 +487,6 @@ void invalidate_cache() throws Exception {
FlagdProvider provider = createProvider(grpc);

provider.initialize(null);
ArgumentCaptor<StreamObserver<EventStreamResponse>> streamObserverCaptor =
ArgumentCaptor.forClass(StreamObserver.class);
verify(serviceStubMock).eventStream(any(EventStreamRequest.class), streamObserverCaptor.capture());

//provider.setState(ProviderState.READY);
OpenFeatureAPI.getInstance().setProvider(provider);
Expand Down Expand Up @@ -522,9 +515,6 @@ void invalidate_cache() throws Exception {
FlagEvaluationDetails<Double> floatDetails;
FlagEvaluationDetails<Value> objectDetails;

// should clear cache
streamObserverCaptor.getValue().onNext(eResponse);

// assert cache has been invalidated
booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false);
assertTrue(booleanDetails.getValue());
Expand Down Expand Up @@ -713,10 +703,6 @@ void disabled_cache() {
FlagdProvider provider = createProvider(grpc, cache, () -> ProviderState.READY);
provider.initialize(null);

ArgumentCaptor<StreamObserver<EventStreamResponse>> streamObserverCaptor =
ArgumentCaptor.forClass(StreamObserver.class);
verify(serviceStubMock).eventStream(any(EventStreamRequest.class), streamObserverCaptor.capture());

OpenFeatureAPI.getInstance().setProvider(provider);

HashMap<String, com.google.protobuf.Value> flagsMap = new HashMap<>();
Expand All @@ -740,9 +726,6 @@ void disabled_cache() {
FlagEvaluationDetails<Double> floatDetails = api.getClient().getDoubleDetails(FLAG_KEY_DOUBLE, 0.1);
FlagEvaluationDetails<Value> objectDetails = api.getClient().getObjectDetails(FLAG_KEY_OBJECT, new Value());

// should not cause a change of state
streamObserverCaptor.getValue().onNext(eResponse);

// assert values are not cached
booleanDetails = api.getClient().getBooleanDetails(FLAG_KEY_BOOLEAN, false);
assertTrue(booleanDetails.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ class StateChange {
List<ProviderState> states;
EventStreamObserver stream;
Runnable reconnect;
Object sync;

@BeforeEach
void setUp() {
states = new ArrayList<>();
sync = new Object();
cache = mock(Cache.class);
reconnect = mock(Runnable.class);
when(cache.getEnabled()).thenReturn(true);
stream = new EventStreamObserver(cache, state -> states.add(state), reconnect);
stream = new EventStreamObserver(sync, cache, state -> states.add(state));
}

@Test
Expand Down

0 comments on commit 597e3b2

Please sign in to comment.