From a2eb70edf1507ad7e235f3a817c44e1a028e42a8 Mon Sep 17 00:00:00 2001
From: Adam Cooke
Date: Fri, 23 Feb 2024 18:11:05 +0000
Subject: [PATCH] feat: add health server and prometheus metrics to worker

---
Review notes (everything between the "---" line above and the first
"diff --git" line is commentary and does not become part of the commit):

* This patch had been flattened onto a handful of physical lines. It has been
  restored to one diff record per line. The blank context lines were inferred
  from the hunk line counts; the indentation of the Ruby lines was
  reconstructed and should be checked against the tree before applying.
* HealthServer::LoggerProxy#add tagged its log lines with component: "true".
  The tag is now component: "health-server", and the "stopped health server"
  message carries the same tag as the "started" one. Line counts are
  unchanged, but the blob id on the app/util/health_server.rb index line no
  longer describes the file content in this patch.
* NOTE(review): Worker::Process#initialize calls setup_prometheus, which
  registers both counters on the shared Prometheus::Client.registry each time
  an instance is built. Confirm that only one Worker::Process is created per
  process, or that a second registration of the same name is acceptable.

 Gemfile                            |   2 +
 Gemfile.lock                       |   4 ++
 app/lib/worker/process.rb          |  30 +++++++-
 app/util/has_prometheus_metrics.rb |  23 +++++++
 app/util/health_server.rb          | 107 +++++++++++++++++++++++++++++
 script/smtp_server.rb              |   2 +
 script/worker.rb                   |   2 +
 7 files changed, 167 insertions(+), 3 deletions(-)
 create mode 100644 app/util/has_prometheus_metrics.rb
 create mode 100644 app/util/health_server.rb

diff --git a/Gemfile b/Gemfile
index afa87d26..f3865645 100644
--- a/Gemfile
+++ b/Gemfile
@@ -21,12 +21,14 @@ gem "mysql2"
 gem "nifty-utils"
 gem "nilify_blanks"
 gem "nio4r"
+gem "prometheus-client"
 gem "puma"
 gem "rails", "= 7.0.8.1"
 gem "resolv"
 gem "secure_headers"
 gem "sentry-rails"
 gem "turbolinks", "~> 5"
+gem "webrick"
 
 group :development, :assets do
   gem "coffee-rails", "~> 5.0"
diff --git a/Gemfile.lock b/Gemfile.lock
index 64f83672..f59b4dbb 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -193,6 +193,7 @@ GEM
     parallel (1.22.1)
     parser (3.2.1.1)
       ast (~> 2.4.1)
+    prometheus-client (4.2.2)
     public_suffix (5.0.4)
     puma (6.4.2)
       nio4r (~> 2.0)
@@ -314,6 +315,7 @@ GEM
       addressable (>= 2.8.0)
       crack (>= 0.3.2)
       hashdiff (>= 0.4.0, < 2.0.0)
+    webrick (1.8.1)
     websocket-driver (0.7.6)
       websocket-extensions (>= 0.1.0)
     websocket-extensions (0.1.5)
@@ -351,6 +353,7 @@ DEPENDENCIES
   nifty-utils
   nilify_blanks
   nio4r
+  prometheus-client
   puma
   rails (= 7.0.8.1)
   resolv
@@ -366,6 +369,7 @@ DEPENDENCIES
   turbolinks (~> 5)
   uglifier (>= 1.3.0)
   webmock
+  webrick
 
 BUNDLED WITH
    2.5.6
diff --git a/app/lib/worker/process.rb b/app/lib/worker/process.rb
index 34eaec21..1e17dc3b 100644
--- a/app/lib/worker/process.rb
+++ b/app/lib/worker/process.rb
@@ -19,6 +19,8 @@ module Worker
   # after it has completed any outstanding jobs which are already inflight.
   class Process
 
+    include HasPrometheusMetrics
+
     # An array of job classes that should be processed each time the worker ticks.
     #
     # @return [Array]
@@ -48,6 +50,8 @@ def initialize(thread_count: 2, work_sleep_time: 5, task_sleep_time: 60)
       @work_sleep_time = work_sleep_time
       @task_sleep_time = task_sleep_time
       @threads = []
+
+      setup_prometheus
     end
 
     def run
@@ -114,7 +118,7 @@ def start_work_thread(index)
         logger.tagged(component: "worker", thread: "work#{index}") do
           logger.info "started work thread #{index}"
           loop do
-            work_completed = work
+            work_completed = work(index)
 
             if shutdown_after_wait?(work_completed ? 0 : @work_sleep_time)
               break
@@ -129,7 +133,7 @@ def start_work_thread(index)
     # Actually perform the work for this tick. This will call each job which has been registered.
     #
     # @return [Boolean] Whether any work was completed in this job or not
-    def work
+    def work(thread)
       completed_work = 0
       ActiveRecord::Base.connection_pool.with_connection do
         JOBS.each do |job_class|
@@ -137,7 +141,14 @@ def work
             job = job_class.new(logger: logger)
             job.call
 
-            completed_work += 1 if job.work_completed?
+            if job.work_completed?
+              completed_work += 1
+              increment_prometheus_counter :postal_worker_job_executions,
+                                           labels: {
+                                             thread: thread,
+                                             job: job_class.to_s.split("::").last
+                                           }
+            end
           end
         end
       end
