Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 24, 2025
1 parent 70a34c4 commit aee70f3
Show file tree
Hide file tree
Showing 9 changed files with 2,243 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public interface TabletInformation {
*/
TabletAvailability getTabletAvailability();

/**
* @return the TabletMergeability object
*
* @since 4.0.0
*/
TabletMergeability getTabletMergeability();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND;
Expand Down Expand Up @@ -104,6 +105,7 @@
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
Expand Down Expand Up @@ -510,11 +512,22 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split

tabLocator.invalidateCache();

var splitsToTablets = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
splitsToTablets.newSplits;
Map<KeyExtent,TabletMergeability> existingSplits =
splitsToTablets.existingSplits;

List<CompletableFuture<Void>> futures = new ArrayList<>();

// TODO: Process existingSplits in a new RPC call and updating existing
// before continuing to the fate operations to add new. ie
// if (doRpcToUpdateMergability(existingSplits)) {
// remove from splitsTodo
// } else {
// handle errors, retry?
// }

// begin the fate operation for each tablet without waiting for the operation to complete
for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
.entrySet()) {
Expand Down Expand Up @@ -606,10 +619,11 @@ public void putSplits(String tableName, SortedMap<Text,TabletMergeability> split
waitExecutor.shutdown();
}

private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(String tableName,
private SplitsToTablets mapSplitsToTablets(String tableName,
TableId tableId, ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = new HashMap<>();
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits = new HashMap<>();
Map<KeyExtent,TabletMergeability> existingSplits = new HashMap<>();

var iterator = splitsTodo.entrySet().iterator();
while (iterator.hasNext()) {
Expand All @@ -633,20 +647,32 @@ private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(St
}

if (split.equals(tablet.getExtent().endRow())) {
existingSplits.put(tablet.getExtent(), splitEntry.getValue());
// split already exists, so remove it
iterator.remove();
continue;
}

tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
newSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
.add(Pair.fromEntry(splitEntry));

} catch (InvalidTabletHostingRequestException e) {
// not expected
throw new AccumuloException(e);
}
}
return tabletSplits;
return new SplitsToTablets(newSplits, existingSplits);
}

private static class SplitsToTablets {
final Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits;
final Map<KeyExtent,TabletMergeability> existingSplits;

private SplitsToTablets(Map<KeyExtent,List<Pair<Text,TabletMergeability>>> newSplits,
Map<KeyExtent,TabletMergeability> existingSplits) {
this.newSplits = Objects.requireNonNull(newSplits);
this.existingSplits = Objects.requireNonNull(existingSplits);
}
}

@Override
Expand Down Expand Up @@ -2244,7 +2270,7 @@ public Stream<TabletInformation> getTabletInformation(final String tableName, fi

TabletsMetadata tabletsMetadata =
context.getAmple().readTablets().forTable(tableId).overlapping(scanRangeStart, true, null)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND)
.fetch(AVAILABILITY, LOCATION, DIR, PREV_ROW, FILES, LAST, LOGS, SUSPEND, MERGEABILITY)
.checkConsistency().build();

Set<TServerInstance> liveTserverSet = TabletMetadata.getLiveTServers(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
Expand Down Expand Up @@ -93,6 +94,11 @@ public TabletAvailability getTabletAvailability() {
return tabletMetadata.getTabletAvailability();
}

@Override
public TabletMergeability getTabletMergeability() {
return tabletMetadata.getTabletMergeability().getTabletMergeability();
}

@Override
public String toString() {
return "TabletInformationImpl{tabletMetadata=" + tabletMetadata + ", estimatedSize="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.function.Function;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.manager.thrift.TTabletMergeability;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
Expand Down Expand Up @@ -95,6 +96,20 @@ public static Pair<Text,TabletMergeability> decode(String data) {
: TabletMergeability.after(Duration.ofNanos(jData.delay)));
}

public static TabletMergeability fromThrift(TTabletMergeability thriftTm) {
if (thriftTm.never) {
return TabletMergeability.never();
}
return TabletMergeability.after(Duration.ofNanos(thriftTm.delay));
}

public static TTabletMergeability toThrift(TabletMergeability tabletMergeability) {
if (tabletMergeability.isNever()) {
return new TTabletMergeability(true, -1L);
}
return new TTabletMergeability(false, tabletMergeability.getDelay().orElseThrow().toNanos());
}

private static class GSonData {
byte[] split;
boolean never;
Expand Down
Loading

0 comments on commit aee70f3

Please sign in to comment.