From 0430490c1272eab6a0cfdc6f19b713f3b9b369f7 Mon Sep 17 00:00:00 2001 From: Joey Parrish Date: Thu, 20 Jul 2023 14:27:29 -0700 Subject: [PATCH] feat: Port app/job_manager to cmake and absl Issue #1047 (cmake) Issue #346 (absl) --- packager/CMakeLists.txt | 1 + packager/app/job_manager.cc | 116 ++++++++++++--------- packager/app/job_manager.h | 73 ++++++++----- packager/app/single_thread_job_manager.cc | 13 +-- packager/app/single_thread_job_manager.h | 2 +- packager/media/chunking/CMakeLists.txt | 12 +-- packager/media/codecs/CMakeLists.txt | 12 +-- packager/media/crypto/CMakeLists.txt | 4 +- packager/media/event/CMakeLists.txt | 2 +- packager/media/formats/webm/CMakeLists.txt | 2 +- 10 files changed, 137 insertions(+), 100 deletions(-) diff --git a/packager/CMakeLists.txt b/packager/CMakeLists.txt index c614f702504..76fbf93904b 100644 --- a/packager/CMakeLists.txt +++ b/packager/CMakeLists.txt @@ -59,3 +59,4 @@ add_subdirectory(third_party) add_subdirectory(tools) add_subdirectory(utils) add_subdirectory(version) +add_subdirectory(app) diff --git a/packager/app/job_manager.cc b/packager/app/job_manager.cc index bcc4e06a03b..df9411fa6ad 100644 --- a/packager/app/job_manager.cc +++ b/packager/app/job_manager.cc @@ -6,28 +6,50 @@ #include "packager/app/job_manager.h" -#include "packager/app/libcrypto_threading.h" +#include +#include + #include "packager/media/chunking/sync_point_queue.h" #include "packager/media/origin/origin_handler.h" namespace shaka { namespace media { -Job::Job(const std::string& name, std::shared_ptr work) - : SimpleThread(name), +Job::Job(const std::string& name, + std::shared_ptr work, + OnCompleteFunction on_complete) + : name_(name), work_(std::move(work)), - wait_(base::WaitableEvent::ResetPolicy::MANUAL, - base::WaitableEvent::InitialState::NOT_SIGNALED) { + on_complete_(on_complete), + status_(error::Code::UNKNOWN, "Job uninitialized") { DCHECK(work_); } +const Status& Job::Initialize() { + status_ = work_->Initialize(); + return status_; +} + +void Job::Start() { + thread_.reset(new std::thread(&Job::Run, this)); +} + void Job::Cancel() { work_->Cancel(); } -void Job::Run() { - status_ = work_->Run(); - wait_.Signal(); +const Status& Job::Run() { + if (status_.ok()) // initialized correctly + status_ = work_->Run(); + + on_complete_(this); + + return status_; +} + +void Job::Join() { + if (thread_) + thread_->join(); } JobManager::JobManager(std::unique_ptr sync_points) @@ -35,81 +57,77 @@ JobManager::JobManager(std::unique_ptr sync_points) void JobManager::Add(const std::string& name, std::shared_ptr handler) { - // Stores Job entries for delayed construction of Job objects, to avoid - // setting up SimpleThread until we know all workers can be initialized - // successfully. - job_entries_.push_back({name, std::move(handler)}); + jobs_.emplace_back(new Job( + name, std::move(handler), + std::bind(&JobManager::OnJobComplete, this, std::placeholders::_1))); } Status JobManager::InitializeJobs() { Status status; - for (const JobEntry& job_entry : job_entries_) - status.Update(job_entry.worker->Initialize()); - if (!status.ok()) - return status; - - // Create Job objects after successfully initialized all workers. - for (const JobEntry& job_entry : job_entries_) - jobs_.emplace_back(new Job(job_entry.name, std::move(job_entry.worker))); + for (auto& job : jobs_) + status.Update(job->Initialize()); return status; } Status JobManager::RunJobs() { - // We need to store the jobs and the waits separately in order to use the - // |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we - // need to access the jobs in order to join the thread and check the status. - // The indexes needs to be check in sync or else we won't be able to relate a - // WaitableEvent back to the job. - std::vector active_jobs; - std::vector active_waits; + std::set active_jobs; // Start every job and add it to the active jobs list so that we can wait // on each one. for (auto& job : jobs_) { job->Start(); - active_jobs.push_back(job.get()); - active_waits.push_back(job->wait()); + active_jobs.insert(job.get()); } - // Wait for all jobs to complete or an error occurs. + // Wait for all jobs to complete or any job to error. Status status; - while (status.ok() && active_jobs.size()) { - // Wait for an event to finish and then update our status so that we can - // quit if something has gone wrong. - const size_t done = - base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size()); - Job* job = active_jobs[done]; - - job->Join(); - status.Update(job->status()); - - // Remove the job and the wait from our tracking. - active_jobs.erase(active_jobs.begin() + done); - active_waits.erase(active_waits.begin() + done); + { + absl::MutexLock lock(&mutex_); + while (status.ok() && active_jobs.size()) { + // any_job_complete_ is protected by mutex_. + any_job_complete_.Wait(&mutex_); + + // complete_ is protected by mutex_. + for (const auto& entry : complete_) { + Job* job = entry.first; + bool complete = entry.second; + if (complete) { + job->Join(); + status.Update(job->status()); + active_jobs.erase(job); + } + } + } } // If the main loop has exited and there are still jobs running, // we need to cancel them and clean-up. if (sync_points_) sync_points_->Cancel(); - for (auto& job : active_jobs) { + + for (auto& job : active_jobs) job->Cancel(); - } - for (auto& job : active_jobs) { + for (auto& job : active_jobs) job->Join(); - } return status; } +void JobManager::OnJobComplete(Job* job) { + absl::MutexLock lock(&mutex_); + // These are both protected by mutex_. + complete_[job] = true; + any_job_complete_.Signal(); +} + void JobManager::CancelJobs() { if (sync_points_) sync_points_->Cancel(); - for (auto& job : jobs_) { + + for (auto& job : jobs_) job->Cancel(); - } } } // namespace media diff --git a/packager/app/job_manager.h b/packager/app/job_manager.h index 82a30673843..5860dd1e678 100644 --- a/packager/app/job_manager.h +++ b/packager/app/job_manager.h @@ -7,11 +7,13 @@ #ifndef PACKAGER_APP_JOB_MANAGER_H_ #define PACKAGER_APP_JOB_MANAGER_H_ +#include #include +#include #include -#include "packager/base/threading/simple_thread.h" -#include "packager/status.h" +#include "absl/synchronization/mutex.h" +#include "packager/status/status.h" namespace shaka { namespace media { @@ -21,33 +23,53 @@ class SyncPointQueue; // A job is a single line of work that is expected to run in parallel with // other jobs. -class Job : public base::SimpleThread { +class Job { public: - Job(const std::string& name, std::shared_ptr work); - - // Request that the job stops executing. This is only a request and - // will not block. If you want to wait for the job to complete, use - // |wait|. + typedef std::function OnCompleteFunction; + + Job(const std::string& name, + std::shared_ptr work, + OnCompleteFunction on_complete); + + // Initialize the work object. Call before Start() or Run(). Updates status() + // and returns it for convenience. + const Status& Initialize(); + + // Begin the job in a new thread. This is only a request and will not block. + // If you want to wait for the job to complete, use |complete|. + // Use either Start() for threaded operation or Run() for non-threaded + // operation. DO NOT USE BOTH! + void Start(); + + // Run the job's work synchronously, blocking until complete. Updates status() + // and returns it for convenience. + // Use either Start() for threaded operation or Run() for non-threaded + // operation. DO NOT USE BOTH! + const Status& Run(); + + // Request that the job stops executing. This is only a request and will not + // block. If you want to wait for the job to complete, use |complete|. void Cancel(); - // Get the current status of the job. If the job failed to initialize - // or encountered an error during execution this will return the error. + // Join the thread, if any was started. Blocks until the thread has stopped. + void Join(); + + // Get the current status of the job. If the job failed to initialize or + // encountered an error during execution this will return the error. const Status& status() const { return status_; } - // If you want to wait for this job to complete, this will return the - // WaitableEvent you can wait on. - base::WaitableEvent* wait() { return &wait_; } + // The name given to this job in the constructor. + const std::string& name() const { return name_; } private: Job(const Job&) = delete; Job& operator=(const Job&) = delete; - void Run() override; - + std::string name_; std::shared_ptr work_; + OnCompleteFunction on_complete_; + std::unique_ptr thread_; Status status_; - - base::WaitableEvent wait_; }; // Similar to a thread pool, JobManager manages multiple jobs that are expected @@ -70,7 +92,7 @@ class JobManager { // Initialize all registered jobs. If any job fails to initialize, this will // return the error and it will not be safe to call |RunJobs| as not all jobs // will be properly initialized. - virtual Status InitializeJobs(); + Status InitializeJobs(); // Run all registered jobs. Before calling this make sure that // |InitializedJobs| returned |Status::OK|. This call is blocking and will @@ -87,16 +109,17 @@ class JobManager { JobManager(const JobManager&) = delete; JobManager& operator=(const JobManager&) = delete; - struct JobEntry { - std::string name; - std::shared_ptr worker; - }; - // Stores Job entries for delayed construction of Job object. - std::vector job_entries_; - std::vector> jobs_; + void OnJobComplete(Job* job); + // Stored in JobManager so JobManager can cancel |sync_points| when any job // fails or is cancelled. std::unique_ptr sync_points_; + + std::vector> jobs_; + + absl::Mutex mutex_; + std::map complete_ ABSL_GUARDED_BY(mutex_); + absl::CondVar any_job_complete_ ABSL_GUARDED_BY(mutex_); }; } // namespace media diff --git a/packager/app/single_thread_job_manager.cc b/packager/app/single_thread_job_manager.cc index 0d0c3a8b856..6c7ade0f3b3 100644 --- a/packager/app/single_thread_job_manager.cc +++ b/packager/app/single_thread_job_manager.cc @@ -16,17 +16,12 @@ SingleThreadJobManager::SingleThreadJobManager( std::unique_ptr sync_points) : JobManager(std::move(sync_points)) {} -Status SingleThreadJobManager::InitializeJobs() { - Status status; - for (const JobEntry& job_entry : job_entries_) - status.Update(job_entry.worker->Initialize()); - return status; -} - Status SingleThreadJobManager::RunJobs() { Status status; - for (const JobEntry& job_entry : job_entries_) - status.Update(job_entry.worker->Run()); + + for (auto& job : jobs_) + status.Update(job->Run()); + return status; } diff --git a/packager/app/single_thread_job_manager.h b/packager/app/single_thread_job_manager.h index fa99e894282..6e4ff0006c9 100644 --- a/packager/app/single_thread_job_manager.h +++ b/packager/app/single_thread_job_manager.h @@ -22,7 +22,7 @@ class SingleThreadJobManager : public JobManager { // fails or is cancelled. It can be NULL. explicit SingleThreadJobManager(std::unique_ptr sync_points); - Status InitializeJobs() override; + // Run all registered jobs serially in this thread. Status RunJobs() override; }; diff --git a/packager/media/chunking/CMakeLists.txt b/packager/media/chunking/CMakeLists.txt index ed2472614ac..28acadb7888 100644 --- a/packager/media/chunking/CMakeLists.txt +++ b/packager/media/chunking/CMakeLists.txt @@ -4,26 +4,26 @@ # license that can be found in the LICENSE file or at # https://developers.google.com/open-source/licenses/bsd -add_library(chunking STATIC +add_library(media_chunking STATIC chunking_handler.cc cue_alignment_handler.cc sync_point_queue.cc text_chunker.cc ) -target_link_libraries(chunking +target_link_libraries(media_chunking media_base ) -add_executable(chunking_unittest +add_executable(media_chunking_unittest chunking_handler_unittest.cc cue_alignment_handler_unittest.cc text_chunker_unittest.cc ) -target_link_libraries(chunking_unittest +target_link_libraries(media_chunking_unittest gmock gtest gtest_main + media_chunking media_handler_test_base - chunking ) -add_gtest(chunking_unittest) +add_gtest(media_chunking_unittest) diff --git a/packager/media/codecs/CMakeLists.txt b/packager/media/codecs/CMakeLists.txt index a5ad0284bfe..0b31e4cd24b 100644 --- a/packager/media/codecs/CMakeLists.txt +++ b/packager/media/codecs/CMakeLists.txt @@ -4,7 +4,7 @@ # license that can be found in the LICENSE file or at # https://developers.google.com/open-source/licenses/bsd -add_library(codecs STATIC +add_library(media_codecs STATIC aac_audio_specific_config.cc ac3_audio_util.cc av1_codec_configuration_record.cc @@ -31,10 +31,10 @@ add_library(codecs STATIC vp9_parser.cc ) -target_link_libraries(codecs +target_link_libraries(media_codecs media_base) -add_executable(codecs_unittest +add_executable(media_codecs_unittest aac_audio_specific_config_unittest.cc ac3_audio_util_unittest.cc av1_codec_configuration_record_unittest.cc @@ -59,11 +59,11 @@ add_executable(codecs_unittest vp9_parser_unittest.cc ) -target_link_libraries(codecs_unittest - codecs +target_link_libraries(media_codecs_unittest gmock gtest gtest_main + media_codecs test_data_util) -add_gtest(codecs_unittest) +add_gtest(media_codecs_unittest) diff --git a/packager/media/crypto/CMakeLists.txt b/packager/media/crypto/CMakeLists.txt index 4847cccb511..ed0759c3be9 100644 --- a/packager/media/crypto/CMakeLists.txt +++ b/packager/media/crypto/CMakeLists.txt @@ -10,8 +10,8 @@ add_library(media_crypto STATIC sample_aes_ec3_cryptor.cc subsample_generator.cc) target_link_libraries(media_crypto - codecs media_base + media_codecs absl::base glog) @@ -20,8 +20,8 @@ add_executable(media_crypto_unittest sample_aes_ec3_cryptor_unittest.cc subsample_generator_unittest.cc) target_link_libraries(media_crypto_unittest - codecs media_base + media_codecs media_crypto media_handler_test_base status diff --git a/packager/media/event/CMakeLists.txt b/packager/media/event/CMakeLists.txt index 359ff942fdc..04d59477eeb 100644 --- a/packager/media/event/CMakeLists.txt +++ b/packager/media/event/CMakeLists.txt @@ -17,7 +17,7 @@ target_link_libraries(media_event file mpd_media_info_proto media_base - codecs + media_codecs ) add_library(mock_muxer_listener STATIC diff --git a/packager/media/formats/webm/CMakeLists.txt b/packager/media/formats/webm/CMakeLists.txt index 2040911a5d0..423efc87cc7 100644 --- a/packager/media/formats/webm/CMakeLists.txt +++ b/packager/media/formats/webm/CMakeLists.txt @@ -30,7 +30,7 @@ target_link_libraries(formats_webm webm file media_base - codecs + media_codecs ) add_executable(webm_unittest