Commit

wip

cshannon committed Feb 7, 2025
1 parent 5dffcd6 commit a9c222c
Showing 8 changed files with 265 additions and 11 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -106,6 +106,7 @@ public enum FateOperation {
NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
SHUTDOWN_TSERVER(null),
SYSTEM_SPLIT(null),
SYSTEM_MERGE(null),
TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
TABLE_CLONE(TFateOperation.TABLE_CLONE),
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Fate.FateOperation;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.TraceRepo;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
import org.apache.accumulo.manager.tableOps.merge.TableRangeOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FindMergeableTask implements Runnable {

private static final Logger log = LoggerFactory.getLogger(FindMergeableTask.class);
private final Manager manager;

public FindMergeableTask(Manager manager) {
this.manager = Objects.requireNonNull(manager);
}

@Override
public void run() {
var context = manager.getContext();
var tableOp = TableOperation.MERGE;
Map<TableId,String> tables = context.getTableIdToNameMap();

for (Entry<TableId,String> table : tables.entrySet()) {
TableId tableId = table.getKey();
String tableName = table.getValue();

var tableConf = context.getTableConfiguration(tableId);
long maxFiles = tableConf.getCount(Property.TABLE_MERGE_FILE_MAX);
long threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
// max percentage of split threshold
double mergeabilityThreshold = .25;
long maxTotalSize = (long) (threshold * mergeabilityThreshold);

try {
NamespaceId namespaceId = context.getNamespaceId(tableId);
var type = FateInstanceType.fromTableId(tableId);

try (var tablets = context.getAmple().readTablets().forTable(tableId)
.fetch(FILES, MERGEABILITY).checkConsistency().build()) {

long totalFiles = 0;
long totalSize = 0;
var currentTime = manager.getSteadyTime();
boolean mergeable = true;

for (var tabletMetadata : tablets) {

if (!tabletMetadata.getTabletMergeability().isMergeable(currentTime)) {
mergeable = false;
}

totalFiles += tabletMetadata.getFiles().size();
if (totalFiles >= maxFiles) {
log.debug("{} has {} files in the merge range, maxFiles is {}", tableId, totalFiles,
maxFiles);
mergeable = false;
}

totalSize += tabletMetadata.getFileSize();
if (totalSize > maxTotalSize) {
mergeable = false;
}

// stop scanning this table's tablets as soon as we know it cannot be merged
if (!mergeable) {
break;
}
}

// skip this table if any tablet was not mergeable or a limit was exceeded
if (!mergeable) {
continue;
}
}

// merge the entire table range; an empty row is logged as -inf/+inf
Text startRow = new Text();
Text endRow = new Text();
String startRowStr = StringUtils.defaultIfBlank(startRow.toString(), "-inf");
String endRowStr = StringUtils.defaultIfBlank(endRow.toString(), "+inf");
log.debug("Creating merge op: {} from startRow: {} to endRow: {}", tableId, startRowStr,
endRowStr);
var fateId = manager.fate(type).startTransaction();
String goalMessage = tableOp + " Merge table " + tableName + "(" + tableId
+ ") splits from " + startRowStr + " to " + endRowStr;

manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateId, new TraceRepo<>(
new TableRangeOp(MergeInfo.Operation.SYSTEM_MERGE, namespaceId, tableId, startRow, endRow)),
true, goalMessage);
} catch (Exception e) {
log.error("Failed to generate system merges for {}", tableName, e);
}
}

}

}
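Note: this commit does not show where FindMergeableTask gets wired into the Manager, so the snippet below is only a hedged sketch of how the periodic scan might be scheduled. The class name, executor, and five-minute interval are illustrative assumptions, not part of the change.

package org.apache.accumulo.manager.merge;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.manager.Manager;

// Hypothetical wiring only; this class does not exist in the commit.
public class FindMergeableTaskScheduler {

  // Runs the mergeable-table scan on a fixed delay; 5 minutes is an arbitrary example interval.
  public static ScheduledExecutorService start(Manager manager) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleWithFixedDelay(new FindMergeableTask(manager), 5, 5, TimeUnit.MINUTES);
    return scheduler;
  }
}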
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation.SYSTEM_MERGE;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateId;
@@ -28,6 +29,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class CountFiles extends ManagerRepo {
private static final Logger log = LoggerFactory.getLogger(CountFiles.class);
private static final long serialVersionUID = 1L;
@@ -39,6 +42,9 @@ public CountFiles(MergeInfo mergeInfo) {

@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
// SYSTEM_MERGE should be executing the VerifyMergeability repo, which already
// counts files, so this repo should never see that op
Preconditions.checkState(data.op != SYSTEM_MERGE, "Unexpected op %s", SYSTEM_MERGE);

var range = data.getReserveExtent();

@@ -75,10 +81,13 @@ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
if (totalFiles >= maxFiles) {
return new UnreserveAndError(data, totalFiles, maxFiles);
} else {
if (data.op == MergeInfo.Operation.MERGE) {
return new MergeTablets(data);
} else {
return new DeleteRows(data);
switch (data.op) {
case MERGE:
return new MergeTablets(data);
case DELETE:
return new DeleteRows(data);
default:
throw new IllegalStateException("Unknown op " + data.op);
}
}
}
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.merge;

import java.io.Serializable;
import java.util.Objects;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -37,7 +38,11 @@ public class MergeInfo implements Serializable {
private static final long serialVersionUID = 1L;

public enum Operation {
MERGE, DELETE,
MERGE, SYSTEM_MERGE, DELETE;

public boolean isMergeOp() {
return this == MERGE || this == SYSTEM_MERGE;
}
}

final TableId tableId;
@@ -60,7 +65,7 @@ private MergeInfo(TableId tableId, NamespaceId namespaceId, byte[] startRow, byt
this.namespaceId = namespaceId;
this.startRow = startRow;
this.endRow = endRow;
this.op = op;
this.op = Objects.requireNonNull(op);
this.mergeRangeSet = mergeRange != null;
if (mergeRange != null) {
mergeStartRow =
@@ -102,6 +107,7 @@ public KeyExtent getMergeExtent() {
public KeyExtent getReserveExtent() {
switch (op) {
case MERGE:
case SYSTEM_MERGE:
return getOriginalExtent();
case DELETE: {
if (endRow == null) {
@@ -128,6 +128,14 @@ public long isReady(FateId fateId, Manager env) throws Exception {

@Override
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
return new CountFiles(data);
switch (data.op) {
case SYSTEM_MERGE:
return new VerifyMergeability(data);
case MERGE:
case DELETE:
return new CountFiles(data);
default:
throw new IllegalStateException("Unknown op " + data.op);
}
}
}
@@ -57,8 +57,7 @@ public TableRangeOp(MergeInfo.Operation op, NamespaceId namespaceId, TableId tab
@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {

if (AccumuloTable.ROOT.tableId().equals(data.tableId)
&& MergeInfo.Operation.MERGE.equals(data.op)) {
if (AccumuloTable.ROOT.tableId().equals(data.tableId) && data.op.isMergeOp()) {
log.warn("Attempt to merge tablets for {} does nothing. It is not splittable.",
AccumuloTable.ROOT.tableName());
}
@@ -45,8 +45,7 @@ public UnreserveAndError(MergeInfo mergeInfo, long totalFiles, long maxFiles) {
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment);
throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null,
mergeInfo.op == MergeInfo.Operation.MERGE ? TableOperation.MERGE
: TableOperation.DELETE_RANGE,
mergeInfo.op.isMergeOp() ? TableOperation.MERGE : TableOperation.DELETE_RANGE,
TableOperationExceptionType.OTHER,
"Aborted merge because it would produce a tablets with more files than the configured limit of "
+ maxFiles + ". Observed " + totalFiles + " files in the merge range.");
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.tableOps.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class VerifyMergeability extends ManagerRepo {
private static final Logger log = LoggerFactory.getLogger(VerifyMergeability.class);
private static final long serialVersionUID = 1L;
private final MergeInfo data;

// Only used for System merges
VerifyMergeability(MergeInfo mergeInfo) {
this.data = mergeInfo;
Preconditions.checkArgument(data.op == Operation.SYSTEM_MERGE, "Must be a System Merge");
}

@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {

var range = data.getReserveExtent();

var currentTime = env.getSteadyTime();
var context = env.getContext();
var tableConf = context.getTableConfiguration(data.tableId);
var threshold = tableConf.getAsBytes(Property.TABLE_SPLIT_THRESHOLD);

// max percentage of split threshold
double mergeabilityThreshold = .25;
long maxTotalSize = (long) (threshold * mergeabilityThreshold);
long totalFiles = 0;

long maxFiles = tableConf.getCount(Property.TABLE_MERGE_FILE_MAX);

try (var tablets = env.getContext().getAmple().readTablets().forTable(data.tableId)
.overlapping(range.prevEndRow(), range.endRow()).fetch(FILES, MERGEABILITY)
.checkConsistency().build()) {

boolean mergeable = true;
long totalSize = 0;

for (var tabletMetadata : tablets) {
if (!tabletMetadata.getTabletMergeability().isMergeable(currentTime)) {
mergeable = false;
}

totalFiles += tabletMetadata.getFiles().size();
if (totalFiles >= maxFiles) {
log.debug("{} found {} files in the merge range, maxFiles is {}", fateId, totalFiles,
maxFiles);
mergeable = false;
}

totalSize += tabletMetadata.getFileSize();
if (totalSize > maxTotalSize) {
mergeable = false;
}

if (!mergeable) {
// TODO: update UnreserveAndError to pass reason, not always max files
return new UnreserveAndError(data, totalFiles, 0);
}
}

}

return new MergeTablets(data);
}

}

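For a sense of scale for VerifyMergeability's size cap: maxTotalSize is 25% of the table's split threshold. A minimal, self-contained illustration follows, assuming a 1 GiB split threshold as an example value, not necessarily what table.split.threshold is configured to on a given table.

public class MergeSizeCapExample {
  public static void main(String[] args) {
    long splitThreshold = 1024L * 1024 * 1024; // assumed 1 GiB split threshold, for illustration
    double mergeabilityThreshold = 0.25;       // same constant used in VerifyMergeability
    long maxTotalSize = (long) (splitThreshold * mergeabilityThreshold);
    // prints 268435456, i.e. 256 MiB: the range only merges if its combined file size
    // stays at or below this cap (and the file count stays under maxFiles)
    System.out.println(maxTotalSize);
  }
}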