Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: 优化Topic内存占用,减少不必要的内存消耗. #13

Merged
merged 1 commit into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 111 additions & 68 deletions src/main/java/org/jetlinks/core/topic/Topic.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package org.jetlinks.core.topic;

import com.google.common.collect.Maps;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.utils.RecyclableDequeue;
import org.jetlinks.core.utils.RecyclerUtils;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.core.utils.TopicUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.function.Consumer4;
import reactor.function.Consumer5;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand All @@ -27,41 +28,37 @@ public final class Topic<T> {
@Getter
private final Topic<T> parent;

@Setter(AccessLevel.PRIVATE)
private String part;

@Setter(AccessLevel.PRIVATE)
private volatile String topic;

@Setter(AccessLevel.PRIVATE)
private volatile String[] topics;

private final int depth;

private final ConcurrentMap<String, Topic<T>> child = Maps.newConcurrentMap();
private volatile ConcurrentMap<String, Topic<T>> child;

private final ConcurrentMap<T, AtomicInteger> subscribers = Maps.newConcurrentMap();
private volatile ConcurrentMap<T, AtomicInteger> subscribers;

public static <T> Topic<T> createRoot() {
return new Topic<>(null, "/");
}

public Topic<T> append(String topic) {
if (topic.equals("/") || topic.equals("")) {
if (topic.equals("/") || topic.isEmpty()) {
return this;
}
return getOrDefault(topic, Topic::new);
}

private Topic(Topic<T> parent, String part) {

if (StringUtils.isEmpty(part) || part.equals("/")) {
if (ObjectUtils.isEmpty(part) || part.equals("/")) {
this.part = "";
} else {
if (part.contains("/")) {
this.ofTopic(part);
} else {
this.part = part;
setPart(part);
}
}
this.parent = parent;
Expand All @@ -72,35 +69,37 @@ private Topic(Topic<T> parent, String part) {
}
}

private void setPart(String part) {
this.part = RecyclerUtils.intern(part);
}

public String[] getTopics() {
if (topics != null) {
return topics;
}
return topics = TopicUtils.split(getTopic());
return topics = TopicUtils.split(getTopic(), true);
}

public String getTopic() {
if (topic == null) {
Topic<T> parent = getParent();
StringBuilder builder = new StringBuilder();
if (parent != null) {
String parentTopic = parent.getTopic();
builder.append(parentTopic).append(parentTopic.equals("/") ? "" : "/");
} else {
builder.append("/");
}
return topic = builder.append(part).toString();
}
return topic;
return StringBuilderUtils
.buildString(getParent(), (_parent, builder) -> {
if (_parent != null) {
String parentTopic = _parent.getTopic();
builder.append(parentTopic).append(parentTopic.equals("/") ? "" : "/");
} else {
builder.append("/");
}
builder.append(part);
});
}

public T getSubscriberOrSubscribe(Supplier<T> supplier) {
if (subscribers.size() > 0) {
return subscribers.keySet().iterator().next();
if (!subscribers().isEmpty()) {
return subscribers().keySet().iterator().next();
}
synchronized (this) {
if (subscribers.size() > 0) {
return subscribers.keySet().iterator().next();
if (!subscribers().isEmpty()) {
return subscribers().keySet().iterator().next();
}
T sub = supplier.get();
subscribe(sub);
Expand All @@ -109,69 +108,105 @@ public T getSubscriberOrSubscribe(Supplier<T> supplier) {
}

public Set<T> getSubscribers() {
return subscribers.keySet();
return subscribers == null ? Collections.emptySet() : subscribers().keySet();
}

public boolean subscribed(T subscriber) {
return subscribers.containsKey(subscriber);
return subscribers().containsKey(subscriber);
}

@SafeVarargs
public final void subscribe(T... subscribers) {
for (T subscriber : subscribers) {
this.subscribers.computeIfAbsent(subscriber, i -> new AtomicInteger()).incrementAndGet();
this.subscribers()
.computeIfAbsent(subscriber, i -> new AtomicInteger())
.incrementAndGet();
}
}

@SafeVarargs
public final List<T> unsubscribe(T... subscribers) {
List<T> unsub = new ArrayList<>();
List<T> unsub = new ArrayList<>(subscribers.length);
for (T subscriber : subscribers) {
this.subscribers.computeIfPresent(subscriber, (k, v) -> {
if (v.decrementAndGet() <= 0) {
unsub.add(k);
return null;
}
return v;
});
this.subscribers()
.computeIfPresent(subscriber, (k, v) -> {
if (v.decrementAndGet() <= 0) {
unsub.add(k);
return null;
}
return v;
});
}
return unsub;
}

public final void unsubscribe(Predicate<T> predicate) {
for (Map.Entry<T, AtomicInteger> entry : this.subscribers.entrySet()) {
public void unsubscribe(Predicate<T> predicate) {
ConcurrentMap<T, AtomicInteger> subscribers = this.subscribers;
if (subscribers == null) {
return;
}

for (Map.Entry<T, AtomicInteger> entry : subscribers.entrySet()) {
if (predicate.test(entry.getKey()) && entry.getValue().decrementAndGet() <= 0) {
this.subscribers.remove(entry.getKey());
subscribers.remove(entry.getKey());
}
}
}

public final void unsubscribeAll() {
this.subscribers.clear();
public void unsubscribeAll() {
if (subscribers == null) {
return;
}
subscribers.clear();
}

public Collection<Topic<T>> getChildren() {
if (child == null) {
return Collections.emptyList();
}
return child.values();
}

private Map<String, Topic<T>> child() {
if (child == null) {
synchronized (this) {
if (child == null) {
child = new ConcurrentHashMap<>();
}
}
}
return child;
}

private ConcurrentMap<T, AtomicInteger> subscribers() {
if (subscribers == null) {
synchronized (this) {
if (subscribers == null) {
subscribers = new ConcurrentHashMap<>();
}
}
}
return subscribers;
}

private void ofTopic(String topic) {
String[] parts = topic.split("/", 2);
this.part = parts[0];
setPart(parts[0]);
if (parts.length > 1) {
Topic<T> part = new Topic<>(this, parts[1]);
this.child.put(part.part, part);
this.child().put(part.part, part);
}
}

private Topic<T> getOrDefault(String topic, BiFunction<Topic<T>, String, Topic<T>> mapping) {
if (topic.startsWith("/")) {
if (topic.charAt(0) == '/') {
topic = topic.substring(1);
}
String[] parts = topic.split("/");
Topic<T> part = child.computeIfAbsent(parts[0], _topic -> mapping.apply(this, _topic));
String[] parts = TopicUtils.split(topic, true, true);
Topic<T> part = child().computeIfAbsent(parts[0], _topic -> mapping.apply(this, _topic));
for (int i = 1; i < parts.length && part != null; i++) {
Topic<T> parent = part;
part = part.child.computeIfAbsent(parts[i], _topic -> mapping.apply(parent, _topic));
part = part.child().computeIfAbsent(parts[i], _topic -> mapping.apply(parent, _topic));
}
return part;
}
Expand All @@ -181,9 +216,7 @@ public Optional<Topic<T>> getTopic(String topic) {
}

public Flux<Topic<T>> findTopic(String topic) {
return Flux.create(sink -> {
findTopic(topic, sink::next, sink::complete);
});
return Flux.create(sink -> findTopic(topic, sink::next, sink::complete));
}

public void findTopic(String topic,
Expand All @@ -195,16 +228,16 @@ public void findTopic(String topic,
end,
sink,
(nil, nil2, _end, _sink, _topic) -> _sink.accept(_topic),
(nil ,nil2, _end, _sink) -> _end.run());
(nil, nil2, _end, _sink) -> _end.run());
}

public <ARG0, ARG1, ARG2, ARG3> void findTopic(String topic,
ARG0 arg0, ARG1 arg1, ARG2 arg2, ARG3 arg3,
Consumer5<ARG0, ARG1, ARG2, ARG3, Topic<T>> sink,
Consumer4<ARG0, ARG1, ARG2, ARG3> end) {
String[] topics = TopicUtils.split(topic, true);
String[] topics = TopicUtils.split(topic, true, false);

if (!topic.startsWith("/")) {
if (topic.charAt(0) != '/') {
String[] newTopics = new String[topics.length + 1];
newTopics[0] = "";
System.arraycopy(topics, 0, newTopics, 1, topics.length);
Expand All @@ -216,10 +249,12 @@ public <ARG0, ARG1, ARG2, ARG3> void findTopic(String topic,

@Override
public String toString() {
return "topic: " + getTopic() + ", subscribers: " + subscribers.size() + ", children: " + child.size();
return "topic: " + getTopic()
+ ", subscribers: " + (subscribers == null ? 0 : subscribers.size())
+ ", children: " + (child == null ? 0 : child.size());
}

protected boolean match(String[] pars) {
private boolean match(String[] pars) {
return TopicUtils.match(getTopics(), pars)
|| TopicUtils.match(pars, getTopics());
}
Expand All @@ -242,14 +277,20 @@ public static <T, ARG0, ARG1, ARG2, ARG3> void find(
if (part == null) {
break;
}

if (part.match(topicParts)) {
sink.accept(arg0, arg1, arg2, arg3, part);
}

Map<String, Topic<T>> child = part.child;
if (child == null) {
continue;
}
//订阅了如 /device/**/event/*
if (part.part.equals("**")) {
Topic<T> tmp = null;
for (int i = part.depth; i < topicParts.length; i++) {
tmp = part.child.get(topicParts[i]);
tmp = child.get(topicParts[i]);
if (tmp != null) {
cache.add(tmp);
}
Expand All @@ -259,14 +300,14 @@ public static <T, ARG0, ARG1, ARG2, ARG3> void find(
}
}
if ("**".equals(nextPart) || "*".equals(nextPart)) {
cache.addAll(part.child.values());
cache.addAll(child.values());
continue;
}
Topic<T> next = part.child.get("**");
Topic<T> next = child.get("**");
if (next != null) {
cache.add(next);
}
next = part.child.get("*");
next = child.get("*");
if (next != null) {
cache.add(next);
}
Expand All @@ -276,10 +317,10 @@ public static <T, ARG0, ARG1, ARG2, ARG3> void find(
}
nextPart = topicParts[part.depth + 1];
if (nextPart.equals("*") || nextPart.equals("**")) {
cache.addAll(part.child.values());
cache.addAll(child.values());
continue;
}
next = part.child.get(nextPart);
next = child.get(nextPart);
if (next != null) {
cache.add(next);
}
Expand All @@ -292,15 +333,15 @@ public static <T, ARG0, ARG1, ARG2, ARG3> void find(
}

public long getTotalTopic() {
long total = child.size();
long total = child == null ? 0 : child().size();
for (Topic<T> tTopic : getChildren()) {
total += tTopic.getTotalTopic();
}
return total;
}

public long getTotalSubscriber() {
long total = subscribers.size();
long total = subscribers == null ? 0 : subscribers().size();
for (Topic<T> tTopic : getChildren()) {
total += tTopic.getTotalTopic();
}
Expand All @@ -320,8 +361,10 @@ public Flux<Topic<T>> getAllSubscriber() {

public void clean() {
unsubscribeAll();
getChildren().forEach(Topic::clean);
child.clear();
if (child != null) {
child.values().forEach(Topic::clean);
child().clear();
}
}

}
Loading