Skip to content

Commit

Permalink
feat(unique-count): Add grouped_unique_count to ClickhouseStore
Browse files Browse the repository at this point in the history
  • Loading branch information
rsempe committed Feb 16, 2024
1 parent 0212870 commit 875a513
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 6 deletions.
82 changes: 78 additions & 4 deletions app/services/events/stores/clickhouse/unique_count_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,36 @@ def prorated_query
SQL
end

def grouped_query
<<-SQL
#{grouped_events_cte_sql},
event_values AS (
SELECT
#{group_names},
property,
SUM(adjusted_value) AS sum_adjusted_value
FROM (
SELECT
timestamp,
property,
operation_type,
#{group_names},
#{grouped_operation_value_sql} AS adjusted_value
FROM events_data
ORDER BY timestamp ASC
) adjusted_event_values
GROUP BY #{group_names}, property
)
SELECT
#{group_names},
SUM(sum_adjusted_value) as aggregation
FROM event_values
GROUP BY #{group_names}
SQL
end

# NOTE: Not used in production, only for debug purpose to check the computed values before aggregation
# Returns an array of event's timestamp, property, operation type and operation value
# Example:
Expand Down Expand Up @@ -111,6 +141,24 @@ def events_cte_sql
SQL
end

def grouped_events_cte_sql
groups = store.grouped_by.map.with_index do |group, index|
"#{sanitized_property_name(group)} AS g_#{index}"
end

<<-SQL
WITH events_data AS (#{
events
.select(
"#{groups.join(', ')}, \
toDateTime64(timestamp, 5, 'UTC') as timestamp, \
#{sanitized_property_name} AS property, \
coalesce(NULLIF(events_raw.properties['operation_type'], ''), 'add') AS operation_type",
).to_sql
})
SQL
end

def operation_value_sql
# NOTE: Returns 1 for relevant addition, -1 for relevant removal
# If property already added, another addition returns 0 ; it returns 1 otherwise
Expand All @@ -120,14 +168,36 @@ def operation_value_sql
operation_type = 'add',
(if(
(lagInFrame(operation_type, 1) OVER (PARTITION BY property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'add',
0,
1
toDecimal128(0, :decimal_scale),
toDecimal128(1, :decimal_scale)
))
,
(if(
(lagInFrame(operation_type, 1, 'remove') OVER (PARTITION BY property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'remove',
-1,
0
toDecimal128(-1, :decimal_scale),
toDecimal128(0, :decimal_scale)
))
)
SQL
end

def grouped_operation_value_sql
# NOTE: Returns 1 for relevant addition, -1 for relevant removal
# If property already added, another addition returns 0 ; it returns 1 otherwise
# If property already removed or not yet present, another removal returns 0 ; it returns -1 otherwise
<<-SQL
if (
operation_type = 'add',
(if(
(lagInFrame(operation_type, 1) OVER (PARTITION BY #{group_names}, property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'add',
toDecimal128(0, :decimal_scale),
toDecimal128(1, :decimal_scale)
))
,
(if(
(lagInFrame(operation_type, 1, 'remove') OVER (PARTITION BY #{group_names}, property ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)) = 'remove',
toDecimal128(-1, :decimal_scale),
toDecimal128(0, :decimal_scale)
))
)
SQL
Expand All @@ -151,6 +221,10 @@ def period_ratio_sql
)
SQL
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
end
end
end
end
Expand Down
24 changes: 22 additions & 2 deletions app/services/events/stores/clickhouse_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,15 @@ def grouped_count

def unique_count
query = Events::Stores::Clickhouse::UniqueCountQuery.new(store: self)
sql = ActiveRecord::Base.sanitize_sql_for_conditions([query.query])
sql = ActiveRecord::Base.sanitize_sql_for_conditions(
[
query.query,
{ decimal_scale: DECIMAL_SCALE },
],
)
result = ::Clickhouse::EventsRaw.connection.select_one(sql)

BigDecimal(result['aggregation'])
result['aggregation']
end

# NOTE: not used in production, only for debug purpose to check the computed values before aggregation
Expand All @@ -141,6 +146,21 @@ def prorated_unique_count
result['aggregation']
end

def grouped_unique_count
query = Events::Stores::Clickhouse::UniqueCountQuery.new(store: self)
sql = ActiveRecord::Base.sanitize_sql_for_conditions(
[
query.grouped_query,
{
to_datetime: to_datetime.ceil,
decimal_scale: DECIMAL_SCALE,
},
],
)

prepare_grouped_result(::Clickhouse::EventsRaw.connection.select_all(sql).rows)
end

def max
events.maximum(Arel.sql(sanitized_numeric_property))
end
Expand Down
77 changes: 77 additions & 0 deletions spec/services/events/stores/clickhouse_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,83 @@
end
end

describe '#grouped_unique_count' do
let(:grouped_by) { %w[agent_name other] }
let(:started_at) { Time.zone.parse('2023-03-01') }

let(:events) do
[
Clickhouse::EventsRaw.create!(
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 1.hour,
properties: {
billable_metric.field_name => 2,
agent_name: 'frodo',
},
),
Clickhouse::EventsRaw.create!(
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 1.day,
properties: {
billable_metric.field_name => 2,
agent_name: 'aragorn',
},
),
Clickhouse::EventsRaw.create!(
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 2.days,
properties: {
billable_metric.field_name => 2,
agent_name: 'aragorn',
operation_type: 'remove',
},
),
Clickhouse::EventsRaw.create!(
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 2.days,
properties: { billable_metric.field_name => 2 },
),
]
end

before do
event_store.aggregation_property = billable_metric.field_name
end

it 'returns the unique count of event properties' do
result = event_store.grouped_unique_count

expect(result.count).to eq(3)

null_group = result.find { |v| v[:groups]['agent_name'].nil? }
expect(null_group[:groups]['other']).to be_nil
expect(null_group[:value]).to eq(1)

expect(result[...-1].map { |r| r[:value] }).to contain_exactly(1, 0)
end

context 'with no events' do
let(:events) { [] }

it 'returns the unique count of event properties' do
result = event_store.grouped_unique_count
expect(result.count).to eq(0)
end
end
end

describe '.events_values' do
it 'returns the value attached to each event' do
event_store.aggregation_property = billable_metric.field_name
Expand Down

0 comments on commit 875a513

Please sign in to comment.