diff --git a/dev-support/spotbugs-exclude.xml b/dev-support/spotbugs-exclude.xml
index f57faaf65bc8..1137f5fdc142 100644
--- a/dev-support/spotbugs-exclude.xml
+++ b/dev-support/spotbugs-exclude.xml
@@ -247,4 +247,9 @@
+
+
+
+
+
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
index 2b9732092ce9..234c92104a55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
@@ -145,6 +145,8 @@ private static FirstLevelBlockCache createFirstLevelCache(final Configuration c)
return new LruBlockCache(cacheSize, blockSize, true, c);
} else if (policy.equalsIgnoreCase("TinyLFU")) {
return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c);
+ } else if (policy.equalsIgnoreCase("AdaptiveLRU")) {
+ return new LruAdaptiveBlockCache(cacheSize, blockSize, true, c);
} else {
throw new IllegalArgumentException("Unknown policy: " + policy);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java
new file mode 100644
index 000000000000..083f1098af5c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruAdaptiveBlockCache.java
@@ -0,0 +1,1421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static java.util.Objects.requireNonNull;
+
+import java.lang.ref.WeakReference;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hbase.thirdparty.com.google.common.base.Objects;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This realisation improve performance of classical LRU
+ * cache up to 3 times via reduce GC job.
+ *
+ * The classical block cache implementation that is memory-aware using {@link HeapSize},
+ * memory-bound using an
+ * LRU eviction algorithm, and concurrent: backed by a {@link ConcurrentHashMap} and with a
+ * non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock}
+ * operations.
+ *
+ * Contains three levels of block priority to allow for scan-resistance and in-memory families
+ * {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder#setInMemory(boolean)} (An
+ * in-memory column family is a column family that should be served from memory if possible):
+ * single-access, multiple-accesses, and in-memory priority. A block is added with an in-memory
+ * priority flag if {@link org.apache.hadoop.hbase.client.ColumnFamilyDescriptor#isInMemory()},
+ * otherwise a block becomes a single access priority the first time it is read into this block
+ * cache. If a block is accessed again while in cache, it is marked as a multiple access priority
+ * block. This delineation of blocks is used to prevent scans from thrashing the cache adding a
+ * least-frequently-used element to the eviction algorithm.
+ * map;
+
+ /** Eviction lock (locked when eviction in process) */
+ private transient final ReentrantLock evictionLock = new ReentrantLock(true);
+
+ private final long maxBlockSize;
+
+ /** Volatile boolean to track if we are in an eviction process or not */
+ private volatile boolean evictionInProgress = false;
+
+ /** Eviction thread */
+ private transient final EvictionThread evictionThread;
+
+ /** Statistics thread schedule pool (for heavy debugging, could remove) */
+ private transient final ScheduledExecutorService scheduleThreadPool =
+ Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+ .setNameFormat("LruAdaptiveBlockCacheStatsExecutor").setDaemon(true).build());
+
+ /** Current size of cache */
+ private final AtomicLong size;
+
+ /** Current size of data blocks */
+ private final LongAdder dataBlockSize;
+
+ /** Current number of cached elements */
+ private final AtomicLong elements;
+
+ /** Current number of cached data block elements */
+ private final LongAdder dataBlockElements;
+
+ /** Cache access count (sequential ID) */
+ private final AtomicLong count;
+
+ /** hard capacity limit */
+ private final float hardCapacityLimitFactor;
+
+ /** Cache statistics */
+ private final CacheStats stats;
+
+ /** Maximum allowable size of cache (block put if size > max, evict) */
+ private long maxSize;
+
+ /** Approximate block size */
+ private final long blockSize;
+
+ /** Acceptable size of cache (no evictions if size < acceptable) */
+ private final float acceptableFactor;
+
+ /** Minimum threshold of cache (when evicting, evict until size < min) */
+ private final float minFactor;
+
+ /** Single access bucket size */
+ private final float singleFactor;
+
+ /** Multiple access bucket size */
+ private final float multiFactor;
+
+ /** In-memory bucket size */
+ private final float memoryFactor;
+
+ /** Overhead of the structure itself */
+ private final long overhead;
+
+ /** Whether in-memory hfile's data block has higher priority when evicting */
+ private boolean forceInMemory;
+
+ /**
+ * Where to send victims (blocks evicted/missing from the cache). This is used only when we use an
+ * external cache as L2.
+ * Note: See org.apache.hadoop.hbase.io.hfile.MemcachedBlockCache
+ */
+ private transient BlockCache victimHandler = null;
+
+ /** Percent of cached data blocks */
+ private volatile int cacheDataBlockPercent;
+
+ /** Limit of count eviction process when start to avoid to cache blocks */
+ private final int heavyEvictionCountLimit;
+
+ /** Limit of volume eviction process when start to avoid to cache blocks */
+ private final long heavyEvictionMbSizeLimit;
+
+ /** Adjust auto-scaling via overhead of evition rate */
+ private final float heavyEvictionOverheadCoefficient;
+
+ /**
+ * Default constructor. Specify maximum size and expected average block
+ * size (approximation is fine).
+ *
+ * All other factors will be calculated based on defaults specified in
+ * this class.
+ *
+ * @param maxSize maximum size of cache, in bytes
+ * @param blockSize approximate size of each block, in bytes
+ */
+ public LruAdaptiveBlockCache(long maxSize, long blockSize) {
+ this(maxSize, blockSize, true);
+ }
+
+ /**
+ * Constructor used for testing. Allows disabling of the eviction thread.
+ */
+ public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread) {
+ this(maxSize, blockSize, evictionThread,
+ (int) Math.ceil(1.2 * maxSize / blockSize),
+ DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
+ DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
+ DEFAULT_SINGLE_FACTOR,
+ DEFAULT_MULTI_FACTOR,
+ DEFAULT_MEMORY_FACTOR,
+ DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
+ false,
+ DEFAULT_MAX_BLOCK_SIZE,
+ DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
+ DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
+ DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT);
+ }
+
+ public LruAdaptiveBlockCache(long maxSize, long blockSize,
+ boolean evictionThread, Configuration conf) {
+ this(maxSize, blockSize, evictionThread,
+ (int) Math.ceil(1.2 * maxSize / blockSize),
+ DEFAULT_LOAD_FACTOR,
+ DEFAULT_CONCURRENCY_LEVEL,
+ conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
+ conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
+ conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
+ conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
+ conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
+ conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME,
+ DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
+ conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
+ conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE),
+ conf.getInt(LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT,
+ DEFAULT_LRU_CACHE_HEAVY_EVICTION_COUNT_LIMIT),
+ conf.getLong(LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT,
+ DEFAULT_LRU_CACHE_HEAVY_EVICTION_MB_SIZE_LIMIT),
+ conf.getFloat(LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT,
+ DEFAULT_LRU_CACHE_HEAVY_EVICTION_OVERHEAD_COEFFICIENT));
+ }
+
+ public LruAdaptiveBlockCache(long maxSize, long blockSize, Configuration conf) {
+ this(maxSize, blockSize, true, conf);
+ }
+
+ /**
+ * Configurable constructor. Use this constructor if not using defaults.
+ *
+ * @param maxSize maximum size of this cache, in bytes
+ * @param blockSize expected average size of blocks, in bytes
+ * @param evictionThread whether to run evictions in a bg thread or not
+ * @param mapInitialSize initial size of backing ConcurrentHashMap
+ * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
+ * @param mapConcurrencyLevel initial concurrency factor for backing CHM
+ * @param minFactor percentage of total size that eviction will evict until
+ * @param acceptableFactor percentage of total size that triggers eviction
+ * @param singleFactor percentage of total size for single-access blocks
+ * @param multiFactor percentage of total size for multiple-access blocks
+ * @param memoryFactor percentage of total size for in-memory blocks
+ */
+ public LruAdaptiveBlockCache(long maxSize, long blockSize, boolean evictionThread,
+ int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
+ float minFactor, float acceptableFactor, float singleFactor,
+ float multiFactor, float memoryFactor, float hardLimitFactor,
+ boolean forceInMemory, long maxBlockSize,
+ int heavyEvictionCountLimit, long heavyEvictionMbSizeLimit,
+ float heavyEvictionOverheadCoefficient) {
+ this.maxBlockSize = maxBlockSize;
+ if(singleFactor + multiFactor + memoryFactor != 1 ||
+ singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
+ throw new IllegalArgumentException("Single, multi, and memory factors " +
+ " should be non-negative and total 1.0");
+ }
+ if (minFactor >= acceptableFactor) {
+ throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
+ }
+ if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
+ throw new IllegalArgumentException("all factors must be < 1");
+ }
+ this.maxSize = maxSize;
+ this.blockSize = blockSize;
+ this.forceInMemory = forceInMemory;
+ map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
+ this.minFactor = minFactor;
+ this.acceptableFactor = acceptableFactor;
+ this.singleFactor = singleFactor;
+ this.multiFactor = multiFactor;
+ this.memoryFactor = memoryFactor;
+ this.stats = new CacheStats(this.getClass().getSimpleName());
+ this.count = new AtomicLong(0);
+ this.elements = new AtomicLong(0);
+ this.dataBlockElements = new LongAdder();
+ this.dataBlockSize = new LongAdder();
+ this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
+ this.size = new AtomicLong(this.overhead);
+ this.hardCapacityLimitFactor = hardLimitFactor;
+ if (evictionThread) {
+ this.evictionThread = new EvictionThread(this);
+ this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
+ } else {
+ this.evictionThread = null;
+ }
+
+ // check the bounds
+ this.heavyEvictionCountLimit = Math.max(heavyEvictionCountLimit, 0);
+ this.heavyEvictionMbSizeLimit = Math.max(heavyEvictionCountLimit, 1);
+ this.cacheDataBlockPercent = 100;
+ heavyEvictionOverheadCoefficient = Math.min(heavyEvictionOverheadCoefficient, 1.0f);
+ heavyEvictionOverheadCoefficient = Math.max(heavyEvictionOverheadCoefficient, 0.001f);
+ this.heavyEvictionOverheadCoefficient = heavyEvictionOverheadCoefficient;
+
+ // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
+ // every five minutes.
+ this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), STAT_THREAD_PERIOD,
+ STAT_THREAD_PERIOD, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void setVictimCache(BlockCache victimCache) {
+ if (victimHandler != null) {
+ throw new IllegalArgumentException("The victim cache has already been set");
+ }
+ victimHandler = requireNonNull(victimCache);
+ }
+
+ @Override
+ public void setMaxSize(long maxSize) {
+ this.maxSize = maxSize;
+ if (this.size.get() > acceptableSize() && !evictionInProgress) {
+ runEviction();
+ }
+ }
+
+ public int getCacheDataBlockPercent() {
+ return cacheDataBlockPercent;
+ }
+
+ /**
+ * The block cached in LruAdaptiveBlockCache will always be an heap block: on the one side,
+ * the heap access will be more faster then off-heap, the small index block or meta block
+ * cached in CombinedBlockCache will benefit a lot. on other side, the LruAdaptiveBlockCache
+ * size is always calculated based on the total heap size, if caching an off-heap block in
+ * LruAdaptiveBlockCache, the heap size will be messed up. Here we will clone the block into an
+ * heap block if it's an off-heap block, otherwise just use the original block. The key point is
+ * maintain the refCnt of the block (HBASE-22127):
+ * 1. if cache the cloned heap block, its refCnt is an totally new one, it's easy to handle;
+ * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's
+ * reservoir, if both RPC and LruAdaptiveBlockCache release the block, then it can be garbage
+ * collected by JVM, so need a retain here.
+ * @param buf the original block
+ * @return an block with an heap memory backend.
+ */
+ private Cacheable asReferencedHeapBlock(Cacheable buf) {
+ if (buf instanceof HFileBlock) {
+ HFileBlock blk = ((HFileBlock) buf);
+ if (blk.isSharedMem()) {
+ return HFileBlock.deepCloneOnHeap(blk);
+ }
+ }
+ // The block will be referenced by this LruAdaptiveBlockCache,
+ // so should increase its refCnt here.
+ return buf.retain();
+ }
+
+ // BlockCache implementation
+
+ /**
+ * Cache the block with the specified name and buffer.
+ *
+ * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
+ * this can happen, for which we compare the buffer contents.
+ *
+ * @param cacheKey block's cache key
+ * @param buf block buffer
+ * @param inMemory if block is in-memory
+ */
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+
+ // Some data blocks will not put into BlockCache when eviction rate too much.
+ // It is good for performance
+ // (see details: https://issues.apache.org/jira/browse/HBASE-23887)
+ // How to calculate it can find inside EvictionThread class.
+ if (cacheDataBlockPercent != 100 && buf.getBlockType().isData()) {
+ // It works like filter - blocks which two last digits of offset
+ // more than we calculate in Eviction Thread will not put into BlockCache
+ if (cacheKey.getOffset() % 100 >= cacheDataBlockPercent) {
+ return;
+ }
+ }
+
+ if (buf.heapSize() > maxBlockSize) {
+ // If there are a lot of blocks that are too
+ // big this can make the logs way too noisy.
+ // So we log 2%
+ if (stats.failInsert() % 50 == 0) {
+ LOG.warn("Trying to cache too large a block "
+ + cacheKey.getHfileName() + " @ "
+ + cacheKey.getOffset()
+ + " is " + buf.heapSize()
+ + " which is larger than " + maxBlockSize);
+ }
+ return;
+ }
+
+ LruCachedBlock cb = map.get(cacheKey);
+ if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this,
+ cacheKey, buf)) {
+ return;
+ }
+ long currentSize = size.get();
+ long currentAcceptableSize = acceptableSize();
+ long hardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);
+ if (currentSize >= hardLimitSize) {
+ stats.failInsert();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("LruAdaptiveBlockCache current size " + StringUtils.byteDesc(currentSize)
+ + " has exceeded acceptable size " + StringUtils.byteDesc(currentAcceptableSize) + "."
+ + " The hard limit size is " + StringUtils.byteDesc(hardLimitSize)
+ + ", failed to put cacheKey:" + cacheKey + " into LruAdaptiveBlockCache.");
+ }
+ if (!evictionInProgress) {
+ runEviction();
+ }
+ return;
+ }
+ // Ensure that the block is an heap one.
+ buf = asReferencedHeapBlock(buf);
+ cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
+ long newSize = updateSizeMetrics(cb, false);
+ map.put(cacheKey, cb);
+ long val = elements.incrementAndGet();
+ if (buf.getBlockType().isData()) {
+ dataBlockElements.increment();
+ }
+ if (LOG.isTraceEnabled()) {
+ long size = map.size();
+ assertCounterSanity(size, val);
+ }
+ if (newSize > currentAcceptableSize && !evictionInProgress) {
+ runEviction();
+ }
+ }
+
+ /**
+ * Sanity-checking for parity between actual block cache content and metrics.
+ * Intended only for use with TRACE level logging and -ea JVM.
+ */
+ private static void assertCounterSanity(long mapSize, long counterVal) {
+ if (counterVal < 0) {
+ LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
+ ", mapSize=" + mapSize);
+ return;
+ }
+ if (mapSize < Integer.MAX_VALUE) {
+ double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
+ if (pct_diff > 0.05) {
+ LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
+ ", mapSize=" + mapSize);
+ }
+ }
+ }
+
+ /**
+ * Cache the block with the specified name and buffer.
+ *
+ * TODO after HBASE-22005, we may cache an block which allocated from off-heap, but our LRU cache
+ * sizing is based on heap size, so we should handle this in HBASE-22127. It will introduce an
+ * switch whether make the LRU on-heap or not, if so we may need copy the memory to on-heap,
+ * otherwise the caching size is based on off-heap.
+ * @param cacheKey block's cache key
+ * @param buf block buffer
+ */
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
+ cacheBlock(cacheKey, buf, false);
+ }
+
+ /**
+ * Helper function that updates the local size counter and also updates any
+ * per-cf or per-blocktype metrics it can discern from given
+ * {@link LruCachedBlock}
+ */
+ private long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
+ long heapsize = cb.heapSize();
+ BlockType bt = cb.getBuffer().getBlockType();
+ if (evict) {
+ heapsize *= -1;
+ }
+ if (bt != null && bt.isData()) {
+ dataBlockSize.add(heapsize);
+ }
+ return size.addAndGet(heapsize);
+ }
+
+ /**
+ * Get the buffer of the block with the specified name.
+ *
+ * @param cacheKey block's cache key
+ * @param caching true if the caller caches blocks on cache misses
+ * @param repeat Whether this is a repeat lookup for the same block
+ * (used to avoid double counting cache misses when doing double-check
+ * locking)
+ * @param updateCacheMetrics Whether to update cache metrics or not
+ *
+ * @return buffer of specified cache key, or null if not in cache
+ */
+ @Override
+ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
+ boolean updateCacheMetrics) {
+ LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
+ // It will be referenced by RPC path, so increase here. NOTICE: Must do the retain inside
+ // this block. because if retain outside the map#computeIfPresent, the evictBlock may remove
+ // the block and release, then we're retaining a block with refCnt=0 which is disallowed.
+ // see HBASE-22422.
+ val.getBuffer().retain();
+ return val;
+ });
+ if (cb == null) {
+ if (!repeat && updateCacheMetrics) {
+ stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+ }
+ // If there is another block cache then try and read there.
+ // However if this is a retry ( second time in double checked locking )
+ // And it's already a miss then the l2 will also be a miss.
+ if (victimHandler != null && !repeat) {
+ // The handler will increase result's refCnt for RPC, so need no extra retain.
+ Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+ // Promote this to L1.
+ if (result != null) {
+ if (caching) {
+ cacheBlock(cacheKey, result, /* inMemory = */ false);
+ }
+ }
+ return result;
+ }
+ return null;
+ }
+ if (updateCacheMetrics) {
+ stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+ }
+ cb.access(count.incrementAndGet());
+ return cb.getBuffer();
+ }
+
+ /**
+ * Whether the cache contains block with specified cacheKey
+ *
+ * @return true if contains the block
+ */
+ @Override
+ public boolean containsBlock(BlockCacheKey cacheKey) {
+ return map.containsKey(cacheKey);
+ }
+
+ @Override
+ public boolean evictBlock(BlockCacheKey cacheKey) {
+ LruCachedBlock cb = map.get(cacheKey);
+ return cb != null && evictBlock(cb, false) > 0;
+ }
+
+ /**
+ * Evicts all blocks for a specific HFile. This is an
+ * expensive operation implemented as a linear-time search through all blocks
+ * in the cache. Ideally this should be a search in a log-access-time map.
+ *
+ *
+ * This is used for evict-on-close to remove all blocks of a specific HFile.
+ *
+ * @return the number of blocks evicted
+ */
+ @Override
+ public int evictBlocksByHfileName(String hfileName) {
+ int numEvicted = (int) map.keySet().stream().filter(key -> key.getHfileName().equals(hfileName))
+ .filter(this::evictBlock).count();
+ if (victimHandler != null) {
+ numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+ }
+ return numEvicted;
+ }
+
+ /**
+ * Evict the block, and it will be cached by the victim handler if exists &&
+ * block may be read again later
+ *
+ * @param evictedByEvictionProcess true if the given block is evicted by
+ * EvictionThread
+ * @return the heap size of evicted block
+ */
+ protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
+ LruCachedBlock previous = map.remove(block.getCacheKey());
+ if (previous == null) {
+ return 0;
+ }
+ updateSizeMetrics(block, true);
+ long val = elements.decrementAndGet();
+ if (LOG.isTraceEnabled()) {
+ long size = map.size();
+ assertCounterSanity(size, val);
+ }
+ if (block.getBuffer().getBlockType().isData()) {
+ dataBlockElements.decrement();
+ }
+ if (evictedByEvictionProcess) {
+ // When the eviction of the block happened because of invalidation of HFiles, no need to
+ // update the stats counter.
+ stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
+ if (victimHandler != null) {
+ victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
+ }
+ }
+ // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate. DO
+ // NOT move this up because if do that then the victimHandler may access the buffer with
+ // refCnt = 0 which is disallowed.
+ previous.getBuffer().release();
+ return block.heapSize();
+ }
+
+ /**
+ * Multi-threaded call to run the eviction process.
+ */
+ private void runEviction() {
+ if (evictionThread == null) {
+ evict();
+ } else {
+ evictionThread.evict();
+ }
+ }
+
+ boolean isEvictionInProgress() {
+ return evictionInProgress;
+ }
+
+ long getOverhead() {
+ return overhead;
+ }
+
+ /**
+ * Eviction method.
+ *
+ * Evict items in order of use, allowing delete items
+ * which haven't been used for the longest amount of time.
+ *
+ * @return how many bytes were freed
+ */
+ long evict() {
+
+ // Ensure only one eviction at a time
+ if (!evictionLock.tryLock()) {
+ return 0;
+ }
+
+ long bytesToFree = 0L;
+
+ try {
+ evictionInProgress = true;
+ long currentSize = this.size.get();
+ bytesToFree = currentSize - minSize();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Block cache LRU eviction started; Attempting to free " +
+ StringUtils.byteDesc(bytesToFree) + " of total=" +
+ StringUtils.byteDesc(currentSize));
+ }
+
+ if (bytesToFree <= 0) {
+ return 0;
+ }
+
+ // Instantiate priority buckets
+ BlockBucket bucketSingle
+ = new BlockBucket("single", bytesToFree, blockSize, singleSize());
+ BlockBucket bucketMulti
+ = new BlockBucket("multi", bytesToFree, blockSize, multiSize());
+ BlockBucket bucketMemory
+ = new BlockBucket("memory", bytesToFree, blockSize, memorySize());
+
+ // Scan entire map putting into appropriate buckets
+ for (LruCachedBlock cachedBlock : map.values()) {
+ switch (cachedBlock.getPriority()) {
+ case SINGLE: {
+ bucketSingle.add(cachedBlock);
+ break;
+ }
+ case MULTI: {
+ bucketMulti.add(cachedBlock);
+ break;
+ }
+ case MEMORY: {
+ bucketMemory.add(cachedBlock);
+ break;
+ }
+ }
+ }
+
+ long bytesFreed = 0;
+ if (forceInMemory || memoryFactor > 0.999f) {
+ long s = bucketSingle.totalSize();
+ long m = bucketMulti.totalSize();
+ if (bytesToFree > (s + m)) {
+ // this means we need to evict blocks in memory bucket to make room,
+ // so the single and multi buckets will be emptied
+ bytesFreed = bucketSingle.free(s);
+ bytesFreed += bucketMulti.free(m);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
+ " from single and multi buckets");
+ }
+ bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
+ " total from all three buckets ");
+ }
+ } else {
+ // this means no need to evict block in memory bucket,
+ // and we try best to make the ratio between single-bucket and
+ // multi-bucket is 1:2
+ long bytesRemain = s + m - bytesToFree;
+ if (3 * s <= bytesRemain) {
+ // single-bucket is small enough that no eviction happens for it
+ // hence all eviction goes from multi-bucket
+ bytesFreed = bucketMulti.free(bytesToFree);
+ } else if (3 * m <= 2 * bytesRemain) {
+ // multi-bucket is small enough that no eviction happens for it
+ // hence all eviction goes from single-bucket
+ bytesFreed = bucketSingle.free(bytesToFree);
+ } else {
+ // both buckets need to evict some blocks
+ bytesFreed = bucketSingle.free(s - bytesRemain / 3);
+ if (bytesFreed < bytesToFree) {
+ bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
+ }
+ }
+ }
+ } else {
+ PriorityQueue bucketQueue = new PriorityQueue<>(3);
+
+ bucketQueue.add(bucketSingle);
+ bucketQueue.add(bucketMulti);
+ bucketQueue.add(bucketMemory);
+
+ int remainingBuckets = bucketQueue.size();
+
+ BlockBucket bucket;
+ while ((bucket = bucketQueue.poll()) != null) {
+ long overflow = bucket.overflow();
+ if (overflow > 0) {
+ long bucketBytesToFree =
+ Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
+ bytesFreed += bucket.free(bucketBytesToFree);
+ }
+ remainingBuckets--;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ long single = bucketSingle.totalSize();
+ long multi = bucketMulti.totalSize();
+ long memory = bucketMemory.totalSize();
+ LOG.trace("Block cache LRU eviction completed; " +
+ "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
+ "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
+ "single=" + StringUtils.byteDesc(single) + ", " +
+ "multi=" + StringUtils.byteDesc(multi) + ", " +
+ "memory=" + StringUtils.byteDesc(memory));
+ }
+ } finally {
+ stats.evict();
+ evictionInProgress = false;
+ evictionLock.unlock();
+ }
+ return bytesToFree;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("blockCount", getBlockCount())
+ .add("currentSize", StringUtils.byteDesc(getCurrentSize()))
+ .add("freeSize", StringUtils.byteDesc(getFreeSize()))
+ .add("maxSize", StringUtils.byteDesc(getMaxSize()))
+ .add("heapSize", StringUtils.byteDesc(heapSize()))
+ .add("minSize", StringUtils.byteDesc(minSize()))
+ .add("minFactor", minFactor)
+ .add("multiSize", StringUtils.byteDesc(multiSize()))
+ .add("multiFactor", multiFactor)
+ .add("singleSize", StringUtils.byteDesc(singleSize()))
+ .add("singleFactor", singleFactor)
+ .toString();
+ }
+
+ /**
+ * Used to group blocks into priority buckets. There will be a BlockBucket
+ * for each priority (single, multi, memory). Once bucketed, the eviction
+ * algorithm takes the appropriate number of elements out of each according
+ * to configuration parameters and their relatives sizes.
+ */
+ private class BlockBucket implements Comparable {
+
+ private final String name;
+ private final LruCachedBlockQueue queue;
+ private long totalSize = 0;
+ private final long bucketSize;
+
+ public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
+ this.name = name;
+ this.bucketSize = bucketSize;
+ queue = new LruCachedBlockQueue(bytesToFree, blockSize);
+ totalSize = 0;
+ }
+
+ public void add(LruCachedBlock block) {
+ totalSize += block.heapSize();
+ queue.add(block);
+ }
+
+ public long free(long toFree) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this);
+ }
+ LruCachedBlock cb;
+ long freedBytes = 0;
+ while ((cb = queue.pollLast()) != null) {
+ freedBytes += evictBlock(cb, true);
+ if (freedBytes >= toFree) {
+ return freedBytes;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freeing {} from {}", StringUtils.byteDesc(toFree), this);
+ }
+ return freedBytes;
+ }
+
+ public long overflow() {
+ return totalSize - bucketSize;
+ }
+
+ public long totalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public int compareTo(BlockBucket that) {
+ return Long.compare(this.overflow(), that.overflow());
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (!(that instanceof BlockBucket)) {
+ return false;
+ }
+ return compareTo((BlockBucket)that) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(name, bucketSize, queue, totalSize);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("name", name)
+ .add("totalSize", StringUtils.byteDesc(totalSize))
+ .add("bucketSize", StringUtils.byteDesc(bucketSize))
+ .toString();
+ }
+ }
+
+ /**
+ * Get the maximum size of this cache.
+ *
+ * @return max size in bytes
+ */
+
+ @Override
+ public long getMaxSize() {
+ return this.maxSize;
+ }
+
+ @Override
+ public long getCurrentSize() {
+ return this.size.get();
+ }
+
+ @Override
+ public long getCurrentDataSize() {
+ return this.dataBlockSize.sum();
+ }
+
+ @Override
+ public long getFreeSize() {
+ return getMaxSize() - getCurrentSize();
+ }
+
+ @Override
+ public long size() {
+ return getMaxSize();
+ }
+
+ @Override
+ public long getBlockCount() {
+ return this.elements.get();
+ }
+
+ @Override
+ public long getDataBlockCount() {
+ return this.dataBlockElements.sum();
+ }
+
+ EvictionThread getEvictionThread() {
+ return this.evictionThread;
+ }
+
+ /*
+ * Eviction thread. Sits in waiting state until an eviction is triggered
+ * when the cache size grows above the acceptable level.
+ *
+ * Thread is triggered into action by {@link LruAdaptiveBlockCache#runEviction()}
+ */
+ static class EvictionThread extends Thread {
+
+ private WeakReference cache;
+ private volatile boolean go = true;
+ // flag set after enter the run method, used for test
+ private boolean enteringRun = false;
+
+ public EvictionThread(LruAdaptiveBlockCache cache) {
+ super(Thread.currentThread().getName() + ".LruAdaptiveBlockCache.EvictionThread");
+ setDaemon(true);
+ this.cache = new WeakReference<>(cache);
+ }
+
+ @Override
+ public void run() {
+ enteringRun = true;
+ long freedSumMb = 0;
+ int heavyEvictionCount = 0;
+ int freedDataOverheadPercent = 0;
+ long startTime = System.currentTimeMillis();
+ while (this.go) {
+ synchronized (this) {
+ try {
+ this.wait(1000 * 10/*Don't wait for ever*/);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted eviction thread ", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LruAdaptiveBlockCache cache = this.cache.get();
+ if (cache == null) {
+ break;
+ }
+ freedSumMb += cache.evict()/1024/1024;
+ /*
+ * Sometimes we are reading more data than can fit into BlockCache
+ * and it is the cause a high rate of evictions.
+ * This in turn leads to heavy Garbage Collector works.
+ * So a lot of blocks put into BlockCache but never read,
+ * but spending a lot of CPU resources.
+ * Here we will analyze how many bytes were freed and decide
+ * decide whether the time has come to reduce amount of caching blocks.
+ * It help avoid put too many blocks into BlockCache
+ * when evict() works very active and save CPU for other jobs.
+ * More delails: https://issues.apache.org/jira/browse/HBASE-23887
+ */
+
+ // First of all we have to control how much time
+ // has passed since previuos evict() was launched
+ // This is should be almost the same time (+/- 10s)
+ // because we get comparable volumes of freed bytes each time.
+ // 10s because this is default period to run evict() (see above this.wait)
+ long stopTime = System.currentTimeMillis();
+ if ((stopTime - startTime) > 1000 * 10 - 1) {
+ // Here we have to calc what situation we have got.
+ // We have the limit "hbase.lru.cache.heavy.eviction.bytes.size.limit"
+ // and can calculte overhead on it.
+ // We will use this information to decide,
+ // how to change percent of caching blocks.
+ freedDataOverheadPercent =
+ (int) (freedSumMb * 100 / cache.heavyEvictionMbSizeLimit) - 100;
+ if (freedSumMb > cache.heavyEvictionMbSizeLimit) {
+ // Now we are in the situation when we are above the limit
+ // But maybe we are going to ignore it because it will end quite soon
+ heavyEvictionCount++;
+ if (heavyEvictionCount > cache.heavyEvictionCountLimit) {
+ // It is going for a long time and we have to reduce of caching
+ // blocks now. So we calculate here how many blocks we want to skip.
+ // It depends on:
+ // 1. Overhead - if overhead is big we could more aggressive
+ // reducing amount of caching blocks.
+ // 2. How fast we want to get the result. If we know that our
+ // heavy reading for a long time, we don't want to wait and can
+ // increase the coefficient and get good performance quite soon.
+ // But if we don't sure we can do it slowly and it could prevent
+ // premature exit from this mode. So, when the coefficient is
+ // higher we can get better performance when heavy reading is stable.
+ // But when reading is changing we can adjust to it and set
+ // the coefficient to lower value.
+ int change =
+ (int) (freedDataOverheadPercent * cache.heavyEvictionOverheadCoefficient);
+ // But practice shows that 15% of reducing is quite enough.
+ // We are not greedy (it could lead to premature exit).
+ change = Math.min(15, change);
+ change = Math.max(0, change); // I think it will never happen but check for sure
+ // So this is the key point, here we are reducing % of caching blocks
+ cache.cacheDataBlockPercent -= change;
+ // If we go down too deep we have to stop here, 1% any way should be.
+ cache.cacheDataBlockPercent = Math.max(1, cache.cacheDataBlockPercent);
+ }
+ } else {
+ // Well, we have got overshooting.
+ // Mayby it is just short-term fluctuation and we can stay in this mode.
+ // It help avoid permature exit during short-term fluctuation.
+ // If overshooting less than 90%, we will try to increase the percent of
+ // caching blocks and hope it is enough.
+ if (freedSumMb >= cache.heavyEvictionMbSizeLimit * 0.1) {
+ // Simple logic: more overshooting - more caching blocks (backpressure)
+ int change = (int) (-freedDataOverheadPercent * 0.1 + 1);
+ cache.cacheDataBlockPercent += change;
+ // But it can't be more then 100%, so check it.
+ cache.cacheDataBlockPercent = Math.min(100, cache.cacheDataBlockPercent);
+ } else {
+ // Looks like heavy reading is over.
+ // Just exit form this mode.
+ heavyEvictionCount = 0;
+ cache.cacheDataBlockPercent = 100;
+ }
+ }
+ LOG.info("BlockCache evicted (MB): {}, overhead (%): {}, " +
+ "heavy eviction counter: {}, " +
+ "current caching DataBlock (%): {}",
+ freedSumMb, freedDataOverheadPercent,
+ heavyEvictionCount, cache.cacheDataBlockPercent);
+
+ freedSumMb = 0;
+ startTime = stopTime;
+ }
+ }
+ }
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+ justification="This is what we want")
+ public void evict() {
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ synchronized void shutdown() {
+ this.go = false;
+ this.notifyAll();
+ }
+
+ /**
+ * Used for the test.
+ */
+ boolean isEnteringRun() {
+ return this.enteringRun;
+ }
+ }
+
+ /*
+ * Statistics thread. Periodically prints the cache statistics to the log.
+ */
+ static class StatisticsThread extends Thread {
+
+ private final LruAdaptiveBlockCache lru;
+
+ public StatisticsThread(LruAdaptiveBlockCache lru) {
+ super("LruAdaptiveBlockCacheStats");
+ setDaemon(true);
+ this.lru = lru;
+ }
+
+ @Override
+ public void run() {
+ lru.logStats();
+ }
+ }
+
+ public void logStats() {
+ // Log size
+ long totalSize = heapSize();
+ long freeSize = maxSize - totalSize;
+ LruAdaptiveBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
+ "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
+ "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
+ "blockCount=" + getBlockCount() + ", " +
+ "accesses=" + stats.getRequestCount() + ", " +
+ "hits=" + stats.getHitCount() + ", " +
+ "hitRatio=" + (stats.getHitCount() == 0 ?
+ "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
+ "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
+ "cachingHits=" + stats.getHitCachingCount() + ", " +
+ "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
+ "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
+ "evictions=" + stats.getEvictionCount() + ", " +
+ "evicted=" + stats.getEvictedCount() + ", " +
+ "evictedPerRun=" + stats.evictedPerEviction());
+ }
+
+ /**
+ * Get counter statistics for this cache.
+ *
+ * Includes: total accesses, hits, misses, evicted blocks, and runs
+ * of the eviction processes.
+ */
+ @Override
+ public CacheStats getStats() {
+ return this.stats;
+ }
+
+ public final static long CACHE_FIXED_OVERHEAD =
+ ClassSize.estimateBase(LruAdaptiveBlockCache.class, false);
+
+ @Override
+ public long heapSize() {
+ return getCurrentSize();
+ }
+
+ private static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
+ // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
+ return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP
+ + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+ + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+ }
+
+ @Override
+ public Iterator iterator() {
+ final Iterator iterator = map.values().iterator();
+
+ return new Iterator() {
+ private final long now = System.nanoTime();
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public CachedBlock next() {
+ final LruCachedBlock b = iterator.next();
+ return new CachedBlock() {
+ @Override
+ public String toString() {
+ return BlockCacheUtil.toString(this, now);
+ }
+
+ @Override
+ public BlockPriority getBlockPriority() {
+ return b.getPriority();
+ }
+
+ @Override
+ public BlockType getBlockType() {
+ return b.getBuffer().getBlockType();
+ }
+
+ @Override
+ public long getOffset() {
+ return b.getCacheKey().getOffset();
+ }
+
+ @Override
+ public long getSize() {
+ return b.getBuffer().heapSize();
+ }
+
+ @Override
+ public long getCachedTime() {
+ return b.getCachedTime();
+ }
+
+ @Override
+ public String getFilename() {
+ return b.getCacheKey().getHfileName();
+ }
+
+ @Override
+ public int compareTo(CachedBlock other) {
+ int diff = this.getFilename().compareTo(other.getFilename());
+ if (diff != 0) {
+ return diff;
+ }
+ diff = Long.compare(this.getOffset(), other.getOffset());
+ if (diff != 0) {
+ return diff;
+ }
+ if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
+ throw new IllegalStateException(this.getCachedTime() + ", " + other.getCachedTime());
+ }
+ return Long.compare(other.getCachedTime(), this.getCachedTime());
+ }
+
+ @Override
+ public int hashCode() {
+ return b.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof CachedBlock) {
+ CachedBlock cb = (CachedBlock)obj;
+ return compareTo(cb) == 0;
+ } else {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ // Simple calculators of sizes given factors and maxSize
+
+ long acceptableSize() {
+ return (long)Math.floor(this.maxSize * this.acceptableFactor);
+ }
+ private long minSize() {
+ return (long)Math.floor(this.maxSize * this.minFactor);
+ }
+ private long singleSize() {
+ return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
+ }
+ private long multiSize() {
+ return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
+ }
+ private long memorySize() {
+ return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
+ }
+
+ @Override
+ public void shutdown() {
+ if (victimHandler != null) {
+ victimHandler.shutdown();
+ }
+ this.scheduleThreadPool.shutdown();
+ for (int i = 0; i < 10; i++) {
+ if (!this.scheduleThreadPool.isShutdown()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while sleeping");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+ if (!this.scheduleThreadPool.isShutdown()) {
+ List runnables = this.scheduleThreadPool.shutdownNow();
+ LOG.debug("Still running " + runnables);
+ }
+ this.evictionThread.shutdown();
+ }
+
+ /** Clears the cache. Used in tests. */
+ public void clearCache() {
+ this.map.clear();
+ this.elements.set(0);
+ }
+
+ public Map getEncodingCountsForTest() {
+ Map counts = new EnumMap<>(DataBlockEncoding.class);
+ for (LruCachedBlock block : map.values()) {
+ DataBlockEncoding encoding = ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
+ Integer count = counts.get(encoding);
+ counts.put(encoding, (count == null ? 0 : count) + 1);
+ }
+ return counts;
+ }
+
+ Map getMapForTests() {
+ return map;
+ }
+
+ @Override
+ public BlockCache[] getBlockCaches() {
+ if (victimHandler != null) {
+ return new BlockCache[] { this, this.victimHandler };
+ }
+ return null;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java
new file mode 100644
index 000000000000..f29d12ac315b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruAdaptiveBlockCache.java
@@ -0,0 +1,1174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.LruAdaptiveBlockCache.EvictionThread;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the concurrent LruAdaptiveBlockCache.
+ *
+ * Tests will ensure it grows and shrinks in size properly,
+ * evictions run when they're supposed to and do what they should,
+ * and that cached blocks are accessible when expected to be.
+ */
+@Category({IOTests.class, SmallTests.class})
+public class TestLruAdaptiveBlockCache {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLruAdaptiveBlockCache.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestLruAdaptiveBlockCache.class);
+
+ @Test
+ public void testCacheEvictionThreadSafe() throws Exception {
+ long maxSize = 100000;
+ int numBlocks = 9;
+ int testRuns = 10;
+ final long blockSize = calculateBlockSizeDefault(maxSize, numBlocks);
+ assertTrue("calculateBlockSize appears broken.", blockSize * numBlocks <= maxSize);
+
+ final Configuration conf = HBaseConfiguration.create();
+ final LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize);
+ EvictionThread evictionThread = cache.getEvictionThread();
+ assertNotNull(evictionThread);
+ while (!evictionThread.isEnteringRun()) {
+ Thread.sleep(1000);
+ }
+ final String hfileName = "hfile";
+ int threads = 10;
+ final int blocksPerThread = 5 * numBlocks;
+ for (int run = 0; run != testRuns; ++run) {
+ final AtomicInteger blockCount = new AtomicInteger(0);
+ ExecutorService service = Executors.newFixedThreadPool(threads);
+ for (int i = 0; i != threads; ++i) {
+ service.execute(() -> {
+ for (int blockIndex = 0; blockIndex < blocksPerThread
+ || (!cache.isEvictionInProgress()); ++blockIndex) {
+ CachedItem block = new CachedItem(hfileName, (int) blockSize,
+ blockCount.getAndIncrement());
+ boolean inMemory = Math.random() > 0.5;
+ cache.cacheBlock(block.cacheKey, block, inMemory);
+ }
+ cache.evictBlocksByHfileName(hfileName);
+ });
+ }
+ service.shutdown();
+ // The test may fail here if the evict thread frees the blocks too fast
+ service.awaitTermination(10, TimeUnit.MINUTES);
+ Waiter.waitFor(conf, 10000, 100, new ExplainingPredicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return cache.getBlockCount() == 0;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Cache block count failed to return to 0";
+ }
+ });
+ assertEquals(0, cache.getBlockCount());
+ assertEquals(cache.getOverhead(), cache.getCurrentSize());
+ }
+ }
+
+ @Test
+ public void testBackgroundEvictionThread() throws Exception {
+ long maxSize = 100000;
+ int numBlocks = 9;
+ long blockSize = calculateBlockSizeDefault(maxSize, numBlocks);
+ assertTrue("calculateBlockSize appears broken.",
+ blockSize * numBlocks <= maxSize);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize,blockSize);
+ EvictionThread evictionThread = cache.getEvictionThread();
+ assertNotNull(evictionThread);
+
+ CachedItem[] blocks = generateFixedBlocks(numBlocks + 1, blockSize, "block");
+
+ // Make sure eviction thread has entered run method
+ while (!evictionThread.isEnteringRun()) {
+ Thread.sleep(1);
+ }
+
+ // Add all the blocks
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ }
+
+ // wait until at least one eviction has run
+ int n = 0;
+ while(cache.getStats().getEvictionCount() == 0) {
+ Thread.sleep(200);
+ assertTrue("Eviction never happened.", n++ < 20);
+ }
+
+ // let cache stabilize
+ // On some systems, the cache will run multiple evictions before it attains
+ // steady-state. For instance, after populating the cache with 10 blocks,
+ // the first eviction evicts a single block and then a second eviction
+ // evicts another. I think this is due to the delta between minSize and
+ // acceptableSize, combined with variance between object overhead on
+ // different environments.
+ n = 0;
+ for (long prevCnt = 0 /* < number of blocks added */,
+ curCnt = cache.getBlockCount();
+ prevCnt != curCnt; prevCnt = curCnt, curCnt = cache.getBlockCount()) {
+ Thread.sleep(200);
+ assertTrue("Cache never stabilized.", n++ < 20);
+ }
+
+ long evictionCount = cache.getStats().getEvictionCount();
+ assertTrue(evictionCount >= 1);
+ System.out.println("Background Evictions run: " + evictionCount);
+ }
+
+ @Test
+ public void testCacheSimple() throws Exception {
+
+ long maxSize = 1000000;
+ long blockSize = calculateBlockSizeDefault(maxSize, 101);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize);
+
+ CachedItem [] blocks = generateRandomBlocks(100, blockSize);
+
+ long expectedCacheSize = cache.heapSize();
+
+ // Confirm empty
+ for (CachedItem block : blocks) {
+ assertTrue(cache.getBlock(block.cacheKey, true, false,
+ true) == null);
+ }
+
+ // Add blocks
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ expectedCacheSize += block.cacheBlockHeapSize();
+ }
+
+ // Verify correctly calculated cache heap size
+ assertEquals(expectedCacheSize, cache.heapSize());
+
+ // Check if all blocks are properly cached and retrieved
+ for (CachedItem block : blocks) {
+ HeapSize buf = cache.getBlock(block.cacheKey, true, false,
+ true);
+ assertTrue(buf != null);
+ assertEquals(buf.heapSize(), block.heapSize());
+ }
+
+ // Re-add same blocks and ensure nothing has changed
+ long expectedBlockCount = cache.getBlockCount();
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ }
+ assertEquals(
+ "Cache should ignore cache requests for blocks already in cache",
+ expectedBlockCount, cache.getBlockCount());
+
+ // Verify correctly calculated cache heap size
+ assertEquals(expectedCacheSize, cache.heapSize());
+
+ // Check if all blocks are properly cached and retrieved
+ for (CachedItem block : blocks) {
+ HeapSize buf = cache.getBlock(block.cacheKey, true, false,
+ true);
+ assertTrue(buf != null);
+ assertEquals(buf.heapSize(), block.heapSize());
+ }
+
+ // Expect no evictions
+ assertEquals(0, cache.getStats().getEvictionCount());
+ Thread t = new LruAdaptiveBlockCache.StatisticsThread(cache);
+ t.start();
+ t.join();
+ }
+
+ @Test
+ public void testCacheEvictionSimple() throws Exception {
+
+ long maxSize = 100000;
+ long blockSize = calculateBlockSizeDefault(maxSize, 10);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize,blockSize,false);
+
+ CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
+
+ long expectedCacheSize = cache.heapSize();
+
+ // Add all the blocks
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ expectedCacheSize += block.cacheBlockHeapSize();
+ }
+
+ // A single eviction run should have occurred
+ assertEquals(1, cache.getStats().getEvictionCount());
+
+ // Our expected size overruns acceptable limit
+ assertTrue(expectedCacheSize >
+ (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+ // But the cache did not grow beyond max
+ assertTrue(cache.heapSize() < maxSize);
+
+ // And is still below the acceptable limit
+ assertTrue(cache.heapSize() <
+ (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+ // All blocks except block 0 should be in the cache
+ assertTrue(cache.getBlock(blocks[0].cacheKey, true, false,
+ true) == null);
+ for(int i=1;i
+ (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+ // But the cache did not grow beyond max
+ assertTrue(cache.heapSize() <= maxSize);
+
+ // And is now below the acceptable limit
+ assertTrue(cache.heapSize() <=
+ (maxSize * LruAdaptiveBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+
+ // We expect fairness across the two priorities.
+ // This test makes multi go barely over its limit, in-memory
+ // empty, and the rest in single. Two single evictions and
+ // one multi eviction expected.
+ assertTrue(cache.getBlock(singleBlocks[0].cacheKey, true, false,
+ true) == null);
+ assertTrue(cache.getBlock(multiBlocks[0].cacheKey, true, false,
+ true) == null);
+
+ // And all others to be cached
+ for(int i=1;i<4;i++) {
+ assertEquals(cache.getBlock(singleBlocks[i].cacheKey, true, false,
+ true),
+ singleBlocks[i]);
+ assertEquals(cache.getBlock(multiBlocks[i].cacheKey, true, false,
+ true),
+ multiBlocks[i]);
+ }
+ }
+
+ @Test
+ public void testCacheEvictionThreePriorities() throws Exception {
+
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+ (int)Math.ceil(1.2*maxSize/blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+ LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.98f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false,
+ 16 * 1024 * 1024,
+ 10,
+ 500,
+ 0.01f);
+
+ CachedItem [] singleBlocks = generateFixedBlocks(5, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+ CachedItem [] memoryBlocks = generateFixedBlocks(5, blockSize, "memory");
+
+ long expectedCacheSize = cache.heapSize();
+
+ // Add 3 blocks from each priority
+ for(int i=0;i<3;i++) {
+
+ // Just add single blocks
+ cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+ expectedCacheSize += singleBlocks[i].cacheBlockHeapSize();
+
+ // Add and get multi blocks
+ cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
+ expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
+ cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
+
+ // Add memory blocks as such
+ cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true);
+ expectedCacheSize += memoryBlocks[i].cacheBlockHeapSize();
+
+ }
+
+ // Do not expect any evictions yet
+ assertEquals(0, cache.getStats().getEvictionCount());
+
+ // Verify cache size
+ assertEquals(expectedCacheSize, cache.heapSize());
+
+ // Insert a single block, oldest single should be evicted
+ cache.cacheBlock(singleBlocks[3].cacheKey, singleBlocks[3]);
+
+ // Single eviction, one thing evicted
+ assertEquals(1, cache.getStats().getEvictionCount());
+ assertEquals(1, cache.getStats().getEvictedCount());
+
+ // Verify oldest single block is the one evicted
+ assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false,
+ true));
+
+ // Change the oldest remaining single block to a multi
+ cache.getBlock(singleBlocks[1].cacheKey, true, false, true);
+
+ // Insert another single block
+ cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]);
+
+ // Two evictions, two evicted.
+ assertEquals(2, cache.getStats().getEvictionCount());
+ assertEquals(2, cache.getStats().getEvictedCount());
+
+ // Oldest multi block should be evicted now
+ assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false,
+ true));
+
+ // Insert another memory block
+ cache.cacheBlock(memoryBlocks[3].cacheKey, memoryBlocks[3], true);
+
+ // Three evictions, three evicted.
+ assertEquals(3, cache.getStats().getEvictionCount());
+ assertEquals(3, cache.getStats().getEvictedCount());
+
+ // Oldest memory block should be evicted now
+ assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false,
+ true));
+
+ // Add a block that is twice as big (should force two evictions)
+ CachedItem [] bigBlocks = generateFixedBlocks(3, blockSize*3, "big");
+ cache.cacheBlock(bigBlocks[0].cacheKey, bigBlocks[0]);
+
+ // Four evictions, six evicted (inserted block 3X size, expect +3 evicted)
+ assertEquals(4, cache.getStats().getEvictionCount());
+ assertEquals(6, cache.getStats().getEvictedCount());
+
+ // Expect three remaining singles to be evicted
+ assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false,
+ true));
+
+ // Make the big block a multi block
+ cache.getBlock(bigBlocks[0].cacheKey, true, false, true);
+
+ // Cache another single big block
+ cache.cacheBlock(bigBlocks[1].cacheKey, bigBlocks[1]);
+
+ // Five evictions, nine evicted (3 new)
+ assertEquals(5, cache.getStats().getEvictionCount());
+ assertEquals(9, cache.getStats().getEvictedCount());
+
+ // Expect three remaining multis to be evicted
+ assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false,
+ true));
+
+ // Cache a big memory block
+ cache.cacheBlock(bigBlocks[2].cacheKey, bigBlocks[2], true);
+
+ // Six evictions, twelve evicted (3 new)
+ assertEquals(6, cache.getStats().getEvictionCount());
+ assertEquals(12, cache.getStats().getEvictedCount());
+
+ // Expect three remaining in-memory to be evicted
+ assertEquals(null, cache.getBlock(memoryBlocks[1].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(memoryBlocks[2].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(memoryBlocks[3].cacheKey, true, false,
+ true));
+ }
+
+ @Test
+ public void testCacheEvictionInMemoryForceMode() throws Exception {
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+ (int)Math.ceil(1.2*maxSize/blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+ LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.98f, // min
+ 0.99f, // acceptable
+ 0.2f, // single
+ 0.3f, // multi
+ 0.5f, // memory
+ 1.2f, // limit
+ true,
+ 16 * 1024 * 1024,
+ 10,
+ 500,
+ 0.01f);
+
+ CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
+ CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
+
+ long expectedCacheSize = cache.heapSize();
+
+ // 0. Add 5 single blocks and 4 multi blocks to make cache full, si:mu:me = 5:4:0
+ for(int i = 0; i < 4; i++) {
+ // Just add single blocks
+ cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+ expectedCacheSize += singleBlocks[i].cacheBlockHeapSize();
+ // Add and get multi blocks
+ cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
+ expectedCacheSize += multiBlocks[i].cacheBlockHeapSize();
+ cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
+ }
+ // 5th single block
+ cache.cacheBlock(singleBlocks[4].cacheKey, singleBlocks[4]);
+ expectedCacheSize += singleBlocks[4].cacheBlockHeapSize();
+ // Do not expect any evictions yet
+ assertEquals(0, cache.getStats().getEvictionCount());
+ // Verify cache size
+ assertEquals(expectedCacheSize, cache.heapSize());
+
+ // 1. Insert a memory block, oldest single should be evicted, si:mu:me = 4:4:1
+ cache.cacheBlock(memoryBlocks[0].cacheKey, memoryBlocks[0], true);
+ // Single eviction, one block evicted
+ assertEquals(1, cache.getStats().getEvictionCount());
+ assertEquals(1, cache.getStats().getEvictedCount());
+ // Verify oldest single block (index = 0) is the one evicted
+ assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false,
+ true));
+
+ // 2. Insert another memory block, another single evicted, si:mu:me = 3:4:2
+ cache.cacheBlock(memoryBlocks[1].cacheKey, memoryBlocks[1], true);
+ // Two evictions, two evicted.
+ assertEquals(2, cache.getStats().getEvictionCount());
+ assertEquals(2, cache.getStats().getEvictedCount());
+ // Current oldest single block (index = 1) should be evicted now
+ assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false,
+ true));
+
+ // 3. Insert 4 memory blocks, 2 single and 2 multi evicted, si:mu:me = 1:2:6
+ cache.cacheBlock(memoryBlocks[2].cacheKey, memoryBlocks[2], true);
+ cache.cacheBlock(memoryBlocks[3].cacheKey, memoryBlocks[3], true);
+ cache.cacheBlock(memoryBlocks[4].cacheKey, memoryBlocks[4], true);
+ cache.cacheBlock(memoryBlocks[5].cacheKey, memoryBlocks[5], true);
+ // Three evictions, three evicted.
+ assertEquals(6, cache.getStats().getEvictionCount());
+ assertEquals(6, cache.getStats().getEvictedCount());
+ // two oldest single blocks and two oldest multi blocks evicted
+ assertEquals(null, cache.getBlock(singleBlocks[2].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(singleBlocks[3].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false,
+ true));
+
+ // 4. Insert 3 memory blocks, the remaining 1 single and 2 multi evicted
+ // si:mu:me = 0:0:9
+ cache.cacheBlock(memoryBlocks[6].cacheKey, memoryBlocks[6], true);
+ cache.cacheBlock(memoryBlocks[7].cacheKey, memoryBlocks[7], true);
+ cache.cacheBlock(memoryBlocks[8].cacheKey, memoryBlocks[8], true);
+ // Three evictions, three evicted.
+ assertEquals(9, cache.getStats().getEvictionCount());
+ assertEquals(9, cache.getStats().getEvictedCount());
+ // one oldest single block and two oldest multi blocks evicted
+ assertEquals(null, cache.getBlock(singleBlocks[4].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[2].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[3].cacheKey, true, false,
+ true));
+
+ // 5. Insert one memory block, the oldest memory evicted
+ // si:mu:me = 0:0:9
+ cache.cacheBlock(memoryBlocks[9].cacheKey, memoryBlocks[9], true);
+ // one eviction, one evicted.
+ assertEquals(10, cache.getStats().getEvictionCount());
+ assertEquals(10, cache.getStats().getEvictedCount());
+ // oldest memory block evicted
+ assertEquals(null, cache.getBlock(memoryBlocks[0].cacheKey, true, false,
+ true));
+
+ // 6. Insert one new single block, itself evicted immediately since
+ // all blocks in cache are memory-type which have higher priority
+ // si:mu:me = 0:0:9 (no change)
+ cache.cacheBlock(singleBlocks[9].cacheKey, singleBlocks[9]);
+ // one eviction, one evicted.
+ assertEquals(11, cache.getStats().getEvictionCount());
+ assertEquals(11, cache.getStats().getEvictedCount());
+ // the single block just cached now evicted (can't evict memory)
+ assertEquals(null, cache.getBlock(singleBlocks[9].cacheKey, true, false,
+ true));
+ }
+
+ // test scan resistance
+ @Test
+ public void testScanResistance() throws Exception {
+
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+ (int)Math.ceil(1.2*maxSize/blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+ LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.66f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false,
+ 16 * 1024 * 1024,
+ 10,
+ 500,
+ 0.01f);
+
+ CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+
+ // Add 5 multi blocks
+ for (CachedItem block : multiBlocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ cache.getBlock(block.cacheKey, true, false, true);
+ }
+
+ // Add 5 single blocks
+ for(int i=0;i<5;i++) {
+ cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+ }
+
+ // An eviction ran
+ assertEquals(1, cache.getStats().getEvictionCount());
+
+ // To drop down to 2/3 capacity, we'll need to evict 4 blocks
+ assertEquals(4, cache.getStats().getEvictedCount());
+
+ // Should have been taken off equally from single and multi
+ assertEquals(null, cache.getBlock(singleBlocks[0].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(singleBlocks[1].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[0].cacheKey, true, false,
+ true));
+ assertEquals(null, cache.getBlock(multiBlocks[1].cacheKey, true, false,
+ true));
+
+ // Let's keep "scanning" by adding single blocks. From here on we only
+ // expect evictions from the single bucket.
+
+ // Every time we reach 10 total blocks (every 4 inserts) we get 4 single
+ // blocks evicted. Inserting 13 blocks should yield 3 more evictions and
+ // 12 more evicted.
+
+ for(int i=5;i<18;i++) {
+ cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+ }
+
+ // 4 total evictions, 16 total evicted
+ assertEquals(4, cache.getStats().getEvictionCount());
+ assertEquals(16, cache.getStats().getEvictedCount());
+
+ // Should now have 7 total blocks
+ assertEquals(7, cache.getBlockCount());
+
+ }
+
+ @Test
+ public void testMaxBlockSize() throws Exception {
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+ (int)Math.ceil(1.2*maxSize/blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+ LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.66f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false,
+ 1024,
+ 10,
+ 500,
+ 0.01f);
+
+ CachedItem [] tooLong = generateFixedBlocks(10, 1024+5, "long");
+ CachedItem [] small = generateFixedBlocks(15, 600, "small");
+
+
+ for (CachedItem i:tooLong) {
+ cache.cacheBlock(i.cacheKey, i);
+ }
+ for (CachedItem i:small) {
+ cache.cacheBlock(i.cacheKey, i);
+ }
+ assertEquals(15,cache.getBlockCount());
+ for (CachedItem i:small) {
+ assertNotNull(cache.getBlock(i.cacheKey, true, false, false));
+ }
+ for (CachedItem i:tooLong) {
+ assertNull(cache.getBlock(i.cacheKey, true, false, false));
+ }
+
+ assertEquals(10, cache.getStats().getFailedInserts());
+ }
+
+ // test setMaxSize
+ @Test
+ public void testResizeBlockCache() throws Exception {
+
+ long maxSize = 300000;
+ long blockSize = calculateBlockSize(maxSize, 31);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+ (int)Math.ceil(1.2*maxSize/blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+ LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.98f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false,
+ 16 * 1024 * 1024,
+ 10,
+ 500,
+ 0.01f);
+
+ CachedItem [] singleBlocks = generateFixedBlocks(10, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(10, blockSize, "multi");
+ CachedItem [] memoryBlocks = generateFixedBlocks(10, blockSize, "memory");
+
+ // Add all blocks from all priorities
+ for(int i=0;i<10;i++) {
+
+ // Just add single blocks
+ cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+
+ // Add and get multi blocks
+ cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
+ cache.getBlock(multiBlocks[i].cacheKey, true, false, true);
+
+ // Add memory blocks as such
+ cache.cacheBlock(memoryBlocks[i].cacheKey, memoryBlocks[i], true);
+ }
+
+ // Do not expect any evictions yet
+ assertEquals(0, cache.getStats().getEvictionCount());
+
+ // Resize to half capacity plus an extra block (otherwise we evict an extra)
+ cache.setMaxSize((long)(maxSize * 0.5f));
+
+ // Should have run a single eviction
+ assertEquals(1, cache.getStats().getEvictionCount());
+
+ // And we expect 1/2 of the blocks to be evicted
+ assertEquals(15, cache.getStats().getEvictedCount());
+
+ // And the oldest 5 blocks from each category should be gone
+ for(int i=0;i<5;i++) {
+ assertEquals(null, cache.getBlock(singleBlocks[i].cacheKey, true,
+ false, true));
+ assertEquals(null, cache.getBlock(multiBlocks[i].cacheKey, true,
+ false, true));
+ assertEquals(null, cache.getBlock(memoryBlocks[i].cacheKey, true,
+ false, true));
+ }
+
+ // And the newest 5 blocks should still be accessible
+ for(int i=5;i<10;i++) {
+ assertEquals(singleBlocks[i], cache.getBlock(singleBlocks[i].cacheKey, true,
+ false, true));
+ assertEquals(multiBlocks[i], cache.getBlock(multiBlocks[i].cacheKey, true,
+ false, true));
+ assertEquals(memoryBlocks[i], cache.getBlock(memoryBlocks[i].cacheKey, true,
+ false, true));
+ }
+ }
+
+ // test metricsPastNPeriods
+ @Test
+ public void testPastNPeriodsMetrics() throws Exception {
+ double delta = 0.01;
+
+ // 3 total periods
+ CacheStats stats = new CacheStats("test", 3);
+
+ // No accesses, should be 0
+ stats.rollMetricsPeriod();
+ assertEquals(0.0, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 1, 1 hit caching, 1 hit non-caching, 2 miss non-caching
+ // should be (2/4)=0.5 and (1/1)=1
+ stats.hit(false, true, BlockType.DATA);
+ stats.hit(true, true, BlockType.DATA);
+ stats.miss(false, false, BlockType.DATA);
+ stats.miss(false, false, BlockType.DATA);
+ stats.rollMetricsPeriod();
+ assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(1.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 2, 1 miss caching, 3 miss non-caching
+ // should be (2/8)=0.25 and (1/2)=0.5
+ stats.miss(true, false, BlockType.DATA);
+ stats.miss(false, false, BlockType.DATA);
+ stats.miss(false, false, BlockType.DATA);
+ stats.miss(false, false, BlockType.DATA);
+ stats.rollMetricsPeriod();
+ assertEquals(0.25, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.5, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 3, 2 hits of each type
+ // should be (6/12)=0.5 and (3/4)=0.75
+ stats.hit(false, true, BlockType.DATA);
+ stats.hit(true, true, BlockType.DATA);
+ stats.hit(false, true, BlockType.DATA);
+ stats.hit(true, true, BlockType.DATA);
+ stats.rollMetricsPeriod();
+ assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.75, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 4, evict period 1, two caching misses
+ // should be (4/10)=0.4 and (2/5)=0.4
+ stats.miss(true, false, BlockType.DATA);
+ stats.miss(true, false, BlockType.DATA);
+ stats.rollMetricsPeriod();
+ assertEquals(0.4, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.4, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 5, evict period 2, 2 caching misses, 2 non-caching hit
+ // should be (6/10)=0.6 and (2/6)=1/3
+ stats.miss(true, false, BlockType.DATA);
+ stats.miss(true, false, BlockType.DATA);
+ stats.hit(false, true, BlockType.DATA);
+ stats.hit(false, true, BlockType.DATA);
+ stats.rollMetricsPeriod();
+ assertEquals(0.6, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals((double)1/3, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 6, evict period 3
+ // should be (2/6)=1/3 and (0/4)=0
+ stats.rollMetricsPeriod();
+ assertEquals((double)1/3, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 7, evict period 4
+ // should be (2/4)=0.5 and (0/2)=0
+ stats.rollMetricsPeriod();
+ assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 8, evict period 5
+ // should be 0 and 0
+ stats.rollMetricsPeriod();
+ assertEquals(0.0, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.0, stats.getHitCachingRatioPastNPeriods(), delta);
+
+ // period 9, one of each
+ // should be (2/4)=0.5 and (1/2)=0.5
+ stats.miss(true, false, BlockType.DATA);
+ stats.miss(false, false, BlockType.DATA);
+ stats.hit(true, true, BlockType.DATA);
+ stats.hit(false, true, BlockType.DATA);
+ stats.rollMetricsPeriod();
+ assertEquals(0.5, stats.getHitRatioPastNPeriods(), delta);
+ assertEquals(0.5, stats.getHitCachingRatioPastNPeriods(), delta);
+ }
+
+ @Test
+ public void testCacheBlockNextBlockMetadataMissing() {
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+ int size = 100;
+ int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+ byte[] byteArr = new byte[length];
+ ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
+ HFileContext meta = new HFileContextBuilder().build();
+ HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size,
+ -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, 52,
+ -1, meta, HEAP);
+ HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size,
+ -1, ByteBuff.wrap(buf), HFileBlock.FILL_HEADER, -1, -1,
+ -1, meta, HEAP);
+
+ LruAdaptiveBlockCache cache = new LruAdaptiveBlockCache(maxSize, blockSize, false,
+ (int)Math.ceil(1.2*maxSize/blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR,
+ LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.66f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false,
+ 1024,
+ 10,
+ 500,
+ 0.01f);
+
+ BlockCacheKey key = new BlockCacheKey("key1", 0);
+ ByteBuffer actualBuffer = ByteBuffer.allocate(length);
+ ByteBuffer block1Buffer = ByteBuffer.allocate(length);
+ ByteBuffer block2Buffer = ByteBuffer.allocate(length);
+ blockWithNextBlockMetadata.serialize(block1Buffer, true);
+ blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
+
+ //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
+ CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
+ block1Buffer);
+
+ //Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
+ CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
+ block1Buffer);
+
+ //Clear and add blockWithoutNextBlockMetadata
+ cache.clearCache();
+ assertNull(cache.getBlock(key, false, false, false));
+ CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
+ block2Buffer);
+
+ //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
+ CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
+ block1Buffer);
+ }
+
+ private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
+ CachedItem [] blocks = new CachedItem[numBlocks];
+ for(int i=0;i getDeserializer() {
+ return null;
+ }
+
+ @Override
+ public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
+ }
+
+ @Override
+ public BlockType getBlockType() {
+ return BlockType.DATA;
+ }
+ }
+
+ static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception {
+ int size = 100;
+ int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+ byte[] byteArr = new byte[length];
+ HFileContext meta = new HFileContextBuilder().build();
+ BlockCacheKey key = new BlockCacheKey("key1", 0);
+ HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
+ ByteBuff.wrap(ByteBuffer.wrap(byteArr, 0, size)), HFileBlock.FILL_HEADER, -1,
+ 52, -1, meta,
+ HEAP);
+ AtomicBoolean err1 = new AtomicBoolean(false);
+ Thread t1 = new Thread(() -> {
+ for (int i = 0; i < 10000 && !err1.get(); i++) {
+ try {
+ cache.getBlock(key, false, false, true);
+ } catch (Exception e) {
+ err1.set(true);
+ LOG.info("Cache block or get block failure: ", e);
+ }
+ }
+ });
+
+ AtomicBoolean err2 = new AtomicBoolean(false);
+ Thread t2 = new Thread(() -> {
+ for (int i = 0; i < 10000 && !err2.get(); i++) {
+ try {
+ cache.evictBlock(key);
+ } catch (Exception e) {
+ err2.set(true);
+ LOG.info("Evict block failure: ", e);
+ }
+ }
+ });
+
+ AtomicBoolean err3 = new AtomicBoolean(false);
+ Thread t3 = new Thread(() -> {
+ for (int i = 0; i < 10000 && !err3.get(); i++) {
+ try {
+ cache.cacheBlock(key, blk);
+ } catch (Exception e) {
+ err3.set(true);
+ LOG.info("Cache block failure: ", e);
+ }
+ }
+ });
+ t1.start();
+ t2.start();
+ t3.start();
+ t1.join();
+ t2.join();
+ t3.join();
+ Assert.assertFalse(err1.get());
+ Assert.assertFalse(err2.get());
+ Assert.assertFalse(err3.get());
+ }
+
+ @Test
+ public void testMultiThreadGetAndEvictBlock() throws Exception {
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+ LruAdaptiveBlockCache cache =
+ new LruAdaptiveBlockCache(maxSize, blockSize, false,
+ (int) Math.ceil(1.2 * maxSize / blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.66f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false, 1024,
+ 10,
+ 500,
+ 0.01f);
+ testMultiThreadGetAndEvictBlockInternal(cache);
+ }
+
+ public void testSkipCacheDataBlocksInteral(int heavyEvictionCountLimit) throws Exception {
+ long maxSize = 100000000;
+ int numBlocks = 100000;
+ final long blockSize = calculateBlockSizeDefault(maxSize, numBlocks);
+ assertTrue("calculateBlockSize appears broken.",
+ blockSize * numBlocks <= maxSize);
+
+ final LruAdaptiveBlockCache cache =
+ new LruAdaptiveBlockCache(maxSize, blockSize, true,
+ (int) Math.ceil(1.2 * maxSize / blockSize),
+ LruAdaptiveBlockCache.DEFAULT_LOAD_FACTOR, LruAdaptiveBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.5f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false,
+ maxSize,
+ heavyEvictionCountLimit,
+ 200,
+ 0.01f);
+
+ EvictionThread evictionThread = cache.getEvictionThread();
+ assertNotNull(evictionThread);
+ while (!evictionThread.isEnteringRun()) {
+ Thread.sleep(1);
+ }
+
+ final String hfileName = "hfile";
+ for (int blockIndex = 0; blockIndex <= numBlocks * 3000; ++blockIndex) {
+ CachedItem block = new CachedItem(hfileName, (int) blockSize, blockIndex);
+ cache.cacheBlock(block.cacheKey, block, false);
+ if (cache.getCacheDataBlockPercent() < 70) {
+ // enough for test
+ break;
+ }
+ }
+
+ evictionThread.evict();
+ Thread.sleep(100);
+
+ if (heavyEvictionCountLimit == 0) {
+ // Check if all offset (last two digits) of cached blocks less than the percent.
+ // It means some of blocks haven't put into BlockCache
+ assertTrue(cache.getCacheDataBlockPercent() < 90);
+ for (BlockCacheKey key : cache.getMapForTests().keySet()) {
+ assertTrue(!(key.getOffset() % 100 > 90));
+ }
+ } else {
+ // Check that auto-scaling is not working (all blocks in BlockCache)
+ assertTrue(cache.getCacheDataBlockPercent() == 100);
+ int counter = 0;
+ for (BlockCacheKey key : cache.getMapForTests().keySet()) {
+ if (key.getOffset() % 100 > 90) {
+ counter++;
+ }
+ }
+ assertTrue(counter > 1000);
+ }
+ evictionThread.shutdown();
+ }
+
+ @Test
+ public void testSkipCacheDataBlocks() throws Exception {
+ // Check that auto-scaling will work right after start
+ testSkipCacheDataBlocksInteral(0);
+ // Check that auto-scaling will not work right after start
+ // (have to finished before auto-scaling)
+ testSkipCacheDataBlocksInteral(100);
+ }
+}