Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 25, 2025
1 parent 70a34c4 commit 2896097
Show file tree
Hide file tree
Showing 9 changed files with 2,313 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public interface TabletInformation {
*/
TabletAvailability getTabletAvailability();

/**
* @return the TabletMergeability object
*
* @since 4.0.0
*/
TabletMergeability getTabletMergeability();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
Expand Down Expand Up @@ -104,6 +105,7 @@
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
Expand Down Expand Up @@ -510,11 +512,32 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split

tabLocator.invalidateCache();

Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
var splitsToTablets = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = splitsToTablets.newSplits;
Map<KeyExtent,TabletMergeability> existingSplits = splitsToTablets.existingSplits;

List<CompletableFuture<Void>> futures = new ArrayList<>();

// Handle existing updates
if (!existingSplits.isEmpty()) {
futures.add(CompletableFuture.supplyAsync(() -> {
try {
var tSplits = existingSplits.entrySet().stream().collect(Collectors.toMap(
e -> e.getKey().toThrift(), e -> TabletMergeabilityUtil.toThrift(e.getValue())));
return ThriftClientTypes.MANAGER.executeTableCommand(context,
client -> client.updateTabletMergeability(TraceUtil.traceInfo(), context.rpcCreds(),
tableName, tSplits));
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
// This exception type is used because it makes it easier in the foreground thread to do
// exception analysis when using CompletableFuture.
throw new CompletionException(e);
}
}, startExecutor).thenApplyAsync(updated -> {
updated.forEach(tke -> splitsTodo.remove(KeyExtent.fromThrift(tke).endRow()));
return null;
}, waitExecutor));
}

// begin the fate operation for each tablet without waiting for the operation to complete
for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
.entrySet()) {
Expand Down Expand Up @@ -606,10 +629,11 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split
waitExecutor.shutdown();
}

private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(String tableName,
TableId tableId, ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
private SplitsToTablets mapSplitsToTablets(String tableName, TableId tableId,
ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = new HashMap<>();
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits = new HashMap<>();
Map<KeyExtent,TabletMergeability> existingSplits = new HashMap<>();

var iterator = splitsTodo.entrySet().iterator();
while (iterator.hasNext()) {
Expand All @@ -633,20 +657,30 @@ private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(St
}

if (split.equals(tablet.getExtent().endRow())) {
// split already exists, so remove it
iterator.remove();
existingSplits.put(tablet.getExtent(), splitEntry.getValue());
continue;
}

tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
newSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
.add(Pair.fromEntry(splitEntry));

} catch (InvalidTabletHostingRequestException e) {
// not expected
throw new AccumuloException(e);
}
}
return tabletSplits;
return new SplitsToTablets(newSplits, existingSplits);
}

private static class SplitsToTablets {
final Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits;
final Map<KeyExtent,TabletMergeability> existingSplits;

private SplitsToTablets(Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits,
Map<KeyExtent,TabletMergeability> existingSplits) {
this.newSplits = Objects.requireNonNull(newSplits);
this.existingSplits = Objects.requireNonNull(existingSplits);
}
}

@Override
Expand Down Expand Up @@ -2244,7 +2278,7 @@ public Stream<TabletInformation> getTabletInformation(final String tableName, fi

TabletsMetadata tabletsMetadata =
context.getAmple().readTablets().forTable(tableId).overlapping(scanRangeStart, true, null)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND, MERGEABILITY)
.checkConsistency().build();

Set<TServerInstance> liveTserverSet = TabletMetadata.getLiveTServers(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
Expand Down Expand Up @@ -93,6 +94,11 @@ public TabletAvailability getTabletAvailability() {
return tabletMetadata.getTabletAvailability();
}

@Override
public TabletMergeability getTabletMergeability() {
return tabletMetadata.getTabletMergeability().getTabletMergeability();
}

@Override
public String toString() {
return "TabletInformationImpl{tabletMetadata=" + tabletMetadata + ", estimatedSize="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.Function;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.manager.thrift.TTabletMergeability;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
Expand Down Expand Up @@ -95,6 +96,20 @@ public static Pair<Text,TabletMergeability> decode(String data) {
: TabletMergeability.after(Duration.ofNanos(jData.delay)));
}

public static TabletMergeability fromThrift(TTabletMergeability thriftTm) {
if (thriftTm.never) {
return TabletMergeability.never();
}
return TabletMergeability.after(Duration.ofNanos(thriftTm.delay));
}

public static TTabletMergeability toThrift(TabletMergeability tabletMergeability) {
if (tabletMergeability.isNever()) {
return new TTabletMergeability(true, -1L);
}
return new TTabletMergeability(false, tabletMergeability.getDelay().orElseThrow().toNanos());
}

private static class GSonData {
byte[] split;
boolean never;
Expand Down
Loading

0 comments on commit 2896097

Please sign in to comment.