Skip to content

Commit

Permalink
[HUDI-4751] Fix owner instants for transaction manager api callers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan authored Sep 3, 2022
1 parent 98c3d88 commit 82d41f4
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ public void unlock() {
try {
if (LOCK.isWriteLockedByCurrentThread()) {
LOCK.writeLock().unlock();
LOG.info(getLogMessage(LockState.RELEASED));
} else {
LOG.warn("Cannot unlock because the current thread does not hold the lock.");
}
} catch (Exception e) {
throw new HoodieLockException(getLogMessage(LockState.FAILED_TO_RELEASE), e);
}
LOG.info(getLogMessage(LockState.RELEASED));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
|| cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));

HoodieInstant inflightInstant = null;
try {
final HoodieInstant inflightInstant;
final HoodieTimer timer = new HoodieTimer();
timer.startTimer();
if (cleanInstant.isRequested()) {
Expand All @@ -218,7 +218,7 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
cleanStats
);
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
}
writeTableMetadata(metadata, inflightInstant.getTimestamp());
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
Expand All @@ -229,7 +229,7 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
throw new HoodieIOException("Failed to clean up after commit", e);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction(Option.empty());
this.txnManager.endTransaction(Option.of(inflightInstant));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetad

HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.convertRestoreMetadata(
instantTime, durationInMs, instantsRolledBack, instantToMetadata);
writeToMetadata(restoreMetadata);
table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime),
TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
HoodieInstant restoreInflightInstant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, instantTime);
writeToMetadata(restoreMetadata, restoreInflightInstant);
table.getActiveTimeline().saveAsComplete(restoreInflightInstant, TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
// get all pending rollbacks instants after restore instant time and delete them.
// if not, rollbacks will be considered not completed and might hinder metadata table compaction.
List<HoodieInstant> instantsToRollback = table.getActiveTimeline().getRollbackTimeline()
Expand All @@ -151,12 +151,12 @@ private HoodieRestoreMetadata finishRestore(Map<String, List<HoodieRollbackMetad
*
* @param restoreMetadata instance of {@link HoodieRestoreMetadata} to be applied to metadata.
*/
private void writeToMetadata(HoodieRestoreMetadata restoreMetadata) {
private void writeToMetadata(HoodieRestoreMetadata restoreMetadata, HoodieInstant restoreInflightInstant) {
try {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
this.txnManager.beginTransaction(Option.of(restoreInflightInstant), Option.empty());
writeTableMetadata(restoreMetadata);
} finally {
this.txnManager.endTransaction(Option.empty());
this.txnManager.endTransaction(Option.of(restoreInflightInstant));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
boolean enableLocking = (!skipLocking && !skipTimelinePublish);
try {
if (enableLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
}

// If publish the rollback to the timeline, we first write the rollback metadata
Expand All @@ -261,7 +261,7 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
throw new HoodieIOException("Error executing rollback at instant " + instantTime, e);
} finally {
if (enableLocking) {
this.txnManager.endTransaction(Option.empty());
this.txnManager.endTransaction(Option.of(inflightInstant));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,47 @@ public void testTryLockReAcquisitionByDifferentThread() {
});
}

@Test
public void testTryUnLockByDifferentThread() {
InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
final AtomicBoolean writer3Completed = new AtomicBoolean(false);

// Main test thread
Assertions.assertTrue(inProcessLockProvider.tryLock());

// Another writer thread
Thread writer2 = new Thread(() -> {
assertDoesNotThrow(() -> {
inProcessLockProvider.unlock();
});
});
writer2.start();
try {
writer2.join();
} catch (InterruptedException e) {
//
}

// try acquiring by diff thread. should fail. since main thread still have acquired the lock. if previous unblock by a different thread would have succeeded, this lock
// acquisition would succeed.
Thread writer3 = new Thread(() -> {
Assertions.assertFalse(inProcessLockProvider.tryLock(50, TimeUnit.MILLISECONDS));
writer3Completed.set(true);
});
writer3.start();
try {
writer3.join();
} catch (InterruptedException e) {
//
}

Assertions.assertTrue(writer3Completed.get());
assertDoesNotThrow(() -> {
// unlock by main thread should succeed.
inProcessLockProvider.unlock();
});
}

@Test
public void testTryLockAcquisitionBeforeTimeOutFromTwoThreads() {
final InProcessLockProvider inProcessLockProvider = new InProcessLockProvider(lockConfiguration, hadoopConfiguration);
Expand Down

0 comments on commit 82d41f4

Please sign in to comment.