Skip to content

Commit

Permalink
init and close methods in interface
Browse files Browse the repository at this point in the history
  • Loading branch information
robhowley committed Oct 17, 2024
1 parent 17773f0 commit 2822fd2
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 35 deletions.
8 changes: 8 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2078,6 +2078,14 @@ def list_saved_datasets(
self.project, allow_cache=allow_cache, tags=tags
)

async def initialize(self) -> None:
"""Initialize long-lived clients and/or resources needed for accessing datastores"""
await self._get_provider().initialize(self.config)

async def close(self) -> None:
"""Cleanup any long-lived clients and/or resources"""
await self._get_provider().close()


def _print_materialization_log(
start_date, end_date, num_feature_views: int, online_store: str
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,9 @@ def get_table_column_names_and_types_from_data_source(
data_source: DataSource object
"""
return data_source.get_table_column_names_and_types(config=config)

async def initialize(self, config: RepoConfig) -> None:
pass

async def close(self) -> None:
pass
68 changes: 33 additions & 35 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,14 @@ class DynamoDBOnlineStore(OnlineStore):

_dynamodb_client = None
_dynamodb_resource = None
_aioboto_client = None

@staticmethod
async def initialize(config: RepoConfig):
if not DynamoDBOnlineStore._aioboto_client:
context_stack = contextlib.AsyncExitStack()
DynamoDBOnlineStore._aioboto_client = (
await context_stack.enter_async_context(
_get_aiodynamodb_client(
region=config.online_store.region,
max_pool_connections=config.online_store.max_pool_connections,
)
)
)
async def initialize(self, config: RepoConfig):
await _get_aiodynamodb_client(
config.online_store.region, config.online_store.max_pool_connections
)

@staticmethod
async def close():
if DynamoDBOnlineStore._aioboto_client:
DynamoDBOnlineStore._aioboto_client.close()
async def close(self):
await _aiodynamodb_close()

def update(
self,
Expand Down Expand Up @@ -325,8 +314,6 @@ async def online_read_async(

deserialize = TypeDeserializer().deserialize

await self.initialize(config)

def to_tbl_resp(raw_client_response):
return {
"entity_id": deserialize(raw_client_response["entity_id"]),
Expand All @@ -346,18 +333,17 @@ def to_tbl_resp(raw_client_response):
batches.append(batch)
entity_id_batches.append(entity_id_batch)

client_context = _get_aiodynamodb_client(
client = _get_aiodynamodb_client(
online_config.region, online_config.max_pool_connections
)
async with client_context as client:
response_batches = await asyncio.gather(
*[
client.batch_get_item(
RequestItems=entity_id_batch,
)
for entity_id_batch in entity_id_batches
]
)
response_batches = await asyncio.gather(
*[
client.batch_get_item(
RequestItems=entity_id_batch,
)
for entity_id_batch in entity_id_batches
]
)

result_batches = []
for batch, response in zip(batches, response_batches):
Expand Down Expand Up @@ -505,6 +491,7 @@ def _to_client_batch_get_payload(online_config, table_name, batch):


_aioboto_session = None
_aioboto_client = None


def _get_aioboto_session():
Expand All @@ -514,12 +501,23 @@ def _get_aioboto_session():
return _aioboto_session


def _get_aiodynamodb_client(region: str, max_pool_connections: int):
return _get_aioboto_session().create_client(
"dynamodb",
region_name=region,
config=AioConfig(max_pool_connections=max_pool_connections),
)
async def _get_aiodynamodb_client(region: str, max_pool_connections: int):
global _aioboto_client
if _aioboto_client is None:
client_context = _get_aioboto_session().create_client(
"dynamodb",
region_name=region,
config=AioConfig(max_pool_connections=max_pool_connections),
)
context_stack = contextlib.AsyncExitStack()
_aioboto_client = await context_stack.enter_async_context(client_context)
return _aioboto_client


async def _aiodynamodb_close():
global _aioboto_client
if _aioboto_client:
await _aioboto_client.close()


def _initialize_dynamodb_client(
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,9 @@ def retrieve_online_documents(
raise NotImplementedError(
f"Online store {self.__class__.__name__} does not support online retrieval"
)

async def initialize(self, config: RepoConfig) -> None:
pass

async def close(self) -> None:
pass
8 changes: 8 additions & 0 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,11 @@ def get_table_column_names_and_types_from_data_source(
return self.offline_store.get_table_column_names_and_types_from_data_source(
config=config, data_source=data_source
)

async def initialize(self, config: RepoConfig) -> None:
await self.online_store.initialize(config)
await self.offline_store.initialize(config)

async def close(self) -> None:
await self.online_store.close()
await self.offline_store.close()
8 changes: 8 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,14 @@ def get_table_column_names_and_types_from_data_source(
"""
pass

@abstractmethod
async def initialize(self, config: RepoConfig) -> None:
pass

@abstractmethod
async def close(self) -> None:
pass


def get_provider(config: RepoConfig) -> Provider:
if "." not in config.provider:
Expand Down

0 comments on commit 2822fd2

Please sign in to comment.