Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs.kafka_consumer): Fix deadlock #16074

Merged
merged 1 commit into from
Oct 24, 2024

Conversation

zak-pawel
Copy link
Collaborator

Summary

PR fixes a potential deadlock that most often could occur when plugin.Stop() was called immediately after plugin.Start() - this situation was most likely to happen during the execution of unit tests.
I also removed ticker, which was never initialized. It was likely a leftover from some functionality that no longer exists.

Root cause:

In the Stop() method, k.topicLock.Lock() was called along with defer k.topicLock.Unlock().
This caused k.topicLock to remain locked until the Stop() method finished executing.
At the very end of the Stop() method, k.wg.Wait() was waiting for all goroutines to complete.

Meanwhile, in the Start() method, a goroutine was started, which got stuck at k.topicLock.Lock() because it was waiting for k.topicLock to be unlocked (which would only happen after the Stop() method finished).

Checklist

  • No AI generated code was used in this PR

Related stacktrace:

=== FAIL: plugins/inputs/kafka_consumer  (0.00s)
panic: test timed out after 10m0s
        running tests:
                TestStartStop (10m0s)

goroutine 98 [running]:
testing.(*M).startAlarm.func1()
        /usr/local/go/src/testing/testing.go:2373 +0x265
created by time.goFunc
        /usr/local/go/src/time/sleep.go:215 +0x45

goroutine 1 [chan receive, 10 minutes]:
testing.(*T).Run(0xc00035e1a0, {0x16130ad, 0xd}, 0x16cee30)
        /usr/local/go/src/testing/testing.go:1751 +0x851
testing.runTests.func1(0xc00035e1a0)
        /usr/local/go/src/testing/testing.go:2168 +0x86
testing.tRunner(0xc00035e1a0, 0xc00050fae0)
        /usr/local/go/src/testing/testing.go:1690 +0x227
testing.runTests(0xc000012018, {0x1fd5d20, 0xc, 0xc}, {0x7f7504e92f18?, 0x40?, 0x1fe2040?})
        /usr/local/go/src/testing/testing.go:2166 +0x8bf
testing.(*M).Run(0xc00037a320)
        /usr/local/go/src/testing/testing.go:2034 +0xf18
main.main()
        _testmain.go:67 +0x165

goroutine 94 [semacquire, 10 minutes]:
sync.runtime_Semacquire(0xc00048c890?)
        /usr/local/go/src/runtime/sema.go:71 +0x25
sync.(*WaitGroup).Wait(0xc00048c888)
        /usr/local/go/src/sync/waitgroup.go:118 +0xa5
github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.(*KafkaConsumer).Stop(0xc00048c488)
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer.go:389 +0x1ac
github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.TestStartStop(0xc00035fba0)
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer_test.go:236 +0x318
testing.tRunner(0xc00035fba0, 0x16cee30)
        /usr/local/go/src/testing/testing.go:1690 +0x227
created by testing.(*T).Run in goroutine 1
        /usr/local/go/src/testing/testing.go:1743 +0x826

goroutine 95 [chan receive, 10 minutes]:
github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.(*KafkaConsumer).startErrorAdder.func1()
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer.go:289 +0x197
created by github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.(*KafkaConsumer).startErrorAdder in goroutine 94
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer.go:287 +0xf4

goroutine 96 [sync.Mutex.Lock, 10 minutes]:
sync.runtime_SemacquireMutex(0x30?, 0x8?, 0xc000100508?)
        /usr/local/go/src/runtime/sema.go:95 +0x25
sync.(*Mutex).lockSlow(0xc00048c880)
        /usr/local/go/src/sync/mutex.go:173 +0x213
sync.(*Mutex).Lock(0xc00048c880)
        /usr/local/go/src/sync/mutex.go:92 +0x55
github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.(*KafkaConsumer).Start.func1()
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer.go:356 +0x9fc
created by github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.(*KafkaConsumer).Start in goroutine 94
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer.go:321 +0x6af

goroutine 23 [chan receive, 10 minutes]:
github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.(*KafkaConsumer).startErrorAdder.func1()
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer.go:289 +0x197
created by github.com/influxdata/telegraf/plugins/inputs/kafka_consumer.(*KafkaConsumer).startErrorAdder in goroutine 96
        /go/src/github.com/influxdata/telegraf/plugins/inputs/kafka_consumer/kafka_consumer.go:287 +0xf4
FAIL    github.com/influxdata/telegraf/plugins/inputs/kafka_consumer    600.188s

=== FAIL: plugins/inputs/kafka_consumer TestStartStop (unknown)

@telegraf-tiger telegraf-tiger bot added fix pr to fix corresponding bug plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins labels Oct 24, 2024
@zak-pawel zak-pawel requested review from srebhan and DStrand1 October 24, 2024 15:50
Copy link
Member

@srebhan srebhan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are awesome @zak-pawel! Thanks a bunch!

@telegraf-tiger
Copy link
Contributor

@srebhan srebhan added area/kafka ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review. labels Oct 24, 2024
@srebhan srebhan removed their assignment Oct 24, 2024
@DStrand1 DStrand1 merged commit 662607c into influxdata:master Oct 24, 2024
27 checks passed
@github-actions github-actions bot added this to the v1.32.2 milestone Oct 24, 2024
srebhan pushed a commit that referenced this pull request Oct 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/kafka fix pr to fix corresponding bug plugin/input 1. Request for new input plugins 2. Issues/PRs that are related to input plugins ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants