Skip to content

Commit

Permalink
consolidate getting table splits in the code
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Nov 17, 2024
1 parent 2e19341 commit 50ab83b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

Expand All @@ -43,7 +42,6 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

public class BulkBatchWriter implements BatchWriter {

Expand All @@ -61,21 +59,13 @@ public class BulkBatchWriter implements BatchWriter {
private boolean closed = false;

public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileSystem,
Path workPath, long memLimit) {
Path workPath, long memLimit, Supplier<SortedSet<Text>> splitSupplier) {
this.client = client;
this.tableName = tableName;
this.fileSystem = fileSystem;
this.workPath = workPath;
this.memLimit = memLimit;
this.splitSupplier = Suppliers.memoizeWithExpiration(() -> {
try {
var splits = client.tableOperations().listSplits(tableName);
return new TreeSet<>(splits);
} catch (Exception e) {
throw new IllegalStateException(e);
}

}, 10, TimeUnit.MINUTES);
this.splitSupplier = splitSupplier;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public class ContinuousIngest {
private static int pauseMax;

public interface RandomGeneratorFactory extends Supplier<LongSupplier> {
static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, Random random) {
static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client,
Supplier<SortedSet<Text>> splitSupplier, Random random) {
final long rowMin = env.getRowMin();
final long rowMax = env.getRowMax();
Properties testProps = env.getTestProperties();
Expand All @@ -78,15 +79,6 @@ static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client, R
return new MinMaxRandomGeneratorFactory(rowMin, rowMax, random);
} else {
var tableName = env.getAccumuloTableName();
Supplier<SortedSet<Text>> splitSupplier = Suppliers.memoizeWithExpiration(() -> {
try {
var splits = client.tableOperations().listSplits(tableName);
return new TreeSet<>(splits);
} catch (Exception e) {
throw new IllegalStateException(e);
}

}, 10, TimeUnit.MINUTES);
return new MaxTabletsRandomGeneratorFactory(rowMin, rowMax, maxTablets, splitSupplier,
random);
}
Expand Down Expand Up @@ -134,7 +126,7 @@ public MaxTabletsRandomGeneratorFactory(long minRow, long maxRow, int maxTablets
public LongSupplier get() {
var splits = splitSupplier.get();
if (splits.size() < maxTablets) {
// There are less tablets so generate within the tablet range
// There are less tablets so generate within the entire range
return new MinMaxRandomGeneratorFactory(minRow, maxRow, random).get();
} else {
long prev = minRow;
Expand Down Expand Up @@ -178,7 +170,8 @@ public LongSupplier get() {
public interface BatchWriterFactory {
BatchWriter create(String tableName) throws TableNotFoundException;

static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env) {
static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env,
Supplier<SortedSet<Text>> splitSupplier) {
Properties testProps = env.getTestProperties();
final String bulkWorkDir = testProps.getProperty(TestProps.CI_INGEST_BULK_WORK_DIR);
if (bulkWorkDir == null || bulkWorkDir.isBlank()) {
Expand All @@ -189,7 +182,8 @@ static BatchWriterFactory create(AccumuloClient client, ContinuousEnv env) {
var workDir = new Path(bulkWorkDir);
var filesystem = workDir.getFileSystem(conf);
var memLimit = Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_BULK_MEM_LIMIT));
return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit);
return tableName -> new BulkBatchWriter(client, tableName, filesystem, workDir, memLimit,
splitSupplier);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -239,6 +233,20 @@ private static void pauseCheck(Random rand) throws InterruptedException {
}
}

static Supplier<SortedSet<Text>> createSplitSupplier(AccumuloClient client, String tableName) {

Supplier<SortedSet<Text>> splitSupplier = Suppliers.memoizeWithExpiration(() -> {
try {
var splits = client.tableOperations().listSplits(tableName);
return new TreeSet<>(splits);
} catch (Exception e) {
throw new IllegalStateException(e);
}

}, 10, TimeUnit.MINUTES);
return splitSupplier;
}

public static void main(String[] args) throws Exception {

try (ContinuousEnv env = new ContinuousEnv(args)) {
Expand All @@ -255,8 +263,9 @@ public static void main(String[] args) throws Exception {
final boolean checksum =
Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));

var randomFactory = RandomGeneratorFactory.create(env, client, random);
var batchWriterFactory = BatchWriterFactory.create(client, env);
var splitSupplier = createSplitSupplier(client, tableName);
var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random);
var batchWriterFactory = BatchWriterFactory.create(client, env, splitSupplier);
doIngest(client, randomFactory, batchWriterFactory, tableName, testProps, maxColF, maxColQ,
numEntries, checksum, random);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ public static void main(String[] args) throws Exception {
Map.of());

log.info("Ingesting {} entries into first table, {}.", initialData, firstTable);
var randomFactory = RandomGeneratorFactory.create(env, client, random);
var batchWriterFactory = ContinuousIngest.BatchWriterFactory.create(client, env);
var splitSupplier = ContinuousIngest.createSplitSupplier(client, firstTable);
var randomFactory = RandomGeneratorFactory.create(env, client, splitSupplier, random);
var batchWriterFactory =
ContinuousIngest.BatchWriterFactory.create(client, env, splitSupplier);
ContinuousIngest.doIngest(client, randomFactory, batchWriterFactory, firstTable, testProps,
maxColF, maxColQ, initialData, false, random);

Expand Down

0 comments on commit 50ab83b

Please sign in to comment.