Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Jan 4, 2025
1 parent 408a941 commit 7b9f389
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class TabletMergeability implements Serializable {
private static final long serialVersionUID = 1L;

public static final TabletMergeability NEVER = new TabletMergeability(Duration.ofNanos(-1));
public static final TabletMergeability NOW = new TabletMergeability(Duration.ofNanos(0));
public static final TabletMergeability NOW = new TabletMergeability(Duration.ZERO);

private final Duration delay;

Expand All @@ -44,6 +44,10 @@ public boolean isNow() {
return this.delay.isZero();
}

public boolean isFuture() {
return delay.toNanos() > 0;
}

public Duration getDelay() {
return delay;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
Expand Down Expand Up @@ -393,7 +392,7 @@ interface TabletUpdates<T> {

T putCloned();

T putTabletMergeability(TabletMergeability tabletMergeability);
T putTabletMergeability(TabletMergeabilityMetadata tabletMergeability);

/**
* By default the server lock is automatically added to mutations unless this method is set to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.metadata.schema;

import static org.apache.accumulo.core.util.LazySingletons.GSON;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.time.SteadyTime;

import com.google.common.base.Preconditions;

public class TabletMergeabilityMetadata {

public static final TabletMergeabilityMetadata NEVER =
new TabletMergeabilityMetadata(TabletMergeability.NEVER);
public static final TabletMergeabilityMetadata NOW =
new TabletMergeabilityMetadata(TabletMergeability.NOW);

private final TabletMergeability tabletMergeability;
private final Optional<SteadyTime> steadyTime;

private TabletMergeabilityMetadata(TabletMergeability tabletMergeability, SteadyTime steadyTime) {
this.tabletMergeability = Objects.requireNonNull(tabletMergeability);
this.steadyTime = Optional.ofNullable(steadyTime);
Preconditions.checkArgument(tabletMergeability.isFuture() == this.steadyTime.isPresent(),
"SteadyTime must be set if and only if TabletMergeability delay is greater than 0.");
}

private TabletMergeabilityMetadata(TabletMergeability tabletMergeability) {
this(tabletMergeability, null);
}

public TabletMergeability getTabletMergeability() {
return tabletMergeability;
}

public Optional<SteadyTime> getSteadyTime() {
return steadyTime;
}

public boolean isMergeable(SteadyTime currentTime) {
if (tabletMergeability.isNever()) {
return false;
}
return currentTime.getDuration().compareTo(totalDelay()) >= 0;
}

private Duration totalDelay() {
return steadyTime.map(time -> time.getDuration().plus(tabletMergeability.getDelay()))
.orElse(tabletMergeability.getDelay());
}

private static class GSonData {
long delay;
Long steadyTime;
}

String toJson() {
GSonData jData = new GSonData();
jData.delay = tabletMergeability.getDelay().toNanos();
jData.steadyTime = steadyTime.map(SteadyTime::getNanos).orElse(null);
return GSON.get().toJson(jData);
}

static TabletMergeabilityMetadata fromJson(String json) {
GSonData jData = GSON.get().fromJson(json, GSonData.class);
TabletMergeability tabletMergeability = TabletMergeability.from(Duration.ofNanos(jData.delay));
SteadyTime steadyTime =
jData.steadyTime != null ? SteadyTime.from(jData.steadyTime, TimeUnit.NANOSECONDS) : null;
return new TabletMergeabilityMetadata(tabletMergeability, steadyTime);
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
TabletMergeabilityMetadata that = (TabletMergeabilityMetadata) o;
return Objects.equals(tabletMergeability, that.tabletMergeability)
&& Objects.equals(steadyTime, that.steadyTime);
}

@Override
public int hashCode() {
return Objects.hash(tabletMergeability, steadyTime);
}

@Override
public String toString() {
return "TabletMergeabilityMetadata{" + tabletMergeability + ", " + steadyTime.orElse(null)
+ '}';
}

public static TabletMergeabilityMetadata future(TabletMergeability tm, SteadyTime currentTime) {
return new TabletMergeabilityMetadata(tm, currentTime);
}

public static Value toValue(TabletMergeabilityMetadata tmm) {
return new Value(tmm.toJson());
}

public static TabletMergeabilityMetadata fromValue(Value value) {
return TabletMergeabilityMetadata.fromJson(value.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Set;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TabletAvailabilityUtil;
import org.apache.accumulo.core.data.ByteSequence;
Expand All @@ -60,7 +59,6 @@
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -126,7 +124,7 @@ public class TabletMetadata {
private final Set<FateId> compacted;
private final Set<FateId> userCompactionsRequested;
private final UnSplittableMetadata unSplittableMetadata;
private final TabletMergeability mergeability;
private final TabletMergeabilityMetadata mergeability;
private final Supplier<Long> fileSize;

private TabletMetadata(Builder tmBuilder) {
Expand Down Expand Up @@ -445,7 +443,7 @@ public UnSplittableMetadata getUnSplittable() {
return unSplittableMetadata;
}

public TabletMergeability getTabletMergeability() {
public TabletMergeabilityMetadata getTabletMergeability() {
ensureFetched(ColumnType.MERGEABILITY);
return mergeability;
}
Expand Down Expand Up @@ -540,7 +538,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
tmBuilder.onDemandHostingRequested(true);
break;
case MERGEABILITY_QUAL:
tmBuilder.mergeability(TabletMergeabilityUtil.fromValue(kv.getValue()));
tmBuilder.mergeability(TabletMergeabilityMetadata.fromValue(kv.getValue()));
break;
default:
throw new IllegalStateException("Unexpected TabletColumnFamily qualifier: " + qual);
Expand Down Expand Up @@ -704,7 +702,7 @@ static class Builder {
private final ImmutableSet.Builder<FateId> compacted = ImmutableSet.builder();
private final ImmutableSet.Builder<FateId> userCompactionsRequested = ImmutableSet.builder();
private UnSplittableMetadata unSplittableMetadata;
private TabletMergeability mergeability = TabletMergeability.NEVER;
private TabletMergeabilityMetadata mergeability = TabletMergeabilityMetadata.NEVER;

void table(TableId tableId) {
this.tableId = tableId;
Expand Down Expand Up @@ -814,7 +812,7 @@ void unSplittableMetadata(UnSplittableMetadata unSplittableMetadata) {
this.unSplittableMetadata = unSplittableMetadata;
}

void mergeability(TabletMergeability mergeability) {
void mergeability(TabletMergeabilityMetadata mergeability) {
this.mergeability = mergeability;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.TreeMap;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
Expand Down Expand Up @@ -315,7 +314,8 @@ public TabletMetadataBuilder putCloned() {
}

@Override
public TabletMetadataBuilder putTabletMergeability(TabletMergeability tabletMergeability) {
public TabletMetadataBuilder
putTabletMergeability(TabletMergeabilityMetadata tabletMergeability) {
fetched.add(MERGEABILITY);
internalBuilder.putTabletMergeability(tabletMergeability);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;

import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletMergeability;
import org.apache.accumulo.core.clientImpl.TabletAvailabilityUtil;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
Expand All @@ -36,7 +35,6 @@
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -393,9 +391,9 @@ public T automaticallyPutServerLock(boolean b) {
}

@Override
public T putTabletMergeability(TabletMergeability tabletMergeability) {
public T putTabletMergeability(TabletMergeabilityMetadata tabletMergeability) {
TabletColumnFamily.MERGEABILITY_COLUMN.put(mutation,
TabletMergeabilityUtil.toValue(tabletMergeability));
TabletMergeabilityMetadata.toValue(tabletMergeability));
return getThis();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,13 +640,13 @@ public void testBuilder() {
FateId compactFateId1 = FateId.from(type, UUID.randomUUID());
FateId compactFateId2 = FateId.from(type, UUID.randomUUID());

TabletMetadata tm =
TabletMetadata.builder(extent).putTabletAvailability(TabletAvailability.UNHOSTED)
.putLocation(Location.future(ser1)).putFile(sf1, dfv1).putFile(sf2, dfv2)
.putBulkFile(rf1, loadedFateId1).putBulkFile(rf2, loadedFateId2).putFlushId(27)
.putDirName("dir1").putScan(sf3).putScan(sf4).putCompacted(compactFateId1)
.putCompacted(compactFateId2).putCloned().putTabletMergeability(TabletMergeability.NOW)
.build(ECOMP, HOSTING_REQUESTED, MERGED, USER_COMPACTION_REQUESTED, UNSPLITTABLE);
TabletMetadata tm = TabletMetadata.builder(extent)
.putTabletAvailability(TabletAvailability.UNHOSTED).putLocation(Location.future(ser1))
.putFile(sf1, dfv1).putFile(sf2, dfv2).putBulkFile(rf1, loadedFateId1)
.putBulkFile(rf2, loadedFateId2).putFlushId(27).putDirName("dir1").putScan(sf3).putScan(sf4)
.putCompacted(compactFateId1).putCompacted(compactFateId2).putCloned()
.putTabletMergeability(TabletMergeabilityMetadata.NOW)
.build(ECOMP, HOSTING_REQUESTED, MERGED, USER_COMPACTION_REQUESTED, UNSPLITTABLE);

assertEquals(extent, tm.getExtent());
assertEquals(TabletAvailability.UNHOSTED, tm.getTabletAvailability());
Expand All @@ -665,7 +665,7 @@ public void testBuilder() {
assertFalse(tm.hasMerged());
assertNull(tm.getUnSplittable());
assertEquals("OK", tm.getCloned());
assertEquals(TabletMergeability.NOW, tm.getTabletMergeability());
assertEquals(TabletMergeabilityMetadata.NOW, tm.getTabletMergeability());
assertThrows(IllegalStateException.class, tm::getOperationId);
assertThrows(IllegalStateException.class, tm::getSuspend);
assertThrows(IllegalStateException.class, tm::getTime);
Expand Down Expand Up @@ -713,7 +713,10 @@ public void testBuilder() {
.putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1).putWal(le2)
.setHostingRequested().putSelectedFiles(selFiles).setMerged()
.putUserCompactionRequested(selFilesFateId).setUnSplittable(unsplittableMeta)
.putTabletMergeability(TabletMergeability.after(Duration.ofDays(3))).build();
.putTabletMergeability(
TabletMergeabilityMetadata.future(TabletMergeability.after(Duration.ofDays(3)),
SteadyTime.from(45L, TimeUnit.MILLISECONDS)))
.build();

assertEquals(Set.of(ecid1), tm3.getExternalCompactions().keySet());
assertEquals(Set.of(sf1, sf2), tm3.getExternalCompactions().get(ecid1).getJobFiles());
Expand All @@ -730,7 +733,11 @@ public void testBuilder() {
assertTrue(tm3.hasMerged());
assertTrue(tm3.getUserCompactionsRequested().contains(selFilesFateId));
assertEquals(unsplittableMeta, tm3.getUnSplittable());
assertEquals(Duration.ofDays(3), tm3.getTabletMergeability().getDelay());
var tmm = tm3.getTabletMergeability();
assertEquals(Duration.ofDays(3), tmm.getTabletMergeability().getDelay());
assertEquals(SteadyTime.from(45L, TimeUnit.MILLISECONDS), tmm.getSteadyTime().orElseThrow());
assertTrue(tmm.isMergeable(SteadyTime.from(Duration.ofHours(73))));
assertFalse(tmm.isMergeable(SteadyTime.from(Duration.ofHours(72))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TabletMergeabilityUtil;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
Expand All @@ -60,6 +59,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Upgrade12to13;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily;
import org.apache.accumulo.core.metadata.schema.SelectedFiles;
import org.apache.accumulo.core.metadata.schema.TabletMergeabilityMetadata;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
Expand Down Expand Up @@ -382,7 +382,7 @@ private void validateTabletFamily(ArrayList<Short> violations, ColumnUpdate colu
break;
case (TabletColumnFamily.MERGEABILITY_QUAL):
try {
TabletMergeabilityUtil.fromValue(new Value(columnUpdate.getValue()));
TabletMergeabilityMetadata.fromValue(new Value(columnUpdate.getValue()));
} catch (IllegalArgumentException e) {
addViolation(violations, 4006);
}
Expand Down
Loading

0 comments on commit 7b9f389

Please sign in to comment.