Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi workers assign syntax on <worker> section. Fix #2289 #2292

Merged
merged 9 commits into from
Feb 25, 2019

Conversation

cosmo0920
Copy link
Contributor

This PR contains a naive <worker N-M> syntax implementation.
Signed-off-by: Hiroshi Hatake hatake@clear-code.com

Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
@cosmo0920 cosmo0920 requested a review from repeatedly February 13, 2019 06:44
@cosmo0920
Copy link
Contributor Author

Any advice and feedback are appreciated.

@repeatedly repeatedly added the v1 label Feb 13, 2019
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
target_worker_ids = target_worker_id_str.split("-")
if target_worker_ids.size == 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add duplication check to avoid following case?

<worker 0-3>
</worker>

<worker 3-7> # should be 4-7
</worker>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a collisions checker in d900a73.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I added more concreate error message commit: 03da2b3.

first_worker_id = target_worker_ids.first.to_i
last_worker_id = target_worker_ids.last.to_i
if first_worker_id > last_worker_id
raise ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fluent::ConfigError

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used full-qualified name: 4200740.

first_worker_id.step(last_worker_id, 1) do |worker_id|
target_worker_id = worker_id.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

else
target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
@cosmo0920 cosmo0920 force-pushed the support-multi-workers-assign-syntax branch from d900a73 to 6b6fb93 Compare February 19, 2019 01:26
Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
@cosmo0920 cosmo0920 force-pushed the support-multi-workers-assign-syntax branch from 297cabb to 03da2b3 Compare February 19, 2019 02:32
@repeatedly
Copy link
Member

This patch something wrong.
I tried following configuration but it doesn't work correctly.

<system>
  workers 6
</system>

<worker 0-1>
<source>
  @type forward
</source>

<filter test>
  @type record_transformer
  enable_ruby
  <record>
    worker_id ${ENV['SERVERENGINE_WORKER_ID']}
  </record>
</filter>

<match test>
  @type stdout
</match>
</worker>

<worker 2-3>
<source>
  @type tcp
  <parse>
    @type none
  </parse>
  tag test
</source>

<filter test>
  @type record_transformer
  enable_ruby
  <record>
    worker_id ${ENV['SERVERENGINE_WORKER_ID']}
  </record>
</filter>

<match test>
  @type stdout
</match>
</worker>

<worker 4-5>
<source>
  @type udp
  <parse>
    @type none
  </parse>
  tag test
</source>

<filter test>
  @type record_transformer
  enable_ruby
  <record>
    worker_id ${ENV['SERVERENGINE_WORKER_ID']}
  </record>
</filter>

<match test>
  @type stdout
</match>
</worker>

The result should show two worker ids for each input plugin but it always shows fixed number, e.g. 1 for forward case.

@cosmo0920
Copy link
Contributor Author

Oh, Element#set_target_worker_id(worker_id) can specify only one worker id....
We should do more work to support this feature. 😰

Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
@cosmo0920 cosmo0920 force-pushed the support-multi-workers-assign-syntax branch from c7e1885 to 9b898c1 Compare February 20, 2019 06:10
@cosmo0920
Copy link
Contributor Author

cosmo0920 commented Feb 20, 2019

I've confirmed that the above configuration works correctly with fluent-cat, telnet, and nc in 9b898c1.

Fluentd Log

Boot Log

2019-02-20 15:13:53 +0900 [info]: parsing config file is succeeded path="pr2292.conf"
2019-02-20 15:13:53 +0900 [info]: adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:53 +0900 [info]: adding match pattern="test" type="stdout"
2019-02-20 15:13:53 +0900 [info]: adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:53 +0900 [info]: adding match pattern="test" type="stdout"
2019-02-20 15:13:53 +0900 [info]: adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:53 +0900 [info]: adding match pattern="test" type="stdout"
2019-02-20 15:13:53 +0900 [info]: adding source type="forward"
2019-02-20 15:13:53 +0900 [info]: adding source type="tcp"
2019-02-20 15:13:53 +0900 [info]: adding source type="udp"
2019-02-20 15:13:53 +0900 [info]: using configuration file: <ROOT>
  <system>
    workers 6
  </system>
  <worker 0-1>
    <source>
      @type forward
    </source>
    <filter test>
      @type record_transformer
      enable_ruby 
      <record>
        worker_id ${ENV['SERVERENGINE_WORKER_ID']}
      </record>
    </filter>
    <match test>
      @type stdout
    </match>
  </worker>
  <worker 2-3>
    <source>
      @type tcp
      tag "test"
      <parse>
        @type "none"
      </parse>
    </source>
    <filter test>
      @type record_transformer
      enable_ruby 
      <record>
        worker_id ${ENV['SERVERENGINE_WORKER_ID']}
      </record>
    </filter>
    <match test>
      @type stdout
    </match>
  </worker>
  <worker 4-5>
    <source>
      @type udp
      tag "test"
      <parse>
        @type "none"
      </parse>
    </source>
    <filter test>
      @type record_transformer
      enable_ruby 
      <record>
        worker_id ${ENV['SERVERENGINE_WORKER_ID']}
      </record>
    </filter>
    <match test>
      @type stdout
    </match>
  </worker>
