Skip to content

Commit

Permalink
Add concurrent split test
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 26, 2025
1 parent 2f2925d commit 79763bf
Showing 1 changed file with 68 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
Expand All @@ -32,6 +33,9 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -54,6 +58,8 @@
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;

import com.google.common.collect.ImmutableMap;

public class AddSplitIT extends AccumuloClusterHarness {

@Override
Expand Down Expand Up @@ -260,6 +266,68 @@ public void addSplitIsMergeableTest() throws Exception {
}
}

@Test
public void concurrentAddSplitTest() throws Exception {
var threads = 10;
var service = Executors.newFixedThreadPool(threads);
var latch = new CountDownLatch(threads);

String tableName = getUniqueNames(1)[0];
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
c.tableOperations().create(tableName);

// Create a map to hold all splits to verify later
SortedMap<Text,TabletMergeability> allSplits =
Collections.synchronizedNavigableMap(new TreeMap<>());
var commonBuilder = ImmutableMap.<Text,TabletMergeability>builder();
for (int i = 0; i < 50; i++) {
commonBuilder.put(new Text(String.format("%09d", i)),
TabletMergeability.after(Duration.ofHours(1 + i)));
}

// create 50 splits that will be added to all threads
var commonSplits = commonBuilder.build();
allSplits.putAll(commonSplits);

// Spin up 10 threads and concurrently submit all 50 existing splits
// as well as 50 unique splits, this should create a collision with fate
// and cause retries
for (int i = 1; i <= threads; i++) {
var start = i * 100;
service.execute(() -> {
// add the 50 common splits
SortedMap<Text,TabletMergeability> splits = new TreeMap<>(commonSplits);
// create 50 unique splits
for (int j = start; j < start + 50; j++) {
splits.put(new Text(String.format("%09d", j)),
TabletMergeability.after(Duration.ofHours(1 + j)));
}
// make sure all splits are captured
allSplits.putAll(splits);
// Wait for all 10 threads to be ready before calling putSplits()
// to increase the chance of collisions
latch.countDown();
try {
latch.await();
c.tableOperations().putSplits(tableName, splits);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

// Wait for all 10 threads to finish
service.shutdown();
assertTrue(service.awaitTermination(2, TimeUnit.MINUTES));

// Verify we have 550 splits and then all splits are correctly set
assertEquals(50 + (threads * 50), allSplits.size());
TableId id = TableId.of(c.tableOperations().tableIdMap().get(tableName));
verifySplits(id, allSplits);
verifySplitsWithApi(c, tableName, allSplits);
}
}

// Checks that TabletMergeability in metadata matches split settings in the map
private void verifySplits(TableId id, SortedMap<Text,TabletMergeability> splits) {
final Set<Text> addedSplits = new HashSet<>(splits.keySet());
Expand Down

0 comments on commit 79763bf

Please sign in to comment.