Explicitly exit to stop hung background threads in MySQL source connector #11910

Closed

adammacleod

What

I have been working with Airbyte for around a month and have run into a problem when loading larger (50M+ record) tables from MySQL to Snowflake. For these loads the source connector hangs after the log message "completed source: class io.airbyte.integrations.source.mysql.MySqlSource".

The problem I had matches issue #4322 exactly. It also matches #5754, and it is possibly related to #8218, although that issue concerns a different connector (db2).

In the end I discovered that for certain MySQL loads (seemingly larger ones, though smaller loads also failed for me at times) the Debezium engine and executor both fail to shut down and leave threads running in the background. These threads then prevent the JVM from exiting when the MySQL source connector's main() method returns. I am not a Java dev, but to test this I added the following debugging code:

MySqlSource.java:

  public static void main(final String[] args) throws Exception {
    final Source source = MySqlSource.sshWrappedSource();
    LOGGER.info("starting source: {}", MySqlSource.class);
    new IntegrationRunner(source).run(args);
    LOGGER.info("completed source: {}", MySqlSource.class);
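    // Debugging only: dump every live thread to see what keeps the JVM alive after the sync completes.
    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();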
    for (Thread th : threadSet) {
      LOGGER.info("");
      LOGGER.info("");
      LOGGER.info("Thread name {}", th.getName());
      LOGGER.info("Thread {} state {}", th.getName(), th.getState());
      LOGGER.info("Thread {} stacktrace {}", th.getName(), th.getStackTrace());
      LOGGER.info("Thread {} isAlive {}", th.getName(), th.isAlive());
      LOGGER.info("Thread {} isDaemon {}", th.getName(), th.isDaemon());
      LOGGER.info("Thread {} isInterrupted {}", th.getName(), th.isInterrupted());
    }
  }

DebeziumRecordPublisher.java:

  public void close() throws Exception {
    if (isClosing.compareAndSet(false, true)) {
      // consumers should assume records can be produced until engine has closed.
      if (engine != null) {
        engine.close();
      }

      // wait for closure before shutting down executor service
      if (engineLatch.await(5, TimeUnit.MINUTES)) {
        LOGGER.info("Debezium engine shutdown!");
      } else {
        LOGGER.info("Debezium engine did not shutdown!");
      }

      // shut down and await for thread to actually go down
      executor.shutdown();
              
      if (executor.awaitTermination(5, TimeUnit.MINUTES)) {
        LOGGER.info("Debezium executor shutdown!");
      } else {
        LOGGER.info("Debezium executor did not shutdown!");
      }

      // after the engine is completely off, we can mark this as closed
      hasClosed.set(true);
      
      if (thrownError.get() != null) {
        throw new RuntimeException(thrownError.get());
      }
    }
  }

I found that failed runs never shut down the Debezium engine or executor properly. Likewise, the message "Debezium engine shutdown.", which is logged by the completion callback passed to the Debezium engine above, never appears in this situation. The thread dump added to MySqlSource.java also showed extra threads still running whenever the sync failed.
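The thread dump above lists every thread, but only live non-daemon threads keep a JVM running after main() returns. The following is a minimal sketch of filtering the dump down to those threads. The class and method names (LingeringThreadsSketch, blockingExit, startWorker) are hypothetical and are not part of the connector.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Airbyte: narrows a thread dump to the
// threads that can actually stop the JVM from exiting.
final class LingeringThreadsSketch {

  /** Live, non-daemon threads other than the calling thread. */
  static List<Thread> blockingExit() {
    final Thread current = Thread.currentThread();
    return Thread.getAllStackTraces().keySet().stream()
        .filter(t -> t != current && t.isAlive() && !t.isDaemon())
        .collect(Collectors.toList());
  }

  /** Starts a thread that parks on the latch, simulating a worker that has not shut down. */
  static Thread startWorker(final String name, final boolean daemon, final CountDownLatch release) {
    final Thread t = new Thread(() -> {
      try {
        release.await();
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, name);
    t.setDaemon(daemon);
    t.start();
    return t;
  }

  public static void main(final String[] args) throws InterruptedException {
    final CountDownLatch release = new CountDownLatch(1);
    final Thread blocker = startWorker("simulated-hung-worker", false, release);
    final Thread harmless = startWorker("simulated-daemon-worker", true, release);

    // Only the non-daemon worker is reported; the daemon one cannot block exit.
    for (final Thread t : blockingExit()) {
      System.out.println("Would block JVM exit: " + t.getName());
    }

    // Release both workers so this demo itself terminates.
    release.countDown();
    blocker.join();
    harmless.join();
  }
}
```

In the debugging code above, the same effect could be had by skipping any thread for which isDaemon() returns true.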

How

I considered three options for a fix:

  1. Understand why the Debezium engine is hanging and fix it.
  2. Upgrade Debezium to 1.8 (from 1.4). From what I can see there is a new MySQL connector and I imagine it would be more reliable.
  3. Take the quick way out and add a System.exit(0) to the end of the main method.

I have taken option 3 and am submitting it in this PR. My reasoning is that Debezium should ultimately be upgraded to 1.8, which is much more work than I am able to contribute right now. Adding a System.exit(0) to the end of the main method terminates the JVM even if background threads are still running, which allows the source container to exit without hanging. To my understanding Airbyte only closes the engine once it has received all records of interest, so there is very little risk that a background thread still has useful work to do.
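As a rough illustration of option 3, here is a self-contained sketch of a main method that exits explicitly once its work is done. The names ExplicitExitSketch, Work and runThenExit are hypothetical; Work stands in for the new IntegrationRunner(source).run(args) call. Passing the exit action in as a parameter, and exiting with code 1 when the work throws, are assumptions made for this sketch and are not what the PR itself does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.IntConsumer;

// Hypothetical sketch, not the connector's code.
final class ExplicitExitSketch {

  /** Stand-in for the connector's real work. */
  interface Work {
    void run() throws Exception;
  }

  /**
   * Runs the work and then always invokes the exit hook:
   * code 0 on success, code 1 if the work threw (an assumption of this sketch).
   */
  static void runThenExit(final Work work, final IntConsumer exit) {
    int code = 0;
    try {
      work.run();
    } catch (final Exception e) {
      System.err.println("work failed: " + e);
      code = 1;
    }
    exit.accept(code);
  }

  public static void main(final String[] args) {
    // Simulates a background thread that never finishes.
    final Thread hung = new Thread(() -> {
      try {
        new CountDownLatch(1).await(); // never counted down
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "simulated-hung-worker");
    hung.start();

    // Without System.exit the JVM would stay alive because of the thread above.
    runThenExit(() -> System.out.println("sync finished"), System::exit);
  }
}
```

One thing the sketch makes visible: if the work throws and main simply propagates the exception, a lingering non-daemon thread keeps the JVM alive in that case too, so an explicit exit only on the success path does not cover failures.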

Sadly I am not able to replicate this in a unit test, but if any additional supporting info is required I would be happy to provide it.

Recommended reading order

  1. MySqlSource.java

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

No breaking changes.

For larger tables the Debezium engine can fail to shut down cleanly. In this case extra threads are left running in the background, which can prevent this process from ever exiting. When this happens the entire Airbyte pipeline hangs. This isn't an ideal fix, but explicitly exiting terminates any hung threads and allows the pipeline to complete.
@github-actions github-actions bot added the area/connectors Connector related issues label Apr 12, 2022
@agiletich

I've built a container with this fix and it now works nicely on big datasets.
@adammacleod, thank you so much!

@marcosmarxm marcosmarxm requested a review from edgao April 18, 2022 23:05
@marcosmarxm marcosmarxm self-assigned this Apr 18, 2022
@marcosmarxm
Member

Sorry for the delay, @adammacleod. I asked the team to review your contribution; it looks good, but we're discussing whether there is a better place to add this logic.

@edgao
Contributor

edgao commented Apr 19, 2022

@adammacleod thanks for the patience here. We added a watchForOrphanThreads method to all destination connectors a few months ago, and it seems to have been working well. I think a similar approach here (i.e. wrapping the messageIterator.forEachRemaining in watchForOrphanThreads) would fix this problem across all source connectors, rather than having a one-off fix for source-mysql.

(@marcosmarxm we'll need to publish all the connectors afterward, unfortunately)
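For readers unfamiliar with the idea, the general shape of an orphan-thread watcher can be sketched as follows. This is not Airbyte's watchForOrphanThreads implementation, whose signature and behaviour may differ. OrphanThreadWatchSketch, Procedure and runAndWatch are hypothetical names, and treating only threads started during the procedure as orphans is an assumption made to keep the sketch safe to run anywhere.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical sketch of the orphan-thread idea, not Airbyte's implementation.
final class OrphanThreadWatchSketch {

  interface Procedure {
    void call() throws Exception;
  }

  /**
   * Runs the procedure. Afterwards, any live non-daemon thread that was started
   * during the procedure is interrupted and given a grace period to finish.
   * The exit hook runs only if at least one such thread is still alive.
   */
  static void runAndWatch(final Procedure procedure, final Runnable exitHook,
                          final long grace, final TimeUnit unit) throws Exception {
    final Set<Thread> before = Thread.getAllStackTraces().keySet();
    try {
      procedure.call();
    } finally {
      final Thread current = Thread.currentThread();
      final List<Thread> lingering = Thread.getAllStackTraces().keySet().stream()
          .filter(t -> t != current && t.isAlive() && !t.isDaemon() && !before.contains(t))
          .collect(Collectors.toList());

      for (final Thread t : lingering) {
        System.err.println("Lingering thread, interrupting: " + t.getName());
        t.interrupt();
      }

      final long deadline = System.nanoTime() + unit.toNanos(grace);
      boolean survivor = false;
      for (final Thread t : lingering) {
        final long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
        if (remainingMs > 0) {
          t.join(remainingMs); // join(0) would wait forever, hence the guard
        }
        survivor |= t.isAlive();
      }
      if (survivor) {
        exitHook.run();
      }
    }
  }

  public static void main(final String[] args) throws Exception {
    runAndWatch(
        // Simulated worker that ignores interrupts and never finishes.
        () -> new Thread(() -> {
          while (true) {
            try {
              Thread.sleep(1_000);
            } catch (final InterruptedException e) {
              // deliberately ignored to simulate a hung thread
            }
          }
        }, "simulated-hung-worker").start(),
        () -> System.exit(0),
        2, TimeUnit.SECONDS);
  }
}
```

Compared with an unconditional System.exit(0) at the end of main, this shape forces an exit only when something is actually left running, and it gives well-behaved threads a chance to stop on their own first.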

@marcosmarxm
Member

@adammacleod do you want to implement the suggested change?

@marcosmarxm
Member

Closing this because it was implemented by #12544.

@marcosmarxm marcosmarxm closed this May 9, 2022
Labels
area/connectors Connector related issues community
6 participants