Skip to content

Commit

Permalink
Add limit on maximum readRequest length in parallel warmup
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubham Tagra committed Oct 20, 2020
1 parent 46c8440 commit 5b50fc9
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public FileDownloadRequestChain(BookKeeper bookKeeper,
long lastModified,
int generationNumber)
{
super(generationNumber);
super(generationNumber, getBlockAlignedMaxChunkSize(conf));
this.bookKeeper = bookKeeper;
this.remoteFileSystem = remoteFileSystem;
this.localFile = localfile;
Expand All @@ -76,6 +76,13 @@ public FileDownloadRequestChain(BookKeeper bookKeeper,
this.maxRemoteReadBufferSize = CacheConfig.getDataTransferBufferSize(conf);
}

private static long getBlockAlignedMaxChunkSize(Configuration conf)
{
long maxReadRequestLength = CacheConfig.getParallelWarmupMaxChunkSize(conf);
long blockSize = CacheConfig.getBlockSize(conf);
return (maxReadRequestLength / blockSize) * blockSize;
}

public String getRemotePath()
{
return this.remotePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.qubole.rubix.common.utils.DataGen;
import com.qubole.rubix.common.utils.TestUtil;
import com.qubole.rubix.core.ClusterManagerInitilizationException;
import com.qubole.rubix.core.ReadRequestChain;
import com.qubole.rubix.spi.CacheConfig;
import com.qubole.rubix.spi.CacheUtil;
import com.qubole.rubix.spi.ClusterType;
Expand Down Expand Up @@ -121,6 +122,7 @@ public void testGetFileDownloadRequestChains() throws Exception
context.addDownloadRange(500, 800);

final List<FileDownloadRequestChain> requestChains = downloader.getFileDownloadRequestChains(contextMap);
requestChains.stream().forEach(ReadRequestChain::lock);

assertTrue(requestChains.size() == 2,
"Wrong Number of Request Chains. Expected = 2 Got = " + requestChains.size());
Expand Down
47 changes: 47 additions & 0 deletions rubix-core/src/main/java/com/qubole/rubix/core/ReadRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
package com.qubole.rubix.core;

import java.util.Arrays;
import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;

/**
* Created by stagra on 4/1/16.
Expand Down Expand Up @@ -161,4 +164,48 @@ public ReadRequest clone(boolean createNewBuffer)

return otherRequest;
}

@Override
public boolean equals (Object o)
{
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

ReadRequest that = (ReadRequest) o;

return backendReadStart == that.backendReadStart &&
backendReadEnd == that.backendReadEnd &&
actualReadStart == that.actualReadStart &&
actualReadEnd == that.actualReadEnd &&
destBufferOffset == that.destBufferOffset &&
backendFileSize == that.backendFileSize &&
Arrays.equals(destBuffer, that.destBuffer);
}

@Override
public int hashCode ()
{
int result = Objects.hash(backendReadStart, backendReadEnd, actualReadStart, actualReadEnd, destBufferOffset, backendFileSize);
result = 31 * result + Arrays.hashCode(destBuffer);
return result;
}

@Override
public String toString ()
{
return toStringHelper(this)
.add("backendReadStart", backendReadStart)
.add("backendReadEnd", backendReadEnd)
.add("actualReadStart", actualReadStart)
.add("actualReadEnd", actualReadEnd)
.add("destBuffer", destBuffer)
.add("destBufferOffset", destBufferOffset)
.add("backendFileSize", backendFileSize)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package com.qubole.rubix.core;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -38,6 +39,7 @@ public abstract class ReadRequestChain implements Callable<Long>
ReadRequest lastRequest;
boolean isLocked;
boolean cancelled;
private long maxReadRequestSize;
protected final int generationNumber;

protected String threadName;
Expand All @@ -46,19 +48,32 @@ public abstract class ReadRequestChain implements Callable<Long>
private static final Log log = LogFactory.getLog(ReadRequestChain.class);

public ReadRequestChain(int generationNumber)
{
this(generationNumber, Long.MAX_VALUE);
}

// Caller responsible to keep maxReadRequestSize block aligned
public ReadRequestChain(int generationNumber, long maxReadRequestSize)
{
super();
this.generationNumber = generationNumber;
this.maxReadRequestSize = maxReadRequestSize;
this.threadName = Thread.currentThread().getName();
}

@VisibleForTesting
public void setMaxReadRequestSize(long maxReadRequestSize)
{
this.maxReadRequestSize = maxReadRequestSize;
}

// Should be added in forward seek fashion for better performance
public void addReadRequest(ReadRequest readRequest)
{
checkState(!isLocked, "Adding request to a locked chain");
log.debug(String.format("Request to add ReadRequest: [%d, %d, %d, %d, %d]", readRequest.getBackendReadStart(), readRequest.getBackendReadEnd(), readRequest.getActualReadStart(), readRequest.getActualReadEnd(), readRequest.getDestBufferOffset()));
if (lastRequest == null) {
addRequest(readRequest);
lastRequest = readRequest;
}
else {
// since one chain contains request of same buffer, we can collate
Expand All @@ -69,22 +84,52 @@ public void addReadRequest(ReadRequest readRequest)
log.debug(String.format("Updated last to: [%d, %d, %d, %d, %d]", lastRequest.getBackendReadStart(), lastRequest.getBackendReadEnd(), lastRequest.getActualReadStart(), lastRequest.getActualReadEnd(), lastRequest.getDestBufferOffset()));
}
else {
addRequest(readRequest);
addRequest(lastRequest);
lastRequest = readRequest;
}
}
requests++;
}

private void addRequest(ReadRequest readRequest)
{
long backendReadLength = readRequest.getBackendReadLength();
if (backendReadLength <= maxReadRequestSize) {
addRequestToQueue(readRequest);
return;
}

long backendReadStart = 0;
long actualReadStart = readRequest.getActualReadStart();
while (backendReadStart < backendReadLength) {
long backendReadEnd = backendReadStart + Math.min(maxReadRequestSize, backendReadLength - backendReadStart);
ReadRequest chunkedRequest = new ReadRequest(
backendReadStart,
backendReadEnd,
actualReadStart,
Math.min(readRequest.getActualReadEnd(), backendReadEnd),
readRequest.getDestBuffer(),
readRequest.getDestBufferOffset() + Math.toIntExact(actualReadStart - readRequest.getActualReadStart()),
readRequest.getBackendFileSize());
addRequestToQueue(chunkedRequest);
backendReadStart = backendReadEnd;
actualReadStart = backendReadStart;
}
}

private void addRequestToQueue(ReadRequest readRequest)
{
readRequests.add(readRequest);
lastRequest = readRequest;
log.debug(String.format("Added ReadRequest: [%d, %d, %d, %d, %d]", readRequest.getBackendReadStart(), readRequest.getBackendReadEnd(), readRequest.getActualReadStart(), readRequest.getActualReadEnd(), readRequest.getDestBufferOffset()));
}

public void lock()
{
isLocked = true;
if (lastRequest != null) {
addRequest(lastRequest);
lastRequest = null;
}
}

public List<ReadRequest> getReadRequests()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.List;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

/**
Expand All @@ -31,6 +32,7 @@ public void testConsequtiveRequest()
CachedReadRequestChain chain = new CachedReadRequestChain();
chain.addReadRequest(rr1);
chain.addReadRequest(rr2);
chain.lock();

List<ReadRequest> finalRequests = chain.getReadRequests();

Expand All @@ -42,4 +44,38 @@ public void testConsequtiveRequest()
assertTrue(rr.getActualReadStart() == 0, "Wrong actual read start");
assertTrue(rr.getActualReadEnd() == 2048, "Wrong actual read end");
}

@Test
public void testMaxReadRequestSize()
{
ReadRequest rr1 = new ReadRequest(0, 1024, 40, 1024, null, 0, 2500);
ReadRequest rr2 = new ReadRequest(1024, 1280, 1024, 1280, null, 0, 2500);
ReadRequest rr3 = new ReadRequest(1280, 1792, 1280, 1792, null, 0, 2500);
ReadRequest rr4 = new ReadRequest(1792, 2048, 1792, 2000, null, 0, 2500);
ReadRequest rr5 = new ReadRequest(2100, 2200, 2100, 2200, null, 0, 2500);

CachedReadRequestChain chain = new CachedReadRequestChain();
chain.setMaxReadRequestSize(512);
chain.addReadRequest(rr1);
chain.addReadRequest(rr2);
chain.addReadRequest(rr3);
chain.addReadRequest(rr4);
chain.addReadRequest(rr5);
chain.lock();

// Expected chains
ReadRequest[] expectedReadRequests = {
new ReadRequest(0, 512, 40, 512, null, 0, 2500),
new ReadRequest(512, 1024, 512, 1024, null, 472, 2500),
new ReadRequest(1024, 1536, 1024, 1536, null, 984, 2500),
new ReadRequest(1536, 2048, 1536, 2000, null, 1496, 2500),
new ReadRequest(2100, 2200, 2100, 2200, null, 0, 2500),
};

int idx = 0;
for (ReadRequest expected : expectedReadRequests) {
assertEquals(chain.getReadRequests().get(idx), expected, String.format("Expected=%s and actual=%s", expected, chain.getReadRequests().get(idx)));
idx++;
}
}
}
12 changes: 12 additions & 0 deletions rubix-spi/src/main/java/com/qubole/rubix/spi/CacheConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class CacheConfig
private static final String KEY_DUMMY_MODE = "rubix.cache.dummy.mode";
private static final String KEY_EMBEDDED_MODE = "rubix.cluster.embedded.mode";
private static final String KEY_HEARTBEAT_ENABLED = "rubix.cluster.heartbeat.enabled";
private static final String KEY_PARALLEL_WARMUP_MAX_CHUNK_SIZE = "rubix.cache.parallel.warmup.max_chunk_size";

// Internal Configurations used in RubiX
private static final String KEY_YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address";
Expand Down Expand Up @@ -183,6 +184,7 @@ public class CacheConfig
private static final boolean DEFAULT_EMBEDDED_MODE = false;
public static final String DEFAULT_RUBIX_SITE_LOCATION = "/usr/lib/rubix/etc/rubix-site.xml";
private static final boolean DEFAULT_HEARTBEAT_ENABLED = true;
private static final long DEFAULT_PARALLEL_WARMUP_MAX_CHUNK_SIZE = MEGABYTES.toBytes(100);

private CacheConfig()
{
Expand Down Expand Up @@ -576,6 +578,11 @@ public static String getCurrentNodeHostName(Configuration conf)
return conf.get(KEY_RUBIX_CURRENT_NODE_HOSTNAME, null);
}

public static long getParallelWarmupMaxChunkSize (Configuration conf)
{
return conf.getLong(KEY_PARALLEL_WARMUP_MAX_CHUNK_SIZE, DEFAULT_PARALLEL_WARMUP_MAX_CHUNK_SIZE);
}

public static void setRubixConfigApplied(Configuration conf, boolean value)
{
conf.setBoolean(KEY_RUBIX_SITE_CONFIG_APPLIED, value);
Expand Down Expand Up @@ -880,4 +887,9 @@ public static void setMaxCacheSizeInMB(Configuration conf, long size)
{
conf.setLong(KEY_MAX_CACHE_SIZE_IN_MB, size);
}

public static void setParallelWarmupMaxChunkSize(Configuration conf, long size)
{
conf.setLong(KEY_PARALLEL_WARMUP_MAX_CHUNK_SIZE, size);
}
}

0 comments on commit 5b50fc9

Please sign in to comment.