</ROOT>
2019-02-20 15:13:53 +0900 [info]: starting fluentd-1.3.3 pid=98800 ruby="2.6.0"
2019-02-20 15:13:53 +0900 [info]: spawn command to main:  cmdline=["/Users/cosmo/.rbenv/versions/2.6.0/bin/ruby", "-Eascii-8bit:ascii-8bit", "-rbundler/setup", "/Users/cosmo/GitHub/fluentd/vendor/bundle/ruby/2.6.0/bin/fluentd", "-c", "pr2292.conf", "--under-supervisor"]
2019-02-20 15:13:54 +0900 [info]: #3 adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:54 +0900 [info]: #1 adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:54 +0900 [info]: gem 'fluentd' version '1.3.3'
2019-02-20 15:13:54 +0900 [info]: gem 'fluent-plugin-kinesis' version '2.1.1'
2019-02-20 15:13:54 +0900 [info]: gem 'fluent-plugin-prometheus' version '1.3.0'
2019-02-20 15:13:54 +0900 [info]: #0 adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:54 +0900 [info]: #2 adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:54 +0900 [info]: #5 adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:55 +0900 [info]: #4 adding filter pattern="test" type="record_transformer"
2019-02-20 15:13:55 +0900 [info]: #3 adding match pattern="test" type="stdout"
2019-02-20 15:13:55 +0900 [info]: #1 adding match pattern="test" type="stdout"
2019-02-20 15:13:55 +0900 [info]: #0 adding match pattern="test" type="stdout"
2019-02-20 15:13:55 +0900 [info]: #2 adding match pattern="test" type="stdout"
2019-02-20 15:13:55 +0900 [info]: #5 adding match pattern="test" type="stdout"
2019-02-20 15:13:55 +0900 [info]: #4 adding match pattern="test" type="stdout"
2019-02-20 15:13:55 +0900 [info]: #3 adding source type="tcp"
2019-02-20 15:13:55 +0900 [info]: #1 adding source type="forward"
2019-02-20 15:13:55 +0900 [info]: #0 adding source type="forward"
2019-02-20 15:13:55 +0900 [info]: #5 adding source type="udp"
2019-02-20 15:13:55 +0900 [info]: #2 adding source type="tcp"
2019-02-20 15:13:55 +0900 [info]: #1 starting fluentd worker pid=98830 ppid=98800 worker=1
2019-02-20 15:13:55 +0900 [info]: #1 listening port port=24224 bind="0.0.0.0"
2019-02-20 15:13:55 +0900 [info]: #4 adding source type="udp"
2019-02-20 15:13:55 +0900 [info]: #0 starting fluentd worker pid=98829 ppid=98800 worker=0
2019-02-20 15:13:55 +0900 [info]: #0 listening port port=24224 bind="0.0.0.0"
2019-02-20 15:13:55 +0900 [info]: #3 starting fluentd worker pid=98832 ppid=98800 worker=3
2019-02-20 15:13:55 +0900 [info]: #3 fluentd worker is now running worker=3
2019-02-20 15:13:55 +0900 [info]: #1 fluentd worker is now running worker=1
2019-02-20 15:13:55 +0900 [info]: #0 fluentd worker is now running worker=0
2019-02-20 15:13:55 +0900 [info]: #2 starting fluentd worker pid=98831 ppid=98800 worker=2
2019-02-20 15:13:55 +0900 [info]: #5 starting fluentd worker pid=98834 ppid=98800 worker=5
2019-02-20 15:13:55 +0900 [info]: #5 listening udp socket bind="0.0.0.0" port=5160
2019-02-20 15:13:55 +0900 [info]: #2 fluentd worker is now running worker=2
2019-02-20 15:13:55 +0900 [info]: #5 fluentd worker is now running worker=5
2019-02-20 15:13:55 +0900 [info]: #4 starting fluentd worker pid=98833 ppid=98800 worker=4
2019-02-20 15:13:55 +0900 [info]: #4 listening udp socket bind="0.0.0.0" port=5160
2019-02-20 15:13:55 +0900 [info]: #4 fluentd worker is now running worker=4

Running Log

