Skip to content

Commit

Permalink
Added overall_data_length_bytes to analysis_job. Closes #256
Browse files Browse the repository at this point in the history
Also reworked the analysis_job lifecycle up to enqueuing resque jobs to be more obvious.
All analysis_job attributes are now available in api.
  • Loading branch information
cofiem committed Feb 26, 2016
1 parent 43da10a commit 0b9c26d
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 138 deletions.
7 changes: 1 addition & 6 deletions app/controllers/analysis_jobs_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,13 @@ def new
def create
do_new_resource
do_set_attributes(analysis_job_create_params)

# ensure analysis_job is valid by initialising status attributes
# must occur before authorization as CanCanCan ability checks for validity
@analysis_job.update_status_attributes

do_authorize_instance

if @analysis_job.save

# now create and enqueue job items (which updates status attributes again)
# needs to be called after save as it makes use of the analysis_job id.
@analysis_job.enqueue_items(current_user)
@analysis_job.begin_work(current_user)

respond_create_success
else
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/media_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def create_media(media_category, files_info, generation_request)

time_start_waiting = Time.now

Rails.logger.debug " media_controller#create_media: Submitted processing job for #{expected_files}"
Rails.logger.debug "media_controller#create_media: Submitted processing job for #{expected_files}"

# poll disk for audio
# will throw with a timeout if file does not appear on disk
Expand Down
233 changes: 142 additions & 91 deletions app/models/analysis_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ class AnalysisJob < ActiveRecord::Base
# overall_count is the number of audio_recordings/resque jobs. These should be equal.
validates :overall_count, presence: true, numericality: {only_integer: true, greater_than_or_equal_to: 0}
validates :overall_duration_seconds, presence: true, numericality: {only_integer: false, greater_than_or_equal_to: 0}
validates :started_at, :overall_status_modified_at, :overall_progress_modified_at,
validates :overall_status_modified_at, :overall_progress_modified_at,
presence: true, timeliness: {on_or_before: lambda { Time.zone.now }, type: :datetime}
validates :started_at, allow_blank: true, allow_nil: true, timeliness: {on_or_before: lambda { Time.zone.now }, type: :datetime}
validates :overall_data_length_bytes, presence: true, numericality: {only_integer: true, greater_than_or_equal_to: 0}

# job status values - completed just means all processing has finished, whether it succeeds or not.
AVAILABLE_JOB_STATUS_SYMBOLS = [:new, :preparing, :processing, :suspended, :completed]
Expand All @@ -49,11 +51,26 @@ class AnalysisJob < ActiveRecord::Base

enumerize :overall_status, in: AVAILABLE_JOB_STATUS, predicates: true

after_initialize :initialise_job_tracking, if: Proc.new { |analysis_job| analysis_job.new_record? }

def self.filter_settings

fields = [
:id, :name, :description, :annotation_name,
:custom_settings,
:creator_id, :updater_id, :deleter_id,
:created_at, :updated_at, :deleted_at,
:script_id, :saved_search_id,
:started_at,
:overall_status, :overall_status_modified_at,
:overall_progress, :overall_progress_modified_at,
:overall_count, :overall_duration_seconds, :overall_data_length_bytes
]

