diff --git a/lib/racecar.rb b/lib/racecar.rb
index 6c8b379e..934c5870 100644
--- a/lib/racecar.rb
+++ b/lib/racecar.rb
@@ -8,6 +8,7 @@
 require "racecar/consumer_set"
 require "racecar/runner"
 require "racecar/parallel_runner"
+require "racecar/forking_runner"
 require "racecar/producer"
 require "racecar/config"
 require "racecar/version"
@@ -74,6 +75,8 @@ def self.runner(processor)
 
     if config.parallel_workers && config.parallel_workers > 1
       ParallelRunner.new(runner: runner, config: config, logger: logger)
+    elsif config.forks && config.forks > 0
+      ForkingRunner.new(runner: runner, config: config, logger: logger)
     else
       runner
     end
diff --git a/lib/racecar/config.rb b/lib/racecar/config.rb
index 93a0616c..3704fc5b 100644
--- a/lib/racecar/config.rb
+++ b/lib/racecar/config.rb
@@ -191,11 +191,27 @@ class Config < KingKonf::Config
     desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x"
     string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic"
 
+    desc "How many worker processes to fork"
+    integer :forks, default: 0
+
     # The error handler must be set directly on the object.
     attr_reader :error_handler
 
     attr_accessor :subscriptions, :logger, :parallel_workers
 
+    # Hooks run around forking (see ForkingRunner): `prefork` once in the
+    # supervisor before any fork, `postfork` in each worker after the fork.
+    # Writers only; the readers below supply no-op defaults.
+    attr_writer :prefork, :postfork
+
+    def prefork
+      @prefork ||= lambda { |*_| }
+    end
+
+    def postfork
+      @postfork ||= lambda { |*_| }
+    end
+
     def statistics_interval_ms
       if Rdkafka::Config.statistics_callback
         statistics_interval * 1000
