Skip to content

Commit

Permalink
Merge pull request #11 from coingaming/feature/cluster-migrations
Browse files Browse the repository at this point in the history
feat: 🎸 cluster supported migrations
  • Loading branch information
karlosmid authored Aug 30, 2024
2 parents 10be0cf + 756839e commit 3fbc745
Show file tree
Hide file tree
Showing 17 changed files with 65 additions and 38 deletions.
23 changes: 17 additions & 6 deletions lib/plausible/data_migration/versioned_sessions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ defmodule Plausible.DataMigration.VersionedSessions do

cluster? = Plausible.MigrationUtils.clustered_table?("sessions_v2")

cluster_name =
if cluster? do
Plausible.MigrationUtils.cluster_name()
else
nil
end

{:ok, %{rows: partitions}} = run_sql("list-partitions")
partitions = Enum.map(partitions, fn [part] -> part end)

Expand All @@ -32,11 +39,13 @@ defmodule Plausible.DataMigration.VersionedSessions do
if Enum.member?(@versioned_table_engines, current_table_engine) do
IO.puts("sessions_v2 table is already versioned, no migration needed")
else
{:ok, _} = run_sql("drop-sessions-tmp-table", cluster?: cluster?)
{:ok, _} =
run_sql("drop-sessions-tmp-table", cluster?: cluster?, cluster_name: cluster_name)

{:ok, _} =
run_sql("create-sessions-tmp-table",
cluster?: cluster?,
cluster_name: cluster_name,
table_settings: table_settings,
unique_suffix: unique_suffix
)
Expand All @@ -46,15 +55,15 @@ defmodule Plausible.DataMigration.VersionedSessions do
end

if run_exchange? do
run_exchange(cluster?)
run_exchange(cluster?, cluster_name)
end

IO.puts("Migration done!")
end
end

defp run_exchange(cluster?) do
case run_sql("exchange-sessions-tables", cluster?: cluster?) do
defp run_exchange(cluster?, cluster_name) do
case run_sql("exchange-sessions-tables", cluster?: cluster?, cluster_name: cluster_name) do
{:ok, _} ->
nil

Expand All @@ -66,14 +75,16 @@ defmodule Plausible.DataMigration.VersionedSessions do
run_sql("rename-table",
from: "sessions_v2",
to: "sessions_v2_backup",
cluster?: cluster?
cluster?: cluster?,
cluster_name: cluster_name
)