{
valid_fields: [:id, :name, :description, :script_id, :saved_search_id, :created_at, :creator_id, :updated_at, :updater_id],
render_fields: [:id, :name, :description, :script_id, :saved_search_id, :created_at, :creator_id, :updated_at, :updater_id],
text_fields: [],
valid_fields: fields,
render_fields: fields,
text_fields: [:name, :description, :annotation_name],
custom_fields: lambda { |analysis_job, user|

# do a query for the attributes that may not be in the projection
Expand Down Expand Up @@ -107,129 +124,163 @@ def self.filter_settings
}
end

# Create payloads from audio recordings extracted from saved search.
# @param [User] user
# @return [Array<Hash>] payloads
def saved_search_items_extract(user)
user = Access::Core.validate_user(user)
# analysis_job lifecycle:
# 1. when a new analysis job is created, the required attributes will be initialised by `initialise_job_tracking`
# the new analysis job can be saved at this point (and is saved if created via create action on controller),
# but it has not been started and no resque jobs have been enqueued
# 2. Start an analysis job by calling `begin_work`. Calling `begin_work` when :overall_status is not 'new' or analysis job has not been saved is an error
# If :overall_status is 'new', the analysis job will immediately transition to 'preparing' status, then create and enqueue resque jobs.
# 3. Once all resque jobs have been enqueued, the analysis job will transition to 'processing' status.
# 4. resque jobs will update the analysis job as resque jobs change states using `update_job_progress`
# TODO more...


# Update status and modified timestamp if changes are made. Does not persist changes.
# A blank stored status is treated as 'new'; a blank argument keeps the current status.
# The modified timestamp is refreshed only when the status actually changes, or when
# it has never been set.
# @param [Symbol, String] status
# @return [void]
def update_job_status(status)
  existing = self.overall_status.blank? ? 'new' : self.overall_status.to_s
  requested = status.blank? ? existing : status.to_s

  # TODO add logging and timing
  # TODO This may need to be an async operation itself depending on how fast it runs
  self.overall_status = requested
  if existing != requested || self.overall_status_modified_at.blank?
    self.overall_status_modified_at = Time.zone.now
  end
end

# execute associated saved_search to get audio_recordings
audio_recordings_query = self.saved_search.audio_recordings_extract(user)
# Update progress and modified timestamp if changes are made. Does not persist changes.
# A nil/blank count argument keeps the currently stored value; the total is always
# recalculated from the four component counts.
# @param [Integer] queued_count
# @param [Integer] working_count
# @param [Integer] successful_count
# @param [Integer] failed_count
# @return [void]
def update_job_progress(queued_count = nil, working_count = nil, successful_count = nil, failed_count = nil)
current_progress = self.overall_progress

# NOTE(review): existing progress is read with STRING keys here, but the new hash
# below is built with SYMBOL keys — presumably the stored value round-trips through
# serialization; confirm the comparison at the bottom behaves as intended.
current_queued_count = current_progress.blank? ? 0 : current_progress['queued'].to_i
current_working_count = current_progress.blank? ? 0 : current_progress['working'].to_i
current_successful_count = current_progress.blank? ? 0 : current_progress['successful'].to_i
current_failed_count = current_progress.blank? ? 0 : current_progress['failed'].to_i

# blank? arguments (nil or empty) fall back to the stored counts
new_queued_count = queued_count.blank? ? current_queued_count : queued_count.to_i
new_working_count = working_count.blank? ? current_working_count : working_count.to_i
new_successful_count = successful_count.blank? ? current_successful_count : successful_count.to_i
new_failed_count = failed_count.blank? ? current_failed_count : failed_count.to_i

# total is derived, never supplied by the caller
calculated_total = new_queued_count + new_working_count + new_successful_count + new_failed_count

new_progress = {
queued: new_queued_count,
working: new_working_count,
successful: new_successful_count,
failed: new_failed_count,
total: calculated_total,
}

self.overall_progress = new_progress
# refresh timestamp only on an actual change, or when never set before
self.overall_progress_modified_at = Time.zone.now if current_progress != new_progress || self.overall_progress_modified_at.blank?
end

def create_payload(audio_recording)

# common payload info
command_format = self.script.executable_command.to_s
file_executable = ''
copy_paths = nil
config_string = self.custom_settings
job_id = self.id
copy_paths = []
config_string = self.custom_settings.to_s
job_id = self.id.to_i

# create one payload per each audio_recording
payloads = []
audio_recordings_query.find_each(batch_size: 1000) do |audio_recording|
payloads.push(
{
command_format: command_format,
file_executable: file_executable,
copy_paths: copy_paths,
config_string: config_string,
job_id: job_id,

uuid: audio_recording.uuid,
id: audio_recording.id,
datetime_with_offset: audio_recording.recorded_date.iso8601(3),
original_format: audio_recording.original_format_calculated
})
end
{
command_format: command_format,

# TODO: where do file_executable and copy_paths come from?
file_executable: file_executable,
copy_paths: copy_paths,

payloads
config: config_string,
job_id: job_id,

uuid: audio_recording.uuid,
id: audio_recording.id,
datetime_with_offset: audio_recording.recorded_date.iso8601(3),
original_format: audio_recording.original_format_calculated
}
end

# Enqueue payloads representing audio recordings from saved search to async processing queue.
# Create payloads from audio recordings extracted from saved search.
# @param [User] user
# @return [Array<Hash>] payloads
def enqueue_items(user)
def begin_work(user)
user = Access::Core.validate_user(user)

# ensure status is 'new' and analysis job has been saved
if self.overall_status != 'new' || !self.persisted?
msg_status = self.overall_status == 'new' ? '' : " Status must be 'new', but was '#{self.overall_status}'."
msg_saved = self.persisted? ? '' : ' Analysis job has not been saved.'
fail CustomErrors::AnalysisJobStartError.new("Analysis job cannot start.#{msg_status}#{msg_saved}")
end

# TODO add logging and timing
# TODO This may need to be an async operation itself depending on how fast it runs

payloads = saved_search_items_extract(user)
# counters
count = 0
duration_seconds_sum = 0
data_length_bytes_sum = 0
queued_count = 0
failed_count = 0

# update status and started timestamp
update_job_status('preparing')
self.started_at = Time.zone.now if self.started_at.blank?
self.save!

# query associated saved_search to get audio_recordings
query = self.saved_search.audio_recordings_extract(user)

# create one payload per each audio_recording
results = []
query.find_each(batch_size: 1000) do |audio_recording|
payload = create_payload(audio_recording)

payloads.each do |payload|
# update counters
count = count + 1
duration_seconds_sum = duration_seconds_sum + audio_recording.duration_seconds
data_length_bytes_sum = data_length_bytes_sum + audio_recording.data_length_bytes

# Enqueue payloads representing audio recordings from saved search to asynchronous processing queue.
result = nil
error = nil

begin
result = BawWorkers::Analysis::Action.action_enqueue(payload)
queued_count = queued_count + 1
rescue => e
error = e
Rails.logger(self.to_s) { e }
Rails.logger.error "An error occurred when enqueuing an analysis job item: #{e}"
failed_count = failed_count + 1
end

results.push({payload: payload, result: result, error: error})
end

# update status attributes after creating and enqueuing job items
#update_status_attributes
# overall_count
# overall_duration_seconds
#started_at
# update counters, status, progress
update_job_status('processing')
# don't update progress - resque jobs may already be processing or completed
# the resque jobs can do the updating
self.overall_count = count
self.overall_duration_seconds = duration_seconds_sum
self.overall_data_length_bytes = data_length_bytes_sum
self.save!

results
end

# Gather current status for this analysis job, and update attributes
# Will set all required values. Uses 0 if required values not given.
def update_status_attributes(
status = nil,
queued_count = nil, working_count = nil,
successful_count = nil, failed_count = nil)
private

# set required attributes to valid values if they are not set
def initialise_job_tracking
update_job_status('new')
update_job_progress
self.overall_count = 0 if self.overall_count.blank?
self.overall_duration_seconds = 0 if self.overall_duration_seconds.blank?
self.started_at = Time.zone.now if self.started_at.blank?

# status
current_status = self.overall_status.blank? ? 'new' : self.overall_status.to_s
new_status = status.blank? ? current_status : status.to_s

self.overall_status = new_status
self.overall_status_modified_at = Time.zone.now if current_status != new_status || self.overall_status_modified_at.blank?

# progress
current_progress = self.overall_progress

current_queued_count = current_progress.blank? ? 0 : current_progress['queued'].to_i
current_working_count = current_progress.blank? ? 0 : current_progress['working'].to_i
current_successful_count = current_progress.blank? ? 0 : current_progress['successful'].to_i
current_failed_count = current_progress.blank? ? 0 : current_progress['failed'].to_i

new_queued_count = queued_count.blank? ? current_queued_count : queued_count.to_i
new_working_count = working_count.blank? ? current_working_count : working_count.to_i
new_successful_count = successful_count.blank? ? current_successful_count : successful_count.to_i
new_failed_count = failed_count.blank? ? current_failed_count : failed_count.to_i

calculated_total = new_queued_count + new_working_count + new_successful_count + new_failed_count

new_progress = {
queued: new_queued_count,
working: new_working_count,
successful: new_successful_count,
failed: new_failed_count,
total: calculated_total,
}

self.overall_progress = new_progress
self.overall_progress_modified_at = Time.zone.now if current_progress != new_progress || self.overall_progress_modified_at.blank?

# count
self.overall_count = calculated_total < 0 ? 0 : calculated_total

self.save
self.overall_data_length_bytes = 0 if self.overall_data_length_bytes.blank?
end

end
3 changes: 2 additions & 1 deletion app/views/admin/analysis_jobs/show.html.haml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
:started_at,
:overall_status, :overall_status_modified_at,
:overall_progress, :overall_progress_modified_at,
:overall_count, :overall_duration_seconds,
:overall_count,
:overall_duration_seconds, :overall_data_length_bytes,
:script_id, :saved_search_id,
:custom_settings].each do |item|
- human_ed = item.to_s.humanize
Expand Down
5 changes: 4 additions & 1 deletion app/views/admin/audio_recordings/show.html.haml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
- elsif sym_ed == :data_length_bytes
= number_to_human_size(value)
- elsif sym_ed == :site_id
= @audio_recording.site.name
- if @audio_recording.site.projects.count == 1
= link_to @audio_recording.site.name, project_site_path(@audio_recording.site.projects.first, @audio_recording.site)
- else
= @audio_recording.site.name
%small
="(#{value})"
- elsif sym_ed == :notes
Expand Down
2 changes: 2 additions & 0 deletions config/settings/default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ defaults: &defaults
namespace: resque
log_level: 'Logger::INFO'
actions:
analysis:
queue: analysis_default
harvest:
# name of queue to append jobs onto
queue: harvest_default
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Adds a counter to analysis_jobs recording the total data length in bytes
# covered by an analysis job (see issue #256 referenced in the commit message).
class AddOverallDataLengthBytesToAnalysisJobsTable < ActiveRecord::Migration
def change
# limit: 8 => 64-bit integer (bigint); NOT NULL with default 0 backfills existing rows
add_column :analysis_jobs, :overall_data_length_bytes, :integer, limit: 8, null: false, default: 0
end
end
5 changes: 4 additions & 1 deletion db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ CREATE TABLE analysis_jobs (
overall_progress text NOT NULL,
overall_progress_modified_at timestamp without time zone NOT NULL,
overall_count integer NOT NULL,
overall_duration_seconds numeric(14,4) NOT NULL
overall_duration_seconds numeric(14,4) NOT NULL,
overall_data_length_bytes bigint DEFAULT 0 NOT NULL
);


