Skip to content

Commit

Permalink
modify the verification conditions before writing data to ZooKeeper.
Browse files Browse the repository at this point in the history
  • Loading branch information
wangrenyi committed Jan 26, 2025
1 parent 1c9bf82 commit 1e7a3b4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ private Op convertOp(MetadataOp op) {
case PUT: {
OpPut p = op.asPut();
CreateMode createMode = getCreateMode(p.getOptions());
if (p.getOptExpectedVersion().isPresent() && p.getOptExpectedVersion().get() == -1L) {
if (p.getOptExpectedVersion().isEmpty() || p.getOptExpectedVersion().get() == -1L) {
// We are assuming a create operation
return Op.create(p.getPath(), p.getData(), ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata;

import static org.junit.Assert.assertFalse;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -66,4 +67,39 @@ public void sequentialKeys(String provider, Supplier<String> urlSupplier) throws
assertNotEquals(seq1, seq2);
assertTrue(n1 < n2);
}

@Test
public void zookeeperEphemeralKeys() throws Exception {
final String key1 = newKey();
final String key2 = newKey();
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(), MetadataStoreConfig.builder().build());
store.put(key1, "value-1".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).join();
store.put(key2, "value-1".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
store.close();

@Cleanup
MetadataStoreExtended store2 = MetadataStoreExtended.create(zks.getConnectionString(), MetadataStoreConfig.builder().build());
assertFalse(store2.exists(key1).join());
assertFalse(store2.exists(key2).join());
store2.close();
}

@Test(dataProvider = "impl", enabled = false)
public void ephemeralKeys(String provider, Supplier<String> urlSupplier) throws Exception {
final String key1 = newKey();
final String key2 = newKey();

MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());

store.put(key1, "value-1".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).join();
store.put(key2, "value-1".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
store.close();

@Cleanup MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
assertFalse(store2.exists(key1).join());
assertFalse(store2.exists(key2).join());
store2.close();
}

}

0 comments on commit 1e7a3b4

Please sign in to comment.