OffsetMonitor stopped producing metrics after disabling anonymous access on SRC and DST Kafka clusters #350

Open
disserakt opened this issue Sep 13, 2021 · 3 comments

@disserakt

Hi all!
@yangy0000 - I ran into strange OffsetMonitor behavior after disabling the unencrypted, anonymous-access listeners (the PLAINTEXT ports) on both the SRC and DST Kafka clusters. Only the SASL_SSL listeners remained, i.e. encryption plus SCRAM-based authentication.
After that, OffsetMonitor started logging the following errors:

{
    "level": "INFO",
    "location": {
        "class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1",
        "file": "OffsetMonitor.java",
        "method": "run",
        "line": "132"
    },
    "logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
    "message": "TopicList starts updating",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:19.493Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}
{
    "level": "INFO",
    "location": {
        "class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
        "file": "OffsetMonitor.java",
        "method": "updateTopicList",
        "line": "197"
    },
    "logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
    "message": "Update topicList",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:24.493Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}
{
    "exception": {
        "class": "java.nio.channels.ClosedChannelException",
        "stacktrace": "java.nio.channels.ClosedChannelException\n\tat kafka.network.BlockingChannel.send(BlockingChannel.scala:112)\n\tat kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:88)\n\tat kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)\n\tat kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)\n\tat kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)"
    },
    "level": "INFO",
    "location": {
        "class": "kafka.utils.Logging$class",
        "file": "Logging.scala",
        "method": "info",
        "line": "68"
    },
    "logger": "kafka.consumer.SimpleConsumer",
    "message": "Reconnect due to error:",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:24.494Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}
{
    "exception": {
        "class": "java.nio.channels.ClosedChannelException",
        "stacktrace": "java.nio.channels.ClosedChannelException\n\tat kafka.network.BlockingChannel.send(BlockingChannel.scala:112)\n\tat kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:101)\n\tat kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)\n\tat kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)\n\tat kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)"
    },
    "level": "WARN",
    "location": {
        "class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
        "file": "OffsetMonitor.java",
        "method": "updateTopicList",
        "line": "234"
    },
    "logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
    "message": "Got exception to get metadata from broker=null:-1",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:24.499Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}

Is there any way I can fix this myself, for example by adjusting some configuration, or do I need to wait for a fix in a new uReplicator release?

I would be glad to hear any advice, thanks.

disserakt (Author) commented Sep 14, 2021

@yangy0000 - I turned on debug logging and captured the full OffsetMonitor error log:

INFO TopicList starts updating (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
INFO Update topicList (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
DEBUG Reading reply sessionid:0x309607c86e80242, packet:: clientPath:null serverPath:null finished:false header:: 1307,8  replyHeader:: 1307,21484188540,0  request:: '/ureplicator-databusfed-dev/controller-worker-cluster1-cluster2-0/IDEALSTATES,T  response:: v{'ureplicator-databus-fed-dev}  (org.apache.zookeeper.ClientCnxn)
DEBUG TopicList: [ureplicator-databus-fed-dev] (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
INFO Reconnect due to error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:88)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
	at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
	at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)
	at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)
	at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)
	at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
WARN Got exception to get metadata from broker=null:-1 (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:101)
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
	at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
	at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)
	at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)
	at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)
	at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
DEBUG partitionLeader: {} (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
DEBUG OffsetMonitor updates offset with leaders={} (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)

Perhaps this will help in some way.
Also, my clusters.properties looks like this:

kafka.cluster.zkStr.cluster1=data-bus-zk09.rmk.ru:2181,data-bus-zk10.rmk.ru:2181,data-bus-zk11.rmk.ru:2181
kafka.cluster.servers.cluster1=databus-kafka16.rmk.ru:9192,databus-kafka17.rmk.ru:9192,databus-kafka18.rmk.ru:9192
secure.kafka.cluster.servers.cluster1=databus-kafka16.rmk.ru:9192,databus-kafka17.rmk.ru:9192,databus-kafka18.rmk.ru:9192
kafka.cluster.zkStr.cluster2=data-bus-zk01.rmk.ru:2181,data-bus-zk02.rmk.ru:2181,data-bus-zk03.rmk.ru:2181,data-bus-zk04.rmk.ru:2181,data-bus-zk05.rmk.ru:2181
kafka.cluster.servers.cluster2=databus-kafka04.rmk.ru:9192,databus-kafka05.rmk.ru:9192,databus-kafka06.rmk.ru:9192,databus-kafka07.rmk.ru:9192,databus-kafka08.rmk.ru:9192,databus-kafka09.rmk.ru:9192,databus-kafka10.rmk.ru:9192,databus-kafka11.rmk.ru:9192,databus-kafka12.rmk.ru:9192
secure.kafka.cluster.servers.cluster2=databus-kafka04.rmk.ru:9192,databus-kafka05.rmk.ru:9192,databus-kafka06.rmk.ru:9192,databus-kafka07.rmk.ru:9192,databus-kafka08.rmk.ru:9192,databus-kafka09.rmk.ru:9192,databus-kafka10.rmk.ru:9192,databus-kafka11.rmk.ru:9192,databus-kafka12.rmk.ru:9192

Port 9192 uses the SASL_SSL protocol with the SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms.
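
For reference, a client connecting to these 9192 listeners would need SASL_SSL/SCRAM settings along these lines. This is only a minimal sketch; the class name, username, password and truststore values are placeholders, not taken from this deployment.

import java.util.Properties;

public class SecureClientProps {
    // Minimal sketch: client-side settings needed to reach a SASL_SSL listener
    // secured with SCRAM. Credentials and truststore values are placeholders.
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "databus-kafka16.rmk.ru:9192");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512"); // or SCRAM-SHA-256
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"<user>\" password=\"<password>\";");
        props.put("ssl.truststore.location", "/path/to/truststore.jks");
        props.put("ssl.truststore.password", "<truststore-password>");
        return props;
    }
}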

@yangy0000 (Collaborator)

Unfortunately, OffsetMonitor is built on top of SimpleConsumer, which doesn't support the SASL_SSL protocol. We are planning to fix this too @laosiaudi

Contributions are welcome if you are interested. :)
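
For anyone interested in contributing, one possible direction (a sketch only, not a committed design) is to replace the SimpleConsumer metadata and offset requests in OffsetMonitor with the modern org.apache.kafka.clients.consumer.KafkaConsumer, which honours security.protocol=SASL_SSL. The sketch below only shows the client-side calls and reuses the hypothetical SecureClientProps helper from the earlier comment; it does not reference actual uReplicator internals.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SaslOffsetFetchSketch {
    public static void main(String[] args) {
        Properties props = SecureClientProps.build(); // SASL_SSL settings from the earlier sketch
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Topic metadata, which OffsetMonitor currently obtains via SimpleConsumer requests.
            List<PartitionInfo> partitions = consumer.partitionsFor("ureplicator-databus-fed-dev");
            List<TopicPartition> tps = partitions.stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());

            // Log-end offsets, usable as the basis for lag metrics.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
            endOffsets.forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
        }
    }
}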

disserakt (Author) commented Sep 15, 2021

@yangy0000, @laosiaudi - understood, thanks a lot for the info!
As for contributions - we will give it a try, maybe it will work out =)

In the meantime, are there alternative options for monitoring offset lag that could replace OffsetMonitor - perhaps some external utilities?
I tried Burrow, but it didn't work, most likely because uReplicator does not create consumer groups.
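
As a rough external check while OffsetMonitor cannot reach the SASL_SSL listeners, one could diff the log-end offsets of a mirrored topic on the two clusters. This is only a sketch, and it assumes the source and destination partitions are offset-comparable (both start at offset 0 and are neither compacted nor truncated differently), which may not hold in practice.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class MirrorLagProbe {
    // Rough mirroring-lag estimate: source log-end offset minus destination log-end offset
    // for the same partition. Only meaningful under the offset-comparability assumption above.
    static long estimateLag(Properties srcSecurityProps, Properties dstSecurityProps,
                            String topic, int partition) {
        TopicPartition tp = new TopicPartition(topic, partition);
        return endOffset(srcSecurityProps, tp) - endOffset(dstSecurityProps, tp);
    }

    private static long endOffset(Properties securityProps, TopicPartition tp) {
        Properties props = new Properties();
        props.putAll(securityProps); // SASL_SSL/SCRAM settings as in the earlier sketch
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singletonList(tp));
            return offsets.get(tp);
        }
    }
}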

I would be glad to hear any advice, thanks.
