Skip to content

Commit

Permalink
Add Transactional outbox pattern implementation based on direct write…
Browse files Browse the repository at this point in the history
…s to the WAL
  • Loading branch information
rkudryashov committed Aug 13, 2024
1 parent ec042c3 commit a7389fb
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.romankudryashov.eventdrivenarchitecture.commonmodel

import java.util.UUID

// this class is used for direct writes of outbox messages to the WAL
data class OutboxMessage<T>(
val id: UUID = UUID.randomUUID(),
val aggregateType: AggregateType,
val aggregateId: Long?,
val type: EventType,
val topic: String,
val payload: T
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.romankudryashov.eventdrivenarchitecture.commonmodel.Book
import com.romankudryashov.eventdrivenarchitecture.commonmodel.BookLoan
import com.romankudryashov.eventdrivenarchitecture.commonmodel.CurrentAndPreviousState
import com.romankudryashov.eventdrivenarchitecture.commonmodel.Notification
import com.romankudryashov.eventdrivenarchitecture.commonmodel.OutboxMessage
import org.springframework.aot.hint.MemberCategory
import org.springframework.aot.hint.RuntimeHints
import org.springframework.aot.hint.RuntimeHintsRegistrar
Expand All @@ -20,6 +21,7 @@ class CommonRuntimeHints : RuntimeHintsRegistrar {
.registerType(BookLoan::class.java, MemberCategory.INVOKE_PUBLIC_METHODS, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS)
.registerType(CurrentAndPreviousState::class.java, MemberCategory.INVOKE_PUBLIC_METHODS, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS)
.registerType(Notification::class.java, MemberCategory.INVOKE_PUBLIC_METHODS, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS)
.registerType(OutboxMessage::class.java, MemberCategory.INVOKE_PUBLIC_METHODS, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS)
// required to persist entities
// TODO: remove after https://hibernate.atlassian.net/browse/HHH-16809
.registerType(Array<UUID>::class.java, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS)
Expand Down
12 changes: 7 additions & 5 deletions kafka-connect/connectors/user.source.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,23 @@
"topic.prefix": "user.source",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 6,
"table.include.list": "public.outbox",
"schema.exclude.list": "public",
"heartbeat.interval.ms": "5000",
"tombstones.on.delete": false,
"publication.autocreate.mode": "filtered",
"transforms": "addMetadataHeaders,outbox",
"publication.autocreate.mode": "no_tables",
"transforms": "addMetadataHeaders,decode,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.decode.type": "io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent",
"transforms.decode.fields.null.include": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.key": "aggregateId",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header,aggregate_type:header:dataSchemaName",
"transforms.outbox.table.fields.additional.placement": "type:header,aggregateType:header:dataSchemaName",
"transforms.outbox.route.by.field": "topic",
"transforms.outbox.route.topic.replacement": "library.${routedByValue}",
"predicates": "isHeartbeat",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.romankudryashov.eventdrivenarchitecture.userservice.persistence

import com.romankudryashov.eventdrivenarchitecture.commonmodel.OutboxMessage
import com.romankudryashov.eventdrivenarchitecture.userservice.persistence.entity.InboxMessageEntity
import com.romankudryashov.eventdrivenarchitecture.userservice.persistence.entity.OutboxMessageEntity
import com.romankudryashov.eventdrivenarchitecture.userservice.persistence.entity.UserEntity
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.data.domain.Pageable
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.data.jpa.repository.Lock
import org.springframework.stereotype.Repository
import java.util.UUID
import jakarta.persistence.EntityManager
import jakarta.persistence.LockModeType

interface UserRepository : JpaRepository<UserEntity, Long> {
Expand All @@ -21,4 +24,15 @@ interface InboxMessageRepository : JpaRepository<InboxMessageEntity, UUID> {
fun findAllByStatusAndProcessedByOrderByCreatedAtAsc(status: InboxMessageEntity.Status, processedBy: String, pageable: Pageable): List<InboxMessageEntity>
}

interface OutboxMessageRepository : JpaRepository<OutboxMessageEntity, UUID>
@Repository
class OutboxMessageRepository(
private val entityManager: EntityManager,
private val objectMapper: ObjectMapper
) {
fun <T> persistOutboxMessageToWalInsideTransaction(outboxMessage: OutboxMessage<T>) {
val outboxMessageJson = objectMapper.writeValueAsString(outboxMessage)
entityManager.createQuery("SELECT pg_logical_emit_message(true, 'outbox', :outboxMessage)")
.setParameter("outboxMessage", outboxMessageJson)
.singleResult
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package com.romankudryashov.eventdrivenarchitecture.userservice.persistence.entity

import com.romankudryashov.eventdrivenarchitecture.commonmodel.AggregateType
import com.romankudryashov.eventdrivenarchitecture.commonmodel.EventType
import com.fasterxml.jackson.databind.JsonNode
import org.hibernate.annotations.ColumnDefault
import org.hibernate.annotations.Generated
import org.hibernate.annotations.JdbcTypeCode
import org.hibernate.type.SqlTypes
import java.util.UUID
Expand Down Expand Up @@ -62,20 +59,3 @@ class InboxMessageEntity(
Error
}
}

@Entity
@Table(name = "outbox")
class OutboxMessageEntity(
@Id
@Generated
@ColumnDefault("gen_random_uuid()")
val id: UUID? = null,
@Enumerated(value = EnumType.STRING)
val aggregateType: AggregateType,
val aggregateId: Long?,
@Enumerated(value = EnumType.STRING)
val type: EventType,
val topic: String,
@JdbcTypeCode(SqlTypes.JSON)
val payload: JsonNode
) : AbstractEntity()
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import com.romankudryashov.eventdrivenarchitecture.commonmodel.EventType
import com.romankudryashov.eventdrivenarchitecture.commonmodel.EventType.RollbackBookLentCommand
import com.romankudryashov.eventdrivenarchitecture.commonmodel.EventType.SendNotificationCommand
import com.romankudryashov.eventdrivenarchitecture.commonmodel.Notification
import com.romankudryashov.eventdrivenarchitecture.commonmodel.OutboxMessage
import com.romankudryashov.eventdrivenarchitecture.userservice.exception.UserServiceException
import com.romankudryashov.eventdrivenarchitecture.userservice.persistence.OutboxMessageRepository
import com.romankudryashov.eventdrivenarchitecture.userservice.persistence.entity.OutboxMessageEntity
import com.romankudryashov.eventdrivenarchitecture.userservice.service.OutboxMessageService
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
Expand All @@ -33,17 +33,16 @@ class OutboxMessageServiceImpl(
override fun saveSendNotificationCommandMessage(payload: Notification, aggregateId: Long) =
save(createOutboxMessage(AggregateType.Notification, null, SendNotificationCommand, payload))

private fun <T> createOutboxMessage(aggregateType: AggregateType, aggregateId: Long?, type: EventType, payload: T) = OutboxMessageEntity(
private fun <T> createOutboxMessage(aggregateType: AggregateType, aggregateId: Long?, type: EventType, payload: T) = OutboxMessage(
aggregateType = aggregateType,
aggregateId = aggregateId,
type = type,
topic = outboxEventTypeToTopic[type] ?: throw UserServiceException("Can't determine topic for outbox event type `$type`"),
payload = objectMapper.convertValue(payload, JsonNode::class.java)
)

private fun save(outboxMessage: OutboxMessageEntity) {
private fun <T> save(outboxMessage: OutboxMessage<T>) {
log.debug("Start saving an outbox message: {}", outboxMessage)
outboxMessageRepository.save(outboxMessage)
outboxMessageRepository.deleteById(outboxMessage.id!!)
outboxMessageRepository.persistOutboxMessageToWalInsideTransaction(outboxMessage)
}
}
11 changes: 0 additions & 11 deletions user-service/src/main/resources/db/migration/V1_0_0__structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,3 @@ create table inbox_unprocessed(
created_at timestamptz not null default current_timestamp,
updated_at timestamptz not null default current_timestamp
);

create table outbox(
id uuid primary key default gen_random_uuid(),
aggregate_type varchar not null,
aggregate_id varchar,
type varchar not null,
topic varchar not null,
payload jsonb not null,
created_at timestamptz not null default current_timestamp,
updated_at timestamptz not null default current_timestamp
);

0 comments on commit a7389fb

Please sign in to comment.