From e98ae0bbf76dcfd8772e11a70e49feed951d7585 Mon Sep 17 00:00:00 2001 From: senbong Date: Thu, 25 Jan 2024 21:18:00 +0800 Subject: [PATCH] Update mysql.py to create index only if not exists during update --- .../contrib/mysql_online_store/mysql.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py index fa7dd2c2a49..2f5b2ef05cb 100644 --- a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py @@ -41,7 +41,6 @@ class MySQLOnlineStore(OnlineStore): _conn: Optional[Connection] = None def _get_conn(self, config: RepoConfig) -> Connection: - online_store_config = config.online_store assert isinstance(online_store_config, MySQLOnlineStoreConfig) @@ -65,7 +64,6 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: - conn = self._get_conn(config) cur = conn.cursor() @@ -178,18 +176,26 @@ def update( # We don't create any special state for the entities in this implementation. for table in tables_to_keep: + table_name = _table_id(project, table) + index_name = f"{table_name}_ek" cur.execute( - f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512), + f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp NULL DEFAULT NULL, created_ts timestamp NULL DEFAULT NULL, PRIMARY KEY(entity_key, feature_name))""" ) - - cur.execute( - f"ALTER TABLE {_table_id(project, table)} ADD INDEX {_table_id(project, table)}_ek (entity_key);" + index_exists = cur.execute( + f""" + SELECT 1 FROM information_schema.statistics + WHERE table_schema = DATABASE() AND table_name = '{table_name}' AND index_name = '{index_name}' + """ ) + if not index_exists: + cur.execute( + f"ALTER TABLE {table_name} ADD INDEX {index_name} (entity_key);" + ) for table in tables_to_delete: _drop_table_and_index(cur, project, table)