From 3cfe73368cde7777f687de29cc7db5edfa63c423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pere=20Urb=C3=B3n?= Date: Wed, 23 Mar 2022 18:30:24 +0100 Subject: [PATCH] Fix RedisBackend bootstrap, NullPointerException (#462) * fix case when first load happens from redis and no state is there, null return handling, added related test as well * small cleanup --- .../kafka/topology/backend/RedisBackend.java | 7 +- .../integration/backend/RedisBackendIT.java | 119 +++++++++++++++++- 2 files changed, 123 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java b/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java index e572ed678..4e6927576 100644 --- a/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java +++ b/src/main/java/com/purbon/kafka/topology/backend/RedisBackend.java @@ -3,6 +3,7 @@ import com.purbon.kafka.topology.BackendController.Mode; import com.purbon.kafka.topology.utils.JSON; import java.io.IOException; +import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import redis.clients.jedis.Jedis; @@ -43,14 +44,16 @@ public void close() { @Override public void save(BackendState state) throws IOException { + LOGGER.debug("Storing state for: " + state); jedis.set(JULIE_OPS_STATE, state.asPrettyJson()); } @Override public BackendState load() throws IOException { connectIfNeed(); - String content = jedis.get(JULIE_OPS_STATE); - return (BackendState) JSON.toObject(content, BackendState.class); + Optional contentOptional = Optional.ofNullable(jedis.get(JULIE_OPS_STATE)); + LOGGER.debug("Loading a new state instance: " + contentOptional); + return (BackendState) JSON.toObject(contentOptional.orElse("{}"), BackendState.class); } private void connectIfNeed() { diff --git a/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java b/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java index 
5c4f41fb5..6235efdbf 100644 --- a/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java +++ b/src/test/java/com/purbon/kafka/topology/integration/backend/RedisBackendIT.java @@ -1,21 +1,55 @@ package com.purbon.kafka.topology.integration.backend; +import static com.purbon.kafka.topology.CommandLineInterface.BROKERS_OPTION; +import static com.purbon.kafka.topology.Constants.ALLOW_DELETE_TOPICS; +import static com.purbon.kafka.topology.Constants.REDIS_HOST_CONFIG; +import static com.purbon.kafka.topology.Constants.REDIS_PORT_CONFIG; +import static com.purbon.kafka.topology.Constants.STATE_PROCESSOR_IMPLEMENTATION_CLASS; +import static com.purbon.kafka.topology.Constants.TOPOLOGY_TOPIC_STATE_FROM_CLUSTER; +import static com.purbon.kafka.topology.backend.RedisBackend.JULIE_OPS_STATE; import static org.assertj.core.api.Assertions.assertThat; +import com.purbon.kafka.topology.BackendController; +import com.purbon.kafka.topology.Configuration; +import com.purbon.kafka.topology.ExecutionPlan; +import com.purbon.kafka.topology.TopicManager; +import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient; import com.purbon.kafka.topology.backend.BackendState; import com.purbon.kafka.topology.backend.RedisBackend; +import com.purbon.kafka.topology.integration.containerutils.ContainerFactory; +import com.purbon.kafka.topology.integration.containerutils.ContainerTestUtils; +import com.purbon.kafka.topology.integration.containerutils.SaslPlaintextKafkaContainer; +import com.purbon.kafka.topology.model.Impl.ProjectImpl; +import com.purbon.kafka.topology.model.Impl.TopologyImpl; +import com.purbon.kafka.topology.model.Project; +import com.purbon.kafka.topology.model.Topic; +import com.purbon.kafka.topology.model.Topology; import com.purbon.kafka.topology.model.artefact.KafkaConnectArtefact; import com.purbon.kafka.topology.roles.TopologyAclBinding; +import com.purbon.kafka.topology.schemas.SchemaRegistryManager; +import 
io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.common.resource.ResourceType; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; import org.testcontainers.containers.GenericContainer; import org.testcontainers.utility.DockerImageName; +import redis.clients.jedis.Jedis; public class RedisBackendIT { @@ -23,13 +57,65 @@ public class RedisBackendIT { public GenericContainer redis = new GenericContainer<>(DockerImageName.parse("redis:5.0.3-alpine")).withExposedPorts(6379); + private static SaslPlaintextKafkaContainer container; + private TopicManager topicManager; + private AdminClient kafkaAdminClient; + + private ExecutionPlan plan; + + @Rule public MockitoRule mockitoRule = MockitoJUnit.rule(); + private Jedis jedis; + + @BeforeClass + public static void setup() { + container = ContainerFactory.fetchSaslKafkaContainer(System.getProperty("cp.version")); + container.start(); + } + + @AfterClass + public static void teardown() { + container.stop(); + } + + @Before + public void before() throws IOException { + Files.deleteIfExists(Paths.get(".cluster-state")); + + kafkaAdminClient = ContainerTestUtils.getSaslAdminClient(container); + TopologyBuilderAdminClient adminClient = new TopologyBuilderAdminClient(kafkaAdminClient); + + final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); + final SchemaRegistryManager schemaRegistryManager = + new 
SchemaRegistryManager(schemaRegistryClient, System.getProperty("user.dir")); + + this.jedis = new Jedis(redis.getContainerIpAddress(), redis.getFirstMappedPort()); + var backend = new RedisBackend(jedis); + + this.plan = ExecutionPlan.init(new BackendController(backend), System.out); + + Properties props = new Properties(); + props.put(TOPOLOGY_TOPIC_STATE_FROM_CLUSTER, true); + props.put(ALLOW_DELETE_TOPICS, true); + props.put( + STATE_PROCESSOR_IMPLEMENTATION_CLASS, "com.purbon.kafka.topology.backend.RedisBackend"); + props.put(REDIS_HOST_CONFIG, redis.getContainerIpAddress()); + props.put(REDIS_PORT_CONFIG, redis.getFirstMappedPort()); + + HashMap cliOps = new HashMap<>(); + cliOps.put(BROKERS_OPTION, ""); + + Configuration config = new Configuration(cliOps, props); + + this.topicManager = new TopicManager(adminClient, schemaRegistryManager, config); + } + @Test public void testStoreAndFetch() throws IOException { String host = redis.getContainerIpAddress(); int port = redis.getFirstMappedPort(); RedisBackend rsp = new RedisBackend(host, port); - rsp.createOrOpen(); + rsp.load(); TopologyAclBinding binding = TopologyAclBinding.build( @@ -55,4 +141,35 @@ public void testStoreAndFetch() throws IOException { Assert.assertEquals( binding.getPrincipal(), recoveredState.getBindings().iterator().next().getPrincipal()); } + + @Test + public void testTopicCreation() throws IOException { + + Topology topology = new TopologyImpl(); + topology.setContext("testTopicCreation"); + Project project = new ProjectImpl("project"); + topology.addProject(project); + + HashMap config = new HashMap<>(); + config.put(TopicManager.NUM_PARTITIONS, "1"); + config.put(TopicManager.REPLICATION_FACTOR, "1"); + + Topic topicA = new Topic("topicA", config); + project.addTopic(topicA); + + config = new HashMap<>(); + config.put(TopicManager.NUM_PARTITIONS, "1"); + config.put(TopicManager.REPLICATION_FACTOR, "1"); + + Topic topicB = new Topic("topicB", config); + project.addTopic(topicB); + + 
topicManager.updatePlan(topology, plan); + plan.run(); + + String content = jedis.get(JULIE_OPS_STATE); + assertThat(content) + .contains( + "\"topics\" : [ \"testTopicCreation.project.topicB\", \"testTopicCreation.project.topicA\" ]"); + } }