Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(unique-count): Add grouped_unique_count to ClickhouseStore #1701

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 78 additions & 4 deletions app/services/events/stores/clickhouse/unique_count_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,36 @@ def prorated_query
SQL
end

# Builds the SQL string computing the unique count per group.
#
# Query structure:
#   1. `events_data` CTE (from `grouped_events_cte_sql`): one row per event with the
#      group columns, timestamp, property and operation type.
#   2. `event_values` CTE: for each (groups, property) pair, sums the per-event
#      `adjusted_value` produced by `grouped_operation_value_sql`.
#   3. Final SELECT: sums those per-property totals for each group and exposes the
#      result as `aggregation`.
#
# The interpolated `grouped_operation_value_sql` fragment contains the `:decimal_scale`
# named placeholder, so the returned string has to be sanitized with that binding
# before being executed.
#
# NOTE(review): `group_names` is interpolated as-is; with an empty `store.grouped_by`
# the SELECT / GROUP BY lists would be malformed - presumably callers only use this
# when groups are present; confirm.
def grouped_query
<<-SQL
#{grouped_events_cte_sql},
event_values AS (
SELECT
#{group_names},
property,
SUM(adjusted_value) AS sum_adjusted_value
FROM (
SELECT
timestamp,
property,
operation_type,
#{group_names},
#{grouped_operation_value_sql} AS adjusted_value
FROM events_data
ORDER BY timestamp ASC
) adjusted_event_values
GROUP BY #{group_names}, property
)
SELECT
#{group_names},
SUM(sum_adjusted_value) as aggregation
FROM event_values
GROUP BY #{group_names}
SQL
end

# NOTE: Not used in production; only for debugging, to check the computed values before aggregation
# Returns an array of each event's timestamp, property, operation type and operation value
# Example:
Expand Down Expand Up @@ -111,6 +141,24 @@ def events_cte_sql
SQL
end

# Builds the `WITH events_data AS (...)` CTE used by the grouped queries.
#
# Each entry of `store.grouped_by` is selected through `sanitized_property_name(group)`
# and aliased positionally as `g_0`, `g_1`, ... (the same aliases `group_names` returns).
# Besides the group columns, every row exposes:
#   - `timestamp`: converted with `toDateTime64(timestamp, 5, 'UTC')`
#   - `property`: the value of `sanitized_property_name`
#   - `operation_type`: `properties['operation_type']`, falling back to 'add' when
#     the value is NULL or an empty string
#
# NOTE(review): the select list starts with the joined group columns followed by a
# comma, so an empty `store.grouped_by` would yield a leading comma - confirm callers
# never reach this without groups.
def grouped_events_cte_sql
# Positional aliases keep the generated column names independent of the property names.
groups = store.grouped_by.map.with_index do |group, index|
"#{sanitized_property_name(group)} AS g_#{index}"
end

<<-SQL
WITH events_data AS (#{
events
.select(
"#{groups.join(', ')}, \
toDateTime64(timestamp, 5, 'UTC') as timestamp, \
#{sanitized_property_name} AS property, \
coalesce(NULLIF(events_raw.properties['operation_type'], ''), 'add') AS operation_type",
).to_sql
})
SQL
end

