Skip to content

Commit

Permalink
修复Actor的比较策略问题
Browse files Browse the repository at this point in the history
  • Loading branch information
sen.chai committed Nov 30, 2023
1 parent df357f9 commit 121d8f5
Showing 1 changed file with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public static class Actor<E> implements Runnable, Comparable<Actor> {
private final String name;
private long total;
private volatile long submitTs;
private volatile long executeTs;
//通过Unsafe操作
private volatile int status;

Expand All @@ -148,6 +149,7 @@ boolean dispatch(E message) {
@Override
public void run() {
long start = System.currentTimeMillis();
executeTs = start;
String old = Thread.currentThread().getName();
try {
Thread.currentThread().setName(systemName + "-" + name);
Expand Down Expand Up @@ -228,8 +230,16 @@ private boolean updateStatus(int oldStatus, int newStatus) {

@Override
public int compareTo(Actor o) {
/*
原策略"TotalDuration"会存在如下问题:
当一个突发性的高QPS消息过来,由于之前量小,total值很低,所以每次都会优先执行这个消息,导致CPU被全部拿走,进而导致其他消息无法被拉取
int result = Long.compare(total, o.total);
return result == 0 ? Long.compare(submitTs, o.submitTs) : result;
改进策略"LastExecuteTimestamp":
记录executeTs“上次执行时间戳”,使用“上次执行时间戳”进行比较,如果“上次执行时间戳”相同,再降级到total
*/
return ActorCompareWay.LastExecuteTimestamp.compare(this, o);
}

@Override
Expand All @@ -247,6 +257,26 @@ public int hashCode() {
}
}

private enum ActorCompareWay {

LastExecuteTimestamp {
@Override
public int compare(Actor a1, Actor a2) {
int result = Long.compare(a1.executeTs, a2.executeTs);
return result == 0 ? Long.compare(a1.total, a2.total) : result;
}
},
TotalDuration {
@Override
public int compare(Actor a1, Actor a2) {
int result = Long.compare(a1.total, a2.total);
return result == 0 ? Long.compare(a1.submitTs, a2.submitTs) : result;
}
};

public abstract int compare(Actor a1, Actor a2);
}

/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
Expand Down

0 comments on commit 121d8f5

Please sign in to comment.