
Added handling of NULL records and un-parseable records. #35

Merged
merged 11 commits into teragrep:beta on Aug 20, 2024

Conversation


@Tiihott Tiihott commented Aug 9, 2024

Added handling of NULL records and un-parseable records.

Partial fix for issue #30.

@@ -254,16 +254,21 @@ public void accept(List<RecordOffset> recordOffsetObjectList) {
}

byte[] byteArray = recordOffsetObject.getRecord(); // loads the byte[] contained in recordOffsetObject.getRecord() to byteArray.
batchBytes = batchBytes + byteArray.length;
if (byteArray == null) {
Member

A data error must cause the software to abort, and abort in a way that states why it aborted; invalid data must not be processed by default. Please create a flag in configuration that allows invalid data to be skipped but logged.

Member

This flag should apply to empty records.

Author

Added flag for empty records.
Solved in commit 9fb3949
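
A minimal sketch of how that flag could look in the per-record loop. The identifiers config.getSkipEmptyRecords() and the exact log message are illustrative assumptions, not necessarily the names used in commit 9fb3949:

// Sketch of the per-record loop body; identifiers are illustrative assumptions.
byte[] byteArray = recordOffsetObject.getRecord();
if (byteArray == null) {
    if (config.getSkipEmptyRecords()) {
        // Flag enabled: log the offending record's metadata and skip it.
        LOGGER.warn("Skipping empty record, metadata: {}", recordOffsetObject.offsetToJSON());
        continue;
    }
    // Flag disabled (default): abort and state why, instead of processing invalid data.
    throw new IllegalStateException(
            "Record with null content detected at " + recordOffsetObject.offsetToJSON()
    );
}
batchBytes = batchBytes + byteArray.length;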

@@ -323,6 +328,13 @@ public void accept(List<RecordOffset> recordOffsetObjectList) {
catch (IOException e) {
throw new UncheckedIOException(e);
}
catch (ParseException e) {
Member

Invalid data must not be processed; please create a flag that allows parse failures to be skipped but logged.

Author

Added flag for non-RFC5424 records.
Solved in commit 9fb3949
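
For the parse-failure flag, a sketch of an additional catch branch alongside the IOException handler above. config.getSkipNonRfc5424Records() and the surrounding loop are assumptions for illustration, not the exact code from commit 9fb3949:

catch (ParseException e) {
    if (config.getSkipNonRfc5424Records()) {
        // Flag enabled: log the unparseable record's metadata (with the cause) and skip it.
        LOGGER.warn("Skipping non-RFC5424 record, metadata: {}", recordOffsetObject.offsetToJSON(), e);
        continue;
    }
    // Flag disabled (default): abort with a stated reason instead of processing invalid data.
    throw new RuntimeException("Record failed RFC5424 parsing at " + recordOffsetObject.offsetToJSON(), e);
}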

// enable kerberos
conf.set("hadoop.security.authentication", config.getHadoopAuthentication());
conf.set("hadoop.security.authorization", config.getHadoopAuthorization());
conf.set("hadoop.kerberos.keytab.login.autorenewal.enabled", "true");
Member

Make it configurable; please note that some KDCs allow only a limited number of renewals and have a maximum total renewal time.

Author

Added configuration flag for autorenewal in commit 9d4311d

How should the ticket lifetime problem be approached? Can periodically calling checkTGTAndReloginFromKeytab() on the UGI maintain the ticket indefinitely if automatic renewal won't do?
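
One possible answer to the lifetime question, sketched as a self-contained helper; the class name, the scheduling period, and where the principal and keytab path come from are assumptions, not part of this PR. UserGroupInformation.checkTGTAndReloginFromKeytab() is a no-op while the ticket is still fresh and performs a new keytab login when it nears expiry, so calling it periodically can keep credentials valid even on KDCs that cap renewals, independently of the autorenewal flag added in 9d4311d:

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: class name and 1-minute period are assumptions.
public class KerberosTicketMaintainer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KerberosTicketMaintainer.class);

    // Logs in from the keytab once, then periodically re-checks the TGT.
    public static ScheduledExecutorService start(Configuration conf, String principal, String keytabPath)
            throws IOException {
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // No-op while the ticket is fresh; re-logins from the keytab when it nears expiry.
                UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
            }
            catch (IOException e) {
                // Log and keep the scheduler alive; throwing here would cancel further runs.
                LOGGER.error("Kerberos re-login from keytab failed", e);
            }
        }, 1, 1, TimeUnit.MINUTES);
        return scheduler;
    }
}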

@@ -97,9 +98,9 @@ public void readCoordinatorTest2Threads() {

Thread.sleep(10000);
Assertions.assertEquals(2, messages.size());
Assertions.assertEquals(140, messages.get(0).size() + messages.get(1).size()); // Assert that expected amount of records has been consumed by the consumer group.
Member

Why do the test amounts change?

Author

Two new records were added to every test topic partition in MockKafkaConsumerFactory: one null record and one non-RFC5424 record. With 10 topic partitions there are now 20 more records in the test data.

@Tiihott Tiihott requested a review from kortemik August 14, 2024 10:30
Member

@kortemik kortemik left a comment

Approved conditionally, on the condition that the code is refactored into objectified form before merging to main.

LOGGER.debug("Null record metadata: {}", recordOffsetObject.offsetToJSON());
}
syslogFile.delete(); // Clean up
throw new NullPointerException("Record with null content detected during processing.");
Member

Throw a custom exception instead of an NPE, because the null case is already handled by removing the file.
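
A minimal sketch of such a custom exception; the class name is a hypothetical placeholder, not taken from the PR:

// Hypothetical name for illustration. An unchecked, domain-specific exception makes the
// abort reason explicit instead of reporting a data error as a generic NullPointerException.
public class NullRecordException extends RuntimeException {

    public NullRecordException(String message) {
        super(message);
    }
}

The throw site above would then become throw new NullRecordException("Record with null content detected during processing.") with the same message.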

@kortemik kortemik merged commit 60cc06c into teragrep:beta Aug 20, 2024
1 check passed