diff --git a/providers/openfeature-provider-flagd/openfeature/test-harness b/providers/openfeature-provider-flagd/openfeature/test-harness index 96d0744e..e132d258 160000 --- a/providers/openfeature-provider-flagd/openfeature/test-harness +++ b/providers/openfeature-provider-flagd/openfeature/test-harness @@ -1 +1 @@ -Subproject commit 96d0744e65ff81f748ab137ef37cdbf7b42ff882 +Subproject commit e132d25822eaad367f81cf2a06b422edac32a76d diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py index fb80f9db..596af31c 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py @@ -1,3 +1,4 @@ +import dataclasses import os import typing from enum import Enum @@ -19,10 +20,13 @@ class CacheType(Enum): DEFAULT_HOST = "localhost" DEFAULT_KEEP_ALIVE = 0 DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None +DEFAULT_OFFLINE_POLL_MS = 5000 DEFAULT_PORT_IN_PROCESS = 8015 DEFAULT_PORT_RPC = 8013 DEFAULT_RESOLVER_TYPE = ResolverType.RPC DEFAULT_RETRY_BACKOFF = 1000 +DEFAULT_RETRY_BACKOFF_MAX = 120000 +DEFAULT_RETRY_GRACE_ATTEMPTS = 5 DEFAULT_STREAM_DEADLINE = 600000 DEFAULT_TLS = False @@ -32,9 +36,12 @@ class CacheType(Enum): ENV_VAR_HOST = "FLAGD_HOST" ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS" ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH" +ENV_VAR_OFFLINE_POLL_MS = "FLAGD_OFFLINE_POLL_MS" ENV_VAR_PORT = "FLAGD_PORT" ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER" ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS" +ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS" +ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS" ENV_VAR_SELECTOR = "FLAGD_SELECTOR" ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS" ENV_VAR_TLS = "FLAGD_TLS" @@ -63,6 +70,7 @@ def env_or_default( return val if cast is None else cast(val) +@dataclasses.dataclass class Config: def __init__( # noqa: PLR0913 self, @@ -72,7 +80,10 @@ def __init__( # noqa: PLR0913 selector: typing.Optional[str] = None, resolver: typing.Optional[ResolverType] = None, offline_flag_source_path: typing.Optional[str] = None, + offline_poll_interval_ms: typing.Optional[int] = None, retry_backoff_ms: typing.Optional[int] = None, + retry_backoff_max_ms: typing.Optional[int] = None, + retry_grace_attempts: typing.Optional[int] = None, deadline_ms: typing.Optional[int] = None, stream_deadline_ms: typing.Optional[int] = None, keep_alive_time: typing.Optional[int] = None, @@ -96,6 +107,25 @@ def __init__( # noqa: PLR0913 if retry_backoff_ms is None else retry_backoff_ms ) + self.retry_backoff_max_ms: int = ( + int( + env_or_default( + ENV_VAR_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MAX, cast=int + ) + ) + if retry_backoff_max_ms is None + else retry_backoff_max_ms + ) + + self.retry_grace_attempts: int = ( + int( + env_or_default( + ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int + ) + ) + if retry_grace_attempts is None + else retry_grace_attempts + ) self.resolver = ( env_or_default( @@ -125,6 +155,16 @@ def __init__( # noqa: PLR0913 else offline_flag_source_path ) + self.offline_poll_interval_ms: int = ( + int( + env_or_default( + ENV_VAR_OFFLINE_POLL_MS, DEFAULT_OFFLINE_POLL_MS, cast=int + ) + ) + if offline_poll_interval_ms is None + else offline_poll_interval_ms + ) + self.deadline_ms: int = ( int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int)) if deadline_ms is None diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 6616457d..15e223bb 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -54,6 +54,8 @@ def __init__( # noqa: PLR0913 keep_alive_time: typing.Optional[int] = None, cache_type: typing.Optional[CacheType] = None, max_cache_size: typing.Optional[int] = None, + retry_backoff_max_ms: typing.Optional[int] = None, + retry_grace_attempts: typing.Optional[int] = None, ): """ Create an instance of the FlagdProvider @@ -83,6 +85,8 @@ def __init__( # noqa: PLR0913 tls=tls, deadline_ms=deadline, retry_backoff_ms=retry_backoff_ms, + retry_backoff_max_ms=retry_backoff_max_ms, + retry_grace_attempts=retry_grace_attempts, selector=selector, resolver=resolver_type, offline_flag_source_path=offline_flag_source_path, @@ -100,6 +104,7 @@ def setup_resolver(self) -> AbstractResolver: self.config, self.emit_provider_ready, self.emit_provider_error, + self.emit_provider_stale, self.emit_provider_configuration_changed, ) elif self.config.resolver == ResolverType.IN_PROCESS: diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index 3c8e5550..3c260156 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -27,7 +27,6 @@ from ..config import CacheType, Config from ..flag_type import FlagType -from .protocol import AbstractResolver if typing.TYPE_CHECKING: from google.protobuf.message import Message @@ -37,16 +36,13 @@ logger = logging.getLogger("openfeature.contrib") -class GrpcResolver(AbstractResolver): - MAX_BACK_OFF = 120 - - MAX_BACK_OFF = 120 - +class GrpcResolver: def __init__( self, config: Config, emit_provider_ready: typing.Callable[[ProviderEventDetails], None], emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_stale: typing.Callable[[ProviderEventDetails], None], emit_provider_configuration_changed: typing.Callable[ [ProviderEventDetails], None ], @@ -54,6 +50,7 @@ def __init__( self.config = config self.emit_provider_ready = emit_provider_ready self.emit_provider_error = emit_provider_error + self.emit_provider_stale = emit_provider_stale self.emit_provider_configuration_changed = emit_provider_configuration_changed self.cache: typing.Optional[BaseCacheImpl] = ( LRUCache(maxsize=self.config.max_cache_size) @@ -62,6 +59,8 @@ def __init__( ) self.stub, self.channel = self._create_stub() self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 + self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001 + self.retry_grace_attempts = config.retry_grace_attempts self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 self.deadline = config.deadline_ms * 0.001 self.connected = False @@ -77,9 +76,6 @@ def _create_stub( ) stub = evaluation_pb2_grpc.ServiceStub(channel) - if self.cache: - self.cache.clear() - return stub, channel def initialize(self, evaluation_context: EvaluationContext) -> None: @@ -116,8 +112,10 @@ def listen(self) -> None: if self.streamline_deadline_seconds > 0 else {} ) + retry_counter = 0 while self.active: request = evaluation_pb2.EventStreamRequest() + try: logger.debug("Setting up gRPC sync flags connection") for message in self.stub.EventStream(request, **call_args): @@ -129,6 +127,7 @@ def listen(self) -> None: ) ) self.connected = True + retry_counter = 0 # reset retry delay after successsful read retry_delay = self.retry_backoff_seconds @@ -149,15 +148,37 @@ def listen(self) -> None: ) self.connected = False + self.handle_error(retry_counter, retry_delay) + + retry_delay = self.handle_retry(retry_counter, retry_delay) + + retry_counter = retry_counter + 1 + + def handle_retry(self, retry_counter: int, retry_delay: float) -> float: + if retry_counter == 0: + logger.info("gRPC sync disconnected, reconnecting immediately") + else: + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds) + return retry_delay + + def handle_error(self, retry_counter: int, retry_delay: float) -> None: + if retry_counter == self.retry_grace_attempts: + if self.cache: + self.cache.clear() self.emit_provider_error( ProviderEventDetails( message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", error_code=ErrorCode.GENERAL, ) ) - logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") - time.sleep(retry_delay) - retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF) + elif retry_counter == 1: + self.emit_provider_stale( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + ) + ) def handle_changed_flags(self, data: typing.Any) -> None: changed_flags = list(data["flags"].keys()) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py index eeaf5218..41dc4eae 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py @@ -9,7 +9,7 @@ from openfeature.evaluation_context import EvaluationContext from openfeature.event import ProviderEventDetails from openfeature.exception import ErrorCode, ParseError, ProviderNotReadyError -from openfeature.schemas.protobuf.flagd.sync.v1 import ( # type:ignore[import-not-found] +from openfeature.schemas.protobuf.flagd.sync.v1 import ( sync_pb2, sync_pb2_grpc, ) @@ -22,8 +22,6 @@ class GrpcWatcher(FlagStateConnector): - MAX_BACK_OFF = 120 - def __init__( self, config: Config, @@ -36,6 +34,8 @@ def __init__( self.stub, self.channel = self.create_stub() self.retry_backoff_seconds = config.retry_backoff_ms * 0.001 + self.retry_backoff_max_seconds = config.retry_backoff_ms * 0.001 + self.retry_grace_attempts = config.retry_grace_attempts self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001 self.deadline = config.deadline_ms * 0.001 self.selector = config.selector @@ -85,9 +85,14 @@ def sync_flags(self) -> None: if self.streamline_deadline_seconds > 0 else {} ) + while self.active: try: - request = sync_pb2.SyncFlagsRequest(selector=self.selector) + request_args = ( + {"selector": self.selector} if self.selector is not None else {} + ) + request = sync_pb2.SyncFlagsRequest(**request_args) + logger.debug("Setting up gRPC sync flags connection") for flag_rsp in self.stub.SyncFlags(request, **call_args): flag_str = flag_rsp.flag_configuration @@ -130,4 +135,4 @@ def sync_flags(self) -> None: ) logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") time.sleep(retry_delay) - retry_delay = min(1.1 * retry_delay, self.MAX_BACK_OFF) + retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds) diff --git a/providers/openfeature-provider-flagd/tests/e2e/steps.py b/providers/openfeature-provider-flagd/tests/e2e/steps.py index 74755204..6959f4f5 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/steps.py @@ -597,7 +597,7 @@ def assert_handlers( ) ) def assert_handler_run(event_type: ProviderEvent, event_handles): - assert_handlers(event_handles, event_type, max_wait=6) + assert_handlers(event_handles, event_type, max_wait=30) @then( diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py index b99df2be..f56e82b7 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py @@ -27,4 +27,5 @@ def image(): scenarios( f"{TEST_HARNESS_PATH}/gherkin/flagd-reconnect.feature", + f"{TEST_HARNESS_PATH}/gherkin/events.feature", )