From 0fdb03628397967eb7d54831209f25ebce34dc20 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Sun, 2 Feb 2025 14:06:20 -0500 Subject: [PATCH] wip --- .../manager/tableOps/merge/CountFiles.java | 17 +++- .../manager/tableOps/merge/MergeInfo.java | 10 +- .../tableOps/merge/ReserveTablets.java | 10 +- .../manager/tableOps/merge/TableRangeOp.java | 3 +- .../tableOps/merge/UnreserveAndError.java | 3 +- .../tableOps/merge/VerifyMergeability.java | 98 +++++++++++++++++++ 6 files changed, 130 insertions(+), 11 deletions(-) create mode 100644 server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java index 4083eecc864..84259fd7918 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/CountFiles.java @@ -19,12 +19,15 @@ package org.apache.accumulo.manager.tableOps.merge; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation.SYSTEM_MERGE; +import com.google.common.base.Preconditions; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +42,9 @@ public CountFiles(MergeInfo mergeInfo) { @Override public Repo call(FateId fateId, Manager env) throws Exception { + // SYSTEM_MERGE should be executing VerifyMergeability repo, which already + // will count files + Preconditions.checkState(data.op != SYSTEM_MERGE, "Unexpected op %s", SYSTEM_MERGE); var range = data.getReserveExtent(); @@ -75,10 +81,13 @@ public Repo call(FateId fateId, Manager env) throws Exception { if (totalFiles >= maxFiles) { return new UnreserveAndError(data, totalFiles, maxFiles); } else { - if (data.op == MergeInfo.Operation.MERGE) { - return new MergeTablets(data); - } else { - return new DeleteRows(data); + switch (data.op) { + case MERGE: + return new MergeTablets(data); + case DELETE: + return new DeleteRows(data); + default: + throw new IllegalStateException("Unknown op " + data.op); } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java index 0da5159b657..6b84e16e7f0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeInfo.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.tableOps.merge; import java.io.Serializable; +import java.util.Objects; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@ -37,7 +38,11 @@ public class MergeInfo implements Serializable { private static final long serialVersionUID = 1L; public enum Operation { - MERGE, DELETE, + MERGE, SYSTEM_MERGE, DELETE; + + public boolean isMergeOp() { + return this == MERGE || this == SYSTEM_MERGE; + } } final TableId tableId; @@ -60,7 +65,7 @@ private MergeInfo(TableId tableId, NamespaceId namespaceId, byte[] startRow, byt this.namespaceId = namespaceId; this.startRow = startRow; this.endRow = endRow; - this.op = op; + this.op = Objects.requireNonNull(op); this.mergeRangeSet = mergeRange != null; if (mergeRange != null) { mergeStartRow = @@ -102,6 +107,7 @@ public KeyExtent getMergeExtent() { public KeyExtent getReserveExtent() { switch (op) { case MERGE: + case SYSTEM_MERGE: return getOriginalExtent(); case DELETE: { if (endRow == null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index ffaa6adcd25..9a8a4c3dc09 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -128,6 +128,14 @@ public long isReady(FateId fateId, Manager env) throws Exception { @Override public Repo call(FateId fateId, Manager environment) throws Exception { - return new CountFiles(data); + switch (data.op) { + case SYSTEM_MERGE: + return new VerifyMergeability(data); + case MERGE: + case DELETE: + return new CountFiles(data); + default: + throw new IllegalStateException("Unknown op " + data.op); + } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java index ddbbce5b7e4..ea63940db6a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/TableRangeOp.java @@ -57,8 +57,7 @@ public TableRangeOp(MergeInfo.Operation op, NamespaceId namespaceId, TableId tab @Override public Repo call(FateId fateId, Manager env) throws Exception { - if (AccumuloTable.ROOT.tableId().equals(data.tableId) - && MergeInfo.Operation.MERGE.equals(data.op)) { + if (AccumuloTable.ROOT.tableId().equals(data.tableId) && data.op.isMergeOp()) { log.warn("Attempt to merge tablets for {} does nothing. It is not splittable.", AccumuloTable.ROOT.tableName()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java index 926e190c881..de9aa83b2ff 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/UnreserveAndError.java @@ -45,8 +45,7 @@ public UnreserveAndError(MergeInfo mergeInfo, long totalFiles, long maxFiles) { public Repo call(FateId fateId, Manager environment) throws Exception { FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment); throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null, - mergeInfo.op == MergeInfo.Operation.MERGE ? TableOperation.MERGE - : TableOperation.DELETE_RANGE, + mergeInfo.op.isMergeOp() ? TableOperation.MERGE : TableOperation.DELETE_RANGE, TableOperationExceptionType.OTHER, "Aborted merge because it would produce a tablets with more files than the configured limit of " + maxFiles + ". Observed " + totalFiles + " files in the merge range."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java new file mode 100644 index 00000000000..5979c006c34 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/VerifyMergeability.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.merge; + +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class VerifyMergeability extends ManagerRepo { + private static final Logger log = LoggerFactory.getLogger(CountFiles.class); + private static final long serialVersionUID = 1L; + private final MergeInfo data; + + public VerifyMergeability(MergeInfo mergeInfo) { + this.data = mergeInfo; + Preconditions.checkArgument(data.op == Operation.SYSTEM_MERGE, "Must be a System Merge"); + } + + @Override + public Repo call(FateId fateId, Manager env) throws Exception { + + var range = data.getReserveExtent(); + + var currentTime = env.getSteadyTime(); + var context = env.getContext(); + var tableConf = context.getTableConfiguration(data.tableId); + var threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD); + + // max percentage of split threshold + double mergeabilityThreshold = .25; + long maxTotalSize = (long) (threshold * mergeabilityThreshold); + long totalFiles = 0; + + try (var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId) + .overlapping(range.prevEndRow(), range.endRow()).fetch(FILES, MERGEABILITY) + .checkConsistency().build()) { + + boolean mergeable = true; + long totalSize = 0; + + long maxFiles = env.getContext().getTableConfiguration(data.getOriginalExtent().tableId()) + .getCount(Property.TABLE_MERGE_FILE_MAX); + + for (var tabletMetadata : tablets) { + if (!tabletMetadata.getTabletMergeability().isMergeable(currentTime)) { + mergeable = false; + } + + totalFiles += tabletMetadata.getFiles().size(); + if (totalFiles >= maxFiles) { + log.debug("{} found {} files in the merge range, maxFiles is {}", fateId, totalFiles, maxFiles); + mergeable = false; + } + + totalSize += tabletMetadata.getFileSize(); + if (totalSize > maxTotalSize) { + mergeable = false; + } + + // TODO: we can check file count as well and remove FindFiles repo + if (!mergeable) { + // TODO: update UnreserveAndError to pass reason, not max files + return new UnreserveAndError(data, totalFiles, 0); + } + } + + } + + return new MergeTablets(data); + } + +}