{:ok, _} =
run_sql("rename-table",
from: "sessions_v2_tmp_versioned",
to: "sessions_v2",
cluster?: cluster?
cluster?: cluster?,
cluster_name: cluster_name
)
end
end
Expand Down
4 changes: 2 additions & 2 deletions priv/data_migrations/NumericIDs/sql/create-events-v2.sql.eex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS events_v2 <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
CREATE TABLE IF NOT EXISTS events_v2 <%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
(
`timestamp` DateTime CODEC(Delta(4), LZ4),
`name` LowCardinality(String),
Expand Down Expand Up @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS events_v2 <%= if @cluster? do %>ON CLUSTER '{cluster}
`transferred_from` String
)
<%= if @cluster? do %>
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/{database}/events_v2', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/<%= @cluster_name %>/tables/{shard}/{database}/events_v2', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS sessions_v2 <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
CREATE TABLE IF NOT EXISTS sessions_v2 <%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
(
`session_id` UInt64,
`sign` Int8,
Expand Down Expand Up @@ -34,7 +34,7 @@ CREATE TABLE IF NOT EXISTS sessions_v2 <%= if @cluster? do %>ON CLUSTER '{cluste
`entry_meta.value` Array(String)
)
<%= if @cluster? do %>
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/sessions_v2', '{replica}', sign)
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/<%= @cluster_name %>/tables/{shard}/plausible_prod/sessions_v2', '{replica}', sign)
<% else %>
ENGINE = CollapsingMergeTree(sign)
<% end %>
Expand Down
2 changes: 1 addition & 1 deletion priv/data_migrations/NumericIDs/sql/drop-events-v2.sql.eex
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP TABLE IF EXISTS events_v2 <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %> SYNC
DROP TABLE IF EXISTS events_v2 <%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %> SYNC
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP TABLE IF EXISTS sessions_v2 <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %> SYNC
DROP TABLE IF EXISTS sessions_v2 <%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %> SYNC
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CREATE OR REPLACE DICTIONARY sessions_dict
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
<%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
(
site_id UInt64,
session_id UInt64,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DROP DICTIONARY IF EXISTS sessions_dict
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
<%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
KILL MUTATION <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
KILL MUTATION <%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
WHERE command ILIKE '%sessions_dict%'
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ALTER TABLE events_v2
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
<%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
UPDATE
referrer = dictGet('sessions_dict', 'referrer', tuple(site_id, session_id)),
referrer_source = dictGet('sessions_dict', 'referrer_source', tuple(site_id, session_id)),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
CREATE TABLE sessions_v2_tmp_versioned
<%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
<%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
AS sessions_v2
<%= if @cluster? do %>
ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/sessions_v2_<%= @unique_suffix %>', '{replica}', sign, events)
ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/<%= @cluster_name %>/tables/{shard}/plausible_prod/sessions_v2_<%= @unique_suffix %>', '{replica}', sign, events)
<% else %>
ENGINE = VersionedCollapsingMergeTree(sign, events)
<% end %>
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP TABLE IF EXISTS sessions_v2_tmp_versioned <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
DROP TABLE IF EXISTS sessions_v2_tmp_versioned <%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
Original file line number Diff line number Diff line change
@@ -1 +1 @@
EXCHANGE TABLES sessions_v2_tmp_versioned AND sessions_v2 <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
EXCHANGE TABLES sessions_v2_tmp_versioned AND sessions_v2
Original file line number Diff line number Diff line change
@@ -1 +1 @@
RENAME TABLE <%= @from %> TO <%= @to %> <%= if @cluster? do %>ON CLUSTER '{cluster}'<% end %>
RENAME TABLE <%= @from %> TO <%= @to %> <%= if @cluster? do %>ON CLUSTER '<%= @cluster_name %>'<% end %>
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateEventsAndSessions do
create_if_not_exists table(:events,
primary_key: false,
engine: "MergeTree",
options:
"PARTITION BY toYYYYMM(timestamp) ORDER BY (domain, toDate(timestamp), user_id) SETTINGS index_granularity = 8192"
options: [
partition_by: "toYYYYMM(timestamp)",
order_by: "(domain, toDate(timestamp), user_id)",
settings: "index_granularity = 8192"
]
) do
add(:name, :string)
add(:domain, :string)
Expand All @@ -34,8 +37,11 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateEventsAndSessions do
create_if_not_exists table(:sessions,
primary_key: false,
engine: "CollapsingMergeTree(sign)",
options:
"PARTITION BY toYYYYMM(start) ORDER BY (domain, toDate(start), user_id, session_id) SETTINGS index_granularity = 8192"
options: [
partition_by: "toYYYYMM(start)",
order_by: "(domain, toDate(start), user_id, session_id)",
settings: "index_granularity = 8192"
]
) do
add(:session_id, :UInt64)
add(:sign, :Int8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_visitors,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date)"
options: [order_by: "(site_id, date)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -19,7 +19,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_sources,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, source)"
options: [order_by: "(site_id, date, source)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -37,7 +37,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_pages,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, hostname, page)"
options: [order_by: "(site_id, date, hostname, page)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -52,7 +52,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_entry_pages,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, entry_page)"
options: [order_by: "(site_id, date, entry_page)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -66,7 +66,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_exit_pages,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, exit_page)"
options: [order_by: "(site_id, date, exit_page)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -78,7 +78,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_locations,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, country, region, city)"
options: [order_by: "(site_id, date, country, region, city)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -94,7 +94,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_devices,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, device)"
options: [order_by: "(site_id, date, device)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -108,7 +108,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_browsers,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, browser)"
options: [order_by: "(site_id, date, browser)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand All @@ -122,7 +122,7 @@ defmodule Plausible.ClickhouseRepo.Migrations.CreateImportedVisitors do
create_if_not_exists table(:imported_operating_systems,
primary_key: false,
engine: "MergeTree",
options: "ORDER BY (site_id, date, operating_system)"
options: [order_by: "(site_id, date, operating_system)"]
) do
add(:site_id, :UInt64)
add(:date, :date)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ defmodule Plausible.IngestRepo.Migrations.CreateIngestCountersTable do
create_if_not_exists table(:ingest_counters,
primary_key: false,
engine: "SummingMergeTree(value)",
options:
"ORDER BY (domain, toDate(event_timebucket), metric, toStartOfMinute(event_timebucket))"
options: [
order_by:
"(domain, toDate(event_timebucket), metric, toStartOfMinute(event_timebucket))"
]
) do
add(:event_timebucket, :utc_datetime)
add(:domain, :"LowCardinality(String)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@ defmodule Plausible.IngestRepo.Migrations.AddImportedCustomEvents do
def change do
# NOTE: Using another table for determining cluster presence
on_cluster = Plausible.MigrationUtils.on_cluster_statement("imported_pages")
cluster? = Plausible.MigrationUtils.clustered_table?("imported_pages")

cluster_name =
if cluster? do
Plausible.MigrationUtils.cluster_name()
else
nil
end

settings =
if Plausible.MigrationUtils.clustered_table?("imported_pages") do
"""
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/{database}/imported_custom_events', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/#{cluster_name}/tables/{shard}/{database}/imported_custom_events', '{replica}')
ORDER BY (site_id, import_id, date, name)
SETTINGS replicated_deduplication_window = 0, storage_policy = 'tiered'
SETTINGS replicated_deduplication_window = 0, storage_policy = 's3_with_keeper'
"""
else
"""
Expand Down

0 comments on commit 3fbc745

Please sign in to comment.