Skip to content

Commit

Permalink
Updates based on feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 26, 2025
1 parent 36b5037 commit 2f2925d
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public class TabletMergeability implements Serializable {
private final Duration delay;

private TabletMergeability(Duration delay) {
this.delay = Objects.requireNonNull(delay);
this.delay = delay;
Preconditions.checkArgument(!delay.isNegative(), "Delay '%s' must not be negative.",
delay.toNanos());
}

// Edge case for NEVER
Expand Down Expand Up @@ -122,14 +124,15 @@ public static TabletMergeability always() {

/**
* Creates a {@link TabletMergeability} that signals a tablet has a delay to a point in the future
* before it is automatically eligible to be merged. The duration must be positive value.
* before it is automatically eligible to be merged. The duration must be greater than or equal to
* 0. A delay of 0 means a Tablet is immediately eligible to be merged.
*
* @param delay the duration of the delay
*
* @return a {@link TabletMergeability} from the given delay.
*/
public static TabletMergeability after(Duration delay) {
Preconditions.checkArgument(delay.toNanos() >= 0, "Duration of delay must be greater than 0.");
// Delay will be validated that it is >=0 inside the constructor
return new TabletMergeability(delay);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,17 @@ public TabletMergeability getTabletMergeability() {
}

/**
* Returns the time of the Manager when the TabletMergeability was set on the tablet. This will be
* empty if TabletMergeability is set to never
*
* @return the insertion time is set or else empty
*/
public Optional<Duration> getInsertionTime() {
return insertionTime;
}

/**
* Returns the current time of the Manager
*
* @return the current time
* If the TabletMergeability is configured with a delay this returns an estimate of the elapsed
* time since the delay was initially set. Returns optional.empty() if the tablet was configured
* with a TabletMergeability of never.
*/
public Duration getCurrentTime() {
return currentTime.get();
public Optional<Duration> getElapsed() {
// It's possible the "current time" is read from the manager and then a tablets insertion time
// is read much later and this could cause a negative read, in this case set the elapsed time to
// zero. Avoiding this would require an RPC per call to this method to get the latest manager
// time, so since this is an estimate return zero for this case.
return insertionTime.map(it -> currentTime.get().minus(it))
.map(elapsed -> elapsed.isNegative() ? Duration.ZERO : elapsed);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2290,7 +2290,7 @@ public Stream<TabletInformation> getTabletInformation(final String tableName, fi
var currentTime = Suppliers.memoize(() -> {
try {
return Duration.ofNanos(ThriftClientTypes.MANAGER.execute(context,
client -> client.getManagerTime(TraceUtil.traceInfo(), context.rpcCreds())));
client -> client.getManagerTimeNanos(TraceUtil.traceInfo(), context.rpcCreds())));
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IllegalStateException(e);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ public void testMergeabilityColumn() {
TabletColumnFamily.MERGEABILITY_COLUMN.put(m, new Value("{\"delay\":1,\"never\"=true}"));
assertViolation(mc, m, (short) 4006);

// SteadyTime must be set if delay positive
// SteadyTime must be set if delay is set
m = new Mutation(new Text("0;foo"));
TabletColumnFamily.MERGEABILITY_COLUMN.put(m, new Value("{\"delay\":10,\"never\"=false}"));
assertViolation(mc, m, (short) 4006);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,8 @@ public List<TKeyExtent> updateTabletMergeability(TInfo tinfo, TCredentials crede
}

@Override
public long getManagerTime(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException {
public long getManagerTimeNanos(TInfo tinfo, TCredentials credentials)
throws ThriftSecurityException {
manager.security.authenticateUser(credentials, credentials);
return manager.getSteadyTime().getNanos();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
Expand Down Expand Up @@ -145,6 +147,7 @@ public void updateSplitWithMergeabilityTest() throws Exception {
c.tableOperations().create(tableName);

NavigableMap<Text,TabletMergeability> splits = new TreeMap<>();
splits.put(new Text(String.format("%09d", 111)), TabletMergeability.always());
splits.put(new Text(String.format("%09d", 333)), TabletMergeability.never());
splits.put(new Text(String.format("%09d", 666)), TabletMergeability.never());
splits.put(new Text(String.format("%09d", 999)),
Expand All @@ -163,16 +166,22 @@ public void updateSplitWithMergeabilityTest() throws Exception {
verifySplits(id, splitUpdates);
verifySplitsWithApi(c, tableName, splitUpdates);

// Update two existing and add two new splits
// Update existing and add two new splits
// remove split 111 from map so we don't update it
splits.remove(new Text(String.format("%09d", 111)));
splits.put(new Text(String.format("%09d", 333)),
TabletMergeability.after(Duration.ofHours(11)));
splits.put(new Text(String.format("%09d", 666)),
TabletMergeability.after(Duration.ofHours(10)));
splits.put(new Text(String.format("%09d", 999)),
TabletMergeability.after(Duration.ofMinutes(7)));
splits.put(new Text(String.format("%09d", 444)), TabletMergeability.always());
splits.put(new Text(String.format("%09d", 777)),
TabletMergeability.after(Duration.ofMinutes(5)));

c.tableOperations().putSplits(tableName, splits);

// re-add split 111 and verify it has not changed
splits.put(new Text(String.format("%09d", 111)), TabletMergeability.always());

verifySplits(id, splits);
verifySplitsWithApi(c, tableName, splits);

Expand All @@ -183,22 +192,19 @@ public void updateSplitWithMergeabilityTest() throws Exception {

var originalTmi = c.tableOperations().getTabletInformation(tableName, new Range(split))
.findFirst().orElseThrow().getTabletMergeabilityInfo();
// getCurrentTime() uses a supplier and delays until invoking so trigger here
assertTrue(originalTmi.getCurrentTime().getNano() > 0);
assertTrue(originalTmi.getElapsed().orElseThrow().toNanos() > 0);

// Update existing with same delay of 5 minutes
c.tableOperations().putSplits(tableName, splits);
var updatedTmi = c.tableOperations().getTabletInformation(tableName, new Range(split))
.findFirst().orElseThrow().getTabletMergeabilityInfo();

// TabletMergeability setting should be the same but the insertion and current time
// should be different
// TabletMergeability setting should be the same but the new elapsed time should
// be different (less time has passed as it was updated with a new insertion time)
assertEquals(originalTmi.getTabletMergeability(), updatedTmi.getTabletMergeability());
assertTrue(updatedTmi.getInsertionTime().orElseThrow()
.compareTo(originalTmi.getInsertionTime().orElseThrow()) > 0);
// we previously called getCurrentTime() on the originalTmi object so the updated one
// should be newer
assertTrue(updatedTmi.getCurrentTime().compareTo(originalTmi.getCurrentTime()) > 0);
assertTrue(
originalTmi.getElapsed().orElseThrow().compareTo(updatedTmi.getElapsed().orElseThrow())
> 0);
}
}

Expand Down Expand Up @@ -229,66 +235,78 @@ public void addSplitIsMergeableTest() throws Exception {
// Set to always
var split1Tmi = tableInfo.get(split1).getTabletMergeabilityInfo();
assertTrue(split1Tmi.isMergeable());
assertTrue(split1Tmi.getInsertionTime().isPresent());
assertTrue(
split1Tmi.getCurrentTime().compareTo(split1Tmi.getInsertionTime().orElseThrow()) > 0);
assertTrue(split1Tmi.getTabletMergeability().isAlways());
assertTrue(split1Tmi.getElapsed().orElseThrow().toNanos() > 0);

// Set to never
var split2Tmi = tableInfo.get(split2).getTabletMergeabilityInfo();
assertFalse(split2Tmi.isMergeable());
assertFalse(split2Tmi.getInsertionTime().isPresent());
assertTrue(split2Tmi.getTabletMergeability().isNever());
assertTrue(split2Tmi.getElapsed().isEmpty());

// Set to a delay of 1 ms and current time should have elapsed long enough
var split3Tmi = tableInfo.get(split3).getTabletMergeabilityInfo();
assertTrue(split3Tmi.isMergeable());
assertTrue(split3Tmi.getInsertionTime().isPresent());
assertTrue(
split3Tmi.getCurrentTime().compareTo(split3Tmi.getInsertionTime().orElseThrow()) > 0);
assertEquals(Duration.ofMillis(1),
split3Tmi.getTabletMergeability().getDelay().orElseThrow());
assertTrue(split3Tmi.getElapsed().orElseThrow().toNanos() > 0);

// Set to a delay of 1 day and current time has NOT elapsed long enough
var split4Tmi = tableInfo.get(split4).getTabletMergeabilityInfo();
assertFalse(split4Tmi.isMergeable());
assertTrue(split4Tmi.getInsertionTime().isPresent());
assertTrue(
split4Tmi.getCurrentTime().compareTo(split4Tmi.getInsertionTime().orElseThrow()) > 0);
assertEquals(Duration.ofDays(1), split4Tmi.getTabletMergeability().getDelay().orElseThrow());
assertTrue(split4Tmi.getElapsed().orElseThrow().toNanos() > 0);

}
}

// Checks that TabletMergeability in metadata matches split settings in the map
private void verifySplits(TableId id, SortedMap<Text,TabletMergeability> splits) {
final Set<Text> addedSplits = new HashSet<>(splits.keySet());
try (TabletsMetadata tm = getServerContext().getAmple().readTablets().forTable(id).build()) {
tm.stream().forEach(t -> {
var split = t.getEndRow();
// default tablet should be set to never
if (t.getEndRow() == null) {
if (split == null) {
assertEquals(TabletMergeability.never(),
t.getTabletMergeability().getTabletMergeability());
} else {
assertTrue(addedSplits.remove(split));
// New splits should match the original setting in the map
assertEquals(splits.get(t.getEndRow()),
t.getTabletMergeability().getTabletMergeability());
assertEquals(splits.get(split), t.getTabletMergeability().getTabletMergeability());
}
});
}
// All splits should be seen
assertTrue(addedSplits.isEmpty());
}

private void verifySplitsWithApi(AccumuloClient c, String tableName,
SortedMap<Text,TabletMergeability> splits) throws TableNotFoundException {
final Set<Text> addedSplits = new HashSet<>(splits.keySet());
c.tableOperations().getTabletInformation(tableName, new Range()).forEach(ti -> {
var tmInfo = ti.getTabletMergeabilityInfo();
var split = ti.getTabletId().getEndRow();
// default tablet should always be set to never
if (ti.getTabletId().getEndRow() == null) {
if (split == null) {
assertEquals(TabletMergeability.never(),
ti.getTabletMergeabilityInfo().getTabletMergeability());
} else {
assertEquals(splits.get(ti.getTabletId().getEndRow()), tmInfo.getTabletMergeability());
assertTrue(addedSplits.remove(split));
assertEquals(splits.get(split), tmInfo.getTabletMergeability());
}
if (tmInfo.getTabletMergeability().isNever()) {
assertTrue(tmInfo.getElapsed().isEmpty());
} else {
assertTrue(tmInfo.getElapsed().orElseThrow().toNanos() > 0);
}
assertTrue(tmInfo.getCurrentTime().toNanos() > 0);
// isMergeable() should only be true for the always case because
// there has not been a long enough delay after insertion
// for the other tablets to be mergeable
assertEquals(tmInfo.getTabletMergeability().isAlways(), tmInfo.isMergeable());
});
// All splits should be seen
assertTrue(addedSplits.isEmpty());
}

private void verifyData(AccumuloClient client, String tableName, long ts) throws Exception {
Expand Down

0 comments on commit 2f2925d

Please sign in to comment.