Skip to content

Commit

Permalink
feat(flagd): add graceful attempts
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
  • Loading branch information
aepfli committed Dec 6, 2024
1 parent c74d6ad commit f2eefab
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import dataclasses
import os
import typing
from enum import Enum
Expand All @@ -19,10 +20,13 @@ class CacheType(Enum):
DEFAULT_HOST = "localhost"
DEFAULT_KEEP_ALIVE = 0
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
DEFAULT_OFFLINE_POLL_MS = 5000
DEFAULT_PORT_IN_PROCESS = 8015
DEFAULT_PORT_RPC = 8013
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
DEFAULT_RETRY_BACKOFF = 1000
DEFAULT_RETRY_BACKOFF_MAX = 120000
DEFAULT_RETRY_GRACE_ATTEMPTS = 5
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False

Expand All @@ -32,9 +36,12 @@ class CacheType(Enum):
ENV_VAR_HOST = "FLAGD_HOST"
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS"
ENV_VAR_PORT = "FLAGD_PORT"
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"

Expand Down Expand Up @@ -62,6 +69,7 @@ def env_or_default(
return val if cast is None else cast(val)


@dataclasses.dataclass
class Config:
def __init__( # noqa: PLR0913
self,
Expand All @@ -70,7 +78,10 @@ def __init__( # noqa: PLR0913
tls: typing.Optional[bool] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_ms: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_attempts: typing.Optional[int] = None,
deadline: typing.Optional[int] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive: typing.Optional[int] = None,
Expand All @@ -94,6 +105,25 @@ def __init__( # noqa: PLR0913
if retry_backoff_ms is None
else retry_backoff_ms
)
self.retry_backoff_max_ms: int = (
int(
env_or_default(
ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int
)
)
if retry_backoff_max_ms is None
else retry_backoff_max_ms
)

self.retry_grace_attempts: int = (
int(
env_or_default(
ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
)
)
if retry_grace_attempts is None
else retry_grace_attempts
)

self.resolver_type = (
env_or_default(
Expand Down Expand Up @@ -123,6 +153,16 @@ def __init__( # noqa: PLR0913
else offline_flag_source_path
)

self.offline_poll_ms: int = (
int(
env_or_default(
ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int
)
)
if offline_poll_ms is None
else offline_poll_ms
)

self.deadline: int = (
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
if deadline is None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def __init__( # noqa: PLR0913
keep_alive_time: typing.Optional[int] = None,
cache_type: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_attempts: typing.Optional[int] = None,
):
"""
Create an instance of the FlagdProvider
Expand Down Expand Up @@ -81,6 +83,8 @@ def __init__( # noqa: PLR0913
tls=tls,
deadline=deadline,
retry_backoff_ms=retry_backoff_ms,
retry_backoff_max_ms=retry_backoff_max_ms,
retry_grace_attempts=retry_grace_attempts,
resolver_type=resolver_type,
offline_flag_source_path=offline_flag_source_path,
stream_deadline_ms=stream_deadline_ms,
Expand All @@ -97,6 +101,7 @@ def setup_resolver(self) -> AbstractResolver:
self.config,
self.emit_provider_ready,
self.emit_provider_error,
self.emit_provider_stale,
self.emit_provider_configuration_changed,
)
elif self.config.resolver_type == ResolverType.IN_PROCESS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@


class GrpcResolver:
MAX_BACK_OFF = 120

def __init__(
self,
config: Config,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_stale: typing.Callable[[ProviderEventDetails], None],
emit_provider_configuration_changed: typing.Callable[
[ProviderEventDetails], None
],
):
self.config = config
self.emit_provider_ready = emit_provider_ready
self.emit_provider_error = emit_provider_error
self.emit_provider_stale = emit_provider_stale
self.emit_provider_configuration_changed = emit_provider_configuration_changed
self.cache: typing.Optional[BaseCacheImpl] = (
LRUCache(maxsize=self.config.max_cache_size)
Expand All @@ -59,6 +59,8 @@ def __init__(
)
self.stub, self.channel = self._create_stub()
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001
self.retry_grace_attempts = config.retry_grace_attempts
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline * 0.001
self.connected = False
Expand All @@ -74,9 +76,6 @@ def _create_stub(
)
stub = evaluation_pb2_grpc.ServiceStub(channel)

if self.cache:
self.cache.clear()

return stub, channel

def initialize(self, evaluation_context: EvaluationContext) -> None:
Expand Down Expand Up @@ -113,8 +112,10 @@ def listen(self) -> None:
if self.streamline_deadline_seconds > 0
else {}
)
retry_counter = 0
while self.active:
request = evaluation_pb2.EventStreamRequest()

try:
logger.debug("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request, **call_args):
Expand All @@ -126,6 +127,7 @@ def listen(self) -> None:
)
)
self.connected = True
retry_counter = 0
# reset retry delay after successful read
retry_delay = self.retry_backoff_seconds

Expand All @@ -146,15 +148,37 @@ def listen(self) -> None:
)

self.connected = False
self.handle_error(retry_counter, retry_delay)

retry_delay = self.handle_retry(retry_counter, retry_delay)

retry_counter = retry_counter + 1

def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
    """Wait before the next reconnect attempt and grow the backoff delay.

    The very first disconnect (``retry_counter == 0``) reconnects
    immediately without sleeping; every later attempt sleeps for the
    current delay. The delay grows by 10% per call, capped at
    ``retry_backoff_max_seconds``.

    :param retry_counter: number of consecutive failed attempts so far
    :param retry_delay: current backoff delay in seconds
    :return: the (possibly increased) delay to use for the next attempt
    """
    if retry_counter:
        logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
        time.sleep(retry_delay)
    else:
        logger.info("gRPC sync disconnected, reconnecting immediately")
    # Exponential-ish backoff: +10% per attempt, bounded by the configured max.
    return min(retry_delay * 1.1, self.retry_backoff_max_seconds)

def handle_error(self, retry_counter: int, retry_delay: float) -> None:
    """Emit provider lifecycle events appropriate to the disconnect duration.

    On the first failed attempt (``retry_counter == 1``) the provider is
    marked STALE; once the configured grace attempts are exhausted the
    cache is cleared and a provider ERROR is emitted. Sleeping and backoff
    growth are the responsibility of ``handle_retry`` (invoked by the same
    caller), so this method must not sleep or modify the delay — the
    previous duplicated ``time.sleep``/backoff lines caused a double wait
    and referenced the removed ``MAX_BACK_OFF`` attribute.

    :param retry_counter: number of consecutive failed attempts so far
    :param retry_delay: current backoff delay in seconds (used in messages)
    """
    if retry_counter == self.retry_grace_attempts:
        # Grace period exhausted: cached flags may now be outdated — drop
        # them and signal a hard error to subscribers.
        if self.cache:
            self.cache.clear()
        self.emit_provider_error(
            ProviderEventDetails(
                message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
                error_code=ErrorCode.GENERAL,
            )
        )
    elif retry_counter == 1:
        # First failure: not yet an error, but consumers should treat the
        # currently served values as potentially stale.
        self.emit_provider_stale(
            ProviderEventDetails(
                message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
            )
        )

def handle_changed_flags(self, data: typing.Any) -> None:
changed_flags = list(data["flags"].keys())
Expand Down
29 changes: 0 additions & 29 deletions providers/openfeature-provider-flagd/tests/e2e/events.feature

This file was deleted.

2 changes: 1 addition & 1 deletion providers/openfeature-provider-flagd/tests/e2e/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ def assert_handlers(
)
)
def assert_handler_run(event_type: ProviderEvent, event_handles):
assert_handlers(event_handles, event_type, max_wait=6)
assert_handlers(event_handles, event_type, max_wait=30)


@then(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ def image():

scenarios(
f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature",
f"{TEST_HARNESS_PATH}/gherkin/events.feature",
)

0 comments on commit f2eefab

Please sign in to comment.