From 0952046776d0ae5b7a486b028279980b1bd2ee31 Mon Sep 17 00:00:00 2001 From: Anarak404 <65184993+Anarak404@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:32:05 +0100 Subject: [PATCH] resolve #1886 | rewrite deprecated curator API - transactions (#1910) Co-authored-by: Mateusz <76775507+szczygiel-m@users.noreply.github.com> --- .../zookeeper/ZookeeperBasedRepository.java | 41 ++++++++----------- .../cache/HierarchicalCacheTest.groovy | 31 +++++++------- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java index 85827afb21..a70d6c3723 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperBasedRepository.java @@ -3,16 +3,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; import org.apache.commons.lang3.ArrayUtils; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; +import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -21,6 +14,14 @@ import pl.allegro.tech.hermes.common.exception.RepositoryNotAvailableException; import pl.allegro.tech.hermes.infrastructure.MalformedDataException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + public abstract class ZookeeperBasedRepository { private static final Logger logger = LoggerFactory.getLogger(ZookeeperBasedRepository.class); @@ -161,15 +162,10 @@ protected void createRecursively(String path, Object value) throws Exception { protected void createInTransaction(String path, Object value, String childPath) throws Exception { ensureConnected(); - zookeeper - .inTransaction() - .create() - .forPath(path, mapper.writeValueAsBytes(value)) - .and() - .create() - .forPath(childPath) - .and() - .commit(); + zookeeper.transaction().forOperations( + zookeeper.transactionOp().create().forPath(path, mapper.writeValueAsBytes(value)), + zookeeper.transactionOp().create().forPath(childPath) + ); } protected void deleteInTransaction(List paths) throws Exception { @@ -177,14 +173,11 @@ protected void deleteInTransaction(List paths) throws Exception { throw new InternalProcessingException("Attempting to remove empty set of paths from ZK"); } ensureConnected(); - CuratorTransactionFinal transaction = - zookeeper.inTransaction().delete().forPath(paths.get(0)).and(); - - for (int i = 1; i < paths.size(); i++) { - transaction = transaction.delete().forPath(paths.get(i)).and(); + List operations = new ArrayList<>(paths.size()); + for (String path : paths) { + operations.add(zookeeper.transactionOp().delete().forPath(path)); } - - transaction.commit(); + zookeeper.transaction().forOperations(operations); } protected void create(String path, Object value) throws Exception { diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/cache/HierarchicalCacheTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/cache/HierarchicalCacheTest.groovy index 48b9c9490d..9c13330957 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/cache/HierarchicalCacheTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/cache/HierarchicalCacheTest.groovy @@ -1,6 +1,5 @@ package pl.allegro.tech.hermes.infrastructure.zookeeper.cache -import org.awaitility.Awaitility import pl.allegro.tech.hermes.test.IntegrationTest import java.time.Duration @@ -42,10 +41,10 @@ class HierarchicalCacheTest extends IntegrationTest { cache.start() when: - zookeeper().inTransaction() - .create().forPath('/hierarchicalCacheTest/groups/groupA', 'groupA'.bytes) - .and().create().forPath('/hierarchicalCacheTest/groups/groupA/topics') - .and().commit() + zookeeper().transaction().forOperations( + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA', 'groupA'.bytes), + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA/topics') + ) then: await().atMost(Duration.ofSeconds(5)).until({ @@ -53,10 +52,10 @@ class HierarchicalCacheTest extends IntegrationTest { }) when: - zookeeper().inTransaction() - .create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA', 'topicA'.bytes) - .and().create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA/subscriptions') - .and().commit() + zookeeper().transaction().forOperations( + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA', 'topicA'.bytes), + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA/subscriptions') + ) then: await().atMost(Duration.ofSeconds(5)).until({ @@ -77,13 +76,13 @@ class HierarchicalCacheTest extends IntegrationTest { def "should call callbacks for all entities created before cache started"() { given: - zookeeper().inTransaction() - .create().forPath('/hierarchicalCacheTest/groups/groupB', 'groupB'.bytes) - .and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics') - .and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB', 'topicB'.bytes) - .and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions') - .and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions/subB', 'subB'.bytes) - .and().commit() + zookeeper().transaction().forOperations( + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB', 'groupB'.bytes), + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics'), + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB', 'topicB'.bytes), + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions'), + zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions/subB', 'subB'.bytes) + ) when: cache.start()