Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status & Reservation columns are now in their own column families #5031

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ public interface FateMutator<T> {
FateMutator<T> putCreateTime(long ctime);

/**
* Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
* put the reservation if there is not already a reservation present
* Add a conditional mutation to {@link FateSchema.ReservationColumnFamily#RESERVATION_COLUMN}
* that will put the reservation if there is not already a reservation present
*
* @param reservation the reservation to attempt to put
* @return the FateMutator with this added mutation
*/
FateMutator<T> putReservedTx(FateStore.FateReservation reservation);

/**
* Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
* delete the column if the column value matches the given reservation
* Add a conditional mutation to {@link FateSchema.ReservationColumnFamily#RESERVATION_COLUMN}
* that will delete the column if the column value matches the given reservation
*
* @param reservation the reservation to attempt to remove
* @return the FateMutator with this added mutation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.ReservationColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.StatusColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
Expand All @@ -64,7 +66,7 @@ public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) {

@Override
public FateMutator<T> putStatus(TStatus status) {
TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
StatusColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
return this;
}

Expand All @@ -82,20 +84,23 @@ public FateMutator<T> putCreateTime(long ctime) {

@Override
public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
Condition condition =
new Condition(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
mutation.addCondition(condition);
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized()));
ReservationColumnFamily.RESERVATION_COLUMN.put(mutation,
new Value(reservation.getSerialized()));
return this;
}

@Override
public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
.setValue(reservation.getSerialized());
Condition condition =
new Condition(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
.setValue(reservation.getSerialized());
mutation.addCondition(condition);
TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
ReservationColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
for (int i = 0; i < keys.size(); i++) {
Key key = keys.get(i);
if (FateSchema.TxColumnFamily.STATUS_COLUMN.hasColumns(key)
if (FateSchema.StatusColumnFamily.STATUS_COLUMN.hasColumns(key)
&& valuesToAccept.contains(ReadOnlyFateStore.TStatus.valueOf(values.get(i).toString()))) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.accumulo.core.fate.user;

import static org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily.STATUS_COLUMN;
import static org.apache.accumulo.core.fate.user.schema.FateSchema.StatusColumnFamily.STATUS_COLUMN;

import java.io.IOException;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.StackOverflowException;
import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.ReservationColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.StatusColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
Expand Down Expand Up @@ -158,12 +160,12 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
// 4) If the fate id is still NEW/unseeded and unreserved, we can try to reserve it
try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) {
scanner.setRange(getRow(fateId));
scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
scanner.fetchColumn(StatusColumnFamily.STATUS_COLUMN.getColumnFamily(),
StatusColumnFamily.STATUS_COLUMN.getColumnQualifier());
scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
scanner.fetchColumn(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
TStatus statusSeen = TStatus.UNKNOWN;
Optional<FateKey> fateKeySeen = Optional.empty();
Optional<FateReservation> reservationSeen = Optional.empty();
Expand All @@ -174,13 +176,13 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
Value val = entry.getValue();

switch (colq.toString()) {
case TxColumnFamily.STATUS:
case StatusColumnFamily.STATUS:
statusSeen = TStatus.valueOf(val.toString());
break;
case TxColumnFamily.TX_KEY:
fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
break;
case TxColumnFamily.RESERVATION:
case ReservationColumnFamily.RESERVATION:
reservationSeen = Optional.of(FateReservation.deserialize(val.get()));
break;
default:
Expand Down Expand Up @@ -243,8 +245,8 @@ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
// attempt or was not written at all).
try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) {
scanner.setRange(getRow(fateId));
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
scanner.fetchColumn(ReservationColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
ReservationColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
FateReservation persistedRes =
scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get()))
.findFirst().orElse(null);
Expand Down Expand Up @@ -289,8 +291,8 @@ protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) {
Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY);
scanner.setRange(new Range());
RowFateStatusFilter.configureScanner(scanner, statuses);
TxColumnFamily.STATUS_COLUMN.fetch(scanner);
TxColumnFamily.RESERVATION_COLUMN.fetch(scanner);
StatusColumnFamily.STATUS_COLUMN.fetch(scanner);
ReservationColumnFamily.RESERVATION_COLUMN.fetch(scanner);
return scanner.stream().onClose(scanner::close).map(e -> {
String txUUIDStr = e.getKey().getRow().toString();
FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
Expand All @@ -312,10 +314,10 @@ protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) {
Text colq = entry.getKey().getColumnQualifier();
Value val = entry.getValue();
switch (colq.toString()) {
case TxColumnFamily.STATUS:
case StatusColumnFamily.STATUS:
status = TStatus.valueOf(val.toString());
break;
case TxColumnFamily.RESERVATION:
case ReservationColumnFamily.RESERVATION:
reservation = FateReservation.deserialize(val.get());
break;
default:
Expand Down Expand Up @@ -359,7 +361,7 @@ public Stream<FateKey> list(FateKey.FateKeyType type) {
protected TStatus _getStatus(FateId fateId) {
return scanTx(scanner -> {
scanner.setRange(getRow(fateId));
TxColumnFamily.STATUS_COLUMN.fetch(scanner);
StatusColumnFamily.STATUS_COLUMN.fetch(scanner);
return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst()
.orElse(TStatus.UNKNOWN);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,12 @@ public static class TxColumnFamily {
public static final String STR_NAME = "tx";
public static final Text NAME = new Text(STR_NAME);

public static final String STATUS = "status";
public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS));

public static final String TX_KEY = "txkey";
public static final ColumnFQ TX_KEY_COLUMN = new ColumnFQ(NAME, new Text(TX_KEY));

public static final String CREATE_TIME = "ctime";
public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new Text(CREATE_TIME));

public static final String RESERVATION = "reservation";
public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION));
}

public static class TxInfoColumnFamily {
Expand Down Expand Up @@ -65,4 +60,20 @@ public static class RepoColumnFamily {
public static final Text NAME = new Text(STR_NAME);
}

public static class StatusColumnFamily {
public static final String STR_NAME = "status";
public static final Text NAME = new Text(STR_NAME);

public static final String STATUS = "status";
public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS));
}

public static class ReservationColumnFamily {
public static final String STR_NAME = "reservation";
public static final Text NAME = new Text(STR_NAME);

public static final String RESERVATION = "reservation";
public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION));
}
Comment on lines +63 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could put these two columns in a single column family, that avoids the duplication in family and qualifier. Some possible name for the family are txtAdmin, txMgmt, or txSituation. Tried admin in the suggestion below because was thinking this is information related to administering a fate transaction.

Suggested change
public static class StatusColumnFamily {
public static final String STR_NAME = "status";
public static final Text NAME = new Text(STR_NAME);
public static final String STATUS = "status";
public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS));
}
public static class ReservationColumnFamily {
public static final String STR_NAME = "reservation";
public static final Text NAME = new Text(STR_NAME);
public static final String RESERVATION = "reservation";
public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION));
}
public static class TxAdminColumnFamily {
public static final String STR_NAME = "txAdmin";
public static final Text NAME = new Text(STR_NAME);
public static final String STATUS = "status";
public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS));
public static final String RESERVATION = "reservation";
public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ public InitialConfiguration(Configuration hadoopConf, SiteConfiguration siteConf

initialFateTableConf.putAll(commonConfig);
initialFateTableConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "256M");
// Create a locality group that contains status so its fast to scan. When fate looks for work is
// scans this family.
initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "status",
FateSchema.TxColumnFamily.STR_NAME);
initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "status");
// Create a locality group that contains status and reservation so its fast to scan. When fate
// looks for work is scans this family.
initialFateTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "statusReservation",
String.format("%s,%s", FateSchema.StatusColumnFamily.STR_NAME,
FateSchema.ReservationColumnFamily.STR_NAME));
initialFateTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "statusReservation");

initialScanRefTableConf.putAll(commonConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.user.schema.FateSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.server.ServerContext;
Expand Down Expand Up @@ -70,7 +70,7 @@ public void executeTest(FateTestExecutor<TestEnv> testMethod, int maxDeferred,
protected TStatus getTxStatus(ServerContext context, FateId fateId) {
try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) {
scanner.setRange(getRow(fateId));
TxColumnFamily.STATUS_COLUMN.fetch(scanner);
FateSchema.StatusColumnFamily.STATUS_COLUMN.fetch(scanner);
return StreamSupport.stream(scanner.spliterator(), false)
.map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
} catch (TableNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private void injectStatus(ClientContext client, String table, FateId fateId, TSt
throws TableNotFoundException {
try (BatchWriter writer = client.createBatchWriter(table)) {
Mutation mutation = new Mutation(new Text(fateId.getTxUUIDStr()));
FateSchema.TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
FateSchema.StatusColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name()));
writer.addMutation(mutation);
} catch (MutationsRejectedException e) {
throw new RuntimeException(e);
Expand Down