Skip to content

Commit

Permalink
[Issue 15896] Fix LockTimeout when storePut on the same key concurren…
Browse files Browse the repository at this point in the history
…tly in RocksdbMetadataStore (#16005)

* Fix LockTimeout when storePut or storeDelete on the same key concurrently

* Add cleanup
  • Loading branch information
Jason918 authored and merlimat committed Jun 15, 2022
1 parent 0d4c0b1 commit 8a14fe0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expect
}
try (Transaction transaction = db.beginTransaction(optionSync)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, false);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
MetaValue metaValue = MetaValue.parse(oldValueData);
if (metaValue == null) {
throw new MetadataStoreException.NotFoundException(String.format("path %s not found.", path));
Expand All @@ -504,6 +504,9 @@ protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expect
return CompletableFuture.completedFuture(null);
}
} catch (Throwable e) {
if (log.isDebugEnabled()) {
log.debug("error in storeDelete,path={}", path, e);
}
return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
} finally {
dbStateLock.readLock().unlock();
Expand All @@ -523,7 +526,7 @@ protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Lo
}
try (Transaction transaction = db.beginTransaction(optionSync)) {
byte[] pathBytes = toBytes(path);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, false);
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
MetaValue metaValue = MetaValue.parse(oldValueData);
if (expectedVersion.isPresent()) {
if (metaValue == null && expectedVersion.get() != -1
Expand Down Expand Up @@ -572,6 +575,9 @@ protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Lo
metaValue.ephemeral, true));
}
} catch (Throwable e) {
if (log.isDebugEnabled()) {
log.debug("error in storePut,path={}", path, e);
}
return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
} finally {
dbStateLock.readLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -447,4 +448,35 @@ public void run() {
assertEquals(successWrites.get(), maxValue);
assertEquals(store.get(path).get().get().getValue()[0], maxValue);
}

@Test(dataProvider = "impl")
public void testConcurrentPut(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());

String k = newKey();
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(() -> store.put(k, new byte[0], Optional.of(-1L)).join());
CompletableFuture<Void> f2 =
CompletableFuture.runAsync(() -> store.put(k, new byte[0], Optional.of(-1L)).join());
Awaitility.await().until(() -> f1.isDone() && f2.isDone());
assertTrue(f1.isCompletedExceptionally() && !f2.isCompletedExceptionally() ||
! f1.isCompletedExceptionally() && f2.isCompletedExceptionally());
}

@Test(dataProvider = "impl")
public void testConcurrentDelete(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());

String k = newKey();
store.put(k, new byte[0], Optional.of(-1L)).join();
CompletableFuture<Void> f1 =
CompletableFuture.runAsync(() -> store.delete(k, Optional.empty()).join());
CompletableFuture<Void> f2 =
CompletableFuture.runAsync(() -> store.delete(k, Optional.empty()).join());
Awaitility.await().until(() -> f1.isDone() && f2.isDone());
assertTrue(f1.isCompletedExceptionally() && !f2.isCompletedExceptionally() ||
! f1.isCompletedExceptionally() && f2.isCompletedExceptionally());
}
}

0 comments on commit 8a14fe0

Please sign in to comment.