Skip to content

Commit

Permalink
send now from the outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc Gorzala committed Dec 27, 2023
1 parent 599d1fa commit 4da38fa
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
53 changes: 53 additions & 0 deletions src/main/java/net/dancier/dancer/messaging/SendMessagesJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package net.dancier.dancer.messaging;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import jakarta.transaction.Transactional;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.net.URI;
import java.util.Collection;

@RequiredArgsConstructor
@Component
public class SendMessagesJob {

private final static Logger log = LoggerFactory.getLogger(SendMessagesJob.class);

private final OutboxJpaRepository outboxJpaRepository;

private final KafkaTemplate kafkaTemplate;

private final ObjectMapper objectMapper;

@Transactional
@Scheduled(fixedRate = 2000)
public void sendMessages() throws JsonProcessingException {
Collection<OutboxJpaEntity> itemsToSend = outboxJpaRepository.lockAndList();
for (OutboxJpaEntity item: itemsToSend) {
log.info("Sending: {}", item);
send(item);
item.setStatus(OutboxJpaEntity.STATUS.DONE);
}
kafkaTemplate.flush();
}

private void send(OutboxJpaEntity item) throws JsonProcessingException {
CloudEvent cloudEvent = CloudEventBuilder.v1()
.withId(item.getId().toString())
.withSource(URI.create(item.getSource()))
.withType(item.getType())
.withData(objectMapper.writeValueAsBytes(item.getData()))
.build();

kafkaTemplate.send(item.getType(), item.getKey(), cloudEvent);

}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
package net.dancier.dancer.messaging;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TopicConfiguration {
@Bean
public NewTopic profileUpdated() {
public KafkaAdmin.NewTopics createTopics() {

return new NewTopic("profile-updated", 1, (short) 1);
}

Expand Down

0 comments on commit 4da38fa

Please sign in to comment.