Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flagd-rpc): add caching #110

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions providers/openfeature-provider-flagd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies = [
"panzi-json-logic>=1.0.1",
"semver>=3,<4",
"pyyaml>=6.0.1",
"cachebox"
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
]
requires-python = ">=3.8"

Expand Down Expand Up @@ -59,6 +60,7 @@ cov = [
"cov-report",
]


[tool.hatch.envs.mypy]
dependencies = [
"mypy[faster-cache]>=1.13.0",
Expand Down
10 changes: 10 additions & 0 deletions providers/openfeature-provider-flagd/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[pytest]
markers =
rpc: tests for rpc mode.
in-process: tests for rpc mode.
customCert: Supports custom certs.
unixsocket: Supports unixsockets.
events: Supports events.
sync: Supports sync.
caching: Supports caching.
offline: Supports offline.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
IN_PROCESS = "in-process"


class CacheType(Enum):
LRU = "lru"
DISABLED = "disabled"


DEFAULT_CACHE = CacheType.LRU
DEFAULT_CACHE_SIZE = 1000
DEFAULT_DEADLINE = 500
DEFAULT_HOST = "localhost"
DEFAULT_KEEP_ALIVE = 0
Expand All @@ -19,12 +26,14 @@
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False

ENV_VAR_CACHE_SIZE = "FLAGD_MAX_CACHE_SIZE"
ENV_VAR_CACHE_TYPE = "FLAGD_CACHE"
ENV_VAR_DEADLINE_MS = "FLAGD_DEADLINE_MS"
ENV_VAR_HOST = "FLAGD_HOST"
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
ENV_VAR_PORT = "FLAGD_PORT"
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER_TYPE"
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"
Expand All @@ -36,6 +45,14 @@
return val.lower() == "true"


def convert_resolver_type(val: typing.Union[str, ResolverType]) -> ResolverType:
if isinstance(val, str):
v = val.lower()
return ResolverType(v)
else:
return ResolverType(val)

Check warning on line 53 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py

View check run for this annotation

Codecov / codecov/patch

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py#L53

Added line #L53 was not covered by tests


def env_or_default(
env_var: str, default: T, cast: typing.Optional[typing.Callable[[str], T]] = None
) -> typing.Union[str, T]:
Expand All @@ -56,7 +73,9 @@
retry_backoff_ms: typing.Optional[int] = None,
deadline: typing.Optional[int] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
keep_alive: typing.Optional[int] = None,
cache_type: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
):
self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host

Expand All @@ -77,7 +96,9 @@
)

self.resolver_type = (
ResolverType(env_or_default(ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE))
env_or_default(
ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE, cast=convert_resolver_type
)
if resolver_type is None
else resolver_type
)
Expand Down Expand Up @@ -118,10 +139,22 @@
else stream_deadline_ms
)

self.keep_alive_time: int = (
self.keep_alive: int = (
int(
env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int)
)
if keep_alive_time is None
else keep_alive_time
if keep_alive is None
else keep_alive
)

self.cache_type = (
CacheType(env_or_default(ENV_VAR_CACHE_TYPE, DEFAULT_CACHE))
if cache_type is None
else cache_type
)

