Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 12, 2025
1 parent e745a5d commit 0399dd2
Show file tree
Hide file tree
Showing 18 changed files with 304 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.function.Function;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.IteratorSetting;
Expand All @@ -48,7 +52,7 @@
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.hadoop.io.Text;

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.ImmutableSortedMap;

/**
* This object stores table creation parameters. Currently includes: {@link TimeType}, whether to
Expand All @@ -71,7 +75,7 @@ public class NewTableConfiguration {
private Map<String,String> summarizerProps = Collections.emptyMap();
private Map<String,String> localityProps = Collections.emptyMap();
private final Map<String,String> iteratorProps = new HashMap<>();
private SortedSet<Text> splitProps = Collections.emptySortedSet();
private SortedMap<Text,TabletMergeability> splitProps = Collections.emptySortedMap();
private TabletAvailability initialTabletAvailability = TabletAvailability.ONDEMAND;

private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps,
Expand Down Expand Up @@ -188,6 +192,10 @@ public Map<String,String> getProperties() {
* @since 2.0.0
*/
public Collection<Text> getSplits() {
return splitProps.keySet();
}

public SortedMap<Text,TabletMergeability> getSplitsMap() {
return splitProps;
}

Expand Down Expand Up @@ -258,10 +266,22 @@ public NewTableConfiguration setLocalityGroups(Map<String,Set<Text>> groups) {
*
* @since 2.0.0
*/
@SuppressWarnings("unchecked")
public NewTableConfiguration withSplits(final SortedSet<Text> splits) {
checkArgument(splits != null, "splits set is null");
checkArgument(!splits.isEmpty(), "splits set is empty");
this.splitProps = ImmutableSortedSet.copyOf(splits);
return withSplits(
splits.stream()
.collect(ImmutableSortedMap.toImmutableSortedMap(
Optional.ofNullable((Comparator<Text>) splits.comparator())
.orElse(Comparator.naturalOrder()),
Function.identity(), t -> TabletMergeability.never())));
}

public NewTableConfiguration withSplits(final SortedMap<Text,TabletMergeability> splits) {
checkArgument(splits != null, "splits set is null");
checkArgument(!splits.isEmpty(), "splits set is empty");
this.splitProps = ImmutableSortedMap.copyOf(splits);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
Expand Down Expand Up @@ -187,6 +188,9 @@ void exportTable(String tableName, String exportDir)
void addSplits(String tableName, SortedSet<Text> partitionKeys)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;

void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;

/**
* @param tableName the name of the table
* @return the split points (end-row names) for the table's current split profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
Expand Down Expand Up @@ -105,6 +106,7 @@
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
Expand Down Expand Up @@ -263,11 +265,11 @@ public void create(String tableName, NewTableConfiguration ntc)
// Check for possible initial splits to be added at table creation
// Always send number of initial splits to be created, even if zero. If greater than zero,
// add the splits to the argument List which will be used by the FATE operations.
int numSplits = ntc.getSplits().size();
int numSplits = ntc.getSplitsMap().size();
args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8)));
if (numSplits > 0) {
for (Text t : ntc.getSplits()) {
args.add(TextUtil.getByteBuffer(t));
for (Entry<Text,TabletMergeability> t : ntc.getSplitsMap().entrySet()) {
args.add(TabletMergeabilityUtil.encodeAsBuffer(t.getKey(), t.getValue()));
}
}

Expand Down Expand Up @@ -475,8 +477,15 @@ String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,Stri
public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";

@Override
@SuppressWarnings("unchecked")
public void addSplits(String tableName, SortedSet<Text> splits)
throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
putSplits(tableName, TabletMergeabilityUtil.userDefaultSplits(splits));
}

@Override
public void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits)
throws AccumuloException, TableNotFoundException, AccumuloSecurityException {

EXISTING_TABLE_NAME.validate(tableName);

Expand All @@ -487,7 +496,8 @@ public void addSplits(String tableName, SortedSet<Text> splits)

ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);

SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new TreeSet<>(splits));
SortedMap<Text,TabletMergeability> splitsTodo =
Collections.synchronizedSortedMap(new TreeMap<>(splits));

final ByteBuffer EMPTY = ByteBuffer.allocate(0);

Expand All @@ -500,13 +510,14 @@ public void addSplits(String tableName, SortedSet<Text> splits)

tabLocator.invalidateCache();

Map<KeyExtent,List<Text>> tabletSplits =
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);

List<CompletableFuture<Void>> futures = new ArrayList<>();

// begin the fate operation for each tablet without waiting for the operation to complete
for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
.entrySet()) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
var extent = splitsForTablet.getKey();

Expand All @@ -515,7 +526,9 @@ public void addSplits(String tableName, SortedSet<Text> splits)
args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
args.add(
extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));
splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split)));

splitsForTablet.getValue()
.forEach(split -> args.add(TabletMergeabilityUtil.encodeAsBuffer(split)));

try {
return handleFateOperation(() -> {
Expand All @@ -533,13 +546,13 @@ public void addSplits(String tableName, SortedSet<Text> splits)
// wait for the fate operation to complete in a separate thread pool
}, startExecutor).thenApplyAsync(pair -> {
final TFateId opid = pair.getFirst();
final List<Text> completedSplits = pair.getSecond();
final List<Pair<Text,TabletMergeability>> completedSplits = pair.getSecond();

try {
String status = handleFateOperation(() -> waitForFateOperation(opid), tableName);

if (SPLIT_SUCCESS_MSG.equals(status)) {
completedSplits.forEach(splitsTodo::remove);
completedSplits.stream().map(Pair::getFirst).forEach(splitsTodo::remove);
}
} catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
| AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
Expand Down Expand Up @@ -593,14 +606,15 @@ public void addSplits(String tableName, SortedSet<Text> splits)
waitExecutor.shutdown();
}

private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(String tableName,
TableId tableId, ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = new HashMap<>();

var iterator = splitsTodo.iterator();
var iterator = splitsTodo.entrySet().iterator();
while (iterator.hasNext()) {
var split = iterator.next();
var splitEntry = iterator.next();
var split = splitEntry.getKey();

try {
Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
Expand All @@ -624,7 +638,8 @@ private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId t
continue;
}

tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>()).add(split);
tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
.add(Pair.fromEntry(splitEntry));

} catch (InvalidTabletHostingRequestException e) {
// not expected
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.function.Function;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedMap;
import com.google.gson.Gson;

public class TabletMergeabilityUtil {

private static final Gson gson = ByteArrayToBase64TypeAdapter.createBase64Gson();

public static SortedMap<Text,TabletMergeability> userDefaultSplits(SortedSet<Text> splits) {
return splitsWithDefault(splits, TabletMergeability.never());
}

public static SortedMap<Text,TabletMergeability> systemDefaultSplits(SortedSet<Text> splits) {
return splitsWithDefault(splits, TabletMergeability.always());
}

@SuppressWarnings("unchecked")
public static SortedMap<Text,TabletMergeability> splitsWithDefault(SortedSet<Text> splits,
TabletMergeability tabletMergeability) {
Objects.requireNonNull(tabletMergeability);
return splits.stream()
.collect(ImmutableSortedMap.toImmutableSortedMap(Optional
.ofNullable((Comparator<Text>) splits.comparator()).orElse(Comparator.naturalOrder()),
Function.identity(), t -> tabletMergeability));
}

public static ByteBuffer encodeAsBuffer(Text split, TabletMergeability tabletMergeability) {
return ByteBuffer.wrap(encode(split, tabletMergeability).getBytes(UTF_8));
}

public static String encode(Text split, TabletMergeability tabletMergeability) {
GSonData jData = new GSonData();
jData.split = split.getBytes();
jData.never = tabletMergeability.isNever();
jData.delay = tabletMergeability.getDelay().map(Duration::toNanos).orElse(null);
return gson.toJson(jData);
}

public static ByteBuffer encodeAsBuffer(Pair<Text,TabletMergeability> split) {
return ByteBuffer.wrap(encode(split).getBytes(UTF_8));
}

public static String encode(Pair<Text,TabletMergeability> split) {
return encode(split.getFirst(), split.getSecond());
}

public static Pair<Text,TabletMergeability> decodeFromBuffer(ByteBuffer data) {
return decode(ByteBufferUtil.toString(data));
}

public static Pair<Text,TabletMergeability> decode(String data) {
GSonData jData = gson.fromJson(data, GSonData.class);
Preconditions.checkArgument(jData.never == (jData.delay == null),
"delay should both be set if and only if mergeability 'never' is false");
return new Pair<>(new Text(jData.split), jData.never ? TabletMergeability.never()
: TabletMergeability.after(Duration.ofNanos(jData.delay)));
}

private static class GSonData {
byte[] split;
boolean never;
Long delay;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public static TabletMergeabilityMetadata after(Duration delay, SteadyTime curren
return new TabletMergeabilityMetadata(TabletMergeability.after(delay), currentTime);
}

public static TabletMergeabilityMetadata after(TabletMergeability mergeability,
SteadyTime currentTime) {
return after(mergeability.getDelay().orElseThrow(), currentTime);
}

public static Value toValue(TabletMergeabilityMetadata tmm) {
return new Value(tmm.toJson());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.function.Consumer;
Expand All @@ -48,6 +49,7 @@
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.SummaryRetriever;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.data.Range;
Expand Down Expand Up @@ -80,6 +82,9 @@ public void create(String tableName, NewTableConfiguration ntc) {}
@Override
public void addSplits(String tableName, SortedSet<Text> partitionKeys) {}

@Override
public void putSplits(String tableName, SortedMap<Text,TabletMergeability> partitionKeys) {}

@Override
public Collection<Text> listSplits(String tableName) {
return null;
Expand Down
Loading

0 comments on commit 0399dd2

Please sign in to comment.