diff --git a/lib/racecar/forking_runner.rb b/lib/racecar/forking_runner.rb
new file mode 100644
index 00000000..4de17c55
--- /dev/null
+++ b/lib/racecar/forking_runner.rb
@@ -0,0 +1,159 @@
+# frozen_string_literal: true
+
+module Racecar
+  # Supervises `config.forks` worker processes that each execute the wrapped
+  # runner. TERM/INT received by the supervisor are passed on to the workers
+  # as TERM, and all workers are stopped as soon as one of them exits.
+  class ForkingRunner
+    # @param runner [#run, #stop] runner executed inside every forked worker
+    # @param config [#forks, #prefork, #postfork] fork count and fork hooks
+    # @param logger [#debug, #warn] logger used by supervisor and workers
+    # @param parent_monitor [ParentProcessMonitor] lets workers notice that
+    #   the supervisor has gone away
+    def initialize(runner:, config:, logger:, parent_monitor: ParentProcessMonitor.new)
+      @runner = runner
+      @config = config
+      @logger = logger
+      @pids = []
+      @parent_monitor = parent_monitor
+      @running = false
+    end
+
+    attr_reader :config, :runner, :logger, :pids, :parent_monitor
+    private :config, :runner, :logger, :pids, :parent_monitor
+
+    # Calls the prefork hook, installs the signal handlers, forks the workers
+    # and then blocks until every worker has been waited for.
+    def run
+      config.prefork.call
+      install_signal_handlers
+
+      @running = true
+
+      @pids = config.forks.times.map do
+        pid = fork do
+          parent_monitor.child_post_fork
+          config.postfork.call
+
+          parent_monitor.on_parent_exit do
+            logger.warn("Supervisor dead, exiting.")
+            runner.stop
+          end
+
+          runner.run
+        end
+        logger.debug("Racecar forked consumer process #{pid}.")
+
+        pid
+      end
+
+      parent_monitor.parent_post_fork
+
+      wait_for_child_processes
+    end
+
+    # Marks the supervisor as stopping and sends TERM to every worker.
+    def stop
+      @running = false
+      logger.debug("Racecar::ForkingRunner runner stopping #{Process.pid}.")
+      terminate_workers
+    end
+
+    # @return [Boolean] true from when #run sets the flag until #stop is called
+    def running?
+      !!@running
+    end
+
+    private
+
+    # Sends TERM to every worker; a worker that is already gone is only logged.
+    def terminate_workers
+      pids.each do |pid|
+        begin
+          Process.kill("TERM", pid)
+        rescue Errno::ESRCH
+          # Report the worker pid that was not found, not the supervisor's pid.
+          logger.debug("Racecar::ForkingRunner Process not found #{pid}.")
+        end
+      end
+    end
+
+    # Stops everything as soon as one worker is found to have exited.
+    def check_workers
+      return if pids.all? { |pid| worker_running?(pid) }
+
+      logger.debug("A forked worker has exited unexpectedly. Shutting everything down.")
+      stop
+    end
+
+    # Non-blocking liveness check; reaps the worker if it has already exited.
+    #
+    # @return [Boolean] false if the worker has exited or is not our child
+    def worker_running?(pid)
+      _, status = Process.waitpid2(pid, Process::WNOHANG)
+      status.nil?
+    rescue Errno::ECHILD
+      false
+    end
+
+    # Blocks until each worker has exited.
+    def wait_for_child_processes
+      pids.each do |pid|
+        begin
+          Process.wait(pid)
+        rescue Errno::ECHILD
+          # No such child (e.g. already reaped by #worker_running?).
+        end
+      end
+    end
+
+    # CHLD triggers a worker health check while running; TERM and INT stop
+    # the supervisor and its workers.
+    #
+    # NOTE(review): these handlers log from trap context, where Ruby's stdlib
+    # Logger may be unable to take its lock - confirm the logger in use copes.
+    def install_signal_handlers
+      Signal.trap("CHLD") do
+        logger.warn("Received SIGCHLD")
+        check_workers if running?
+      end
+      Signal.trap("TERM") { stop }
+      Signal.trap("INT") { stop }
+    end
+
+    # Pipe-based detection of supervisor death: workers close their copy of
+    # the write end, so the read end turns readable once the supervisor's closes.
+    class ParentProcessMonitor
+      # @param pipe_ends [Array(IO, IO)] read end and write end of a pipe
+      def initialize(pipe_ends = IO.pipe)
+        @read_end, @write_end = pipe_ends
+        @monitor_thread = nil
+      end
+
+      attr_reader :read_end, :write_end, :monitor_thread
+      private :read_end, :write_end, :monitor_thread
+
+      # Runs the block in a background thread once the read end becomes
+      # readable. Closes this process's write end first.
+      def on_parent_exit(&block)
+        child_post_fork
+        # Assign the ivar: a bare `monitor_thread =` only creates a local.
+        @monitor_thread = Thread.new do
+          IO.select([read_end])
+          block.call
+        end
+      end
+
+      # Supervisor side: close the read end, which only workers wait on.
+      def parent_post_fork
+        read_end.close
+      end
+
+      # Worker side: close the inherited write end so that only the
+      # supervisor's copy keeps the pipe open.
+      def child_post_fork
+        write_end.close
+      end
+    end
+  end
+end
diff --git a/racecar-2.10.beta.3.3ad7680.gem b/racecar-2.10.beta.3.3ad7680.gem
new file mode 100644
index 00000000..ea2cf559
Binary files /dev/null and b/racecar-2.10.beta.3.3ad7680.gem differ
diff --git a/spec/integration/cooperative_sticky_assignment_spec.rb b/spec/integration/cooperative_sticky_assignment_spec.rb
index df6388dd..3d2e5d8f 100644
--- a/spec/integration/cooperative_sticky_assignment_spec.rb
+++ b/spec/integration/cooperative_sticky_assignment_spec.rb
@@ -38,6 +38,7 @@
     start_consumer
 
     wait_for_assignments(2)
+    reset_consumer_events
     publish_messages
     wait_for_a_few_messages
 
