Updated versions, replaced Spring Cloud Streams by Spring Kafka, removed C7 from Kafka example (related to #73)
berndruecker committed May 20, 2022
1 parent 71d1c67 commit 24c0470
Showing 69 changed files with 573 additions and 2,704 deletions.
33 changes: 3 additions & 30 deletions kafka/java/checkout/pom.xml
@@ -6,45 +6,18 @@
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.release>8</maven.compiler.release>
<spring.boot.version>2.2.5.RELEASE</spring.boot.version>
<spring-cloud-stream.version>Horsham.RELEASE</spring-cloud-stream.version>
<spring.boot.version>2.6.7</spring.boot.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
<build>
kafka/java/checkout/src/main/java/io/flowing/retail/checkout/messages/MessageSender.java
@@ -1,34 +1,50 @@
package io.flowing.retail.checkout.messages;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

@Component
@EnableBinding(Source.class)
public class MessageSender {

@Autowired
private MessageChannel output;

@Autowired
private ObjectMapper objectMapper;

public void send(Message<?> m) {
try {
// avoid too much magic and transform ourselves
String jsonMessage = objectMapper.writeValueAsString(m);
// wrap into a proper message for the transport (Kafka/Rabbit) and send it
output.send(
MessageBuilder.withPayload(jsonMessage).setHeader("type", m.getType()).build());
} catch (Exception e) {
throw new RuntimeException("Could not tranform and send message due to: "+ e.getMessage(), e);
}
}
}
package io.flowing.retail.checkout.messages;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
* Helper to send messages, currently nailed to Kafka, but could also send via AMQP (e.g. RabbitMQ) or
* any other transport easily
*/
@Component
public class MessageSender {

@Value( "${flowing-retail.topic-name:flowing-retail}")
public String topicName;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private ObjectMapper objectMapper;

@Bean
public NewTopic autoCreateTopicOnStartupIfNotExistant() {
return TopicBuilder.name(topicName).partitions(1).replicas(1).build();
}

public void send(Message<?> m) {
try {
// avoid too much magic and transform ourselves
String jsonMessage = objectMapper.writeValueAsString(m);

// wrap into a proper message for Kafka including a header
ProducerRecord<String, String> record = new ProducerRecord<String, String>("flowing-retail", jsonMessage);
record.headers().add("type", m.getType().getBytes());

// and send it
kafkaTemplate.send(record);
} catch (Exception e) {
throw new RuntimeException("Could not transform and send message: "+ e.getMessage(), e);
}
}
}
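
Both the sender above and the listeners in this commit pass around a generic Message<T> envelope that is not itself part of the diff. Below is a minimal sketch of that envelope, reconstructed from the calls that do appear (the three-argument constructor, getType, getData, getTraceid, getCorrelationid and the fluent setCorrelationid); the field names, the no-argument constructor and the plain setters needed for Jackson are assumptions, not the repository's actual code.

package io.flowing.retail.checkout.messages;

// Sketch only: envelope reconstructed from its usage in this diff.
// Field names, the no-arg constructor and the setters (needed so that
// ObjectMapper.readValue can populate the object) are assumptions.
public class Message<T> {

  private String type;           // e.g. "FetchGoodsCommand", read via getType()
  private String traceid;        // trace id propagated across services
  private String correlationid;  // set fluently via setCorrelationid(..)
  private T data;                // typed payload, read via getData()

  public Message() {             // required by Jackson
  }

  public Message(String type, String traceid, T data) {
    this.type = type;
    this.traceid = traceid;
    this.data = data;
  }

  public String getType() { return type; }
  public void setType(String type) { this.type = type; }

  public String getTraceid() { return traceid; }
  public void setTraceid(String traceid) { this.traceid = traceid; }

  public String getCorrelationid() { return correlationid; }
  public Message<T> setCorrelationid(String correlationid) {
    this.correlationid = correlationid;
    return this;                 // fluent, as chained in the inventory listener
  }

  public T getData() { return data; }
  public void setData(T data) { this.data = data; }
}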
8 changes: 4 additions & 4 deletions kafka/java/checkout/src/main/resources/application.properties
@@ -1,7 +1,7 @@
spring.cloud.stream.bindings.output.destination=flowing-retail
spring.cloud.stream.bindings.output.content-type=application/json

spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:29092
# Kafka
flowing-retail.topic-name=flowing-retail
spring.kafka.bootstrap-servers=localhost:29092
spring.kafka.consumer.auto-offset-reset=earliest

server.port = 8091
40 changes: 9 additions & 31 deletions kafka/java/inventory/pom.xml
@@ -6,43 +6,21 @@
<version>0.0.1-SNAPSHOT</version>
<properties>
<maven.compiler.release>8</maven.compiler.release>
<spring.boot.version>2.2.5.RELEASE</spring.boot.version>
<spring-cloud-stream.version>Horsham.RELEASE</spring-cloud-stream.version>
<spring.boot.version>2.6.7</spring.boot.version>
<spring.kafka.version>2.8.5</spring.kafka.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<!-- required to add JSR310 time formats for Jackson to ObjectMapper -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
kafka/java/inventory/src/main/java/io/flowing/retail/inventory/messages/MessageListener.java
@@ -1,25 +1,16 @@
package io.flowing.retail.inventory.messages;

import java.io.IOException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.flowing.retail.inventory.application.InventoryService;
import io.flowing.retail.inventory.domain.PickOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@EnableBinding(Sink.class)
public class MessageListener {
public class MessageListener {

@Autowired
private MessageSender messageSender;
@@ -30,24 +21,25 @@ public class MessageListener {
@Autowired
private ObjectMapper objectMapper;

@StreamListener(target = Sink.INPUT,
condition="(headers['type']?:'')=='FetchGoodsCommand'")
@Transactional
public void retrievePaymentCommandReceived(String messageJson) throws JsonParseException, JsonMappingException, IOException {
Message<FetchGoodsCommandPayload> message = objectMapper.readValue(messageJson, new TypeReference<Message<FetchGoodsCommandPayload>>(){});

FetchGoodsCommandPayload fetchGoodsCommand = message.getData();
String pickId = inventoryService.pickItems( //
fetchGoodsCommand.getItems(), fetchGoodsCommand.getReason(), fetchGoodsCommand.getRefId());

messageSender.send( //
new Message<GoodsFetchedEventPayload>( //
"GoodsFetchedEvent", //
message.getTraceid(), //
new GoodsFetchedEventPayload() //
.setRefId(fetchGoodsCommand.getRefId())
.setPickId(pickId))
.setCorrelationid(message.getCorrelationid()));
@KafkaListener(id = "inventory", topics = MessageSender.TOPIC_NAME)
public void messageReceived(String messagePayloadJson, @Header("type") String messageType) throws Exception{
if ("FetchGoodsCommand".equals(messageType)) {
Message<FetchGoodsCommandPayload> message = objectMapper.readValue(messagePayloadJson, new TypeReference<Message<FetchGoodsCommandPayload>>() {});

FetchGoodsCommandPayload fetchGoodsCommand = message.getData();
String pickId = inventoryService.pickItems( //
fetchGoodsCommand.getItems(), fetchGoodsCommand.getReason(), fetchGoodsCommand.getRefId());

messageSender.send( //
new Message<GoodsFetchedEventPayload>( //
"GoodsFetchedEvent", //
message.getTraceid(), //
new GoodsFetchedEventPayload() //
.setRefId(fetchGoodsCommand.getRefId())
.setPickId(pickId))
.setCorrelationid(message.getCorrelationid()));
}
}


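
The old @StreamListener routed messages declaratively via a header condition, while the new @KafkaListener receives every record on the topic and checks the "type" header in code. Purely as a hedged alternative, not something this commit does, Spring Kafka could move that check into a RecordFilterStrategy on a listener container factory; the class and bean names below are made up for illustration.

package io.flowing.retail.inventory.messages;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Sketch of an alternative, not part of this commit: discard unwanted message
// types before they reach the listener method, mirroring the old
// @StreamListener condition on headers['type'].
@Configuration
public class KafkaListenerFilterConfig {  // hypothetical class name

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> fetchGoodsOnlyFactory(
      ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // returning true discards the record; keep only FetchGoodsCommand messages
    factory.setRecordFilterStrategy(record -> !hasType(record, "FetchGoodsCommand"));
    return factory;
  }

  private boolean hasType(ConsumerRecord<String, String> record, String expectedType) {
    Header typeHeader = record.headers().lastHeader("type");
    return typeHeader != null && expectedType.equals(new String(typeHeader.value()));
  }
}

A listener would opt in with @KafkaListener(containerFactory = "fetchGoodsOnlyFactory", ...); the commit instead keeps the check inline, which is simpler when one listener dispatches several message types.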
kafka/java/inventory/src/main/java/io/flowing/retail/inventory/messages/MessageSender.java
@@ -1,33 +1,48 @@
package io.flowing.retail.inventory.messages;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Helper to send messages, currently nailed to Kafka, but could also send via AMQP (e.g. RabbitMQ) or
* any other transport easily
*/
@Component
@EnableBinding(Source.class)
public class MessageSender {

public static final String TOPIC_NAME = "flowing-retail";

@Autowired
private MessageChannel output;

private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
private ObjectMapper objectMapper;

@Bean
public NewTopic autoCreateTopicOnStartupIfNotExistant() {
return TopicBuilder.name(TOPIC_NAME).partitions(1).replicas(1).build();
}

public void send(Message<?> m) {
try {
// avoid too much magic and transform ourselves
String jsonMessage = objectMapper.writeValueAsString(m);
// wrap into a proper message for the transport (Kafka/Rabbit) and send it
output.send(
MessageBuilder.withPayload(jsonMessage).setHeader("type", m.getType()).build());

// wrap into a proper message for Kafka including a header
ProducerRecord<String, String> record = new ProducerRecord<String, String>("flowing-retail", jsonMessage);
record.headers().add("type", m.getType().getBytes());

// and send it
kafkaTemplate.send(record);
} catch (Exception e) {
throw new RuntimeException("Could not tranform and send message due to: "+ e.getMessage(), e);
throw new RuntimeException("Could not transform and send message: "+ e.getMessage(), e);
}
}
}
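
The Javadoc above notes the sender is currently nailed to Kafka but could target AMQP instead. As a hedged illustration of that remark only, not code from this repository, an AMQP flavor of send(..) using Spring AMQP's RabbitTemplate might look roughly like this; the exchange name and the empty routing key are assumptions.

package io.flowing.retail.inventory.messages;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative sketch only (the repository code targets Kafka): the same JSON
// envelope published via RabbitMQ, with the "type" carried as an AMQP header.
// Exchange name and routing key are assumptions.
@Component
public class AmqpMessageSender {

  public static final String EXCHANGE_NAME = "flowing-retail";  // assumed

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Autowired
  private ObjectMapper objectMapper;

  public void send(Message<?> m) {
    try {
      String jsonMessage = objectMapper.writeValueAsString(m);
      rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", jsonMessage, amqpMessage -> {
        amqpMessage.getMessageProperties().setHeader("type", m.getType());
        return amqpMessage;
      });
    } catch (Exception e) {
      throw new RuntimeException("Could not transform and send message: " + e.getMessage(), e);
    }
  }
}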
11 changes: 4 additions & 7 deletions kafka/java/inventory/src/main/resources/application.properties
@@ -1,8 +1,5 @@
spring.cloud.stream.bindings.output.destination=flowing-retail
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input.destination=flowing-retail
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=inventory
spring.application.name=inventory

spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:29092
# Kafka
spring.kafka.bootstrap-servers=localhost:29092
spring.kafka.consumer.auto-offset-reset=earliest
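
One detail of this properties migration worth spelling out: the removed spring.cloud.stream.bindings.input.group=inventory used to pin the consumer group, whereas the new setup relies on the @KafkaListener id, which doubles as the consumer group id because Spring Kafka's idIsGroup defaults to true. A short sketch (not part of this commit, class and id names made up) of how the group could instead be stated explicitly:

package io.flowing.retail.inventory.messages;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

// Sketch only: an explicit groupId keeps the "inventory" consumer group that
// spring.cloud.stream.bindings.input.group previously configured, independent
// of the listener id.
@Component
public class GroupedMessageListener {  // hypothetical class name

  @KafkaListener(id = "inventory-explicit", groupId = "inventory", topics = "flowing-retail")
  public void onMessage(String payloadJson, @Header("type") String messageType) {
    // dispatch by the "type" header, as the real MessageListener in this diff does
  }
}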
5 changes: 0 additions & 5 deletions kafka/java/order-camunda/Dockerfile

This file was deleted.
