Skip to content

Commit

Permalink
Support TabletMergeability updates and queries (#5284)
Browse files Browse the repository at this point in the history
This is part 2 of the API changes for #5014 and is a follow-up to #5187.

putSplits() now supports updating existing tablets with the new
TabletMergeability setting, as well as creating new splits.
TabletMergeabilityInfo has been added to TabletInformation so a user can
retrieve the TabletMergeability setting on a tablet as well as the
insertion time and if the tablet is mergeable or not based on the
current manager time.

A couple of new RPCs were added to the Manager service, including an RPC
to update existing tablets with a new TabletMergeability and an RPC to
retrieve the current Manager time.

---------

Co-authored-by: Keith Turner <kturner@apache.org>
  • Loading branch information
cshannon and keith-turner authored Jan 31, 2025
1 parent c438520 commit daed0fd
Show file tree
Hide file tree
Showing 15 changed files with 4,018 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public interface TabletInformation {
*/
TabletAvailability getTabletAvailability();

/**
* Returns the mergeability details recorded for this tablet.
*
* @return the TabletMergeabilityInfo object, which exposes the tablet's TabletMergeability
*         setting and whether the tablet is currently eligible to be merged
*
* @since 4.0.0
*/
TabletMergeabilityInfo getTabletMergeabilityInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public class TabletMergeability implements Serializable {
private final Duration delay;

private TabletMergeability(Duration delay) {
this.delay = Objects.requireNonNull(delay);
this.delay = delay;
Preconditions.checkArgument(!delay.isNegative(), "Delay '%s' must not be negative.",
delay.toNanos());
}

// Edge case for NEVER
Expand Down Expand Up @@ -74,7 +76,8 @@ public boolean isAlways() {
* <li>positive delay</li>
* </ul>
*
* @return the configured mergeability delay
*
* @return the configured mergeability delay or empty if mergeability is NEVER
*/
public Optional<Duration> getDelay() {
return Optional.ofNullable(delay);
Expand Down Expand Up @@ -122,14 +125,15 @@ public static TabletMergeability always() {

/**
* Creates a {@link TabletMergeability} that signals a tablet has a delay to a point in the future
* before it is automatically eligible to be merged. The duration must be positive value.
* before it is automatically eligible to be merged. The duration must be greater than or equal to
* 0. A delay of 0 means a Tablet is immediately eligible to be merged.
*
* @param delay the duration of the delay
*
* @return a {@link TabletMergeability} from the given delay.
*/
public static TabletMergeability after(Duration delay) {
Preconditions.checkArgument(delay.toNanos() >= 0, "Duration of delay must be greater than 0.");
// Delay will be validated that it is >=0 inside the constructor
return new TabletMergeability(delay);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.admin;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;

import com.google.common.base.Preconditions;

/**
 * Pairs a tablet's {@link TabletMergeability} setting with the time at which that setting was
 * recorded and a source for the current time. From these it can report how much of the configured
 * delay has elapsed, how much remains, and whether the tablet is presently eligible for automatic
 * merging.
 *
 * <p>
 * Equality and hash code consider only the mergeability setting and the insertion time; the
 * current-time supplier is ignored.
 *
 * @since 4.0.0
 */
public class TabletMergeabilityInfo {

  private final TabletMergeability tabletMergeability;
  private final Optional<Duration> insertionTime;
  private final Supplier<Duration> currentTime;

  /**
   * @param tabletMergeability the mergeability setting of the tablet
   * @param insertionTime the time the setting was recorded; must be empty exactly when the setting
   *        is NEVER and must not be negative when present
   * @param currentTime supplies the current time that elapsed and remaining time are measured
   *        against
   * @throws NullPointerException if any argument is null
   * @throws IllegalArgumentException if insertionTime is inconsistent with the mergeability
   *         setting, or is negative
   */
  public TabletMergeabilityInfo(TabletMergeability tabletMergeability,
      Optional<Duration> insertionTime, Supplier<Duration> currentTime) {
    this.tabletMergeability = Objects.requireNonNull(tabletMergeability);
    this.insertionTime = Objects.requireNonNull(insertionTime);
    this.currentTime = Objects.requireNonNull(currentTime);

    // An insertion time accompanies every setting that carries a delay, and only those settings:
    // NEVER has no delay and therefore must have no insertion time.
    boolean never = tabletMergeability.isNever();
    boolean missingInsertionTime = insertionTime.isEmpty();
    Preconditions.checkArgument(never == missingInsertionTime,
        "insertionTime must not be empty if and only if TabletMergeability delay is >= 0");

    if (insertionTime.isPresent()) {
      Preconditions.checkArgument(!insertionTime.orElseThrow().isNegative(),
          "insertionTime must not be negative");
    }
  }

  /**
   * @return the TabletMergeability
   */
  public TabletMergeability getTabletMergeability() {
    return tabletMergeability;
  }

  /**
   * Estimates how long ago the delay was set, for a TabletMergeability configured with a delay.
   *
   * @return the estimated elapsed time (never negative), or Optional.empty() if the tablet was
   *         configured with a TabletMergeability of never
   */
  public Optional<Duration> getElapsed() {
    if (insertionTime.isEmpty()) {
      return Optional.empty();
    }
    // @formatter:off
    /*
     * The current time may have been obtained before this tablet's insertion time was written,
     * which would make the raw difference negative; such a result is reported as zero.
     * For example:
     *
     * 1. Thread_1 starts reading tablet info objects
     * 2. Thread_1 reads TabletMergeabilityInfo for tablet_X. This causes the memoized
     *    current steady time supplier to be set to 5.
     * 3. Thread_2 updates TabletMergeability for tablet_Y. It sets the insertion steady
     *    time for the update to 7.
     * 4. Thread_1 reads TabletMergeabilityInfo for tablet_Y. To compute elapsed it
     *    does 5 - 7 = -2. In this case, just return 0.
     */
    // @formatter:on
    Duration sinceInsertion = currentTime.get().minus(insertionTime.orElseThrow());
    return Optional.of(zeroIfNegative(sinceInsertion));
  }

  /**
   * Estimates how much of the configured delay is still outstanding, for a TabletMergeability
   * configured with a delay.
   *
   * @return the configured delay minus the estimated elapsed time (never negative), or
   *         Optional.empty() if the tablet was configured with a TabletMergeability of never
   */
  public Optional<Duration> getRemaining() {
    Optional<Duration> elapsed = getElapsed();
    if (elapsed.isEmpty()) {
      return Optional.empty();
    }
    // Elapsed is only present when an insertion time exists, and the constructor guarantees a
    // delay is configured in that case. Both values are >= 0.
    Duration configuredDelay = tabletMergeability.getDelay().orElseThrow();
    Duration outstanding = configuredDelay.minus(elapsed.orElseThrow());
    return Optional.of(zeroIfNegative(outstanding));
  }

  /**
   * Returns an Optional duration of the configured {@link TabletMergeability} delay which is one
   * of:
   *
   * <ul>
   * <li>empty (never)</li>
   * <li>0 (now)</li>
   * <li>positive delay</li>
   * </ul>
   *
   * @return the configured mergeability delay or empty if mergeability is NEVER
   */
  public Optional<Duration> getDelay() {
    return tabletMergeability.getDelay();
  }

  /**
   * Check if this tablet is eligible to be automatically merged. <br>
   * A TabletMergeability of "never" always yields false. <br>
   * Otherwise the decision is delegated to TabletMergeabilityUtil using the insertion time, the
   * configured delay and the current time.
   *
   * @return true if the tablet is eligible for automatic merging, else false
   */
  public boolean isMergeable() {
    if (tabletMergeability.isNever()) {
      return false;
    }
    // For any setting other than never the constructor has verified that an insertion time is
    // present, so neither orElseThrow() below can fail.
    Duration inserted = insertionTime.orElseThrow();
    Duration configuredDelay = tabletMergeability.getDelay().orElseThrow();
    return TabletMergeabilityUtil.isMergeable(inserted, configuredDelay, currentTime.get());
  }

  @Override
  public boolean equals(Object o) {
    if (o == null) {
      return false;
    }
    if (o.getClass() != getClass()) {
      return false;
    }

    TabletMergeabilityInfo other = (TabletMergeabilityInfo) o;
    if (!tabletMergeability.equals(other.tabletMergeability)) {
      return false;
    }
    return insertionTime.equals(other.insertionTime);
  }

  @Override
  public int hashCode() {
    return 31 * tabletMergeability.hashCode() + insertionTime.hashCode();
  }

  // Replaces a negative duration with zero; any other duration is returned unchanged.
  private static Duration zeroIfNegative(Duration d) {
    if (d.isNegative()) {
      return Duration.ZERO;
    }
    return d;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
Expand Down Expand Up @@ -104,6 +105,7 @@
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
Expand Down Expand Up @@ -182,6 +184,7 @@

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;

public class TableOperationsImpl extends TableOperationsHelper {

Expand Down Expand Up @@ -510,11 +513,33 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split

tabLocator.invalidateCache();

Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
var splitsToTablets = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = splitsToTablets.newSplits;
Map<KeyExtent,TabletMergeability> existingSplits = splitsToTablets.existingSplits;

List<CompletableFuture<Void>> futures = new ArrayList<>();

// Handle existing updates
if (!existingSplits.isEmpty()) {
futures.add(CompletableFuture.supplyAsync(() -> {
try {
var tSplits = existingSplits.entrySet().stream().collect(Collectors.toMap(
e -> e.getKey().toThrift(), e -> TabletMergeabilityUtil.toThrift(e.getValue())));
return ThriftClientTypes.MANAGER.executeTableCommand(context,
client -> client.updateTabletMergeability(TraceUtil.traceInfo(), context.rpcCreds(),
tableName, tSplits));
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
// This exception type is used because it makes it easier in the foreground thread to do
// exception analysis when using CompletableFuture.
throw new CompletionException(e);
}
}, startExecutor).thenApplyAsync(updated -> {
// Remove the successfully updated tablets from the list, failures will be retried
updated.forEach(tke -> splitsTodo.remove(KeyExtent.fromThrift(tke).endRow()));
return null;
}, waitExecutor));
}

// begin the fate operation for each tablet without waiting for the operation to complete
for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
.entrySet()) {
Expand Down Expand Up @@ -606,10 +631,11 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split
waitExecutor.shutdown();
}

private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(String tableName,
TableId tableId, ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
private SplitsToTablets mapSplitsToTablets(String tableName, TableId tableId,
ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = new HashMap<>();
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits = new HashMap<>();
Map<KeyExtent,TabletMergeability> existingSplits = new HashMap<>();

var iterator = splitsTodo.entrySet().iterator();
while (iterator.hasNext()) {
Expand All @@ -632,21 +658,33 @@ private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(St
tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED);
}

// For splits that already exist collect them so we can update them
// separately
if (split.equals(tablet.getExtent().endRow())) {
// split already exists, so remove it
iterator.remove();
existingSplits.put(tablet.getExtent(), splitEntry.getValue());
continue;
}

tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
newSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
.add(Pair.fromEntry(splitEntry));

} catch (InvalidTabletHostingRequestException e) {
// not expected
throw new AccumuloException(e);
}
}
return tabletSplits;
return new SplitsToTablets(newSplits, existingSplits);
}

/**
 * Holder for the two groups that requested splits are sorted into: split points that still have
 * to be created, grouped by the extent of the tablet they fall in, and requested splits that
 * already match a tablet's end row, keyed by that tablet's extent with the TabletMergeability to
 * apply.
 */
private static class SplitsToTablets {
  // split points to add, grouped by the tablet extent they were located in
  final Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits;
  // tablets whose end row already equals a requested split, with the requested mergeability
  final Map<KeyExtent,TabletMergeability> existingSplits;

  private SplitsToTablets(Map<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsToCreate,
      Map<KeyExtent,TabletMergeability> splitsToUpdate) {
    // Both maps are mandatory; a null is rejected immediately
    this.newSplits = Objects.requireNonNull(splitsToCreate);
    this.existingSplits = Objects.requireNonNull(splitsToUpdate);
  }
}

@Override
Expand Down Expand Up @@ -2244,11 +2282,20 @@ public Stream<TabletInformation> getTabletInformation(final String tableName, fi

TabletsMetadata tabletsMetadata =
context.getAmple().readTablets().forTable(tableId).overlapping(scanRangeStart, true, null)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND, MERGEABILITY)
.checkConsistency().build();

Set<TServerInstance> liveTserverSet = TabletMetadata.getLiveTServers(context);

var currentTime = Suppliers.memoize(() -> {
try {
return Duration.ofNanos(ThriftClientTypes.MANAGER.execute(context,
client -> client.getManagerTimeNanos(TraceUtil.traceInfo(), context.rpcCreds())));
} catch (AccumuloException | AccumuloSecurityException e) {
throw new IllegalStateException(e);
}
});

return tabletsMetadata.stream().peek(tm -> {
if (scanRangeStart != null && tm.getEndRow() != null
&& tm.getEndRow().compareTo(scanRangeStart) < 0) {
Expand All @@ -2257,8 +2304,8 @@ public Stream<TabletInformation> getTabletInformation(final String tableName, fi
}
}).takeWhile(tm -> tm.getPrevEndRow() == null
|| !range.afterEndKey(new Key(tm.getPrevEndRow()).followingKey(PartialKey.ROW)))
.map(tm -> new TabletInformationImpl(tm,
TabletState.compute(tm, liveTserverSet).toString()));
.map(tm -> new TabletInformationImpl(tm, TabletState.compute(tm, liveTserverSet).toString(),
currentTime));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
*/
package org.apache.accumulo.core.clientImpl;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeabilityInfo;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
Expand All @@ -34,8 +38,10 @@ public class TabletInformationImpl implements TabletInformation {
private long estimatedSize;
private long estimatedEntries;
private final String tabletState;
private final Supplier<Duration> currentTime;

public TabletInformationImpl(TabletMetadata tabletMetadata, String tabletState) {
public TabletInformationImpl(TabletMetadata tabletMetadata, String tabletState,
Supplier<Duration> currentTime) {
this.tabletMetadata = tabletMetadata;
estimatedEntries = 0L;
estimatedSize = 0L;
Expand All @@ -44,6 +50,7 @@ public TabletInformationImpl(TabletMetadata tabletMetadata, String tabletState)
estimatedSize += dfv.getSize();
}
this.tabletState = tabletState;
this.currentTime = Objects.requireNonNull(currentTime);
}

@Override
Expand Down Expand Up @@ -93,6 +100,11 @@ public TabletAvailability getTabletAvailability() {
return tabletMetadata.getTabletAvailability();
}

/**
 * Builds the TabletMergeabilityInfo from the mergeability value held in this tablet's metadata
 * and the current-time supplier given to this object at construction.
 */
@Override
public TabletMergeabilityInfo getTabletMergeabilityInfo() {
  var storedMergeability = tabletMetadata.getTabletMergeability();
  return TabletMergeabilityUtil.toInfo(storedMergeability, currentTime);
}

@Override
public String toString() {
return "TabletInformationImpl{tabletMetadata=" + tabletMetadata + ", estimatedSize="
Expand Down
Loading

0 comments on commit daed0fd

Please sign in to comment.