Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cron scheduling via GCP Cloud Scheduler #107

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloudtasker.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'activesupport'
spec.add_dependency 'connection_pool'
spec.add_dependency 'fugit'
spec.add_dependency 'google-cloud-scheduler'
spec.add_dependency 'google-cloud-tasks'
spec.add_dependency 'jwt'
spec.add_dependency 'redis'
Expand Down
1 change: 1 addition & 0 deletions lib/cloudtasker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

require 'cloudtasker/middleware/chain'
require 'cloudtasker/authenticator'
require 'cloudtasker/cloud_scheduler'
require 'cloudtasker/cloud_task'
require 'cloudtasker/worker_logger'
require 'cloudtasker/worker_handler'
Expand Down
11 changes: 11 additions & 0 deletions lib/cloudtasker/cloud_scheduler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

require_relative 'cloud_scheduler/schedule'
require_relative 'cloud_scheduler/job'
require_relative 'cloud_scheduler/manager'

module Cloudtasker
# Schedule jobs using GCP Cloud Scheduler
module CloudScheduler
end
end
141 changes: 141 additions & 0 deletions lib/cloudtasker/cloud_scheduler/job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# frozen_string_literal: true

require 'google/cloud/scheduler/v1'
require 'cloudtasker/worker_handler'

module Cloudtasker
module CloudScheduler
# Manage cron jobs
class Job
#
# Return all jobs from a hash.
#
# @param [Hash] hash The hash to load jobs from.
#
# @return [Array<Cloudtasker::CloudScheduler::Job>] The list of jobs.
#
def self.load_from_hash!(hash)
Schedule.load_from_hash!(hash).map do |schedule|
new(schedule)
end
end

attr_reader :schedule

#
# Build a new instance of the class.
#
# @param [Cloudtasker::CloudScheduler::Schedule] schedule The schedule to run.
#
def initialize(schedule)
@schedule = schedule
end

#
# Prefix for all jobs that includes the parent path and the queue prefix.
#
# @return [String] The job prefix.
#
def prefix
"#{parent}/jobs/#{config.gcp_queue_prefix}--"
end

#
# Return name of the job in the remote scheduler.
#
# @return [String] The job name.
#
def remote_name
"#{prefix}#{schedule.id}"
end

#
# Return the job name.
#
# @return [String] The job name.
#
def name
schedule.id
end

#
# Create the job in the remote scheduler.
#
# @return [Google::Cloud::Scheduler::V1::Job] The job instance.
#
def create!
client.create_job(parent: parent, job: payload)
end

#
# Update the job in the remote scheduler.
#
# @return [Google::Cloud::Scheduler::V1::Job] The job instance.
#
def update!
client.update_job(job: payload)
end

#
# Delete the job from the remote scheduler.
#
# @return [Google::Protobuf::Empty] The job instance.
#
def delete!
client.delete_job(name: remote_name)
end

#
# Return a hash that can be used to create/update a job in the remote scheduler.
#
# @return [Hash<Symbol, String>] The job hash.
#
def payload
{
name: remote_name,
schedule: schedule.cron,
time_zone: schedule.time_zone,
http_target: {
http_method: 'POST',
uri: config.processor_url,
oidc_token: config.oidc,
body: schedule.job_payload.to_json,
headers: {
Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
Cloudtasker::Config::CT_AUTHORIZATION_HEADER => Authenticator.bearer_token
}.compact
}.compact
}
end

private

#
# Return the parent path for all jobs.
#
# @return [String] The parent path.
#
def parent
@parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id)
end

#
# Return the Cloudtasker configuration.
#
# @return [Cloudtasker::Config] The configuration.
#
def config
@config ||= Cloudtasker.config
end

#
# Return the Cloud Scheduler client.
#
# @return [Google::Cloud::Scheduler::V1::CloudSchedulerClient] The client.
#
def client
@client ||= Google::Cloud::Scheduler.cloud_scheduler
end
end
end
end
36 changes: 36 additions & 0 deletions lib/cloudtasker/cloud_scheduler/job/active_job_payload.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

module Cloudtasker
module CloudScheduler
class Job
# Payload used to schedule ActiveJob jobs on Cloud Scheduler
class ActiveJobPayload
attr_reader :worker

#
# Build a new instance of the class.
#
# @param [ActiveJob::Base] worker The ActiveJob instance.
#
def initialize(worker)
@worker = worker
end

