Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache dynamodb client and resource in DynamoDB online store implement… #2138

Merged
merged 1 commit into from
Dec 14, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class DynamoDBOnlineStore(OnlineStore):
Online feature store for AWS DynamoDB.
"""

_dynamodb_client = None
_dynamodb_resource = None

@log_exceptions_and_usage(online_store="dynamodb")
def update(
self,
Expand All @@ -70,7 +73,8 @@ def update(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_client = self._get_dynamodb_client(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

for table_instance in tables_to_keep:
try:
Expand Down Expand Up @@ -105,7 +109,7 @@ def teardown(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

for table in tables:
_delete_table_idempotent(dynamodb_resource, table.name)
Expand All @@ -122,7 +126,7 @@ def online_write_batch(
) -> None:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
with table_instance.batch_writer() as batch:
Expand Down Expand Up @@ -151,7 +155,7 @@ def online_read(
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
Expand All @@ -172,12 +176,23 @@ def online_read(
result.append((None, None))
return result

def _get_dynamodb_client(self, region: str):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region)
return self._dynamodb_client

def _get_dynamodb_resource(self, region: str):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(region)
return self._dynamodb_resource


def _initialize_dynamodb_client(region: str):
return boto3.client("dynamodb", region_name=region)


def _initialize_dynamodb(region: str):
return (
boto3.client("dynamodb", region_name=region),
boto3.resource("dynamodb", region_name=region),
)
def _initialize_dynamodb_resource(region: str):
return boto3.resource("dynamodb", region_name=region)


def _delete_table_idempotent(
Expand Down Expand Up @@ -231,7 +246,8 @@ def from_proto(infra_object_proto: InfraObjectProto) -> Any:
)

def update(self):
dynamodb_client, dynamodb_resource = _initialize_dynamodb(self.region)
dynamodb_client = _initialize_dynamodb_client(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)

try:
dynamodb_resource.create_table(
Expand All @@ -252,5 +268,5 @@ def update(self):
dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")

def teardown(self):
_, dynamodb_resource = _initialize_dynamodb(self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
_delete_table_idempotent(dynamodb_resource, self.name)