Skip to content

Commit

Permalink
Day 27 task
Browse files Browse the repository at this point in the history
  • Loading branch information
pawelpluta committed Jan 28, 2023
1 parent 256bc71 commit c2c2a51
Show file tree
Hide file tree
Showing 20 changed files with 502 additions and 1 deletion.
3 changes: 2 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ Every task will focus on Java, popular libraries and frameworks. So, let's start
* [Day 23](/day023/README.MD)
* [Day 24](/day024/README.MD)
* [Day 25](/day025/README.MD)
* [Day 26](/day026/README.MD)
* [Day 26](/day026/README.MD)
* [Day 27](/day027/README.MD)
18 changes: 18 additions & 0 deletions day027/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Day 27 - saga as orchestrator

Saga pattern allows for dividing one big transaction across multiple smaller ones.
It can be useful in distributed systems to perform complex processes.

Saga can contain multiple steps, like a process manager.
In case of problems at any step of the process, the Saga knows how to revert all previously made changes.
Those "revers" are called compensating transactions.

In this task, we are simulating a system that performs two things:

1. When order is placed, it makes a reservation for products in warehouse. Reservation decreases available product count.
2. when products are reserved, it invokes the payment

If payment goes without problems, then the flow is finished.
But when payment will fail for some reason, we want to release the reservation.

Your task is to write compensating transaction in `WarehouseEventListener`. It should be created in `ReservationsListener`
29 changes: 29 additions & 0 deletions day027/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id 'org.springframework.boot' version '3.0.1'
id 'io.spring.dependency-management' version '1.1.0'
id 'java'
}

group 'com.pawelpluta'
version '0.0.1'

repositories {
mavenCentral()
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation 'org.testcontainers:testcontainers:1.17.6'
testImplementation 'org.testcontainers:kafka:1.17.6'
testImplementation 'org.awaitility:awaitility:4.2.0'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.1'
}

test {
useJUnitPlatform()
}
1 change: 1 addition & 0 deletions day027/settings.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rootProject.name = 'day027'
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.pawelpluta.day027;

import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Repository
class InMemoryPaymentRepository {

private final Map<String, String> paymentStatus = new HashMap<>();
public void save(Payment payment) {
paymentStatus.put(payment.orderId(), payment.status());
}

public Optional<Payment> findByOrderId(String orderId) {
return Optional.ofNullable(paymentStatus.get(orderId))
.map(status -> new Payment(orderId, status));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.pawelpluta.day027;

import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Repository
class InMemoryProductRepository {

private final Map<String, Integer> productsQuantity = new HashMap<>();
public void save(Product product) {
productsQuantity.put(product.productId(), product.quantity());
}

public Optional<Product> findById(String productId) {
return Optional.ofNullable(productsQuantity.get(productId))
.map(quantity -> new Product(productId, quantity, List.of()));
}
}
111 changes: 111 additions & 0 deletions day027/src/main/java/com/pawelpluta/day027/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package com.pawelpluta.day027;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
class KafkaConfig {
@Value("${kafka.orders.bootstrapServers}")
private String orderBootstrapServers;
@Value("${kafka.orders.groupId}")
private String ordersConsumerGroupId;
@Value("${kafka.orders.offset}")
private String ordersOffset;

@Value("${kafka.reservations.bootstrapServers}")
private String reservationsBootstrapServers;
@Value("${kafka.reservations.groupId}")
private String reservationsConsumerGroupId;
@Value("${kafka.reservations.offset}")
private String reservationsOffset;

@Bean
public Map<String, Object> ordersConsumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, orderBootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, ordersConsumerGroupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, ordersOffset);
return props;
}

@Bean
public ConsumerFactory<String, OrderPlacedEvent> ordersConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
ordersConsumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(OrderPlacedEvent.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> orderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(ordersConsumerFactory());
return factory;
}

@Bean
public Map<String, Object> reservationsConsumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, reservationsBootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, reservationsConsumerGroupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, reservationsOffset);
return props;
}

@Bean
public ConsumerFactory<String, WarehouseProductReservedEvent> reservationsConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
ordersConsumerConfigs(),
new StringDeserializer(),
new JsonDeserializer<>(WarehouseProductReservedEvent.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, WarehouseProductReservedEvent> reservationsKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, WarehouseProductReservedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(reservationsConsumerFactory());
return factory;
}

@Bean
public Map<String, Object> reservationsProducerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, reservationsBootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}

