Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
bsideup committed Jul 28, 2019
1 parent b6f388b commit cfd6f5b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.pf4j.PluginManager;
import org.pf4j.ServiceProviderExtensionFinder;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -12,6 +14,13 @@ class LiiklusExtensionFinder extends ServiceProviderExtensionFinder {
super(pluginManager);
}

@Override
public Map<String, Set<String>> readClasspathStorages() {
// The app does not provide any extensions,
// we can safely return an empty Map here to avoid an exception ('META-INF/services' not found)
return Collections.emptyMap();
}

@Override
public Set<String> findClassNames(String pluginId) {
var pluginClassLoader = pluginId != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,9 @@ public Flux<SubscribeReply> subscribe(Mono<SubscribeRequest> requestFlux) {
autoOffsetReset = Optional.empty();
}

var subscription = recordsStorage.subscribe(topic, groupId.getName(), autoOffsetReset);

var sessionId = UUID.randomUUID().toString();

var storedSubscription = new StoredSubscription(subscription, topic, groupId);
var storedSubscription = new StoredSubscription(topic, groupId);
subscriptions.put(sessionId, storedSubscription);

var sourcesByPartition = sources.computeIfAbsent(sessionId, __ -> new ConcurrentHashMap<>());
Expand All @@ -137,6 +135,8 @@ public Flux<SubscribeReply> subscribe(Mono<SubscribeRequest> requestFlux) {
.toFuture();
};

var subscription = recordsStorage.subscribe(topic, groupId.getName(), autoOffsetReset);

return Flux.from(subscription.getPublisher(offsetsProvider))
.flatMap(sources -> Flux.fromStream(sources).map(source -> {
var partition = source.getPartition();
Expand Down Expand Up @@ -300,8 +300,6 @@ private Mono<NavigableMap<Integer, Map<Integer, Long>>> getOffsetsByGroupName(St
@Value
private static class StoredSubscription {

Subscription subscription;

String topic;

GroupId groupId;
Expand Down

0 comments on commit cfd6f5b

Please sign in to comment.