Skip to content
This repository has been archived by the owner on Dec 18, 2022. It is now read-only.

Commit

Permalink
Copy in code from Debezium Outbox Sample Shipment Service
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Murphy committed Jul 23, 2020
1 parent f9770e2 commit 5f13c47
Showing 10 changed files with 497 additions and 0 deletions.
135 changes: 135 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.debezium.examples.outbox</groupId>
<artifactId>outbox-shipment-service</artifactId>
<name>Debezium Outbox Demo - Shipment Service</name>
<version>1.0.0-SNAPSHOT</version>

<properties>
<version.quarkus>1.6.0.Final</version.quarkus>
<version.surefire>2.22.0</version.surefire>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-bom</artifactId>
<version>${version.quarkus}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<finalName>shipment</finalName>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${version.quarkus}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${version.surefire}</version>
<configuration>
<systemProperties>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-orm</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>native</id>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${version.quarkus}</version>
<executions>
<execution>
<goals>
<goal>native-image</goal>
</goals>
<configuration>
<enableHttpUrlHandler>true</enableHttpUrlHandler>
<autoServiceLoaderRegistration>false</autoServiceLoaderRegistration>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.surefire}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<systemProperties>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
</systemProperties>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
9 changes: 9 additions & 0 deletions src/main/docker/Dockerfile.jvm
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
####
# This Dockerfile is used to build a container that runs shipment-service-quarkus in JVM mode
#
###
FROM fabric8/java-jboss-openjdk8-jdk
ENV JAVA_OPTIONS=-Dquarkus.http.host=0.0.0.0
COPY target/lib/* /deployments/lib/
COPY target/*-runner.jar /deployments/app.jar
ENTRYPOINT [ "/deployments/run-java.sh" ]
8 changes: 8 additions & 0 deletions src/main/docker/Dockerfile.native
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
####
# This Dockerfile is used in order to build a container that runs shipment-service-quarkus in native (non-JVM) mode
###
FROM registry.fedoraproject.org/fedora-minimal
WORKDIR /work/
COPY target/*-runner /work/application
RUN chmod 755 /work
cmd [ "./application", "-Dquarkus.http.host=0.0.0.0" ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.examples.outbox.shipment.facade;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.kafka.common.header.Header;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.smallrye.reactive.messaging.kafka.KafkaRecord;

@ApplicationScoped
public class KafkaEventConsumer {

private static final Logger LOG = LoggerFactory.getLogger(KafkaEventConsumer.class);

@Inject
OrderEventHandler orderEventHandler;

@Incoming("orders")
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) throws IOException {
return CompletableFuture.runAsync(() -> {
LOG.debug("Kafka message with key = {} arrived", message.getKey());

String eventId = getHeaderAsString(message, "id");
String eventType = getHeaderAsString(message, "eventType");

orderEventHandler.onOrderEvent(
UUID.fromString(eventId),
eventType,
message.getKey(),
message.getPayload(),
message.getTimestamp()
);
});
}

private String getHeaderAsString(KafkaRecord<?, ?> record, String name) {
Header header = record.getHeaders().lastHeader(name);
if (header == null) {
throw new IllegalArgumentException("Expected record header '" + name + "' not present");
}

return new String(header.value(), Charset.forName("UTF-8"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.examples.outbox.shipment.facade;

import java.io.IOException;
import java.time.Instant;
import java.util.UUID;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.transaction.Transactional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.examples.outbox.shipment.messagelog.MessageLog;
import io.debezium.examples.outbox.shipment.service.ShipmentService;

@ApplicationScoped
public class OrderEventHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.class);

@Inject
MessageLog log;

@Inject
ShipmentService shipmentService;

private final ObjectMapper objectMapper = new ObjectMapper();

@Transactional
public void onOrderEvent(UUID eventId, String eventType, String key, String event, Instant ts) {
if (log.alreadyProcessed(eventId)) {
LOGGER.info("Event with UUID {} was already retrieved, ignoring it", eventId);
return;
}

JsonNode eventPayload = deserialize(event);

LOGGER.info("Received 'Order' event -- key: {}, event id: '{}', event type: '{}', ts: '{}'", key, eventId, eventType, ts);

if (eventType.equals("OrderCreated")) {
shipmentService.orderCreated(eventPayload);
}
else if (eventType.equals("OrderLineUpdated")) {
shipmentService.orderLineUpdated(eventPayload);
}
else {
LOGGER.warn("Unkown event type");
}

log.processed(eventId);
}

private JsonNode deserialize(String event) {
JsonNode eventPayload;

try {
String unescaped = objectMapper.readValue(event, String.class);
eventPayload = objectMapper.readTree(unescaped);
}
catch (IOException e) {
throw new RuntimeException("Couldn't deserialize event", e);
}

return eventPayload.has("schema") ? eventPayload.get("payload") : eventPayload;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.examples.outbox.shipment.messagelog;

import java.time.Instant;
import java.util.UUID;

import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class ConsumedMessage {

@Id
private UUID eventId;

private Instant timeOfReceiving;

ConsumedMessage() {
}

public ConsumedMessage(UUID eventId, Instant timeOfReceiving) {
this.eventId = eventId;
this.timeOfReceiving = timeOfReceiving;
}

public UUID getEventId() {
return eventId;
}

public void setEventId(UUID eventId) {
this.eventId = eventId;
}

public Instant getTimeOfReceiving() {
return timeOfReceiving;
}

public void setTimeOfReceiving(Instant timeOfReceiving) {
this.timeOfReceiving = timeOfReceiving;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.examples.outbox.shipment.messagelog;

import java.time.Instant;
import java.util.UUID;

import javax.enterprise.context.ApplicationScoped;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.transaction.Transactional;
import javax.transaction.Transactional.TxType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class MessageLog {
private static final Logger LOG = LoggerFactory.getLogger(MessageLog.class);

@PersistenceContext
EntityManager entityManager;

@Transactional(value=TxType.MANDATORY)
public void processed(UUID eventId) {
entityManager.persist(new ConsumedMessage(eventId, Instant.now()));
}

@Transactional(value=TxType.MANDATORY)
public boolean alreadyProcessed(UUID eventId) {
LOG.debug("Looking for event with id {} in message log", eventId);
return entityManager.find(ConsumedMessage.class, eventId) != null;
}
}
Loading

0 comments on commit 5f13c47

Please sign in to comment.