diff --git a/plugins/redis-positions-storage/build.gradle b/plugins/redis-positions-storage/build.gradle new file mode 100644 index 00000000..9df04051 --- /dev/null +++ b/plugins/redis-positions-storage/build.gradle @@ -0,0 +1,40 @@ +test { + useJUnitPlatform() +} + +jar { + manifest { + attributes( + 'Plugin-Id': "${project.name}", + 'Plugin-Version': "${project.version}", + ) + } + + into('lib') { + from configurations.compile + } +} + +dependencies { + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + compileOnly 'com.google.auto.service:auto-service' + annotationProcessor 'com.google.auto.service:auto-service' + + compileOnly 'org.springframework.boot:spring-boot-starter' + compileOnly 'javax.validation:validation-api' + compileOnly project(":api") + compileOnly 'io.projectreactor:reactor-core' + + compile 'io.lettuce:lettuce-core:5.1.7.RELEASE' + + testCompileOnly 'org.projectlombok:lombok' + testCompileOnly 'org.springframework.boot:spring-boot-starter' + testCompileOnly 'org.springframework.boot:spring-boot-starter-test' + testAnnotationProcessor 'org.projectlombok:lombok' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' + testCompile project(":tck") + testCompile 'org.assertj:assertj-core' + testCompile 'org.testcontainers:testcontainers' + testCompile 'org.mockito:mockito-junit-jupiter' +} diff --git a/plugins/redis-positions-storage/src/main/java/com/github/bsideup/liiklus/positions/redis/RedisPositionsStorage.java b/plugins/redis-positions-storage/src/main/java/com/github/bsideup/liiklus/positions/redis/RedisPositionsStorage.java new file mode 100644 index 00000000..713b1b61 --- /dev/null +++ b/plugins/redis-positions-storage/src/main/java/com/github/bsideup/liiklus/positions/redis/RedisPositionsStorage.java @@ -0,0 +1,107 @@ +package com.github.bsideup.liiklus.positions.redis; + +import com.github.bsideup.liiklus.positions.GroupId; +import com.github.bsideup.liiklus.positions.PositionsStorage; +import com.google.common.collect.ImmutableMap; +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; + +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +@Slf4j +public class RedisPositionsStorage implements PositionsStorage { + private static final String KV_SEPARATOR = ":"; + + private static final Collector, ?, Map> ENTRY_MAP_COLLECTOR = Collectors.toMap( + o -> { + String key = o.getKey(); + String[] split = key.split(KV_SEPARATOR); + return Integer.parseInt(split[1]); + }, + o -> Long.parseLong(o.getValue()) + ); + + private final StatefulRedisConnection connection; + + public RedisPositionsStorage(RedisClient redisClient) { + connection = redisClient.connect(); + } + + @Override + public Publisher findAll() { + return connection.reactive() + .keys("*") + .flatMap(s -> connection.reactive().hgetall(s) + .map(stringStringMap -> { + String[] split = s.split(KV_SEPARATOR); + return new Positions(split[0], GroupId.ofString(split[1]), stringStringMap + .entrySet().stream().collect(ENTRY_MAP_COLLECTOR)); + }) + ); + } + + @Override + public CompletionStage> findAll(String topic, GroupId groupId) { + return connection.async() + .hgetall(toKey(topic, groupId.asString())) + .thenApply(stringStringMap -> stringStringMap.entrySet().stream() + .collect(ENTRY_MAP_COLLECTOR)); + } + + @Override + public CompletionStage>> findAllVersionsByGroup(String topic, String groupName) { + return connection.reactive() + .keys(toKey(topic, groupName) + "*") + .flatMap(s -> connection.reactive() + .hgetall(s) + .map(stringStringMap -> stringStringMap.entrySet() + .stream().collect(ENTRY_MAP_COLLECTOR)) + .map(integerLongMap -> new VersionWithData( + GroupId.ofString(s.split(":")[1]).getVersion().orElse(0), + integerLongMap) + ) + ) + .collectMap( + VersionWithData::getVersion, + VersionWithData::getPositionByPratition + ).toFuture(); + } + + @Override + public CompletionStage update(String topic, GroupId groupId, int partition, long position) { + return connection.async() + .hmset( + toKey(topic, groupId.asString()), + ImmutableMap.of( + groupId.getVersion().orElse(0).toString() + KV_SEPARATOR + partition, + Long.toString(position) + ) + ).thenAccept(s -> log.debug("set is {} for {}", s, topic)); + } + + /** + * Should close connection after stop context + */ + public void shutdown() { + connection.close(); + } + + private static String toKey(String topic, String groupName) { + return topic + KV_SEPARATOR + groupName; + } + + @Value + @RequiredArgsConstructor(staticName = "of") + private static class VersionWithData { + int version; + Map positionByPratition; + } + +} diff --git a/plugins/redis-positions-storage/src/main/java/com/github/bsideup/liiklus/positions/redis/config/RedisPositionsConfiguration.java b/plugins/redis-positions-storage/src/main/java/com/github/bsideup/liiklus/positions/redis/config/RedisPositionsConfiguration.java new file mode 100644 index 00000000..fada4418 --- /dev/null +++ b/plugins/redis-positions-storage/src/main/java/com/github/bsideup/liiklus/positions/redis/config/RedisPositionsConfiguration.java @@ -0,0 +1,57 @@ +package com.github.bsideup.liiklus.positions.redis.config; + +import com.github.bsideup.liiklus.positions.PositionsStorage; +import com.github.bsideup.liiklus.positions.redis.RedisPositionsStorage; +import com.google.auto.service.AutoService; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.validation.annotation.Validated; + +import javax.validation.constraints.NotEmpty; + +@Slf4j +@AutoService(ApplicationContextInitializer.class) +public class RedisPositionsConfiguration implements ApplicationContextInitializer { + + @Override + public void initialize(GenericApplicationContext applicationContext) { + ConfigurableEnvironment environment = applicationContext.getEnvironment(); + + var type = environment.getProperty("storage.positions.type"); + if(!"REDIS".equals(type)) { + return; + } + + RedisProperties redisProperties = Binder.get(environment) + .bind("redis", RedisProperties.class) + .orElseGet(RedisProperties::new); + + applicationContext.registerBean(PositionsStorage.class, + () -> { + RedisURI redisURI = RedisURI.builder() + .withHost(redisProperties.getHost()) + .withPort(redisProperties.getPort()) + .build(); + + RedisClient redisClient = RedisClient.create(redisURI); + + return new RedisPositionsStorage(redisClient); + }, + bd -> bd.setDestroyMethodName("shutdown") + ); + + } + + @Data + @Validated + public static class RedisProperties { + @NotEmpty String host; + @NotEmpty int port; + } +} diff --git a/plugins/redis-positions-storage/src/test/java/com/github/bsideup/liiklus/positions/redis/RedisPositionsStorageTest.java b/plugins/redis-positions-storage/src/test/java/com/github/bsideup/liiklus/positions/redis/RedisPositionsStorageTest.java new file mode 100644 index 00000000..3a5e3cf8 --- /dev/null +++ b/plugins/redis-positions-storage/src/test/java/com/github/bsideup/liiklus/positions/redis/RedisPositionsStorageTest.java @@ -0,0 +1,26 @@ +package com.github.bsideup.liiklus.positions.redis; + +import com.github.bsideup.liiklus.positions.PositionsStorage; +import com.github.bsideup.liiklus.positions.PositionsStorageTests; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import lombok.Getter; +import org.testcontainers.containers.GenericContainer; + +class RedisPositionsStorageTest implements PositionsStorageTests { + public static final GenericContainer redis = new GenericContainer("redis:3.0.6") + .withExposedPorts(6379); + + static { + redis.start(); + } + + private final RedisClient redisClient = RedisClient.create(RedisURI.builder() + .withHost(redis.getContainerIpAddress()) + .withPort(redis.getMappedPort(6379)) + .build()); + + @Getter + PositionsStorage storage = new RedisPositionsStorage(redisClient); + +} diff --git a/plugins/redis-positions-storage/src/test/java/com/github/bsideup/liiklus/positions/redis/config/RedisPositionsConfigurationTest.java b/plugins/redis-positions-storage/src/test/java/com/github/bsideup/liiklus/positions/redis/config/RedisPositionsConfigurationTest.java new file mode 100644 index 00000000..efb809d5 --- /dev/null +++ b/plugins/redis-positions-storage/src/test/java/com/github/bsideup/liiklus/positions/redis/config/RedisPositionsConfigurationTest.java @@ -0,0 +1,54 @@ +package com.github.bsideup.liiklus.positions.redis.config; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.mock.env.MockEnvironment; + +import static org.assertj.core.api.Assertions.assertThat; + +@ExtendWith(MockitoExtension.class) +class RedisPositionsConfigurationTest { + @InjectMocks RedisPositionsConfiguration redisPositionsConfiguration; + + MockEnvironment configurableEnvironment; + GenericApplicationContext genericApplicationContext; + + @BeforeEach + void setUp() { + configurableEnvironment = new MockEnvironment(); + genericApplicationContext = new GenericApplicationContext(); + genericApplicationContext.setEnvironment(configurableEnvironment); + } + + @Test + void should_skip_when_position_storage_not_redis() { + //given + + //when + redisPositionsConfiguration.initialize(this.genericApplicationContext); + genericApplicationContext.refresh(); + + //then + assertThat(genericApplicationContext.getBeanDefinitionNames()).hasSize(0); + + } + + @Test + void should_register_positions_storage_bean_when_type_is_redis() { + //given + configurableEnvironment.setProperty("storage.positions.type", "REDIS"); + configurableEnvironment.setProperty("redis.host", "host"); + configurableEnvironment.setProperty("redis.port", "8888"); + + //when + redisPositionsConfiguration.initialize(genericApplicationContext); + + //then + assertThat(genericApplicationContext.getBeanDefinitionNames().length).isGreaterThan(1); + } + +}