
🐛 Source Kafka: fix missing data #19587

Merged
merged 4 commits into airbytehq:master on Dec 6, 2022

Conversation

@alexnikitchuk (Contributor) commented Nov 18, 2022

What

Kafka Source does not guarantee "at-least-once" delivery because it drops records when max_records_process is exceeded.

How

Consume all data returned from each call to poll(long) before closing the consumer.
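For readers skimming the PR, here is a minimal sketch of the idea, not the actual connector code (the class and names such as `PollUntilLimit`, `recordQueue`, and `maxRecords` are illustrative assumptions):

```java
// Sketch only: drain every record returned by poll() before deciding to stop,
// instead of breaking out mid-batch and dropping already-fetched records.
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollUntilLimit {

    // Reads until maxRecords is reached, but never discards records that
    // poll() has already returned before the consumer is closed.
    public static List<ConsumerRecord<String, String>> read(
            final KafkaConsumer<String, String> consumer, final int maxRecords) {
        final List<ConsumerRecord<String, String>> recordQueue = new ArrayList<>();
        while (recordQueue.size() < maxRecords) {
            final ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
            if (batch.isEmpty()) {
                break; // no more data available for now
            }
            // Keep every record of the batch, even if it pushes us past maxRecords,
            // so nothing already fetched is silently dropped before close().
            for (final ConsumerRecord<String, String> record : batch) {
                recordQueue.add(record);
            }
        }
        // close() commits the consumed offsets when enable.auto.commit=true,
        // so it must only run after all fetched records have been retained.
        consumer.close();
        return recordQueue;
    }
}
```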

Recommended reading order

  1. x.java
  2. y.python

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub by running the /publish command described here
  • After the connector is published, connector added to connector index as described here
  • Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
  • /test connector=connectors/<name> command is passing
  • New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed

Tests

Unit

Put your unit tests output here.

Integration

Put your integration tests output here.

Acceptance

Put your acceptance tests output here.

@marcosmarxm (Member) commented

Hello 👋, first thank you for this amazing contribution.

We really appreciate the effort you've made to improve the project.
We ask for your patience with the code review. Last month our team was focused on the Hacktoberfest event, which probably left some PRs without proper feedback. And this week, due to the US Thanksgiving holiday, most of our team is out of office with their families. Another reason code won't be merged this week: as a safety measure, the core team has decided to freeze merges to the main branch to keep the release stable. Next week we'll get back to you with a proper code review and an update on the status of your contribution.

If you have any questions feel free to send me a message in Slack!
Thanks!

@marcosmarxm marcosmarxm changed the title Kafka Source - fix missing data 🐛 Source Kafka: fix missing data Dec 2, 2022
@marcosmarxm (Member) commented

@itaseskii are you going to review this?

@marcosmarxm (Member) commented Dec 5, 2022

/test connector=connectors/source-kafka

🕑 connectors/source-kafka https://github.com/airbytehq/airbyte/actions/runs/3619654858
✅ connectors/source-kafka https://github.com/airbytehq/airbyte/actions/runs/3619654858
Python tests coverage:

	 Name                                                 Stmts   Miss  Cover   Missing
	 ----------------------------------------------------------------------------------
	 source_acceptance_test/base.py                          12      4    67%   16-19
	 source_acceptance_test/config.py                       140      5    96%   87, 93, 238, 242-243
	 source_acceptance_test/conftest.py                     199     93    53%   35, 41-43, 48, 53, 59, 65, 71, 77-79, 98, 103-105, 111-113, 119-120, 125-126, 131, 137, 146-155, 161-166, 181, 205, 236, 242, 250-255, 263-268, 276-289, 294-300, 307-318, 325-341
	 source_acceptance_test/plugin.py                        69     25    64%   22-23, 31, 36, 120-140, 144-148
	 source_acceptance_test/tests/test_core.py              398    111    72%   53, 58, 87-95, 100-107, 111-112, 116-117, 299, 337-354, 363-371, 375-380, 386, 419-424, 462-469, 512-514, 517, 582-590, 602-605, 610, 666-667, 673, 676, 712-722, 735-760
	 source_acceptance_test/tests/test_incremental.py       158     14    91%   52-59, 64-77, 240
	 source_acceptance_test/utils/asserts.py                 39      2    95%   62-63
	 source_acceptance_test/utils/common.py                  94     10    89%   16-17, 32-38, 72, 75
	 source_acceptance_test/utils/compare.py                 62     23    63%   21-51, 68, 97-99
	 source_acceptance_test/utils/connector_runner.py       112     50    55%   23-26, 32, 36, 39-68, 71-73, 76-78, 81-83, 86-88, 91-93, 96-114, 148-150
	 source_acceptance_test/utils/json_schema_helper.py     107     13    88%   30-31, 38, 41, 65-68, 96, 120, 192-194
	 ----------------------------------------------------------------------------------
	 TOTAL                                                 1569    350    78%

Build Passed

Test summary info:

=========================== short test summary info ============================
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/source_acceptance_test/plugin.py:63: Skipping TestConnection.test_check: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/source_acceptance_test/plugin.py:63: Skipping TestDiscovery.test_discover: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/source_acceptance_test/plugin.py:63: Skipping TestBasicRead.test_read: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/source_acceptance_test/plugin.py:63: Skipping TestFullRefresh.test_sequential_reads: not found in the config.
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/source_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: not found in the config.
================= 14 passed, 5 skipped, 21 warnings in 38.19s ==================

@itaseskii (Contributor) commented

> @itaseskii are you going to review this?

@marcosmarxm don't merge it yet; I'll review it today.

@sajarin sajarin added the bounty-M Maintainer program: claimable medium bounty PR label Dec 5, 2022
@itaseskii (Contributor) commented Dec 5, 2022

@alexnikitchuk The fix looks good to me. It makes sense to avoid reading additional records once the max_records count is exceeded, which is what the previous implementation was doing. The extra read shouldn't have been that problematic, since the offset wasn't committed and the dropped records would have been consumed in the next replication, but it is still better to improve efficiency whenever possible 👍

With that in mind, I'm unsure whether the current implementation respects its own constraint of not transmitting more records than specified in max_records. If a poll reads 20 records on the initial read while max_records is set to 10, all of those records would still be committed/transmitted, ignoring the constraint. I'm aware this was not the point of this task, but it is something we could address in future PRs.
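One hedged way to limit that overshoot, assuming the standard Kafka consumer property max.poll.records rather than anything in this PR, would be to cap the batch size at the consumer level (class name and settings below are illustrative):

```java
// Illustrative only: cap how many records a single poll() can return with the
// standard consumer property max.poll.records (default 500), so a sync cannot
// overshoot the configured limit by more than one bounded batch.
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BoundedConsumerFactory {

    public static KafkaConsumer<String, String> create(final String bootstrapServers,
                                                       final int maxRecordsPerPoll) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Each poll() returns at most this many records.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecordsPerPoll);
        return new KafkaConsumer<>(props);
    }
}
```

This only bounds the overshoot per batch; enforcing the limit exactly would still require an application-level check before emitting records.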

@alexnikitchuk (Contributor, Author) commented Dec 6, 2022

> @alexnikitchuk The fix looks good to me. It makes sense to avoid reading additional records once the max_records count is exceeded, which is what the previous implementation was doing. The extra read shouldn't have been that problematic, since the offset wasn't committed and the dropped records would have been consumed in the next replication, but it is still better to improve efficiency whenever possible 👍

Consumer close() commits offsets for all consumed messages. Why do you say the offset wasn't committed?

@itaseskii (Contributor) commented

> @alexnikitchuk The fix looks good to me. It makes sense to avoid reading additional records once the max_records count is exceeded, which is what the previous implementation was doing. The extra read shouldn't have been that problematic, since the offset wasn't committed and the dropped records would have been consumed in the next replication, but it is still better to improve efficiency whenever possible 👍

> Consumer close() commits offsets for all consumed messages. Why do you say the offset wasn't committed?

Because I wasn't aware that close() auto-commits when enable.auto.commit is set to true, which it is by default. If that is the case, then the issue was bigger than just reconsuming messages on the next sync. 👍
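For context, a minimal self-contained sketch of that default behavior (broker address, group id, and topic are illustrative placeholders; this is not connector code):

```java
// Sketch: with enable.auto.commit=true (the default), KafkaConsumer.close()
// performs a final commit of the offsets of records already returned by poll().
// Any of those records dropped before being emitted would not be re-read later.
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitOnClose {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // the Kafka default

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic")); // illustrative topic
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // If some of these records are discarded here instead of being emitted,
            // the implicit close() from try-with-resources still commits their offsets,
            // so they are lost for the next sync -- the data loss this PR fixes.
        }
    }
}
```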

@marcosmarxm (Member) commented Dec 6, 2022

/publish connector=connectors/source-kafka

🕑 Publishing the following connectors:
connectors/source-kafka
https://github.com/airbytehq/airbyte/actions/runs/3630342736


Connector Did it publish? Were definitions generated?
connectors/source-kafka

If you have connectors that successfully published but failed definition generation, follow step 4 here ▶️

@marcosmarxm (Member) left a comment


@octavia-squidington-iv octavia-squidington-iv added the area/documentation Improvements or additions to documentation label Dec 6, 2022
@marcosmarxm marcosmarxm merged commit 3e6d5ac into airbytehq:master Dec 6, 2022
@alexnikitchuk alexnikitchuk deleted the fix_missing_data branch April 16, 2023 17:51
Labels
  • area/connectors (Connector related issues)
  • area/documentation (Improvements or additions to documentation)
  • bounty
  • bounty-M (Maintainer program: claimable medium bounty PR)
  • community
  • connectors/source/kafka
6 participants