Parsing JSON payload with cppkafka #276

Open
raghavendraramya opened this issue Apr 28, 2021 · 3 comments


@raghavendraramya

raghavendraramya commented Apr 28, 2021

Hi, I am using cppkafka to run a Kafka consumer that connects to a Kafka server. The payload I receive is in JSON format, so I want to convert it to a JSON object and then parse it. However, when I add the code to create the object, I get the following error:

terminate called after throwing an instance of 'cppkafka::Exception'
what(): Failed to create consumer handle: Failed to create thread: Success (0)
Aborted (core dumped)

If I comment out the JSON object creation, the code runs fine and I can read messages from the Kafka server. I am confused as to why the JSON object creation has any relationship to the error.
The code I am using is attached. When lines 20 and 21 are commented out, the code runs fine and the consumer is able to connect to the Kafka server and read the messages. When I uncomment those lines, I get the error above.

Main.txt.txt

Thanks in advance for any help!

@jlcordeiro
Contributor

Tested it with a random JSON parser I found on GitHub and it works fine.
I bet the problem is in your json.hpp file (which you didn't attach). I'd look there.

If you need further help, just drop that file in here as well and I can have a look.

@raghavendraramya
Author

Thank you for getting back to me on this. Which parser did you use? Could you kindly send me a link?
json_hpp.txt

I am using the nlohmann JSON parser. Please see the attached json.hpp.

@jlcordeiro
Contributor

jlcordeiro commented May 6, 2021

Yeah, you're using the library wrong. I'm even surprised it compiles for you, so I'd suggest you look at your build process and see if something funky is going on.

Either way, please see below a couple of changes that made your code work for me, printing out the received events as JSON.

diff bin/Main.txt.txt bin/main.cpp 
20,21c20,21
< 	//json jsonObj;
< 	//std::stringstream currentMessage >> jsonObj;
---
> 	nlohmann::json jsonObj = nlohmann::json::parse(currentMessage);
> 	std::cout << jsonObj.dump(4) << std::endl;

And here's a full example where I just removed unnecessary lines for brevity's sake:

#include <iostream>
#include <string>
#include "json.hpp"
#include "cppkafka/consumer.h"
#include "cppkafka/configuration.h"

// Parse one Kafka payload as JSON and pretty-print it with a 4-space indent.
// Note: nlohmann::json::parse throws if the payload is not valid JSON.
void ParseKafkaDetection(const std::string& currentMessage)
{
    nlohmann::json jsonObj = nlohmann::json::parse(currentMessage);
    std::cout << jsonObj.dump(4) << std::endl;
}

int main() {

    std::string topic_name = "my-topic";

    // Offsets are never committed in this example (auto commit is disabled).
    cppkafka::Configuration config = {
        { "metadata.broker.list", "localhost" },
        { "group.id", "kafka-consumer-test" },
        { "enable.auto.commit", false },
        { "session.timeout.ms", 60000 }
    };

    cppkafka::Consumer consumer(config);
    consumer.subscribe({ topic_name });

    while (true) {
        cppkafka::Message msg = consumer.poll();
        // Nothing arrived within the poll timeout
        if (!msg) {
            continue;
        }
        // There's an error; end-of-partition notifications are not reported
        if (msg.get_error()) {
            if (!msg.is_eof()) {
                std::cerr << "Error: " << msg.get_error() << std::endl;
            }
            continue;
        }
        // The payload is a cppkafka::Buffer; convert it explicitly to a string
        ParseKafkaDetection(std::string(msg.get_payload()));
    }
}