@@ -236,6 +247,19 @@ def capture_errors
       logger.error "#{e.class} (#{e.message})"
       e.backtrace.each { |line| logger.error line }
       Sentry.capture_exception(e) if defined?(Sentry)
+
+      increment_prometheus_counter :postal_worker_errors,
+                                   labels: { error: e.class.to_s }
+    end
+
+    def setup_prometheus
+      register_prometheus_counter :postal_worker_job_executions,
+                                  docstring: "The number of jobs worked by a worker",
+                                  labels: [:thread, :job]
+
+      register_prometheus_counter :postal_worker_errors,
+                                  docstring: "The number of errors encountered while processing jobs",
+                                  labels: [:error]
     end
 
   end
diff --git a/app/util/has_prometheus_metrics.rb b/app/util/has_prometheus_metrics.rb
new file mode 100644
index 00000000..ca717319
--- /dev/null
+++ b/app/util/has_prometheus_metrics.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module HasPrometheusMetrics
+
+  def register_prometheus_counter(name, **kwargs)
+    counter = Prometheus::Client::Counter.new(name, **kwargs)
+    registry.register(counter)
+  end
+
+  def increment_prometheus_counter(name, labels: {})
+    counter = registry.get(name)
+    return if counter.nil?
+
+    counter.increment(labels: labels)
+  end
+
+  private
+
+  def registry
+    Prometheus::Client.registry
+  end
+
+end
diff --git a/app/util/health_server.rb b/app/util/health_server.rb
new file mode 100644
index 00000000..0e8d4819
--- /dev/null
+++ b/app/util/health_server.rb
@@ -0,0 +1,107 @@
+# frozen_string_literal: true
+
+require "socket"
+require "rack/handler/webrick"
+require "prometheus/client/formats/text"
+
+class HealthServer
+
+  def initialize(name: "unnamed-process")
+    @name = name
+  end
+
+  def call(env)
+    case env["PATH_INFO"]
+    when "/health"
+      ok
+    when "/metrics"
+      metrics
+    when "/"
+      root
+    else
+      not_found
+    end
+  end
+
+  private
+
+  def root
+    [200, { "Content-Type" => "text/plain" }, ["#{@name} (pid: #{Process.pid}, host: #{Socket.gethostname})"]]
+  end
+
+  def ok
+    [200, { "Content-Type" => "text/plain" }, ["OK"]]
+  end
+
+  def not_found
+    [404, { "Content-Type" => "text/plain" }, ["Not Found"]]
+  end
+
+  def metrics
+    registry = Prometheus::Client.registry
+    body = Prometheus::Client::Formats::Text.marshal(registry)
+    [200, { "Content-Type" => "text/plain" }, [body]]
+  end
+
+  class << self
+
+    def run(default_port: 9090, **options)
+      port = ENV.fetch("HEALTH_SERVER_PORT", default_port)
+
+      Rack::Handler::WEBrick.run(new(**options),
+                                 Port: port,
+                                 BindAddress: bind_address,
+                                 AccessLog: [],
+                                 Logger: LoggerProxy.new)
+    rescue Errno::EADDRINUSE
+      Postal.logger.info "health server port (#{bind_address}:#{port}) is already " \
+                         "in use, not starting health server"
+    end
+
+    def bind_address
+      ENV.fetch("HEALTH_SERVER_BIND_ADDRESS", "127.0.0.1")
+    end
+
+    def start(**options)
+      thread = Thread.new { run(**options) }
+      thread.abort_on_exception = false
+      thread
+    end
+
+  end
+
+  class LoggerProxy
+
+    [:info, :debug, :warn, :error, :fatal].each do |severity|
+      define_method(severity) do |message|
+        add(severity, message)
+      end
+
+      define_method("#{severity}?") do
+        severity != :debug
+      end
+    end
+
+    def add(severity, message)
+      return if severity == :debug
+
+      case message
+      when /\AWEBrick::HTTPServer#start:.*port=(\d+)/
+        Postal.logger.info "started health server on port #{::Regexp.last_match(1)}", component: "health-server"
+      when /\AWEBrick::HTTPServer#start done/
+        Postal.logger.info "stopped health server", component: "health-server"
+      when /\AWEBrick [\d.]+/,
+           /\Aruby ([\d.]+)/,
+           /\ARack::Handler::WEBrick is mounted/,
+           /\Aclose TCPSocket/,
+           /\Agoing to shutdown/
+        # Don't actually print routine messages to avoid too much
+        # clutter when processes start it
+      else
+        Postal.logger.debug message, component: "health-server"
+      end
+    end
+
+  end
+
+end
diff --git a/script/smtp_server.rb b/script/smtp_server.rb
index fabadcd6..87e57c0f 100644
--- a/script/smtp_server.rb
+++ b/script/smtp_server.rb
@@ -4,4 +4,6 @@
 $stderr.sync = true
 
 require_relative "../config/environment"
+
+HealthServer.start(name: "smtp-server", default_port: 9091)
 SMTPServer::Server.new(debug: true).run
diff --git a/script/worker.rb b/script/worker.rb
index 695ed8d7..f53c3740 100755
--- a/script/worker.rb
+++ b/script/worker.rb
@@ -5,4 +5,6 @@
 $stderr.sync = true
 
 require_relative "../config/environment"
+
+HealthServer.start(name: "worker", default_port: 9090)
 Worker::Process.new.run