Skip to content

Commit

Permalink
HDDS-11712. Process other DeletedBlocksTransaction before retrying fa…
Browse files Browse the repository at this point in the history
…iled one. (apache#7532)
  • Loading branch information
ashishkumar50 authored Dec 17, 2024
1 parent 3648b59 commit 8bb0587
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class DeletedBlockLogImpl
private long scmCommandTimeoutMs = Duration.ofSeconds(300).toMillis();

private static final int LIST_ALL_FAILED_TRANSACTIONS = -1;
private long lastProcessedTransactionId = -1;

public DeletedBlockLogImpl(ConfigurationSource conf,
StorageContainerManager scm,
Expand Down Expand Up @@ -344,6 +345,34 @@ public DatanodeDeletedBlockTransactions getTransactions(
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
if (lastProcessedTransactionId != -1) {
iter.seek(lastProcessedTransactionId);
/*
* We should start from (lastProcessedTransactionId + 1) transaction.
* Now the iterator (iter.next call) is pointing at
* lastProcessedTransactionId, read the current value to move
* the cursor.
*/
if (iter.hasNext()) {
/*
* There is a possibility that the lastProcessedTransactionId got
* deleted from the table, in that case we have to set
* lastProcessedTransactionId to next available transaction in the table.
*
* By doing this there is a chance that we will skip processing the new
* lastProcessedTransactionId, that should be ok. We can get to it in the
* next run.
*/
lastProcessedTransactionId = iter.next().getKey();
}

// If we have reached the end, go to beginning.
if (!iter.hasNext()) {
iter.seekToFirst();
lastProcessedTransactionId = -1;
}
}

// Get the CmdStatus status of the aggregation, so that the current
// status of the specified transaction can be found faster
Map<UUID, Map<Long, CmdStatus>> commandStatus =
Expand All @@ -352,13 +381,14 @@ public DatanodeDeletedBlockTransactions getTransactions(
map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
// Here takes block replica count as the threshold to avoid the case
// that part of replicas committed the TXN and recorded in the
// SCMDeletedBlockTransactionStatusManager, while they are counted
// in the threshold.
while (iter.hasNext() &&
transactions.getBlocksDeleted() < blockDeletionLimit) {
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = iter.next();
keyValue = iter.next();
DeletedBlocksTransaction txn = keyValue.getValue();
final ContainerID id = ContainerID.valueOf(txn.getContainerID());
try {
Expand Down Expand Up @@ -386,7 +416,24 @@ public DatanodeDeletedBlockTransactions getTransactions(
LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
}

if (lastProcessedTransactionId == keyValue.getKey()) {
// We have circled back to the last transaction.
break;
}

if (!iter.hasNext() && lastProcessedTransactionId != -1) {
/*
* We started from in-between and reached end of the table,
* now we should go to the start of the table and process
* the transactions.
*/
iter.seekToFirst();
}
}

lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;

if (!txIDs.isEmpty()) {
deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ public void close() throws IOException {

/**
 * Moves this iterator back to the start of the underlying iterator.
 *
 * <p>Previously this operation was unsupported and always threw
 * {@link UnsupportedOperationException}; it now delegates to the wrapped
 * iterator so callers can wrap around to the first entry.
 */
@Override
public void seekToFirst() {
  iter.seekToFirst();
  // Re-run the look-ahead after repositioning.
  // NOTE(review): presumably findNext() primes the next entry this wrapper
  // exposes -- confirm against its definition elsewhere in this class.
  findNext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,75 @@ public void testResetCount() throws Exception {
assertEquals(30 * THREE, blocks.size());
}


/**
 * Verifies that successive getTransactions calls keep moving forward through
 * the deleted-block log instead of restarting from the first transaction, and
 * that a transaction skipped in one pass is only picked up again after the
 * iterator has wrapped around the end of the table.
 */
@Test
public void testSCMDelIteratorProgress() throws Exception {
  int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
  // Block limit that lets exactly two transactions through per call.
  final int twoTransactionLimit = 2 * BLOCKS_PER_TXN * THREE;

  // CASE 1: all transactions are valid and available.
  // Create 8 TXs in the log.
  int noOfTransactions = 8;
  addTransactions(generateData(noOfTransactions), true);
  mockContainerHealthResult(true);
  List<DeletedBlocksTransaction> blocks;

  int i = 1;
  while (i < noOfTransactions) {
    // Each iteration reads two transactions; the API returns them in order.
    // 1st iteration: {1, 2}
    // 2nd iteration: {3, 4}
    // 3rd iteration: {5, 6}
    // 4th iteration: {7, 8}
    blocks = getTransactions(twoTransactionLimit);
    // assertEquals takes (expected, actual).
    assertEquals(i++, blocks.get(0).getTxID());
    assertEquals(i++, blocks.get(1).getTxID());
  }

  // CASE 2: some transactions are not available for deletion in the current
  // iteration, either because the max retry count was reached or because of
  // some other issue.
  // The new transaction IDs are {9, 10, 11, 12, 13, 14, 15, 16}.
  addTransactions(generateData(noOfTransactions), true);
  mockContainerHealthResult(true);
  // One past the highest transaction ID now in the log (16).
  final int endOfSecondBatch = 2 * noOfTransactions + 1;

  // Push transaction 11 past the max retry count so that the SCM deleting
  // service ignores it while fetching transactions for deletion.
  int ignoreTransactionId = 11;
  List<Long> txIDs = new ArrayList<>();
  txIDs.add((long) ignoreTransactionId);
  for (i = 0; i < maxRetry; i++) {
    incrementCount(txIDs);
  }
  // One more increment to go beyond maxRetry.
  incrementCount(txIDs);

  i = 9;
  while (true) {
    // Each iteration reads two transactions.
    // A transaction that is not available for deletion in the current
    // iteration is skipped and re-checked only after the whole table has
    // been read.
    // 1st iteration: {9, 10}
    // 2nd iteration: {12, 13} Transaction 11 is ignored here.
    // 3rd iteration: {14, 15} Transaction 11 is available again, but it is
    //                         read only once all DB records up to the end
    //                         have been read.
    // 4th iteration: {16, 11} The iterator reaches the end of the table
    //                         after transaction 16, restarts from the
    //                         beginning, and returns transaction 11 as well.
    blocks = getTransactions(twoTransactionLimit);
    if (i == ignoreTransactionId) {
      i++;
    }
    assertEquals(i++, blocks.get(0).getTxID());
    if (i == endOfSecondBatch) {
      assertEquals(ignoreTransactionId, blocks.get(1).getTxID());
      break;
    }
    assertEquals(i++, blocks.get(1).getTxID());

    if (i == 14) {
      // Reset transaction 11 so that it becomes available to the SCM
      // deleting service in subsequent iterations.
      resetCount(txIDs);
    }
  }
}

@Test
public void testCommitTransactions() throws Exception {
deletedBlockLog.setScmCommandTimeoutMs(Long.MAX_VALUE);
Expand Down

0 comments on commit 8bb0587

Please sign in to comment.