Skip to content

Commit

Permalink
Add getRemaining() to api and improve tests/comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 31, 2025
1 parent 628aec2 commit 06bb135
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public boolean isAlways() {
* <li>positive delay</li>
* </ul>
*
* @return the configured mergeability delay
*
* @return the configured mergeability delay or empty if mergeability is NEVER
*/
public Optional<Duration> getDelay() {
return Optional.ofNullable(delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public TabletMergeabilityInfo(TabletMergeability tabletMergeability,
// if TabletMergeability is NEVER
Preconditions.checkArgument(tabletMergeability.isNever() == insertionTime.isEmpty(),
"insertionTime must not be empty if and only if TabletMergeability delay is >= 0");
insertionTime.ifPresent(
it -> Preconditions.checkArgument(!it.isNegative(), "insertionTime must not be negative"));
}

/**
Expand All @@ -60,14 +62,53 @@ public TabletMergeability getTabletMergeability() {
* with a TabletMergeability of never.
*/
public Optional<Duration> getElapsed() {
// It's possible the "current time" is read from the manager and then a tablets insertion time
// is read much later and this could cause a negative read, in this case set the elapsed time to
// zero. Avoiding this would require an RPC per call to this method to get the latest manager
// time, so since this is an estimate return zero for this case.
// @formatter:off
/*
* It's possible the "current time" is read from the manager and then a tablets insertion time
* is read later and this could cause a negative read, in this case set the elapsed time to
* zero. For example:
*
* 1. Thread_1 starts reading tablet info objects
* 2. Thread_1 reads TabletMergeabilityInfo for tablet_X. This causes the memoized
* current steady time supplier to be set to 5.
* 3. Thread_2 updates TabletMergeability for tablet_Y. It sets the insertion steady
* time for the update to 7.
* 4. Thread_1 reads TabletMergeabilityInfo for tablet_Y. To compute elapsed it
* does 5 - 7 = -2. In this case, just return 0.
*/
// @formatter:on
return insertionTime.map(it -> currentTime.get().minus(it))
.map(elapsed -> elapsed.isNegative() ? Duration.ZERO : elapsed);
}

/**
* If the TabletMergeability is configured with a delay this returns an estimate of the remaining
* time since the delay was initially set. Returns optional.empty() if the tablet was configured
* with a TabletMergeability of never.
*/
public Optional<Duration> getRemaining() {
// Delay should always be set if insertionTime is not empty
// getElapsed() and getDelay() are guaranteed to both be >= 0
return getElapsed().map(elapsed -> tabletMergeability.getDelay().orElseThrow().minus(elapsed))
.map(remaining -> remaining.isNegative() ? Duration.ZERO : remaining);
}

/**
* Returns an Optional duration of the configured {@link TabletMergeability} delay which is one
* of:
*
* <ul>
* <li>empty (never)</li>
* <li>0 (now)</li>
* <li>positive delay</li>
* </ul>
*
* @return the configured mergeability delay or empty if mergeability is NEVER
*/
public Optional<Duration> getDelay() {
return tabletMergeability.getDelay();
}

/**
* Check if this tablet is eligible to be automatically merged. <br>
* If TabletMergeability is set to "always", this will return true. <br>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.admin;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.Optional;

import org.junit.jupiter.api.Test;

public class TabletMergeabilityInfoTest {

@Test
public void testValidation() {
// ALWAYS requires an insertion time
assertThrows(IllegalArgumentException.class,
() -> new TabletMergeabilityInfo(TabletMergeability.always(), Optional.empty(),
() -> Duration.ofDays(10)));

// NEVER requires empty insertion time
assertThrows(IllegalArgumentException.class,
() -> new TabletMergeabilityInfo(TabletMergeability.never(), Optional.of(Duration.ZERO),
() -> Duration.ofDays(10)));

// Delay can't be null
assertThrows(IllegalArgumentException.class,
() -> new TabletMergeabilityInfo(TabletMergeability.after(Duration.ofMillis(-10)),
Optional.of(Duration.ofHours(1)), () -> Duration.ofDays(10)));

// InsertionTime can't be negative
assertThrows(IllegalArgumentException.class,
() -> new TabletMergeabilityInfo(TabletMergeability.always(),
Optional.of(Duration.ofHours(-1)), () -> Duration.ofDays(10)));

// No negative supplier for current time
assertThrows(NullPointerException.class,
() -> new TabletMergeabilityInfo(TabletMergeability.always(),
Optional.of(Duration.ofHours(1)), null));
}

@Test
public void testNever() {
var tmi = new TabletMergeabilityInfo(TabletMergeability.never(), Optional.empty(),
() -> Duration.ofDays(10));
assertFalse(tmi.isMergeable());
assertTrue(tmi.getTabletMergeability().isNever());
assertTrue(tmi.getElapsed().isEmpty());
assertTrue(tmi.getDelay().isEmpty());
assertTrue(tmi.getRemaining().isEmpty());
}

@Test
public void testAlways() {
var tmi = new TabletMergeabilityInfo(TabletMergeability.always(),
Optional.of(Duration.ofDays(1)), () -> Duration.ofDays(10));
assertTrue(tmi.isMergeable());
assertTrue(tmi.getTabletMergeability().isAlways());
assertTrue(tmi.getElapsed().orElseThrow().toNanos() > 0);
assertEquals(Duration.ZERO, tmi.getRemaining().orElseThrow());
}

@Test
public void testDelay() {
// test values when isMergeable() is false
var tmi = new TabletMergeabilityInfo(TabletMergeability.after(Duration.ofDays(2)),
Optional.of(Duration.ofDays(9)), () -> Duration.ofDays(10));
assertFalse(tmi.isMergeable());
assertEquals(Duration.ofDays(2), tmi.getDelay().orElseThrow());
assertEquals(Duration.ofDays(1), tmi.getElapsed().orElseThrow());
assertEquals(tmi.getDelay().map(delay -> delay.minus(tmi.getElapsed().orElseThrow())),
tmi.getRemaining());

// test values when isMergeable() is true
var tmi2 = new TabletMergeabilityInfo(TabletMergeability.after(Duration.ofDays(6)),
Optional.of(Duration.ofDays(1)), () -> Duration.ofDays(10));
assertTrue(tmi2.isMergeable());
assertEquals(Duration.ofDays(6), tmi2.getDelay().orElseThrow());
assertEquals(Duration.ofDays(9), tmi2.getElapsed().orElseThrow());
assertEquals(Duration.ZERO, tmi2.getRemaining().orElseThrow());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -45,6 +46,7 @@
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.admin.TabletMergeabilityInfo;
import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
Expand Down Expand Up @@ -243,29 +245,39 @@ public void addSplitIsMergeableTest() throws Exception {
assertTrue(split1Tmi.isMergeable());
assertTrue(split1Tmi.getTabletMergeability().isAlways());
assertTrue(split1Tmi.getElapsed().orElseThrow().toNanos() > 0);
assertEquals(Duration.ZERO, split1Tmi.getRemaining().orElseThrow());
assertEquals(computeRemaining(split1Tmi), split1Tmi.getRemaining());

// Set to never
var split2Tmi = tableInfo.get(split2).getTabletMergeabilityInfo();
assertFalse(split2Tmi.isMergeable());
assertTrue(split2Tmi.getTabletMergeability().isNever());
assertTrue(split2Tmi.getElapsed().isEmpty());
assertTrue(split2Tmi.getDelay().isEmpty());
assertTrue(split2Tmi.getRemaining().isEmpty());

// Set to a delay of 1 ms and current time should have elapsed long enough
var split3Tmi = tableInfo.get(split3).getTabletMergeabilityInfo();
assertTrue(split3Tmi.isMergeable());
assertEquals(Duration.ofMillis(1),
split3Tmi.getTabletMergeability().getDelay().orElseThrow());
assertEquals(Duration.ofMillis(1), split3Tmi.getDelay().orElseThrow());
assertTrue(split3Tmi.getElapsed().orElseThrow().toNanos() > 0);
assertEquals(computeRemaining(split3Tmi), split3Tmi.getRemaining());

// Set to a delay of 1 day and current time has NOT elapsed long enough
var split4Tmi = tableInfo.get(split4).getTabletMergeabilityInfo();
assertFalse(split4Tmi.isMergeable());
assertEquals(Duration.ofDays(1), split4Tmi.getTabletMergeability().getDelay().orElseThrow());
assertEquals(Duration.ofDays(1), split4Tmi.getDelay().orElseThrow());
assertTrue(split4Tmi.getElapsed().orElseThrow().toNanos() > 0);

assertEquals(computeRemaining(split4Tmi), split4Tmi.getRemaining());
}
}

// test remaining matches logic in the TabletMergeabilityInfo class
private static Optional<Duration> computeRemaining(TabletMergeabilityInfo tmi) {
return tmi.getDelay().map(delay -> delay.minus(tmi.getElapsed().orElseThrow()))
.map(remaining -> remaining.isNegative() ? Duration.ZERO : remaining);
}

@Test
public void concurrentAddSplitTest() throws Exception {
var threads = 10;
Expand Down

0 comments on commit 06bb135

Please sign in to comment.