Skip to content

Commit

Permalink
bsideup#141 add redis backed positions storage
Browse files Browse the repository at this point in the history
  • Loading branch information
lavcraft committed Jul 12, 2019
1 parent 454026f commit 330fcee
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 0 deletions.
40 changes: 40 additions & 0 deletions plugins/redis-positions-storage/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
test {
useJUnitPlatform()
}

jar {
manifest {
attributes(
'Plugin-Id': "${project.name}",
'Plugin-Version': "${project.version}",
)
}

into('lib') {
from configurations.compile
}
}

dependencies {
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
compileOnly 'com.google.auto.service:auto-service'
annotationProcessor 'com.google.auto.service:auto-service'

compileOnly 'org.springframework.boot:spring-boot-starter'
compileOnly 'javax.validation:validation-api'
compileOnly project(":api")
compileOnly 'io.projectreactor:reactor-core'

compile 'io.lettuce:lettuce-core:5.1.7.RELEASE'

testCompileOnly 'org.projectlombok:lombok'
testCompileOnly 'org.springframework.boot:spring-boot-starter'
testCompileOnly 'org.springframework.boot:spring-boot-starter-test'
testAnnotationProcessor 'org.projectlombok:lombok'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testCompile project(":tck")
testCompile 'org.assertj:assertj-core'
testCompile 'org.testcontainers:testcontainers'
testCompile 'org.mockito:mockito-junit-jupiter'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.github.bsideup.liiklus.positions.redis;

import com.github.bsideup.liiklus.positions.GroupId;
import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.google.common.collect.ImmutableMap;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;

import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collector;
import java.util.stream.Collectors;

@Slf4j
public class RedisPositionsStorage implements PositionsStorage {
private static final String KV_SEPARATOR = ":";

private static final Collector<Map.Entry<String, String>, ?, Map<Integer, Long>> ENTRY_MAP_COLLECTOR = Collectors.toMap(
o -> {
String key = o.getKey();
String[] split = key.split(KV_SEPARATOR);
return Integer.parseInt(split[1]);
},
o -> Long.parseLong(o.getValue())
);

private final StatefulRedisConnection<String, String> connection;

public RedisPositionsStorage(RedisClient redisClient) {
connection = redisClient.connect();
}

@Override
public Publisher<Positions> findAll() {
return connection.reactive()
.keys("*")
.flatMap(s -> connection.reactive().hgetall(s)
.map(stringStringMap -> {
String[] split = s.split(KV_SEPARATOR);
return new Positions(split[0], GroupId.ofString(split[1]), stringStringMap
.entrySet().stream().collect(ENTRY_MAP_COLLECTOR));
})
);
}

@Override
public CompletionStage<Map<Integer, Long>> findAll(String topic, GroupId groupId) {
return connection.async()
.hgetall(toKey(topic, groupId.asString()))
.thenApply(stringStringMap -> stringStringMap.entrySet().stream()
.collect(ENTRY_MAP_COLLECTOR));
}

@Override
public CompletionStage<Map<Integer, Map<Integer, Long>>> findAllVersionsByGroup(String topic, String groupName) {
return connection.reactive()
.keys(toKey(topic, groupName) + "*")
.flatMap(s -> connection.reactive()
.hgetall(s)
.map(stringStringMap -> stringStringMap.entrySet()
.stream().collect(ENTRY_MAP_COLLECTOR))
.map(integerLongMap -> new VersionWithData(
GroupId.ofString(s.split(":")[1]).getVersion().orElse(0),
integerLongMap)
)
)
.collectMap(
VersionWithData::getVersion,
VersionWithData::getPositionByPratition
).toFuture();
}

@Override
public CompletionStage<Void> update(String topic, GroupId groupId, int partition, long position) {
return connection.async()
.hmset(
toKey(topic, groupId.asString()),
ImmutableMap.of(
groupId.getVersion().orElse(0).toString() + KV_SEPARATOR + partition,
Long.toString(position)
)
).thenAccept(s -> log.debug("set is {} for {}", s, topic));
}

/**
* Should close connection after stop context
*/
public void shutdown() {
connection.close();
}

private static String toKey(String topic, String groupName) {
return topic + KV_SEPARATOR + groupName;
}

@Value
@RequiredArgsConstructor(staticName = "of")
private static class VersionWithData {
int version;
Map<Integer, Long> positionByPratition;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.github.bsideup.liiklus.positions.redis.config;

import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.github.bsideup.liiklus.positions.redis.RedisPositionsStorage;
import com.google.auto.service.AutoService;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotEmpty;

@Slf4j
@AutoService(ApplicationContextInitializer.class)
public class RedisPositionsConfiguration implements ApplicationContextInitializer<GenericApplicationContext> {

@Override
public void initialize(GenericApplicationContext applicationContext) {
ConfigurableEnvironment environment = applicationContext.getEnvironment();

var type = environment.getProperty("storage.positions.type");
if(!"REDIS".equals(type)) {
return;
}

RedisProperties redisProperties = Binder.get(environment)
.bind("redis", RedisProperties.class)
.orElseGet(RedisProperties::new);

applicationContext.registerBean(PositionsStorage.class,
() -> {
RedisURI redisURI = RedisURI.builder()
.withHost(redisProperties.getHost())
.withPort(redisProperties.getPort())
.build();

RedisClient redisClient = RedisClient.create(redisURI);

return new RedisPositionsStorage(redisClient);
},
bd -> bd.setDestroyMethodName("shutdown")
);

}

@Data
@Validated
public static class RedisProperties {
@NotEmpty String host;
@NotEmpty int port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.bsideup.liiklus.positions.redis;

import com.github.bsideup.liiklus.positions.PositionsStorage;
import com.github.bsideup.liiklus.positions.PositionsStorageTests;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import lombok.Getter;
import org.testcontainers.containers.GenericContainer;

class RedisPositionsStorageTest implements PositionsStorageTests {
public static final GenericContainer redis = new GenericContainer("redis:3.0.6")
.withExposedPorts(6379);

static {
redis.start();
}

private final RedisClient redisClient = RedisClient.create(RedisURI.builder()
.withHost(redis.getContainerIpAddress())
.withPort(redis.getMappedPort(6379))
.build());

@Getter
PositionsStorage storage = new RedisPositionsStorage(redisClient);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.github.bsideup.liiklus.positions.redis.config;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.mock.env.MockEnvironment;

import static org.assertj.core.api.Assertions.assertThat;

@ExtendWith(MockitoExtension.class)
class RedisPositionsConfigurationTest {
@InjectMocks RedisPositionsConfiguration redisPositionsConfiguration;

MockEnvironment configurableEnvironment;
GenericApplicationContext genericApplicationContext;

@BeforeEach
void setUp() {
configurableEnvironment = new MockEnvironment();
genericApplicationContext = new GenericApplicationContext();
genericApplicationContext.setEnvironment(configurableEnvironment);
}

@Test
void should_skip_when_position_storage_not_redis() {
//given

//when
redisPositionsConfiguration.initialize(this.genericApplicationContext);
genericApplicationContext.refresh();

//then
assertThat(genericApplicationContext.getBeanDefinitionNames()).hasSize(0);

}

@Test
void should_register_positions_storage_bean_when_type_is_redis() {
//given
configurableEnvironment.setProperty("storage.positions.type", "REDIS");
configurableEnvironment.setProperty("redis.host", "host");
configurableEnvironment.setProperty("redis.port", "8888");

//when
redisPositionsConfiguration.initialize(genericApplicationContext);

//then
assertThat(genericApplicationContext.getBeanDefinitionNames().length).isGreaterThan(1);
}

}

0 comments on commit 330fcee

Please sign in to comment.