-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add Postgres Online Store Async Feature Retrieval #2
feat: Add Postgres Online Store Async Feature Retrieval #2
Conversation
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Set connection read only Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Addition Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Use new ConnectionPool Pass kwargs as named argument Use executemany over execute_values Remove not-required open argument in psycopg.connect Improve Use SpooledTemporaryFile Use max_size and add docstring Properly write with StringIO Utils: Use SpooledTemporaryFile over StringIO object Add replace Fix df_to_postgres_table Remove import Utils Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Add log statement Lint: Fix _to_arrow_internal Lint: Fix _get_entity_df_event_timestamp_range Update exception Use ZeroColumnQueryResult Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Update warning Fix Format warning Add typehints Use better variable name Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
…nline store Remove ConnectionType.pool_async Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
… in online_read_async Apply fixes Online: Create helper functions for online_read, and re-use them also in online_read_async Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
if self._conn_async: | ||
await self._conn_async.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not too sure about this part, however, the following integration test is getting stuck without this part:
sdk/python/tests/integration/online_store/test_universal_online.py::test_async_online_retrieval_with_event_timestamps
@staticmethod | ||
def _construct_query_and_params( | ||
config: RepoConfig, | ||
table: FeatureView, | ||
keys: List[bytes], | ||
requested_features: Optional[List[str]] = None, | ||
) -> Tuple[sql.Composed, Union[Tuple[List[bytes], List[str]], Tuple[List[bytes]]]]: | ||
"""Construct the SQL query based on the given parameters.""" | ||
if requested_features: | ||
query = sql.SQL( | ||
""" | ||
SELECT entity_key, feature_name, value, event_ts | ||
FROM {} WHERE entity_key = ANY(%s) and feature_name = ANY(%s); | ||
""" | ||
).format( | ||
sql.Identifier(_table_id(config.project, table)), | ||
) | ||
params = (keys, requested_features) | ||
else: | ||
query = sql.SQL( | ||
""" | ||
SELECT entity_key, feature_name, value, event_ts | ||
FROM {} WHERE entity_key = ANY(%s); | ||
""" | ||
).format( | ||
sql.Identifier(_table_id(config.project, table)), | ||
) | ||
params = (keys, []) | ||
return query, params | ||
|
||
@staticmethod | ||
def _prepare_keys( | ||
config: RepoConfig, entity_keys: List[EntityKeyProto] | ||
) -> List[bytes]: | ||
"""Prepare all keys in a list to make fewer round trips to the database.""" | ||
return [ | ||
serialize_entity_key( | ||
entity_key, | ||
entity_key_serialization_version=config.entity_key_serialization_version, | ||
) | ||
for entity_key in entity_keys | ||
] | ||
|
||
@staticmethod | ||
def _process_rows( | ||
keys: List[bytes], rows: List[Tuple] | ||
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: | ||
"""Transform the retrieved rows in the desired output. | ||
|
||
PostgreSQL may return rows in an unpredictable order. Therefore, `values_dict` | ||
is created to quickly look up the correct row using the keys, since these are | ||
actually in the correct order. | ||
""" | ||
values_dict = defaultdict(list) | ||
for row in rows if rows is not None else []: | ||
values_dict[ | ||
row[0] if isinstance(row[0], bytes) else row[0].tobytes() | ||
].append(row[1:]) | ||
|
||
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] | ||
for key in keys: | ||
if key in values_dict: | ||
value = values_dict[key] | ||
res = {} | ||
for feature_name, value_bin, event_ts in value: | ||
val = ValueProto() | ||
val.ParseFromString(bytes(value_bin)) | ||
res[feature_name] = val | ||
result.append((event_ts, res)) | ||
else: | ||
result.append((None, None)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No new code is introduced here. This is the online_read
method broken down into helper functions.
2d86e41
to
fb0d977
Compare
7165c51
to
3328530
Compare
Will open PR with this branch once this PR is merged |
What this PR does / why we need it:
This PR implements the
online_read_async
method on thePostgreSQLOnlineStore
class.To make this happen, the
_get_conn_async
method is added toPostgreSQLOnlineStore
, to be able to retrieve anAsyncConnectionPool
online_read
method is broken down using several helper functions. These helper functions are re-used in theonline_read_async
method.test_async_online_retrieval_with_event_timestamps
test is activated forpostgres
Which issue(s) this PR fixes:
Fixes feast-dev#4260
Dependencies
Blocked by feast-dev#4303