Skip to content

Commit

Permalink
fixup: fail init and delay parsing, readme
Browse files Browse the repository at this point in the history
Signed-off-by: Todd Baert <todd.baert@dynatrace.com>
  • Loading branch information
toddbaert committed Sep 27, 2024
1 parent 8b0bc85 commit 0891c8a
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 96 deletions.
7 changes: 7 additions & 0 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ FlagdProvider flagdProvider = new FlagdProvider(

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json).

#### Sync-metadata

To support the injection of contextual data configured in flagd for in-process evaluation, the provider exposes a `getSyncMetadata` accessor which provides the most recent value returned by the [GetMetadata RPC](https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata).
The value is updated with every (re)connection to the sync implementation.
This can be used to enrich evaluations with such data.
If the `in-process` mode is not used, and before the provider is ready, the `getSyncMetadata` returns an empty map.

#### Offline mode

In-process resolvers can also work in an offline mode.
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
</properties>

<name>flagd</name>
<description>FlagD provider for Java</description>
<description>flagd provider for Java</description>
<url>https://openfeature.dev</url>

<developers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Slf4j
@SuppressWarnings({ "PMD.TooManyStaticImports", "checkstyle:NoFinalizer" })
public class FlagdProvider extends EventProvider {
private static final String FLAGD_PROVIDER = "flagD Provider";
private static final String FLAGD_PROVIDER = "flagd Provider";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean connected = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage;

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;
import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -17,6 +14,14 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.stream.Collectors;

import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;

/**
* Feature flag storage.
*/
Expand Down Expand Up @@ -97,12 +102,14 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
final BlockingQueue<QueuePayload> streamPayloads = connector.getStream();

while (!shutdown.get()) {
final QueuePayload take = streamPayloads.take();
switch (take.getType()) {
final QueuePayload payload = streamPayloads.take();
switch (payload.getType()) {
case DATA:
try {
List<String> changedFlagsKeys;
Map<String, FeatureFlag> flagMap = FlagParser.parseString(take.getFlagData(), throwIfInvalid);
Map<String, FeatureFlag> flagMap = FlagParser.parseString(payload.getFlagData(),
throwIfInvalid);
Map<String, Object> metadata = parseSyncMetadata(payload.getMetadataResponse());
writeLock.lock();
try {
changedFlagsKeys = getChangedFlagsKeys(flagMap);
Expand All @@ -111,7 +118,8 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
} finally {
writeLock.unlock();
}
if (!stateBlockingQueue.offer(new StorageStateChange(StorageState.OK, changedFlagsKeys))) {
if (!stateBlockingQueue
.offer(new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) {
log.warn("Failed to convey OK satus, queue is full");
}
} catch (Throwable e) {
Expand All @@ -128,13 +136,23 @@ private void streamerListener(final Connector connector) throws InterruptedExcep
}
break;
default:
log.info(String.format("Payload with unknown type: %s", take.getType()));
log.info(String.format("Payload with unknown type: %s", payload.getType()));
}
}

log.info("Shutting down store stream listener");
}

private Map<String, Object> parseSyncMetadata(GetMetadataResponse metadataResponse) {
try {
return convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap())
.asObjectMap();
} catch (Exception exception) {
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
}
return Collections.emptyMap();
}

private List<String> getChangedFlagsKeys(Map<String, FeatureFlag> newFlags) {
Map<String, FeatureFlag> changedFlags = new HashMap<>();
Map<String, FeatureFlag> addedFeatureFlags = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector;

import java.util.Map;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import lombok.AllArgsConstructor;
import lombok.Getter;

Expand All @@ -12,5 +12,9 @@
public class QueuePayload {
private final QueuePayloadType type;
private final String flagData;
private final Map<String, Object> syncMetadata;
private final GetMetadataResponse metadataResponse;

public QueuePayload(QueuePayloadType type, String flagData) {
this(type, flagData, GetMetadataResponse.getDefaultInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

Expand Down Expand Up @@ -46,7 +45,7 @@ public void init() throws IOException {

// initial read
String flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData, Collections.emptyMap()))) {
if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData))) {
log.warn(OFFER_WARN);
}

Expand All @@ -59,7 +58,7 @@ public void init() throws IOException {
if (currentTS > lastTS) {
lastTS = currentTS;
flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData, Collections.emptyMap()))) {
if (!queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData))) {
log.warn(OFFER_WARN);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure;

import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -23,6 +19,8 @@
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest;
import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.ManagedChannel;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -125,68 +123,66 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
log.info("Initializing sync stream observer");

while (!shutdown.get()) {
writeTo.clear();
Exception metadataException = null;
log.debug("Initializing sync stream request");
final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
Map<String, Object> metadata = Collections.emptyMap();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();

if (selector != null) {
syncRequest.setSelector(selector);
}

serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
try {
GetMetadataResponse metadataResponse = serviceBlockingStub
.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).getMetadata(metadataRequest.build());
metadata = convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap()).asObjectMap();
} catch (Exception e) {
// the chances this call fails but the syncRequest does not are slim
// it could be that the server doesn't implement this RPC
// instead of logging here, retain the exception and only log if the
// streamReceiver doesn't error
metadataException = e;
}

