Skip to content

Commit

Permalink
Add initial TabletMergeability support to client api
Browse files Browse the repository at this point in the history
This is a follow on PR to apache#5187 for Issue apache#5014

This commit adds the first part of the needed client API changes to
support setting TabletMergeability on splits. Two initial updates have
been made to support table creation and adding splits later.

1) A new withSplits() method has been added to NewTableConfiguration
that takes a map of splits and associated TabletMergeability. The
existing withSplits() method that takes a set just delegates to the new
method and sets a default for TabletMergeability to never because by
default user created tables should never be automatically mergeable.

2) A new putSplits() method has been added that also takes a map and
will create new splits with an associated TabletMergeability. The
existing addSplits() method delegates to this as well. Note that this
method is only partially done. The intent is to support updating
existing splits as well but this will be done in another PR.

To support the changes, where necessary the splits are now encoded with
the TabletMergeability value as JSON instead of just passing the split
bytes.
  • Loading branch information
cshannon committed Jan 18, 2025
1 parent 428f7c7 commit 3ea39c0
Show file tree
Hide file tree
Showing 24 changed files with 504 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.function.Function;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.IteratorSetting;
Expand All @@ -48,7 +52,7 @@
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.hadoop.io.Text;

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.ImmutableSortedMap;

/**
* This object stores table creation parameters. Currently includes: {@link TimeType}, whether to
Expand All @@ -71,7 +75,7 @@ public class NewTableConfiguration {
private Map<String,String> summarizerProps = Collections.emptyMap();
private Map<String,String> localityProps = Collections.emptyMap();
private final Map<String,String> iteratorProps = new HashMap<>();
private SortedSet<Text> splitProps = Collections.emptySortedSet();
private SortedMap<Text,TabletMergeability> splitProps = Collections.emptySortedMap();
private TabletAvailability initialTabletAvailability = TabletAvailability.ONDEMAND;

private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps,
Expand Down Expand Up @@ -188,6 +192,18 @@ public Map<String,String> getProperties() {
* @since 2.0.0
*/
public Collection<Text> getSplits() {
// Expose only the split points; the TabletMergeability values stay in the backing sorted map.
return splitProps.keySet();
}

/**
* Return the split points of this configuration together with the TabletMergeability assigned to
* each of them.
*
* @return sorted map from split point to its TabletMergeability for this NewTableConfiguration
*         object; empty when no splits have been configured.
*
* @since 4.0.0
*/
public SortedMap<Text,TabletMergeability> getSplitsMap() {
return splitProps;
}

