Skip to content

Commit

Permalink
feat: Port app/job_manager to cmake and absl
Browse files Browse the repository at this point in the history
Issue shaka-project#1047 (cmake)
Issue shaka-project#346 (absl)
  • Loading branch information
joeyparrish committed Jul 20, 2023
1 parent 4515a98 commit 0430490
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 100 deletions.
1 change: 1 addition & 0 deletions packager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ add_subdirectory(third_party)
add_subdirectory(tools)
add_subdirectory(utils)
add_subdirectory(version)
add_subdirectory(app)
116 changes: 67 additions & 49 deletions packager/app/job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,110 +6,128 @@

#include "packager/app/job_manager.h"

#include "packager/app/libcrypto_threading.h"
#include <functional>
#include <set>

#include "packager/media/chunking/sync_point_queue.h"
#include "packager/media/origin/origin_handler.h"

namespace shaka {
namespace media {

Job::Job(const std::string& name, std::shared_ptr<OriginHandler> work)
: SimpleThread(name),
Job::Job(const std::string& name,
std::shared_ptr<OriginHandler> work,
OnCompleteFunction on_complete)
: name_(name),
work_(std::move(work)),
wait_(base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED) {
on_complete_(on_complete),
status_(error::Code::UNKNOWN, "Job uninitialized") {
DCHECK(work_);
}

const Status& Job::Initialize() {
status_ = work_->Initialize();
return status_;
}

void Job::Start() {
thread_.reset(new std::thread(&Job::Run, this));
}

void Job::Cancel() {
work_->Cancel();
}

void Job::Run() {
status_ = work_->Run();
wait_.Signal();
const Status& Job::Run() {
if (status_.ok()) // initialized correctly
status_ = work_->Run();

on_complete_(this);

return status_;
}

void Job::Join() {
if (thread_)
thread_->join();
}

JobManager::JobManager(std::unique_ptr<SyncPointQueue> sync_points)
: sync_points_(std::move(sync_points)) {}

void JobManager::Add(const std::string& name,
std::shared_ptr<OriginHandler> handler) {
// Stores Job entries for delayed construction of Job objects, to avoid
// setting up SimpleThread until we know all workers can be initialized
// successfully.
job_entries_.push_back({name, std::move(handler)});
jobs_.emplace_back(new Job(
name, std::move(handler),
std::bind(&JobManager::OnJobComplete, this, std::placeholders::_1)));
}

Status JobManager::InitializeJobs() {
Status status;
for (const JobEntry& job_entry : job_entries_)
status.Update(job_entry.worker->Initialize());
if (!status.ok())
return status;

// Create Job objects after successfully initialized all workers.
for (const JobEntry& job_entry : job_entries_)
jobs_.emplace_back(new Job(job_entry.name, std::move(job_entry.worker)));
for (auto& job : jobs_)
status.Update(job->Initialize());
return status;
}

Status JobManager::RunJobs() {
// We need to store the jobs and the waits separately in order to use the
// |WaitMany| function. |WaitMany| takes an array of WaitableEvents but we
// need to access the jobs in order to join the thread and check the status.
// The indexes needs to be check in sync or else we won't be able to relate a
// WaitableEvent back to the job.
std::vector<Job*> active_jobs;
std::vector<base::WaitableEvent*> active_waits;
std::set<Job*> active_jobs;

// Start every job and add it to the active jobs list so that we can wait
// on each one.
for (auto& job : jobs_) {
job->Start();

active_jobs.push_back(job.get());
active_waits.push_back(job->wait());
active_jobs.insert(job.get());
}

// Wait for all jobs to complete or an error occurs.
// Wait for all jobs to complete or any job to error.
Status status;
while (status.ok() && active_jobs.size()) {
// Wait for an event to finish and then update our status so that we can
// quit if something has gone wrong.
const size_t done =
base::WaitableEvent::WaitMany(active_waits.data(), active_waits.size());
Job* job = active_jobs[done];

job->Join();
status.Update(job->status());

// Remove the job and the wait from our tracking.
active_jobs.erase(active_jobs.begin() + done);
active_waits.erase(active_waits.begin() + done);
{
absl::MutexLock lock(&mutex_);
while (status.ok() && active_jobs.size()) {
// any_job_complete_ is protected by mutex_.
any_job_complete_.Wait(&mutex_);

// complete_ is protected by mutex_.
for (const auto& entry : complete_) {
Job* job = entry.first;
bool complete = entry.second;
if (complete) {
job->Join();
status.Update(job->status());
active_jobs.erase(job);
}
}
}
}

// If the main loop has exited and there are still jobs running,
// we need to cancel them and clean-up.
if (sync_points_)
sync_points_->Cancel();
for (auto& job : active_jobs) {

for (auto& job : active_jobs)
job->Cancel();
}

for (auto& job : active_jobs) {
for (auto& job : active_jobs)
job->Join();
}

return status;
}

void JobManager::OnJobComplete(Job* job) {
absl::MutexLock lock(&mutex_);
// These are both protected by mutex_.
complete_[job] = true;
any_job_complete_.Signal();
}

void JobManager::CancelJobs() {
if (sync_points_)
sync_points_->Cancel();
for (auto& job : jobs_) {

for (auto& job : jobs_)
job->Cancel();
}
}

} // namespace media
Expand Down
73 changes: 48 additions & 25 deletions packager/app/job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
#ifndef PACKAGER_APP_JOB_MANAGER_H_
#define PACKAGER_APP_JOB_MANAGER_H_

#include <map>
#include <memory>
#include <thread>
#include <vector>

#include "packager/base/threading/simple_thread.h"
#include "packager/status.h"
#include "absl/synchronization/mutex.h"
#include "packager/status/status.h"

namespace shaka {
namespace media {
Expand All @@ -21,33 +23,53 @@ class SyncPointQueue;

// A job is a single line of work that is expected to run in parallel with
// other jobs.
class Job : public base::SimpleThread {
class Job {
public:
Job(const std::string& name, std::shared_ptr<OriginHandler> work);

// Request that the job stops executing. This is only a request and
// will not block. If you want to wait for the job to complete, use
// |wait|.
typedef std::function<void(Job*)> OnCompleteFunction;

Job(const std::string& name,
std::shared_ptr<OriginHandler> work,
OnCompleteFunction on_complete);

// Initialize the work object. Call before Start() or Run(). Updates status()
// and returns it for convenience.
const Status& Initialize();

// Begin the job in a new thread. This is only a request and will not block.
// If you want to wait for the job to complete, use |complete|.
// Use either Start() for threaded operation or Run() for non-threaded
// operation. DO NOT USE BOTH!
void Start();

// Run the job's work synchronously, blocking until complete. Updates status()
// and returns it for convenience.
// Use either Start() for threaded operation or Run() for non-threaded
// operation. DO NOT USE BOTH!
const Status& Run();

// Request that the job stops executing. This is only a request and will not
// block. If you want to wait for the job to complete, use |complete|.
void Cancel();

// Get the current status of the job. If the job failed to initialize
// or encountered an error during execution this will return the error.
// Join the thread, if any was started. Blocks until the thread has stopped.
void Join();

// Get the current status of the job. If the job failed to initialize or
// encountered an error during execution this will return the error.
const Status& status() const { return status_; }

// If you want to wait for this job to complete, this will return the
// WaitableEvent you can wait on.
base::WaitableEvent* wait() { return &wait_; }
// The name given to this job in the constructor.
const std::string& name() const { return name_; }

private:
Job(const Job&) = delete;
Job& operator=(const Job&) = delete;

void Run() override;

std::string name_;
std::shared_ptr<OriginHandler> work_;
OnCompleteFunction on_complete_;
std::unique_ptr<std::thread> thread_;
Status status_;

base::WaitableEvent wait_;
};

// Similar to a thread pool, JobManager manages multiple jobs that are expected
Expand All @@ -70,7 +92,7 @@ class JobManager {
// Initialize all registered jobs. If any job fails to initialize, this will
// return the error and it will not be safe to call |RunJobs| as not all jobs
// will be properly initialized.
virtual Status InitializeJobs();
Status InitializeJobs();

// Run all registered jobs. Before calling this make sure that
// |InitializedJobs| returned |Status::OK|. This call is blocking and will
Expand All @@ -87,16 +109,17 @@ class JobManager {
JobManager(const JobManager&) = delete;
JobManager& operator=(const JobManager&) = delete;

struct JobEntry {
std::string name;
std::shared_ptr<OriginHandler> worker;
};
// Stores Job entries for delayed construction of Job object.
std::vector<JobEntry> job_entries_;
std::vector<std::unique_ptr<Job>> jobs_;
void OnJobComplete(Job* job);

// Stored in JobManager so JobManager can cancel |sync_points| when any job
// fails or is cancelled.
std::unique_ptr<SyncPointQueue> sync_points_;

std::vector<std::unique_ptr<Job>> jobs_;

absl::Mutex mutex_;
std::map<Job*, bool> complete_ ABSL_GUARDED_BY(mutex_);
absl::CondVar any_job_complete_ ABSL_GUARDED_BY(mutex_);
};

} // namespace media
Expand Down
13 changes: 4 additions & 9 deletions packager/app/single_thread_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@ SingleThreadJobManager::SingleThreadJobManager(
std::unique_ptr<SyncPointQueue> sync_points)
: JobManager(std::move(sync_points)) {}

Status SingleThreadJobManager::InitializeJobs() {
Status status;
for (const JobEntry& job_entry : job_entries_)
status.Update(job_entry.worker->Initialize());
return status;
}

Status SingleThreadJobManager::RunJobs() {
Status status;
for (const JobEntry& job_entry : job_entries_)
status.Update(job_entry.worker->Run());

for (auto& job : jobs_)
status.Update(job->Run());

return status;
}

Expand Down
2 changes: 1 addition & 1 deletion packager/app/single_thread_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class SingleThreadJobManager : public JobManager {
// fails or is cancelled. It can be NULL.
explicit SingleThreadJobManager(std::unique_ptr<SyncPointQueue> sync_points);

Status InitializeJobs() override;
// Run all registered jobs serially in this thread.
Status RunJobs() override;
};

Expand Down
12 changes: 6 additions & 6 deletions packager/media/chunking/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,26 @@
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

add_library(chunking STATIC
add_library(media_chunking STATIC
chunking_handler.cc
cue_alignment_handler.cc
sync_point_queue.cc
text_chunker.cc
)
target_link_libraries(chunking
target_link_libraries(media_chunking
media_base
)

add_executable(chunking_unittest
add_executable(media_chunking_unittest
chunking_handler_unittest.cc
cue_alignment_handler_unittest.cc
text_chunker_unittest.cc
)
target_link_libraries(chunking_unittest
target_link_libraries(media_chunking_unittest
gmock
gtest
gtest_main
media_chunking
media_handler_test_base
chunking
)
add_gtest(chunking_unittest)
add_gtest(media_chunking_unittest)
Loading

0 comments on commit 0430490

Please sign in to comment.