Skip to content

Commit

Permalink
feat: in-process offline flagd resolver (#74)
Browse files Browse the repository at this point in the history
Signed-off-by: Cole Bailey <cole.bailey@deliveryhero.com>
Co-authored-by: Michael Beemer <beeme1mr@users.noreply.github.com>
  • Loading branch information
colebaileygit and beeme1mr authored Apr 11, 2024
1 parent 3e5f850 commit 8cea506
Show file tree
Hide file tree
Showing 40 changed files with 1,437 additions and 95 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[submodule "schemas"]
path = providers/openfeature-provider-flagd/schemas
url = https://github.com/open-feature/schemas
[submodule "providers/openfeature-provider-flagd/test-harness"]
path = providers/openfeature-provider-flagd/test-harness
url = git@github.com:open-feature/flagd-testbed.git
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ repos:
rev: v1.9.0
hooks:
- id: mypy
args: [--python-version=3.8]
additional_dependencies:
- openfeature-sdk>=0.4.0
- opentelemetry-api
- types-protobuf
- types-PyYAML
- mmh3
- semver
- panzi-json-logic
exclude: proto|tests
13 changes: 13 additions & 0 deletions providers/openfeature-provider-flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ from openfeature.contrib.provider.flagd import FlagdProvider
api.set_provider(FlagdProvider())
```

To use in-process evaluation in offline mode with a file as source:

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider
from openfeature.contrib.provider.flagd.config import ResolverType

api.set_provider(FlagdProvider(
resolver_type=ResolverType.IN_PROCESS,
offline_flag_source_path="my-flag.json",
))
```

### Configuration options

The default options can be defined in the FlagdProvider constructor.
Expand Down
7 changes: 7 additions & 0 deletions providers/openfeature-provider-flagd/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ dependencies = [
"openfeature-sdk>=0.4.0",
"grpcio>=1.60.0",
"protobuf>=4.25.2",
"mmh3>=4.1.0",
"panzi-json-logic>=1.0.1",
"semver>=3,<4",
"pyyaml>=6.0.1",
]
requires-python = ">=3.8"

Expand All @@ -32,6 +36,7 @@ Homepage = "https://github.com/open-feature/python-sdk-contrib"
dependencies = [
"coverage[toml]>=6.5",
"pytest",
"pytest-bdd",
]
post-install-commands = [
"./scripts/gen_protos.sh"
Expand All @@ -42,6 +47,7 @@ test = "pytest {args:tests}"
test-cov = "coverage run -m pytest {args:tests}"
cov-report = [
"coverage xml",
"coverage html",
]
cov = [
"test-cov",
Expand All @@ -61,4 +67,5 @@ packages = ["src/openfeature"]
omit = [
# exclude generated files
"src/openfeature/contrib/provider/flagd/proto/*",
"tests/**",
]
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import typing
from enum import Enum

T = typing.TypeVar("T")

Expand All @@ -17,13 +18,21 @@ def env_or_default(
return val if cast is None else cast(val)


class ResolverType(Enum):
GRPC = "grpc"
IN_PROCESS = "in-process"


class Config:
def __init__(
def __init__( # noqa: PLR0913
self,
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
timeout: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
):
self.host = env_or_default("FLAGD_HOST", "localhost") if host is None else host
self.port = (
Expand All @@ -33,3 +42,18 @@ def __init__(
env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls
)
self.timeout = 5 if timeout is None else timeout
self.resolver_type = (
ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc"))
if resolver_type is None
else resolver_type
)
self.offline_flag_source_path = (
env_or_default("FLAGD_OFFLINE_FLAG_SOURCE_PATH", None)
if offline_flag_source_path is None
else offline_flag_source_path
)
self.offline_poll_interval_seconds = (
float(env_or_default("FLAGD_OFFLINE_POLL_INTERVAL_SECONDS", 1.0))
if offline_poll_interval_seconds is None
else offline_poll_interval_seconds
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,29 @@

import typing

import grpc
from google.protobuf.struct_pb2 import Struct

from openfeature.evaluation_context import EvaluationContext
from openfeature.exception import (
FlagNotFoundError,
GeneralError,
InvalidContextError,
ParseError,
TypeMismatchError,
)
from openfeature.flag_evaluation import FlagResolutionDetails
from openfeature.provider.metadata import Metadata
from openfeature.provider.provider import AbstractProvider

from .config import Config
from .flag_type import FlagType
from .proto.schema.v1 import schema_pb2, schema_pb2_grpc
from .config import Config, ResolverType
from .resolvers import AbstractResolver, GrpcResolver, InProcessResolver

T = typing.TypeVar("T")


class FlagdProvider(AbstractProvider):
"""Flagd OpenFeature Provider"""

def __init__(
def __init__( # noqa: PLR0913
self,
host: typing.Optional[str] = None,
port: typing.Optional[int] = None,
tls: typing.Optional[bool] = None,
timeout: typing.Optional[int] = None,
resolver_type: typing.Optional[ResolverType] = None,
offline_flag_source_path: typing.Optional[str] = None,
offline_poll_interval_seconds: typing.Optional[float] = None,
):
"""
Create an instance of the FlagdProvider
Expand All @@ -68,14 +60,26 @@ def __init__(
port=port,
tls=tls,
timeout=timeout,
resolver_type=resolver_type,
offline_flag_source_path=offline_flag_source_path,
offline_poll_interval_seconds=offline_poll_interval_seconds,
)

channel_factory = grpc.secure_channel if tls else grpc.insecure_channel
self.channel = channel_factory(f"{self.config.host}:{self.config.port}")
self.stub = schema_pb2_grpc.ServiceStub(self.channel)
self.resolver = self.setup_resolver()

def setup_resolver(self) -> AbstractResolver:
if self.config.resolver_type == ResolverType.GRPC:
return GrpcResolver(self.config)
elif self.config.resolver_type == ResolverType.IN_PROCESS:
return InProcessResolver(self.config, self)
else:
raise ValueError(
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
)

def shutdown(self) -> None:
self.channel.close()
if self.resolver:
self.resolver.shutdown()

def get_metadata(self) -> Metadata:
"""Returns provider metadata"""
Expand All @@ -87,108 +91,46 @@ def resolve_boolean_details(
default_value: bool,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[bool]:
return self._resolve(key, FlagType.BOOLEAN, default_value, evaluation_context)
return self.resolver.resolve_boolean_details(
key, default_value, evaluation_context
)

def resolve_string_details(
self,
key: str,
default_value: str,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[str]:
return self._resolve(key, FlagType.STRING, default_value, evaluation_context)
return self.resolver.resolve_string_details(
key, default_value, evaluation_context
)

def resolve_float_details(
self,
key: str,
default_value: float,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]:
return self._resolve(key, FlagType.FLOAT, default_value, evaluation_context)
return self.resolver.resolve_float_details(
key, default_value, evaluation_context
)

def resolve_integer_details(
self,
key: str,
default_value: int,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[int]:
return self._resolve(key, FlagType.INTEGER, default_value, evaluation_context)
return self.resolver.resolve_integer_details(
key, default_value, evaluation_context
)

def resolve_object_details(
self,
key: str,
default_value: typing.Union[dict, list],
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[typing.Union[dict, list]]:
return self._resolve(key, FlagType.OBJECT, default_value, evaluation_context)

def _resolve(
self,
flag_key: str,
flag_type: FlagType,
default_value: T,
evaluation_context: typing.Optional[EvaluationContext],
) -> FlagResolutionDetails[T]:
context = self._convert_context(evaluation_context)
call_args = {"timeout": self.config.timeout}
try:
if flag_type == FlagType.BOOLEAN:
request = schema_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveBoolean(request, **call_args)
elif flag_type == FlagType.STRING:
request = schema_pb2.ResolveStringRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveString(request, **call_args)
elif flag_type == FlagType.OBJECT:
request = schema_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveObject(request, **call_args)
elif flag_type == FlagType.FLOAT:
request = schema_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveFloat(request, **call_args)
elif flag_type == FlagType.INTEGER:
request = schema_pb2.ResolveIntRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveInt(request, **call_args)
else:
raise ValueError(f"Unknown flag type: {flag_type}")

except grpc.RpcError as e:
code = e.code()
message = f"received grpc status code {code}"

if code == grpc.StatusCode.NOT_FOUND:
raise FlagNotFoundError(message) from e
elif code == grpc.StatusCode.INVALID_ARGUMENT:
raise TypeMismatchError(message) from e
elif code == grpc.StatusCode.DATA_LOSS:
raise ParseError(message) from e
raise GeneralError(message) from e

# Got a valid flag and valid type. Return it.
return FlagResolutionDetails(
value=response.value,
reason=response.reason,
variant=response.variant,
return self.resolver.resolve_object_details(
key, default_value, evaluation_context
)

def _convert_context(
self, evaluation_context: typing.Optional[EvaluationContext]
) -> Struct:
s = Struct()
if evaluation_context:
try:
s["targetingKey"] = evaluation_context.targeting_key
s.update(evaluation_context.attributes)
except ValueError as exc:
message = (
"could not serialize evaluation context to google.protobuf.Struct"
)
raise InvalidContextError(message) from exc
return s
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import typing

from typing_extensions import Protocol

from openfeature.evaluation_context import EvaluationContext
from openfeature.flag_evaluation import FlagResolutionDetails

from .grpc import GrpcResolver
from .in_process import InProcessResolver


class AbstractResolver(Protocol):
def shutdown(self) -> None: ...

def resolve_boolean_details(
self,
key: str,
default_value: bool,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[bool]: ...

def resolve_string_details(
self,
key: str,
default_value: str,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[str]: ...

def resolve_float_details(
self,
key: str,
default_value: float,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[float]: ...

def resolve_integer_details(
self,
key: str,
default_value: int,
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[int]: ...

def resolve_object_details(
self,
key: str,
default_value: typing.Union[dict, list],
evaluation_context: typing.Optional[EvaluationContext] = None,
) -> FlagResolutionDetails[typing.Union[dict, list]]: ...


__all__ = ["AbstractResolver", "GrpcResolver", "InProcessResolver"]
Loading

0 comments on commit 8cea506

Please sign in to comment.