From 93aad05f1cdf50c7ff89b2ff76411ffc7a60fff5 Mon Sep 17 00:00:00 2001 From: victokoh <55743157+victokoh@users.noreply.github.com> Date: Mon, 10 May 2021 16:58:03 +0100 Subject: [PATCH] =?UTF-8?q?bugfix:=20default=20persistor=20should=20not=20?= =?UTF-8?q?swallow=20exceptions=20thrown=20when=20a=E2=80=A6=20(#141)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * bugfix: default persistor should not swallow exceptions thrown when a unique request id is present on a txoutbox entry. * +1 invocation `text` -> `mediumtext` for mysql dialects. --- .../DefaultMigrationManager.java | 7 +- .../transactionoutbox/DefaultPersistor.java | 1 + .../AbstractDefaultPersistorTest.java | 11 +++ .../acceptance/AbstractAcceptanceTest.java | 79 +++++++++++++++---- 4 files changed, 82 insertions(+), 16 deletions(-) diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java index 82cf58d6..dbdc3a51 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultMigrationManager.java @@ -68,7 +68,12 @@ class DefaultMigrationManager { "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation", Map.of( Dialect.POSTGRESQL_9, - "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)"))); + "ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)")), + new Migration( + 8, + "Update length of invocation column on outbox for MySQL dialects only.", + "ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT", + Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, ""))); static void migrate(TransactionManager transactionManager, @NotNull Dialect dialect) { transactionManager.inTransaction( diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index fe96df77..4dae3f30 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -114,6 +114,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry) throw new AlreadyScheduledException( "Request " + entry.description() + " already exists", e); } + throw e; } } } diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractDefaultPersistorTest.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractDefaultPersistorTest.java index cf7426e8..84e94772 100644 --- a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractDefaultPersistorTest.java +++ b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractDefaultPersistorTest.java @@ -53,6 +53,17 @@ void testInsertAndSelect() throws Exception { tx -> assertThat(persistor().selectBatch(tx, 100, now.plusMillis(1)), contains(entry))); } + @Test + void testInsertWithUniqueRequestIdFailureBubblesExceptionUp() { + var invalidEntry = + createEntry("FOO", now, false).toBuilder() + .uniqueRequestId("INTENTIONALLY_TOO_LONG_TO_CAUSE_BLOW_UP".repeat(10)) + .build(); + assertThrows( + RuntimeException.class, + () -> txManager().inTransactionThrows(tx -> persistor().save(tx, invalidEntry))); + } + @Test void testInsertDuplicate() throws Exception { TransactionOutboxEntry entry1 = createEntry("FOO1", now, false, "context-clientkey1"); diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/acceptance/AbstractAcceptanceTest.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/acceptance/AbstractAcceptanceTest.java index 88fba757..3d97fb8c 100644 --- a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/acceptance/AbstractAcceptanceTest.java +++ b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/acceptance/AbstractAcceptanceTest.java @@ -3,22 +3,9 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.junit.Assert.*; +import static org.junit.jupiter.api.Assertions.assertThrows; -import com.gruelbox.transactionoutbox.AlreadyScheduledException; -import com.gruelbox.transactionoutbox.DefaultPersistor; -import com.gruelbox.transactionoutbox.Dialect; -import com.gruelbox.transactionoutbox.Instantiator; -import com.gruelbox.transactionoutbox.NoTransactionActiveException; -import com.gruelbox.transactionoutbox.Persistor; -import com.gruelbox.transactionoutbox.Submitter; -import com.gruelbox.transactionoutbox.ThreadLocalContextTransactionManager; -import com.gruelbox.transactionoutbox.ThrowingRunnable; -import com.gruelbox.transactionoutbox.ThrowingTransactionalSupplier; -import com.gruelbox.transactionoutbox.Transaction; -import com.gruelbox.transactionoutbox.TransactionManager; -import com.gruelbox.transactionoutbox.TransactionOutbox; -import com.gruelbox.transactionoutbox.TransactionOutboxEntry; -import com.gruelbox.transactionoutbox.TransactionOutboxListener; +import com.gruelbox.transactionoutbox.*; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import java.sql.Connection; @@ -45,6 +32,9 @@ import lombok.Value; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Assume; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -402,6 +392,65 @@ final void retryBehaviour() throws Exception { }); } + @Test + final void onSchedulingFailure_BubbleExceptionsUp() throws Exception { + TransactionManager transactionManager = simpleTxnManager(); + CountDownLatch latch = new CountDownLatch(1); + TransactionOutbox outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .instantiator( + Instantiator.using( + clazz -> + (InterfaceProcessor) + (foo, bar) -> + LOGGER.info( + "Entered the method to process successfully. Processing ({}, {})", + foo, + bar))) + .persistor(Persistor.forDialect(connectionDetails().dialect())) + .submitter(Submitter.withExecutor(unreliablePool)) + .attemptFrequency(Duration.ofMillis(500)) + .listener(new LatchListener(latch)) + .build(); + + clearOutbox(); + + withRunningFlusher( + outbox, + () -> { + // run for mysql only. + var dialect = connectionDetails().dialect(); + Assume.assumeThat( + dialect, + new Matcher<>() { + @Override + public void describeTo(Description description) {} + + @Override + public boolean matches(Object o) { + return Dialect.MY_SQL_8.equals(o) || Dialect.MY_SQL_5.equals(o); + } + + @Override + public void describeMismatch(Object o, Description description) {} + + @Override + public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {} + }); + assertThrows( + Exception.class, + () -> + transactionManager.inTransaction( + () -> + outbox + .with() + .uniqueRequestId("some_unique_id") + .schedule(InterfaceProcessor.class) + .process(1, "This invocation is too long".repeat(650000)))); + }); + } + @Test final void lastAttemptTime_updatesEveryTime() throws Exception { TransactionManager transactionManager = simpleTxnManager();