Skip to content

Commit

Permalink
Introduce Jackson serializer which can serialize mostly anything (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
badgerwithagun authored Mar 14, 2022
1 parent bd1c71a commit 9ec6dc5
Show file tree
Hide file tree
Showing 17 changed files with 1,047 additions and 8 deletions.
120 changes: 118 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ A flexible implementation of the [Transaction Outbox Pattern](https://microservi
1. [Advanced](#advanced)
1. [The nested outbox pattern](#the-nested-outbox-pattern)
1. [Idempotency protection](#idempotency-protection)
1. [Flexible serialization](#flexible-serialization-beta)
1. [Clustering](#clustering)
1. [Configuration reference](#configuration-reference)
1. [Stubbing in tests](#stubbing-in-tests)

Expand Down Expand Up @@ -262,7 +264,9 @@ backgroundThread.interrupt();
backgroundThread.join();
```

Don't worry about it running on multiple instances simultaneously. It's designed to handle concurrent use (particularly on databases that support `SKIP LOCKED`, such as Postgres and MySQL 8+), and indeed multi-processing it can be a benefit; spreading high workloads across instances without any need for more complex high-availability configuration (that said, if you want to distribute work across a cluster at point of submission, this is also supported).
`flush()` is designed to handle concurrent use on databases that support `SKIP LOCKED`, such as Postgres and MySQL 8+. Feel free to run this as often as you like (within reason, e.g. once a minute) on every instance of your application. This can have the benefit of spreading work across multiple instances when the work backlog is extremely high, but is not as effective as a proper [clustering](#clustering) approach.

However, multiple concurrent calls to `flush()` can cause lock timeout errors on databases without `SKIP LOCKED` support, such as MySQL 5.7. This is harmless, but will cause a lot of log noise, so you may prefer to run on a single instance at a time to avoid this.

## Managing the "dead letter queue"

Expand Down Expand Up @@ -340,6 +344,118 @@ outbox.with()
Where `context-clientid` is a globally-unique identifier derived from the incoming request. Such ids are usually available from queue middleware as message ids, or if not you can require as part of the incoming API (possibly with a tenant prefix to ensure global uniqueness across tenants).
### Flexible serialization (beta)
Most people will use the default persistor, `DefaultPersistor`, to persist tasks to a relational database. This uses `DefaultInvocationSerializer` by default, which in turn uses [GSON](https://github.com/google/gson) to serialize as JSON. `DefaultInvocationSerializer` is extremely limited by design, with a small list of allowed classes in method arguments.
You can extend the list of support types by calling `serializableTypes` in its builder, but it will always be restricted to this global list. This is by design, to avoid building a [deserialization of untrusted data](https://owasp.org/www-community/vulnerabilities/Deserialization_of_untrusted_data) vulnerability into the library.
Furthermore, there is no support for the case where run-time and compile-time types differ, such as in polymorphic collections. The following will always fail with `DefaultInvocationSerializer`:
```
outbox.schedule(Service.class).processList(List.of(1, "2", 3L));
```
However, if you completely trust your serialized data (for example, your developers don't have write access to your production database, and the access credentials are well guarded) then you may prefer to have 100% flexibility, with no need to declare the types used and the ability to use any combination of run-time and compile-time types.

See [transaction-outbox-jackson](transactionoutbox-jackson/README.md), which uses a specially-configured Jackson `ObjectMapper` to achieve this.

### Clustering

The default mechanism for _running_ tasks (either immediately, or when they are picked up by background processing) is via a `java.concurrent.Executor`, which effectively does the following:
```
executor.execute(() -> outbox.processNow(transactionOutboxEntry));
```
This offloads processing to a background thread _on the application instance_ on which the task was picked up. Under high load, this can mean thousands of tasks being picked up from the database queue and submitted at the same time on the same application instance, even if there are 20 instances of the application, effectively limiting the total rate of processing to what a single instance can handle.

If you want to instead push the work for processing by _any_ of your application instances, thus spreading the work around a cluster, there are multiple approaches, just some of which are listed below:

* An HTTP endpoint on a load-balanced DNS with service discovery (such as a container orchestrator e.g. Kubernetes or Nomad)
* A shared queue (AWS SQS, ActiveMQ, ZeroMQ)
* A lower-level clustering/messaging toolkit such as [JGroups](http://www.jgroups.org/).

All of these can be implemented as follows:

When defining the `TransactionOutbox`, replace `ExecutorSubmitter` with something which serializes a `TransactionOutboxEntry` and ships it to the remote queue/address. Here's what configuration might look for a `RestApiSubmitter` which ships the request to a load-balanced endpoint hosted on Nomad/Consul:
```
TransactionOutbox outbox = TransactionOutbox.builder().submitter(restApiSubmitter)
```
It is strongly advised that you use a local executor in-line, to ensure that if there are communications issues with your endpoint or queue, it doesn't fail the calling thread. Here is an example using [Feign](https://github.com/OpenFeign/feign):
```
@Slf4j
class RestApiSubmitter implements Submitter {

private final FeignResource feignResource;
private final ExecutorService localExecutor;
private final Provider<TransactionOutbox> outbox;

@Inject
RestApiExecutor(String endpointUrl, ExecutorService localExecutor, ObjectMapper objectMapper, Provider<TransactionOutbox> outbox) {
this.feignResource = Feign.builder()
.decoder(new JacksonDecoder(objectMapper))
.target(GitHub.class, "https://api.github.com");;
this.localExecutor = localExecutor;
this.outbox = outbox;
}

@Override
public void submit(TransactionOutboxEntry entry, Consumer<TransactionOutboxEntry> leIgnore) {
try {
localExecutor.execute(() -> processRemotely(entry));
log.info("Queued {} to be sent for remote processing", entry.description());
} catch (RejectedExecutionException e) {
log.info("Will queue {} for processing when local executor is available", entry.description());
} catch (Exception e) {
log.warn("Failed to queue {} for execution at {}. It will be re-attempted later.", entry.description(), url, e);
}
}

private void processRemotely(TransactionOutboxEntry entry) {
try {
feignResource.process(entry);
log.info("Submitted {} for remote processing at {}", entry.description(), url);
} catch (Exception e) {
log.warn(
"Failed to submit {} for remote processing at {}. It will be re-attempted later.",
entry.description(),
url,
e
);
}
}

public interface FeignResource {
@RequestLine("POST /outbox/process")
void process(TransactionOutboxEntry entry);
}

}
```
Then listen on your communication mechanism for incoming serialized `TransactionOutboxEntry`s, and push them to a normal local `ExecutorSubmitter`. Here's what a JAX-RS example might look like:
```
@POST
@Path("/outbox/process")
void processOutboxEntry(String entry) {
TransactionOutboxEntry entry = somethingWhichCanSerializeTransactionOutboxEntries.deserialize(entry);
Submitter submitter = ExecutorSubmitter.builder().executor(localExecutor).logLevelWorkQueueSaturation(Level.INFO).build();
submitter.submit(entry, outbox.get()::processNow);
}
```
This whole approach is complicated a little by the inherent difficulty in serializing and deserializing a `TransactionOutboxEntry`, which is extremely polymorphic in nature. A reference approach is provided by [transaction-outbox-jackson](transactionoutbox-jackson/README.md), which provides the features necessary to make a Jackson `ObjectMapper` able to handle the work. With that on the classpath you can use an `ObjectMapper` as follows:
```
// Add support for TransactionOutboxEntry to your normal application ObjectMapper
yourNormalSharedObjectMapper.registerModule(new TransactionOutboxJacksonModule());
// (Optional) support deep polymorphic requests - for this we need to copy the object
// mapper so it doesn't break the way the rest of your application works
ObjectMapper objectMapper = yourNormalSharedObjectMapper.copy();
objectMapper.setDefaultTyping(TransactionOutboxJacksonModule.typeResolver());

// Serialize
String message = objectMapper.writeValueAsString(entry);

// Deserialize
TransactionOutboxEntry entry = objectMapper.readValue(message, TransactionOutboxEntry.class);
```
Armed with the above, happy clustering!

## Configuration reference

This example shows a number of other configuration options in action:
Expand Down Expand Up @@ -378,7 +494,7 @@ TransactionOutbox outbox = TransactionOutbox.builder()
// Change the log level used when work cannot be submitted to a saturated queue to INFO level (the default
// is WARN, which you should probably consider a production incident). You can also change the Executor used
// for submitting work to a shared thread pool used by the rest of your application. Fully-custom Submitter
// implementations are also easy to implement.
// implementations are also easy to implement, for example to cluster work.
.submitter(ExecutorSubmitter.builder()
.executor(ForkJoinPool.commonPool())
.logLevelWorkQueueSaturation(Level.INFO)
Expand Down
4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

<modules>
<module>transactionoutbox-core</module>
<module>transactionoutbox-jackson</module>
<module>transactionoutbox-spring</module>
<module>transactionoutbox-guice</module>
<module>transactionoutbox-jooq</module>
Expand Down Expand Up @@ -239,8 +240,9 @@
<profile>
<id>active-on-jdk-11</id>
<activation>
<jdk>11</jdk>
<jdk>[11,16)</jdk>
</activation>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;

import lombok.Builder;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -123,8 +122,8 @@ private static final class InvocationJsonSerializer
implements JsonSerializer<Invocation>, JsonDeserializer<Invocation> {

private final int version;
private Map<Class<?>, String> classToName = new HashMap<>();
private Map<String, Class<?>> nameToClass = new HashMap<>();
private final Map<Class<?>, String> classToName = new HashMap<>();
private final Map<String, Class<?>> nameToClass = new HashMap<>();

InvocationJsonSerializer(Set<Class<?>> serializableClasses, int version) {
this.version = version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private TypeCache<Class<?>> setupByteBuddyCache() {
try {
return new TypeCache<>(Sort.WEAK);
} catch (NoClassDefFoundError error) {
log.info("ByteBuddy is not on the classpath, so only interfaces can be used with transaction-outbox");
log.info(
"ByteBuddy is not on the classpath, so only interfaces can be used with transaction-outbox");
return null;
}
}
Expand All @@ -44,7 +45,8 @@ private ObjenesisStd setupObjenesis() {
try {
return new ObjenesisStd();
} catch (NoClassDefFoundError error) {
log.info("Objenesis is not on the classpath, so only interfaces or classes with default constructors can be used with transaction-outbox");
log.info(
"Objenesis is not on the classpath, so only interfaces or classes with default constructors can be used with transaction-outbox");
return null;
}
}
Expand Down
83 changes: 83 additions & 0 deletions transactionoutbox-jackson/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# transaction-outbox-jackson

[![Jackson on Maven Central](https://maven-badges.herokuapp.com/maven-central/com.gruelbox/transactionoutbox-jackson/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.gruelbox/transactionoutbox-guice)
[![Jackson Javadoc](https://www.javadoc.io/badge/com.gruelbox/transactionoutbox-jackson.svg?color=blue)](https://www.javadoc.io/doc/com.gruelbox/transactionoutbox-guice)
[![Latest snapshot](https://img.shields.io/github/v/tag/gruelbox/transaction-outbox?label=snapshot&sort=semver)](#development-snapshots)

Extension for [transaction-outbox-core](../README.md) which uses Jackson for serialisation.

If you are confident in trusting your database, then this serializer has a number of advantages: it is as
configurable as whatever Jackson's `ObjectMapper` can handle, and explicitly handles n-depth polymorphic trees. This
largely means that you can throw pretty much anything at it and it will "just work".

However, if there is any risk that you might not trust the source of the serialized `Invocation`,
_do not use this_. This serializer is vulnerable to
[deserialization of untrusted data](https://github.com/gruelbox/transaction-outbox/issues/236#issuecomment-1024929436),
which is why it is not included in the core library.

## Installation

### Stable releases

#### Maven

```xml
<dependency>
<groupId>com.gruelbox</groupId>
<artifactId>transactionoutbox-jackson</artifactId>
<version>4.2.268</version>
</dependency>
```

#### Gradle

```groovy
implementation 'com.gruelbox:transactionoutbox-jackson:4.2.268'
```

### Development snapshots

See [transactionoutbox-core](../README.md) for more information.

## Configuration

### Fresh projects

If starting with a fresh project, you don't need to worry about compatibility with DefaultInvocationSerializer, so configure as follows:

```java
var outbox = TransactionOutbox.builder()
.persistor(DefaultPersistor.builder()
.dialect(Dialect.H2)
.serializer(JacksonInvocationSerializer.builder()
.mapper(new ObjectMapper())
.build())
.build())
```

### Existing projects using DefaultInvocationSerializer

If you're already using Transaction Outbox, you may have outbox tasks queued which your application needs to continue to be capable of loading.
To handle this, pass through an instance of `DefaultInvocationSerializer` which matches what you used previously:

```java
var outbox = TransactionOutbox.builder()
.persistor(DefaultPersistor.builder()
.dialect(Dialect.H2)
.serializer(JacksonInvocationSerializer.builder()
.mapper(new ObjectMapper())
.defaultInvocationSerializer(DefaultInvocationSerializer.builder()
.serializableTypes(Set.of(Foo.class, Bar.class))
.build())
.build())
.build())
```

## Usage

You can now go wild with your scheduled method arguments:

```java
outbox.schedule(getClass())
.process(List.of(LocalDate.of(2000,1,1), "a", "b", 2));
```
97 changes: 97 additions & 0 deletions transactionoutbox-jackson/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>transactionoutbox-parent</artifactId>
<groupId>com.gruelbox</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<name>Transaction Outbox Jackson</name>
<packaging>jar</packaging>
<artifactId>transactionoutbox-jackson</artifactId>
<description>A safe implementation of the transactional outbox pattern for Java (Jackson extension library)
</description>

<properties>
<guice.version>5.2.4.RELEASE</guice.version>
<jackson.version>2.13.2</jackson.version>
<commons.lang.version>3.12.0</commons.lang.version>
</properties>

<dependencies>

<!-- Run time -->
<dependency>
<groupId>com.gruelbox</groupId>
<artifactId>transactionoutbox-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang.version}</version>
</dependency>

<!-- Compile time -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<!-- Test -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 9ec6dc5

Please sign in to comment.