Skip to content

Commit

Permalink
feat(unique-count): Add grouped unique count to PostgresStore
Browse files Browse the repository at this point in the history
  • Loading branch information
rsempe committed Feb 15, 2024
1 parent 5f8a692 commit 4116cca
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 10 deletions.
90 changes: 80 additions & 10 deletions app/services/events/stores/postgres/unique_count_query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,36 @@ def prorated_query
SQL
end

def grouped_query
<<-SQL
#{grouped_events_cte_sql},
event_values AS (
SELECT
#{group_names},
property,
SUM(adjusted_value) AS sum_adjusted_value
FROM (
SELECT
timestamp,
property,
operation_type,
#{group_names},
#{grouped_operation_value_sql} AS adjusted_value
FROM events_data
ORDER BY timestamp ASC
) adjusted_event_values
GROUP BY #{group_names}, property
)
SELECT
#{group_names},
SUM(sum_adjusted_value) as aggregation
FROM event_values
GROUP BY #{group_names}
SQL
end

# NOTE: Not used in production, only for debug purpose to check the computed values before aggregation
# Returns an array of event's timestamp, property, operation type and operation value
# Example:
Expand Down Expand Up @@ -96,16 +126,32 @@ def breakdown_query
def events_cte_sql
# NOTE: Common table expression returning event's timestamp, property name and operation type.
<<-SQL
WITH events_data AS (
(#{
events
.select(
"timestamp, \
#{sanitized_property_name} AS property, \
COALESCE(events.properties->>'operation_type', 'add') AS operation_type",
).to_sql
})
)
WITH events_data AS (#{
events
.select(
"timestamp, \
#{sanitized_property_name} AS property, \
COALESCE(events.properties->>'operation_type', 'add') AS operation_type",
).to_sql
})
SQL
end

def grouped_events_cte_sql
groups = store.grouped_by.map.with_index do |group, index|
"#{sanitized_property_name(group)} AS g_#{index}"
end

<<-SQL
WITH events_data AS (#{
events
.select(
"#{groups.join(', ')}, \
timestamp, \
#{sanitized_property_name} AS property, \
COALESCE(events.properties->>'operation_type', 'add') AS operation_type",
).to_sql
})
SQL
end

Expand All @@ -129,6 +175,26 @@ def operation_value_sql
SQL
end

def grouped_operation_value_sql
# NOTE: Returns 1 for relevant addition, -1 for relevant removal
# If property already added, another addition returns 0 ; it returns 1 otherwise
# If property already removed or not yet present, another removal returns 0 ; it returns -1 otherwise
<<-SQL
CASE WHEN operation_type = 'add'
THEN
CASE WHEN LAG(operation_type, 1) OVER (PARTITION BY #{group_names}, property ORDER BY timestamp) = 'add'
THEN 0
ELSE 1
END
ELSE
CASE LAG(operation_type, 1, 'remove') OVER (PARTITION BY #{group_names}, property ORDER BY timestamp)
WHEN 'remove' THEN 0
ELSE -1
END
END
SQL
end

def period_ratio_sql
<<-SQL
CASE WHEN operation_type = 'add'
Expand All @@ -143,6 +209,10 @@ def period_ratio_sql
END
SQL
end

def group_names
@group_names ||= store.grouped_by.map.with_index { |_, index| "g_#{index}" }.join(', ')
end
end
end
end
Expand Down
10 changes: 10 additions & 0 deletions app/services/events/stores/postgres_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ def prorated_unique_count
result['aggregation']
end

def grouped_unique_count
query = Events::Stores::Postgres::UniqueCountQuery.new(store: self)

sql = ActiveRecord::Base.sanitize_sql_for_conditions(
[query.grouped_query],
)

prepare_grouped_result(Event.connection.select_all(sql).rows)
end

def max
events.maximum("(#{sanitized_property_name})::numeric")
end
Expand Down
82 changes: 82 additions & 0 deletions spec/services/events/stores/postgres_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,88 @@
end
end

describe '#grouped_unique_count' do
let(:grouped_by) { %w[agent_name other] }
let(:started_at) { Time.zone.parse('2023-03-01') }

let(:events) do
[
create(
:event,
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 1.hour,
properties: {
billable_metric.field_name => 2,
agent_name: 'frodo',
},
),
create(
:event,
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 1.day,
properties: {
billable_metric.field_name => 2,
agent_name: 'aragorn',
},
),
create(
:event,
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 2.days,
properties: {
billable_metric.field_name => 2,
agent_name: 'aragorn',
operation_type: 'remove',
},
),
create(
:event,
organization_id: organization.id,
external_subscription_id: subscription.external_id,
external_customer_id: customer.external_id,
code:,
timestamp: boundaries[:from_datetime] + 2.days,
properties: { billable_metric.field_name => 2 },
),
]
end

before do
event_store.aggregation_property = billable_metric.field_name
end

it 'returns the unique count of event properties' do
result = event_store.grouped_unique_count

expect(result.count).to eq(3)

null_group = result.last
expect(null_group[:groups]['agent_name']).to be_nil
expect(null_group[:groups]['other']).to be_nil
expect(null_group[:value]).to eq(1)

expect(result[...-1].map { |r| r[:value] }).to contain_exactly(1, 0)
end

context 'with no events' do
let(:events) { [] }

it 'returns the unique count of event properties' do
result = event_store.grouped_unique_count
expect(result.count).to eq(0)
end
end
end

describe '#events_values' do
it 'returns the value attached to each event' do
event_store.aggregation_property = billable_metric.field_name
Expand Down

0 comments on commit 4116cca

Please sign in to comment.