self.max_cache_size: int = (
int(env_or_default(ENV_VAR_CACHE_SIZE, DEFAULT_CACHE_SIZE, cast=int))
if max_cache_size is None
else max_cache_size
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from openfeature.provider.metadata import Metadata
from openfeature.provider.provider import AbstractProvider

from .config import Config, ResolverType
from .config import CacheType, Config, ResolverType
from .resolvers import AbstractResolver, GrpcResolver, InProcessResolver

T = typing.TypeVar("T")
Expand All @@ -50,6 +50,8 @@ def __init__( # noqa: PLR0913
offline_flag_source_path: typing.Optional[str] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
cache_type: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
):
"""
Create an instance of the FlagdProvider
Expand Down Expand Up @@ -82,7 +84,9 @@ def __init__( # noqa: PLR0913
resolver_type=resolver_type,
offline_flag_source_path=offline_flag_source_path,
stream_deadline_ms=stream_deadline_ms,
keep_alive_time=keep_alive_time,
keep_alive=keep_alive_time,
cache_type=cache_type,
max_cache_size=max_cache_size,
)

self.resolver = self.setup_resolver()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import typing

import grpc
from cachebox import BaseCacheImpl, LRUCache
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct

Expand All @@ -18,13 +19,13 @@
ProviderNotReadyError,
TypeMismatchError,
)
from openfeature.flag_evaluation import FlagResolutionDetails
from openfeature.flag_evaluation import FlagResolutionDetails, Reason
from openfeature.schemas.protobuf.flagd.evaluation.v1 import (
evaluation_pb2,
evaluation_pb2_grpc,
)

from ..config import Config
from ..config import CacheType, Config
from ..flag_type import FlagType

if typing.TYPE_CHECKING:
Expand All @@ -51,6 +52,11 @@ def __init__(
self.emit_provider_ready = emit_provider_ready
self.emit_provider_error = emit_provider_error
self.emit_provider_configuration_changed = emit_provider_configuration_changed
self.cache: typing.Optional[BaseCacheImpl] = (
LRUCache(maxsize=self.config.max_cache_size)
if self.config.cache_type == CacheType.LRU
else None
)
self.stub, self.channel = self._create_stub()
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
Expand All @@ -64,9 +70,13 @@ def _create_stub(
channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
channel = channel_factory(
f"{config.host}:{config.port}",
options=(("grpc.keepalive_time_ms", config.keep_alive_time),),
options=(("grpc.keepalive_time_ms", config.keep_alive),),
)
stub = evaluation_pb2_grpc.ServiceStub(channel)

if self.cache:
self.cache.clear()

return stub, channel

def initialize(self, evaluation_context: EvaluationContext) -> None:
Expand All @@ -75,6 +85,8 @@ def initialize(self, evaluation_context: EvaluationContext) -> None:
def shutdown(self) -> None:
self.active = False
self.channel.close()
if self.cache:
self.cache.clear()

def connect(self) -> None:
self.active = True
Expand All @@ -96,7 +108,6 @@ def connect(self) -> None:

def listen(self) -> None:
retry_delay = self.retry_backoff_seconds

call_args = (
{"timeout": self.streamline_deadline_seconds}
if self.streamline_deadline_seconds > 0
Expand Down Expand Up @@ -148,6 +159,10 @@ def listen(self) -> None:
def handle_changed_flags(self, data: typing.Any) -> None:
changed_flags = list(data["flags"].keys())

if self.cache:
for flag in changed_flags:
self.cache.pop(flag)

self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))

def resolve_boolean_details(
Expand Down Expand Up @@ -190,13 +205,18 @@ def resolve_object_details(
) -> FlagResolutionDetails[typing.Union[dict, list]]:
return self._resolve(key, FlagType.OBJECT, default_value, evaluation_context)

def _resolve( # noqa: PLR0915
def _resolve( # noqa: PLR0915 C901
self,
flag_key: str,
flag_type: FlagType,
default_value: T,
evaluation_context: typing.Optional[EvaluationContext],
) -> FlagResolutionDetails[T]:
if self.cache is not None and flag_key in self.cache:
cached_flag: FlagResolutionDetails[T] = self.cache[flag_key]
cached_flag.reason = Reason.CACHED
return cached_flag

context = self._convert_context(evaluation_context)
call_args = {"timeout": self.deadline}
try:
Expand Down Expand Up @@ -249,12 +269,17 @@ def _resolve( # noqa: PLR0915
raise GeneralError(message) from e

# Got a valid flag and valid type. Return it.
return FlagResolutionDetails(
result = FlagResolutionDetails(
value=value,
reason=response.reason,
variant=response.variant,
)

if response.reason == Reason.STATIC and self.cache is not None:
self.cache.insert(flag_key, result)

return result

def _convert_context(
self, evaluation_context: typing.Optional[EvaluationContext]
) -> Struct:
Expand Down
9 changes: 9 additions & 0 deletions providers/openfeature-provider-flagd/tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
SPEC_PATH = "../../openfeature/spec"


# running all gherkin tests, except the ones, not implemented
def pytest_collection_modifyitems(config):
marker = "not customCert and not unixsocket and not sync"

# this seems to not work with python 3.8
if hasattr(config.option, "markexpr") and config.option.markexpr == "":
config.option.markexpr = marker


@pytest.fixture(autouse=True, scope="module")
def setup(request, port, image):
container: DockerContainer = FlagdContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class FlagdContainer(DockerContainer):
def __init__(
self,
image: str = "ghcr.io/open-feature/flagd-testbed:v0.5.13",
image: str = "ghcr.io/open-feature/flagd-testbed:v0.5.15",
port: int = 8013,
**kwargs,
) -> None:
Expand Down
10 changes: 10 additions & 0 deletions providers/openfeature-provider-flagd/tests/e2e/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ def setup_key_and_default(
return (key, default)


@when(
parsers.cfparse(
'a string flag with key "{key}" is evaluated with details',
),
target_fixture="key_and_default",
)
def setup_key_without_default(key: str) -> typing.Tuple[str, JsonPrimitive]:
return setup_key_and_default(key, "")


@when(
parsers.cfparse(
'an object flag with key "{key}" is evaluated with a null default value',
Expand Down
Loading
Loading