@@ -46,8 +47,8 @@
     wait_for_all_messages
 
     aggregate_failures do
-      expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
       expect_consumer0_took_over_processing_from_consumer1
+      expect_consumer0_did_not_have_partitions_revoked_but_consumer1_did
     end
   end
 
@@ -87,6 +88,10 @@ def start_consumer
     consumer_index_by_id["#{Process.pid}-#{thread.object_id}"] = consumers.index(runner)
   end
 
+  def reset_consumer_events
+    @received_consumer_events = []
+  end
+
   def terminate_consumer1
     consumers[1].stop
   end
@@ -105,10 +110,6 @@ def wait_for_all_messages
   end
 
   def set_config
-    Racecar.config.fetch_messages = 1
-    Racecar.config.max_wait_time = 0.1
-    Racecar.config.session_timeout = 6 # minimum allowed by default broker config
-    Racecar.config.heartbeat_interval = 1.5
     Racecar.config.partition_assignment_strategy = "cooperative-sticky"
     Racecar.config.load_consumer_class(consumer_class)
   end
diff --git a/spec/integration/forking_spec.rb b/spec/integration/forking_spec.rb
new file mode 100644
index 00000000..61f6d9d1
--- /dev/null
+++ b/spec/integration/forking_spec.rb
@@ -0,0 +1,253 @@
+# frozen_string_literal: true
+
+require "racecar/cli"
+
+RSpec.describe "forking", type: :integration do
+  let!(:racecar_cli) { Racecar::Cli.new(["ForkingConsumer"]) }
+  let(:input_topic) { generate_input_topic_name }
+  let(:output_topic) { generate_output_topic_name }
+  let(:group_id) { generate_group_id }
+
+  let(:input_messages) do
+    total_messages.times.map { |n| { payload: "message-#{n}", partition: n % topic_partitions } }
+  end
+
+  let(:total_messages) { messages_per_topic * topic_partitions }
+  let(:topic_partitions) { 4 }
+  let(:messages_per_topic) { 10 }
+  let(:forks) { 2 }
+  let(:consumer_class) { ForkingConsumer ||= echo_consumer_class }
+
+  before do
+    create_topic(topic: input_topic, partitions: topic_partitions)
+    create_topic(topic: output_topic, partitions: topic_partitions)
+
+    consumer_class.subscribes_to(input_topic)
+    consumer_class.output_topic = output_topic
+    consumer_class.group_id = group_id
+    consumer_class.pipe_to_test = consumer_message_pipe
+
+    Racecar.config.forks = forks
+  end
+
+  after do
+    Object.send(:remove_const, :ForkingConsumer) if defined?(ForkingConsumer)
+  end
+
+  it "each fork consumes messages in parallel" do
+    start_racecar
+
+    wait_for_assignments(forks)
+    publish_messages
+    wait_for_messages
+
+    expect_equal_distribution_of_message_processing
+    expect_processing_times_to_mostly_overlap
+  end
+
+  context "when the supervisor process receives TERM" do
+    let(:messages_per_topic) { 1 }
+
+    it "terminates the worker processes" do
+      start_racecar
+
+      wait_for_assignments(forks)
+      publish_messages
+      wait_for_messages
+
+      Process.kill("TERM", supervisor_pid)
+      Process.wait(supervisor_pid)
+
+      expect_processes_to_have_exited(worker_pids)
+    end
+  end
+
+  context "when the supervisor process is killed (SIGKILL)" do
+    let(:messages_per_topic) { 1 }
+
+    it "terminates the worker processes" do
+      start_racecar
+
+      wait_for_assignments(forks)
+      publish_messages
+      wait_for_messages
+
+      Process.kill("KILL", supervisor_pid)
+      Process.waitpid(supervisor_pid)
+
+      expect_processes_to_have_exited(worker_pids)
+    end
+  end
+
+  context "when a worker process exits" do
+    let(:messages_per_topic) { 1 }
+
+    it "terminates all other processes gracefully" do
+      start_racecar
+
+      wait_for_assignments(forks)
+      publish_messages
+      wait_for_messages
+
+      Process.kill("KILL", worker_pids[0])
+      Process.waitpid(supervisor_pid)
+
+      expect_processes_to_have_exited(worker_pids)
+    end
+  end
+
+  context "when prefork and postfork hooks have been set" do
+    before do
+      setup_hooks_and_message_pipe
+    end
+
+    let(:messages) { [] }
+
+    it "executes the prefork hook before forking and postfork after forking" do
+      start_racecar
+
+      wait_for_fork_hook_messages
+
+      prefork_message = messages.first
+      postfork_messages = messages.drop(1)
+
+      expect(prefork_message).to match(hash_including({
+        "hook" => "prefork",
+        "ppid" => Process.pid,
+        "pid" => supervisor_pid,
+      }))
+
+      expect(postfork_messages).to match([
+        hash_including({
+          "hook" => "postfork",
+          "ppid" => supervisor_pid,
+        })
+      ] * forks)
+    end
+
+    def wait_for_fork_hook_messages
+      Timeout.timeout(15) do
+        sleep 0.2 until messages.length == forks + 1
+      end
+    end
+
+    def raise_if_any_child_processes
+      Timeout.timeout(0.01) { Process.waitall }
+    rescue Timeout::Error
+      raise "Expected no child processes but `Process.waitall` blocked."
+    end
+
+    def setup_hooks_and_message_pipe
+      pipe = IntegrationHelper::JSONPipe.new
+
+      Racecar.config.prefork = ->(*_) {
+        pipe.write({hook: "prefork", pid: Process.pid, ppid: Process.ppid})
+        raise_if_any_child_processes
+      }
+
+      Racecar.config.postfork = ->(*_) {
+        pipe.write({hook: "postfork", pid: Process.pid, ppid: Process.ppid})
+      }
+
+      @hook_message_listener_thread = Thread.new do
+        loop do
+          messages << pipe.read
+        end
+      end
+    end
+
+    after do
+      @hook_message_listener_thread&.terminate
+    end
+  end
+
+  def expect_processes_to_have_exited(pids)
+    any_running = pids.any? { |pid| process_running?(pid) }
+    expect(any_running).to be false
+  end
+
+  def worker_pids
+    messages_by_fork.keys.map(&:to_i)
+  end
+
+  # NOTE(review): Process.waitpid only knows direct children, so for workers
+  # forked by the supervisor this presumably always raises ECHILD and reports
+  # "not running" - confirm, or probe with Process.kill(0, pid) instead.
+  def process_running?(pid)
+    Process.waitpid(pid, Process::WNOHANG)
+    true
+  rescue Errno::ECHILD
+    false
+  end
+
+  attr_accessor :supervisor_pid
+  def start_racecar
+    consumer_message_pipe
+
+    self.supervisor_pid = fork do
+      at_exit do
+        nil
+      end
+
+      racecar_cli.run
+    end
+  end
+
+  def stop_racecar
+    Process.kill("TERM", supervisor_pid)
+  rescue Errno::ESRCH
+    # The supervisor process no longer exists; nothing to stop.
+  end
+
+  def expect_equal_distribution_of_message_processing
+    expect(message_count_by_fork.values).to eq([total_messages / forks] * forks)
+  end
+
+  def expect_processing_times_to_mostly_overlap
+    expect(processing_time_intersection.size).to be_within(total_time*0.49).of(total_time)
+  end
+
+  # NOTE(review): the windows are Float ranges; Range#size on a Float range is
+  # rejected by newer Rubies - confirm against the supported Ruby versions.
+  def total_time
+    processing_windows.map(&:size).max
+  end
+
+  def processing_windows
+    processed_at_times_by_fork.values.map { |times|
+      ms = times.map { |s| s.to_f * 1000 }
+      (ms.min..ms.max)
+    }
+  end
+
+  def processing_time_intersection
+    range_intersection(*processing_windows)
+  end
+
+  def processed_at_times_by_fork
+    messages_by_fork.transform_values { |ms|
+      ms.map { |m| m.headers.fetch("processed_at") }
+    }
+  end
+
+  def message_count_by_fork
+    messages_by_fork.transform_values(&:count)
+  end
+
+  def messages_by_fork
+    incoming_messages.group_by { |m| m.headers.fetch("pid") }
+  end
+
+  def range_intersection(range1, range2)
+    # Determine the maximum of the lower bounds and the minimum of the upper bounds
+    lower_bound = [range1.begin, range2.begin].max
+    upper_bound = [range1.end, range2.end].min
+
+    # Check if the ranges actually intersect
+    if lower_bound < upper_bound || (lower_bound == upper_bound && range1.include?(upper_bound) && range2.include?(upper_bound))
+      lower_bound...upper_bound
+    else
+      nil # or return an empty range, depending on your requirements
+    end
+  end
+end
diff --git a/spec/support/integration_helper.rb b/spec/support/integration_helper.rb
index ec2dc208..e40f75bf 100644
--- a/spec/support/integration_helper.rb
+++ b/spec/support/integration_helper.rb
@@ -11,6 +11,7 @@ def self.included(klass)
 
       before do
         listen_for_consumer_events
