Skip to content

Commit

Permalink
bugfix: default persistor should not swallow exceptions thrown when a… (
Browse files Browse the repository at this point in the history
#141)

* bugfix: default persistor should not swallow exceptions thrown when a unique request id is present on a txoutbox entry.
* +1 invocation `text` -> `mediumtext` for mysql dialects.
  • Loading branch information
victokoh authored May 10, 2021
1 parent aff3812 commit 93aad05
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ class DefaultMigrationManager {
"ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6) NULL AFTER invocation",
Map.of(
Dialect.POSTGRESQL_9,
"ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)")));
"ALTER TABLE TXNO_OUTBOX ADD COLUMN lastAttemptTime TIMESTAMP(6)")),
new Migration(
8,
"Update length of invocation column on outbox for MySQL dialects only.",
"ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation MEDIUMTEXT",
Map.of(Dialect.POSTGRESQL_9, "", Dialect.H2, "")));

static void migrate(TransactionManager transactionManager, @NotNull Dialect dialect) {
transactionManager.inTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry)
throw new AlreadyScheduledException(
"Request " + entry.description() + " already exists", e);
}
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ void testInsertAndSelect() throws Exception {
tx -> assertThat(persistor().selectBatch(tx, 100, now.plusMillis(1)), contains(entry)));
}

@Test
void testInsertWithUniqueRequestIdFailureBubblesExceptionUp() {
var invalidEntry =
createEntry("FOO", now, false).toBuilder()
.uniqueRequestId("INTENTIONALLY_TOO_LONG_TO_CAUSE_BLOW_UP".repeat(10))
.build();
assertThrows(
RuntimeException.class,
() -> txManager().inTransactionThrows(tx -> persistor().save(tx, invalidEntry)));
}

@Test
void testInsertDuplicate() throws Exception {
TransactionOutboxEntry entry1 = createEntry("FOO1", now, false, "context-clientkey1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,9 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.gruelbox.transactionoutbox.AlreadyScheduledException;
import com.gruelbox.transactionoutbox.DefaultPersistor;
import com.gruelbox.transactionoutbox.Dialect;
import com.gruelbox.transactionoutbox.Instantiator;
import com.gruelbox.transactionoutbox.NoTransactionActiveException;
import com.gruelbox.transactionoutbox.Persistor;
import com.gruelbox.transactionoutbox.Submitter;
import com.gruelbox.transactionoutbox.ThreadLocalContextTransactionManager;
import com.gruelbox.transactionoutbox.ThrowingRunnable;
import com.gruelbox.transactionoutbox.ThrowingTransactionalSupplier;
import com.gruelbox.transactionoutbox.Transaction;
import com.gruelbox.transactionoutbox.TransactionManager;
import com.gruelbox.transactionoutbox.TransactionOutbox;
import com.gruelbox.transactionoutbox.TransactionOutboxEntry;
import com.gruelbox.transactionoutbox.TransactionOutboxListener;
import com.gruelbox.transactionoutbox.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
Expand All @@ -45,6 +32,9 @@
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.Assume;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -402,6 +392,65 @@ final void retryBehaviour() throws Exception {
});
}

@Test
final void onSchedulingFailure_BubbleExceptionsUp() throws Exception {
TransactionManager transactionManager = simpleTxnManager();
CountDownLatch latch = new CountDownLatch(1);
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.instantiator(
Instantiator.using(
clazz ->
(InterfaceProcessor)
(foo, bar) ->
LOGGER.info(
"Entered the method to process successfully. Processing ({}, {})",
foo,
bar)))
.persistor(Persistor.forDialect(connectionDetails().dialect()))
.submitter(Submitter.withExecutor(unreliablePool))
.attemptFrequency(Duration.ofMillis(500))
.listener(new LatchListener(latch))
.build();

clearOutbox();

withRunningFlusher(
outbox,
() -> {
// run for mysql only.
var dialect = connectionDetails().dialect();
Assume.assumeThat(
dialect,
new Matcher<>() {
@Override
public void describeTo(Description description) {}

@Override
public boolean matches(Object o) {
return Dialect.MY_SQL_8.equals(o) || Dialect.MY_SQL_5.equals(o);
}

@Override
public void describeMismatch(Object o, Description description) {}

@Override
public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {}
});
assertThrows(
Exception.class,
() ->
transactionManager.inTransaction(
() ->
outbox
.with()
.uniqueRequestId("some_unique_id")
.schedule(InterfaceProcessor.class)
.process(1, "This invocation is too long".repeat(650000))));
});
}

@Test
final void lastAttemptTime_updatesEveryTime() throws Exception {
TransactionManager transactionManager = simpleTxnManager();
Expand Down

0 comments on commit 93aad05

Please sign in to comment.