while (!shutdown.get()) {
final GrpcResponseModel response = streamReceiver.take();

if (response.isComplete()) {
log.info("Sync stream completed");
// The stream is complete, this isn't really an error but we should try to
// reconnect
break;
try (CancellableContext context = Context.current().withCancellation()) {
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
try {
metadataResponse = serviceBlockingStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS)
.getMetadata(metadataRequest.build());
} catch (Exception e) {
// the chances this call fails but the syncRequest does not are slim
// it could be that the server doesn't implement this RPC
// instead of logging and throwing here, retain the exception and handle in the
// stream logic below
metadataException = e;
}

if (response.getError() != null) {
log.error(String.format("Error from grpc connection, retrying in %dms", retryDelay),
response.getError());
while (!shutdown.get()) {
final GrpcResponseModel response = streamReceiver.take();

if (!writeTo.offer(
new QueuePayload(QueuePayloadType.ERROR, "Error from stream connection, retrying",
metadata))) {
log.error("Failed to convey ERROR status, queue is full");
if (response.isComplete()) {
log.info("Sync stream completed");
// The stream is complete, this isn't really an error but we should try to
// reconnect
break;
}
break;
}

final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: " + data);
if (response.getError() != null || metadataException != null) {
log.error(String.format("Error from initializing stream or metadata, retrying in %dms",
retryDelay), response.getError());

if (!writeTo.offer(
new QueuePayload(QueuePayloadType.ERROR, "Error from stream or metadata",
metadataResponse))) {
log.error("Failed to convey ERROR status, queue is full");
}
// close the context to cancel the stream in case just the metadata call failed
context.cancel(metadataException);
break;
}

if (!writeTo.offer(
new QueuePayload(QueuePayloadType.DATA, data, metadata))) {
log.error("Stream writing failed");
}
final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: " + data);

if (metadataException != null) {
// if we somehow are connected but the metadata call failed, something strange
// happened
log.error("Stream connected but getMetadata RPC failed", metadataException);
}
if (!writeTo.offer(
new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) {
log.error("Stream writing failed");
}

// reset retry delay if we succeeded in a retry attempt
retryDelay = INIT_BACK_OFF;
// reset retry delay if we succeeded in a retry attempt
retryDelay = INIT_BACK_OFF;
}
}

// check for shutdown and avoid sleep
Expand All @@ -196,7 +192,7 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
}

// busy wait till next attempt
log.warn(String.format("Stream failed, retrying in %dms", retryDelay));
log.debug(String.format("Stream failed, retrying in %dms", retryDelay));
Thread.sleep(retryDelay + RANDOM.nextInt(INIT_BACK_OFF));

if (retryDelay < MAX_BACK_OFF) {
Expand All @@ -207,5 +203,4 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
// log as this can happen after awakened from backoff sleep
log.info("Shutdown invoked, exiting event stream listener");
}

}
Loading

0 comments on commit 0891c8a

Please sign in to comment.