+        set_config_for_speed
       end
 
       after do
@@ -22,6 +23,7 @@ def self.included(klass)
         stop_racecar
         wait_for_child_processes
         reset_signal_handlers
+        reset_config
       end
 
       after(:all) do
@@ -41,6 +43,7 @@ def stop_racecar
     return unless @cli_run_thread && @cli_run_thread.alive?
 
     racecar_cli.stop
+    $stderr.puts "Stopping racecar"
     @cli_run_thread.wakeup
     @cli_run_thread.join(2)
 
@@ -57,7 +60,7 @@ def publish_messages(topic: input_topic, messages: input_messages)
       )
     end.each(&:wait)
 
-    $stderr.puts "Published messages to topic: #{topic}; messages: #{messages}"
+    # $stderr.puts "Published messages to topic: #{topic}; messages: #{messages}"
   end
 
   def wait_for_messages(topic: output_topic, expected_message_count: input_messages.count)
@@ -86,10 +89,13 @@ def wait_for_messages(topic: output_topic, expected_message_count: input_message
   end
 
   def wait_for_assignments(n)
-    $stderr.print "Waiting for assignments: #{n}"
-    Timeout.timeout(5*n) do
-      until assignment_events.length >= n
+    $stderr.print "\nWaiting for assignments (#{n} consumers) "
+    Timeout.timeout(10*n) do
+      loop do
+        consumer_count = assignment_events.uniq { |e| e["consumer_id"] }.length
+        break if consumer_count == n
         sleep 0.5
+        $stderr.print "."
       end
     end
   rescue Timeout::Error => e
@@ -199,6 +205,17 @@ def reset_signal_handlers
     end
   end
 
+  def set_config_for_speed
+    Racecar.config.fetch_messages = 1
+    Racecar.config.max_wait_time = 0.1
+    Racecar.config.session_timeout = 6 # minimum allowed by default broker config
+    Racecar.config.heartbeat_interval = 0.5
+  end
+
+  def reset_config
+    Racecar.config = Racecar::Config.new
+  end
+
   def wait_for_child_processes
     Timeout.timeout(5) do
       Process.waitall
@@ -237,6 +254,7 @@ def process(message)
       def headers(message)
         {
           processed_by: self.class.consumer_id,
+          pid: Process.pid,
           processed_at: Process.clock_gettime(Process::CLOCK_MONOTONIC),
           partition: message.partition,
         }
@@ -253,6 +271,7 @@ def initialize(actual_pipe = IO.pipe)
 
     def write(data)
       write_end.write(JSON.dump(data) + "\n")
+      write_end.flush
     end
 
     def read