C++ consumer APIs have memory leaks under certain conditions #4486

Open
zhangyafeikimi opened this issue Oct 27, 2023 · 9 comments · May be fixed by #4667

Comments

@zhangyafeikimi

zhangyafeikimi commented Oct 27, 2023

Description

C++ consumer APIs have memory leaks under certain conditions.

How to reproduce

OS: Linux CentOS 7.9.2009
VERSION: librdkafka 2.1.0

I verified it with examples/rdkafka_example.cpp.

The command line is:

valgrind --leak-check=full --show-leak-kinds=all ./rdkafka_example -C -t test2 -p 0

When the Kafka broker is not running or the topic does not exist, the result is:

==2446== Memcheck, a memory error detector
==2446== Copyright (C) 2002-2022, and GNU GPL'd, by Julian Seward et al.
==2446== Using Valgrind-3.21.0 and LibVEX; rerun with -h for copyright info
==2446== Command: ./rdkafka_example -C -t test2 -p 0
==2446==
% Created consumer rdkafka#consumer-1
^C==2446==
==2446== HEAP SUMMARY:
==2446==     in use at exit: 2,988 bytes in 10 blocks
==2446==   total heap usage: 337 allocs, 327 frees, 78,889 bytes allocated
==2446==
==2446== 20 bytes in 2 blocks are indirectly lost in loss record 1 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x5B08B89: strdup (in /usr/lib64/libc-2.17.so)
==2446==    by 0x4A6138: rd_strdup (rd.h:156)
==2446==    by 0x4A6138: rd_kafka_anyconf_set_prop0 (rdkafka_conf.c:1725)
==2446==    by 0x4A6138: rd_kafka_defaultconf_set (rdkafka_conf.c:2171)
==2446==    by 0x4A63D7: rd_kafka_topic_conf_new (rdkafka_conf.c:2191)
==2446==    by 0x4A68DE: rd_kafka_topic_conf_dup (rdkafka_conf.c:2623)
==2446==    by 0x40CF06: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:92)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== 24 bytes in 1 blocks are indirectly lost in loss record 2 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x456274: rd_malloc (rd.h:139)
==2446==    by 0x456274: rd_kafkap_str_new (rdkafka_proto.h:304)
==2446==    by 0x456274: rd_kafka_topic_new0 (rdkafka_topic.c:335)
==2446==    by 0x4579CA: rd_kafka_topic_new (rdkafka_topic.c:512)
==2446==    by 0x40CF40: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:114)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== 112 bytes in 1 blocks are indirectly lost in loss record 3 of 9
==2446==    at 0x4C2FF55: calloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x4BA12A: rd_calloc (rd.h:133)
==2446==    by 0x4BA12A: rd_kafka_op_new0 (rdkafka_op.c:265)
==2446==    by 0x4ACCF8: rd_kafka_toppar_op_version_bump (rdkafka_partition.c:199)
==2446==    by 0x4B5E46: rd_kafka_toppar_fetch_stop (rdkafka_partition.c:1724)
==2446==    by 0x4B79F0: rd_kafka_toppar_op_serve (rdkafka_partition.c:2012)
==2446==    by 0x4A0DC1: rd_kafka_q_serve (rdkafka_queue.c:513)
==2446==    by 0x422C9B: rd_kafka_thread_main (rdkafka.c:2117)
==2446==    by 0x423F97: _thrd_wrapper_function (tinycthread.c:576)
==2446==    by 0x5867EA4: start_thread (in /usr/lib64/libpthread-2.17.so)
==2446==    by 0x5B7AB0C: clone (in /usr/lib64/libc-2.17.so)
==2446==
==2446== 128 bytes in 1 blocks are indirectly lost in loss record 4 of 9
==2446==    at 0x4C3015B: realloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x4418C3: rd_realloc (rd.h:145)
==2446==    by 0x4418C3: rd_list_grow (rdlist.c:48)
==2446==    by 0x44190B: rd_list_init (rdlist.c:56)
==2446==    by 0x45658F: rd_kafka_topic_new0 (rdkafka_topic.c:459)
==2446==    by 0x4579CA: rd_kafka_topic_new (rdkafka_topic.c:512)
==2446==    by 0x40CF40: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:114)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== 176 bytes in 1 blocks are indirectly lost in loss record 5 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x49E14C: rd_malloc (rd.h:139)
==2446==    by 0x49E14C: rd_kafka_q_new0 (rdkafka_queue.c:110)
==2446==    by 0x42065C: rd_kafka_new (rdkafka.c:2253)
==2446==    by 0x40A599: RdKafka::Consumer::create(RdKafka::Conf const*, std::string&) (ConsumerImpl.cpp:60)
==2446==    by 0x405F59: main (rdkafka_example.cpp:520)
==2446==
==2446== 176 bytes in 1 blocks are indirectly lost in loss record 6 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x49E14C: rd_malloc (rd.h:139)
==2446==    by 0x49E14C: rd_kafka_q_new0 (rdkafka_queue.c:110)
==2446==    by 0x4AD591: rd_kafka_toppar_new0 (rdkafka_partition.c:255)
==2446==    by 0x4ADEF1: rd_kafka_toppar_desired_add (rdkafka_partition.c:643)
==2446==    by 0x415AD6: rd_kafka_consume_start0 (rdkafka.c:2700)
==2446==    by 0x415AD6: rd_kafka_consume_start (rdkafka.c:2745)
==2446==    by 0x409CB6: RdKafka::ConsumerImpl::start(RdKafka::Topic*, int, long) (ConsumerImpl.cpp:85)
==2446==    by 0x40601C: main (rdkafka_example.cpp:540)
==2446==
==2446== 176 bytes in 1 blocks are indirectly lost in loss record 7 of 9
==2446==    at 0x4C2B0D5: malloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x49E14C: rd_malloc (rd.h:139)
==2446==    by 0x49E14C: rd_kafka_q_new0 (rdkafka_queue.c:110)
==2446==    by 0x4AD5B1: rd_kafka_toppar_new0 (rdkafka_partition.c:256)
==2446==    by 0x4ADEF1: rd_kafka_toppar_desired_add (rdkafka_partition.c:643)
==2446==    by 0x415AD6: rd_kafka_consume_start0 (rdkafka.c:2700)
==2446==    by 0x415AD6: rd_kafka_consume_start (rdkafka.c:2745)
==2446==    by 0x409CB6: RdKafka::ConsumerImpl::start(RdKafka::Topic*, int, long) (ConsumerImpl.cpp:85)
==2446==    by 0x40601C: main (rdkafka_example.cpp:540)
==2446==
==2446== 1,176 bytes in 1 blocks are indirectly lost in loss record 8 of 9
==2446==    at 0x4C2FF55: calloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x4AD39B: rd_calloc (rd.h:133)
==2446==    by 0x4AD39B: rd_kafka_toppar_new0 (rdkafka_partition.c:219)
==2446==    by 0x4ADEF1: rd_kafka_toppar_desired_add (rdkafka_partition.c:643)
==2446==    by 0x415AD6: rd_kafka_consume_start0 (rdkafka.c:2700)
==2446==    by 0x415AD6: rd_kafka_consume_start (rdkafka.c:2745)
==2446==    by 0x409CB6: RdKafka::ConsumerImpl::start(RdKafka::Topic*, int, long) (ConsumerImpl.cpp:85)
==2446==    by 0x40601C: main (rdkafka_example.cpp:540)
==2446==
==2446== 2,988 (1,000 direct, 1,988 indirect) bytes in 1 blocks are definitely lost in loss record 9 of 9
==2446==    at 0x4C2FF55: calloc (in /opt/p13n-valgrind/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==2446==    by 0x456244: rd_calloc (rd.h:133)
==2446==    by 0x456244: rd_kafka_topic_new0 (rdkafka_topic.c:331)
==2446==    by 0x4579CA: rd_kafka_topic_new (rdkafka_topic.c:512)
==2446==    by 0x40CF40: RdKafka::Topic::create(RdKafka::Handle*, std::string const&, RdKafka::Conf const*, std::string&) (TopicImpl.cpp:114)
==2446==    by 0x405FF4: main (rdkafka_example.cpp:531)
==2446==
==2446== LEAK SUMMARY:
==2446==    definitely lost: 1,000 bytes in 1 blocks
==2446==    indirectly lost: 1,988 bytes in 9 blocks
==2446==      possibly lost: 0 bytes in 0 blocks
==2446==    still reachable: 0 bytes in 0 blocks
==2446==         suppressed: 0 bytes in 0 blocks
==2446==
==2446== For lists of detected and suppressed errors, rerun with: -s
==2446== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)
@zhangyafeikimi
Author

