Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition xapi_events_all table #586

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Partition the raw xAPI events table by month of emission_time

.. pii: Stores Open edX user profile data.
.. pii_types: user_id, name, username, location, phone_number, email_address, birth_date, biography, gender
.. pii_retirement: local_api, consumer_api
"""
from alembic import op


# Alembic revision identifiers: this migration and the one it follows.
revision = "0032"
down_revision = "0031"
branch_labels = None
depends_on = None

# The double-brace placeholders are substituted before this module runs
# (presumably by the project's template renderer -- confirm). An empty cluster
# name leaves an empty, falsy string literal, which selects the single-node
# settings below.
if "{{CLICKHOUSE_CLUSTER_NAME}}":
    on_cluster = " ON CLUSTER '{{CLICKHOUSE_CLUSTER_NAME}}' "
    engine = "ReplicatedReplacingMergeTree"
else:
    on_cluster = ""
    engine = "ReplacingMergeTree"

# Temporary name the raw xAPI events table is moved to while it is rebuilt.
# NOTE(review): the identifier mentions "user_profile", but the value points at
# the xAPI events table; the name is kept because upgrade() and downgrade()
# reference it.
old_user_profile_table = "{{ASPECTS_XAPI_DATABASE}}.old_{{ASPECTS_RAW_XAPI_TABLE}}"

def upgrade():
    """
    Rebuild the raw xAPI events table with monthly partitions.

    The existing table is renamed out of the way, a new table partitioned by
    toYYYYMM(emission_time) is created under the original name, all rows are
    copied across, and the renamed table is dropped. The statements are sent
    one at a time, in that order, through ``op.execute``.
    """
    # 1. Move the current table aside.
    rename_sql = f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
{on_cluster}
"""
    # 2. Create the partitioned replacement under the original name.
    create_sql = f"""
CREATE TABLE IF NOT EXISTS {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
{on_cluster}
(
event_id UUID NOT NULL,
emission_time DateTime64(6) NOT NULL,
event String NOT NULL
) ENGINE {engine}
ORDER BY (emission_time, event_id)
PARTITION BY toYYYYMM(emission_time)
PRIMARY KEY (emission_time, event_id);
"""
    # 3. Copy every row from the renamed table (this statement carries no
    #    ON CLUSTER clause).
    copy_sql = f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
"""
    # 4. Remove the renamed table.
    drop_sql = f"""
DROP TABLE {old_user_profile_table}
{on_cluster}
"""

    for statement in (rename_sql, create_sql, copy_sql, drop_sql):
        op.execute(statement)


def downgrade():
    """
    Rebuild the raw xAPI events table without partitioning.

    Mirrors ``upgrade``: the partitioned table is renamed out of the way, a
    table with the same columns and sort key but no PARTITION BY clause is
    created (or replaced) under the original name, all rows are copied across,
    and the renamed table is dropped. The statements are sent one at a time,
    in that order, through ``op.execute``.
    """
    # 1. Move the partitioned table aside.
    rename_sql = f"""
RENAME TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
TO {old_user_profile_table}
{on_cluster}
"""
    # 2. Create the un-partitioned replacement under the original name.
    create_sql = f"""
CREATE OR REPLACE TABLE {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
{on_cluster}
(
event_id UUID NOT NULL,
emission_time DateTime64(6) NOT NULL,
event String NOT NULL
) ENGINE {engine}
ORDER BY (emission_time, event_id)
PRIMARY KEY (emission_time, event_id);
"""
    # 3. Copy every row from the renamed table (this statement carries no
    #    ON CLUSTER clause).
    copy_sql = f"""
INSERT INTO {{ ASPECTS_XAPI_DATABASE }}.{{ ASPECTS_RAW_XAPI_TABLE }}
SELECT * FROM {old_user_profile_table}
"""
    # 4. Remove the renamed table.
    drop_sql = f"""
DROP TABLE {old_user_profile_table}
{on_cluster}
"""

    for statement in (rename_sql, create_sql, copy_sql, drop_sql):
        op.execute(statement)
Loading