Commit

wip
cshannon committed Feb 8, 2025
1 parent bf0d160 commit cfa82eb
Showing 10 changed files with 373 additions and 12 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -106,6 +106,7 @@ public enum FateOperation {
NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
SHUTDOWN_TSERVER(null),
SYSTEM_SPLIT(null),
SYSTEM_MERGE(null),
TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
TABLE_CLONE(TFateOperation.TABLE_CLONE),
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.manager.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Fate.FateOperation;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.TraceRepo;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
import org.apache.accumulo.manager.tableOps.merge.TableRangeOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

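/**
 * Background task that scans the tablet metadata for each table looking for runs of adjacent
 * tablets that are currently mergeable and small enough (by file count and total file size) to be
 * combined. Each run found is submitted as a SYSTEM_MERGE fate operation.
 */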
public class FindMergeableRangeTask implements Runnable {

  private static final Logger log = LoggerFactory.getLogger(FindMergeableRangeTask.class);
  private final Manager manager;

  public FindMergeableRangeTask(Manager manager) {
    this.manager = Objects.requireNonNull(manager);
  }

  @Override
  public void run() {
    var context = manager.getContext();
    Map<TableId,String> tables = context.getTableIdToNameMap();

    for (Entry<TableId,String> table : tables.entrySet()) {
      TableId tableId = table.getKey();
      String tableName = table.getValue();

      long maxFileCount =
          context.getTableConfiguration(tableId).getCount(Property.TABLE_MERGE_FILE_MAX);
      long threshold =
          context.getTableConfiguration(tableId).getAsBytes(Property.TABLE_SPLIT_THRESHOLD);
      // Cap the total merged size at a fraction of the split threshold
      double mergeabilityThreshold = .25;
      long maxTotalSize = (long) (threshold * mergeabilityThreshold);

      try {
        NamespaceId namespaceId = context.getNamespaceId(tableId);
        var type = FateInstanceType.fromTableId(tableId);

        try (var tablets = context.getAmple().readTablets().forTable(tableId)
            .fetch(FILES, MERGEABILITY).checkConsistency().build()) {

          final MergeableRange current =
              new MergeableRange(manager.getSteadyTime(), maxFileCount, maxTotalSize);

          for (var tabletMetadata : tablets) {
            // If the tablet can't be added to the current range, submit what has been
            // collected so far and start a new range with this tablet
            if (!current.add(tabletMetadata)) {
              submit(current, type, table, namespaceId);
              current.resetAndAdd(tabletMetadata);
            }
          }

          // Submit any mergeable range left over at the end of the table
          submit(current, type, table, namespaceId);
        }

      } catch (Exception e) {
        log.error("Failed to generate system merges for {}", tableName, e);
      }
    }

  }

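  // Seed a merge fate operation for the collected range. Ranges with fewer than two tablets
  // are skipped since there is nothing to merge.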
  void submit(MergeableRange range, FateInstanceType type, Entry<TableId,String> table,
      NamespaceId namespaceId) {
    if (range.tabletCount < 2) {
      return;
    }

    TableId tableId = table.getKey();
    String tableName = table.getValue();

    String startRowStr =
        StringUtils.defaultIfBlank(Objects.toString(range.startRow, ""), "-inf");
    String endRowStr = StringUtils.defaultIfBlank(Objects.toString(range.endRow, ""), "+inf");
    log.debug("Creating merge op: {} from startRow: {} to endRow: {}", tableId, startRowStr,
        endRowStr);
    var fateId = manager.fate(type).startTransaction();
    String goalMessage = TableOperation.MERGE + " Merge table " + tableName + "(" + tableId
        + ") splits from " + startRowStr + " to " + endRowStr;

    // Seed as a SYSTEM_MERGE op so the VerifyMergeability repo handles the pre-merge checks
    // for system generated merges
    manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateId,
        new TraceRepo<>(new TableRangeOp(MergeInfo.Operation.SYSTEM_MERGE, namespaceId, tableId,
            range.startRow, range.endRow)),
        true, goalMessage);
  }

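  /**
   * Tracks a candidate range of adjacent tablets that could be merged, along with the running
   * file count and total file size used to decide when the range is full.
   */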
  static class MergeableRange {
    final SteadyTime currentTime;
    final long maxFileCount;
    final long maxTotalSize;

    Text startRow;
    Text endRow;
    int tabletCount;
    long totalFileCount = 0;
    long totalFileSize = 0;

    MergeableRange(SteadyTime currentTime, long maxFileCount, long maxTotalSize) {
      this.currentTime = currentTime;
      this.maxFileCount = maxFileCount;
      this.maxTotalSize = maxTotalSize;
    }

    boolean add(TabletMetadata tm) {
      if (validate(tm)) {
        tabletCount++;
        if (startRow == null) {
          startRow = tm.getEndRow();
        } else {
          endRow = tm.getEndRow();
        }
        totalFileCount += tm.getFiles().size();
        totalFileSize += tm.getFileSize();
        return true;
      }
      return false;
    }

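    // A tablet fits in the range if it is currently mergeable and adding it would not push the
    // range over the file count or total size limits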
    private boolean validate(TabletMetadata tm) {
      if (!tm.getTabletMergeability().isMergeable(currentTime)) {
        return false;
      }

      if (totalFileCount + tm.getFiles().size() > maxFileCount) {
        return false;
      }

      return totalFileSize + tm.getFileSize() <= maxTotalSize;
    }

    void resetAndAdd(TabletMetadata tm) {
      reset();
      add(tm);
    }

    void reset() {
      startRow = null;
      endRow = null;
      tabletCount = 0;
      totalFileCount = 0;
      totalFileSize = 0;
    }
  }
}
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.merge;

import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation.SYSTEM_MERGE;

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateId;
@@ -28,6 +29,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class CountFiles extends ManagerRepo {
private static final Logger log = LoggerFactory.getLogger(CountFiles.class);
private static final long serialVersionUID = 1L;
Expand All @@ -39,6 +42,9 @@ public CountFiles(MergeInfo mergeInfo) {

@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
// SYSTEM_MERGE should be executing the VerifyMergeability repo, which already
// counts the files
Preconditions.checkState(data.op != SYSTEM_MERGE, "Unexpected op %s", SYSTEM_MERGE);

var range = data.getReserveExtent();

@@ -75,10 +81,13 @@ public Repo<Manager> call(FateId fateId, Manager env) throws Exception {
if (totalFiles >= maxFiles) {
return new UnreserveAndError(data, totalFiles, maxFiles);
} else {
-if (data.op == MergeInfo.Operation.MERGE) {
-  return new MergeTablets(data);
-} else {
-  return new DeleteRows(data);
+switch (data.op) {
+  case MERGE:
+    return new MergeTablets(data);
+  case DELETE:
+    return new DeleteRows(data);
+  default:
+    throw new IllegalStateException("Unknown op " + data.op);
}
}
}
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.tableOps.merge;

import java.io.Serializable;
import java.util.Objects;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
@@ -37,7 +38,11 @@ public class MergeInfo implements Serializable {
private static final long serialVersionUID = 1L;

public enum Operation {
-MERGE, DELETE,
+MERGE, SYSTEM_MERGE, DELETE;

public boolean isMergeOp() {
return this == MERGE || this == SYSTEM_MERGE;
}
}

final TableId tableId;
@@ -60,7 +65,7 @@ private MergeInfo(TableId tableId, NamespaceId namespaceId, byte[] startRow, byt
this.namespaceId = namespaceId;
this.startRow = startRow;
this.endRow = endRow;
-this.op = op;
+this.op = Objects.requireNonNull(op);
this.mergeRangeSet = mergeRange != null;
if (mergeRange != null) {
mergeStartRow =
@@ -102,6 +107,7 @@ public KeyExtent getMergeExtent() {
public KeyExtent getReserveExtent() {
switch (op) {
case MERGE:
case SYSTEM_MERGE:
return getOriginalExtent();
case DELETE: {
if (endRow == null) {
@@ -128,6 +128,14 @@ public long isReady(FateId fateId, Manager env) throws Exception {

@Override
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
-return new CountFiles(data);
+switch (data.op) {
+  case SYSTEM_MERGE:
+    return new VerifyMergeability(data);
+  case MERGE:
+  case DELETE:
+    return new CountFiles(data);
+  default:
+    throw new IllegalStateException("Unknown op " + data.op);
+}
}
}
@@ -57,8 +57,7 @@ public TableRangeOp(MergeInfo.Operation op, NamespaceId namespaceId, TableId tab
@Override
public Repo<Manager> call(FateId fateId, Manager env) throws Exception {

-if (AccumuloTable.ROOT.tableId().equals(data.tableId)
-    && MergeInfo.Operation.MERGE.equals(data.op)) {
+if (AccumuloTable.ROOT.tableId().equals(data.tableId) && data.op.isMergeOp()) {
log.warn("Attempt to merge tablets for {} does nothing. It is not splittable.",
AccumuloTable.ROOT.tableName());
}
@@ -45,8 +45,7 @@ public UnreserveAndError(MergeInfo mergeInfo, long totalFiles, long maxFiles) {
public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
FinishTableRangeOp.removeOperationIds(log, mergeInfo, fateId, environment);
throw new AcceptableThriftTableOperationException(mergeInfo.tableId.toString(), null,
-mergeInfo.op == MergeInfo.Operation.MERGE ? TableOperation.MERGE
-    : TableOperation.DELETE_RANGE,
+mergeInfo.op.isMergeOp() ? TableOperation.MERGE : TableOperation.DELETE_RANGE,
TableOperationExceptionType.OTHER,
"Aborted merge because it would produce a tablets with more files than the configured limit of "
+ maxFiles + ". Observed " + totalFiles + " files in the merge range.");