🐛 fix mysql-cdc OOM: use capped queue #4203
Huh, interesting. Should we make a similar change on the Postgres source?
I want to wait for @cgardens's comment to be resolved, though it sounds like slowing down the producer is a no-op since the producer is bottlenecked on consumption.
We should have a follow-up ticket to double-check this for Postgres CDC.
@cgardens Debezium also maintains an internal queue of records, but that queue is bounded with a fixed capacity. So if we slow down the consumer, the producer is slowed down as well.
/test connector=source-mysql
Created an issue for Postgres: #4253
- queue.add(e);
+ boolean inserted = false;
+ while (!inserted) {
+   inserted = queue.offer(e);
Should we do a small sleep here to ensure we don't busy-loop inside this thread?
@sherifnada What do you think would be the right sleep value?
Ideally exponential backoff (1/2/4/8/16 ms), maybe capping at half a second. Barring that, a magic constant (erring on the smaller side) should do, for example 10 ms.
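The backoff suggested above could look roughly like the following. This is a minimal sketch, not the code that was merged: the class name `BackoffOffer`, the method `offerWithBackoff`, and the constants are hypothetical, and the 1 ms start and 500 ms cap are simply taken from the comment.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper illustrating the suggestion; not part of the connector.
final class BackoffOffer {
  static final long INITIAL_SLEEP_MS = 1;  // assumed starting delay
  static final long MAX_SLEEP_MS = 500;    // assumed cap ("half a second")

  // Retries offer() with doubling sleeps until the element is accepted.
  // Returns false only if the thread is interrupted while waiting.
  static <T> boolean offerWithBackoff(BlockingQueue<T> queue, T element) {
    long sleepMs = INITIAL_SLEEP_MS;
    while (!queue.offer(element)) {
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag
        return false;
      }
      sleepMs = Math.min(sleepMs * 2, MAX_SLEEP_MS); // 1, 2, 4, ... capped at 500
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
    queue.offer("first"); // queue is now full

    // Consumer frees a slot after a short delay.
    Thread consumer = new Thread(() -> {
      try {
        Thread.sleep(20);
        queue.take();
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    boolean inserted = offerWithBackoff(queue, "second");
    consumer.join();
    System.out.println("inserted: " + inserted);
  }
}
```

The trade-off is that a longer cap means less CPU spent polling a full queue but more latency before the producer notices free space.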
@@ -233,7 +233,7 @@ private static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {
  final AirbyteFileOffsetBackingStore offsetManager = initializeState(stateManager);
  AirbyteSchemaHistoryStorage schemaHistoryManager = initializeDBHistory(stateManager);
  FilteredFileDatabaseHistory.setDatabaseName(config.get("database").asText());
- final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>();
+ final LinkedBlockingQueue<ChangeEvent<String, String>> queue = new LinkedBlockingQueue<>(10000);
Any context behind this magic number? (Can we add a comment?)
Awesome! Thanks for digging into this. Can you add a comment explaining this?
  while (!inserted) {
    inserted = queue.offer(e);
    try {
      Thread.sleep(10);
btw this should only trigger if inserted == false
Ah good catch! sorry about that!
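For reference, a loop that sleeps only when the insert failed might look like the sketch below. The class and method names (`SleepOnlyOnFailure`, `insert`) are hypothetical, and the returned sleep count exists only to make the behaviour observable; the 10 ms constant comes from the thread.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of the review comment; not the merged code.
final class SleepOnlyOnFailure {
  static final long SLEEP_MS = 10; // constant discussed in the review

  // Inserts e, sleeping only after a failed offer().
  // Returns how many times it slept, or -1 if interrupted.
  static <T> int insert(BlockingQueue<T> queue, T e) {
    int sleeps = 0;
    boolean inserted = false;
    while (!inserted) {
      inserted = queue.offer(e);
      if (!inserted) {
        try {
          Thread.sleep(SLEEP_MS);
          sleeps++;
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
          return -1;
        }
      }
    }
    return sleeps;
  }
}
```

With the guard in place, an insert into a queue that has free space returns without sleeping at all, so the 10 ms delay is paid only when the queue is full.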
/test connector=source-mysql
/publish connector=connectors/source-mysql
Merging this PR because the integration tests pass and the other tests pass locally; we are having issues with the acceptance test.
Issue: #3969
Read #3969 (comment) for a detailed understanding of the issue.
By using a queue with capped capacity, we avoid inserting new elements into the queue until there is space for them. This keeps the heap from growing without bound.
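The behaviour of a capped queue can be seen in isolation with a small sketch. The class name `CappedQueueDemo` and the tiny capacity used in `main` are illustrative only; the PR itself uses a capacity of 10000. Once a bounded `LinkedBlockingQueue` is full, `offer` returns `false` and `add` throws `IllegalStateException`, which is presumably why the diff replaces `queue.add(e)` with an `offer` loop.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo of bounded-queue semantics; not connector code.
final class CappedQueueDemo {
  // Fills a queue to capacity, then tries one more offer() and one more add().
  // Returns {offerAccepted, addThrew}.
  static boolean[] fillAndOverflow(int capacity) {
    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
    for (int i = 0; i < capacity; i++) {
      queue.offer(i); // succeeds while there is room
    }
    boolean offerAccepted = queue.offer(capacity); // queue is full at this point
    boolean addThrew = false;
    try {
      queue.add(capacity); // add() signals a full queue with an exception
    } catch (IllegalStateException ex) {
      addThrew = true;
    }
    return new boolean[] {offerAccepted, addThrew};
  }

  public static void main(String[] args) {
    boolean[] result = fillAndOverflow(3);
    System.out.println("offer accepted: " + result[0] + ", add threw: " + result[1]);
  }
}
```

Because the queue never holds more than its capacity, the memory it pins is bounded by the capacity times the size of a record, regardless of how far the consumer falls behind.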