Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add more logging in sync stream #929

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
* Implements the {@link Connector} contract and emit flags obtained from flagd
* sync gRPC contract.
*/
@Slf4j
@SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"},
justification = "Random is used to generate a variation & flag configurations require exposing")
@SuppressFBWarnings(value = { "PREDICTABLE_RANDOM",
"EI_EXPOSE_REP" }, justification = "Random is used to generate a variation & flag configurations require exposing")
public class GrpcStreamConnector implements Connector {
private static final Random RANDOM = new Random();

Expand Down Expand Up @@ -111,40 +112,47 @@ public void shutdown() throws InterruptedException {
* Contains blocking calls, to be used concurrently.
*/
static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
final AtomicBoolean shutdown,
final FlagSyncServiceStub serviceStub,
final SyncFlagsRequest request)
final AtomicBoolean shutdown,
final FlagSyncServiceStub serviceStub,
final SyncFlagsRequest request)
throws InterruptedException {

final BlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE);
int retryDelay = INIT_BACK_OFF;

log.info("Initializing sync stream observer");

while (!shutdown.get()) {
log.debug("Initializing sync stream request");
serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver));

while (!shutdown.get()) {
final GrpcResponseModel response = streamReceiver.take();

if (response.isComplete()) {
// The stream is complete. This is not considered as an error
log.info("Sync stream completed");
// The stream is complete, this isn't really an error but we should try to
// reconnect
break;
}

if (response.getError() != null) {
log.warn(String.format("Error from grpc connection, retrying in %dms", retryDelay),
log.error(String.format("Error from grpc connection, retrying in %dms", retryDelay),
response.getError());

if (!writeTo.offer(
new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) {
log.warn("Failed to convey ERROR satus, queue is full");
log.error("Failed to convey ERROR status, queue is full");
}
break;
}

final SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: " + data);
if (!writeTo.offer(
new StreamPayload(StreamPayloadType.DATA, flagsResponse.getFlagConfiguration()))) {
log.warn("Stream writing failed");
new StreamPayload(StreamPayloadType.DATA, data))) {
log.error("Stream writing failed");
}

// reset retry delay if we succeeded in a retry attempt
Expand All @@ -158,6 +166,7 @@ static void observeEventStream(final BlockingQueue<StreamPayload> writeTo,
}

// busy wait till next attempt
log.warn(String.format("Stream failed, retrying in %dms", retryDelay));
Thread.sleep(retryDelay + RANDOM.nextInt(INIT_BACK_OFF));

if (retryDelay < MAX_BACK_OFF) {
Expand Down
Loading