Expand Down Expand Up @@ -1772,3 +1773,5 @@ INSERT INTO schema_migrations (version) VALUES ('20150905234917');

INSERT INTO schema_migrations (version) VALUES ('20160226103516');

INSERT INTO schema_migrations (version) VALUES ('20160226130353');

1 change: 1 addition & 0 deletions lib/modules/custom_errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module CustomErrors
public
class RoutingArgumentError < ArgumentError; end
class ItemNotFoundError < StandardError; end
class AnalysisJobStartError < StandardError; end
class RequestedMediaTypeError < StandardError
attr_reader :available_formats_info
def initialize(message = nil, available_formats_info = nil)
Expand Down
1 change: 1 addition & 0 deletions spec/acceptance/media_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'rails_helper'
require 'rspec_api_documentation/dsl'
require 'helpers/acceptance_spec_helper'
require 'helpers/resque_helper'

# https://github.com/zipmark/rspec_api_documentation
resource 'Media' do
Expand Down
1 change: 1 addition & 0 deletions spec/factories/analysis_job_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
overall_progress { { queued: 1, working: 0, success: 0, failed: 0, total: 1}.to_json }
overall_count 1
overall_duration_seconds 60
overall_data_length_bytes 1024

started_at { Time.zone.now }
overall_status_modified_at { Time.zone.now }
Expand Down
Loading

0 comments on commit 0b9c26d

Please sign in to comment.