diff --git a/pom.xml b/pom.xml index 3714c98..94ef531 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ cn.typesafe kafka-map - 1.0.0 + 1.1.0 kafka-map a simple kafka manager @@ -36,12 +36,6 @@ spring-boot-starter-test test - - compile - org.apache.kafka - kafka-clients - 2.7.0 - com.github.gwenn @@ -60,15 +54,27 @@ compile - org.springframework.security - spring-security-crypto - 5.4.6 + com.google.guava + guava + 30.1.1-jre compile com.github.ben-manes.caffeine caffeine - 3.0.2 + 3.0.3 + + + compile + org.springframework.security + spring-security-crypto + 5.5.1 + + + compile + org.apache.kafka + kafka-clients + 2.8.0 diff --git a/src/main/java/cn/typesafe/km/KafkaMapApplication.java b/src/main/java/cn/typesafe/km/KafkaMapApplication.java index b334aed..f93050c 100644 --- a/src/main/java/cn/typesafe/km/KafkaMapApplication.java +++ b/src/main/java/cn/typesafe/km/KafkaMapApplication.java @@ -1,5 +1,6 @@ package cn.typesafe.km; +import cn.typesafe.km.service.ClusterService; import cn.typesafe.km.service.UserService; import lombok.SneakyThrows; import org.springframework.boot.CommandLineRunner; @@ -31,9 +32,12 @@ public static void initDatabaseDir(){ @Resource private UserService userService; + @Resource + private ClusterService clusterService; @Override public void run(String... args) throws Exception { userService.initUser(); + clusterService.restore(); } } diff --git a/src/main/java/cn/typesafe/km/config/Constant.java b/src/main/java/cn/typesafe/km/config/Constant.java index 981b6a3..97e7eb8 100644 --- a/src/main/java/cn/typesafe/km/config/Constant.java +++ b/src/main/java/cn/typesafe/km/config/Constant.java @@ -6,4 +6,7 @@ */ public final class Constant { public static final String CONSUMER_GROUP_ID = "kafka-map"; + + public static final String DELAY_MESSAGE_ENABLED = "enabled"; + public static final String DELAY_MESSAGE_DISABLED = "disabled"; } diff --git a/src/main/java/cn/typesafe/km/controller/ClusterController.java b/src/main/java/cn/typesafe/km/controller/ClusterController.java index 4120678..5ddff9b 100644 --- a/src/main/java/cn/typesafe/km/controller/ClusterController.java +++ b/src/main/java/cn/typesafe/km/controller/ClusterController.java @@ -71,4 +71,14 @@ public void delete(@PathVariable String ids) { public void updateName(@PathVariable String clusterId, @RequestBody Cluster cluster) { clusterService.updateNameById(clusterId, cluster.getName()); } + + @PostMapping("/{clusterId}/enableDelayMessage") + public void enableDelayMessage(@PathVariable String clusterId) { + clusterService.enableDelayMessage(clusterId); + } + + @PostMapping("/{clusterId}/disableDelayMessage") + public void disableDelayMessage(@PathVariable String clusterId) { + clusterService.disableDelayMessage(clusterId); + } } diff --git a/src/main/java/cn/typesafe/km/delay/DelayMessage.java b/src/main/java/cn/typesafe/km/delay/DelayMessage.java new file mode 100644 index 0000000..92c827f --- /dev/null +++ b/src/main/java/cn/typesafe/km/delay/DelayMessage.java @@ -0,0 +1,15 @@ +package cn.typesafe.km.delay; + +import lombok.Data; + +@Data +public class DelayMessage { + // 消息级别,共18个 + private int level; + // 目标消息主题 + private String topic; + // 目标消息key + private String key; + // 目标消息value + private String value; +} diff --git a/src/main/java/cn/typesafe/km/delay/DelayMessageHelper.java b/src/main/java/cn/typesafe/km/delay/DelayMessageHelper.java new file mode 100644 index 0000000..6ff10c4 --- /dev/null +++ b/src/main/java/cn/typesafe/km/delay/DelayMessageHelper.java @@ -0,0 +1,74 @@ +package cn.typesafe.km.delay; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + + +public class DelayMessageHelper { + + private final Map levels = new LinkedHashMap<>(); + private final String servers; + private final String groupId; + private ExecutorService executorService; + private final List runners = new ArrayList<>(); + ; + private DelayMessageListener delayMessageListener; + + public DelayMessageHelper(String servers, String groupId) { + this.servers = servers; + this.groupId = groupId; + + levels.put("__delay-seconds-1", 1000L); + levels.put("__delay-seconds-5", 1000L * 5); + levels.put("__delay-seconds-10", 1000L * 10); + levels.put("__delay-seconds-30", 1000L * 30); + levels.put("__delay-minutes-1", 1000L * 60); + levels.put("__delay-minutes-2", 1000L * 60 * 2); + levels.put("__delay-minutes-3", 1000L * 60 * 3); + levels.put("__delay-minutes-4", 1000L * 60 * 4); + levels.put("__delay-minutes-5", 1000L * 60 * 5); + levels.put("__delay-minutes-6", 1000L * 60 * 6); + levels.put("__delay-minutes-7", 1000L * 60 * 7); + levels.put("__delay-minutes-8", 1000L * 60 * 8); + levels.put("__delay-minutes-9", 1000L * 60 * 9); + levels.put("__delay-minutes-10", 1000L * 60 * 10); + levels.put("__delay-minutes-20", 1000L * 60 * 20); + levels.put("__delay-minutes-30", 1000L * 60 * 30); + levels.put("__delay-hours-1", 1000L * 60 * 60); + levels.put("__delay-hours-2", 1000L * 60 * 60 * 2); + } + + public void start() { + this.executorService = Executors.newFixedThreadPool(levels.size() + 1, new ThreadFactoryBuilder().setNameFormat("level-%d").build()); + + for (Map.Entry entry : levels.entrySet()) { + String topic = entry.getKey(); + Long delayTime = entry.getValue(); + DelayMessageRunner delayMessageRunner = new DelayMessageRunner(servers, groupId, topic, delayTime); + this.executorService.execute(delayMessageRunner); + this.runners.add(delayMessageRunner); + } + this.delayMessageListener = new DelayMessageListener(servers, groupId, new ArrayList<>(this.levels.keySet())); + this.executorService.execute(this.delayMessageListener); + } + + public void stop() { + for (DelayMessageRunner runner : this.runners) { + runner.shutdown(); + } + this.runners.clear(); + + if (this.delayMessageListener != null) { + this.delayMessageListener.shutdown(); + } + if (this.executorService != null) { + this.executorService.shutdown(); + } + } +} diff --git a/src/main/java/cn/typesafe/km/delay/DelayMessageListener.java b/src/main/java/cn/typesafe/km/delay/DelayMessageListener.java new file mode 100644 index 0000000..e4e6e1c --- /dev/null +++ b/src/main/java/cn/typesafe/km/delay/DelayMessageListener.java @@ -0,0 +1,84 @@ +package cn.typesafe.km.delay; + +import cn.typesafe.km.util.Json; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + + +@Slf4j +public class DelayMessageListener implements Runnable { + + private final KafkaConsumer consumer; + private final KafkaProducer producer; + private volatile boolean running = true; + private final List levelTopics; + + public DelayMessageListener(String servers, String groupId, List levelTopics) { + this.levelTopics = levelTopics; + this.consumer = createConsumer(servers, groupId); + this.producer = createProducer(servers); + } + + private KafkaConsumer createConsumer(String servers, String groupId) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000"); + return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); + } + + private KafkaProducer createProducer(String servers) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + @Override + public void run() { + consumer.subscribe(Collections.singletonList("delay-message")); + do { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(200)); + for (ConsumerRecord consumerRecord : consumerRecords) { + String value = consumerRecord.value(); + log.debug("pulled delay message: {}", value); + try { + DelayMessage delayMessage = Json.toJavaObject(value, DelayMessage.class); + if (delayMessage.getLevel() < 0 || delayMessage.getLevel() >= levelTopics.size()) { + ProducerRecord record = new ProducerRecord<>(delayMessage.getTopic(), delayMessage.getKey(), delayMessage.getValue()); + producer.send(record); + log.debug("send normal message to user topic: {}", delayMessage.getTopic()); + } else { + String internalDelayTopic = levelTopics.get(delayMessage.getLevel()); + ProducerRecord record = new ProducerRecord<>(internalDelayTopic, null, value); + producer.send(record); + log.debug("send delay message to internal topic: {}", internalDelayTopic); + } + } catch (Exception e) { + log.error("解析消息失败", e); + } + } + } while (running); + } + + public void shutdown() { + this.running = false; + } +} diff --git a/src/main/java/cn/typesafe/km/delay/DelayMessageRunner.java b/src/main/java/cn/typesafe/km/delay/DelayMessageRunner.java new file mode 100644 index 0000000..230da0a --- /dev/null +++ b/src/main/java/cn/typesafe/km/delay/DelayMessageRunner.java @@ -0,0 +1,136 @@ +package cn.typesafe.km.delay; + +import cn.typesafe.km.util.Json; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutionException; + +@Slf4j +public class DelayMessageRunner implements Runnable { + + private final KafkaConsumer consumer; + private final KafkaProducer producer; + private final Object lock = new Object(); + + private final String topic; + private final Long delayTime; + private final Timer timer = new Timer(); + private volatile boolean running = true; + + public DelayMessageRunner(String servers, String groupId, String topic, Long delayTime) { + this.topic = topic; + this.delayTime = delayTime; + this.consumer = createConsumer(servers, groupId); + this.producer = createProducer(servers); + + consumer.subscribe(Collections.singletonList(topic)); + + timer.schedule(new TimerTask() { + @Override + public void run() { + synchronized (lock) { + consumer.resume(consumer.paused()); + lock.notify(); + } + } + }, 0, 100); + } + + private KafkaConsumer createConsumer(String servers, String groupId) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 需要处理早期未到期的数据 + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000"); + return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); + } + + KafkaProducer createProducer(String servers) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + public void shutdown() { + this.running = false; + this.timer.cancel(); + this.consumer.close(); + this.producer.close(); + } + + @SneakyThrows + @Override + public void run() { + do { + synchronized (lock) { + ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(200)); + + if (consumerRecords.isEmpty()) { + lock.wait(); + continue; + } + + log.debug("pulled {} messages form {}.", consumerRecords.count(), topic); + boolean timed = false; + for (ConsumerRecord consumerRecord : consumerRecords) { + long timestamp = consumerRecord.timestamp(); + TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition()); + if (timestamp + delayTime < System.currentTimeMillis()) { + String value = consumerRecord.value(); + DelayMessage delayMessage; + try { + delayMessage = Json.toJavaObject(value, DelayMessage.class); + } catch (Exception e) { + log.warn("Failed to parse json", e); + continue; + } + String appTopic = delayMessage.getTopic(); + String appKey = delayMessage.getKey(); + String appValue = delayMessage.getValue(); + + // send to application topic + ProducerRecord producerRecord = new ProducerRecord<>(appTopic, appKey, appValue); + try { + RecordMetadata recordMetadata = producer.send(producerRecord).get(); + log.debug("send normal message to user topic={}, key={}, value={}, offset={}", appTopic, appKey, appValue, recordMetadata.offset()); + // success. commit message + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1); + HashMap metadataHashMap = new HashMap<>(); + metadataHashMap.put(topicPartition, offsetAndMetadata); + consumer.commitSync(metadataHashMap); + } catch (ExecutionException e) { + consumer.pause(Collections.singletonList(topicPartition)); + consumer.seek(topicPartition, consumerRecord.offset()); + timed = true; + break; + } + } else { + consumer.pause(Collections.singletonList(topicPartition)); + consumer.seek(topicPartition, consumerRecord.offset()); + timed = true; + break; + } + } + + if (timed) { + lock.wait(); + } + } + } while (running); + } +} diff --git a/src/main/java/cn/typesafe/km/entity/Cluster.java b/src/main/java/cn/typesafe/km/entity/Cluster.java index 0e41cdc..323e949 100644 --- a/src/main/java/cn/typesafe/km/entity/Cluster.java +++ b/src/main/java/cn/typesafe/km/entity/Cluster.java @@ -22,6 +22,8 @@ public class Cluster { private String name; @Column(length = 500) private String servers; + @Column(length = 20) + private String delayMessageStatus; private String controller; @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date created; diff --git a/src/main/java/cn/typesafe/km/service/ClusterService.java b/src/main/java/cn/typesafe/km/service/ClusterService.java index a019ef1..3786ed4 100644 --- a/src/main/java/cn/typesafe/km/service/ClusterService.java +++ b/src/main/java/cn/typesafe/km/service/ClusterService.java @@ -1,10 +1,12 @@ package cn.typesafe.km.service; import cn.typesafe.km.config.Constant; +import cn.typesafe.km.delay.DelayMessageHelper; import cn.typesafe.km.entity.Cluster; import cn.typesafe.km.repository.ClusterRepository; import cn.typesafe.km.util.ID; import cn.typesafe.km.util.Networks; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -25,6 +27,7 @@ * @author dushixiang * @date 2021/3/27 11:16 上午 */ +@Slf4j @Service public class ClusterService { @@ -39,6 +42,8 @@ public class ClusterService { @Resource private ConsumerGroupService consumerGroupService; + private final ConcurrentHashMap store = new ConcurrentHashMap<>(); + public Cluster findById(String id) { return clusterRepository.findById(id).orElseThrow(() -> new NoSuchElementException("cluster 「" + id + "」does not exist")); } @@ -53,21 +58,21 @@ private AdminClient createAdminClient(String servers) { public KafkaConsumer createConsumer(String clusterId) { Cluster cluster = findById(clusterId); - return createConsumer(cluster.getServers(), Constant.CONSUMER_GROUP_ID); + return createConsumer(cluster.getServers(), Constant.CONSUMER_GROUP_ID, "earliest"); } - public KafkaConsumer createConsumer(String servers, String groupId) { + public KafkaConsumer createConsumer(String servers, String groupId, String autoOffsetResetConfig) { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); return new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer()); } - public KafkaProducer createProducer(String servers){ + public KafkaProducer createProducer(String servers) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); return new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer()); @@ -114,7 +119,6 @@ public void create(Cluster cluster) throws ExecutionException, InterruptedExcept } AdminClient adminClient = getAdminClient(uuid, cluster.getServers()); - String clusterId = adminClient.describeCluster().clusterId().get(); String controller = adminClient.describeCluster().controller().get().host(); cluster.setId(uuid); @@ -147,4 +151,48 @@ public Cluster detail(String clusterId) throws ExecutionException, InterruptedEx cluster.setConsumerCount(consumerGroupService.countConsumerGroup(cluster.getId())); return cluster; } + + @Transactional + public void enableDelayMessage(String id) { + Cluster cluster = findById(id); + if (Constant.DELAY_MESSAGE_ENABLED.equals(cluster.getDelayMessageStatus())) { + return; + } + DelayMessageHelper delayMessageHelper = store.getOrDefault(id, new DelayMessageHelper(cluster.getServers(), Constant.CONSUMER_GROUP_ID)); + store.put(id, delayMessageHelper); + delayMessageHelper.start(); + cluster.setDelayMessageStatus(Constant.DELAY_MESSAGE_ENABLED); + clusterRepository.saveAndFlush(cluster); + } + + @Transactional + public void disableDelayMessage(String id) { + Cluster cluster = findById(id); + if (Constant.DELAY_MESSAGE_DISABLED.equals(cluster.getDelayMessageStatus())) { + return; + } + DelayMessageHelper delayMessageHelper = store.getOrDefault(id, new DelayMessageHelper(cluster.getServers(), Constant.CONSUMER_GROUP_ID)); + store.put(id, delayMessageHelper); + delayMessageHelper.stop(); + cluster.setDelayMessageStatus(Constant.DELAY_MESSAGE_DISABLED); + clusterRepository.saveAndFlush(cluster); + } + + public void restore() { + List clusters = clusterRepository.findAll(); + for (Cluster cluster : clusters) { + if (Constant.DELAY_MESSAGE_ENABLED.equals(cluster.getDelayMessageStatus())) { + try { + String clusterId = cluster.getId(); + DelayMessageHelper delayMessageHelper = store.getOrDefault(clusterId, new DelayMessageHelper(cluster.getServers(), Constant.CONSUMER_GROUP_ID)); + store.put(clusterId, delayMessageHelper); + delayMessageHelper.start(); + } catch (Exception e) { + log.error("恢复延迟消息失败,集群名称: {},集群地址: {}", cluster.getName(), cluster.getServers(), e); + this.disableDelayMessage(cluster.getId()); + } + } + + } + } } diff --git a/src/main/java/cn/typesafe/km/service/ConsumerGroupService.java b/src/main/java/cn/typesafe/km/service/ConsumerGroupService.java index 8fdd5b9..252cecc 100644 --- a/src/main/java/cn/typesafe/km/service/ConsumerGroupService.java +++ b/src/main/java/cn/typesafe/km/service/ConsumerGroupService.java @@ -115,7 +115,7 @@ private List getTopicOffsets(String topicName, String groupId, Admi public void resetOffset(String topic, String groupId, String clusterId, ResetOffset resetOffset) { Cluster cluster = clusterService.findById(clusterId); - try (KafkaConsumer kafkaConsumer = clusterService.createConsumer(cluster.getServers(), groupId)) { + try (KafkaConsumer kafkaConsumer = clusterService.createConsumer(cluster.getServers(), groupId, "earliest")) { TopicPartition topicPartition = new TopicPartition(topic, resetOffset.getPartition()); List topicPartitions = Collections.singletonList(topicPartition); @@ -213,8 +213,10 @@ public List describe(String clusterId, String groupId) th OffsetAndMetadata offsetAndMetadata = topicPartitionOffsetAndMetadataMap.get(topicPartition); Long beginningOffset = beginningOffsets.get(topicPartition); Long endOffset = endOffsets.get(topicPartition); - long offset = offsetAndMetadata.offset(); - + Long offset = null; + if (offsetAndMetadata != null) { + offset = offsetAndMetadata.offset(); + } ConsumerGroupDescribe consumerGroupDescribe = new ConsumerGroupDescribe(); consumerGroupDescribe.setGroupId(groupId); consumerGroupDescribe.setTopic(topic); @@ -222,7 +224,11 @@ public List describe(String clusterId, String groupId) th consumerGroupDescribe.setCurrentOffset(offset); consumerGroupDescribe.setLogBeginningOffset(beginningOffset); consumerGroupDescribe.setLogEndOffset(endOffset); - consumerGroupDescribe.setLag(endOffset - offset); + if (endOffset != null && offset != null) { + consumerGroupDescribe.setLag(endOffset - offset); + } else { + consumerGroupDescribe.setLag(null); + } consumerGroupDescribe.setConsumerId(consumerId); consumerGroupDescribe.setHost(host); consumerGroupDescribe.setClientId(clientId); diff --git a/src/main/java/cn/typesafe/km/service/dto/ConsumerGroupDescribe.java b/src/main/java/cn/typesafe/km/service/dto/ConsumerGroupDescribe.java index 512a53d..0c34a41 100644 --- a/src/main/java/cn/typesafe/km/service/dto/ConsumerGroupDescribe.java +++ b/src/main/java/cn/typesafe/km/service/dto/ConsumerGroupDescribe.java @@ -11,10 +11,10 @@ public class ConsumerGroupDescribe { private String groupId; private String topic; private int partition; - private long currentOffset; - private long logBeginningOffset; - private long logEndOffset; - private long lag; + private Long currentOffset; + private Long logBeginningOffset; + private Long logEndOffset; + private Long lag; private String consumerId; private String host; private String clientId; diff --git a/src/main/java/cn/typesafe/km/util/Json.java b/src/main/java/cn/typesafe/km/util/Json.java new file mode 100644 index 0000000..552e5aa --- /dev/null +++ b/src/main/java/cn/typesafe/km/util/Json.java @@ -0,0 +1,31 @@ +package cn.typesafe.km.util; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; + +import java.util.List; + +public class Json { + + @SneakyThrows + public static String toJsonString(Object o) { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsString(o); + } + + @SneakyThrows + public static T toJavaObject(String json, Class klass) { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return objectMapper.readValue(json, klass); + } + + @SneakyThrows + public static List toJavaArray(String json) { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return objectMapper.readValue(json, new TypeReference<>() {}); + } +} diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml index ea7fee9..b1b0885 100644 --- a/src/main/resources/logback-spring.xml +++ b/src/main/resources/logback-spring.xml @@ -78,6 +78,9 @@ + + + diff --git a/web/package.json b/web/package.json index ad2f10d..61eff3e 100644 --- a/web/package.json +++ b/web/package.json @@ -1,6 +1,6 @@ { "name": "kafka-map", - "version": "1.0.0", + "version": "1.1.0", "private": true, "dependencies": { "@testing-library/jest-dom": "^5.11.4", diff --git a/web/src/components/Cluster.js b/web/src/components/Cluster.js index ee905e1..72e4614 100644 --- a/web/src/components/Cluster.js +++ b/web/src/components/Cluster.js @@ -1,5 +1,19 @@ import React, {Component} from 'react'; -import {Button, message, Table, Tooltip, Modal, Row, Col, Typography, Space, Input, Divider, Popconfirm} from "antd"; +import { + Button, + message, + Table, + Tooltip, + Modal, + Row, + Col, + Typography, + Space, + Input, + Divider, + Popconfirm, + Switch, Popover +} from "antd"; import dayjs from "dayjs"; import request from "../common/request"; import qs from "qs"; @@ -18,6 +32,21 @@ const confirm = Modal.confirm; const {Search} = Input; const {Title, Text} = Typography; +const content = ( +
+

基于kafka的延迟消息服务,支持18个级别,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

+

向主题:delay-message 投递以下格式的消息即可,level 范围:0-17 。

+
+            {JSON.stringify({
+                "level": 0,
+                "topic": "target",
+                "key": "key",
+                "value": "value"
+            }, null,4)}
+        
+
+); + class Cluster extends Component { inputRefOfName = React.createRef(); @@ -192,7 +221,7 @@ class Cluster extends Component { }) await this.loadTableData(this.state.queryParams); } else { - message.error(+ result.message, 10); + message.error(+result.message, 10); } } finally { this.setState({ @@ -204,14 +233,14 @@ class Cluster extends Component { render() { const columns = [{ - title: , + title: , dataIndex: 'id', key: 'id', render: (id, record, index) => { return index + 1; } }, { - title: , + title: , dataIndex: 'name', key: 'name', render: (name, record) => { @@ -227,11 +256,30 @@ class Cluster extends Component { }, sorter: true, }, { - title: , + title: , dataIndex: 'servers', key: 'servers', }, { - title: , + title: , + dataIndex: 'delayMessageStatus', + key: 'delayMessageStatus', + render: (delayMessageStatus, record, index) => { + return + { + let url = `/clusters/${record['id']}/disableDelayMessage`; + if (checked) { + url = `/clusters/${record['id']}/enableDelayMessage`; + } + await request.post(url); + this.loadTableData() + } + }/> + + + } + }, { + title: , dataIndex: 'topicCount', key: 'topicCount', render: (topicCount, record, index) => { @@ -241,7 +289,7 @@ class Cluster extends Component { } }, { - title: , + title: , dataIndex: 'brokerCount', key: 'brokerCount', render: (brokerCount, record, index) => { @@ -250,7 +298,7 @@ class Cluster extends Component { } }, { - title: , + title: , dataIndex: 'consumerCount', key: 'consumerCount', render: (consumerCount, record, index) => { @@ -259,7 +307,7 @@ class Cluster extends Component { } }, { - title: , + title: , dataIndex: 'created', key: 'created', render: (text, record) => { @@ -272,19 +320,19 @@ class Cluster extends Component { sorter: true, }, { - title: , + title: , key: 'action', render: (text, record, index) => { return (
+ this.showModal(, record) + }}> } + title={} onConfirm={() => this.delete(record['id'])} > - +
) @@ -307,7 +355,7 @@ class Cluster extends Component { - <FormattedMessage id="cluster" /> + <FormattedMessage id="cluster"/> @@ -318,7 +366,7 @@ class Cluster extends Component { onSearch={this.handleSearchByName} /> - }> + }> - }> + }> - }> + }> , - + @@ -100,20 +128,26 @@ class TopicInfo extends Component { } key="partition"> } + topic={this.state.topic} + onRef={this.onTopicPartitionRef} + > + } key="broker"> - + topic={this.state.topic} + onRef={this.onTopicBrokerRef} + > } key="consumer-group"> + topic={this.state.topic} + onRef={this.onTopicConsumerGroupRef} + > diff --git a/web/src/components/TopicPartition.js b/web/src/components/TopicPartition.js index e4ea9e8..4ad1624 100644 --- a/web/src/components/TopicPartition.js +++ b/web/src/components/TopicPartition.js @@ -9,11 +9,18 @@ class TopicPartition extends Component { state = { loading: false, items: [], + clusterId: undefined, + topic: undefined } componentDidMount() { let clusterId = this.props.clusterId; let topic = this.props.topic; + this.setState({ + clusterId: clusterId, + topic: topic + }) + this.props.onRef(this); this.loadItems(clusterId, topic); } @@ -28,6 +35,12 @@ class TopicPartition extends Component { }) } + refresh() { + if (this.state.clusterId && this.state.topic) { + this.loadItems(this.state.clusterId, this.state.topic) + } + } + render() { const columns = [{ diff --git a/web/src/locales/en_US.js b/web/src/locales/en_US.js index f991751..83f9778 100644 --- a/web/src/locales/en_US.js +++ b/web/src/locales/en_US.js @@ -52,6 +52,7 @@ const en_US = { 'produce-message': 'Produce Message', 'pull': 'Pull', 'newest': 'Newest', + 'delay-message': 'Delay Message', } export default en_US; \ No newline at end of file diff --git a/web/src/locales/zh_CN.js b/web/src/locales/zh_CN.js index 9f1bb40..07ee69a 100644 --- a/web/src/locales/zh_CN.js +++ b/web/src/locales/zh_CN.js @@ -53,6 +53,7 @@ const zh_CN = { 'produce-message': '导入数据', 'pull': '拉取', 'newest': '最新', + 'delay-message': '延迟消息', } export default zh_CN; \ No newline at end of file