Make Flow topology safe on duplicate for JDBC #4825

Closed
tchiotludo opened this issue Sep 4, 2024 · 2 comments · Fixed by #4887, #5019 or #5062
Assignees
Labels
area/backend Needs backend code changes bug Something isn't working kind/customer-request Requested by one or more customers kind/quick-win Seems to be quick to do

Comments

@tchiotludo
Member

Some flow topology inserts can crash the application:

org.jooq.exception.IntegrityConstraintViolationException: SQL [insert into flow_topologies ("key", "value") values ('***, cast('{"source":{"uid":"'***","tenantId":"'***","namespace":"'***","id":"'***"},"relation":"FLOW_TASK","destination":{"uid":"'***","tenantId":"'***","namespace":"'***","id":"'***"}}' as jsonb))]; Batch entry 5 insert into flow_topologies ("key", "value") values (''***', cast('{"source":{"uid":"'***","tenantId":"'***","namespace":"'***","id":"'***"},"relation":"FLOW_TASK","destination":{"uid":"'***","tenantId":"'***","namespace":"'***","id":"'***"}}' as jsonb)) was aborted: ERROR: duplicate key value violates unique constraint "flow_topologies_pkey"

which leads to:

2024-09-03 15:19:36,795 WARN  jdbc-queue-Execution_0 c.zaxxer.hikari.pool.ProxyConnection HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@1bec7e3 marked as broken because of SQLSTATE(08006), ErrorCode(0)
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:398)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:329)
	at org.postgresql.jdbc.PgConnection.executeTransactionCommand(PgConnection.java:981)
	at org.postgresql.jdbc.PgConnection.commit(PgConnection.java:1003)
	at com.zaxxer.hikari.pool.ProxyConnection.commit(ProxyConnection.java:378)
	at com.zaxxer.hikari.pool.HikariProxyConnection.commit(HikariProxyConnection.java)
	at io.micronaut.transaction.jdbc.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:156)
	at io.micronaut.transaction.jdbc.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:53)
	at io.micronaut.transaction.support.AbstractTransactionOperations.commitInternal(AbstractTransactionOperations.java:379)
	at io.micronaut.transaction.support.AbstractTransactionOperations.commit(AbstractTransactionOperations.java:571)
	at io.micronaut.configuration.jooq.MicronautTransactionProvider.commit(MicronautTransactionProvider.java:62)
	at org.jooq.impl.DefaultDSLContext.lambda$transactionResult0$3(DefaultDSLContext.java:534)
	at org.jooq.impl.Tools$3$1.block(Tools.java:6325)
	at java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source)
	at org.jooq.impl.Tools$3.get(Tools.java:6322)
	at org.jooq.impl.DefaultDSLContext.transactionResult0(DefaultDSLContext.java:578)
	at org.jooq.impl.DefaultDSLContext.transactionResult(DefaultDSLContext.java:502)
	at org.jooq.impl.DefaultDSLContext.transaction(DefaultDSLContext.java:591)
	at io.kestra.jdbc.JooqDSLContextWrapper.lambda$transaction$1(JooqDSLContextWrapper.java:58)
	at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
	at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
	at dev.failsafe.internal.RetryPolicyExecutor.lambda$apply$0(RetryPolicyExecutor.java:74)
	at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
	at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:187)
	at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:376)
	at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:112)
	at io.kestra.core.utils.RetryUtils$Instance.wrap(RetryUtils.java:144)
	at io.kestra.core.utils.RetryUtils$Instance.runRetryIf(RetryUtils.java:103)
	at io.kestra.jdbc.JooqDSLContextWrapper.transaction(JooqDSLContextWrapper.java:55)
	at io.kestra.jdbc.runner.JdbcQueue.produce(JdbcQueue.java:101)
	at io.kestra.jdbc.runner.JdbcQueue.emitOnly(JdbcQueue.java:116)
	at io.kestra.jdbc.runner.JdbcExecutor.toExecution(JdbcExecutor.java:764)
	at io.kestra.jdbc.runner.JdbcExecutor.executionQueue(JdbcExecutor.java:462)
	at java.base/java.util.ArrayList.forEach(Unknown Source)
	at io.kestra.jdbc.runner.JdbcQueue.lambda$receive$6(JdbcQueue.java:197)
	at io.kestra.jdbc.runner.JdbcQueue.lambda$receiveImpl$9(JdbcQueue.java:246)
	at io.kestra.jdbc.runner.JdbcQueue.lambda$poll$10(JdbcQueue.java:269)
	at io.micrometer.core.instrument.internal.TimedRunnable.run(TimedRunnable.java:49)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Socket closed
	at java.base/sun.nio.ch.NioSocketImpl.endRead(Unknown Source)
	at java.base/sun.nio.ch.NioSocketImpl.implRead(Unknown Source)
	at java.base/sun.nio.ch.NioSocketImpl.read(Unknown Source)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(Unknown Source)
	at java.base/java.net.Socket$SocketInputStream.read(Unknown Source)
	at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:162)
	at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:129)
	at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:114)
	at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:74)
	at org.postgresql.core.PGStream.receiveChar(PGStream.java:467)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2166)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371)
	... 41 common frames omitted
2024-09-03 15:19:36,796 ERROR jdbc-queue-Execution_0 i.m.t.j.DataSourceTransactionManager Commit exception overridden by rollback exception
io.micronaut.transaction.exceptions.TransactionSystemException: Could not commit JDBC transaction
	at io.micronaut.transaction.jdbc.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:158)
	at io.micronaut.transaction.jdbc.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:53)
	at io.micronaut.transaction.support.AbstractTransactionOperations.commitInternal(AbstractTransactionOperations.java:379)
	at io.micronaut.transaction.support.AbstractTransactionOperations.commit(AbstractTransactionOperations.java:571)
	at io.micronaut.configuration.jooq.MicronautTransactionProvider.commit(MicronautTransactionProvider.java:62)
	at org.jooq.impl.DefaultDSLContext.lambda$transactionResult0$3(DefaultDSLContext.java:534)
	at org.jooq.impl.Tools$3$1.block(Tools.java:6325)
	at java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source)
	at org.jooq.impl.Tools$3.get(Tools.java:6322)
	at org.jooq.impl.DefaultDSLContext.transactionResult0(DefaultDSLContext.java:578)
	at org.jooq.impl.DefaultDSLContext.transactionResult(DefaultDSLContext.java:502)
	at org.jooq.impl.DefaultDSLContext.transaction(DefaultDSLContext.java:591)
	at io.kestra.jdbc.JooqDSLContextWrapper.lambda$transaction$1(JooqDSLContextWrapper.java:58)
	at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
	at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
	at dev.failsafe.internal.RetryPolicyExecutor.lambda$apply$0(RetryPolicyExecutor.java:74)
	at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
	at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:187)
	at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:376)
	at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:112)
	at io.kestra.core.utils.RetryUtils$Instance.wrap(RetryUtils.java:144)
	at io.kestra.core.utils.RetryUtils$Instance.runRetryIf(RetryUtils.java:103)
	at io.kestra.jdbc.JooqDSLContextWrapper.transaction(JooqDSLContextWrapper.java:55)
	at io.kestra.jdbc.runner.JdbcQueue.produce(JdbcQueue.java:101)
	at io.kestra.jdbc.runner.JdbcQueue.emitOnly(JdbcQueue.java:116)
	at io.kestra.jdbc.runner.JdbcExecutor.toExecution(JdbcExecutor.java:764)
	at io.kestra.jdbc.runner.JdbcExecutor.executionQueue(JdbcExecutor.java:462)
	at java.base/java.util.ArrayList.forEach(Unknown Source)
	at io.kestra.jdbc.runner.JdbcQueue.lambda$receive$6(JdbcQueue.java:197)
	at io.kestra.jdbc.runner.JdbcQueue.lambda$receiveImpl$9(JdbcQueue.java:246)
	at io.kestra.jdbc.runner.JdbcQueue.lambda$poll$10(JdbcQueue.java:269)
	at io.micrometer.core.instrument.internal.TimedRunnable.run(TimedRunnable.java:49)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:398)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:329)
	at org.postgresql.jdbc.PgConnection.executeTransactionCommand(PgConnection.java:981)
	at org.postgresql.jdbc.PgConnection.commit(PgConnection.java:1003)
	at com.zaxxer.hikari.pool.ProxyConnection.commit(ProxyConnection.java:378)
	at com.zaxxer.hikari.pool.HikariProxyConnection.commit(HikariProxyConnection.java)
	at io.micronaut.transaction.jdbc.DataSourceTransactionManager.doCommit(DataSourceTransactionManager.java:156)
	... 35 common frames omitted
Caused by: java.net.SocketException: Socket closed
	at java.base/sun.nio.ch.NioSocketImpl.endRead(Unknown Source)
	at java.base/sun.nio.ch.NioSocketImpl.implRead(Unknown Source)
	at java.base/sun.nio.ch.NioSocketImpl.read(Unknown Source)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(Unknown Source)
	at java.base/java.net.Socket$SocketInputStream.read(Unknown Source)
	at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:162)
	at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:129)
	at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:114)
	at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:74)
	at org.postgresql.core.PGStream.receiveChar(PGStream.java:467)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2166)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371)
	... 41 common frames omitted
2024-09-03 15:19:36,799 WARN  command-shutdown io.kestra.cli.AbstractCommand Receiving shutdown ! Try to graceful exit
2024-09-03 15:19:36,799 INFO  command-shutdown i.kestra.core.contexts.KestraContext Kestra server - Shutdown initiated
2024-09-03 15:19:36,799 INFO  command-shutdown i.kestra.core.contexts.KestraContext Kestra server - Shutdown completed

We should:

  • Make sure we are transaction-safe, since the topology can be edited from multiple threads
  • or use a simple REPLACE INTO instead of INSERT INTO if we are not 100% sure; it will not change the underlying data
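
To make the second option concrete: PostgreSQL, which is the database in the trace above, has no REPLACE INTO statement; the closest equivalent is INSERT ... ON CONFLICT ... DO UPDATE. The sketch below only builds the statement text. The table and column names come from the error message in this issue, while the class and method names are hypothetical and not Kestra's actual code.

```java
// Hypothetical helper for illustration only; it is not part of Kestra.
final class TopologyUpsertSketch {
    // Table name taken from the error message in this issue.
    static final String TABLE = "flow_topologies";

    /** Plain insert: fails with a unique-constraint violation if the key already exists. */
    static String plainInsert() {
        return "insert into " + TABLE
            + " (\"key\", \"value\") values (?, cast(? as jsonb))";
    }

    /** PostgreSQL upsert: on a key conflict, overwrite the stored value instead of failing. */
    static String upsert() {
        return plainInsert()
            + " on conflict (\"key\") do update set \"value\" = excluded.\"value\"";
    }

    public static void main(String[] args) {
        // Prints the upsert statement so it can be compared with the failing insert above.
        System.out.println(upsert());
    }
}
```

Because each batch entry is a separate statement, a key repeated within the same batch simply updates the row again rather than aborting the batch.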
@tchiotludo tchiotludo added bug Something isn't working area/backend Needs backend code changes kind/quick-win Seems to be quick to do kind/customer-request Requested by one or more customers labels Sep 4, 2024
@tchiotludo tchiotludo added this to Issues Sep 4, 2024
@github-project-automation github-project-automation bot moved this to Backlog in Issues Sep 4, 2024
@loicmathieu loicmathieu self-assigned this Sep 11, 2024
loicmathieu added a commit that referenced this issue Sep 11, 2024
We usually include `onDuplicateKeyUpdate()` when we save an object in the database; it was missing for the batch insert of flow topologies.

Fixes #4825
@github-project-automation github-project-automation bot moved this from Backlog to Done in Issues Sep 11, 2024
fhussonnois pushed a commit that referenced this issue Sep 12, 2024
We usually include `onDuplicateKeyUpdate()` when we save an object in the database; it was missing for the batch insert of flow topologies.

Fixes #4825
@Skraye
Member

Skraye commented Sep 23, 2024

Not working on PG14

@Skraye Skraye reopened this Sep 23, 2024
@github-project-automation github-project-automation bot moved this from Done to Backlog in Issues Sep 23, 2024
Skraye added a commit that referenced this issue Sep 23, 2024
@Skraye Skraye closed this as completed in 82d2170 Sep 23, 2024
@github-project-automation github-project-automation bot moved this from Backlog to Done in Issues Sep 23, 2024
Skraye added a commit that referenced this issue Sep 23, 2024
@tchiotludo tchiotludo reopened this Sep 23, 2024
@github-project-automation github-project-automation bot moved this from Done to Backlog in Issues Sep 23, 2024
@tchiotludo
Member Author

The fix will not prevent the executor from crashing if another error occurs. Please catch the error and make the executor only log it at ERROR level without crashing.
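
A minimal sketch of that behaviour, assuming a generic message handler rather than Kestra's real executor code: each message is processed inside its own try/catch, a failure is logged at ERROR, and the loop carries on with the next message. The class and method names are hypothetical, and catching `RuntimeException` is an assumption about which failures should be tolerated.

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical consumer loop for illustration only; it is not Kestra's JdbcQueue.
final class SafeConsumerSketch {
    private static final Logger LOG =
        System.getLogger(SafeConsumerSketch.class.getName());

    /**
     * Applies the handler to every message. A failure on one message is
     * logged at ERROR and does not stop the loop. Returns the failed messages.
     */
    static <T> List<T> consumeAll(List<T> messages, Consumer<T> handler) {
        List<T> failed = new ArrayList<>();
        for (T message : messages) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // Log and continue instead of letting the exception kill the thread.
                LOG.log(Level.ERROR, "Unable to process message: " + message, e);
                failed.add(message);
            }
        }
        return failed;
    }
}
```

Returning the failed messages is a design choice of this sketch, so that a caller could retry them or report them; the issue itself only asks for the error to be logged.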

Projects
Status: Done
3 participants