Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Dec 13, 2024
1 parent 69f226e commit 9bb779b
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.admin;

import java.time.Duration;
import java.util.Objects;

import com.google.common.base.Preconditions;

public class TabletMergeability {

public static final TabletMergeability NEVER = new TabletMergeability(Duration.ofNanos(-1));

public static final TabletMergeability ALWAYS = new TabletMergeability(Duration.ofNanos(0));

private final Duration delay;

private TabletMergeability(Duration delay) {
this.delay = Objects.requireNonNull(delay);
}

public boolean isNever() {
return this.delay.isNegative();
}

public boolean isAlways() {
return this.delay.isZero();
}

public Duration getDelay() {
return delay;
}

public static TabletMergeability from(Duration delay) {
Preconditions.checkArgument(delay.getNano() >= -1,
"Duration of delay must be -1, 0, or a positive delay.");
return new TabletMergeability(delay);
}

public static TabletMergeability after(Duration delay) {
Preconditions.checkArgument(delay.getNano() > 0, "Duration of delay must be greater than 0.");
return new TabletMergeability(delay);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.metadata;

import java.time.Duration;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.time.SteadyTime;

public class TabletMergeabilityUtil {

public static Value toValue(TabletMergeability tabletMergeability) {
return new Value(Long.toString(tabletMergeability.getDelay().toNanos()));
}

public static TabletMergeability fromValue(Value value) {
return TabletMergeability.from(Duration.ofNanos(Long.parseLong(value.toString())));
}

public boolean isMergeable(TabletMergeability mergeability, SteadyTime currentTime) {
if (mergeability.isNever()) {
return false;
}
return currentTime.getDuration().compareTo(mergeability.getDelay()) >= 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -392,6 +393,8 @@ interface TabletUpdates<T> {

T putCloned();

T putTabletMergeability(TabletMergeability tabletMergeability);

/**
* By default the server lock is automatically added to mutations unless this method is set to
* false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ public static class TabletColumnFamily {
public static final String REQUESTED_QUAL = "requestToHost";
public static final ColumnFQ REQUESTED_COLUMN = new ColumnFQ(NAME, new Text(REQUESTED_QUAL));

public static final String MERGEABILITY_QUAL = "mergeability";
public static final ColumnFQ MERGEABILITY_COLUMN =
new ColumnFQ(NAME, new Text(MERGEABILITY_QUAL));

public static Value encodePrevEndRow(Text per) {
if (per == null) {
return new Value(new byte[] {0});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.AVAILABILITY_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.MERGEABILITY_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.REQUESTED_QUAL;

Expand All @@ -41,6 +42,7 @@
import java.util.Set;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TabletAvailabilityUtil;
import org.apache.accumulo.core.data.ByteSequence;
Expand All @@ -58,6 +60,7 @@
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -123,6 +126,7 @@ public class TabletMetadata {
private final Set<FateId> compacted;
private final Set<FateId> userCompactionsRequested;
private final UnSplittableMetadata unSplittableMetadata;
private final TabletMergeability mergeability;
private final Supplier<Long> fileSize;

private TabletMetadata(Builder tmBuilder) {
Expand Down Expand Up @@ -155,6 +159,7 @@ private TabletMetadata(Builder tmBuilder) {
this.compacted = tmBuilder.compacted.build();
this.userCompactionsRequested = tmBuilder.userCompactionsRequested.build();
this.unSplittableMetadata = tmBuilder.unSplittableMetadata;
this.mergeability = Objects.requireNonNull(tmBuilder.mergeability);
this.fileSize = Suppliers.memoize(() -> {
// This code was using a java stream. While profiling SplitMillionIT, the stream was showing
// up as hot when scanning 1 million tablets. Converted to a for loop to improve performance.
Expand Down Expand Up @@ -198,7 +203,8 @@ public enum ColumnType {
SELECTED,
COMPACTED,
USER_COMPACTION_REQUESTED,
UNSPLITTABLE
UNSPLITTABLE,
MERGEABILITY
}

public static class Location {
Expand Down Expand Up @@ -439,6 +445,11 @@ public UnSplittableMetadata getUnSplittable() {
return unSplittableMetadata;
}

public TabletMergeability getTabletMergeability() {
ensureFetched(ColumnType.MERGEABILITY);
return mergeability;
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("tableId", tableId)
Expand Down Expand Up @@ -527,6 +538,9 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
case REQUESTED_QUAL:
tmBuilder.onDemandHostingRequested(true);
break;
case MERGEABILITY_QUAL:
tmBuilder.mergeability(TabletMergeabilityUtil.fromValue(kv.getValue()));
break;
default:
throw new IllegalStateException("Unexpected TabletColumnFamily qualifier: " + qual);
}
Expand Down Expand Up @@ -689,7 +703,7 @@ static class Builder {
private final ImmutableSet.Builder<FateId> compacted = ImmutableSet.builder();
private final ImmutableSet.Builder<FateId> userCompactionsRequested = ImmutableSet.builder();
private UnSplittableMetadata unSplittableMetadata;
// private Supplier<Long> fileSize;
private TabletMergeability mergeability = TabletMergeability.NEVER;

void table(TableId tableId) {
this.tableId = tableId;
Expand Down Expand Up @@ -799,6 +813,10 @@ void unSplittableMetadata(UnSplittableMetadata unSplittableMetadata) {
this.unSplittableMetadata = unSplittableMetadata;
}

void mergeability(TabletMergeability mergeability) {
this.mergeability = mergeability;
}

void keyValue(Entry<Key,Value> kv) {
if (this.keyValues == null) {
this.keyValues = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGEABILITY;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.MERGED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
Expand All @@ -48,6 +49,7 @@
import java.util.TreeMap;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
Expand Down Expand Up @@ -312,6 +314,13 @@ public TabletMetadataBuilder putCloned() {
return this;
}

@Override
public TabletMetadataBuilder putTabletMergeability(TabletMergeability tabletMergeability) {
fetched.add(MERGEABILITY);
internalBuilder.putTabletMergeability(tabletMergeability);
return this;
}

@Override
public TabletMetadataBuilder automaticallyPutServerLock(boolean b) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.clientImpl.TabletAvailabilityUtil;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
Expand All @@ -35,6 +36,7 @@
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -390,6 +392,13 @@ public T automaticallyPutServerLock(boolean b) {
return getThis();
}

@Override
public T putTabletMergeability(TabletMergeability tabletMergeability) {
TabletColumnFamily.MERGEABILITY_COLUMN.put(mutation,
TabletMergeabilityUtil.toValue(tabletMergeability));
return getThis();
}

public void setCloseAfterMutate(AutoCloseable closeable) {
this.closeAfterMutate = closeable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SplitColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.filters.TabletMetadataFilter;
Expand Down Expand Up @@ -394,6 +395,9 @@ public Options fetch(ColumnType... colsToFetch) {
case UNSPLITTABLE:
qualifiers.add(SplitColumnFamily.UNSPLITTABLE_COLUMN);
break;
case MERGEABILITY:
qualifiers.add(TabletColumnFamily.MERGEABILITY_COLUMN);
break;
default:
throw new IllegalArgumentException("Unknown col type " + colToFetch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
Expand Down Expand Up @@ -297,6 +298,8 @@ public String getViolationDescription(short violationCode) {
return "Invalid unsplittable column";
case 4005:
return "Malformed availability value";
case 4006:
return "Malformed mergeability value";

}
return null;
Expand Down Expand Up @@ -376,6 +379,13 @@ private void validateTabletFamily(ArrayList<Short> violations, ColumnUpdate colu
addViolation(violations, 4005);
}
break;
case (TabletColumnFamily.MERGEABILITY_QUAL):
try {
TabletMergeabilityUtil.fromValue(new Value(columnUpdate.getValue()));
} catch (IllegalArgumentException e) {
addViolation(violations, 4006);
}
break;
}
}

Expand Down

0 comments on commit 9bb779b

Please sign in to comment.