From 06a6457c7505339a73996200dcb026688003e2e6 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Sun, 15 Dec 2024 15:24:51 -0500 Subject: [PATCH] wip --- .../client/admin/NewTableConfiguration.java | 26 ++++++++- .../core/clientImpl/TableOperationsImpl.java | 8 ++- .../clientImpl/TabletMergeabilityUtil.java | 57 +++++++++++++++++++ .../accumulo/manager/FateServiceHandler.java | 7 +-- .../accumulo/manager/tableOps/Utils.java | 20 +++++++ .../manager/tableOps/create/ChooseDir.java | 6 +- .../tableOps/create/PopulateMetadata.java | 24 +++++--- 7 files changed, 128 insertions(+), 20 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java index ffb6b46deac..34176b11add 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java @@ -24,13 +24,17 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.function.Function; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.IteratorSetting; @@ -48,7 +52,7 @@ import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.hadoop.io.Text; -import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.ImmutableSortedMap; /** * This object stores table creation parameters. Currently includes: {@link TimeType}, whether to @@ -71,7 +75,7 @@ public class NewTableConfiguration { private Map summarizerProps = Collections.emptyMap(); private Map localityProps = Collections.emptyMap(); private final Map iteratorProps = new HashMap<>(); - private SortedSet splitProps = Collections.emptySortedSet(); + private SortedMap splitProps = Collections.emptySortedMap(); private TabletAvailability initialTabletAvailability = TabletAvailability.ONDEMAND; private void checkDisjoint(Map props, Map derivedProps, @@ -188,6 +192,10 @@ public Map getProperties() { * @since 2.0.0 */ public Collection getSplits() { + return splitProps.keySet(); + } + + public SortedMap getSplitsMap() { return splitProps; } @@ -258,10 +266,22 @@ public NewTableConfiguration setLocalityGroups(Map> groups) { * * @since 2.0.0 */ + @SuppressWarnings("unchecked") public NewTableConfiguration withSplits(final SortedSet splits) { checkArgument(splits != null, "splits set is null"); checkArgument(!splits.isEmpty(), "splits set is empty"); - this.splitProps = ImmutableSortedSet.copyOf(splits); + return withSplits( + splits.stream() + .collect(ImmutableSortedMap.toImmutableSortedMap( + Optional.ofNullable((Comparator) splits.comparator()) + .orElse(Comparator.naturalOrder()), + Function.identity(), t -> TabletMergeability.never()))); + } + + public NewTableConfiguration withSplits(final SortedMap splits) { + checkArgument(splits != null, "splits set is null"); + checkArgument(!splits.isEmpty(), "splits set is empty"); + this.splitProps = ImmutableSortedMap.copyOf(splits); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 0b1ff708519..ce4ae2b10c0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -105,6 +105,7 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletInformation; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; @@ -263,11 +264,12 @@ public void create(String tableName, NewTableConfiguration ntc) // Check for possible initial splits to be added at table creation // Always send number of initial splits to be created, even if zero. If greater than zero, // add the splits to the argument List which will be used by the FATE operations. - int numSplits = ntc.getSplits().size(); + int numSplits = ntc.getSplitsMap().size(); args.add(ByteBuffer.wrap(String.valueOf(numSplits).getBytes(UTF_8))); if (numSplits > 0) { - for (Text t : ntc.getSplits()) { - args.add(TextUtil.getByteBuffer(t)); + for (Entry t : ntc.getSplitsMap().entrySet()) { + args.add(ByteBuffer + .wrap(TabletMergeabilityUtil.serialize(t.getKey(), t.getValue()).getBytes(UTF_8))); } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java new file mode 100644 index 00000000000..fb2a96294b7 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletMergeabilityUtil.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import java.time.Duration; + +import org.apache.accumulo.core.client.admin.TabletMergeability; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.json.ByteArrayToBase64TypeAdapter; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; +import com.google.gson.Gson; + +public class TabletMergeabilityUtil { + + private static final Gson gson = ByteArrayToBase64TypeAdapter.createBase64Gson(); + + public static String serialize(Text split, TabletMergeability tabletMergeability) { + GSonData jData = new GSonData(); + jData.split = split.getBytes(); + jData.never = tabletMergeability.isNever(); + jData.delay = tabletMergeability.getDelay().map(Duration::toNanos).orElse(null); + return gson.toJson(jData); + } + + public static Pair deserialize(String data) { + GSonData jData = gson.fromJson(data, GSonData.class); + Preconditions.checkArgument(jData.never == (jData.delay == null), + "delay should both be set if and only if mergeability 'never' is false"); + return new Pair<>(new Text(jData.split), jData.never ? TabletMergeability.never() + : TabletMergeability.after(Duration.ofNanos(jData.delay))); + } + + private static class GSonData { + byte[] split; + boolean never; + Long delay; + } + +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 7cdde03dc52..c4238d11f6a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Base64; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -925,11 +924,11 @@ private void writeSplitsToFile(Path splitsPath, final List arguments final int splitCount, final int splitOffset) throws IOException { FileSystem fs = splitsPath.getFileSystem(manager.getContext().getHadoopConf()); try (FSDataOutputStream stream = fs.create(splitsPath)) { - // base64 encode because splits can contain binary for (int i = splitOffset; i < splitCount + splitOffset; i++) { + // This is already encoded as json byte[] splitBytes = ByteBufferUtil.toBytes(arguments.get(i)); - String encodedSplit = Base64.getEncoder().encodeToString(splitBytes); - stream.write((encodedSplit + '\n').getBytes(UTF_8)); + stream.write(splitBytes); + stream.write("\n".getBytes(UTF_8)); } } catch (IOException e) { log.error("Error in FateServiceHandler while writing splits to {}: {}", splitsPath, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index 36622789443..58328915fd9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -25,7 +25,9 @@ import java.util.Base64; import java.util.HashMap; import java.util.Map; +import java.util.SortedMap; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -33,9 +35,11 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.clientImpl.Namespace; import org.apache.accumulo.core.clientImpl.Namespaces; +import org.apache.accumulo.core.clientImpl.TabletMergeabilityUtil; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; import org.apache.accumulo.core.data.AbstractId; @@ -48,6 +52,7 @@ import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooReservation; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.tables.TableNameUtil; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.server.ServerContext; @@ -276,4 +281,19 @@ public static SortedSet getSortedSetFromFile(Manager manager, Path path, b return data; } + public static SortedMap getSortedSplitsFromFile(Manager manager, + Path path) throws IOException { + FileSystem fs = path.getFileSystem(manager.getContext().getHadoopConf()); + var data = new TreeMap(); + try (var file = new java.util.Scanner(fs.open(path), UTF_8)) { + while (file.hasNextLine()) { + String line = file.nextLine(); + log.trace("split line: {}", line); + Pair splitTm = TabletMergeabilityUtil.deserialize(line); + data.put(splitTm.getFirst(), splitTm.getSecond()); + } + } + return data; + } + } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java index 6532cb30fea..a80bb41c82c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/ChooseDir.java @@ -19,12 +19,15 @@ package org.apache.accumulo.manager.tableOps.create; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.manager.tableOps.Utils.getSortedSplitsFromFile; import java.io.IOException; +import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.manager.Manager; @@ -83,7 +86,8 @@ public void undo(FateId fateId, Manager manager) throws Exception { * to the file system for later use during this FATE operation. */ private void createTableDirectoriesInfo(Manager manager) throws IOException { - SortedSet splits = Utils.getSortedSetFromFile(manager, tableInfo.getSplitPath(), true); + SortedMap splits = + Utils.getSortedSplitsFromFile(manager, tableInfo.getSplitPath()); SortedSet tabletDirectoryInfo = createTabletDirectoriesSet(manager, splits.size()); writeTabletDirectoriesToFileSystem(manager, tabletDirectoryInfo); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java index 2b987c7a4e0..438146145b8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java @@ -18,13 +18,17 @@ */ package org.apache.accumulo.manager.tableOps.create; +import static org.apache.accumulo.manager.tableOps.Utils.getSortedSplitsFromFile; + import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.SortedMap; import java.util.SortedSet; -import java.util.TreeSet; +import java.util.TreeMap; import java.util.stream.Stream; +import org.apache.accumulo.core.client.admin.TabletMergeability; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; @@ -58,15 +62,15 @@ public long isReady(FateId fateId, Manager environment) { @Override public Repo call(FateId fateId, Manager env) throws Exception { - SortedSet splits; + SortedMap splits; Map splitDirMap; if (tableInfo.getInitialSplitSize() > 0) { - splits = Utils.getSortedSetFromFile(env, tableInfo.getSplitPath(), true); + splits = Utils.getSortedSplitsFromFile(env, tableInfo.getSplitPath()); SortedSet dirs = Utils.getSortedSetFromFile(env, tableInfo.getSplitDirsPath(), false); splitDirMap = createSplitDirectoryMap(splits, dirs); } else { - splits = new TreeSet<>(); + splits = new TreeMap<>(); splitDirMap = Map.of(); } @@ -75,11 +79,12 @@ public Repo call(FateId fateId, Manager env) throws Exception { return new FinishCreateTable(tableInfo); } - private void writeSplitsToMetadataTable(ServerContext context, SortedSet splits, - Map data, ServiceLock lock) { + private void writeSplitsToMetadataTable(ServerContext context, + SortedMap splits, Map data, ServiceLock lock) { try (var tabletsMutator = context.getAmple().mutateTablets()) { Text prevSplit = null; - Iterable iter = () -> Stream.concat(splits.stream(), Stream.of((Text) null)).iterator(); + Iterable iter = + () -> Stream.concat(splits.keySet().stream(), Stream.of((Text) null)).iterator(); for (Text split : iter) { var extent = new KeyExtent(tableInfo.getTableId(), split, prevSplit); @@ -109,10 +114,11 @@ public void undo(FateId fateId, Manager environment) throws Exception { /** * Create a map containing an association between each split directory and a split value. */ - private Map createSplitDirectoryMap(SortedSet splits, SortedSet dirs) { + private Map createSplitDirectoryMap(SortedMap splits, + SortedSet dirs) { Preconditions.checkArgument(splits.size() == dirs.size()); Map data = new HashMap<>(); - Iterator s = splits.iterator(); + Iterator s = splits.keySet().iterator(); Iterator d = dirs.iterator(); while (s.hasNext() && d.hasNext()) { data.put(s.next(), d.next());