Expand Down Expand Up @@ -258,10 +274,22 @@ public NewTableConfiguration setLocalityGroups(Map<String,Set<Text>> groups) {
*
* @since 2.0.0
*/
public NewTableConfiguration withSplits(final SortedSet<Text> splits) {
  checkArgument(splits != null, "splits set is null");
  checkArgument(!splits.isEmpty(), "splits set is empty");

  // Build the map with the set's own comparator; a null comparator means natural ordering.
  // The cast is safe: comparator() returns Comparator<? super Text>, which can compare Text.
  @SuppressWarnings("unchecked")
  final Comparator<Text> ordering =
      Optional.ofNullable((Comparator<Text>) splits.comparator()).orElse(Comparator.naturalOrder());

  // Splits given without an explicit mergeability are marked TabletMergeability.never(); the
  // map-based overload is the only place that assigns splitProps.
  return withSplits(splits.stream().collect(ImmutableSortedMap.toImmutableSortedMap(ordering,
      Function.identity(), t -> TabletMergeability.never())));
}

/**
 * Create a new table with pre-configured splits, each carrying its own TabletMergeability.
 *
 * @param splits non-empty sorted map from split point to the TabletMergeability to apply to that
 *        split. An immutable copy of the map is stored, so later changes to the argument do not
 *        affect this configuration.
 * @return this configuration object, to allow call chaining
 * @throws IllegalArgumentException if splits is null or empty
 *
 * @since 4.0.0
 */
public NewTableConfiguration withSplits(final SortedMap<Text,TabletMergeability> splits) {
  checkArgument(splits != null, "splits map is null");
  checkArgument(!splits.isEmpty(), "splits map is empty");
  this.splitProps = ImmutableSortedMap.copyOf(splits);
  return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
Expand Down Expand Up @@ -187,6 +188,32 @@ void exportTable(String tableName, String exportDir)
void addSplits(String tableName, SortedSet<Text> partitionKeys)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;

/**
* Ensures that tablets are split along a set of keys, associating a TabletMergeability with each
* split.
*
* <p>
* TODO: This method currently only adds new splits (existing are stripped). The intent in a
* future PR is to support updating existing splits and the TabletMergeability setting. See
* https://github.com/apache/accumulo/issues/5014
*
* <p>
* Note that while the documentation for Text specifies that its bytestream should be UTF-8, the
* encoding is not enforced by operations that work with byte arrays.
* <p>
* For example, you can create 256 evenly-sliced splits via the following code sample even though
* the given byte sequences are not valid UTF-8.
*
* <pre>
* TableOperations tableOps = client.tableOperations();
* TreeMap&lt;Text,TabletMergeability&gt; splits = new TreeMap&lt;&gt;();
* for (int i = 0; i &lt; 256; i++) {
*   byte[] bytes = {(byte) i};
*   splits.put(new Text(bytes), TabletMergeability.never());
* }
* tableOps.putSplits(TABLE_NAME, splits);
* </pre>
*
* @param tableName the name of the table
* @param splits a sorted map of row key values to pre-split the table on and associated
*        TabletMergeability
*
* @throws AccumuloException if a general error occurs
* @throws AccumuloSecurityException if the user does not have permission
* @throws TableNotFoundException if the table does not exist
* @since 4.0.0
*/
void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits)
throws TableNotFoundException, AccumuloException, AccumuloSecurityException;

/**
* @param tableName the name of the table
* @return the split points (end-row names) for the table's current split profile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
Expand Down Expand Up @@ -105,6 +106,7 @@
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
Expand Down Expand Up @@ -263,11 +265,11 @@ public void create(String tableName, NewTableConfiguration ntc)
// Check for possible initial splits to be added at table creation
// Always send number of initial splits to be created, even if zero. If greater than zero,
// add the splits to the argument List which will be used by the FATE operations.
int numSplits = ntc.getSplits().size();
int numSplits = ntc.getSplitsMap().size();
args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8)));
if (numSplits > 0) {
for (Text t : ntc.getSplits()) {
args.add(TextUtil.getByteBuffer(t));
for (Entry<Text,TabletMergeability> t : ntc.getSplitsMap().entrySet()) {
args.add(TabletMergeabilityUtil.encodeAsBuffer(t.getKey(), t.getValue()));
}
}

Expand Down Expand Up @@ -475,8 +477,15 @@ String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,Stri
public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED";

@Override
public void addSplits(String tableName, SortedSet<Text> splits)
    throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
  // Plain split points get the user default mergeability from
  // TabletMergeabilityUtil.userDefaultSplits; putSplits then performs the actual split work.
  // No unchecked operation happens here, so no warning suppression is needed on this method.
  putSplits(tableName, TabletMergeabilityUtil.userDefaultSplits(splits));
}

@Override
public void putSplits(String tableName, SortedMap<Text,TabletMergeability> splits)
throws AccumuloException, TableNotFoundException, AccumuloSecurityException {

EXISTING_TABLE_NAME.validate(tableName);

Expand All @@ -487,7 +496,8 @@ public void addSplits(String tableName, SortedSet<Text> splits)

ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);

SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new TreeSet<>(splits));
SortedMap<Text,TabletMergeability> splitsTodo =
Collections.synchronizedSortedMap(new TreeMap<>(splits));

final ByteBuffer EMPTY = ByteBuffer.allocate(0);

Expand All @@ -500,13 +510,14 @@ public void addSplits(String tableName, SortedSet<Text> splits)

tabLocator.invalidateCache();

Map<KeyExtent,List<Text>> tabletSplits =
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits =
mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);

List<CompletableFuture<Void>> futures = new ArrayList<>();

// begin the fate operation for each tablet without waiting for the operation to complete
for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
for (Entry<KeyExtent,List<Pair<Text,TabletMergeability>>> splitsForTablet : tabletSplits
.entrySet()) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
var extent = splitsForTablet.getKey();

Expand All @@ -515,7 +526,9 @@ public void addSplits(String tableName, SortedSet<Text> splits)
args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
args.add(
extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));
splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split)));

splitsForTablet.getValue()
.forEach(split -> args.add(TabletMergeabilityUtil.encodeAsBuffer(split)));

try {
return handleFateOperation(() -> {
Expand All @@ -533,13 +546,13 @@ public void addSplits(String tableName, SortedSet<Text> splits)
// wait for the fate operation to complete in a separate thread pool
}, startExecutor).thenApplyAsync(pair -> {
final TFateId opid = pair.getFirst();
final List<Text> completedSplits = pair.getSecond();
final List<Pair<Text,TabletMergeability>> completedSplits = pair.getSecond();

try {
String status = handleFateOperation(() -> waitForFateOperation(opid), tableName);

if (SPLIT_SUCCESS_MSG.equals(status)) {
completedSplits.forEach(splitsTodo::remove);
completedSplits.stream().map(Pair::getFirst).forEach(splitsTodo::remove);
}
} catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
| AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
Expand Down Expand Up @@ -593,14 +606,15 @@ public void addSplits(String tableName, SortedSet<Text> splits)
waitExecutor.shutdown();
}

private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
ClientTabletCache tabLocator, SortedSet<Text> splitsTodo)
private Map<KeyExtent,List<Pair<Text,TabletMergeability>>> mapSplitsToTablets(String tableName,
TableId tableId, ClientTabletCache tabLocator, SortedMap<Text,TabletMergeability> splitsTodo)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>();
Map<KeyExtent,List<Pair<Text,TabletMergeability>>> tabletSplits = new HashMap<>();

var iterator = splitsTodo.iterator();
var iterator = splitsTodo.entrySet().iterator();
while (iterator.hasNext()) {
var split = iterator.next();
var splitEntry = iterator.next();
var split = splitEntry.getKey();

try {
Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
Expand All @@ -624,7 +638,8 @@ private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId t
continue;
}

tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>()).add(split);
tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>())
.add(Pair.fromEntry(splitEntry));

} catch (InvalidTabletHostingRequestException e) {
// not expected
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.function.Function;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedMap;
import com.google.gson.Gson;

/**
 * Static helpers for pairing split points with a {@link TabletMergeability} and for encoding and
 * decoding such pairs as JSON.
 *
 * <p>
 * The JSON form carries three fields: {@code split} (the split bytes, Base64 encoded),
 * {@code never} and {@code delay} (the mergeability delay in nanoseconds, absent when
 * {@code never} is true).
 */
public class TabletMergeabilityUtil {

  private static final Gson gson = ByteArrayToBase64TypeAdapter.createBase64Gson();

  // Static utility holder; not meant to be instantiated.
  private TabletMergeabilityUtil() {}

  /**
   * Maps every split to {@link TabletMergeability#never()}, the default used for splits supplied
   * by users.
   *
   * @param splits the split points
   * @return immutable sorted map from each split to {@code TabletMergeability.never()}
   */
  public static SortedMap<Text,TabletMergeability> userDefaultSplits(SortedSet<Text> splits) {
    return splitsWithDefault(splits, TabletMergeability.never());
  }

  /**
   * Maps every split to {@link TabletMergeability#always()}, the default used for splits created
   * by the system.
   *
   * @param splits the split points
   * @return immutable sorted map from each split to {@code TabletMergeability.always()}
   */
  public static SortedMap<Text,TabletMergeability> systemDefaultSplits(SortedSet<Text> splits) {
    return splitsWithDefault(splits, TabletMergeability.always());
  }

  /**
   * Maps every split in the set to the same TabletMergeability.
   *
   * @param splits the split points; must not be null
   * @param tabletMergeability the value to associate with every split; must not be null
   * @return immutable sorted map built with the comparator of {@code splits}, or with natural
   *         ordering when the set has no comparator
   * @throws NullPointerException if either argument is null
   */
  public static SortedMap<Text,TabletMergeability> splitsWithDefault(SortedSet<Text> splits,
      TabletMergeability tabletMergeability) {
    Objects.requireNonNull(splits, "splits");
    Objects.requireNonNull(tabletMergeability, "tabletMergeability");

    // The cast is safe: comparator() returns Comparator<? super Text>, which can compare Text.
    @SuppressWarnings("unchecked")
    final Comparator<Text> ordering = Optional.ofNullable((Comparator<Text>) splits.comparator())
        .orElse(Comparator.naturalOrder());

    return splits.stream().collect(ImmutableSortedMap.toImmutableSortedMap(ordering,
        Function.identity(), t -> tabletMergeability));
  }

  /**
   * Encodes a split and its TabletMergeability as JSON and wraps the UTF-8 bytes in a buffer.
   *
   * @param split the split point
   * @param tabletMergeability the mergeability for the split
   * @return buffer wrapping the UTF-8 bytes of {@link #encode(Text, TabletMergeability)}
   */
  public static ByteBuffer encodeAsBuffer(Text split, TabletMergeability tabletMergeability) {
    return ByteBuffer.wrap(encode(split, tabletMergeability).getBytes(UTF_8));
  }

  /**
   * Encodes a split and its TabletMergeability as JSON. The split will be Base64 encoded and the
   * delay, when present, is written in nanoseconds.
   *
   * @param split the split point
   * @param tabletMergeability the mergeability for the split
   * @return the JSON string
   */
  public static String encode(Text split, TabletMergeability tabletMergeability) {
    GSonData jData = new GSonData();
    jData.split = TextUtil.getBytes(split);
    jData.never = tabletMergeability.isNever();
    // A missing delay is stored as null; decode() checks that this agrees with 'never'.
    jData.delay = tabletMergeability.getDelay().map(Duration::toNanos).orElse(null);
    return gson.toJson(jData);
  }

  /**
   * Pair-based variant of {@link #encodeAsBuffer(Text, TabletMergeability)}.
   *
   * @param split pair of split point and mergeability
   * @return buffer wrapping the UTF-8 bytes of the JSON encoding
   */
  public static ByteBuffer encodeAsBuffer(Pair<Text,TabletMergeability> split) {
    return ByteBuffer.wrap(encode(split).getBytes(UTF_8));
  }

  /**
   * Pair-based variant of {@link #encode(Text, TabletMergeability)}.
   *
   * @param split pair of split point and mergeability
   * @return the JSON string
   */
  public static String encode(Pair<Text,TabletMergeability> split) {
    return encode(split.getFirst(), split.getSecond());
  }

  /**
   * Decodes a buffer by converting it with {@code ByteBufferUtil.toString} and passing the result
   * to {@link #decode(String)}.
   *
   * @param data the encoded split
   * @return the decoded split and mergeability
   */
  public static Pair<Text,TabletMergeability> decode(ByteBuffer data) {
    return decode(ByteBufferUtil.toString(data));
  }

  /**
   * Decodes a JSON string produced by {@link #encode(Text, TabletMergeability)}.
   *
   * @param data the JSON string
   * @return the decoded split and mergeability
   * @throws IllegalArgumentException if the input holds no JSON object, the split bytes are
   *         missing, or 'never' and 'delay' contradict each other
   */
  public static Pair<Text,TabletMergeability> decode(String data) {
    GSonData jData = gson.fromJson(data, GSonData.class);
    // Gson returns null for null or empty input; report that clearly instead of a bare NPE below.
    Preconditions.checkArgument(jData != null, "no encoded split data to decode");
    Preconditions.checkArgument(jData.split != null, "encoded split data is missing 'split'");
    Preconditions.checkArgument(jData.never == (jData.delay == null),
        "delay should be set if and only if mergeability 'never' is false");
    return new Pair<>(new Text(jData.split), jData.never ? TabletMergeability.never()
        : TabletMergeability.after(Duration.ofNanos(jData.delay)));
  }

  // JSON shape used by Gson; the field names are the JSON keys and must stay unchanged.
  private static class GSonData {
    byte[] split;
    boolean never;
    Long delay;
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ public TabletMergeability getTabletMergeability() {
return tabletMergeability;
}

/**
* @return the value of isNever() on the wrapped TabletMergeability
*/
public boolean isNever() {
return tabletMergeability.isNever();
}

/**
* @return the value of isAlways() on the wrapped TabletMergeability
*/
public boolean isAlways() {
return tabletMergeability.isAlways();
}

/**
* @return the delay reported by the wrapped TabletMergeability, passed through unchanged
*/
public Optional<Duration> getDelay() {
return tabletMergeability.getDelay();
}

/**
* @return the steady time held by this object, or an empty Optional when none is set
*/
public Optional<SteadyTime> getSteadyTime() {
// steadyTime may be null, hence ofNullable
return Optional.ofNullable(steadyTime);
}
Expand Down Expand Up @@ -134,6 +146,14 @@ public static TabletMergeabilityMetadata after(Duration delay, SteadyTime curren
return new TabletMergeabilityMetadata(TabletMergeability.after(delay), currentTime);
}

/**
 * Converts a TabletMergeability into its metadata form.
 *
 * @param mergeability the mergeability to convert
 * @param currentTime the steady time handed to after(...) when the mergeability has a delay
 * @return never() when the mergeability is "never", otherwise after(delay, currentTime)
 * @throws java.util.NoSuchElementException if the mergeability is not "never" and has no delay
 */
public static TabletMergeabilityMetadata toMetadata(TabletMergeability mergeability,
    SteadyTime currentTime) {
  return mergeability.isNever() ? TabletMergeabilityMetadata.never()
      : after(mergeability.getDelay().orElseThrow(), currentTime);
}

/**
* Wraps the output of tmm.toJson() in a new Value.
*
* @param tmm the metadata to convert
* @return a Value built from the JSON form of tmm
*/
public static Value toValue(TabletMergeabilityMetadata tmm) {
return new Value(tmm.toJson());
}
Expand Down
Loading

0 comments on commit 3ea39c0

Please sign in to comment.