Skip to content

Commit

Permalink
WIP; catch change counts before import
Browse files Browse the repository at this point in the history
  • Loading branch information
eanders committed Jan 27, 2025
1 parent 263f208 commit 4585241
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 67 deletions.
2 changes: 1 addition & 1 deletion app/mailers/notify_user.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def new_account_created(new_user)

def import_processing
@user = params[:user]
@import = params[:import]
@import = params[:import_log_id]
@data_source = params[:data_source]
@error = params[:error]
@count = params[:count]
Expand Down
3 changes: 2 additions & 1 deletion app/models/concerns/service_history/builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ def wait_for_processing(interval: 30, max_wait_seconds: DEFAULT_MAX_WAIT_SECONDS
Delayed::Worker.new.work_off(2)
else
started = Time.current
while builder_batch_job_scope.exists?
while builder_batch_job_scope.any?
break if (Time.current - started) > max_wait_seconds
break if builder_batch_job_scope.empty?

sleep(interval)
end
Expand Down
4 changes: 2 additions & 2 deletions app/models/grda_warehouse/data_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def self.options_for_select(user:)
end
end

delegate :error_count_threshold_reached?, to: :import_threshold
# allow_nil must be true to take effect: a nil/false value leaves the default behavior
# (raising when import_threshold is nil). With true, the call returns nil instead.
delegate :error_count_threshold_reached?, to: :import_threshold, allow_nil: true
##
# Determines whether imports should be paused based on error thresholds.
#
Expand Down Expand Up @@ -610,7 +610,7 @@ def ever_pause_imports?
import_threshold.error_count_min_threshold.present? && import_threshold.error_percent_threshold.present?
end

delegate :record_count_threshold_reached?, to: :import_threshold
# allow_nil must be true to take effect: a nil/false value leaves the default behavior
# (raising when import_threshold is nil). With true, the call returns nil instead.
delegate :record_count_threshold_reached?, to: :import_threshold, allow_nil: true
##
# Determines whether imports should be paused based on record count change thresholds.
#
Expand Down
1 change: 1 addition & 0 deletions app/models/grda_warehouse/import_log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def import_time(details: false)
'processing...'
end
end

# Overrides some methods, so must be included at the end
include RailsDrivers::Extensions
end
62 changes: 35 additions & 27 deletions app/models/grda_warehouse/import_threshold.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,44 +75,52 @@ def error_count_threshold_reached?(total, count)
# 2. Those that have notifications for count changes
# 3. Those that have both
def send_status_notifications(import_log_id, error_threshold_met, record_count_threshold_met, paused)
  # Nothing to report unless at least one threshold was met
  return unless error_threshold_met || record_count_threshold_met

  receive_both_user_ids = error_count_notification_user_ids & record_change_count_notification_user_ids
  only_error_user_ids = error_count_notification_user_ids - receive_both_user_ids
  only_count_user_ids = record_change_count_notification_user_ids - receive_both_user_ids

  # Each entry is [recipient ids, error flag to report, count flag to report].
  # Recipients subscribed to only one kind of notification are never told about the other kind.
  deliveries = [
    [receive_both_user_ids, error_threshold_met, record_count_threshold_met],
    [only_error_user_ids, error_threshold_met, false],
    [only_count_user_ids, false, record_count_threshold_met],
  ]

  deliveries.each do |user_ids, error, count|
    # Skip the group entirely when none of the flags it may see were met
    next unless error || count

    User.where(id: user_ids).find_each do |user|
      NotifyUser.with(
        user: user,
        import_log_id: import_log_id,
        data_source: data_source,
        error: error,
        count: count,
        paused: paused,
      ).import_processing.deliver_later
    end
  end
end

Expand Down
8 changes: 6 additions & 2 deletions app/views/import_thresholds/show.haml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

= simple_form_for import_threshold, url: data_source_import_threshold_path, method: :patch do |f|
.well
%h2 Changes in Record Counts
%p The following settings apply when an import significantly changes the number of rows in the data that already exists in the warehouse for the data source. This will look at the aggregate change, so if 100 rows are removed and 100 added, no change is indicated.
.row
.col
= f.input :record_count_change_percent_threshold, label: 'Record count change threshold (%)', hint: 'Imports will pause and alert if more than the chosen percentage have changed', input_html: { style: 'width: 5em' }
.col
= f.input :record_count_change_min_threshold, label: 'Minium change count threshold', hint: 'Imports will not pause or alert below this threshold. This is designed to prevent alerting on a 30% change when only on of 3 rows changes.', input_html: { style: 'width: 8em' }
= f.input :record_count_change_min_threshold, label: 'Minimum change count threshold', hint: 'Imports will not pause or alert below this threshold. This is designed to prevent alerting on a 30% change when only 1 of 3 rows changes.', input_html: { style: 'width: 8em' }

= f.input :pause_on_record_count_threshold, label: 'Pause if threshold is met?', hint: 'When the above threshold is met, should the import be paused? Notifications will be sent even if the import is not paused.'
- slug = import_threshold.record_count_change_notification_slug
Expand All @@ -21,11 +23,13 @@
%i.icon-plus
Add Notification
.well
%h2 Error Counts
%p The following settings apply when an import contains a significant number of errors or validation issues. The determination of change is completely contained to the import in question, looking at the errors in a given table against the number of records being processed.
.row
.col
= f.input :error_percent_threshold, label: 'Error count threshold (%)', hint: 'Imports will pause and alert if more than the chosen percentage of rows have errors', input_html: { style: 'width: 5em' }
.col
= f.input :error_count_min_threshold, label: 'Minium error count threshold', hint: 'Imports will not pause or alert below this threshold. This is designed to prevent alerting on a 30% error rate when only on of 3 rows contains an error.', input_html: { style: 'width: 8em' }
= f.input :error_count_min_threshold, label: 'Minimum error count threshold', hint: 'Imports will not pause or alert below this threshold. This is designed to prevent alerting on a 30% error rate when only 1 of 3 rows contains an error.', input_html: { style: 'width: 8em' }
= f.input :pause_on_error_threshold, label: 'Pause if threshold is met?', hint: 'When the above threshold is met, should the import be paused? Notifications will be sent even if the import is not paused.'
- slug = import_threshold.error_count_notification_slug
- if import_threshold.items_for(slug).any?
Expand Down
6 changes: 0 additions & 6 deletions app/views/notify_user/import_processing.html.haml
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
%h2
A new account has been created for #{@new_user.name_with_email} in the #{Translation.translate('Open Path HMIS Warehouse')}.

%p
= link_to 'View account', edit_admin_user_url(@new_user)

%p
An import in the #{@data_source.name} data source has triggered notifications for the following reason(s):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ def resume!
return false unless @data_source.ever_pause_imports? # should we ever pause?
return true unless @loader_log.status == 'loaded'

any_error_thresholds_met? || any_record_count_thresholds_met?
return true if any_error_thresholds_met? && @data_source.ever_pause_imports_with_errors?

any_record_count_thresholds_met? && @data_source.ever_pause_imports_with_record_changes?
end

##
Expand All @@ -140,9 +142,9 @@ def resume!
totals_by_filename = @loader_log.summary.map { |filename, data| [filename, data['total_lines'].to_i] }.to_h

totals_by_filename.each do |filename, total_count|
pause = @data_source.pause_imports_with_errors?(total: total_count, errors: error_counts[filename])
met = @data_source.error_count_threshold_reached?(total_count, error_counts[filename])

return true if pause
return true if met
end
# No threshold met
false
Expand All @@ -161,9 +163,9 @@ def resume!
change_counts.each_value do |data|
total = data[:total_count]
changes = data[:change_count]
pause = @data_source.pause_imports_with_record_count_changes?(total: total, changes: changes)
met = @data_source.record_count_threshold_reached?(total, changes)

return true if pause
return true if met
end
# No threshold met
false
Expand Down Expand Up @@ -228,35 +230,38 @@ def resume!
# This method calculates changes in records (additions and removals) for each importable file.
# It determines the total number of changes by comparing additions and removals and associates
# the changes with the corresponding file. Additionally, it retrieves the total record count
# for each file to provide more context about the scale of the changes.
# of the existing data that matches each file, to provide more context about the scale of the changes.
#
# @return [Hash{String => Hash{Symbol => Integer}}] A hash where keys are filenames, and values
# are hashes containing:
# - `:change_count` (Integer): The absolute value of the difference between additions and removals.
# - `:total_count` (Integer): The total number of records for the corresponding file.
#
private def change_counts
  # A change is the absolute value of the number of additions minus the number of removals.
  # This is an estimate: it has to be computed before the actual processing so that the
  # import can be paused if necessary.
  importable_files.map do |filename, klass|
    # Keys already present for this data source (limited to the involved projects and date range)
    existing_keys = klass.existing_data(
      data_source_id: data_source.id,
      project_ids: involved_project_ids,
      date_range: date_range,
    ).pluck(klass.hud_key).to_set
    # Keys present in the incoming data for this importer log
    incoming_keys = klass.incoming_data(importer_log_id: importer_log.id).
      pluck(klass.hud_key).to_set

    # Keys only in the incoming data would be added; keys only in the existing data would be removed
    to_add = (incoming_keys - existing_keys).size
    to_remove = (existing_keys - incoming_keys).size
    [
      filename,
      {
        change_count: (to_add - to_remove).abs,
        total_count: existing_keys.size,
      },
    ]
  end.to_h
end

##
Expand Down Expand Up @@ -950,13 +955,13 @@ def complete_import
importer_log.status = :complete
importer_log.completed_at = Time.current
importer_log.upload_id = @upload.id if @upload.present?
importer_log.save
data_source.update(last_imported_at: Time.current)
importer_log.save!
data_source.update!(last_imported_at: Time.current)
elapsed = importer_log.completed_at - @started_at
Rails.logger.tagged({ task_name: 'HMIS CSV Importer', repeating_task: true, task_runtime: elapsed }) do
log("Completed importing in #{elapsed_time(elapsed)} #{hash_as_log_str log_ids}. #{summary_as_log_str(importer_log.summary)}")
end
@import_log&.update(importer_log: importer_log)
@import_log&.update!(importer_log: importer_log)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ def import_time
end
end

# Describes the gap between this record's completed_at and the given object's completed_at.
#
# @param importer [#completed_at, nil] object whose completion time is compared with this record's
# @return [String, nil] the gap in words followed by the two clock times, or nil when either
#   completion time is missing
def post_processing_time(importer)
  importer_completed_at = importer&.completed_at
  return unless completed_at && importer_completed_at

  # Round the gap to the nearest whole minute, then express it in seconds again
  whole_minutes = ((importer_completed_at - completed_at) / 1.minute).round
  duration = distance_of_time_in_words(whole_minutes * 60)
  clock_format = '%l:%M %P'
  # NOTE(review): the range is rendered with importer.completed_at first — confirm this is the intended order
  "#{duration} -#{importer_completed_at.strftime(clock_format)} to #{completed_at.strftime(clock_format)}"
end

# Whether at least one import error or import validation exists for this record.
# import_validations is only consulted when import_errors reports none.
def any_errors_or_validations?
  errors_present = import_errors.exists?
  errors_present || import_validations.exists?
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@
- status = @import.import_time(details: true)
- ds_name = @import.data_source.name

%h1
= Translation.translate("Import Log from #{ds_name}")
- if @import.importer_log&.paused?
.alert.alert-warning
%i.icon-warning.mr-2
%p #{ds_name} is currently set to prevent imports from loading if they contain any errors. This import contained at least one error, which you can review below. If the errors are unacceptable, fix them in the HMIS CSV files and re-upload.
%h1= Translation.translate("Import Log from #{ds_name}")
= render 'hmis_csv_importer/import_logs/import_threshold_description'
.row
.col
.card.mb-4
Expand All @@ -24,6 +20,9 @@
%tr
%th Imported
%td= import_time
%tr
%th Post-Processing
%td= @import&.importer_log&.post_processing_time(@import)
%tr
%th Completed In
%td
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
- if @import.importer_log&.paused? && @import.data_source&.import_threshold
.alert.alert-warning
%i.icon-warning.mr-2
.d-block
%p
%strong #{@import.data_source.name}
is currently set to pause imports if the following conditions are met:
- threshold = @import.data_source.import_threshold
%ul
- if threshold.pause_on_error_threshold
%li If more than #{pluralize(threshold.error_count_min_threshold, 'record')} contain errors and constitute more than #{threshold.error_percent_threshold}% of the incoming records
- if threshold.pause_on_record_count_threshold
%li If more than #{pluralize(threshold.record_count_change_min_threshold, 'record')} change that would cause a change of more than #{threshold.record_count_change_percent_threshold}% of the data in any given table

0 comments on commit 4585241

Please sign in to comment.