Skip to content

Commit

Permalink
修改jdk版本为1.8
Browse files Browse the repository at this point in the history
  • Loading branch information
dushixiang committed Apr 11, 2021
1 parent 54ecb1c commit 3b0819f
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 17 deletions.
10 changes: 1 addition & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<name>kafka-map</name>
<description>a simple kafka manager</description>
<properties>
<java.version>11</java.version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -65,14 +65,6 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public List<ConsumerGroup> consumerGroup(String clusterId, String filterGroupId)

public ConsumerGroupInfo info(String clusterId, String groupId) throws ExecutionException, InterruptedException {
AdminClient adminClient = clusterService.getAdminClient(clusterId);
ConsumerGroupDescription consumerGroupDescription = adminClient.describeConsumerGroups(List.of(groupId)).all().get().get(groupId);
ConsumerGroupDescription consumerGroupDescription = adminClient.describeConsumerGroups(Collections.singletonList(groupId)).all().get().get(groupId);
Set<String> topicNames = consumerGroupDescription
.members()
.stream()
Expand All @@ -185,6 +185,6 @@ public ConsumerGroupInfo info(String clusterId, String groupId) throws Execution

public void delete(String clusterId, String groupId) throws ExecutionException, InterruptedException {
AdminClient adminClient = clusterService.getAdminClient(clusterId);
adminClient.deleteConsumerGroups(List.of(groupId)).all().get();
adminClient.deleteConsumerGroups(Collections.singletonList(groupId)).all().get();
}
}
9 changes: 5 additions & 4 deletions src/main/java/cn/typesafe/km/service/MessageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -27,7 +28,7 @@ public List<ConsumerMessage> data(String clusterId, String topicName, Integer tP
try (KafkaConsumer<String, String> kafkaConsumer = clusterService.createConsumer(clusterId)) {

TopicPartition topicPartition = new TopicPartition(topicName, tPartition);
List<TopicPartition> topicPartitions = List.of(topicPartition);
List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition);
kafkaConsumer.assign(topicPartitions);

Long beginningOffset = kafkaConsumer.beginningOffsets(topicPartitions).get(topicPartition);
Expand All @@ -39,11 +40,11 @@ public List<ConsumerMessage> data(String clusterId, String topicName, Integer tP
Long endOffset = kafkaConsumer.endOffsets(topicPartitions).get(topicPartition);
long currentOffset = startOffset - 1;

final var records = new ArrayList<ConsumerRecord<String, String>>(count);
List<ConsumerRecord<String, String>> records = new ArrayList<>(count);

var emptyPoll = 0;
int emptyPoll = 0;
while (records.size() < count && currentOffset < endOffset) {
var polled = kafkaConsumer.poll(Duration.ofMillis(200)).records(topicPartition);
List<ConsumerRecord<String, String>> polled = kafkaConsumer.poll(Duration.ofMillis(200)).records(topicPartition);

if (!CollectionUtils.isEmpty(polled)) {
records.addAll(polled);
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/cn/typesafe/km/service/TopicService.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public List<Topic> topics(String clusterId, Set<String> topicNames) throws Inter
public TopicInfo info(String clusterId, String topicName) throws ExecutionException, InterruptedException {
AdminClient adminClient = clusterService.getAdminClient(clusterId);
try (KafkaConsumer<String, String> kafkaConsumer = clusterService.createConsumer(clusterId)) {
TopicDescription topicDescription = adminClient.describeTopics(List.of(topicName)).all().get().get(topicName);
TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topicName)).all().get().get(topicName);
TopicInfo topicInfo = new TopicInfo();
topicInfo.setClusterId(clusterId);
topicInfo.setName(topicName);
Expand Down Expand Up @@ -241,7 +241,9 @@ public void deleteTopic(String clusterId, List<String> topics) throws ExecutionE

public void createPartitions(String clusterId, String topic, int totalCount) throws ExecutionException, InterruptedException {
AdminClient adminClient = clusterService.getAdminClient(clusterId);
Map<String, NewPartitions> newPartitionsMap = Map.of(topic, NewPartitions.increaseTo(totalCount));
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put(topic, NewPartitions.increaseTo(totalCount));

adminClient.createPartitions(newPartitionsMap).all().get();
}

Expand Down

0 comments on commit 3b0819f

Please sign in to comment.