diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java index ffb6b46deac..34176b11add 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java @@ -24,13 +24,17 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.function.Function; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.IteratorSetting; @@ -48,7 +52,7 @@ import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.hadoop.io.Text; -import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.ImmutableSortedMap; /** * This object stores table creation parameters. Currently includes: {@link TimeType}, whether to @@ -71,7 +75,7 @@ public class NewTableConfiguration { private Map summarizerProps = Collections.emptyMap(); private Map localityProps = Collections.emptyMap(); private final Map iteratorProps = new HashMap<>(); - private SortedSet splitProps = Collections.emptySortedSet(); + private SortedMap splitProps = Collections.emptySortedMap(); private TabletAvailability initialTabletAvailability = TabletAvailability.ONDEMAND; private void checkDisjoint(Map props, Map derivedProps, @@ -188,6 +192,10 @@ public Map getProperties() { * @since 2.0.0 */ public Collection getSplits() { + return splitProps.keySet(); + } + + public SortedMap getSplitsMap() { return splitProps; } @@ -258,10 +266,22 @@ public NewTableConfiguration setLocalityGroups(Map> groups) { * * @since 2.0.0 */ + @SuppressWarnings("unchecked") public NewTableConfiguration withSplits(final SortedSet splits) { checkArgument(splits != null, "splits set is null"); checkArgument(!splits.isEmpty(), "splits set is empty"); - this.splitProps = ImmutableSortedSet.copyOf(splits); + return withSplits( + splits.stream() + .collect(ImmutableSortedMap.toImmutableSortedMap( + Optional.ofNullable((Comparator) splits.comparator()) + .orElse(Comparator.naturalOrder()), + Function.identity(), t -> TabletMergeability.never()))); + } + + public NewTableConfiguration withSplits(final SortedMap splits) { + checkArgument(splits != null, "splits set is null"); + checkArgument(!splits.isEmpty(), "splits set is empty"); + this.splitProps = ImmutableSortedMap.copyOf(splits); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 0b1ff708519..ce4ae2b10c0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -105,6 +105,7 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletInformation; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; @@ -263,11 +264,12 @@ public void create(String tableName, NewTableConfiguration ntc) // Check for possible initial splits to be added at table creation // Always send number of initial splits to be created, even if zero. If greater than zero, // add the splits to the argument List which will be used by the FATE operations. - int numSplits = ntc.getSplits().size(); + int numSplits = ntc.getSplitsMap().size(); args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8))); if (numSplits > 0) { - for (Text t : ntc.getSplits()) { - args.add(TextUtil.getByteBuffer(t)); + for (Entry t : ntc.getSplitsMap().entrySet()) { + args.add(ByteBuffer + .wrap(TabletMergeabilityUtil.serialize(t.getKey(), t.getValue()).getBytes(UTF_8))); } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java new file mode 100644 index 00000000000..fb2a96294b7 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import java.time.Duration; + +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; + +public class TabletMergeabilityUtil { + + private static final Gson gson = ByteArrayToBase64TypeAdapter.createBase64Gson(); + + public static String serialize(Text split, TabletMergeability tabletMergeability) { + GSonData jData = new GSonData(); + jData.split = split.getBytes(); + jData.never = tabletMergeability.isNever(); + jData.delay = tabletMergeability.getDelay().map(Duration::toNanos).orElse(null); + return gson.toJson(jData); + } + + public static Pair deserialize(String data) { + GSonData jData = gson.fromJson(data, GSonData.class); + Preconditions.checkArgument(jData.never == (jData.delay == null), + "delay should both be set if and only if mergeability 'never' is false"); + return new Pair<>(new Text(jData.split), jData.never ? TabletMergeability.never() + : TabletMergeability.after(Duration.ofNanos(jData.delay))); + } + + private static class GSonData { + byte[] split; + boolean never; + Long delay; + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 7cdde03dc52..ee8fa0b1c7f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Base64; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -925,11 +924,10 @@ private void writeSplitsToFile(Path splitsPath, final List arguments final int splitCount, final int splitOffset) throws IOException { FileSystem fs = splitsPath.getFileSystem(manager.getContext().getHadoopConf()); try (FSDataOutputStream stream = fs.create(splitsPath)) { - // base64 encode because splits can contain binary for (int i = splitOffset; i < splitCount + splitOffset; i++) { byte[] splitBytes = ByteBufferUtil.toBytes(arguments.get(i)); - String encodedSplit = Base64.getEncoder().encodeToString(splitBytes); - stream.write((encodedSplit + '\n').getBytes(UTF_8)); + stream.write(splitBytes); + stream.write("\n".getBytes(UTF_8)); } } catch (IOException e) { log.error("Error in FateServiceHandler while writing splits to {}: {}", splitsPath, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java index 6532cb30fea..99745baa27a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java @@ -21,16 +21,20 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.TableInfo; -import org.apache.accumulo.manager.tableOps.Utils; import org.apache.accumulo.server.tablets.UniqueNameAllocator; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -83,11 +87,28 @@ public void undo(FateId fateId, Manager manager) throws Exception { * to the file system for later use during this FATE operation. */ private void createTableDirectoriesInfo(Manager manager) throws IOException { - SortedSet splits = Utils.getSortedSetFromFile(manager, tableInfo.getSplitPath(), true); + SortedMap splits = + getSortedMapFromFile(manager, tableInfo.getSplitPath()); SortedSet tabletDirectoryInfo = createTabletDirectoriesSet(manager, splits.size()); writeTabletDirectoriesToFileSystem(manager, tabletDirectoryInfo); } + // TODO: Temporary put here as a test, needs to be moved to Utils + public static SortedMap getSortedMapFromFile(Manager manager, Path path) + throws IOException { + FileSystem fs = path.getFileSystem(manager.getContext().getHadoopConf()); + var data = new TreeMap(); + try (var file = new java.util.Scanner(fs.open(path), UTF_8)) { + while (file.hasNextLine()) { + String line = file.nextLine(); + log.info("path: {}, line: {}", path, line); + Pair splitTm = TabletMergeabilityUtil.deserialize(line); + data.put(splitTm.getFirst(), splitTm.getSecond()); + } + } + return data; + } + /** * Create a set of unique table directories. These will be associated with splits in a follow-on * FATE step. diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java index 2b987c7a4e0..66d57d99053 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java @@ -21,10 +21,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.SortedMap; import java.util.SortedSet; -import java.util.TreeSet; +import java.util.TreeMap; import java.util.stream.Stream; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; @@ -58,15 +60,15 @@ public long isReady(FateId fateId, Manager environment) { @Override public Repo call(FateId fateId, Manager env) throws Exception { - SortedSet splits; + SortedMap splits; Map splitDirMap; if (tableInfo.getInitialSplitSize() > 0) { - splits = Utils.getSortedSetFromFile(env, tableInfo.getSplitPath(), true); + splits = ChooseDir.getSortedMapFromFile(env, tableInfo.getSplitPath()); SortedSet dirs = Utils.getSortedSetFromFile(env, tableInfo.getSplitDirsPath(), false); splitDirMap = createSplitDirectoryMap(splits, dirs); } else { - splits = new TreeSet<>(); + splits = new TreeMap<>(); splitDirMap = Map.of(); } @@ -75,11 +77,12 @@ public Repo call(FateId fateId, Manager env) throws Exception { return new FinishCreateTable(tableInfo); } - private void writeSplitsToMetadataTable(ServerContext context, SortedSet splits, - Map data, ServiceLock lock) { + private void writeSplitsToMetadataTable(ServerContext context, + SortedMap splits, Map data, ServiceLock lock) { try (var tabletsMutator = context.getAmple().mutateTablets()) { Text prevSplit = null; - Iterable iter = () -> Stream.concat(splits.stream(), Stream.of((Text) null)).iterator(); + Iterable iter = + () -> Stream.concat(splits.keySet().stream(), Stream.of((Text) null)).iterator(); for (Text split : iter) { var extent = new KeyExtent(tableInfo.getTableId(), split, prevSplit); @@ -109,10 +112,11 @@ public void undo(FateId fateId, Manager environment) throws Exception { /** * Create a map containing an association between each split directory and a split value. */ - private Map createSplitDirectoryMap(SortedSet splits, SortedSet dirs) { + private Map createSplitDirectoryMap(SortedMap splits, + SortedSet dirs) { Preconditions.checkArgument(splits.size() == dirs.size()); Map data = new HashMap<>(); - Iterator s = splits.iterator(); + Iterator s = splits.keySet().iterator(); Iterator d = dirs.iterator(); while (s.hasNext() && d.hasNext()) { data.put(s.next(), d.next());