Skip to content

Commit

Permalink
Merge pull request #5 from robhowley/rh-async-dynamo
Browse files Browse the repository at this point in the history
feat: online_read_async for dynamodb
  • Loading branch information
robhowley authored May 31, 2024
2 parents f5a37c1 + d7a982f commit 3db88cb
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 35 deletions.
128 changes: 95 additions & 33 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

try:
import boto3
from aiobotocore import session
from botocore.config import Config
from botocore.exceptions import ClientError
except ImportError as e:
Expand Down Expand Up @@ -80,6 +81,7 @@ class DynamoDBOnlineStore(OnlineStore):

_dynamodb_client = None
_dynamodb_resource = None
_aioboto_session = None

def update(
self,
Expand Down Expand Up @@ -206,38 +208,11 @@ def online_write_batch(
)
self._write_batch_non_duplicates(table_instance, data, progress, config)

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
def _read_batches(
self, online_config, entity_ids, table_name, table_client
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Retrieve feature values from the online DynamoDB store.
Args:
config: The RepoConfig for the current FeatureStore.
table: Feast FeatureView.
entity_keys: a list of entity keys that should be read from the FeatureStore.
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
entity_ids = [
compute_entity_id(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
)
for entity_key in entity_keys
]

batch_size = online_config.batch_size
entity_ids_iter = iter(entity_ids)
while True:
Expand All @@ -249,16 +224,16 @@ def online_read(
if len(batch) == 0:
break
batch_entity_ids = {
table_instance.name: {
table_name: {
"Keys": [{"entity_id": entity_id} for entity_id in batch],
"ConsistentRead": online_config.consistent_reads,
}
}
response = dynamodb_resource.batch_get_item(
response = table_client.batch_get_item(
RequestItems=batch_entity_ids,
)
response = response.get("Responses")
table_responses = response.get(table_instance.name)
table_responses = response.get(table_name)
if table_responses:
table_responses = self._sort_dynamodb_response(
table_responses, entity_ids
Expand Down Expand Up @@ -286,6 +261,93 @@ def online_read(
result.extend(batch_result)
return result

def online_read(
    self,
    config: RepoConfig,
    table: FeatureView,
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    """
    Read feature values from the online DynamoDB store.

    Args:
        config: The RepoConfig for the current FeatureStore.
        table: Feast FeatureView.
        entity_keys: a list of entity keys that should be read from the FeatureStore.
        requested_features: accepted as part of the signature; not referenced
            in this method.

    Returns:
        Whatever ``_read_batches`` produces for the entity ids computed from
        ``entity_keys``.
    """
    store_config = config.online_store
    assert isinstance(store_config, DynamoDBOnlineStoreConfig)

    # The boto3 resource is used both to resolve the table and as the object
    # that issues the batch reads.
    resource = self._get_dynamodb_resource(
        store_config.region, store_config.endpoint_url
    )
    dynamo_table = resource.Table(_get_table_name(store_config, config, table))

    def _to_entity_id(entity_key):
        return compute_entity_id(
            entity_key,
            entity_key_serialization_version=config.entity_key_serialization_version,
        )

    ids = list(map(_to_entity_id, entity_keys))

    return self._read_batches(store_config, ids, dynamo_table.name, resource)

async def online_read_async(
    self,
    config: RepoConfig,
    table: FeatureView,
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    """
    Read feature values for the given entity keys asynchronously.

    Args:
        config: The config for the current feature store.
        table: The feature view whose feature values should be read.
        entity_keys: The list of entity keys for which feature values should be read.
        requested_features: The list of features that should be read
            (not referenced in this method).

    Returns:
        The value produced by ``_read_batches``. The intended contract is a list
        of the same length as entity_keys, each item a tuple of the row's event
        timestamp and a dict mapping feature names to values in proto format.
    """
    store_config = config.online_store
    assert isinstance(store_config, DynamoDBOnlineStoreConfig)

    def _to_entity_id(entity_key):
        return compute_entity_id(
            entity_key,
            entity_key_serialization_version=config.entity_key_serialization_version,
        )

    ids = list(map(_to_entity_id, entity_keys))

    # NOTE(review): `_read_batches` is a plain (non-async) method and nothing on
    # this path awaits the client's `batch_get_item` call. aiobotocore client
    # operations are coroutines, so this presumably needs an awaiting read
    # path (and the low-level client's request/response format may differ from
    # the boto3 resource used by `online_read`) -- TODO confirm.
    # NOTE(review): `online_config.endpoint_url` is not forwarded to the async
    # client here, unlike the resource lookup in `online_read` -- confirm
    # whether custom endpoints should be honoured.
    async with self._get_aiodynamodb_client(store_config.region) as aio_client:
        table_name = _get_table_name(store_config, config, table)
        return self._read_batches(store_config, ids, table_name, aio_client)

def _get_aioboto_session(self):
    """Return the stored aiobotocore session, creating and storing it on first use."""
    existing = self._aioboto_session
    if existing is not None:
        return existing
    # First call: build the session once and keep it for later calls.
    self._aioboto_session = session.get_session()
    return self._aioboto_session

def _get_aiodynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
    """
    Create an async DynamoDB client from the aiobotocore session.

    Args:
        region: Region name, passed to the client as ``region_name``.
        endpoint_url: Optional endpoint override forwarded to ``create_client``,
            mirroring the ``endpoint_url`` parameter of the synchronous
            ``_get_dynamodb_client``. ``None`` (the default) keeps the previous
            behaviour of using the default endpoint resolution.

    Returns:
        The object returned by the session's ``create_client``; the caller in
        this file enters it with ``async with`` to obtain the client.
    """
    return self._get_aioboto_session().create_client(
        "dynamodb", region_name=region, endpoint_url=endpoint_url
    )

def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ def test_online_retrieval_with_event_timestamps(environment, universal_data_sour


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis"])
@pytest.mark.universal_online_stores(only=["redis", "dynamodb"])
def test_async_online_retrieval_with_event_timestamps(
environment, universal_data_sources
):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
"hiredis>=2.0.0,<3",
]

AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0"]
AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "fsspec<=2024.1.0", "aiobotocore>=2.13.0"]

KUBERNETES_REQUIRED = ["kubernetes<=20.13.0"]

Expand Down

0 comments on commit 3db88cb

Please sign in to comment.