Issue 2728: Entry Log GC may get blocked when using entryLogPerLedgerEnabled option (apache#2779)
RaulGracia authored Oct 11, 2021
1 parent 883231e commit e413c70
Showing 4 changed files with 147 additions and 17 deletions.
EntryLogger.java
@@ -482,6 +482,27 @@ long getLeastUnflushedLogId() {
return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
}

/**
* Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
* process needs to look beyond the least unflushed entry log file, as there may be entry logs
* ready to be garbage collected.
*
* @return last entry log id created.
*/
long getLastLogId() {
return recentlyCreatedEntryLogsStatus.getLastLogId();
}

/**
* Returns whether the given entry log id exists and has been rotated already.
*
* @param entryLogId EntryLog id to check.
* @return Whether the given entryLogId exists and has been rotated.
*/
boolean isFlushedEntryLog(Long entryLogId) {
return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
}

long getPreviousAllocatedEntryLogId() {
return entryLoggerAllocator.getPreallocatedLogId();
}
@@ -1249,5 +1270,14 @@ synchronized void flushRotatedEntryLog(Long entryLogId) {
synchronized long getLeastUnflushedLogId() {
return leastUnflushedLogId;
}

synchronized long getLastLogId() {
return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0;
}

synchronized boolean isFlushedEntryLog(Long entryLogId) {
return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId)
|| entryLogId < leastUnflushedLogId;
}
}
}
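For context, the following is a minimal, self-contained model of the bookkeeping behind these two accessors. It is an illustrative sketch, not the actual RecentEntryLogsStatus class (which also advances leastUnflushedLogId as logs get flushed), but it shows why getLastLogId() is simply the last key of a sorted map, and why isFlushedEntryLog() treats any id below leastUnflushedLogId as flushed.

import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative model of the flushed-status bookkeeping; not the BookKeeper class.
class RecentEntryLogsStatusSketch {
    private final SortedMap<Long, Boolean> entryLogsStatusMap = new TreeMap<>();
    private final long leastUnflushedLogId;

    RecentEntryLogsStatusSketch(long leastUnflushedLogId) {
        this.leastUnflushedLogId = leastUnflushedLogId;
    }

    synchronized void createdEntryLog(Long entryLogId) {
        entryLogsStatusMap.put(entryLogId, false); // created, not yet rotated
    }

    synchronized void flushRotatedEntryLog(Long entryLogId) {
        entryLogsStatusMap.put(entryLogId, true); // rotated and flushed
    }

    // Highest entry log id created so far (0 if none tracked yet).
    synchronized long getLastLogId() {
        return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0;
    }

    // Flushed if explicitly rotated, or older than the least un-flushed log.
    synchronized boolean isFlushedEntryLog(Long entryLogId) {
        return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId)
                || entryLogId < leastUnflushedLogId;
    }
}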
GarbageCollectorThread.java
@@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import lombok.Getter;
import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
@@ -583,12 +584,15 @@ protected void compactEntryLog(EntryLogMetadata entryLogMeta) {
* @throws IOException
*/
 protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
-    // Extract it for every entry log except for the current one.
-    // Entry Log ID's are just a long value that starts at 0 and increments
-    // by 1 when the log fills up and we roll to a new one.
-    long curLogId = entryLogger.getLeastUnflushedLogId();
+    // Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll
+    // to a new one. We scan entry logs as follows:
+    // - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed).
+    // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id.
+    Supplier<Long> finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() :
+            entryLogger.getLeastUnflushedLogId();
     boolean hasExceptionWhenScan = false;
-    for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
+    boolean increaseScannedLogId = true;
+    for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) {
// Comb the current entry log file if it has not already been extracted.
if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
@@ -600,6 +604,15 @@ protected Map<Long, EntryLogMetadata> extractMetaFromEntryLo
continue;
}

// If entryLogPerLedgerEnabled is true, we also consider entry log files beyond getLeastUnflushedLogId(),
// provided they have been explicitly rotated (ids below getLeastUnflushedLogId() are always flushed).
if (conf.isEntryLogPerLedgerEnabled() && !entryLogger.isFlushedEntryLog(entryLogId)) {
LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). Starting next iteration at this point.",
entryLogId);
increaseScannedLogId = false;
continue;
}

LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId);

try {
@@ -619,8 +632,9 @@ protected Map<Long, EntryLogMetadata> extractMetaFromEntryLo

     // if scan failed on some entry log, we don't move 'scannedLogId' to next id
     // if scan succeed, we don't need to scan it again during next gc run,
-    // we move 'scannedLogId' to next id
-    if (!hasExceptionWhenScan) {
+    // we move 'scannedLogId' to next id (unless entryLogPerLedgerEnabled is true
+    // and we have found an un-flushed entry log already).
+    if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() || increaseScannedLogId)) {
++scannedLogId;
}
}
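The behavioral core of the patch is the loop bound plus the early skip above. Below is a condensed, hypothetical model of just that control flow; the method and parameter names are invented for illustration, and the real logic lives in GarbageCollectorThread#extractMetaFromEntryLogs.

import java.util.function.LongPredicate;
import java.util.function.LongSupplier;

// Condensed, hypothetical model of the scan-loop control flow in this patch.
final class ScanLoopSketch {
    /**
     * @param scannedLogId     first id not yet extracted in previous GC runs
     * @param perLedgerEnabled value of conf.isEntryLogPerLedgerEnabled()
     * @param lastLogId        stands in for entryLogger::getLastLogId
     * @param leastUnflushedId stands in for entryLogger::getLeastUnflushedLogId
     * @param isFlushed        stands in for entryLogger::isFlushedEntryLog
     * @return the scannedLogId to carry into the next GC run
     */
    static long scan(long scannedLogId, boolean perLedgerEnabled,
                     LongSupplier lastLogId, LongSupplier leastUnflushedId,
                     LongPredicate isFlushed) {
        // Upper bound is re-evaluated on every iteration: with entryLogPerLedger
        // enabled, scan up to the highest created id, else up to the least un-flushed id.
        LongSupplier bound = perLedgerEnabled ? lastLogId : leastUnflushedId;
        boolean increaseScannedLogId = true;
        for (long id = scannedLogId; id < bound.getAsLong(); id++) {
            if (perLedgerEnabled && !isFlushed.test(id)) {
                // Un-flushed logs can appear out of order with entryLogPerLedger;
                // skip them and pin scannedLogId so the next run retries from here.
                increaseScannedLogId = false;
                continue;
            }
            // ... extract metadata for log `id` here ...
        }
        return increaseScannedLogId ? scannedLogId + 1 : scannedLogId;
    }
}

Note that scannedLogId advances by at most one per successful GC run, and stays pinned as soon as an un-flushed log is skipped, so a scan that stops early simply resumes from the same id on the next run.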
CompactionTest.java
@@ -51,6 +51,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
Expand Down Expand Up @@ -544,6 +545,75 @@ public void testForceMinorCompaction() throws Exception {
.get().intValue() > 0);
}

@Test
public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
// restart bookies
restartBookies(c -> {
c.setMajorCompactionThreshold(0.0f);
c.setGcWaitTime(60000);
c.setMinorCompactionInterval(120000);
c.setMajorCompactionInterval(240000);
c.setForceAllowCompaction(true);
c.setEntryLogPerLedgerEnabled(true);
return c;
});

// prepare data
LedgerHandle[] lhs = prepareData(3, false);

for (LedgerHandle lh : lhs) {
lh.close();
}

long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
assertFalse(getGCThread().enableMajorCompaction);
assertTrue(getGCThread().enableMinorCompaction);

// remove ledgers 1 and 2
bkc.deleteLedger(lhs[1].getId());
bkc.deleteLedger(lhs[2].getId());

// Need to wait until entry log 3 gets flushed before initiating GC to satisfy assertions.
while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) {
TimeUnit.MILLISECONDS.sleep(100);
}

LOG.info("Finished deleting the ledgers contains most entries.");
getGCThread().triggerGC(true, false, false).get();

assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);

// At this point, we have the following state of ledgers and entry logs:
// L0 (not deleted) -> E0 (un-flushed): Entry log should exist.
// L1 (deleted) -> E1 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC.
// L2 (deleted) -> E2 (flushed): Entry log should have been garbage collected.
// E3 (flushed): Entry log should have been garbage collected.
// E4 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC.
assertTrue("Not found entry log files [0, 1, 4].log that should not have been compacted in: "
+ tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0, 1, 4));
assertTrue("Found entry log files [2, 3].log that should have been compacted in ledgerDirectory: "
+ tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 2, 3));

// Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm
// should consider it for deletion.
getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
getGCThread().triggerGC(true, false, false).get();
assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: "
+ tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 1));

// Once ledger L0 is removed, deleting E0 is fine (but only once it has been flushed).
bkc.deleteLedger(lhs[0].getId());
getGCThread().triggerGC(true, false, false).get();
assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: "
+ tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0));
getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
getGCThread().triggerGC(true, false, false).get();
assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: "
+ tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 0));
}
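As an aside, the flush-wait loop near the top of this test polls indefinitely; a deadline-bounded helper along these lines (hypothetical, not part of this patch) would fail fast instead:

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper with a deadline for steps such as
// "wait until entry log 3 is flushed".
final class AwaitSketch {
    static void await(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("condition not met within " + timeoutMs + " ms");
            }
            TimeUnit.MILLISECONDS.sleep(100);
        }
    }
}

// Usage in the test would then be, for example:
// AwaitSketch.await(() -> getGCThread().entryLogger.isFlushedEntryLog(3L), 30_000);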

@Test
public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception {
// prepare data
TestUtils.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.util;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -49,9 +50,31 @@ public static String buildStatsCounterPathFromBookieID(BookieId bookieId) {
return bookieId.toString().replace('.', '_').replace('-', '_').replace(":", "_");
}

public static boolean hasAllLogFiles(File ledgerDirectory, Integer... logsId) {
Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
return logs.containsAll(Arrays.asList(logsId));
}

public static boolean hasNoneLogFiles(File ledgerDirectory, Integer... logsId) {
Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
return Arrays.stream(logsId).noneMatch(logs::contains);
}

 public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer... logsId) {
-    boolean result = partial ? false : true;
-    Set<Integer> logs = new HashSet<Integer>();
+    boolean result = !partial;
+    Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
for (Integer logId : logsId) {
boolean exist = logs.contains(logId);
if ((partial && exist)
|| (!partial && !exist)) {
return !result;
}
}
return result;
}

private static Set<Integer> findEntryLogFileIds(File ledgerDirectory) {
Set<Integer> logs = new HashSet<>();
for (File file : BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles()) {
if (file.isFile()) {
String name = file.getName();
@@ -61,14 +84,7 @@ public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer
             logs.add(Integer.parseInt(name.split("\\.")[0], 16));
         }
     }
-    for (Integer logId : logsId) {
-        boolean exist = logs.contains(logId);
-        if ((partial && exist)
-                || (!partial && !exist)) {
-            return !result;
-        }
-    }
-    return result;
+    return logs;
}

public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws Exception {
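A hypothetical usage example of the new helpers follows. The check method and its caller are invented for illustration; note from the radix-16 parse above that entry log files are named with the hexadecimal log id plus a .log suffix, so log id 10 corresponds to a.log.

import java.io.File;

import org.apache.bookkeeper.util.TestUtils;

// Hypothetical usage of the new helpers after a compaction run.
class TestUtilsUsageSketch {
    static void check(File ledgerDir) {
        // true iff 0.log, 1.log and 4.log are all still present
        boolean survivors = TestUtils.hasAllLogFiles(ledgerDir, 0, 1, 4);
        // true iff neither 2.log nor 3.log remains
        boolean compacted = TestUtils.hasNoneLogFiles(ledgerDir, 2, 3);
        assert survivors && compacted : "unexpected entry log layout in " + ledgerDir;
    }
}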
