Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 10, 2025
1 parent 7701e59 commit 06a6457
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.function.Function;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.IteratorSetting;
Expand All @@ -48,7 +52,7 @@
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.hadoop.io.Text;

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.ImmutableSortedMap;

/**
* This object stores table creation parameters. Currently includes: {@link TimeType}, whether to
Expand All @@ -71,7 +75,7 @@ public class NewTableConfiguration {
private Map<String,String> summarizerProps = Collections.emptyMap();
private Map<String,String> localityProps = Collections.emptyMap();
private final Map<String,String> iteratorProps = new HashMap<>();
private SortedSet<Text> splitProps = Collections.emptySortedSet();
private SortedMap<Text,TabletMergeability> splitProps = Collections.emptySortedMap();
private TabletAvailability initialTabletAvailability = TabletAvailability.ONDEMAND;

private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps,
Expand Down Expand Up @@ -188,6 +192,10 @@ public Map<String,String> getProperties() {
* @since 2.0.0
*/
public Collection<Text> getSplits() {
return splitProps.keySet();
}

public SortedMap<Text,TabletMergeability> getSplitsMap() {
return splitProps;
}

Expand Down Expand Up @@ -258,10 +266,22 @@ public NewTableConfiguration setLocalityGroups(Map<String,Set<Text>> groups) {
*
* @since 2.0.0
*/
@SuppressWarnings("unchecked")
public NewTableConfiguration withSplits(final SortedSet<Text> splits) {
checkArgument(splits != null, "splits set is null");
checkArgument(!splits.isEmpty(), "splits set is empty");
this.splitProps = ImmutableSortedSet.copyOf(splits);
return withSplits(
splits.stream()
.collect(ImmutableSortedMap.toImmutableSortedMap(
Optional.ofNullable((Comparator<Text>) splits.comparator())
.orElse(Comparator.naturalOrder()),
Function.identity(), t -> TabletMergeability.never())));
}

public NewTableConfiguration withSplits(final SortedMap<Text,TabletMergeability> splits) {
checkArgument(splits != null, "splits set is null");
checkArgument(!splits.isEmpty(), "splits set is empty");
this.splitProps = ImmutableSortedMap.copyOf(splits);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
Expand Down Expand Up @@ -263,11 +264,12 @@ public void create(String tableName, NewTableConfiguration ntc)
// Check for possible initial splits to be added at table creation
// Always send number of initial splits to be created, even if zero. If greater than zero,
// add the splits to the argument List which will be used by the FATE operations.
int numSplits = ntc.getSplits().size();
int numSplits = ntc.getSplitsMap().size();
args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8)));
if (numSplits > 0) {
for (Text t : ntc.getSplits()) {
args.add(TextUtil.getByteBuffer(t));
for (Entry<Text,TabletMergeability> t : ntc.getSplitsMap().entrySet()) {
args.add(ByteBuffer
.wrap(TabletMergeabilityUtil.serialize(t.getKey(), t.getValue()).getBytes(UTF_8)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.clientImpl;

import java.time.Duration;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
import com.google.gson.Gson;

public class TabletMergeabilityUtil {

private static final Gson gson = ByteArrayToBase64TypeAdapter.createBase64Gson();

public static String serialize(Text split, TabletMergeability tabletMergeability) {
GSonData jData = new GSonData();
jData.split = split.getBytes();
jData.never = tabletMergeability.isNever();
jData.delay = tabletMergeability.getDelay().map(Duration::toNanos).orElse(null);
return gson.toJson(jData);
}

public static Pair<Text,TabletMergeability> deserialize(String data) {
GSonData jData = gson.fromJson(data, GSonData.class);
Preconditions.checkArgument(jData.never == (jData.delay == null),
"delay should both be set if and only if mergeability 'never' is false");
return new Pair<>(new Text(jData.split), jData.never ? TabletMergeability.never()
: TabletMergeability.after(Duration.ofNanos(jData.delay)));
}

private static class GSonData {
byte[] split;
boolean never;
Long delay;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -925,11 +924,11 @@ private void writeSplitsToFile(Path splitsPath, final List<ByteBuffer> arguments
final int splitCount, final int splitOffset) throws IOException {
FileSystem fs = splitsPath.getFileSystem(manager.getContext().getHadoopConf());
try (FSDataOutputStream stream = fs.create(splitsPath)) {
// base64 encode because splits can contain binary
for (int i = splitOffset; i < splitCount + splitOffset; i++) {
// This is already encoded as json
byte[] splitBytes = ByteBufferUtil.toBytes(arguments.get(i));
String encodedSplit = Base64.getEncoder().encodeToString(splitBytes);
stream.write((encodedSplit + '\n').getBytes(UTF_8));
stream.write(splitBytes);
stream.write("\n".getBytes(UTF_8));
}
} catch (IOException e) {
log.error("Error in FateServiceHandler while writing splits to {}: {}", splitsPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.NamespaceNotFoundException;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.clientImpl.Namespaces;
import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.AbstractId;
Expand All @@ -48,6 +52,7 @@
import org.apache.accumulo.core.fate.zookeeper.FateLock;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooReservation;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.tables.TableNameUtil;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.server.ServerContext;
Expand Down Expand Up @@ -276,4 +281,19 @@ public static SortedSet<Text> getSortedSetFromFile(Manager manager, Path path, b
return data;
}

public static SortedMap<Text,TabletMergeability> getSortedSplitsFromFile(Manager manager,
Path path) throws IOException {
FileSystem fs = path.getFileSystem(manager.getContext().getHadoopConf());
var data = new TreeMap<Text,TabletMergeability>();
try (var file = new java.util.Scanner(fs.open(path), UTF_8)) {
while (file.hasNextLine()) {
String line = file.nextLine();
log.trace("split line: {}", line);
Pair<Text,TabletMergeability> splitTm = TabletMergeabilityUtil.deserialize(line);
data.put(splitTm.getFirst(), splitTm.getSecond());
}
}
return data;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.accumulo.manager.tableOps.create;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.manager.tableOps.Utils.getSortedSplitsFromFile;

import java.io.IOException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
Expand Down Expand Up @@ -83,7 +86,8 @@ public void undo(FateId fateId, Manager manager) throws Exception {
* to the file system for later use during this FATE operation.
*/
private void createTableDirectoriesInfo(Manager manager) throws IOException {
SortedSet<Text> splits = Utils.getSortedSetFromFile(manager, tableInfo.getSplitPath(), true);
SortedMap<Text,TabletMergeability> splits =
Utils.getSortedSplitsFromFile(manager, tableInfo.getSplitPath());
SortedSet<Text> tabletDirectoryInfo = createTabletDirectoriesSet(manager, splits.size());
writeTabletDirectoriesToFileSystem(manager, tabletDirectoryInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
*/
package org.apache.accumulo.manager.tableOps.create;

import static org.apache.accumulo.manager.tableOps.Utils.getSortedSplitsFromFile;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.TreeMap;
import java.util.stream.Stream;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
Expand Down Expand Up @@ -58,15 +62,15 @@ public long isReady(FateId fateId, Manager environment) {

@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
SortedSet<Text> splits;
SortedMap<Text,TabletMergeability> splits;
Map<Text,Text> splitDirMap;

if (tableInfo.getInitialSplitSize() > 0) {
splits = Utils.getSortedSetFromFile(env, tableInfo.getSplitPath(), true);
splits = Utils.getSortedSplitsFromFile(env, tableInfo.getSplitPath());
SortedSet<Text> dirs = Utils.getSortedSetFromFile(env, tableInfo.getSplitDirsPath(), false);
splitDirMap = createSplitDirectoryMap(splits, dirs);
} else {
splits = new TreeSet<>();
splits = new TreeMap<>();
splitDirMap = Map.of();
}

Expand All @@ -75,11 +79,12 @@ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
return new FinishCreateTable(tableInfo);
}

private void writeSplitsToMetadataTable(ServerContext context, SortedSet<Text> splits,
Map<Text,Text> data, ServiceLock lock) {
private void writeSplitsToMetadataTable(ServerContext context,
SortedMap<Text,TabletMergeability> splits, Map<Text,Text> data, ServiceLock lock) {
try (var tabletsMutator = context.getAmple().mutateTablets()) {
Text prevSplit = null;
Iterable<Text> iter = () -> Stream.concat(splits.stream(), Stream.of((Text) null)).iterator();
Iterable<Text> iter =
() -> Stream.concat(splits.keySet().stream(), Stream.of((Text) null)).iterator();
for (Text split : iter) {
var extent = new KeyExtent(tableInfo.getTableId(), split, prevSplit);

Expand Down Expand Up @@ -109,10 +114,11 @@ public void undo(FateId fateId, Manager environment) throws Exception {
/**
* Create a map containing an association between each split directory and a split value.
*/
private Map<Text,Text> createSplitDirectoryMap(SortedSet<Text> splits, SortedSet<Text> dirs) {
private Map<Text,Text> createSplitDirectoryMap(SortedMap<Text,TabletMergeability> splits,
SortedSet<Text> dirs) {
Preconditions.checkArgument(splits.size() == dirs.size());
Map<Text,Text> data = new HashMap<>();
Iterator<Text> s = splits.iterator();
Iterator<Text> s = splits.keySet().iterator();
Iterator<Text> d = dirs.iterator();
while (s.hasNext() && d.hasNext()) {
data.put(s.next(), d.next());
Expand Down

0 comments on commit 06a6457

Please sign in to comment.