Skip to content

Commit

Permalink
feat: check pending total bytes for push
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Oct 15, 2023
1 parent edd9fd8 commit 9bc3eee
Showing 1 changed file with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.openmessaging.storage.dledger;

import com.alibaba.fastjson.JSON;

import io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcherState;
import io.openmessaging.storage.dledger.common.Closure;
import io.openmessaging.storage.dledger.common.ShutdownAbleThread;
import io.openmessaging.storage.dledger.common.Status;
Expand Down Expand Up @@ -55,6 +57,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -180,12 +183,33 @@ public int getPendingCount(long currTerm) {
return pendings.size();
}

public long getPendingSize(long currTerm) {
if (dispatcherMap == null) {
return 0;
}
long total = 0;
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
total += dispatcher.pendingTotalSize.get();
}
return total;
}

public boolean isPendingFull(long currTerm) {
checkTermForPendingMap(currTerm, "isPendingFull");
if (dLedgerStore.isLocalToomuchUncommitted()) {
return true;
}
return pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
if (pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum()) {
return true;
}
// avoid too much memory in pending if more than half followers fall behind too much
int fallBehindTooMuch = 0;
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
if (dispatcher.pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) {
fallBehindTooMuch++;
}
}
return fallBehindTooMuch > dispatcherMap.size() / 2;
}

public void appendClosure(Closure closure, long term, long index) {
Expand Down Expand Up @@ -409,6 +433,8 @@ private class EntryDispatcher extends ShutdownAbleThread {
private long matchIndex = -1;

private final int maxPendingSize = 1000;
private AtomicLong pendingTotalSize = new AtomicLong(0);

private long term = -1;
private String leaderId = null;
private long lastCheckLeakTimeMs = System.currentTimeMillis();
Expand Down Expand Up @@ -715,6 +741,10 @@ private void doAppend() throws Exception {
doCheckAppendResponse();
break;
}
if (pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) {
// to avoid oom or fullgc, we should wait for a while if too much pending big entry size
break;
}
long lastIndexToBeSend = doAppendInner(writeIndex);
if (lastIndexToBeSend == -1) {
break;
Expand Down Expand Up @@ -759,8 +789,10 @@ private void sendBatchAppendEntryRequest() throws Exception {
StopWatch watch = StopWatch.createStarted();
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
pendingMap.put(firstIndex, new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
pendingTotalSize.addAndGet(entriesSize);
responseFuture.whenComplete((x, ex) -> {
try {
pendingTotalSize.addAndGet(-1 * entriesSize);
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
Expand Down

0 comments on commit 9bc3eee

Please sign in to comment.