Skip to content

Commit

Permalink
增加延迟消息功能
Browse files Browse the repository at this point in the history
  • Loading branch information
dushixiang committed Aug 14, 2021
1 parent d833503 commit 2dc5737
Show file tree
Hide file tree
Showing 22 changed files with 591 additions and 52 deletions.
28 changes: 17 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</parent>
<groupId>cn.typesafe</groupId>
<artifactId>kafka-map</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
<name>kafka-map</name>
<description>a simple kafka manager</description>
<properties>
Expand All @@ -36,12 +36,6 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<scope>compile</scope>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>

<dependency>
<groupId>com.github.gwenn</groupId>
Expand All @@ -60,15 +54,27 @@

<dependency>
<scope>compile</scope>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-crypto</artifactId>
<version>5.4.6</version>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
<dependency>
<scope>compile</scope>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.0.2</version>
<version>3.0.3</version>
</dependency>
<dependency>
<scope>compile</scope>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-crypto</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<scope>compile</scope>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/cn/typesafe/km/KafkaMapApplication.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.typesafe.km;

import cn.typesafe.km.service.ClusterService;
import cn.typesafe.km.service.UserService;
import lombok.SneakyThrows;
import org.springframework.boot.CommandLineRunner;
Expand Down Expand Up @@ -31,9 +32,12 @@ public static void initDatabaseDir(){

@Resource
private UserService userService;
@Resource
private ClusterService clusterService;

@Override
public void run(String... args) throws Exception {
userService.initUser();
clusterService.restore();
}
}
3 changes: 3 additions & 0 deletions src/main/java/cn/typesafe/km/config/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@
*/
public final class Constant {
public static final String CONSUMER_GROUP_ID = "kafka-map";

public static final String DELAY_MESSAGE_ENABLED = "enabled";
public static final String DELAY_MESSAGE_DISABLED = "disabled";
}
10 changes: 10 additions & 0 deletions src/main/java/cn/typesafe/km/controller/ClusterController.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,14 @@ public void delete(@PathVariable String ids) {
public void updateName(@PathVariable String clusterId, @RequestBody Cluster cluster) {
clusterService.updateNameById(clusterId, cluster.getName());
}

@PostMapping("/{clusterId}/enableDelayMessage")
public void enableDelayMessage(@PathVariable String clusterId) {
clusterService.enableDelayMessage(clusterId);
}

@PostMapping("/{clusterId}/disableDelayMessage")
public void disableDelayMessage(@PathVariable String clusterId) {
clusterService.disableDelayMessage(clusterId);
}
}
15 changes: 15 additions & 0 deletions src/main/java/cn/typesafe/km/delay/DelayMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cn.typesafe.km.delay;

import lombok.Data;

@Data
public class DelayMessage {
// 消息级别,共18个
private int level;
// 目标消息主题
private String topic;
// 目标消息key
private String key;
// 目标消息value
private String value;
}
74 changes: 74 additions & 0 deletions src/main/java/cn/typesafe/km/delay/DelayMessageHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cn.typesafe.km.delay;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class DelayMessageHelper {

private final Map<String, Long> levels = new LinkedHashMap<>();
private final String servers;
private final String groupId;
private ExecutorService executorService;
private final List<DelayMessageRunner> runners = new ArrayList<>();
;
private DelayMessageListener delayMessageListener;

public DelayMessageHelper(String servers, String groupId) {
this.servers = servers;
this.groupId = groupId;

levels.put("__delay-seconds-1", 1000L);
levels.put("__delay-seconds-5", 1000L * 5);
levels.put("__delay-seconds-10", 1000L * 10);
levels.put("__delay-seconds-30", 1000L * 30);
levels.put("__delay-minutes-1", 1000L * 60);
levels.put("__delay-minutes-2", 1000L * 60 * 2);
levels.put("__delay-minutes-3", 1000L * 60 * 3);
levels.put("__delay-minutes-4", 1000L * 60 * 4);
levels.put("__delay-minutes-5", 1000L * 60 * 5);
levels.put("__delay-minutes-6", 1000L * 60 * 6);
levels.put("__delay-minutes-7", 1000L * 60 * 7);
levels.put("__delay-minutes-8", 1000L * 60 * 8);
levels.put("__delay-minutes-9", 1000L * 60 * 9);
levels.put("__delay-minutes-10", 1000L * 60 * 10);
levels.put("__delay-minutes-20", 1000L * 60 * 20);
levels.put("__delay-minutes-30", 1000L * 60 * 30);
levels.put("__delay-hours-1", 1000L * 60 * 60);
levels.put("__delay-hours-2", 1000L * 60 * 60 * 2);
}

public void start() {
this.executorService = Executors.newFixedThreadPool(levels.size() + 1, new ThreadFactoryBuilder().setNameFormat("level-%d").build());

for (Map.Entry<String, Long> entry : levels.entrySet()) {
String topic = entry.getKey();
Long delayTime = entry.getValue();
DelayMessageRunner delayMessageRunner = new DelayMessageRunner(servers, groupId, topic, delayTime);
this.executorService.execute(delayMessageRunner);
this.runners.add(delayMessageRunner);
}
this.delayMessageListener = new DelayMessageListener(servers, groupId, new ArrayList<>(this.levels.keySet()));
this.executorService.execute(this.delayMessageListener);
}

public void stop() {
for (DelayMessageRunner runner : this.runners) {
runner.shutdown();
}
this.runners.clear();

if (this.delayMessageListener != null) {
this.delayMessageListener.shutdown();
}
if (this.executorService != null) {
this.executorService.shutdown();
}
}
}
84 changes: 84 additions & 0 deletions src/main/java/cn/typesafe/km/delay/DelayMessageListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cn.typesafe.km.delay;

import cn.typesafe.km.util.Json;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;


@Slf4j
public class DelayMessageListener implements Runnable {

private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
private volatile boolean running = true;
private final List<String> levelTopics;

public DelayMessageListener(String servers, String groupId, List<String> levelTopics) {
this.levelTopics = levelTopics;
this.consumer = createConsumer(servers, groupId);
this.producer = createProducer(servers);
}

private KafkaConsumer<String, String> createConsumer(String servers, String groupId) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
}

private KafkaProducer<String, String> createProducer(String servers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}

@Override
public void run() {
consumer.subscribe(Collections.singletonList("delay-message"));
do {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
String value = consumerRecord.value();
log.debug("pulled delay message: {}", value);
try {
DelayMessage delayMessage = Json.toJavaObject(value, DelayMessage.class);
if (delayMessage.getLevel() < 0 || delayMessage.getLevel() >= levelTopics.size()) {
ProducerRecord<String, String> record = new ProducerRecord<>(delayMessage.getTopic(), delayMessage.getKey(), delayMessage.getValue());
producer.send(record);
log.debug("send normal message to user topic: {}", delayMessage.getTopic());
} else {
String internalDelayTopic = levelTopics.get(delayMessage.getLevel());
ProducerRecord<String, String> record = new ProducerRecord<>(internalDelayTopic, null, value);
producer.send(record);
log.debug("send delay message to internal topic: {}", internalDelayTopic);
}
} catch (Exception e) {
log.error("解析消息失败", e);
}
}
} while (running);
}

public void shutdown() {
this.running = false;
}
}
Loading

0 comments on commit 2dc5737

Please sign in to comment.