Tried 2.3.0; it has the same leaks.

@dholm

dholm commented Nov 22, 2023

We have the same issue using the C API directly. I did a bisect and the problem was introduced with 8e20e1e. If I revert that commit the leaks go away.

@emasab
Contributor

emasab commented Nov 24, 2023

Thanks for the report. In the provided stack trace, it's just the example that is leaking memory, because the topic isn't destroyed.

@zhangyafeikimi
Author

zhangyafeikimi commented Nov 24, 2023

Not just for examples.
These leaks are fatal for long-running servers when the topic is unavailable.

@qingzhongli

qingzhongli commented Jan 14, 2024

I got the same problem. As @dholm said, if I revert commit 8e20e1e, the leaks go away.

@Quuxplusone
Contributor

My employer has also tentatively bisected an issue to 8e20e1e. We see that if we have a groupconsumer, and the broker goes down (that is, we shut down the Kafka broker), and then comes back up, and then we try to shut down the groupconsumer, it simply hangs forever. This doesn't reproduce in 1.9.3rc2 or earlier, but it does reproduce in 2.0.0rc1 through 2.3.0. In fact, it appears not to reproduce in a83cadf but to start reproducing with 8e20e1e.

I see a lot of separate reporters pointing at #4117 (and the followup #4208), and things like "TODO" appearing in the changelog. Is it possible that those commits weren't reviewed as thoroughly as they should have been?

