Kafka commitOffsetsInFinalize OOM on Flink #20689

Closed

damccorm opened this issue Jun 4, 2022 · 8 comments
@damccorm
Contributor

damccorm commented Jun 4, 2022

Hi,

I upgraded Beam from 2.19.0 (Flink 1.9) to 2.25.0 (Flink 1.11.1), and after the upgrade the job no longer works.

 
The cluster versions I use are:
    jdk1.8
    apache-zookeeper-3.4.14
    hadoop-3.2.1
    flink-1.11.1
 
The job is submitted with the following command:


bin/flink run -m yarn-cluster -ynm "xxx" -yjm 2048 -ytm 8192 ./some-executable.jar \
    --appName=xxxname \
    --runner=FlinkRunner \
    --parallelism=2 \
    --sourceKafkaUrl=192.168.12.13:9092 \
    --sourceTopic=sometopic \
    --sourceGroupId=guofy-host-dev \
    --sinkKafkaUrl=192.168.12.13:9092 \
    --debug=true \
    &

 
YARN itself is fine, but taskmanager.log contains exceptions.
The Kafka consumer enters an infinite loop and eventually reports
java.lang.OutOfMemoryError: GC overhead limit exceeded.
Below is a partial log. Please help analyze and resolve it.

 
2020-10-27 21:54:19.685 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-6, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
2020-10-27 21:54:19.685 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859685
2020-10-27 21:54:19.686 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
2020-10-27 21:54:19.686 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Seeking to LATEST offset of partition guofangyu-vm-dev-0
2020-10-27 21:54:19.688 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
2020-10-27 21:54:19.690 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Resetting offset for partition guofangyu-vm-dev-0 to offset 0.
2020-10-27 21:54:19.691 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Seeking to LATEST offset of partition guofangyu-vm-dev-0
2020-10-27 21:54:19.693 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_528781572_guofy-host-dev-7, groupId=Reader-0_offset_consumer_528781572_guofy-host-dev] Resetting offset for partition guofangyu-vm-dev-0 to offset 0.
2020-10-27 21:54:19.701 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource  - Reader-0: Returning from consumer pool loop
2020-10-27 21:54:19.705 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [10.226.132.131:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-guofy-host-dev-8
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = guofy-host-dev
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 524288
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2020-10-27 21:54:19.707 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
2020-10-27 21:54:19.707 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
2020-10-27 21:54:19.707 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859707
2020-10-27 21:54:19.708 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-8, groupId=guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-1
2020-10-27 21:54:19.708 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-8, groupId=guofy-host-dev] Seeking to offset 0 for partition guofangyu-vm-dev-1
2020-10-27 21:54:19.708 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource  - Reader-1: reading from guofangyu-vm-dev-1 starting at offset 0
2020-10-27 21:54:19.709 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
    [identical to the full ConsumerConfig block above, except client.id = consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9 and group.id = Reader-1_offset_consumer_819035414_guofy-host-dev]
2020-10-27 21:54:19.710 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
2020-10-27 21:54:19.710 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
2020-10-27 21:54:19.710 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859710
2020-10-27 21:54:19.711 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-1
2020-10-27 21:54:19.711 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-8, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
2020-10-27 21:54:19.711 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Seeking to LATEST offset of partition guofangyu-vm-dev-1
2020-10-27 21:54:19.714 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
2020-10-27 21:54:19.716 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Resetting offset for partition guofangyu-vm-dev-1 to offset 0.
2020-10-27 21:54:19.717 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Seeking to LATEST offset of partition guofangyu-vm-dev-1
2020-10-27 21:54:19.719 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-1_offset_consumer_819035414_guofy-host-dev-9, groupId=Reader-1_offset_consumer_819035414_guofy-host-dev] Resetting offset for partition guofangyu-vm-dev-1 to offset 0.
2020-10-27 21:54:19.727 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource  - Reader-1: Returning from consumer pool loop
2020-10-27 21:54:19.729 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
    [identical to the full ConsumerConfig block above, except client.id = consumer-guofy-host-dev-10]
2020-10-27 21:54:19.731 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
2020-10-27 21:54:19.731 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
2020-10-27 21:54:19.731 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859731
2020-10-27 21:54:19.732 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-10, groupId=guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
2020-10-27 21:54:19.732 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-10, groupId=guofy-host-dev] Seeking to offset 0 for partition guofangyu-vm-dev-0
2020-10-27 21:54:19.732 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource  - Reader-0: reading from guofangyu-vm-dev-0 starting at offset 0
2020-10-27 21:54:19.732 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
    [identical to the full ConsumerConfig block above, except client.id = consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11 and group.id = Reader-0_offset_consumer_803275858_guofy-host-dev]
2020-10-27 21:54:19.734 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
2020-10-27 21:54:19.734 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
2020-10-27 21:54:19.734 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859734
2020-10-27 21:54:19.734 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-0
2020-10-27 21:54:19.735 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-10, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
2020-10-27 21:54:19.735 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Seeking to LATEST offset of partition guofangyu-vm-dev-0
2020-10-27 21:54:19.737 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA
2020-10-27 21:54:19.739 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Resetting offset for partition guofangyu-vm-dev-0 to offset 0.
2020-10-27 21:54:19.740 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Seeking to LATEST offset of partition guofangyu-vm-dev-0
2020-10-27 21:54:19.741 INFO  o.apache.kafka.clients.consumer.internals.SubscriptionState  - [Consumer clientId=consumer-Reader-0_offset_consumer_803275858_guofy-host-dev-11, groupId=Reader-0_offset_consumer_803275858_guofy-host-dev] Resetting offset for partition guofangyu-vm-dev-0 to offset 0.
2020-10-27 21:54:19.750 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource  - Reader-0: Returning from consumer pool loop
2020-10-27 21:54:19.752 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
    [identical to the full ConsumerConfig block above, except client.id = consumer-guofy-host-dev-12]
2020-10-27 21:54:19.754 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
2020-10-27 21:54:19.754 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
2020-10-27 21:54:19.754 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859754
2020-10-27 21:54:19.754 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev] Subscribed to partition(s): guofangyu-vm-dev-1
2020-10-27 21:54:19.755 INFO  org.apache.kafka.clients.consumer.KafkaConsumer  - [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev] Seeking to offset 0 for partition guofangyu-vm-dev-1
2020-10-27 21:54:19.755 INFO  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource  - Reader-1: reading from guofangyu-vm-dev-1 starting at offset 0
2020-10-27 21:54:19.755 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values:
    [identical to the full ConsumerConfig block above, except client.id = consumer-Reader-1_offset_consumer_576918038_guofy-host-dev-13 and group.id = Reader-1_offset_consumer_576918038_guofy-host-dev]
2020-10-27 21:54:19.757 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka version: 2.6.0
2020-10-27 21:54:19.757 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId: 62abe01bee039651
2020-10-27 21:54:19.757 INFO  org.apache.kafka.common.utils.AppInfoParser  - Kafka startTimeMs: 1603806859757
2020-10-27 21:54:19.757 INFO  org.apache.kafka.clients.Metadata  - [Consumer clientId=consumer-guofy-host-dev-12, groupId=guofy-host-dev] Cluster ID: EVoHjOG8SwG7x5F-8y2cYA

 

It worked fine for me with Beam version 2.19.0, but it does not work with 2.25.0.

Imported from Jira BEAM-11148. Original Jira may contain additional context.
Reported by: titansfy.

@kennknowles
Member

@johnjcasey has been looking at KafkaIO and may have updates on this one.

@kennknowles
Member

Bringing over a significant comment:

After a lot of attempts, when I deleted this line of code, it worked fine.

   PCollection<KV<String, byte[]>> messages =
       pipeline
           .apply("Read Kafka", KafkaIO.<String, byte[]>read()
               .withBootstrapServers(options.getSourceKafkaUrl()) 
               .withTopic(options.getSourceTopic())
               .withKeyDeserializer(StringDeserializer.class)
               .withValueDeserializer(ByteArrayDeserializer.class)
               .withConsumerConfigUpdates(ImmutableMap.of(
                   "enable.auto.commit", true,
                   "group.id", options.getSourceGroupId()))
               //.commitOffsetsInFinalize() // This is the code that caused the memory leak. I think this is a serious BUG.
               .withoutMetadata()
           );

Looking at the JVM GC, the old generation space is not rising as much as before.
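
For anyone trying to repeat that comparison, one simple way to watch old-generation growth on a running TaskManager is jstat against its JVM process; this is only an illustrative sketch, and the PID and 5-second sampling interval below are placeholders, not something from the original report:

    jstat -gcutil <taskmanager-jvm-pid> 5000

The O column reports old-generation occupancy as a percentage of capacity. A steadily climbing O that never drops after full GCs is consistent with the reporter's GC overhead limit OOM, while a roughly flat O matches the observation above once commitOffsetsInFinalize() was removed.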

@johnjcasey
Contributor

I have very little idea where we could be leaking memory in commitOffsets, but the code hasn't substantially changed since the original bug report.

@Abacn can you take a look at this while working on Kafka testing?

@kennknowles kennknowles added P2 and removed P1 labels Mar 14, 2023
@kennknowles
Member

This seems like P2, empirically. Have we got more reports of similar problems?

@johnjcasey
Contributor

None that I'm aware of

@Abacn
Contributor

Abacn commented Mar 14, 2023

The effect of this option is to call commitOffsets(), which calls setCommitOffsetEnabled(true). As far as I know, ReadFromKafkaViaSDF is not quite working on Flink. This is likely related to #20979
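
For context, a workaround commonly suggested at the time for SDF-related KafkaIO problems on the Flink runner was to fall back to the legacy UnboundedSource read path with the use_deprecated_read experiment. A sketch reusing the submit command from the report follows; the flag placement is illustrative, and whether it avoids this particular OOM was not verified in this issue:

    bin/flink run -m yarn-cluster -ynm "xxx" -yjm 2048 -ytm 8192 ./some-executable.jar \
        --runner=FlinkRunner \
        --experiments=use_deprecated_read \
        ... (remaining pipeline options unchanged)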

@yianni

yianni commented Apr 19, 2023

We faced this issue with the Flink runner, but we can't give more details since it was long ago.

@scwhittle
Contributor

#31682 removes a reshuffle to random keys that was previously performed when commit-offsets-in-finalize was enabled with the SDF implementation. That reshuffle was very inefficient and used far more resources. I'm going to mark this as fixed; please reopen if there are more recent OOMs.

@github-actions github-actions bot added this to the 2.60.0 Release milestone Sep 26, 2024