Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Support for DynamodbOnlineStoreConfig endpoint_url parameter #2485

Merged
65 changes: 43 additions & 22 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

from pydantic import StrictStr
from pydantic.typing import Literal
from pydantic.typing import Literal, Union

from feast import Entity, FeatureView, utils
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
Expand Down Expand Up @@ -50,17 +50,20 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
type: Literal["dynamodb"] = "dynamodb"
"""Online store type selector"""

batch_size: int = 40
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""

endpoint_url: Union[str, None] = None
"""DynamoDB local development endpoint Url, i.e. http://localhost:8000"""

region: StrictStr
"""AWS Region Name"""

table_name_template: StrictStr = "{project}.{table_name}"
"""DynamoDB table name template"""

sort_response: bool = True
"""Whether or not to sort BatchGetItem response."""

batch_size: int = 40
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
table_name_template: StrictStr = "{project}.{table_name}"
"""DynamoDB table name template"""


class DynamoDBOnlineStore(OnlineStore):
Expand Down Expand Up @@ -95,8 +98,12 @@ def update(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client = self._get_dynamodb_client(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_client = self._get_dynamodb_client(
online_config.region, online_config.endpoint_url
)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)

for table_instance in tables_to_keep:
try:
Expand Down Expand Up @@ -141,7 +148,9 @@ def teardown(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)

for table in tables:
_delete_table_idempotent(
Expand Down Expand Up @@ -175,7 +184,9 @@ def online_write_batch(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)

table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
Expand Down Expand Up @@ -217,7 +228,9 @@ def online_read(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
Expand Down Expand Up @@ -260,14 +273,16 @@ def online_read(
result.extend(batch_size_nones)
return result

def _get_dynamodb_client(self, region: str):
def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region)
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
return self._dynamodb_client

def _get_dynamodb_resource(self, region: str):
def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(region)
self._dynamodb_resource = _initialize_dynamodb_resource(
region, endpoint_url
)
return self._dynamodb_resource

def _sort_dynamodb_response(self, responses: list, order: list):
Expand All @@ -285,12 +300,12 @@ def _sort_dynamodb_response(self, responses: list, order: list):
return table_responses_ordered


def _initialize_dynamodb_client(region: str):
return boto3.client("dynamodb", region_name=region)
def _initialize_dynamodb_client(region: str, endpoint_url: Optional[str] = None):
return boto3.client("dynamodb", region_name=region, endpoint_url=endpoint_url)


def _initialize_dynamodb_resource(region: str):
return boto3.resource("dynamodb", region_name=region)
def _initialize_dynamodb_resource(region: str, endpoint_url: Optional[str] = None):
return boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)


# TODO(achals): This form of user-facing templating is experimental.
Expand Down Expand Up @@ -362,8 +377,12 @@ def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
)

def update(self):
dynamodb_client = _initialize_dynamodb_client(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
dynamodb_client = _initialize_dynamodb_client(
region=self.region, endpoint_url=None
)
dynamodb_resource = _initialize_dynamodb_resource(
region=self.region, endpoint_url=None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the endpoint_url None here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achals my mistake, should we add it as an attribute to DynamoDBTable ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with how the user interacts with the DynamoDBTable class, and how can the user configure it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add it for consistency, it's not currently used right now but will be used as part of upcoming features.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

)

try:
dynamodb_resource.create_table(
Expand All @@ -384,5 +403,7 @@ def update(self):
dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")

def teardown(self):
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(
region=self.region, endpoint_url=None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made some changes based on your suggestions, if you don't mind giving it a second check, please.

)
_delete_table_idempotent(dynamodb_resource, self.name)
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,70 @@ def repo_config():
)


def test_online_store_config_default():
"""Test DynamoDBOnlineStoreConfig default parameters."""
aws_region = "us-west-2"
dynamodb_store_config = DynamoDBOnlineStoreConfig(region=aws_region)
assert dynamodb_store_config.type == "dynamodb"
assert dynamodb_store_config.batch_size == 40
assert dynamodb_store_config.endpoint_url is None
assert dynamodb_store_config.region == aws_region
assert dynamodb_store_config.sort_response is True
assert dynamodb_store_config.table_name_template == "{project}.{table_name}"


def test_online_store_config_custom_params():
"""Test DynamoDBOnlineStoreConfig custom parameters."""
aws_region = "us-west-2"
batch_size = 20
endpoint_url = "http://localhost:8000"
sort_response = False
table_name_template = "feast_test.dynamodb_table"
dynamodb_store_config = DynamoDBOnlineStoreConfig(
region=aws_region,
batch_size=batch_size,
endpoint_url=endpoint_url,
sort_response=sort_response,
table_name_template=table_name_template,
)
assert dynamodb_store_config.type == "dynamodb"
assert dynamodb_store_config.batch_size == batch_size
assert dynamodb_store_config.endpoint_url == endpoint_url
assert dynamodb_store_config.region == aws_region
assert dynamodb_store_config.sort_response == sort_response
assert dynamodb_store_config.table_name_template == table_name_template


def test_online_store_config_dynamodb_client():
"""Test DynamoDBOnlineStoreConfig configure DynamoDB client with endpoint_url."""
aws_region = "us-west-2"
endpoint_url = "http://localhost:8000"
dynamodb_store = DynamoDBOnlineStore()
dynamodb_store_config = DynamoDBOnlineStoreConfig(
region=aws_region, endpoint_url=endpoint_url
)
dynamodb_client = dynamodb_store._get_dynamodb_client(
dynamodb_store_config.region, dynamodb_store_config.endpoint_url
)
assert dynamodb_client.meta.region_name == aws_region
assert dynamodb_client.meta.endpoint_url == endpoint_url


def test_online_store_config_dynamodb_resource():
"""Test DynamoDBOnlineStoreConfig configure DynamoDB Resource with endpoint_url."""
aws_region = "us-west-2"
endpoint_url = "http://localhost:8000"
dynamodb_store = DynamoDBOnlineStore()
dynamodb_store_config = DynamoDBOnlineStoreConfig(
region=aws_region, endpoint_url=endpoint_url
)
dynamodb_resource = dynamodb_store._get_dynamodb_resource(
dynamodb_store_config.region, dynamodb_store_config.endpoint_url
)
assert dynamodb_resource.meta.client.meta.region_name == aws_region
assert dynamodb_resource.meta.client.meta.endpoint_url == endpoint_url


@mock_dynamodb2
@pytest.mark.parametrize("n_samples", [5, 50, 100])
def test_online_read(repo_config, n_samples):
Expand Down