Quuxplusone added a commit to Quuxplusone/librdkafka that referenced this issue Apr 1, 2024
…pconsumer

We observed that destroying a groupconsumer would often hang waiting for
the broker thread to exit. We tediously bisected the problem to
the specific commit 8e20e1e (the last commit before the v2.0.0rc1 tag).
Only then did we find that a lot of people on GitHub were already complaining
about that commit as introducing a resource leak: the commit adds a call to
`rd_kafka_toppar_keep` that bumps the refcount of the toppar, and I don't
immediately see a corresponding `rd_kafka_toppar_destroy` anywhere.

Reverting 8e20e1e (as in this commit) does fix the hang in groupconsumer
destruction which we were observing, so we've applied this patch
to our downstream library.

Fixes confluentinc#4486.
@emasab
Contributor

emasab commented Apr 3, 2024

Found and fixed the memory leak that happens in the examples rdkafka_example and rdkafka_example_cpp here.
It has been happening since 2.0.2 when calling rd_kafka_consume_stop on a partition unknown to the client, for example if the client hasn't received metadata yet or the partition number is higher than the maximum.

In these examples it doesn't hang in the destroy call; it only causes a memory leak.
@Quuxplusone in case it causes a hang, please try this fix and in case it's still happening tell me how to reproduce it.

@emasab
Contributor

emasab commented Apr 3, 2024

This seems related to #4362

@Quuxplusone
Contributor

@Quuxplusone in case it causes a hang, please try this fix and in case it's still happening tell me how to reproduce it.

Replied on #4669 (comment)
