diff --git a/.gitignore b/.gitignore index d5744b84..87ea0329 100644 --- a/.gitignore +++ b/.gitignore @@ -18,8 +18,6 @@ bin/ /config/ -/protocol/generated/ -/examples/java/generated/ - +generated/ infrastructure.yaml diff --git a/Dockerfile b/Dockerfile index 3d43ef27..010a22a9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,7 @@ WORKDIR /app RUN java -Xshare:dump -COPY --from=workspace /root/project/app/build/libs/app.jar app.jar +COPY --from=workspace /root/project/app/build/libs/app-boot.jar app.jar COPY --from=workspace /root/project/plugins/*/build/libs/*.jar plugins/ ENV JAVA_OPTS="" diff --git a/app/build.gradle b/app/build.gradle index 61eaa631..27b09c88 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -4,6 +4,14 @@ test { useJUnit() } +jar { + enabled = true +} + +bootJar { + archiveClassifier = 'boot' +} + dependencies { compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' @@ -14,11 +22,7 @@ dependencies { compile 'org.springframework.boot:spring-boot-starter-webflux' compile 'org.springframework.fu:spring-fu-autoconfigure-adapter' - compile 'io.grpc:grpc-netty' - compile 'io.grpc:grpc-services' - compile 'io.prometheus:simpleclient_common' compile 'org.pf4j:pf4j' - compile 'io.rsocket:rsocket-transport-netty' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/app/src/main/java/com/github/bsideup/liiklus/Application.java b/app/src/main/java/com/github/bsideup/liiklus/Application.java index e3c52aff..0f82438f 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/Application.java +++ b/app/src/main/java/com/github/bsideup/liiklus/Application.java @@ -1,13 +1,9 @@ package com.github.bsideup.liiklus; -import com.github.bsideup.liiklus.config.GRPCConfiguration; -import com.github.bsideup.liiklus.config.LayersConfiguration; -import com.github.bsideup.liiklus.config.MetricsConfiguration; -import com.github.bsideup.liiklus.config.RSocketConfiguration; -import com.github.bsideup.liiklus.monitoring.MetricsCollector; +import com.github.bsideup.liiklus.config.GatewayConfiguration; import com.github.bsideup.liiklus.plugins.LiiklusPluginManager; -import io.prometheus.client.exporter.common.TextFormat; import lombok.extern.slf4j.Slf4j; +import org.pf4j.PluginManager; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -24,19 +20,13 @@ import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.GenericApplicationContext; -import org.springframework.core.env.Profiles; import org.springframework.core.env.SimpleCommandLinePropertySource; import org.springframework.core.env.StandardEnvironment; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; -import org.springframework.web.reactive.function.server.support.RouterFunctionMapping; -import java.io.IOException; -import java.io.StringWriter; import java.nio.file.Paths; -import java.util.Collections; @Slf4j @SpringBootApplication @@ -99,35 +89,15 @@ protected void load(ApplicationContext context, Object[] sources) { binder.bind("spring.webflux", WebFluxProperties.class).orElseGet(WebFluxProperties::new), new NettyReactiveWebServerFactory() ), - new GRPCConfiguration(), - new RSocketConfiguration(), - new LayersConfiguration(), - new MetricsConfiguration(), + new GatewayConfiguration(), (GenericApplicationContext applicationContext) -> { - applicationContext.registerBean(RouterFunctionMapping.class, () -> { - var router = RouterFunctions.route(); - router.GET("/health", __ -> ServerResponse.ok().syncBody("OK")); - - if (environment.acceptsProfiles(Profiles.of("exporter"))) { - var metricsCollector = applicationContext.getBean(MetricsCollector.class); - router.GET("/prometheus", __ -> { - return metricsCollector.collect() - .collectList() - .flatMap(metrics -> { - try { - var writer = new StringWriter(); - TextFormat.write004(writer, Collections.enumeration(metrics)); - return ServerResponse.ok() - .contentType(MediaType.valueOf(TextFormat.CONTENT_TYPE_004)) - .syncBody(writer.toString()); - } catch (IOException e) { - return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); - } - }); - }); - } - return new RouterFunctionMapping(router.build()); + applicationContext.registerBean("health", RouterFunction.class, () -> { + return RouterFunctions.route() + .GET("/health", __ -> ServerResponse.ok().syncBody("OK")) + .build(); }); + + applicationContext.registerBean(PluginManager.class, () -> pluginManager); } ); diff --git a/app/src/main/java/com/github/bsideup/liiklus/config/LayersConfiguration.java b/app/src/main/java/com/github/bsideup/liiklus/config/GatewayConfiguration.java similarity index 89% rename from app/src/main/java/com/github/bsideup/liiklus/config/LayersConfiguration.java rename to app/src/main/java/com/github/bsideup/liiklus/config/GatewayConfiguration.java index 9fdbf2b7..d4934254 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/config/LayersConfiguration.java +++ b/app/src/main/java/com/github/bsideup/liiklus/config/GatewayConfiguration.java @@ -2,6 +2,7 @@ import com.github.bsideup.liiklus.records.RecordPostProcessor; import com.github.bsideup.liiklus.records.RecordPreProcessor; +import com.github.bsideup.liiklus.service.LiiklusService; import lombok.Data; import org.springframework.boot.context.properties.bind.Binder; import org.springframework.context.ApplicationContextInitializer; @@ -13,11 +14,12 @@ import java.util.Map; import java.util.stream.Collectors; -public class LayersConfiguration implements ApplicationContextInitializer { +public class GatewayConfiguration implements ApplicationContextInitializer { @Override public void initialize(GenericApplicationContext applicationContext) { var environment = applicationContext.getEnvironment(); + if (!environment.acceptsProfiles(Profiles.of("gateway"))) { return; } @@ -40,6 +42,8 @@ public void initialize(GenericApplicationContext applicationContext) { .sorted(comparator.reversed()) .collect(Collectors.toList()) )); + + applicationContext.registerBean(LiiklusService.class); } @Data diff --git a/app/src/main/java/com/github/bsideup/liiklus/config/MetricsConfiguration.java b/app/src/main/java/com/github/bsideup/liiklus/config/MetricsConfiguration.java deleted file mode 100644 index 22d1bb75..00000000 --- a/app/src/main/java/com/github/bsideup/liiklus/config/MetricsConfiguration.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.github.bsideup.liiklus.config; - -import com.github.bsideup.liiklus.monitoring.MetricsCollector; -import org.springframework.context.ApplicationContextInitializer; -import org.springframework.context.support.GenericApplicationContext; -import org.springframework.core.env.Profiles; - -public class MetricsConfiguration implements ApplicationContextInitializer { - - @Override - public void initialize(GenericApplicationContext applicationContext) { - var environment = applicationContext.getEnvironment(); - - if (!environment.acceptsProfiles(Profiles.of("exporter"))) { - return; - } - - applicationContext.registerBean(MetricsCollector.class); - } -} \ No newline at end of file diff --git a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java b/app/src/main/java/com/github/bsideup/liiklus/service/LiiklusService.java similarity index 87% rename from app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java rename to app/src/main/java/com/github/bsideup/liiklus/service/LiiklusService.java index bb833b63..4a1cdcb6 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java +++ b/app/src/main/java/com/github/bsideup/liiklus/service/LiiklusService.java @@ -15,9 +15,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.Timestamp; -import com.salesforce.reactorgrpc.stub.SubscribeOnlyOnceLifter; -import io.grpc.Status; -import io.netty.buffer.ByteBuf; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.Value; @@ -25,7 +22,6 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Operators; import reactor.core.publisher.SignalType; import java.util.*; @@ -41,7 +37,7 @@ @RequiredArgsConstructor @FieldDefaults(makeFinal = true) @Slf4j -public class ReactorLiiklusServiceImpl extends ReactorLiiklusServiceGrpc.LiiklusServiceImplBase implements LiiklusService { +public class LiiklusService { private static final NavigableMap> EMPTY_ACKED_OFFSETS = Collections.unmodifiableNavigableMap(new TreeMap<>()); @@ -57,12 +53,6 @@ public class ReactorLiiklusServiceImpl extends ReactorLiiklusServiceGrpc.Liiklus RecordPostProcessorChain recordPostProcessorChain; - @Override - public Mono publish(PublishRequest message, ByteBuf metadata) { - return publish(Mono.just(message)); - } - - @Override public Mono publish(Mono requestMono) { return requestMono .map(request -> new Envelope( @@ -90,16 +80,9 @@ public Mono publish(Mono requestMono) { .setOffset(it.getOffset()) .build() ) - .log("publish", Level.SEVERE, SignalType.ON_ERROR) - .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); - } - - @Override - public Flux subscribe(SubscribeRequest message, ByteBuf metadata) { - return subscribe(Mono.just(message)); + .log("publish", Level.SEVERE, SignalType.ON_ERROR); } - @Override public Flux subscribe(Mono requestFlux) { return requestFlux .flatMapMany(subscribe -> { @@ -166,7 +149,7 @@ public Flux subscribe(Mono requestFlux) { Flux.from(source.getPublisher()) .log("partition-" + partition, Level.WARNING, SignalType.ON_ERROR) .doFinally(__ -> sourcesByPartition.remove(partition)) - .transform(Operators.lift(new SubscribeOnlyOnceLifter())) + // TODO .transform(Operators.lift(new SubscribeOnlyOnceLifter())) ) ); @@ -183,16 +166,9 @@ public Flux subscribe(Mono requestFlux) { subscriptions.remove(sessionId, storedSubscription); }); }) - .log("subscribe", Level.SEVERE, SignalType.ON_ERROR) - .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); - } - - @Override - public Flux receive(ReceiveRequest message, ByteBuf metadata) { - return receive(Mono.just(message)); + .log("subscribe", Level.SEVERE, SignalType.ON_ERROR); } - @Override public Flux receive(Mono requestMono) { return requestMono .flatMapMany(request -> { @@ -235,16 +211,9 @@ public Flux receive(Mono requestMono) { }); }); }) - .log("receive", Level.SEVERE, SignalType.ON_ERROR) - .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); + .log("receive", Level.SEVERE, SignalType.ON_ERROR); } - @Override - public Mono ack(AckRequest message, ByteBuf metadata) { - return ack(Mono.just(message)); - } - - @Override public Mono ack(Mono request) { return request .flatMap(ack -> { @@ -277,16 +246,9 @@ public Mono ack(Mono request) { )); }) .thenReturn(Empty.getDefaultInstance()) - .log("ack", Level.SEVERE, SignalType.ON_ERROR) - .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); - } - - @Override - public Mono getOffsets(GetOffsetsRequest message, ByteBuf metadata) { - return getOffsets(Mono.just(message)); + .log("ack", Level.SEVERE, SignalType.ON_ERROR); } - @Override public Mono getOffsets(Mono request) { return request.flatMap(getOffsets -> Mono .fromCompletionStage(positionsStorage.findAll( @@ -299,20 +261,13 @@ public Mono getOffsets(Mono request) { .defaultIfEmpty(emptyMap()) .map(offsets -> GetOffsetsReply.newBuilder().putAllOffsets(offsets).build()) .log("getOffsets", Level.SEVERE, SignalType.ON_ERROR) - .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()) ); } - @Override - public Mono getEndOffsets(GetEndOffsetsRequest message, ByteBuf metadata) { - return getEndOffsets(Mono.just(message)); - } - - @Override public Mono getEndOffsets(Mono request) { return request.flatMap(getEndOffsets -> { if (!(recordsStorage instanceof FiniteRecordsStorage)) { - return Mono.error(Status.INTERNAL.withDescription("The record storage is not finite").asException()); + return Mono.error(new IllegalStateException("The record storage is not finite")); } var topic = getEndOffsets.getTopic(); diff --git a/app/src/test/java/com/github/bsideup/liiklus/AckTest.java b/app/src/test/java/com/github/bsideup/liiklus/AckTest.java index b910b2d6..8fba29bd 100644 --- a/app/src/test/java/com/github/bsideup/liiklus/AckTest.java +++ b/app/src/test/java/com/github/bsideup/liiklus/AckTest.java @@ -40,7 +40,17 @@ public void setUpAckTest() throws Exception { public void testManualAck() throws Exception { Integer partition = stub.subscribe(subscribeRequest) .take(1) - .delayUntil(it -> stub.ack(AckRequest.newBuilder().setAssignment(it.getAssignment()).setOffset(100).build())) + .delayUntil(it -> { + return stub.ack( + AckRequest.newBuilder() + .setTopic(subscribeRequest.getTopic()) + .setGroup(subscribeRequest.getGroup()) + .setGroupVersion(subscribeRequest.getGroupVersion()) + .setPartition(it.getAssignment().getPartition()) + .setOffset(100) + .build() + ); + }) .map(it -> it.getAssignment().getPartition()) .blockFirst(Duration.ofSeconds(30)); diff --git a/app/src/test/java/com/github/bsideup/liiklus/config/LayersConfigurationTest.java b/app/src/test/java/com/github/bsideup/liiklus/config/GatewayConfigurationTest.java similarity index 92% rename from app/src/test/java/com/github/bsideup/liiklus/config/LayersConfigurationTest.java rename to app/src/test/java/com/github/bsideup/liiklus/config/GatewayConfigurationTest.java index e9ffd936..c2c3e2c1 100644 --- a/app/src/test/java/com/github/bsideup/liiklus/config/LayersConfigurationTest.java +++ b/app/src/test/java/com/github/bsideup/liiklus/config/GatewayConfigurationTest.java @@ -17,13 +17,13 @@ import static org.assertj.core.api.Assertions.assertThat; @RunWith(MockitoJUnitRunner.class) -public class LayersConfigurationTest { +public class GatewayConfigurationTest { MockEnvironment environment; StaticApplicationContext applicationContext; - final LayersConfiguration layersConfiguration = new LayersConfiguration(); + final GatewayConfiguration gatewayConfiguration = new GatewayConfiguration(); @Before public void setUp() throws Exception { @@ -40,7 +40,7 @@ public void setUp() throws Exception { @Test public void testOrderWithEmptyOrders() throws Exception { - layersConfiguration.initialize(applicationContext); + gatewayConfiguration.initialize(applicationContext); assertThat(applicationContext.getBean(RecordPreProcessorChain.class).getAll()).containsExactly( P1.INSTANCE, @@ -58,7 +58,7 @@ public void testOrderWithEmptyOrders() throws Exception { public void testOrderWithDefaultOrder() throws Exception { setOrder(P2.class, 0); - layersConfiguration.initialize(applicationContext); + gatewayConfiguration.initialize(applicationContext); assertThat(applicationContext.getBean(RecordPreProcessorChain.class).getAll()).containsExactly( P1.INSTANCE, P2.INSTANCE, @@ -75,7 +75,7 @@ public void testOrderWithDefaultOrder() throws Exception { public void testNegativeOrders() throws Exception { setOrder(P2.class, -100); setOrder(P3.class, -50); - layersConfiguration.initialize(applicationContext); + gatewayConfiguration.initialize(applicationContext); assertThat(applicationContext.getBean(RecordPreProcessorChain.class).getAll()).containsExactly( P2.INSTANCE, @@ -94,7 +94,7 @@ public void testNegativeOrders() throws Exception { public void testPositiveOrders() throws Exception { setOrder(P2.class, 100); setOrder(P3.class, 50); - layersConfiguration.initialize(applicationContext); + gatewayConfiguration.initialize(applicationContext); assertThat(applicationContext.getBean(RecordPreProcessorChain.class).getAll()).containsExactly( P1.INSTANCE, @@ -113,7 +113,7 @@ public void testPositiveOrders() throws Exception { public void testOrderClashing() throws Exception { setOrder(P3.class, -100); setOrder(P2.class, -100); - layersConfiguration.initialize(applicationContext); + gatewayConfiguration.initialize(applicationContext); assertThat(applicationContext.getBean(RecordPreProcessorChain.class).getAll()).containsExactly( P2.INSTANCE, diff --git a/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java b/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java index 65f16de3..137e65dc 100644 --- a/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java +++ b/app/src/test/java/com/github/bsideup/liiklus/test/AbstractIntegrationTest.java @@ -4,16 +4,19 @@ import com.github.bsideup.liiklus.GRPCLiiklusClient; import com.github.bsideup.liiklus.LiiklusClient; import com.github.bsideup.liiklus.RSocketLiiklusClient; -import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Server; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.CloseableChannel; import org.junit.Rule; import org.junit.rules.TestName; +import org.pf4j.PluginManager; import org.springframework.context.ApplicationContext; import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.time.Duration; import java.util.*; @@ -46,27 +49,54 @@ public static int getPartitionByKey(String key) { protected static final ApplicationContext applicationContext; static { - System.setProperty("server.port", "0"); - System.setProperty("grpc.enabled", "false"); System.setProperty("plugins.dir", "../plugins"); System.setProperty("plugins.pathMatcher", "*/build/libs/*.jar"); var args = Arrays.asList( - "grpc.inProcessServerName=liiklus", "storage.positions.type=MEMORY", "storage.records.type=MEMORY", - "rsocket.port=0" + "rsocket.port=0", + "grpc.port=0", + "server.port=0" ); applicationContext = Application.start(args.stream().map(it -> "--" + it).toArray(String[]::new)); + var pluginManager = applicationContext.getBean(PluginManager.class); boolean useGrpc = false; if (useGrpc) { - stub = new GRPCLiiklusClient(InProcessChannelBuilder.forName("liiklus").build()); + try { + var classLoader = pluginManager.getPluginClassLoader("grpc-transport"); + var serverClazz = classLoader.loadClass(Server.class.getName()); + var getPortMethod = serverClazz.getDeclaredMethod("getPort"); + var server = applicationContext.getBean(serverClazz); + + stub = new GRPCLiiklusClient( + ManagedChannelBuilder + .forAddress("localhost", (int) getPortMethod.invoke(server)) + .usePlaintext() + .build() + ); + } catch (Exception e) { + throw new RuntimeException(e); + } } else { - var transport = TcpClientTransport.create(applicationContext.getBean(CloseableChannel.class).address()); - var rSocket = RSocketFactory.connect().transport(transport).start().block(); - stub = new RSocketLiiklusClient(rSocket); + try { + var classLoader = pluginManager.getPluginClassLoader("rsocket-transport"); + var closeableChannelClass = classLoader.loadClass(CloseableChannel.class.getName()); + var addressMethod = closeableChannelClass.getDeclaredMethod("address"); + var closeableChannel = applicationContext.getBean(closeableChannelClass); + + stub = new RSocketLiiklusClient( + RSocketFactory.connect() + .transport( + TcpClientTransport.create((InetSocketAddress) addressMethod.invoke(closeableChannel)) + ) + .start().block() + ); + } catch (Exception e) { + throw new RuntimeException(e); + } } Hooks.onOperatorDebug(); diff --git a/client/build.gradle b/client/build.gradle index 612bab61..fc9223c9 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -1,3 +1,57 @@ +plugins { + id "idea" +} + +apply plugin: "com.google.protobuf" + +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc' + } + + generatedFilesBaseDir = "$projectDir/generated" + + plugins { + grpc { + artifact = 'io.grpc:protoc-gen-grpc-java' + } + + reactor { + artifact = "com.salesforce.servicelibs:reactor-grpc:0.9.0:jdk8@jar" + } + + rsocketRpc { + artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf' + } + } + + generateProtoTasks { + ofSourceSet('main').each { task -> + task.builtins { + remove java + } + task.plugins { + grpc { } + reactor { } + rsocketRpc { } + } + } + } +} + +clean { + delete protobuf.generatedFilesBaseDir +} + +idea { + module { + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/java") + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/reactor") + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/grpc") + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/rsocketRpc") + } +} + sourceCompatibility = targetCompatibility = 8 test { @@ -8,8 +62,14 @@ dependencies { compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' + protobuf project(":protocol") compile project(":protocol") + compile 'io.grpc:grpc-stub' + compile 'io.grpc:grpc-protobuf' + compile 'com.salesforce.servicelibs:reactor-grpc-stub' + compile 'io.rsocket.rpc:rsocket-rpc-core' + compile 'io.rsocket:rsocket-transport-netty' compile 'io.grpc:grpc-netty' compile 'com.google.protobuf:protobuf-java-util' diff --git a/plugins/grpc-transport/build.gradle b/plugins/grpc-transport/build.gradle new file mode 100644 index 00000000..a882f4e3 --- /dev/null +++ b/plugins/grpc-transport/build.gradle @@ -0,0 +1,77 @@ +plugins { + id "idea" +} + +apply plugin: "com.google.protobuf" + +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc' + } + + generatedFilesBaseDir = "$projectDir/generated" + + plugins { + grpc { + artifact = 'io.grpc:protoc-gen-grpc-java' + } + + reactor { + artifact = "com.salesforce.servicelibs:reactor-grpc:0.9.0:jdk8@jar" + } + } + + generateProtoTasks { + ofSourceSet('main').each { task -> + task.builtins { + remove java + } + task.plugins { + grpc { } + reactor { } + } + } + } +} + +clean { + delete protobuf.generatedFilesBaseDir +} + +idea { + module { + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/java") + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/grpc") + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/reactor") + } +} + +jar { + manifest { + attributes( + 'Plugin-Id': "${project.name}", + 'Plugin-Version': "${project.version}", + ) + } + + into('lib') { + from(configurations.compile - configurations.compileOnly) + } +} + +dependencies { + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + compileOnly 'com.google.auto.service:auto-service' + annotationProcessor 'com.google.auto.service:auto-service' + + compileOnly project(":app") + + protobuf project(":protocol") + + compile 'com.salesforce.servicelibs:reactor-grpc-stub' + compile 'io.grpc:grpc-stub' + compile 'io.grpc:grpc-protobuf' + compile 'io.grpc:grpc-netty' + compile 'io.grpc:grpc-services' +} diff --git a/plugins/grpc-transport/src/main/java/com/github/bsideup/liiklus/transport/grpc/GRPCLiiklusService.java b/plugins/grpc-transport/src/main/java/com/github/bsideup/liiklus/transport/grpc/GRPCLiiklusService.java new file mode 100644 index 00000000..0398f2b6 --- /dev/null +++ b/plugins/grpc-transport/src/main/java/com/github/bsideup/liiklus/transport/grpc/GRPCLiiklusService.java @@ -0,0 +1,66 @@ +package com.github.bsideup.liiklus.transport.grpc; + +import com.github.bsideup.liiklus.protocol.AckRequest; +import com.github.bsideup.liiklus.protocol.GetEndOffsetsReply; +import com.github.bsideup.liiklus.protocol.GetEndOffsetsRequest; +import com.github.bsideup.liiklus.protocol.GetOffsetsReply; +import com.github.bsideup.liiklus.protocol.GetOffsetsRequest; +import com.github.bsideup.liiklus.protocol.PublishReply; +import com.github.bsideup.liiklus.protocol.PublishRequest; +import com.github.bsideup.liiklus.protocol.ReactorLiiklusServiceGrpc; +import com.github.bsideup.liiklus.protocol.ReceiveReply; +import com.github.bsideup.liiklus.protocol.ReceiveRequest; +import com.github.bsideup.liiklus.protocol.SubscribeReply; +import com.github.bsideup.liiklus.protocol.SubscribeRequest; +import com.github.bsideup.liiklus.service.LiiklusService; +import com.google.protobuf.Empty; +import io.grpc.Status; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) +public class GRPCLiiklusService extends ReactorLiiklusServiceGrpc.LiiklusServiceImplBase { + + LiiklusService liiklusService; + + @Override + public Mono publish(Mono request) { + return liiklusService.publish(request) + .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); + } + + @Override + public Flux subscribe(Mono request) { + return liiklusService.subscribe(request) + .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); + } + + @Override + public Flux receive(Mono request) { + return liiklusService.receive(request) + .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); + } + + @Override + public Mono ack(Mono request) { + return liiklusService.ack(request) + .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); + } + + @Override + public Mono getOffsets(Mono request) { + return liiklusService.getOffsets(request) + .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); + } + + @Override + public Mono getEndOffsets(Mono request) { + return liiklusService.getEndOffsets(request) + .onErrorMap(e -> Status.INTERNAL.withCause(e).withDescription(e.getMessage()).asException()); + } + +} diff --git a/app/src/main/java/com/github/bsideup/liiklus/config/GRPCConfiguration.java b/plugins/grpc-transport/src/main/java/com/github/bsideup/liiklus/transport/grpc/config/GRPCConfiguration.java similarity index 65% rename from app/src/main/java/com/github/bsideup/liiklus/config/GRPCConfiguration.java rename to plugins/grpc-transport/src/main/java/com/github/bsideup/liiklus/transport/grpc/config/GRPCConfiguration.java index caf96b17..edbb8bfe 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/config/GRPCConfiguration.java +++ b/plugins/grpc-transport/src/main/java/com/github/bsideup/liiklus/transport/grpc/config/GRPCConfiguration.java @@ -1,8 +1,8 @@ -package com.github.bsideup.liiklus.config; +package com.github.bsideup.liiklus.transport.grpc.config; -import com.github.bsideup.liiklus.service.ReactorLiiklusServiceImpl; +import com.github.bsideup.liiklus.transport.grpc.GRPCLiiklusService; +import com.google.auto.service.AutoService; import io.grpc.*; -import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.services.ProtoReflectionService; import io.netty.channel.nio.NioEventLoopGroup; @@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit; +@AutoService(ApplicationContextInitializer.class) public class GRPCConfiguration implements ApplicationContextInitializer { @Override @@ -29,24 +30,20 @@ public void initialize(GenericApplicationContext applicationContext) { var serverProperties = binder.bind("grpc", GRpcServerProperties.class).orElseGet(GRpcServerProperties::new); - applicationContext.registerBean(ReactorLiiklusServiceImpl.class); + if (!serverProperties.isEnabled()) { + return; + } + + applicationContext.registerBean(GRPCLiiklusService.class); applicationContext.registerBean( Server.class, () -> { - ServerBuilder serverBuilder; - - if (serverProperties.isEnabled()) { - serverBuilder = NettyServerBuilder - .forPort(serverProperties.getPort()) - .workerEventLoopGroup(new NioEventLoopGroup(Schedulers.DEFAULT_POOL_SIZE)) - .permitKeepAliveTime(150, TimeUnit.SECONDS) - .permitKeepAliveWithoutCalls(true); - } else { - serverBuilder = InProcessServerBuilder.forName(serverProperties.getInProcessServerName()); - } - - return serverBuilder + var serverBuilder = NettyServerBuilder + .forPort(serverProperties.getPort()) + .workerEventLoopGroup(new NioEventLoopGroup(Schedulers.DEFAULT_POOL_SIZE)) + .permitKeepAliveTime(150, TimeUnit.SECONDS) + .permitKeepAliveWithoutCalls(true) .directExecutor() .intercept(new ServerInterceptor() { @Override @@ -55,9 +52,13 @@ public ServerCall.Listener interceptCall(ServerCall { it.setInitMethodName("start"); @@ -73,8 +74,6 @@ static class GRpcServerProperties { boolean enabled = true; - String inProcessServerName; - } } diff --git a/plugins/metrics/build.gradle b/plugins/metrics/build.gradle new file mode 100644 index 00000000..a0f52edd --- /dev/null +++ b/plugins/metrics/build.gradle @@ -0,0 +1,23 @@ +jar { + manifest { + attributes( + 'Plugin-Id': "${project.name}", + 'Plugin-Version': "${project.version}", + ) + } + + into('lib') { + from(configurations.compile - configurations.compileOnly) + } +} + +dependencies { + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + compileOnly 'com.google.auto.service:auto-service' + annotationProcessor 'com.google.auto.service:auto-service' + + compileOnly project(":app") + + compile 'io.prometheus:simpleclient_common' +} diff --git a/app/src/main/java/com/github/bsideup/liiklus/monitoring/MetricsCollector.java b/plugins/metrics/src/main/java/com/github/bsideup/liiklus/metrics/MetricsCollector.java similarity index 97% rename from app/src/main/java/com/github/bsideup/liiklus/monitoring/MetricsCollector.java rename to plugins/metrics/src/main/java/com/github/bsideup/liiklus/metrics/MetricsCollector.java index f0236d13..be11e333 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/monitoring/MetricsCollector.java +++ b/plugins/metrics/src/main/java/com/github/bsideup/liiklus/metrics/MetricsCollector.java @@ -1,4 +1,4 @@ -package com.github.bsideup.liiklus.monitoring; +package com.github.bsideup.liiklus.metrics; import com.github.bsideup.liiklus.positions.PositionsStorage; import com.github.bsideup.liiklus.positions.PositionsStorage.Positions; diff --git a/plugins/metrics/src/main/java/com/github/bsideup/liiklus/metrics/config/MetricsConfiguration.java b/plugins/metrics/src/main/java/com/github/bsideup/liiklus/metrics/config/MetricsConfiguration.java new file mode 100644 index 00000000..b8b8d7ca --- /dev/null +++ b/plugins/metrics/src/main/java/com/github/bsideup/liiklus/metrics/config/MetricsConfiguration.java @@ -0,0 +1,54 @@ +package com.github.bsideup.liiklus.metrics.config; + +import com.github.bsideup.liiklus.metrics.MetricsCollector; +import com.google.auto.service.AutoService; +import io.prometheus.client.exporter.common.TextFormat; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.core.env.Profiles; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Collections; + +@AutoService(ApplicationContextInitializer.class) +public class MetricsConfiguration implements ApplicationContextInitializer { + + @Override + public void initialize(GenericApplicationContext applicationContext) { + var environment = applicationContext.getEnvironment(); + + if (!environment.acceptsProfiles(Profiles.of("exporter"))) { + return; + } + + applicationContext.registerBean(MetricsCollector.class); + + applicationContext.registerBean("prometheus", RouterFunction.class, () -> { + var metricsCollector = applicationContext.getBean(MetricsCollector.class); + return RouterFunctions.route() + .GET("/prometheus", __ -> { + return metricsCollector.collect() + .collectList() + .flatMap(metrics -> { + try { + var writer = new StringWriter(); + TextFormat.write004(writer, Collections.enumeration(metrics)); + return ServerResponse.ok() + .contentType(MediaType.valueOf(TextFormat.CONTENT_TYPE_004)) + .syncBody(writer.toString()); + } catch (IOException e) { + return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); + } + }); + }) + .build(); + + }); + } +} \ No newline at end of file diff --git a/plugins/rsocket-transport/build.gradle b/plugins/rsocket-transport/build.gradle new file mode 100644 index 00000000..cc779c7e --- /dev/null +++ b/plugins/rsocket-transport/build.gradle @@ -0,0 +1,68 @@ +plugins { + id "idea" +} + +apply plugin: "com.google.protobuf" + +protobuf { + protoc { + artifact = 'com.google.protobuf:protoc' + } + + generatedFilesBaseDir = "$projectDir/generated" + + plugins { + rsocketRpc { + artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf' + } + } + + generateProtoTasks { + ofSourceSet('main').each { task -> + task.builtins { + remove java + } + task.plugins { + rsocketRpc { } + } + } + } +} + +clean { + delete protobuf.generatedFilesBaseDir +} + +idea { + module { + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/java") + generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/rsocketRpc") + } +} + +jar { + manifest { + attributes( + 'Plugin-Id': "${project.name}", + 'Plugin-Version': "${project.version}", + ) + } + + into('lib') { + from(configurations.compile - configurations.compileOnly) + } +} + +dependencies { + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + compileOnly 'com.google.auto.service:auto-service' + annotationProcessor 'com.google.auto.service:auto-service' + + protobuf project(":protocol") + + compileOnly project(":app") + + compile 'io.rsocket.rpc:rsocket-rpc-core' + compile 'io.rsocket:rsocket-transport-netty' +} diff --git a/plugins/rsocket-transport/src/main/java/com/github/bsideup/liiklus/transport/rsocket/RSocketLiiklusService.java b/plugins/rsocket-transport/src/main/java/com/github/bsideup/liiklus/transport/rsocket/RSocketLiiklusService.java new file mode 100644 index 00000000..fb102773 --- /dev/null +++ b/plugins/rsocket-transport/src/main/java/com/github/bsideup/liiklus/transport/rsocket/RSocketLiiklusService.java @@ -0,0 +1,58 @@ +package com.github.bsideup.liiklus.transport.rsocket; + +import com.github.bsideup.liiklus.protocol.AckRequest; +import com.github.bsideup.liiklus.protocol.GetEndOffsetsReply; +import com.github.bsideup.liiklus.protocol.GetEndOffsetsRequest; +import com.github.bsideup.liiklus.protocol.GetOffsetsReply; +import com.github.bsideup.liiklus.protocol.GetOffsetsRequest; +import com.github.bsideup.liiklus.protocol.PublishReply; +import com.github.bsideup.liiklus.protocol.PublishRequest; +import com.github.bsideup.liiklus.protocol.ReceiveReply; +import com.github.bsideup.liiklus.protocol.ReceiveRequest; +import com.github.bsideup.liiklus.protocol.SubscribeReply; +import com.github.bsideup.liiklus.protocol.SubscribeRequest; +import com.github.bsideup.liiklus.service.LiiklusService; +import com.google.protobuf.Empty; +import io.netty.buffer.ByteBuf; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.experimental.FieldDefaults; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) +public class RSocketLiiklusService implements com.github.bsideup.liiklus.protocol.LiiklusService { + + LiiklusService liiklusService; + + @Override + public Mono publish(PublishRequest message, ByteBuf metadata) { + return liiklusService.publish(Mono.just(message)); + } + + @Override + public Flux subscribe(SubscribeRequest message, ByteBuf metadata) { + return liiklusService.subscribe(Mono.just(message)); + } + + @Override + public Flux receive(ReceiveRequest message, ByteBuf metadata) { + return liiklusService.receive(Mono.just(message)); + } + + @Override + public Mono ack(AckRequest message, ByteBuf metadata) { + return liiklusService.ack(Mono.just(message)); + } + + @Override + public Mono getOffsets(GetOffsetsRequest message, ByteBuf metadata) { + return liiklusService.getOffsets(Mono.just(message)); + } + + @Override + public Mono getEndOffsets(GetEndOffsetsRequest message, ByteBuf metadata) { + return liiklusService.getEndOffsets(Mono.just(message)); + } +} diff --git a/app/src/main/java/com/github/bsideup/liiklus/config/RSocketConfiguration.java b/plugins/rsocket-transport/src/main/java/com/github/bsideup/liiklus/transport/rsocket/config/RSocketConfiguration.java similarity index 84% rename from app/src/main/java/com/github/bsideup/liiklus/config/RSocketConfiguration.java rename to plugins/rsocket-transport/src/main/java/com/github/bsideup/liiklus/transport/rsocket/config/RSocketConfiguration.java index 29b5aac2..1572f942 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/config/RSocketConfiguration.java +++ b/plugins/rsocket-transport/src/main/java/com/github/bsideup/liiklus/transport/rsocket/config/RSocketConfiguration.java @@ -1,7 +1,9 @@ -package com.github.bsideup.liiklus.config; +package com.github.bsideup.liiklus.transport.rsocket.config; +import com.github.bsideup.liiklus.protocol.LiiklusService; import com.github.bsideup.liiklus.protocol.LiiklusServiceServer; -import com.github.bsideup.liiklus.service.ReactorLiiklusServiceImpl; +import com.github.bsideup.liiklus.transport.rsocket.RSocketLiiklusService; +import com.google.auto.service.AutoService; import io.rsocket.RSocketFactory; import io.rsocket.rpc.rsocket.RequestHandlingRSocket; import io.rsocket.transport.netty.server.CloseableChannel; @@ -15,6 +17,7 @@ import java.util.Optional; +@AutoService(ApplicationContextInitializer.class) public class RSocketConfiguration implements ApplicationContextInitializer { @Override @@ -33,10 +36,13 @@ public void initialize(GenericApplicationContext applicationContext) { return; } + applicationContext.registerBean(RSocketLiiklusService.class); + applicationContext.registerBean( CloseableChannel.class, () -> { - var liiklusService = applicationContext.getBean(ReactorLiiklusServiceImpl.class); + var liiklusService = applicationContext.getBean(LiiklusService.class); + return RSocketFactory.receive() .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(new LiiklusServiceServer(liiklusService, Optional.empty(), Optional.empty())))) .transport(TcpServerTransport.create(serverProperties.getHost(), serverProperties.getPort())) diff --git a/protocol/build.gradle b/protocol/build.gradle index 25aaa94b..69af7221 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -12,28 +12,6 @@ protobuf { } generatedFilesBaseDir = "$projectDir/generated" - - plugins { - grpc { - artifact = 'io.grpc:protoc-gen-grpc-java' - } - - reactor { - artifact = "com.salesforce.servicelibs:reactor-grpc:0.9.0:jdk8@jar" - } - - rsocketRpc { - artifact = 'io.rsocket.rpc:rsocket-rpc-protobuf' - } - } - - generateProtoTasks { - ofSourceSet('main')*.plugins { - grpc { } - reactor { } - rsocketRpc { } - } - } } clean { @@ -43,17 +21,9 @@ clean { idea { module { generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/java") - generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/reactor") - generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/grpc") - generatedSourceDirs += file("${protobuf.generatedFilesBaseDir}/main/rsocketRpc") } } dependencies { compile 'com.google.protobuf:protobuf-java' - - compile 'io.grpc:grpc-stub' - compile 'io.grpc:grpc-protobuf' - compile 'com.salesforce.servicelibs:reactor-grpc-stub' - compile 'io.rsocket.rpc:rsocket-rpc-core' } \ No newline at end of file