Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flagd file polling for offline mode #614

Merged
merged 8 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,73 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* File connector reads flag configurations and expose the context through {@code Connector} contract.
* File connector reads flag configurations from a given file, polls for changes and expose the content through
* {@code Connector} contract.
* The implementation is kept minimal and suites testing, local development needs.
*/
@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "PATH_TRAVERSAL_IN"},
justification = "File connector read feature flag from a file source.")
@Slf4j
public class FileConnector implements Connector {

private static final int POLL_INTERVAL_MS = 5000;

private final String flagSourcePath;
private final BlockingQueue<StreamPayload> queue = new LinkedBlockingQueue<>(1);
private boolean shutdown = false;

public FileConnector(final String flagSourcePath) {
this.flagSourcePath = flagSourcePath;
}
Kavindu-Dodan marked this conversation as resolved.
Show resolved Hide resolved

/**
* Initialize file connector. Reads content of the provided source file and offer it through queue.
* Initialize file connector. Reads file content, poll for changes and offer content through the queue.
*/
public void init() throws IOException {
final String flagData = new String(Files.readAllBytes(Paths.get(flagSourcePath)), StandardCharsets.UTF_8);
Thread watcherT = new Thread(() -> {
Kavindu-Dodan marked this conversation as resolved.
Show resolved Hide resolved
try {
final Path filePath = Paths.get(flagSourcePath);

// initial read
String flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
log.warn("Unable to offer file content to queue: queue is full");
}

long lastTS = Files.getLastModifiedTime(filePath).toMillis();

// start polling for changes
while (!shutdown) {
long currentTS = Files.getLastModifiedTime(filePath).toMillis();

if (currentTS > lastTS) {
lastTS = currentTS;
flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
log.warn("Unable to offer file content to queue: queue is full");
}
}

Thread.sleep(POLL_INTERVAL_MS);
}

if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
throw new RuntimeException("Unable to write to queue. Queue is full.");
}
log.info("Shutting down file connector.");
} catch (Throwable t) {
log.error("Error from file connector. File connector will exit", t);
if (!queue.offer(new StreamPayload(StreamPayloadType.ERROR, t.toString()))) {
log.warn("Unable to offer file content to queue: queue is full");
}
}
});

watcherT.setDaemon(true);
watcherT.start();
log.info(String.format("Using feature flag configurations from file %s", flagSourcePath));
}

Expand All @@ -50,9 +87,9 @@ public BlockingQueue<StreamPayload> getStream() {
}

/**
* NO-OP shutdown.
* Shutdown file connector.
*/
public void shutdown() throws InterruptedException {
// NO-OP nothing to do here
shutdown = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class TestUtils {
public static final String VALID_LONG = "flagConfigurations/valid-long.json";
public static final String INVALID_FLAG = "flagConfigurations/invalid-flag.json";
public static final String INVALID_CFG = "flagConfigurations/invalid-configuration.json";
public static final String UPDATABLE_FILE = "flagConfigurations/updatableFlags.json";


public static String getFlagsFromResource(final String file) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;

import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.UPDATABLE_FILE;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_LONG;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.getResourcePath;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;

class FileConnectorTest {
Expand All @@ -39,12 +44,61 @@ void readAndExposeFeatureFlagsFromSource() throws IOException {
}

@Test
void throwsErrorIfInvalidFile(){
void emitErrorStateForInvalidPath() throws IOException {
// given
final FileConnector connector = new FileConnector("INVALID_PATH");

// when
connector.init();

// then
final BlockingQueue<StreamPayload> stream = connector.getStream();

// Must emit an error within considerable time
final StreamPayload[] payload = new StreamPayload[1];
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
payload[0] = stream.take();
});

assertNotNull(payload[0].getData());
assertEquals(StreamPayloadType.ERROR, payload[0].getType());
}

@Test
@Disabled("Disabled as unstable on GH Action. Useful for functionality validation")
void watchForFileUpdatesAndEmitThem() throws IOException {
final String initial = "{\"flags\":{\"myBoolFlag\":{\"state\":\"ENABLED\",\"variants\":{\"on\":true,\"off\":false},\"defaultVariant\":\"on\"}}}";
final String updatedFlags = "{\"flags\":{\"myBoolFlag\":{\"state\":\"ENABLED\",\"variants\":{\"on\":true,\"off\":false},\"defaultVariant\":\"off\"}}}";

// given
final Path updPath = Paths.get(getResourcePath(UPDATABLE_FILE));
Files.write(updPath, initial.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);

final FileConnector connector = new FileConnector(updPath.toString());

// when
connector.init();

// then
assertThrows(IOException.class, connector::init);
final BlockingQueue<StreamPayload> stream = connector.getStream();
final StreamPayload[] payload = new StreamPayload[1];

// first validate the initial payload
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
payload[0] = stream.take();
});

assertEquals(initial, payload[0].getData());

// then update the flags
Files.write(updPath, updatedFlags.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);

// finally wait for updated payload
assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
payload[0] = stream.take();
});

assertEquals(updatedFlags, payload[0].getData());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"flags": {
"myBoolFlag": {
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "on"
}
}
}
Loading