#
# Return the Hash representation of the job payload.
#
# @return [Hash] The job payload.
#
def to_h
{
'worker' => 'ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper',
'job_queue' => worker.queue_name,
'job_id' => worker.job_id,
'job_meta' => {},
'job_args' => [worker.serialize]
}
end
end
end
end
end
52 changes: 52 additions & 0 deletions lib/cloudtasker/cloud_scheduler/job/worker_payload.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require 'cloudtasker/worker_handler'

module Cloudtasker
module CloudScheduler
class Job
# Payload used to schedule Cloudtasker Workers on Cloud Scheduler
class WorkerPayload
attr_reader :worker

#
# Build a new instance of the class.
#
# @param [Cloudtasker::Worker] worker The Cloudtasker Worker instance.
#
def initialize(worker)
@worker = worker
end

#
# Return the Hash representation of the job payload.
#
# @return [Hash] The job payload.
#
def to_h
JSON.parse(request_config[:body])
end

private

#
# Return the HTTP request configuration for a Cloud Task.
#
# @return [Hash] The request configuration.
#
def request_config
worker_handler.task_payload[:http_request]
end

#
# Return the worker handler.
#
# @return [Cloudtasker::WorkerHandler] The worker handler.
#
def worker_handler
@worker_handler ||= Cloudtasker::WorkerHandler.new(worker)
end
end
end
end
end
132 changes: 132 additions & 0 deletions lib/cloudtasker/cloud_scheduler/manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# frozen_string_literal: true

require 'google/cloud/scheduler/v1'
require 'google/cloud/scheduler'

module Cloudtasker
module CloudScheduler
# Manage the synchronization of jobs between the
# local configuration and the remote scheduler.
class Manager
#
# Synchronize the local configuration with the remote scheduler.
#
# @param [String] file The path to the schedule configuration file.
#
def self.synchronize!(file)
config = YAML.load_file(file)
jobs = Job.load_from_hash!(config)

new(jobs).synchronize!
end

attr_reader :jobs

#
# Build a new instance of the class.
#
# @param [Array<Cloudtasker::CloudScheduler::Job>] jobs The list of jobs to synchronize.
#
def initialize(jobs)
@jobs = jobs
end

#
# Synchronize the local configuration with the remote scheduler.
#
# @return [nil]
#
def synchronize!
new_jobs.map(&:create!)
stale_jobs.map(&:update!)
deleted_jobs.map { |job| client.delete_job(name: job) }

nil
end

private

#
# Return all jobs from the remote scheduler.
#
# @return [Array<String>] The list of job names.
#
def remote_jobs
@remote_jobs ||= client.list_jobs(parent: parent)
.response
.jobs
.map(&:name)
.select do |job|
job.start_with?(job_prefix)
end
end

#
# Return all jobs that are not yet created in the remote scheduler.
#
# @return [Array<Cloudtasker::CloudScheduler::Job>] The list of jobs.
#
def new_jobs
jobs.reject do |job|
remote_jobs.include?(job.remote_name)
end
end

#
# Return all jobs that are present in both local config and remote scheduler.
#
# @return [Array<Cloudtasker::CloudScheduler::Job>] The list of jobs.
#
def stale_jobs
jobs.select do |job|
remote_jobs.include?(job.remote_name)
end
end

#
# Return all jobs that are present in the remote scheduler but not in the local config.
#
# @return [Array<String>] The list of job names.
#
def deleted_jobs
remote_jobs - jobs.map(&:remote_name)
end

#
# Prefix for all jobs that includes the parent path and the queue prefix.
#
# @return [String] The job prefix.
#
def job_prefix
"#{parent}/jobs/#{config.gcp_queue_prefix}--"
end

#
# Return the parent path for all jobs.
#
# @return [String] The parent path.
#
def parent
@parent ||= client.location_path(project: config.gcp_project_id, location: config.gcp_location_id)
end

#
# Return the Cloudtasker configuration.
#
# @return [Cloudtasker::Config] The configuration.
#
def config
@config ||= Cloudtasker.config
end

#
# Return the Cloud Scheduler client.
#
# @return [Google::Cloud::Scheduler::V1::CloudSchedulerClient] The client.
#
def client
@client ||= Google::Cloud::Scheduler.cloud_scheduler
end
end
end
end
Loading