Skip to content

Commit

Permalink
StaticServiceDiscovery can handle multiple services
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Aug 7, 2019
1 parent 9883173 commit be04758
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 23 deletions.
17 changes: 14 additions & 3 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,17 @@ def configure(conf)
socket_cache: socket_cache,
)

configs = conf.elements(name: 'server').map { |c| { type: :static, conf: c } }
configs = []

# rewrite for using server as sd_static
conf.elements(name: 'server').each do |s|
s.name = 'service'
end

unless conf.elements(name: 'service').empty?
configs << { type: :static, conf: conf }
end

conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end
Expand All @@ -227,6 +237,7 @@ def configure(conf)
)

discovery_manager.services.each do |server|
# it's only for test
@nodes << server
unless @heartbeat_type == :none
begin
Expand All @@ -247,8 +258,8 @@ def configure(conf)
end
end

if @nodes.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
if discovery_manager.services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end

if !@keepalive && @keepalive_timeout
Expand Down
38 changes: 21 additions & 17 deletions lib/fluent/plugin/sd_static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,31 @@ class StaticServiceDiscovery < ServiceDiscovery

LISTEN_PORT = 24224

desc 'The IP address or host name of the server.'
config_param :host, :string
desc 'The name of the server. Used for logging and certificate verification in TLS transport (when host is address).'
config_param :name, :string, default: nil
desc 'The port number of the host.'
config_param :port, :integer, default: LISTEN_PORT
desc 'The shared key per server.'
config_param :shared_key, :string, default: nil, secret: true
desc 'The username for authentication.'
config_param :username, :string, default: ''
desc 'The password for authentication.'
config_param :password, :string, default: '', secret: true
desc 'Marks a node as the standby node for an Active-Standby model between Fluentd nodes.'
config_param :standby, :bool, default: false
desc 'The load balancing weight.'
config_param :weight, :integer, default: 60
config_section :service, param_name: :service_configs do
desc 'The IP address or host name of the server.'
config_param :host, :string
desc 'The name of the server. Used for logging and certificate verification in TLS transport (when host is address).'
config_param :name, :string, default: nil
desc 'The port number of the host.'
config_param :port, :integer, default: LISTEN_PORT
desc 'The shared key per server.'
config_param :shared_key, :string, default: nil, secret: true
desc 'The username for authentication.'
config_param :username, :string, default: ''
desc 'The password for authentication.'
config_param :password, :string, default: '', secret: true
desc 'Marks a node as the standby node for an Active-Standby model between Fluentd nodes.'
config_param :standby, :bool, default: false
desc 'The load balancing weight.'
config_param :weight, :integer, default: 60
end

def configure(conf)
super

@services << ServiceDiscovery::Service.new(:static, @host, @port, @name, @weight, @standby, @username, @password, @shared_key)
@services = @service_configs.map do |s|
ServiceDiscovery::Service.new(:static, s.host, s.port, s.name, s.weight, s.standby, s.username, s.password, s.shared_key)
end
end

def start(queue = nil)
Expand Down
26 changes: 25 additions & 1 deletion test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def teardown

def create_driver(conf=CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) {
attr_reader :sent_chunk_ids, :ack_handler
attr_reader :sent_chunk_ids, :ack_handler, :discovery_manager

def initialize
super
Expand Down Expand Up @@ -185,6 +185,30 @@ def try_write(chunk)
assert_equal([dummy_cert_path], d.instance.tls_ca_cert_path)
end

test 'server is an abbreviation of static type of service_discovery' do
@d = d = create_driver(%[
<server>
host 127.0.0.1
port 1234
</server>
<service_discovery>
@type static
<service>
host 127.0.0.1
port 1235
</service>
</service_discovery>
])

assert_equal 2, d.instance.discovery_manager.services.size
assert_equal '127.0.0.1', d.instance.discovery_manager.services[0].host
assert_equal 1234, d.instance.discovery_manager.services[0].port
assert_equal '127.0.0.1', d.instance.discovery_manager.services[1].host
assert_equal 1235, d.instance.discovery_manager.services[1].port
end

test 'compress_default_value' do
@d = d = create_driver
assert_equal :text, d.instance.compress
Expand Down
4 changes: 2 additions & 2 deletions test/plugin_helper/service_discovery/test_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def start(queue)
sdm.configure(
[
{ type: :file, conf: config_element('service_discovery', '', { 'path' => File.join(@sd_file_dir, 'config.yml') }) },
{ type: :static, conf: config_element('server', '', { 'host' => '127.0.0.2', 'port' => '5432' }) },
{ type: :static, conf: config_element('root', '', {}, [config_element('service', '', { 'host' => '127.0.0.2', 'port' => '5432' })]) },
],
)

Expand All @@ -56,7 +56,7 @@ def start(queue)
test 'no need to timer if only static' do
sdm = Fluent::PluginHelper::ServiceDiscovery::Manager.new(log: $log)
sdm.configure(
[{ type: :static, conf: config_element('server', '', { 'host' => '127.0.0.2', 'port' => '5432' }) }],
[{ type: :static, conf: config_element('root', '', {}, [config_element('service', '', { 'host' => '127.0.0.2', 'port' => '5432' })]) }]
)

assert_equal 1, sdm.services.size
Expand Down

0 comments on commit be04758

Please sign in to comment.