From 8da36e99180accdebc31e7c943b3712ca12728b0 Mon Sep 17 00:00:00 2001 From: Vincent Pochet <vincent@getlago.com> Date: Fri, 8 Nov 2024 11:02:33 +0100 Subject: [PATCH] feat(DailyUsage): Add daily_usages:fill_history task (#2751) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Context This PR is part of the Usage Revenue and unit. Today, Lago does not offer a way to retrieve customer usage with a granularity lower than the billing period (via invoices and fees). On the other end, it is possible to get the current usage for a customer/subscription, but this usage is just a “snapshot” of the usage a the current time. ## Description This PR adds a rake task to fill the daily usage history for a specific organization --- app/services/daily_usages/compute_service.rb | 2 +- .../invoices/customer_usage_service.rb | 33 ++++++----- lib/tasks/daily_usages.rake | 58 +++++++++++++++++++ 3 files changed, 79 insertions(+), 14 deletions(-) create mode 100644 lib/tasks/daily_usages.rake diff --git a/app/services/daily_usages/compute_service.rb b/app/services/daily_usages/compute_service.rb index d1fa2a2ca11..0ab2b9839c1 100644 --- a/app/services/daily_usages/compute_service.rb +++ b/app/services/daily_usages/compute_service.rb @@ -19,7 +19,7 @@ def call customer: subscription.customer, subscription:, external_subscription_id: subscription.external_id, - usage: ::V1::Customers::UsageSerializer.new(current_usage).serialize, + usage: ::V1::Customers::UsageSerializer.new(current_usage, includes: %i[charges_usage]).serialize, from_datetime: current_usage.from_datetime, to_datetime: current_usage.to_datetime, refreshed_at: timestamp diff --git a/app/services/invoices/customer_usage_service.rb b/app/services/invoices/customer_usage_service.rb index 4eefabfa4da..e1f81567168 100644 --- a/app/services/invoices/customer_usage_service.rb +++ b/app/services/invoices/customer_usage_service.rb @@ -2,12 +2,16 @@ module Invoices class CustomerUsageService < BaseService - def initialize(customer:, subscription:, apply_taxes: true) + def initialize(customer:, subscription:, apply_taxes: true, with_cache: true, max_to_datetime: nil) super @apply_taxes = apply_taxes @customer = customer @subscription = subscription + @with_cache = with_cache + + # NOTE: used to force charges_to_datetime boundary + @max_to_datetime = max_to_datetime end def self.with_external_ids(customer_external_id:, external_subscription_id:, organization_id:, apply_taxes: true) @@ -15,7 +19,7 @@ def self.with_external_ids(customer_external_id:, external_subscription_id:, org subscription = customer&.active_subscriptions&.find_by(external_id: external_subscription_id) new(customer:, subscription:, apply_taxes:) rescue ActiveRecord::RecordNotFound - result.not_found_failure!(resource: 'customer') + result.not_found_failure!(resource: "customer") end def self.with_ids(organization_id:, customer_id:, subscription_id:, apply_taxes: true) @@ -23,12 +27,12 @@ def self.with_ids(organization_id:, customer_id:, subscription_id:, apply_taxes: subscription = customer&.active_subscriptions&.find_by(id: subscription_id) new(customer:, subscription:, apply_taxes:) rescue ActiveRecord::RecordNotFound - result.not_found_failure!(resource: 'customer') + result.not_found_failure!(resource: "customer") end def call - return result.not_found_failure!(resource: 'customer') unless @customer - return result.not_allowed_failure!(code: 'no_active_subscription') if subscription.blank? + return result.not_found_failure!(resource: "customer") unless @customer + return result.not_allowed_failure!(code: "no_active_subscription") if subscription.blank? result.usage = compute_usage result.invoice = invoice @@ -37,8 +41,7 @@ def call private - attr_reader :invoice, :subscription, :apply_taxes - + attr_reader :invoice, :subscription, :apply_taxes, :with_cache, :max_to_datetime delegate :plan, to: :subscription delegate :organization, to: :subscription @@ -71,12 +74,12 @@ def compute_usage def add_charge_fees query = subscription.plan.charges.joins(:billable_metric) .includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter}) - .order(Arel.sql('lower(unaccent(billable_metrics.name)) ASC')) + .order(Arel.sql("lower(unaccent(billable_metrics.name)) ASC")) # we're capturing the context here so we can re-use inside the threads. This will correctly propagate spans to this current span context = OpenTelemetry::Context.current - invoice.fees = Parallel.flat_map(query.all, in_threads: ENV['LAGO_PARALLEL_THREADS_COUNT']&.to_i || 0) do |charge| + invoice.fees = Parallel.flat_map(query.all, in_threads: ENV["LAGO_PARALLEL_THREADS_COUNT"]&.to_i || 0) do |charge| OpenTelemetry::Context.with_current(context) do ActiveRecord::Base.connection_pool.with_connection do charge_usage(charge) @@ -90,11 +93,15 @@ def charge_usage(charge) subscription:, charge:, to_datetime: boundaries[:charges_to_datetime], - cache: !organization.clickhouse_events_store? # NOTE: Will be turned on in the future + # NOTE: Will be turned on for clickhouse in the future + cache: organization.clickhouse_events_store? ? false : with_cache ) + applied_boundaries = boundaries + applied_boundaries = applied_boundaries.merge(charges_to_datetime: max_to_datetime) if max_to_datetime + Fees::ChargeService - .call(invoice:, charge:, subscription:, boundaries:, current_usage: true, cache_middleware:) + .call(invoice:, charge:, subscription:, boundaries: applied_boundaries, current_usage: true, cache_middleware:) .raise_if_error! .fees end @@ -171,10 +178,10 @@ def compute_amounts_with_provider_taxes def provider_taxes_cache_key [ - 'provider-taxes', + "provider-taxes", subscription.id, plan.updated_at.iso8601 - ].join('/') + ].join("/") end def format_usage diff --git a/lib/tasks/daily_usages.rake b/lib/tasks/daily_usages.rake new file mode 100644 index 00000000000..6a34da35e50 --- /dev/null +++ b/lib/tasks/daily_usages.rake @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +require 'timecop' + +namespace :daily_usages do + desc "Fill past daily usage" + task :fill_history, [:organization_id, :days_ago] => :environment do |_task, args| + abort "Missing organization_id\n\n" unless args[:organization_id] + + Rails.logger.level = Logger::INFO + + days_ago = (args[:days_ago] || 120).to_i.days.ago + organization = Organization.find(args[:organization_id]) + + subscriptions = organization.subscriptions + .where(status: [:active, :terminated]) + .where.not(started_at: nil) + .where('terminated_at IS NULL OR terminated_at >= ?', days_ago) + .includes(customer: :organization) + + subscriptions.find_each do |subscription| + from = subscription.started_at.to_date + if from < days_ago + from = days_ago.to_date + end + + to = (subscription.terminated_at || Time.current).to_date + + (from..to).each do |date| + datetime = date.in_time_zone(subscription.customer.applicable_timezone).beginning_of_day.utc + + next if date == Date.today && + DailyUsage.refreshed_at_in_timezone(datetime).where(subscription_id: subscription.id).exists? + + Timecop.freeze(datetime + 5.minutes) do + usage = Invoices::CustomerUsageService.call( + customer: subscription.customer, + subscription: subscription, + apply_taxes: false, + with_cache: false, + max_to_datetime: datetime + ).raise_if_error!.usage + + DailyUsage.create!( + organization:, + customer: subscription.customer, + subscription:, + external_subscription_id: subscription.external_id, + usage: ::V1::Customers::UsageSerializer.new(usage, includes: %i[charges_usage]).serialize, + from_datetime: usage.from_datetime, + to_datetime: usage.to_datetime, + refreshed_at: datetime + ) + end + end + end + end +end