Skip to content

Commit

Permalink
feat: add health server and prometheus metrics to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcooke committed Feb 23, 2024
1 parent 1ae8ef6 commit a2eb70e
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 3 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ gem "mysql2"
gem "nifty-utils"
gem "nilify_blanks"
gem "nio4r"
gem "prometheus-client"
gem "puma"
gem "rails", "= 7.0.8.1"
gem "resolv"
gem "secure_headers"
gem "sentry-rails"
gem "turbolinks", "~> 5"
gem "webrick"

group :development, :assets do
gem "coffee-rails", "~> 5.0"
Expand Down
4 changes: 4 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ GEM
parallel (1.22.1)
parser (3.2.1.1)
ast (~> 2.4.1)
prometheus-client (4.2.2)
public_suffix (5.0.4)
puma (6.4.2)
nio4r (~> 2.0)
Expand Down Expand Up @@ -314,6 +315,7 @@ GEM
addressable (>= 2.8.0)
crack (>= 0.3.2)
hashdiff (>= 0.4.0, < 2.0.0)
webrick (1.8.1)
websocket-driver (0.7.6)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
Expand Down Expand Up @@ -351,6 +353,7 @@ DEPENDENCIES
nifty-utils
nilify_blanks
nio4r
prometheus-client
puma
rails (= 7.0.8.1)
resolv
Expand All @@ -366,6 +369,7 @@ DEPENDENCIES
turbolinks (~> 5)
uglifier (>= 1.3.0)
webmock
webrick

BUNDLED WITH
2.5.6
30 changes: 27 additions & 3 deletions app/lib/worker/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ module Worker
# after it has completed any outstanding jobs which are already inflight.
class Process

include HasPrometheusMetrics

# An array of job classes that should be processed each time the worker ticks.
#
# @return [Array<Class>]
Expand Down Expand Up @@ -48,6 +50,8 @@ def initialize(thread_count: 2, work_sleep_time: 5, task_sleep_time: 60)
@work_sleep_time = work_sleep_time
@task_sleep_time = task_sleep_time
@threads = []

setup_prometheus
end

def run
Expand Down Expand Up @@ -114,7 +118,7 @@ def start_work_thread(index)
logger.tagged(component: "worker", thread: "work#{index}") do
logger.info "started work thread #{index}"
loop do
work_completed = work
work_completed = work(index)

if shutdown_after_wait?(work_completed ? 0 : @work_sleep_time)
break
Expand All @@ -129,15 +133,22 @@ def start_work_thread(index)
# Actually perform the work for this tick. This will call each job which has been registered.
#
# @return [Boolean] Whether any work was completed in this job or not
def work
def work(thread)
completed_work = 0
ActiveRecord::Base.connection_pool.with_connection do
JOBS.each do |job_class|
capture_errors do
job = job_class.new(logger: logger)
job.call

completed_work += 1 if job.work_completed?
if job.work_completed?
completed_work += 1
increment_prometheus_counter :postal_worker_job_executions,
labels: {
thread: thread,
job: job_class.to_s.split("::").last
}
end
end
end
end
Expand Down Expand Up @@ -236,6 +247,19 @@ def capture_errors
logger.error "#{e.class} (#{e.message})"
e.backtrace.each { |line| logger.error line }
Sentry.capture_exception(e) if defined?(Sentry)

increment_prometheus_counter :postal_worker_errors,
labels: { error: e.class.to_s }
end

def setup_prometheus
register_prometheus_counter :postal_worker_job_executions,
docstring: "The number of jobs worked by a worker",
labels: [:thread, :job]

register_prometheus_counter :postal_worker_errors,
docstring: "The number of errors encountered while processing jobs",
labels: [:error]
end

end
Expand Down
23 changes: 23 additions & 0 deletions app/util/has_prometheus_metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

module HasPrometheusMetrics

def register_prometheus_counter(name, **kwargs)
counter = Prometheus::Client::Counter.new(name, **kwargs)
registry.register(counter)
end

def increment_prometheus_counter(name, labels: {})
counter = registry.get(name)
return if counter.nil?

counter.increment(labels: labels)
end

private

def registry
Prometheus::Client.registry
end

end
107 changes: 107 additions & 0 deletions app/util/health_server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# frozen_string_literal: true

require "socket"
require "rack/handler/webrick"
require "prometheus/client/formats/text"

class HealthServer

def initialize(name: "unnamed-process")
@name = name
end

def call(env)
case env["PATH_INFO"]
when "/health"
ok
when "/metrics"
metrics
when "/"
root
else
not_found
end
end

private

def root
[200, { "Content-Type" => "text/plain" }, ["#{@name} (pid: #{Process.pid}, host: #{Socket.gethostname})"]]
end

def ok
[200, { "Content-Type" => "text/plain" }, ["OK"]]
end

def not_found
[404, { "Content-Type" => "text/plain" }, ["Not Found"]]
end

def metrics
registry = Prometheus::Client.registry
body = Prometheus::Client::Formats::Text.marshal(registry)
[200, { "Content-Type" => "text/plain" }, [body]]
end

class << self

def run(default_port: 9090, **options)
port = ENV.fetch("HEALTH_SERVER_PORT", default_port)

Rack::Handler::WEBrick.run(new(**options),
Port: port,
BindAddress: bind_address,
AccessLog: [],
Logger: LoggerProxy.new)
rescue Errno::EADDRINUSE
Postal.logger.info "health server port (#{bind_address}:#{port}) is already " \
"in use, not starting health server"
end

def bind_address
ENV.fetch("HEALTH_SERVER_BIND_ADDRESS", "127.0.0.1")
end

def start(**options)
thread = Thread.new { run(**options) }
thread.abort_on_exception = false
thread
end

end

class LoggerProxy

[:info, :debug, :warn, :error, :fatal].each do |severity|
define_method(severity) do |message|
add(severity, message)
end

define_method("#{severity}?") do
severity != :debug
end
end

def add(severity, message)
return if severity == :debug

case message
when /\AWEBrick::HTTPServer#start:.*port=(\d+)/
Postal.logger.info "started health server on port #{::Regexp.last_match(1)}", component: "true"
when /\AWEBrick::HTTPServer#start done/
Postal.logger.info "stopped health server"
when /\AWEBrick [\d.]+/,
/\Aruby ([\d.]+)/,
/\ARack::Handler::WEBrick is mounted/,
/\Aclose TCPSocket/,
/\Agoing to shutdown/
# Don't actually print routine messages to avoid too much
# clutter when processes start it
else
Postal.logger.debug message, component: "true"
end
end

end

end
2 changes: 2 additions & 0 deletions script/smtp_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@
$stderr.sync = true

require_relative "../config/environment"

HealthServer.start(name: "smtp-server", default_port: 9091)
SMTPServer::Server.new(debug: true).run
2 changes: 2 additions & 0 deletions script/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@
$stderr.sync = true

require_relative "../config/environment"

HealthServer.start(name: "worker", default_port: 9090)
Worker::Process.new.run

0 comments on commit a2eb70e

Please sign in to comment.