Skip to content

Commit

Permalink
Update mysql.py to create index only if not exists during update
Browse files Browse the repository at this point in the history
  • Loading branch information
gee-senbong committed Jan 25, 2024
1 parent 9a3590e commit e98ae0b
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class MySQLOnlineStore(OnlineStore):
_conn: Optional[Connection] = None

def _get_conn(self, config: RepoConfig) -> Connection:

online_store_config = config.online_store
assert isinstance(online_store_config, MySQLOnlineStoreConfig)

Expand All @@ -65,7 +64,6 @@ def online_write_batch(
],
progress: Optional[Callable[[int], Any]],
) -> None:

conn = self._get_conn(config)
cur = conn.cursor()

Expand Down Expand Up @@ -178,18 +176,26 @@ def update(

# We don't create any special state for the entities in this implementation.
for table in tables_to_keep:
table_name = _table_id(project, table)
index_name = f"{table_name}_ek"
cur.execute(
f"""CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512),
f"""CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512),
feature_name VARCHAR(256),
value BLOB,
event_ts timestamp NULL DEFAULT NULL,
created_ts timestamp NULL DEFAULT NULL,
PRIMARY KEY(entity_key, feature_name))"""
)

cur.execute(
f"ALTER TABLE {_table_id(project, table)} ADD INDEX {_table_id(project, table)}_ek (entity_key);"
index_exists = cur.execute(
f"""
SELECT 1 FROM information_schema.statistics
WHERE table_schema = DATABASE() AND table_name = '{table_name}' AND index_name = '{index_name}'
"""
)
if not index_exists:
cur.execute(
f"ALTER TABLE {table_name} ADD INDEX {index_name} (entity_key);"
)

for table in tables_to_delete:
_drop_table_and_index(cur, project, table)
Expand Down

0 comments on commit e98ae0b

Please sign in to comment.