Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide an interface for generating sequences #678

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void test() {

TransactionManager transactionManager = TransactionManager.fromDataSource(dataSource);

Dialect dialect = Dialect.POSTGRESQL_9;
TransactionOutbox outbox =
TransactionOutbox.builder()
// The most complex part to set up for most will be synchronizing with your existing
Expand All @@ -47,7 +48,8 @@ void test() {
DefaultPersistor.builder()
// Selecting the right SQL dialect ensures that features such as SKIP LOCKED are
// used correctly.
.dialect(Dialect.POSTGRESQL_9)
.dialect(dialect)
.sequenceGenerator(DefaultSequenceGenerator.builder().dialect(dialect).build())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be necessary (see comments on DefaultPersistor)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

// Override the table name (defaults to "TXNO_OUTBOX")
.tableName("transactionOutbox")
// Shorten the time we will wait for write locks (defaults to 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ final void workAlwaysSerialized() throws Exception {
.persistor(
DefaultPersistor.builder()
.dialect(connectionDetails().dialect())
.sequenceGenerator(DefaultSequenceGenerator.builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be necessary (see comments on DefaultPersistor)

.dialect(connectionDetails().dialect())
.build())
.serializer(
DefaultInvocationSerializer.builder()
.serializableTypes(Set.of(Arg.class))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.gruelbox.transactionoutbox;

import com.gruelbox.transactionoutbox.spi.Utils;
import java.io.IOException;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.sql.Timestamp;
Expand Down Expand Up @@ -56,6 +56,12 @@ public class DefaultPersistor implements Persistor, Validatable {
@SuppressWarnings("JavaDoc")
private final Dialect dialect;

/**
* @param sequenceGenerator The sequence generator used for ordered tasks. Required
*/
@SuppressWarnings("JavaDoc")
private final SequenceGenerator sequenceGenerator;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On its own, this is a compatibility break; it requires all existing users to specify DefaultSequenceGenerator when creating a DefaultPersistor. I suggest changing this class to have a constructor annotated with @Builder; then you can accept null for sequenceGenerator and replace it with a DefaultSequenceGenerator.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


/**
* @param tableName The database table name. The default is {@code TXNO_OUTBOX}.
*/
Expand Down Expand Up @@ -88,6 +94,7 @@ public class DefaultPersistor implements Persistor, Validatable {
public void validate(Validator validator) {
validator.notNull("dialect", dialect);
validator.notNull("tableName", tableName);
validator.notNull("sequenceGenerator", sequenceGenerator);
}

@Override
Expand Down Expand Up @@ -119,8 +126,12 @@ public void save(Transaction tx, TransactionOutboxEntry entry)
var writer = new StringWriter();
serializer.serializeInvocation(entry.getInvocation(), writer);
if (entry.getTopic() != null) {
setNextSequence(tx, entry);
log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic());
try {
entry.setSequence(sequenceGenerator.generate(tx, entry.getTopic()));
log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic());
} catch (Exception e) {
throw new RuntimeException("Failed to assign sequence number", e);
}
}
PreparedStatement stmt = tx.prepareBatchStatement(insertSql);
setupInsert(entry, writer, stmt);
Expand All @@ -132,7 +143,7 @@ public void save(Transaction tx, TransactionOutboxEntry entry)
stmt.executeUpdate();
log.debug("Inserted {} immediately", entry.description());
} catch (Exception e) {
if (indexViolation(e)) {
if (Utils.indexViolation(e)) {
throw new AlreadyScheduledException(
"Request " + entry.description() + " already exists", e);
}
Expand All @@ -141,47 +152,6 @@ public void save(Transaction tx, TransactionOutboxEntry entry)
}
}

private void setNextSequence(Transaction tx, TransactionOutboxEntry entry) throws SQLException {
//noinspection resource
var seqSelect = tx.prepareBatchStatement(dialect.getFetchNextSequence());
seqSelect.setString(1, entry.getTopic());
try (ResultSet rs = seqSelect.executeQuery()) {
if (rs.next()) {
entry.setSequence(rs.getLong(1) + 1L);
//noinspection resource
var seqUpdate =
tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE topic = ?");
seqUpdate.setLong(1, entry.getSequence());
seqUpdate.setString(2, entry.getTopic());
seqUpdate.executeUpdate();
} else {
try {
entry.setSequence(1L);
//noinspection resource
var seqInsert =
tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (topic, seq) VALUES (?, ?)");
seqInsert.setString(1, entry.getTopic());
seqInsert.setLong(2, entry.getSequence());
seqInsert.executeUpdate();
} catch (Exception e) {
if (indexViolation(e)) {
setNextSequence(tx, entry);
} else {
throw e;
}
}
}
}
}

private boolean indexViolation(Exception e) {
return (e instanceof SQLIntegrityConstraintViolationException)
|| (e.getClass().getName().equals("org.postgresql.util.PSQLException")
&& e.getMessage().contains("constraint"))
|| (e.getClass().getName().equals("com.microsoft.sqlserver.jdbc.SQLServerException")
&& e.getMessage().contains("duplicate key"));
}

private void setupInsert(
TransactionOutboxEntry entry, StringWriter writer, PreparedStatement stmt)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.gruelbox.transactionoutbox;

import com.gruelbox.transactionoutbox.spi.Utils;
import java.sql.ResultSet;

import lombok.AccessLevel;
import lombok.Builder;
import lombok.RequiredArgsConstructor;

/**
* Generates a sequence number based on the <i>TXNO_SEQUENCE</i> table in a relational database.
*/
@Builder
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public class DefaultSequenceGenerator implements SequenceGenerator, Validatable {
private final Dialect dialect;

@Override
public long generate(Transaction tx, String topic) throws Exception {
//noinspection resource
var seqSelect = tx.prepareBatchStatement(dialect.getFetchNextSequence());
seqSelect.setString(1, topic);
try (ResultSet rs = seqSelect.executeQuery()) {
long sequence = 1L;
if (rs.next()) {
sequence = rs.getLong(1) + 1;
//noinspection resource
var seqUpdate =
tx.prepareBatchStatement("UPDATE TXNO_SEQUENCE SET seq = ? WHERE topic = ?");
seqUpdate.setLong(1, sequence);
seqUpdate.setString(2, topic);
seqUpdate.executeUpdate();
} else {
try {
//noinspection resource
var seqInsert =
tx.prepareBatchStatement("INSERT INTO TXNO_SEQUENCE (topic, seq) VALUES (?, ?)");
seqInsert.setString(1, topic);
seqInsert.setLong(2, sequence);
seqInsert.executeUpdate();
} catch (Exception e) {
if (Utils.indexViolation(e)) {
return generate(tx, topic);
} else {
throw e;
}
}
}

return sequence;
}
}

@Override
public void validate(Validator validator) {
validator.notNull("dialect", dialect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ public interface Persistor {
* @return The persistor.
*/
static DefaultPersistor forDialect(Dialect dialect) {
return DefaultPersistor.builder().dialect(dialect).build();
return DefaultPersistor.builder()
.dialect(dialect)
.sequenceGenerator(DefaultSequenceGenerator.builder().dialect(dialect).build())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be necessary (see comments on DefaultPersistor)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.gruelbox.transactionoutbox;

/**
* Generates sequences for a topic that is used in ordered tasks.
* For most use cases, just use {@link DefaultSequenceGenerator}.
*/
public interface SequenceGenerator {
/**
* Returns the sequence number for a topic
*
* @param tx The current {@link Transaction}
* @param topic The topic. Can be considered as a key in your storage
* @return The sequence number for a topic
* @throws Exception Any exception
*/
long generate(Transaction tx, String topic) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.gruelbox.transactionoutbox.ThrowingRunnable;
import com.gruelbox.transactionoutbox.UncheckedException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
Expand All @@ -24,6 +25,14 @@ public static boolean safelyRun(String gerund, ThrowingRunnable runnable) {
}
}

public static boolean indexViolation(Exception e) {
return (e instanceof SQLIntegrityConstraintViolationException)
|| (e.getClass().getName().equals("org.postgresql.util.PSQLException")
&& e.getMessage().contains("constraint"))
|| (e.getClass().getName().equals("com.microsoft.sqlserver.jdbc.SQLServerException")
&& e.getMessage().contains("duplicate key"));
}

@SuppressWarnings("unused")
public static void safelyClose(AutoCloseable... closeables) {
safelyClose(Arrays.asList(closeables));
Expand Down