Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Feb 2, 2025
1 parent 3213646 commit 51bb1da
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {

switch (data.op) {
case MERGE:
case SYSTEM_MERGE:
for (var tabletMeta : tablets) {
totalFiles += tabletMeta.getFiles().size();
}
Expand All @@ -75,10 +76,15 @@ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
if (totalFiles >= maxFiles) {
return new UnreserveAndError(data, totalFiles, maxFiles);
} else {
if (data.op == MergeInfo.Operation.MERGE) {
return new MergeTablets(data);
} else {
return new DeleteRows(data);
switch (data.op) {
case MERGE:
return new MergeTablets(data);
case SYSTEM_MERGE:
return new VerifyMergeability(data);
case DELETE:
return new DeleteRows(data);
default:
throw new IllegalStateException("Unknown op " + data.op);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.merge;

import java.io.Serializable;
import java.util.Objects;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
Expand All @@ -37,7 +38,11 @@ public class MergeInfo implements Serializable {
private static final long serialVersionUID = 1L;

public enum Operation {
MERGE, DELETE,
MERGE, SYSTEM_MERGE, DELETE;

/**
 * Tells whether this operation is one of the merge variants.
 *
 * @return {@code true} when this constant is {@code MERGE} or {@code SYSTEM_MERGE};
 *         {@code false} otherwise
 */
public boolean isMergeOp() {
return this == MERGE || this == SYSTEM_MERGE;
}
}

final TableId tableId;
Expand All @@ -60,7 +65,7 @@ private MergeInfo(TableId tableId, NamespaceId namespaceId, byte[] startRow, byt
this.namespaceId = namespaceId;
this.startRow = startRow;
this.endRow = endRow;
this.op = op;
this.op = Objects.requireNonNull(op);
this.mergeRangeSet = mergeRange != null;
if (mergeRange != null) {
mergeStartRow =
Expand Down Expand Up @@ -102,6 +107,7 @@ public KeyExtent getMergeExtent() {
public KeyExtent getReserveExtent() {
switch (op) {
case MERGE:
case SYSTEM_MERGE:
return getOriginalExtent();
case DELETE: {
if (endRow == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public TableRangeOp(MergeInfo.Operation op, NamespaceId namespaceId, TableId tab
@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {

if (AccumuloTable.ROOT.tableId().equals(data.tableId)
&& MergeInfo.Operation.MERGE.equals(data.op)) {
if (AccumuloTable.ROOT.tableId().equals(data.tableId) && data.op.isMergeOp()) {
log.warn("Attempt to merge tablets for {} does nothing. It is not splittable.",
AccumuloTable.ROOT.tableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public UnreserveAndError(MergeInfo mergeInfo, long totalFiles, long maxFiles) {
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment);
throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null,
mergeInfo.op == MergeInfo.Operation.MERGE ? TableOperation.MERGE
: TableOperation.DELETE_RANGE,
mergeInfo.op.isMergeOp() ? TableOperation.MERGE : TableOperation.DELETE_RANGE,
TableOperationExceptionType.OTHER,
"Aborted merge because it would produce a tablets with more files than the configured limit of "
+ maxFiles + ". Observed " + totalFiles + " files in the merge range.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.tableOps.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
 * Repo step run for {@link Operation#SYSTEM_MERGE} operations that decides whether the merge is
 * allowed to continue.
 *
 * <p>
 * The tablets overlapping the reserve extent of the merge are read (FILES and MERGEABILITY
 * columns) and walked in order. The merge is rejected as soon as a tablet reports that it is not
 * mergeable at the manager's current steady time, or as soon as the running sum of tablet file
 * sizes exceeds {@link #MERGEABILITY_THRESHOLD} times the table's
 * {@link Property#TABLE_SPLIT_THRESHOLD}. If every tablet passes, the operation continues with
 * {@link MergeTablets}.
 */
public class VerifyMergeability extends ManagerRepo {
  // Use this class (not CountFiles) so log output is attributed to the correct step.
  private static final Logger log = LoggerFactory.getLogger(VerifyMergeability.class);
  private static final long serialVersionUID = 1L;

  /**
   * Maximum combined file size of the tablets in the merge range, expressed as a fraction of the
   * table's split threshold.
   */
  private static final double MERGEABILITY_THRESHOLD = .25;

  private final MergeInfo data;

  /**
   * @param mergeInfo description of the merge; its operation must be
   *        {@link Operation#SYSTEM_MERGE}
   * @throws IllegalArgumentException if the operation is anything other than a system merge
   */
  public VerifyMergeability(MergeInfo mergeInfo) {
    // Validate before storing so an invalid instance is never partially initialised.
    Preconditions.checkArgument(mergeInfo.op == Operation.SYSTEM_MERGE, "Must be a System Merge");
    this.data = mergeInfo;
  }

  /**
   * Checks every tablet in the merge range for mergeability and cumulative size.
   *
   * @param fateId id of the fate operation executing this step
   * @param env the manager, used for the steady time, table configuration and tablet metadata
   * @return {@link MergeTablets} when all tablets are mergeable and the combined file size stays
   *         within the limit, otherwise {@link UnreserveAndError}
   */
  @Override
  public Repo<Manager> call(FateId fateId, Manager env) throws Exception {

    var range = data.getReserveExtent();

    var currentTime = env.getSteadyTime();
    var context = env.getContext();
    var tableConf = context.getTableConfiguration(data.tableId);
    var threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);

    // Upper bound on the combined size of the merged tablets, in bytes.
    long maxTotalSize = (long) (threshold * MERGEABILITY_THRESHOLD);

    // File counts are not gathered by this step yet, so zero is reported on failure.
    // TODO: we can check file count as well and remove FindFiles repo
    long totalFiles = 0;

    try (var tablets = context.getAmple().readTablets().forTable(data.tableId)
        .overlapping(range.prevEndRow(), range.endRow()).fetch(FILES, MERGEABILITY)
        .checkConsistency().build()) {

      long totalSize = 0;
      for (var tabletMetadata : tablets) {
        boolean mergeable = tabletMetadata.getTabletMergeability().isMergeable(currentTime);
        totalSize += tabletMetadata.getFileSize();

        if (!mergeable || totalSize > maxTotalSize) {
          // UnreserveAndError can only report file counts, so record the real reason here.
          log.debug("{} rejecting system merge of table {}: tabletMergeable={} totalSize={} "
              + "maxTotalSize={}", fateId, data.tableId, mergeable, totalSize, maxTotalSize);
          // TODO: update UnreserveAndError to pass reason, not max files
          return new UnreserveAndError(data, totalFiles, 0);
        }
      }
    }

    return new MergeTablets(data);
  }

}

0 comments on commit 51bb1da

Please sign in to comment.