def operation_value_sql
# NOTE: Returns 1 for relevant addition, -1 for relevant removal
# If property already added, another addition returns 0 ; it returns 1 otherwise
Expand All @@ -120,14 +168,36 @@ def operation_value_sql
operation_type = 'add',
(if(
(lagInFrame(operation_type, 1) OVER (PARTITION BY property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'add',
0,
1
toDecimal128(0, :decimal_scale),
toDecimal128(1, :decimal_scale)
))
,
(if(
(lagInFrame(operation_type, 1, 'remove') OVER (PARTITION BY property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'remove',
-1,
0
toDecimal128(-1, :decimal_scale),
toDecimal128(0, :decimal_scale)
))
)
SQL
end

def grouped_operation_value_sql
# NOTE: Returns 1 for relevant addition, -1 for relevant removal
# If property already added, another addition returns 0 ; it returns 1 otherwise
# If property already removed or not yet present, another removal returns 0 ; it returns -1 otherwise
<<-SQL
if (
operation_type = 'add',
(if(
(lagInFrame(operation_type, 1) OVER (PARTITION BY #{group_names}, property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'add',
toDecimal128(0, :decimal_scale),
toDecimal128(1, :decimal_scale)
))
,
(if(
(lagInFrame(operation_type, 1, 'remove') OVER (PARTITION BY #{group_names}, property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'remove',
toDecimal128(-1, :decimal_scale),
toDecimal128(0, :decimal_scale)
))
)
SQL
Expand All @@ -151,6 +221,10 @@ def period_ratio_sql
)
SQL
end

# Comma-separated list of the positional group aliases (`g_0, g_1, ...`),
# one per entry of `store.grouped_by`. The string is built once and memoized.
def group_names
  return @group_names if @group_names

  aliases = store.grouped_by.each_with_index.map { |_group, position| "g_#{position}" }
  @group_names = aliases.join(', ')
end
end
end
end
Expand Down
24 changes: 22 additions & 2 deletions app/services/events/stores/clickhouse_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,15 @@ def grouped_count

# Runs the unique-count query and returns its `aggregation` column.
#
# The SQL produced by `UniqueCountQuery#query` contains the `:decimal_scale` named
# placeholder, so it is sanitized exactly once, with that binding supplied.
# The value is returned as the connection provides it; no extra BigDecimal
# conversion is applied here.
def unique_count
  query = Events::Stores::Clickhouse::UniqueCountQuery.new(store: self)
  sql = ActiveRecord::Base.sanitize_sql_for_conditions(
    [
      query.query,
      { decimal_scale: DECIMAL_SCALE },
    ],
  )
  result = ::Clickhouse::EventsRaw.connection.select_one(sql)

  result['aggregation']
end

# NOTE: not used in production, only for debug purpose to check the computed values before aggregation
Expand All @@ -141,6 +146,21 @@ def prorated_unique_count
result['aggregation']
end

# Runs the grouped unique-count query and passes the raw result rows to
# `prepare_grouped_result`.
#
# The statement is sanitized with the `to_datetime` (ceiled) and `decimal_scale`
# named bindings before being sent to the ClickHouse connection.
def grouped_unique_count
  statement = Events::Stores::Clickhouse::UniqueCountQuery.new(store: self).grouped_query
  bindings = { to_datetime: to_datetime.ceil, decimal_scale: DECIMAL_SCALE }
  sanitized = ActiveRecord::Base.sanitize_sql_for_conditions([statement, bindings])
  rows = ::Clickhouse::EventsRaw.connection.select_all(sanitized).rows

  prepare_grouped_result(rows)
end

# Returns the maximum of the sanitized numeric property expression over `events`.
def max
  numeric_column = Arel.sql(sanitized_numeric_property)
  events.maximum(numeric_column)
end
Expand Down
77 changes: 77 additions & 0 deletions spec/services/events/stores/clickhouse_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,83 @@
end
end

# Covers ClickhouseStore#grouped_unique_count.
#
# Fixture: four events sharing the same property value, grouped by `agent_name`:
#   - 'frodo':   one add
#   - 'aragorn': one add followed by one remove
#   - no agent:  one add (lands in the group whose keys are all nil)
describe '#grouped_unique_count' do
  let(:grouped_by) { %w[agent_name other] }
  let(:started_at) { Time.zone.parse('2023-03-01') }

  let(:events) do
    [
      Clickhouse::EventsRaw.create!(
        organization_id: organization.id,
        external_subscription_id: subscription.external_id,
        external_customer_id: customer.external_id,
        code:,
        timestamp: boundaries[:from_datetime] + 1.hour,
        properties: {
          billable_metric.field_name => 2,
          agent_name: 'frodo',
        },
      ),
      Clickhouse::EventsRaw.create!(
        organization_id: organization.id,
        external_subscription_id: subscription.external_id,
        external_customer_id: customer.external_id,
        code:,
        timestamp: boundaries[:from_datetime] + 1.day,
        properties: {
          billable_metric.field_name => 2,
          agent_name: 'aragorn',
        },
      ),
      Clickhouse::EventsRaw.create!(
        organization_id: organization.id,
        external_subscription_id: subscription.external_id,
        external_customer_id: customer.external_id,
        code:,
        timestamp: boundaries[:from_datetime] + 2.days,
        properties: {
          billable_metric.field_name => 2,
          agent_name: 'aragorn',
          operation_type: 'remove',
        },
      ),
      Clickhouse::EventsRaw.create!(
        organization_id: organization.id,
        external_subscription_id: subscription.external_id,
        external_customer_id: customer.external_id,
        code:,
        timestamp: boundaries[:from_datetime] + 2.days,
        properties: { billable_metric.field_name => 2 },
      ),
    ]
  end

  before do
    event_store.aggregation_property = billable_metric.field_name
  end

  it 'returns the unique count of event properties' do
    result = event_store.grouped_unique_count

    expect(result.count).to eq(3)

    null_group = result.find { |v| v[:groups]['agent_name'].nil? }
    expect(null_group[:groups]['other']).to be_nil
    expect(null_group[:value]).to eq(1)

    # The query has no ORDER BY, so the null group is not guaranteed to be the last
    # row: select the named groups explicitly instead of slicing by position.
    named_groups = result.reject { |v| v[:groups]['agent_name'].nil? }
    expect(named_groups.map { |r| r[:value] }).to contain_exactly(1, 0)
  end

  context 'with no events' do
    let(:events) { [] }

    it 'returns an empty result' do
      result = event_store.grouped_unique_count
      expect(result.count).to eq(0)
    end
  end
end

describe '.events_values' do
it 'returns the value attached to each event' do
event_store.aggregation_property = billable_metric.field_name
Expand Down
Loading