Skip to content

Commit

Permalink
resolve #1886 | rewrite deprecated curator API - transactions (#1910)
Browse files Browse the repository at this point in the history
Co-authored-by: Mateusz <76775507+szczygiel-m@users.noreply.github.com>
  • Loading branch information
Anarak404 and szczygiel-m authored Nov 8, 2024
1 parent 428092c commit 0952046
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,9 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
Expand All @@ -21,6 +14,14 @@
import pl.allegro.tech.hermes.common.exception.RepositoryNotAvailableException;
import pl.allegro.tech.hermes.infrastructure.MalformedDataException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public abstract class ZookeeperBasedRepository {

private static final Logger logger = LoggerFactory.getLogger(ZookeeperBasedRepository.class);
Expand Down Expand Up @@ -161,30 +162,22 @@ protected void createRecursively(String path, Object value) throws Exception {

protected void createInTransaction(String path, Object value, String childPath) throws Exception {
ensureConnected();
zookeeper
.inTransaction()
.create()
.forPath(path, mapper.writeValueAsBytes(value))
.and()
.create()
.forPath(childPath)
.and()
.commit();
zookeeper.transaction().forOperations(
zookeeper.transactionOp().create().forPath(path, mapper.writeValueAsBytes(value)),
zookeeper.transactionOp().create().forPath(childPath)
);
}

protected void deleteInTransaction(List<String> paths) throws Exception {
if (paths.isEmpty()) {
throw new InternalProcessingException("Attempting to remove empty set of paths from ZK");
}
ensureConnected();
CuratorTransactionFinal transaction =
zookeeper.inTransaction().delete().forPath(paths.get(0)).and();

for (int i = 1; i < paths.size(); i++) {
transaction = transaction.delete().forPath(paths.get(i)).and();
List<CuratorOp> operations = new ArrayList<>(paths.size());
for (String path : paths) {
operations.add(zookeeper.transactionOp().delete().forPath(path));
}

transaction.commit();
zookeeper.transaction().forOperations(operations);
}

protected void create(String path, Object value) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper.cache

import org.awaitility.Awaitility
import pl.allegro.tech.hermes.test.IntegrationTest

import java.time.Duration
Expand Down Expand Up @@ -42,21 +41,21 @@ class HierarchicalCacheTest extends IntegrationTest {
cache.start()

when:
zookeeper().inTransaction()
.create().forPath('/hierarchicalCacheTest/groups/groupA', 'groupA'.bytes)
.and().create().forPath('/hierarchicalCacheTest/groups/groupA/topics')
.and().commit()
zookeeper().transaction().forOperations(
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA', 'groupA'.bytes),
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA/topics')
)

then:
await().atMost(Duration.ofSeconds(5)).until({
calledCallbacks.contains(new Tuple(CHILD_ADDED, '/hierarchicalCacheTest/groups/groupA', 'groupA'))
})

when:
zookeeper().inTransaction()
.create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA', 'topicA'.bytes)
.and().create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA/subscriptions')
.and().commit()
zookeeper().transaction().forOperations(
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA', 'topicA'.bytes),
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupA/topics/topicA/subscriptions')
)

then:
await().atMost(Duration.ofSeconds(5)).until({
Expand All @@ -77,13 +76,13 @@ class HierarchicalCacheTest extends IntegrationTest {

def "should call callbacks for all entities created before cache started"() {
given:
zookeeper().inTransaction()
.create().forPath('/hierarchicalCacheTest/groups/groupB', 'groupB'.bytes)
.and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics')
.and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB', 'topicB'.bytes)
.and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions')
.and().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions/subB', 'subB'.bytes)
.and().commit()
zookeeper().transaction().forOperations(
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB', 'groupB'.bytes),
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics'),
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB', 'topicB'.bytes),
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions'),
zookeeper().transactionOp().create().forPath('/hierarchicalCacheTest/groups/groupB/topics/topicB/subscriptions/subB', 'subB'.bytes)
)

when:
cache.start()
Expand Down

0 comments on commit 0952046

Please sign in to comment.