Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 25, 2025
1 parent 70a34c4 commit 31d8782
Show file tree
Hide file tree
Showing 10 changed files with 2,383 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public interface TabletInformation {
*/
TabletAvailability getTabletAvailability();

/**
* @return the TabletMergeabilityInfo object
*
* @since 4.0.0
*/
TabletMergeabilityInfo getTabletMergeabilityInfo();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.admin;

import java.util.Objects;
import java.util.Optional;

import org.apache.accumulo.core.util.time.SteadyTime;

public class TabletMergeabilityInfo {

private final TabletMergeability tabletMergeability;
private final Optional<SteadyTime> insertionTime;

public TabletMergeabilityInfo(TabletMergeability tabletMergeability,
Optional<SteadyTime> insertionTime) {
this.tabletMergeability = Objects.requireNonNull(tabletMergeability);
this.insertionTime = Objects.requireNonNull(insertionTime);
}

public TabletMergeability getTabletMergeability() {
return tabletMergeability;
}

public Optional<SteadyTime> getInsertionTime() {
return insertionTime;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

TabletMergeabilityInfo that = (TabletMergeabilityInfo) o;
return tabletMergeability.equals(that.tabletMergeability)
&& insertionTime.equals(that.insertionTime);
}

@Override
public int hashCode() {
int result = tabletMergeability.hashCode();
result = 31 * result + insertionTime.hashCode();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
Expand Down Expand Up @@ -104,6 +105,7 @@
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
Expand Down Expand Up @@ -510,11 +512,32 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split

tabLocator.invalidateCache();

Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
var splitsToTablets = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = splitsToTablets.newSplits;
Map<KeyExtent,TabletMergeability> existingSplits = splitsToTablets.existingSplits;

List<CompletableFuture<Void>> futures = new ArrayList<>();

// Handle existing updates
if (!existingSplits.isEmpty()) {
futures.add(CompletableFuture.supplyAsync(() -> {
try {
var tSplits = existingSplits.entrySet().stream().collect(Collectors.toMap(
e -> e.getKey().toThrift(), e -> TabletMergeabilityUtil.toThrift(e.getValue())));
return ThriftClientTypes.MANAGER.executeTableCommand(context,
client -> client.updateTabletMergeability(TraceUtil.traceInfo(), context.rpcCreds(),
tableName, tSplits));
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
// This exception type is used because it makes it easier in the foreground thread to do
// exception analysis when using CompletableFuture.
throw new CompletionException(e);
}
}, startExecutor).thenApplyAsync(updated -> {
updated.forEach(tke -> splitsTodo.remove(KeyExtent.fromThrift(tke).endRow()));
return null;
}, waitExecutor));
}

// begin the fate operation for each tablet without waiting for the operation to complete
for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
.entrySet()) {
Expand Down Expand Up @@ -606,10 +629,11 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split
waitExecutor.shutdown();
}

private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(String tableName,
TableId tableId, ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
private SplitsToTablets mapSplitsToTablets(String tableName, TableId tableId,
ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = new HashMap<>();
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits = new HashMap<>();
Map<KeyExtent,TabletMergeability> existingSplits = new HashMap<>();

var iterator = splitsTodo.entrySet().iterator();
while (iterator.hasNext()) {
Expand All @@ -633,20 +657,30 @@ private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(St
}

if (split.equals(tablet.getExtent().endRow())) {
// split already exists, so remove it
iterator.remove();
existingSplits.put(tablet.getExtent(), splitEntry.getValue());
continue;
}

tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
newSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
.add(Pair.fromEntry(splitEntry));

} catch (InvalidTabletHostingRequestException e) {
// not expected
throw new AccumuloException(e);
}
}
return tabletSplits;
return new SplitsToTablets(newSplits, existingSplits);
}

private static class SplitsToTablets {
final Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits;
final Map<KeyExtent,TabletMergeability> existingSplits;

private SplitsToTablets(Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits,
Map<KeyExtent,TabletMergeability> existingSplits) {
this.newSplits = Objects.requireNonNull(newSplits);
this.existingSplits = Objects.requireNonNull(existingSplits);
}
}

@Override
Expand Down Expand Up @@ -2244,7 +2278,7 @@ public Stream<TabletInformation> getTabletInformation(final String tableName, fi

TabletsMetadata tabletsMetadata =
context.getAmple().readTablets().forTable(tableId).overlapping(scanRangeStart, true, null)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND, MERGEABILITY)
.checkConsistency().build();

Set<TServerInstance> liveTserverSet = TabletMetadata.getLiveTServers(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeabilityInfo;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
Expand Down Expand Up @@ -93,6 +94,11 @@ public TabletAvailability getTabletAvailability() {
return tabletMetadata.getTabletAvailability();
}

@Override
public TabletMergeabilityInfo getTabletMergeabilityInfo() {
return TabletMergeabilityUtil.toInfo(tabletMetadata.getTabletMergeability());
}

@Override
public String toString() {
return "TabletInformationImpl{tabletMetadata=" + tabletMetadata + ", estimatedSize="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.util.function.Function;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.admin.TabletMergeabilityInfo;
import org.apache.accumulo.core.manager.thrift.TTabletMergeability;
import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
Expand Down Expand Up @@ -95,6 +98,24 @@ public static Pair<Text,TabletMergeability> decode(String data) {
: TabletMergeability.after(Duration.ofNanos(jData.delay)));
}

public static TabletMergeability fromThrift(TTabletMergeability thriftTm) {
if (thriftTm.never) {
return TabletMergeability.never();
}
return TabletMergeability.after(Duration.ofNanos(thriftTm.delay));
}

public static TTabletMergeability toThrift(TabletMergeability tabletMergeability) {
if (tabletMergeability.isNever()) {
return new TTabletMergeability(true, -1L);
}
return new TTabletMergeability(false, tabletMergeability.getDelay().orElseThrow().toNanos());
}

public static TabletMergeabilityInfo toInfo(TabletMergeabilityMetadata metadata) {
return new TabletMergeabilityInfo(metadata.getTabletMergeability(), metadata.getSteadyTime());
}

private static class GSonData {
byte[] split;
boolean never;
Expand Down
Loading

0 comments on commit 31d8782

Please sign in to comment.