@Bean
public ProducerFactory<String, WarehouseProductReservedEvent> reservationsProducerFactory() {
return new DefaultKafkaProducerFactory<>(reservationsProducerConfigs());
}

@Bean
public KafkaTemplate<String, WarehouseProductReservedEvent> reservationsKafkaTemplate() {
return new KafkaTemplate<>(reservationsProducerFactory());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.pawelpluta.day027;

record OrderPlacedEvent(String orderId, String productId, Integer quantity) {
}
4 changes: 4 additions & 0 deletions day027/src/main/java/com/pawelpluta/day027/Payment.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.pawelpluta.day027;

record Payment(String orderId, String status) {
}
13 changes: 13 additions & 0 deletions day027/src/main/java/com/pawelpluta/day027/Product.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.pawelpluta.day027;

import java.util.List;

record Product(String productId, Integer quantity, List<WarehouseProductReservedEvent> reservations) {
public Product reserve(OrderPlacedEvent event) {
return new Product(productId, quantity - event.quantity(), List.of(reservationEventFor(event)));
}

private WarehouseProductReservedEvent reservationEventFor(OrderPlacedEvent event) {
return new WarehouseProductReservedEvent(event.orderId(), productId, event.quantity());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.pawelpluta.day027;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
class ReservationsListener {

private final InMemoryPaymentRepository paymentRepository;
private final List<String> failingOrders;

ReservationsListener(InMemoryPaymentRepository paymentRepository) {
this.paymentRepository = paymentRepository;
failingOrders = new ArrayList<>();
}

@KafkaListener(topics = "${kafka.reservations.topic}", containerFactory = "reservationsKafkaListenerContainerFactory")
void reserveGoods(WarehouseProductReservedEvent event) {
if (failingOrders.contains(event.orderId())) {
// TODO invoke compensating transaction
} else {
paymentRepository.save(new Payment(event.orderId(), "PAID"));
}
}

/**
* method for test puproses - simulates some business checks so we can show negative flow
* @param orderId
*/
void failPaymentForOrder(String orderId) {
failingOrders.add(orderId);
}
}
12 changes: 12 additions & 0 deletions day027/src/main/java/com/pawelpluta/day027/SagaApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.pawelpluta.day027;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
class SagaApp {

public static void main(String[] args) {
SpringApplication.run(SagaApp.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.pawelpluta.day027;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
class WarehouseEventListeners {

private final InMemoryProductRepository productRepository;
private final KafkaTemplate<String, WarehouseProductReservedEvent> reservationsKafkaTemplate;

WarehouseEventListeners(
InMemoryProductRepository productRepository,
KafkaTemplate<String, WarehouseProductReservedEvent> reservationsKafkaTemplate) {
this.productRepository = productRepository;
this.reservationsKafkaTemplate = reservationsKafkaTemplate;
}

@KafkaListener(topics = "${kafka.orders.topic}", containerFactory = "orderKafkaListenerContainerFactory")
void reserveGoods(OrderPlacedEvent event) {
productRepository.findById(event.productId()).ifPresent(product -> {
Product updatedProduct = product.reserve(event);
productRepository.save(updatedProduct);
sendEvents(updatedProduct);
});
}

private void sendEvents(Product updatedProduct) {
updatedProduct.reservations().stream().findFirst()
.ifPresent(reservationEvent -> reservationsKafkaTemplate.send("reservations-topic", reservationEvent));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.pawelpluta.day027;

record WarehouseProductReservedEvent(String orderId, String productId, Integer quantity) {
}
11 changes: 11 additions & 0 deletions day027/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
kafka:
orders:
bootstrapServers: 127.0.0.1:9092
topic: orders-topic
groupId: orders-integrationTest
offset: earliest
reservations:
bootstrapServers: 127.0.0.1:9092
topic: reservations-topic
groupId: reservations-integrationTest
offset: earliest
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.pawelpluta.day027;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
class KafkaProducerConfig {

@Value("${kafka.orders.bootstrapServers}")
private String ticketsBootstrapServers;

@Bean
public Map<String, Object> orderPlacedProducerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ticketsBootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}

@Bean
public ProducerFactory<String, OrderPlacedEvent> orderPlacedProducerFactory() {
return new DefaultKafkaProducerFactory<>(orderPlacedProducerConfigs());
}

@Bean
public KafkaTemplate<String, OrderPlacedEvent> orderPlacedKafkaTemplate() {
return new KafkaTemplate<>(orderPlacedProducerFactory());
}
}
Loading

0 comments on commit c2c2a51

Please sign in to comment.