2019-02-20 15:13:56.693574000 +0900 test: {"message":"yaaaaay","worker_id":"5"}
2019-02-20 15:13:58.018795000 +0900 test: {"message":"yaaaaay","worker_id":"4"}
2019-02-20 15:13:59.043151000 +0900 test: {"message":"yaaaaay","worker_id":"5"}
2019-02-20 15:13:59.923099000 +0900 test: {"message":"yaaaaay","worker_id":"5"}
2019-02-20 15:14:06.209343000 +0900 test: {"message":"yaaay!\r","worker_id":"2"}
2019-02-20 15:14:12.112125000 +0900 test: {"message":"yaaay!\r","worker_id":"3"}
2019-02-20 15:14:31.220185000 +0900 test: {"message":"yaaaay!","worker_id":"0"}
2019-02-20 15:14:34.112021000 +0900 test: {"message":"yaaaay!","worker_id":"0"}
2019-02-20 15:14:36.437463000 +0900 test: {"message":"yaaaay!","worker_id":"0"}
2019-02-20 15:14:38.731741000 +0900 test: {"message":"yaaaay!","worker_id":"0"}
2019-02-20 15:14:40.946731000 +0900 test: {"message":"yaaaay!","worker_id":"0"}
2019-02-20 15:14:43.942655000 +0900 test: {"message":"yaaaay!","worker_id":"0"}
2019-02-20 15:14:44.909934000 +0900 test: {"message":"yaaaay!","worker_id":"0"}
2019-02-20 15:14:47.698479000 +0900 test: {"message":"yaaaay!","worker_id":"1"}
2019-02-20 15:14:48.826874000 +0900 test: {"message":"yaaaay!","worker_id":"0"}

Another Terminal Log

% echo "yaaaaay" | nc -u localhost 5160
% echo "yaaaaay" | nc -u localhost 5160
% echo "yaaaaay" | nc -u localhost 5160
% echo "yaaaaay" | nc -u localhost 5160
% telnet localhost 5170
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
yaaay!
^]
telnet> Connection closed.
% telnet localhost 5170
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
yaaay!
^]
telnet> Connection closed.
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test 
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test 
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test 
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test 
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test 
% echo '{"message":"yaaaay!"}' | bundle exec fluent-cat test

@@ -64,26 +64,67 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def configure(conf)
used_worker_ids = []
available_worker_ids = []
0.step(Fluent::Engine.system_config.workers - 1, 1).each do |id|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a like code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh.... We can use it. 44f206a

@@ -52,7 +52,8 @@ def fluentd_worker_id

def configure(conf)
if conf.respond_to?(:for_this_worker?) && conf.for_this_worker?
system_config_override(workers: 1)
workers = conf.target_worker_ids.size || 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When conf.target_worker_ids.size return nil or false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a checking code on 44b67c9.

Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
Signed-off-by: Hiroshi Hatake <hatake@clear-code.com>
@repeatedly
Copy link
Member

@cosmo0920 On your environment, does Errno::EBADF in in_udp happen during shutdown?

@cosmo0920
Copy link
Contributor Author

cosmo0920 commented Feb 22, 2019

In my environment, sometimes IOError error="stream closed in another thread" occurred during shutdown.

2019-02-22 21:02:15 +0900 [error]: #5 unexpected error in processing UDP data error_class=IOError error="stream closed in another thread"
  2019-02-22 21:02:15 +0900 [error]: #5 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/server.rb:538:in `recvfrom'
  2019-02-22 21:02:15 +0900 [error]: #5 /Users/cosmo/GitHub/fluentd/lib/fluent/plugin_helper/server.rb:538:in `on_readable_with_sock'

@cosmo0920
Copy link
Contributor Author

I've tested the above configuration with the following code:

require 'socket'

u = UDPSocket.new
u.connect('localhost', 5160)

threads = []
threads << Thread.new {
  loop do
    u.send "uuuu", 0
    sleep 0.5
  end
}
threads << Thread.new {
  loop do
    u.send "aaaa", 0
    sleep 0.3
  end
}
threads << Thread.new {
  loop do
    u.send "waaa", 0
    sleep 0.1
  end
}

threads.each { |thr| thr.join }

Nothing Errno::EBADF happened in my macOS Mojave environment...

@repeatedly
Copy link
Member

Nothing Errno::EBADF happened in my macOS Mojave environment...

Yeah. Maybe, multiprocess or others trigger it.
BTW, I noticed UDPServer lacks several error handling so I added it.

#2310

@repeatedly repeatedly merged commit 07841c0 into master Feb 25, 2019
@repeatedly
Copy link
Member

Basic behabiour works as expected so merged.
Thanks for great work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants