
METRON-1594: KafkaWriter is asynchronous and may lose data on node failure #1045

Closed
wants to merge 12 commits

Conversation

@mmiklavc (Contributor) commented May 31, 2018

Contributor Comments

https://issues.apache.org/jira/browse/METRON-1594

This covers the work to convert the KafkaWriter from a basic MessageWriter to a BulkMessageWriter in order to make producer.send() effectively synchronous. This impacts parsers, enrichment, indexing (error topic output), and the profiler. Anything previously using the KafkaWriter as a single-record MessageWriter has been converted over to using it as a BulkMessageWriter.

Other:

  • ParserBolt was lacking tick tuples, so I've ported the functionality over almost verbatim from the BulkMessageWriterBolt.
  • BulkMessageWriterBolt has been generalized. Previously, it extended ConfiguredIndexingBolt in both enrichment and indexing.
  • Configuring batchSize and batchTimeout for the BulkWriterComponent that wraps BulkMessageWriter(s):
    • parsers - stays the same; the default has been updated to 15 (unless/until performance testing suggests a more suitable default)
    • enrichment - pulls from global config: enrichment.writer.batchSize and enrichment.writer.batchTimeout
    • indexing - stays the same except for the error output to the Kafka topic, which now defaults to a batch size of 15
    • profiler - added a ProfilerWriterConfiguration to match the pattern used by other writers. Pulls from global config: profiler.writer.batchSize and profiler.writer.batchTimeout. (See the config sketch after this list.)
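
For illustration only, the global config entries referenced above might look like the following. This is a hedged sketch: the key names come from this PR, but the values are examples rather than recommended settings.

    {
      "enrichment.writer.batchSize": 15,
      "enrichment.writer.batchTimeout": 0,
      "profiler.writer.batchSize": 15,
      "profiler.writer.batchTimeout": 0
    }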

Tested in full dev and data flows into the ES indexes. Currently undergoing performance testing to establish a proper baseline batch size that does not result in performance regressions.

Pull Request Checklist

Thank you for submitting a contribution to Apache Metron.
Please refer to our Development Guidelines for the complete guide to follow for contributions.
Please refer also to our Build Verification Guidelines for complete smoke testing guides.

In order to streamline the review of the contribution, we ask that you follow these guidelines and double-check the following:

For all changes:

  • Is there a JIRA ticket associated with this PR? If not, one needs to be created in the Metron JIRA.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?

For code changes:

  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?

  • Have you included steps or a guide to how the change may be verified and tested manually?

  • Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:

    mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
    
  • Have you written or updated unit tests and/or integration tests to verify your changes?

  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

For documentation related changes:

  • Have you ensured that the format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not, then run the following commands and verify the changes via site-book/target/site/index.html:

    cd site-book
    mvn site
    

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
It is also recommended that travis-ci is set up for your personal repository such that your branches are built there before submitting a pull request.

for (Tuple tuple : tuples) {
  JSONObject message = messages.get(i++);
  Future<RecordMetadata> future = kafkaProducer
      .send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
Contributor

Should we be more defensive when we transform the message to JSON? Considering the broad use of this class, someone might inject something that causes problems during serialization.

Contributor Author

It's effectively a HashMap, since that's what the JSONObject class extends. I think the writer should basically write out whatever has been passed to it, meaning that the responsibility is on the invoking class to sort out whether the HashMap has been constructed correctly so that it serializes properly into a JSON string. I've made a conscious decision in this PR not to change how we manage serialization and deserialization. Are you worried about security threats, or potentially just garbage data coming from parsers and enrichments?

Contributor

I've made a conscious decision in this PR to not make any changes to how we go about managing serialization and deserialization.

I am not suggesting that we change how we do serialization. I think we need to wrap the message.toJSONString() call in a try/catch, so that an exception during serialization is handled as an error and added to the BulkWriterResponse, just like the other errors that we handle on lines 218 and 226.

Right now, an error on one tuple will kill the whole batch. I would think we would want to handle all errors in the same way.

Contributor Author

Hey @nickwallen - check out this recent commit and let me know what you think. I added a try/catch for Throwable around serializing the JSON, add the failure as an error, and continue the loop. Does that look better? I'm hoping we don't have situations where that toJSONString method will throw exceptions, but as you mentioned, if it does, it won't kill the whole batch anymore.
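
For readers following along, a minimal sketch of what that handling might look like. This is a fragment, not a compilable class, and the names writerResponse and futures are illustrative; the actual commit in the PR is authoritative.

    for (Tuple tuple : tuples) {
      JSONObject message = messages.get(i++);
      String jsonMessage;
      try {
        jsonMessage = message.toJSONString();
      } catch (Throwable t) {
        // A serialization failure affects only this message: record it as an
        // error against this tuple and keep processing the rest of the batch.
        writerResponse.addError(t, tuple);
        continue;
      }
      futures.add(new AbstractMap.SimpleEntry<>(tuple,
          kafkaProducer.send(new ProducerRecord<>(kafkaTopic, jsonMessage))));
    }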

Contributor

That looks great, thanks.

@nickwallen (Contributor)

I noticed that we should probably use a timeout when we call KafkaProducer.close. If we get a huge backlog of messages, it will just block until the backlog is cleared. We can get into situations where we build up such a large backlog that there is just no hope of clearing it. This might actually explain why some of the topologies just won't die sometimes.
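
Something along these lines, for example (a sketch; in the Kafka client versions in use at the time the relevant overload was close(long, TimeUnit), while newer clients take a java.time.Duration, and the 10-second value is only illustrative):

    // Bound how long close() may block draining the internal send queue,
    // rather than waiting indefinitely for a huge backlog to clear.
    kafkaProducer.close(10, TimeUnit.SECONDS);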

@nickwallen (Contributor) commented Jun 1, 2018

This might actually explain why some of the topologies just won't die sometimes.

The more I think about it, this is very likely what is happening in the current code base when topologies just won't die. The queue builds a giant backlog, and then it blocks forever trying to clear it.

But with these code changes, we should not carry any messages in the internal Kafka producer queue between calls to KafkaWriter.write. So this should no longer be a problem with this PR, methinks.

@ottobackwards (Contributor)

Maybe we need some kind of orchestration service that you could use to shut down Metron without losing things already in the pipeline.

@mmiklavc (Contributor Author) commented Jun 4, 2018

@ottobackwards I believe this should already be handled by the acking strategy. Anything not acked will be replayed, since we're leveraging at-least-once message processing semantics.

@mmiklavc (Contributor Author) commented Jun 6, 2018

Went through a few variations of testing enrichments using the new KafkaWriter bulk message writer implementation with @anandsubbu, and we are seeing results close to the numbers we see in master and prior versions. There is a very slight degradation (from, I believe, 36k EPS in our setup for one enrichment down to 33k EPS); however, we are no longer at risk of dropping data.

For posterity, the new implementation can be summarized as follows:

  • The Kafka producer has a number of batching settings available via the Kafka client's ProducerConfig class. We are not manipulating these properties directly via the exposed config because we want to control batching ourselves in order to provide guarantees with Storm's acking mechanism.
  • The BulkWriterComponent class manages batching in terms of number of records and a timeout. It's important to note that the KafkaProducer batches in terms of size in bytes, whereas the writer component manages batches in terms of number of records, regardless of size. Previously, KafkaWriter processed messages one by one, asynchronously, leveraging the KafkaProducer defaults. KafkaWriter is now a proper BulkMessageWriter that can send multiple messages to Kafka by processing the Futures returned by the public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record) method. We call flush() to ensure all Futures have been processed before returning their status back to the calling class. (See the sketch after this list.)
  • We tweaked the default for the Kafka producer batch size under the hood - it was originally 16KB, and our performance tests found the setting to be optimal around 64KB. This has been set as a default constant in the writer, as we want users to leverage the batchSize and batchTimeout settings exposed via the standard Metron configuration.
  • It is possible, though not advised, to override the KafkaProducer batch.size and linger.ms settings by adding the property to the Kafka producer config map in Flux or via parser properties. This should only be done as a matter of optimizing the Kafka batching caches, as the producer will always allocate the full 64KB batch buffer whether or not it fills. The batchSize and batchTimeout settings exposed in Metron should be sufficient, and again, our performance tests suggest that the defaults are adequate. As usual, adjusting spout timeouts and spout and bolt parallelism will provide the necessary scaling knobs and levers.
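
For context, a condensed sketch of the resulting write path. This is hedged: names like results and response are illustrative, it is a method fragment rather than a complete class, and the real logic lives in KafkaWriter's write method.

    // Queue every message in the batch with the asynchronous producer.
    List<Map.Entry<Tuple, Future<RecordMetadata>>> results = new ArrayList<>();
    int i = 0;
    for (Tuple tuple : tuples) {
      JSONObject message = messages.get(i++);
      results.add(new AbstractMap.SimpleEntry<>(tuple,
          kafkaProducer.send(new ProducerRecord<>(kafkaTopic, message.toJSONString()))));
    }
    // Block until the producer has attempted delivery of everything queued,
    // so nothing lingers in the internal buffer between write() calls.
    kafkaProducer.flush();
    // Every Future is now complete; resolve each to a per-tuple success or
    // error so Storm can ack or replay accordingly.
    BulkWriterResponse response = new BulkWriterResponse();
    for (Map.Entry<Tuple, Future<RecordMetadata>> result : results) {
      try {
        result.getValue().get();
        response.addSuccess(result.getKey());
      } catch (Exception e) {
        response.addError(e, result.getKey());
      }
    }
    return response;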

@cestella (Member) commented Jun 6, 2018

+1 by inspection. I ran this up in full-dev and data flowed through just fine.

@asfgit asfgit closed this in 523c38c Jun 6, 2018
@nickwallen (Contributor)

@mmiklavc FYI You've got a TODO comment in ConfiguredIndexingBolt.java that seems like something you wanted to address before merging.

It's usually good to at least give everyone who has chimed in on a PR a chance to sign off before merging it in. At least that's what I try to do as a courtesy.

@mmiklavc (Contributor Author) commented Jun 7, 2018

@nickwallen The PR has been up for 7 days, I addressed all community comments days ago, and no comments appeared to be dissenting. Was there a concern or issue you had before this was merged?

@nickwallen (Contributor)

I was still going through it. Not sure where @ottobackwards landed on this.

Besides the open TODO comment, I am not sure how much testing we did around the Profiler, or around the new configuration elements that you added. I didn't see a test plan around much of this besides spinning up Full Dev, which doesn't exercise the Profiler.

I guess it's just a community courtesy that we've followed, no matter how long a PR has been open. Kind of like how I followed up on #1036. I don't want to make a stink of it. Like I said, it's just a courtesy, I suppose.

@ottobackwards (Contributor)

My comment was just about calling out a possible need for more shutdown orchestration.
I am not reviewing.

@mmiklavc (Contributor Author) commented Jun 7, 2018

I didn't know you were still actively reviewing. The last comment I received besides a +1 was "that looks great, thanks." We do have unit and integration tests around all of this in addition to the batch performance testing performed on enrichments. They use the same KafkaWriter.

Below is verbatim from our bylaws. This is also the handling that I've personally seen on most PRs that don't have dissenting/outstanding concerns to be addressed.

Code Change

A change made to a codebase of the project requires lazy consensus of active committers other than the author of the patch. The code can be committed after the first +1.

@ottobackwards (Contributor)

I assume you are talking to @nickwallen there @mmiklavc ?
