From 0dbf2a00badd86c75798a03a70234e44add3ee16 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Tue, 5 Nov 2024 10:42:32 -0500 Subject: [PATCH 1/2] Status & Reservation columns are now in their own column families --- .../accumulo/core/fate/user/FateMutator.java | 8 +++--- .../core/fate/user/FateMutatorImpl.java | 21 ++++++++------ .../core/fate/user/RowFateStatusFilter.java | 2 +- .../core/fate/user/StatusMappingIterator.java | 2 +- .../core/fate/user/UserFateStore.java | 28 ++++++++++--------- .../core/fate/user/schema/FateSchema.java | 21 ++++++++++---- .../server/init/InitialConfiguration.java | 6 +++- .../accumulo/test/fate/user/UserFateIT.java | 4 +-- .../test/fate/user/UserFateStoreIT.java | 2 +- 9 files changed, 58 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index d199a7463e4..f582c05e72e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -34,8 +34,8 @@ public interface FateMutator { FateMutator putCreateTime(long ctime); /** - * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * put the reservation if there is not already a reservation present + * Add a conditional mutation to {@link FateSchema.ReservationColumnFamily#RESERVATION_COLUMN} + * that will put the reservation if there is not already a reservation present * * @param reservation the reservation to attempt to put * @return the FateMutator with this added mutation @@ -43,8 +43,8 @@ public interface FateMutator { FateMutator putReservedTx(FateStore.FateReservation reservation); /** - * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * delete the column if the column value matches the given reservation + * Add a conditional mutation to {@link FateSchema.ReservationColumnFamily#RESERVATION_COLUMN} + * that will delete the column if the column value matches the given reservation * * @param reservation the reservation to attempt to remove * @return the FateMutator with this added mutation diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index 5d99a8df3a3..e2806cb6d8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -43,6 +43,8 @@ import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.ReservationColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.StatusColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.security.Authorizations; @@ -64,7 +66,7 @@ public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) { @Override public FateMutator putStatus(TStatus status) { - TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); + StatusColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); return this; } @@ -82,20 +84,23 @@ public FateMutator putCreateTime(long ctime) { @Override public FateMutator putReservedTx(FateStore.FateReservation reservation) { - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + Condition condition = + new Condition(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); + ReservationColumnFamily.RESERVATION_COLUMN.put(mutation, + new Value(reservation.getSerialized())); return this; } @Override public FateMutator putUnreserveTx(FateStore.FateReservation reservation) { - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()) - .setValue(reservation.getSerialized()); + Condition condition = + new Condition(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier()) + .setValue(reservation.getSerialized()); mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation); + ReservationColumnFamily.RESERVATION_COLUMN.putDelete(mutation); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java index f6a6b3bef59..a8992e69e9c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java @@ -58,7 +58,7 @@ public void init(SortedKeyValueIterator source, Map op protected boolean filter(Text currentRow, List keys, List values) { for (int i = 0; i < keys.size(); i++) { Key key = keys.get(i); - if (FateSchema.TxColumnFamily.STATUS_COLUMN.hasColumns(key) + if (FateSchema.StatusColumnFamily.STATUS_COLUMN.hasColumns(key) && valuesToAccept.contains(ReadOnlyFateStore.TStatus.valueOf(values.get(i).toString()))) { return true; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java index 1a0fae5aa3e..06b6f690d67 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.core.fate.user; -import static org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily.STATUS_COLUMN; +import static org.apache.accumulo.core.fate.user.schema.FateSchema.StatusColumnFamily.STATUS_COLUMN; import java.io.IOException; import java.util.Arrays; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 7446d1fafe3..0d3f0377a06 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -48,6 +48,8 @@ import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.ReservationColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.StatusColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; @@ -158,12 +160,12 @@ public Optional> createAndReserve(FateKey fateKey) { // 4) If the fate id is still NEW/unseeded and unreserved, we can try to reserve it try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { scanner.setRange(getRow(fateId)); - scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), - TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + scanner.fetchColumn(StatusColumnFamily.STATUS_COLUMN.getColumnFamily(), + StatusColumnFamily.STATUS_COLUMN.getColumnQualifier()); scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(), TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier()); - scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + scanner.fetchColumn(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); TStatus statusSeen = TStatus.UNKNOWN; Optional fateKeySeen = Optional.empty(); Optional reservationSeen = Optional.empty(); @@ -174,13 +176,13 @@ public Optional> createAndReserve(FateKey fateKey) { Value val = entry.getValue(); switch (colq.toString()) { - case TxColumnFamily.STATUS: + case StatusColumnFamily.STATUS: statusSeen = TStatus.valueOf(val.toString()); break; case TxColumnFamily.TX_KEY: fateKeySeen = Optional.of(FateKey.deserialize(val.get())); break; - case TxColumnFamily.RESERVATION: + case ReservationColumnFamily.RESERVATION: reservationSeen = Optional.of(FateReservation.deserialize(val.get())); break; default: @@ -243,8 +245,8 @@ public Optional> tryReserve(FateId fateId) { // attempt or was not written at all). try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { scanner.setRange(getRow(fateId)); - scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + scanner.fetchColumn(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); FateReservation persistedRes = scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get())) .findFirst().orElse(null); @@ -289,8 +291,8 @@ protected Stream getTransactions(EnumSet statuses) { Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); scanner.setRange(new Range()); RowFateStatusFilter.configureScanner(scanner, statuses); - TxColumnFamily.STATUS_COLUMN.fetch(scanner); - TxColumnFamily.RESERVATION_COLUMN.fetch(scanner); + StatusColumnFamily.STATUS_COLUMN.fetch(scanner); + ReservationColumnFamily.RESERVATION_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { String txUUIDStr = e.getKey().getRow().toString(); FateId fateId = FateId.from(fateInstanceType, txUUIDStr); @@ -312,10 +314,10 @@ protected Stream getTransactions(EnumSet statuses) { Text colq = entry.getKey().getColumnQualifier(); Value val = entry.getValue(); switch (colq.toString()) { - case TxColumnFamily.STATUS: + case StatusColumnFamily.STATUS: status = TStatus.valueOf(val.toString()); break; - case TxColumnFamily.RESERVATION: + case ReservationColumnFamily.RESERVATION: reservation = FateReservation.deserialize(val.get()); break; default: @@ -359,7 +361,7 @@ public Stream list(FateKey.FateKeyType type) { protected TStatus _getStatus(FateId fateId) { return scanTx(scanner -> { scanner.setRange(getRow(fateId)); - TxColumnFamily.STATUS_COLUMN.fetch(scanner); + StatusColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst() .orElse(TStatus.UNKNOWN); }); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java index 012e2853ff2..5e0f3bb8226 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java @@ -27,17 +27,12 @@ public static class TxColumnFamily { public static final String STR_NAME = "tx"; public static final Text NAME = new Text(STR_NAME); - public static final String STATUS = "status"; - public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS)); - public static final String TX_KEY = "txkey"; public static final ColumnFQ TX_KEY_COLUMN = new ColumnFQ(NAME, new Text(TX_KEY)); public static final String CREATE_TIME = "ctime"; public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new Text(CREATE_TIME)); - public static final String RESERVATION = "reservation"; - public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION)); } public static class TxInfoColumnFamily { @@ -65,4 +60,20 @@ public static class RepoColumnFamily { public static final Text NAME = new Text(STR_NAME); } + public static class StatusColumnFamily { + public static final String STR_NAME = "status"; + public static final Text NAME = new Text(STR_NAME); + + public static final String STATUS = "status"; + public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS)); + } + + public static class ReservationColumnFamily { + public static final String STR_NAME = "reservation"; + public static final Text NAME = new Text(STR_NAME); + + public static final String RESERVATION = "reservation"; + public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION)); + } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java index 0f8a382fa2d..a17b237a52d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java @@ -91,9 +91,13 @@ public InitialConfiguration(Configuration hadoopConf, SiteConfiguration siteConf // Create a locality group that contains status so its fast to scan. When fate looks for work is // scans this family. initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "status", - FateSchema.TxColumnFamily.STR_NAME); + FateSchema.StatusColumnFamily.STR_NAME); initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "status"); + initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "reservation", + FateSchema.ReservationColumnFamily.STR_NAME); + initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "reservation"); + initialScanRefTableConf.putAll(commonConfig); int max = hadoopConf.getInt("dfs.replication.max", 512); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java index 7f0383e6f4c..ab6ae2821ef 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java @@ -32,7 +32,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.user.UserFateStore; -import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; @@ -70,7 +70,7 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, protected TStatus getTxStatus(ServerContext context, FateId fateId) { try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) { scanner.setRange(getRow(fateId)); - TxColumnFamily.STATUS_COLUMN.fetch(scanner); + FateSchema.StatusColumnFamily.STATUS_COLUMN.fetch(scanner); return StreamSupport.stream(scanner.spliterator(), false) .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); } catch (TableNotFoundException e) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java index c82662182ce..533773b8f26 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java @@ -204,7 +204,7 @@ private void injectStatus(ClientContext client, String table, FateId fateId, TSt throws TableNotFoundException { try (BatchWriter writer = client.createBatchWriter(table)) { Mutation mutation = new Mutation(new Text(fateId.getTxUUIDStr())); - FateSchema.TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); + FateSchema.StatusColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); writer.addMutation(mutation); } catch (MutationsRejectedException e) { throw new RuntimeException(e); From 0ec5562bd75c95a18ebf9b27c621758a116bf072 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Wed, 6 Nov 2024 11:03:17 -0500 Subject: [PATCH 2/2] moved status and reservation into the same locality group --- .../server/init/InitialConfiguration.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java index a17b237a52d..40b6d1a9397 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java @@ -88,15 +88,12 @@ public InitialConfiguration(Configuration hadoopConf, SiteConfiguration siteConf initialFateTableConf.putAll(commonConfig); initialFateTableConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "256M"); - // Create a locality group that contains status so its fast to scan. When fate looks for work is - // scans this family. - initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "status", - FateSchema.StatusColumnFamily.STR_NAME); - initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "status"); - - initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "reservation", - FateSchema.ReservationColumnFamily.STR_NAME); - initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "reservation"); + // Create a locality group that contains status and reservation so its fast to scan. When fate + // looks for work is scans this family. + initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "statusReservation", + String.format("%s,%s", FateSchema.StatusColumnFamily.STR_NAME, + FateSchema.ReservationColumnFamily.STR_NAME)); + initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "statusReservation"); initialScanRefTableConf.putAll(commonConfig);