diff --git a/app/controllers/analysis_jobs_controller.rb b/app/controllers/analysis_jobs_controller.rb
index 62d30f89..c4898c0b 100644
--- a/app/controllers/analysis_jobs_controller.rb
+++ b/app/controllers/analysis_jobs_controller.rb
@@ -35,18 +35,13 @@ def new
   def create
     do_new_resource
     do_set_attributes(analysis_job_create_params)
-
-    # ensure analysis_job is valid by initialising status attributes
-    # must occur before authorization as CanCanCan ability checks for validity
-    @analysis_job.update_status_attributes
-
     do_authorize_instance
 
     if @analysis_job.save
       # now create and enqueue job items (which updates status attributes again)
       # needs to be called after save as it makes use of the analysis_job id.
-      @analysis_job.enqueue_items(current_user)
+      @analysis_job.begin_work(current_user)
 
       respond_create_success
     else
diff --git a/app/controllers/media_controller.rb b/app/controllers/media_controller.rb
index b2459da4..f62946ca 100644
--- a/app/controllers/media_controller.rb
+++ b/app/controllers/media_controller.rb
@@ -198,7 +198,7 @@ def create_media(media_category, files_info, generation_request)
     time_start_waiting = Time.now
 
-    Rails.logger.debug " media_controller#create_media: Submitted processing job for #{expected_files}"
+    Rails.logger.debug "media_controller#create_media: Submitted processing job for #{expected_files}"
 
     # poll disk for audio
     # will throw with a timeout if file does not appear on disk
diff --git a/app/models/analysis_job.rb b/app/models/analysis_job.rb
index 8b82643a..2a26a47e 100644
--- a/app/models/analysis_job.rb
+++ b/app/models/analysis_job.rb
@@ -32,8 +32,10 @@ class AnalysisJob < ActiveRecord::Base
   # overall_count is the number of audio_recordings/resque jobs. These should be equal.
   validates :overall_count, presence: true, numericality: {only_integer: true, greater_than_or_equal_to: 0}
   validates :overall_duration_seconds, presence: true, numericality: {only_integer: false, greater_than_or_equal_to: 0}
-  validates :started_at, :overall_status_modified_at, :overall_progress_modified_at,
+  validates :overall_status_modified_at, :overall_progress_modified_at,
             presence: true, timeliness: {on_or_before: lambda { Time.zone.now }, type: :datetime}
+  validates :started_at, allow_blank: true, allow_nil: true, timeliness: {on_or_before: lambda { Time.zone.now }, type: :datetime}
+  validates :overall_data_length_bytes, presence: true, numericality: {only_integer: true, greater_than_or_equal_to: 0}
 
   # job status values - completed just means all processing has finished, whether it succeeds or not.
   AVAILABLE_JOB_STATUS_SYMBOLS = [:new, :preparing, :processing, :suspended, :completed]
@@ -49,11 +51,26 @@ class AnalysisJob < ActiveRecord::Base
 
   enumerize :overall_status, in: AVAILABLE_JOB_STATUS, predicates: true
 
+  after_initialize :initialise_job_tracking, if: Proc.new { |analysis_job| analysis_job.new_record? }
+
   def self.filter_settings
+
+    fields = [
+        :id, :name, :description, :annotation_name,
+        :custom_settings,
+        :creator_id, :updater_id, :deleter_id,
+        :created_at, :updated_at, :deleted_at,
+        :script_id, :saved_search_id,
+        :started_at,
+        :overall_status, :overall_status_modified_at,
+        :overall_progress, :overall_progress_modified_at,
+        :overall_count, :overall_duration_seconds, :overall_data_length_bytes
+    ]
+
     {
-        valid_fields: [:id, :name, :description, :script_id, :saved_search_id, :created_at, :creator_id, :updated_at, :updater_id],
-        render_fields: [:id, :name, :description, :script_id, :saved_search_id, :created_at, :creator_id, :updated_at, :updater_id],
-        text_fields: [],
+        valid_fields: fields,
+        render_fields: fields,
+        text_fields: [:name, :description, :annotation_name],
         custom_fields: lambda { |analysis_job, user|
           # do a query for the attributes that may not be in the projection
@@ -107,129 +124,163 @@ def self.filter_settings
     }
   end
 
-  # Create payloads from audio recordings extracted from saved search.
-  # @param [User] user
-  # @return [Array] payloads
-  def saved_search_items_extract(user)
-    user = Access::Core.validate_user(user)
+  # analysis_job lifecycle:
+  # 1. When a new analysis job is created, the required attributes are initialised by `initialise_job_tracking`.
+  #    The new analysis job can be saved at this point (and is saved if created via the create action on the controller),
+  #    but it has not been started and no resque jobs have been enqueued.
+  # 2. Start an analysis job by calling `begin_work`. Calling `begin_work` when :overall_status is not 'new',
+  #    or before the analysis job has been saved, is an error.
+  #    If :overall_status is 'new', the analysis job will immediately transition to 'preparing' status, then create and enqueue resque jobs.
+  # 3. Once all resque jobs have been enqueued, the analysis job will transition to 'processing' status.
+  # 4. resque jobs will update the analysis job as resque jobs change states using `update_job_progress`
+  # TODO more...
+
+
+  # Update status and modified timestamp if changes are made. Does not persist changes.
+  # @param [Symbol, String] status
+  # @return [void]
+  def update_job_status(status)
+    current_status = self.overall_status.blank? ? 'new' : self.overall_status.to_s
+    new_status = status.blank? ? current_status : status.to_s
 
-    # TODO add logging and timing
-    # TODO This may need to be an async operation itself depending on how fast it runs
+    self.overall_status = new_status
+    self.overall_status_modified_at = Time.zone.now if current_status != new_status || self.overall_status_modified_at.blank?
+  end
 
-    # execute associated saved_search to get audio_recordings
-    audio_recordings_query = self.saved_search.audio_recordings_extract(user)
+  # Update progress and modified timestamp if changes are made. Does not persist changes.
+  # @param [Integer] queued_count
+  # @param [Integer] working_count
+  # @param [Integer] successful_count
+  # @param [Integer] failed_count
+  # @return [void]
+  def update_job_progress(queued_count = nil, working_count = nil, successful_count = nil, failed_count = nil)
+    current_progress = self.overall_progress
+    current_queued_count = current_progress.blank? ? 0 : current_progress['queued'].to_i
+    current_working_count = current_progress.blank? ? 0 : current_progress['working'].to_i
+    current_successful_count = current_progress.blank? ? 0 : current_progress['successful'].to_i
+    current_failed_count = current_progress.blank? ? 0 : current_progress['failed'].to_i
+
+    new_queued_count = queued_count.blank? ? current_queued_count : queued_count.to_i
+    new_working_count = working_count.blank? ? current_working_count : working_count.to_i
+    new_successful_count = successful_count.blank? ? current_successful_count : successful_count.to_i
+    new_failed_count = failed_count.blank? ? current_failed_count : failed_count.to_i
+
+    calculated_total = new_queued_count + new_working_count + new_successful_count + new_failed_count
+
+    new_progress = {
+        queued: new_queued_count,
+        working: new_working_count,
+        successful: new_successful_count,
+        failed: new_failed_count,
+        total: calculated_total,
+    }
+
+    self.overall_progress = new_progress
+    self.overall_progress_modified_at = Time.zone.now if current_progress != new_progress || self.overall_progress_modified_at.blank?
+  end
+
+  def create_payload(audio_recording)
+
+    # common payload info
     command_format = self.script.executable_command.to_s
     file_executable = ''
-    copy_paths = nil
-    config_string = self.custom_settings
-    job_id = self.id
+    copy_paths = []
+    config_string = self.custom_settings.to_s
+    job_id = self.id.to_i
 
-    # create one payload per each audio_recording
-    payloads = []
-    audio_recordings_query.find_each(batch_size: 1000) do |audio_recording|
-      payloads.push(
-          {
-              command_format: command_format,
-              file_executable: file_executable,
-              copy_paths: copy_paths,
-              config_string: config_string,
-              job_id: job_id,
-
-              uuid: audio_recording.uuid,
-              id: audio_recording.id,
-              datetime_with_offset: audio_recording.recorded_date.iso8601(3),
-              original_format: audio_recording.original_format_calculated
-          })
-    end
+    {
+        command_format: command_format,
+
+        # TODO: where do file_executable and copy_paths come from?
+        file_executable: file_executable,
+        copy_paths: copy_paths,
 
-    payloads
+        config: config_string,
+        job_id: job_id,
+
+        uuid: audio_recording.uuid,
+        id: audio_recording.id,
+        datetime_with_offset: audio_recording.recorded_date.iso8601(3),
+        original_format: audio_recording.original_format_calculated
+    }
   end
 
-  # Enqueue payloads representing audio recordings from saved search to asnc processing queue.
+  # Start the analysis job: create and enqueue one payload per audio recording extracted from the saved search.
   # @param [User] user
   # @return [Array] payloads
-  def enqueue_items(user)
+  def begin_work(user)
+    user = Access::Core.validate_user(user)
+
+    # ensure status is 'new' and the analysis job has been saved
+    if self.overall_status != 'new' || !self.persisted?
+      msg_status = self.overall_status == 'new' ? '' : " Status must be 'new', but was '#{self.overall_status}'."
+      msg_saved = self.persisted? ? '' : ' Analysis job has not been saved.'
+      fail CustomErrors::AnalysisJobStartError.new("Analysis job cannot start.#{msg_status}#{msg_saved}")
+    end
 
     # TODO add logging and timing
     # TODO This may need to be an async operation itself depending on how fast it runs
 
-    payloads = saved_search_items_extract(user)
+    # counters
+    count = 0
+    duration_seconds_sum = 0
+    data_length_bytes_sum = 0
+    queued_count = 0
+    failed_count = 0
+    # update status and started timestamp
+    update_job_status('preparing')
+    self.started_at = Time.zone.now if self.started_at.blank?
+    self.save!
+
+    # query associated saved_search to get audio_recordings
+    query = self.saved_search.audio_recordings_extract(user)
+
+    # create one payload for each audio_recording
     results = []
+    query.find_each(batch_size: 1000) do |audio_recording|
+      payload = create_payload(audio_recording)
 
-    payloads.each do |payload|
+      # update counters
+      count = count + 1
+      duration_seconds_sum = duration_seconds_sum + audio_recording.duration_seconds
+      data_length_bytes_sum = data_length_bytes_sum + audio_recording.data_length_bytes
 
+      # Enqueue payloads representing audio recordings from saved search to asynchronous processing queue.
       result = nil
       error = nil
 
       begin
         result = BawWorkers::Analysis::Action.action_enqueue(payload)
+        queued_count = queued_count + 1
       rescue => e
         error = e
-        Rails.logger(self.to_s) { e }
+        Rails.logger.error "An error occurred when enqueuing an analysis job item: #{e}"
+        failed_count = failed_count + 1
       end
 
       results.push({payload: payload, result: result, error: error})
     end
 
-    # update status attributes after creating and enqueuing job items
-    #update_status_attributes
-    # overall_count
-    # overall_duration_seconds
-    #started_at
+    # update counters, status, progress
+    update_job_status('processing')
+    # don't update progress - resque jobs may already be processing or completed
+    # the resque jobs can do the updating
+    self.overall_count = count
+    self.overall_duration_seconds = duration_seconds_sum
+    self.overall_data_length_bytes = data_length_bytes_sum
+    self.save!
 
     results
   end
 
-  # Gather current status for this analysis job, and update attributes
-  # Will set all required values. Uses 0 if required values not given.
-  def update_status_attributes(
-      status = nil,
-      queued_count = nil, working_count = nil,
-      successful_count = nil, failed_count = nil)
+  private
 
-    # set required attributes to valid values if they are not set
+  def initialise_job_tracking
+    update_job_status('new')
+    update_job_progress
+
     self.overall_count = 0 if self.overall_count.blank?
     self.overall_duration_seconds = 0 if self.overall_duration_seconds.blank?
-    self.started_at = Time.zone.now if self.started_at.blank?
-
-    # status
-    current_status = self.overall_status.blank? ? 'new' : self.overall_status.to_s
-    new_status = status.blank? ? current_status : status.to_s
-
-    self.overall_status = new_status
-    self.overall_status_modified_at = Time.zone.now if current_status != new_status || self.overall_status_modified_at.blank?
-
-    # progress
-    current_progress = self.overall_progress
-
-    current_queued_count = current_progress.blank? ? 0 : current_progress['queued'].to_i
-    current_working_count = current_progress.blank? ? 0 : current_progress['working'].to_i
-    current_successful_count = current_progress.blank? ? 0 : current_progress['successful'].to_i
-    current_failed_count = current_progress.blank? ? 0 : current_progress['failed'].to_i
-
-    new_queued_count = queued_count.blank? ? current_queued_count : queued_count.to_i
-    new_working_count = working_count.blank? ? current_working_count : working_count.to_i
-    new_successful_count = successful_count.blank? ? current_successful_count : successful_count.to_i
-    new_failed_count = failed_count.blank? ? current_failed_count : failed_count.to_i
-
-    calculated_total = new_queued_count + new_working_count + new_successful_count + new_failed_count
-
-    new_progress = {
-        queued: new_queued_count,
-        working: new_working_count,
-        successful: new_successful_count,
-        failed: new_failed_count,
-        total: calculated_total,
-    }
-
-    self.overall_progress = new_progress
-    self.overall_progress_modified_at = Time.zone.now if current_progress != new_progress || self.overall_progress_modified_at.blank?
-
-    # count
-    self.overall_count = calculated_total < 0 ? 0 : calculated_total
-
-    self.save
+    self.overall_data_length_bytes = 0 if self.overall_data_length_bytes.blank?
   end
-
 end
diff --git a/app/views/admin/analysis_jobs/show.html.haml b/app/views/admin/analysis_jobs/show.html.haml
index e117a12b..2f2547c9 100644
--- a/app/views/admin/analysis_jobs/show.html.haml
+++ b/app/views/admin/analysis_jobs/show.html.haml
@@ -17,7 +17,8 @@
        :started_at,
        :overall_status, :overall_status_modified_at,
        :overall_progress, :overall_progress_modified_at,
-       :overall_count, :overall_duration_seconds,
+       :overall_count,
+       :overall_duration_seconds, :overall_data_length_bytes,
        :script_id, :saved_search_id, :custom_settings].each do |item|
    - human_ed = item.to_s.humanize
diff --git a/app/views/admin/audio_recordings/show.html.haml b/app/views/admin/audio_recordings/show.html.haml
index c728b530..756b9e56 100644
--- a/app/views/admin/audio_recordings/show.html.haml
+++ b/app/views/admin/audio_recordings/show.html.haml
@@ -40,7 +40,10 @@
     - elsif sym_ed == :data_length_bytes
       = number_to_human_size(value)
     - elsif sym_ed == :site_id
-      = @audio_recording.site.name
+      - if @audio_recording.site.projects.count == 1
+        = link_to @audio_recording.site.name, project_site_path(@audio_recording.site.projects.first, @audio_recording.site)
+      - else
+        = @audio_recording.site.name
       %small
         ="(#{value})"
     - elsif sym_ed == :notes
diff --git a/config/settings/default.yml b/config/settings/default.yml
index f5b71af1..14df2452 100644
--- a/config/settings/default.yml
+++ b/config/settings/default.yml
@@ -19,6 +19,8 @@ defaults: &defaults
     namespace: resque
     log_level: 'Logger::INFO'
   actions:
+    analysis:
+      queue: analysis_default
    harvest:
      # name of queue to append jobs onto
      queue: harvest_default
diff --git a/db/migrate/20160226130353_add_overall_data_length_bytes_to_analysis_jobs_table.rb b/db/migrate/20160226130353_add_overall_data_length_bytes_to_analysis_jobs_table.rb
new file mode 100644
index 00000000..8ad99ab4
--- /dev/null
+++ b/db/migrate/20160226130353_add_overall_data_length_bytes_to_analysis_jobs_table.rb
@@ -0,0 +1,5 @@
+class AddOverallDataLengthBytesToAnalysisJobsTable < ActiveRecord::Migration
+  def change
+    add_column :analysis_jobs, :overall_data_length_bytes, :integer, limit: 8, null: false, default: 0
+  end
+end
diff --git a/db/structure.sql b/db/structure.sql
index 4b10779c..0703e3ab 100644
--- a/db/structure.sql
+++ b/db/structure.sql
@@ -53,7 +53,8 @@ CREATE TABLE analysis_jobs (
     overall_progress text NOT NULL,
     overall_progress_modified_at timestamp without time zone NOT NULL,
     overall_count integer NOT NULL,
-    overall_duration_seconds numeric(14,4) NOT NULL
+    overall_duration_seconds numeric(14,4) NOT NULL,
+    overall_data_length_bytes bigint DEFAULT 0 NOT NULL
 );
 
@@ -1772,3 +1773,5 @@
 INSERT INTO schema_migrations (version) VALUES ('20150905234917');
 
 INSERT INTO schema_migrations (version) VALUES ('20160226103516');
 
+INSERT INTO schema_migrations (version) VALUES ('20160226130353');
+
diff --git a/lib/modules/custom_errors.rb b/lib/modules/custom_errors.rb
index 68949b29..3c8e146d 100644
--- a/lib/modules/custom_errors.rb
+++ b/lib/modules/custom_errors.rb
@@ -2,6 +2,7 @@ module CustomErrors
   public
   class RoutingArgumentError < ArgumentError; end
   class ItemNotFoundError < StandardError; end
+  class AnalysisJobStartError < StandardError; end
   class RequestedMediaTypeError < StandardError
     attr_reader :available_formats_info
     def initialize(message = nil, available_formats_info = nil)
diff --git a/spec/acceptance/media_spec.rb b/spec/acceptance/media_spec.rb
index 56515d98..62f59407 100644
--- a/spec/acceptance/media_spec.rb
+++ b/spec/acceptance/media_spec.rb
@@ -1,6 +1,7 @@
 require 'rails_helper'
 require 'rspec_api_documentation/dsl'
 require 'helpers/acceptance_spec_helper'
+require 'helpers/resque_helper'
 
 # https://github.com/zipmark/rspec_api_documentation
 resource 'Media' do
diff --git a/spec/factories/analysis_job_factory.rb b/spec/factories/analysis_job_factory.rb
index 1fb881e5..2d1ab10a 100644
--- a/spec/factories/analysis_job_factory.rb
+++ b/spec/factories/analysis_job_factory.rb
@@ -11,6 +11,7 @@
     overall_progress { { queued: 1, working: 0, success: 0, failed: 0, total: 1}.to_json }
     overall_count 1
     overall_duration_seconds 60
+    overall_data_length_bytes 1024
     started_at { Time.zone.now }
     overall_status_modified_at { Time.zone.now }
diff --git a/spec/helpers/acceptance_spec_helper.rb b/spec/helpers/acceptance_spec_helper.rb
index 032a25be..49e69670 100644
--- a/spec/helpers/acceptance_spec_helper.rb
+++ b/spec/helpers/acceptance_spec_helper.rb
@@ -530,35 +530,6 @@ def create_media_options(audio_recording)
   options
 end
 
-def emulate_resque_worker_with_job(job_class, job_args, opts={})
-  # see http://stackoverflow.com/questions/5141378/how-to-bridge-the-testing-using-resque-with-rspec-examples
-  queue = opts[:queue] || 'test_queue'
-
-  Resque::Job.create(queue, job_class, *job_args)
-
-  emulate_resque_worker(queue, opts[:verbose], opts[:fork])
-end
-
-def emulate_resque_worker(queue, verbose, fork)
-  queue = queue || 'test_queue'
-
-  worker = Resque::Worker.new(queue)
-  worker.very_verbose = true if verbose
-
-  if fork
-    # do a single job then shutdown
-    def worker.done_working
-      super
-      shutdown
-    end
-
-    worker.work(0.5)
-  else
-    job = worker.reserve
-    worker.perform(job)
-  end
-end
-
 def process_custom(method, path, params = {}, headers ={})
   do_request(method, path, params, headers)
   document_example(method.to_s.upcase, path)
diff --git a/spec/helpers/resque_helper.rb b/spec/helpers/resque_helper.rb
new file mode 100644
index 00000000..895ef909
--- /dev/null
+++ b/spec/helpers/resque_helper.rb
@@ -0,0 +1,28 @@
+def emulate_resque_worker_with_job(job_class, job_args, opts={})
+  # see http://stackoverflow.com/questions/5141378/how-to-bridge-the-testing-using-resque-with-rspec-examples
+  queue = opts[:queue] || 'test_queue'
+
+  Resque::Job.create(queue, job_class, *job_args)
+
+  emulate_resque_worker(queue, opts[:verbose], opts[:fork])
+end
+
+def emulate_resque_worker(queue, verbose, fork)
+  queue = queue || 'test_queue'
+
+  worker = Resque::Worker.new(queue)
+  worker.very_verbose = true if verbose
+
+  if fork
+    # do a single job then shutdown
+    def worker.done_working
+      super
+      shutdown
+    end
+
+    worker.work(0.5)
+  else
+    job = worker.reserve
+    worker.perform(job)
+  end
+end
\ No newline at end of file
diff --git a/spec/models/analysis_job_spec.rb b/spec/models/analysis_job_spec.rb
index df550388..35b690ed 100644
--- a/spec/models/analysis_job_spec.rb
+++ b/spec/models/analysis_job_spec.rb
@@ -1,4 +1,5 @@
 require 'rails_helper'
+require 'helpers/resque_helper'
 
 describe AnalysisJob, type: :model do
   it 'has a valid factory' do
@@ -38,7 +39,7 @@
     expect(subject.errors[:script].size).to eq(1)
     expect(subject.errors[:script].to_s).to match(/must exist as an object or foreign key/)
   end
-
+
   it { is_expected.to validate_presence_of(:custom_settings) }
   it 'is invalid without a custom_settings' do
     expect(build(:analysis_job, custom_settings: nil)).not_to be_valid
@@ -70,20 +71,46 @@
     ss = create(:saved_search, creator: user, stored_query: {id: {in: [audio_recording_2.id]}})
     s = create(:script, creator: user, verified: true)
 
-    aj = build(:analysis_job, creator: user, script: s, saved_search: ss, )
-
-    result = aj.saved_search_items_extract(user)
-
-    # TODO compare to entire expected payload hash
+    aj = create(:analysis_job, creator: user, script: s, saved_search: ss)
+    payload = aj.create_payload(audio_recording_2)
+    result = aj.begin_work(user)
+
+    # ensure result is as expected
     expect(result.size).to eq(1)
     expect(result[0].is_a?(Hash)).to be_truthy
-    expect(result[0][:command_format]).to eq(aj.script.executable_command)
-
+    expect(result[0][:payload][:command_format]).to eq(aj.script.executable_command)
+    expect(result[0][:error]).to be_blank
+    expect(result[0][:payload]).to eq(payload)
+    expect(result[0][:result].is_a?(String)).to be_truthy
+    expect(result[0][:result].size).to eq(32)
   end
 
   it 'enqueues and processes payloads' do
+    project_1 = create(:project)
+    user = project_1.creator
+    site_1 = create(:site, projects: [project_1], creator: user)
+
+    create(:audio_recording, site: site_1, creator: user, uploader: user)
+
+    project_2 = create(:project, creator: user)
+    site_2 = create(:site, projects: [project_2], creator: user)
+    audio_recording_2 = create(:audio_recording, site: site_2, creator: user, uploader: user)
+
+    ss = create(:saved_search, creator: user, stored_query: {id: {in: [audio_recording_2.id]}})
+    s = create(:script, creator: user, verified: true)
+
+    aj = create(:analysis_job, creator: user, script: s, saved_search: ss)
+
+    payload = aj.create_payload(audio_recording_2)
+    result = aj.begin_work(user)
+
+    queue_name = Settings.actions.analysis.queue
+    expect(Resque.size(queue_name)).to eq(1)
+
+    emulate_resque_worker(queue_name, false, true)
+    expect(Resque.size(queue_name)).to eq(0)
  end
end
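
For reviewers, here is a minimal sketch of how the reworked lifecycle is intended to be driven, using only the API this patch introduces (`initialise_job_tracking` via `after_initialize`, `begin_work`, `update_job_progress`, and `CustomErrors::AnalysisJobStartError`). The surrounding names (`current_user`, `script`, `saved_search`, and the attribute values) are placeholders for illustration, not part of the diff, and worker-side wiring is assumed rather than shown.

```ruby
# 1. Creating a record runs the after_initialize callback, which calls
#    initialise_job_tracking: status 'new', zeroed progress and counters.
analysis_job = AnalysisJob.create!(
    name: 'example job',            # hypothetical attribute values
    custom_settings: '...',
    script: script,
    saved_search: saved_search,
    creator: current_user
)

# 2. begin_work transitions 'new' -> 'preparing', enqueues one resque payload
#    per audio recording in the saved search, then transitions to 'processing'.
#    It returns one {payload:, result:, error:} hash per audio recording.
results = analysis_job.begin_work(current_user)
failed  = results.select { |r| r[:error] }

# Calling begin_work again (status is no longer 'new') raises the new error class:
# analysis_job.begin_work(current_user) # => CustomErrors::AnalysisJobStartError

# 3. Workers report progress back. update_job_progress only bumps the modified
#    timestamp when the counts actually change, and does not persist by itself.
analysis_job.update_job_progress(0, 1, 5, 0) # queued, working, successful, failed
analysis_job.save!
```
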