Skip to content

Commit

Permalink
feat: initial Java database work
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Aug 16, 2024
1 parent ef3e3dd commit 25f26fa
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 73 deletions.
4 changes: 3 additions & 1 deletion buildengine/build_java.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func buildJavaModule(ctx context.Context, module Module) error {
logger.Warnf("unable to update ftl.version in %s: %s", module.Config.Dir, err.Error())
}
logger.Infof("Using build command '%s'", module.Config.Build)
err := exec.Command(ctx, log.Debug, module.Config.Dir, "bash", "-c", module.Config.Build).RunBuffered(ctx)
command := exec.Command(ctx, log.Debug, module.Config.Dir, "bash", "-c", module.Config.Build)
command.Env = append(command.Env, "FTL_MODULE_NAME="+module.Config.Module)
err := command.RunBuffered(ctx)
if err != nil {
return fmt.Errorf("failed to build module %q: %w", module.Config.Module, err)
}
Expand Down
8 changes: 8 additions & 0 deletions jvm-runtime/ftl-runtime/common/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jackson-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-agroal-spi</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-credentials-deployment</artifactId>
</dependency>

<dependency>
<groupId>xyz.block.ftl</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package xyz.block.ftl.deployment;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "ftl")
public class FTLBuildTimeConfig {

/**
* The FTL module name, should be set automatically during build
*/
@ConfigItem
public Optional<String> moduleName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.quarkus.agroal.spi.JdbcDataSourceBuildItem;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.processor.DotNames;
import io.quarkus.deployment.Capabilities;
Expand All @@ -52,6 +53,7 @@
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigBuilderBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.builditem.SystemPropertyBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
Expand All @@ -76,7 +78,6 @@
import xyz.block.ftl.Subscription;
import xyz.block.ftl.Verb;
import xyz.block.ftl.VerbName;
import xyz.block.ftl.runtime.FTLController;
import xyz.block.ftl.runtime.FTLHttpHandler;
import xyz.block.ftl.runtime.FTLRecorder;
import xyz.block.ftl.runtime.JsonSerializationConfig;
Expand All @@ -86,11 +87,13 @@
import xyz.block.ftl.runtime.VerbRegistry;
import xyz.block.ftl.runtime.builtin.HttpRequest;
import xyz.block.ftl.runtime.builtin.HttpResponse;
import xyz.block.ftl.runtime.config.FTLConfigSourceFactoryBuilder;
import xyz.block.ftl.v1.CallRequest;
import xyz.block.ftl.v1.schema.Array;
import xyz.block.ftl.v1.schema.Bool;
import xyz.block.ftl.v1.schema.Bytes;
import xyz.block.ftl.v1.schema.Data;
import xyz.block.ftl.v1.schema.Database;
import xyz.block.ftl.v1.schema.Decl;
import xyz.block.ftl.v1.schema.Field;
import xyz.block.ftl.v1.schema.Float;
Expand Down Expand Up @@ -133,9 +136,13 @@ class FtlProcessor {
public static final DotName NOT_NULL = DotName.createSimple(NotNull.class);

@BuildStep
ModuleNameBuildItem moduleName(ApplicationInfoBuildItem applicationInfoBuildItem) {
return new ModuleNameBuildItem(applicationInfoBuildItem.getName());
ModuleNameBuildItem moduleName(ApplicationInfoBuildItem applicationInfoBuildItem, FTLBuildTimeConfig buildTimeConfig) {
return new ModuleNameBuildItem(buildTimeConfig.moduleName.orElse(applicationInfoBuildItem.getName()));
}

@BuildStep
RunTimeConfigBuilderBuildItem runTimeConfigBuilderBuildItem() {
return new RunTimeConfigBuilderBuildItem(FTLConfigSourceFactoryBuilder.class.getName());
}

@BuildStep
Expand All @@ -159,7 +166,7 @@ BindableServiceBuildItem verbService() {
AdditionalBeanBuildItem beans() {
return AdditionalBeanBuildItem.builder()
.addBeanClasses(VerbHandler.class,
VerbRegistry.class, FTLHttpHandler.class, FTLController.class,
VerbRegistry.class, FTLHttpHandler.class,
TopicHelper.class, VerbClientHelper.class, JsonSerializationConfig.class)
.setUnremovable().build();
}
Expand Down Expand Up @@ -223,7 +230,8 @@ public void registerVerbs(CombinedIndexBuildItem index,
TopicsBuildItem topics,
VerbClientBuildItem verbClients,
ModuleNameBuildItem moduleNameBuildItem,
SubscriptionMetaAnnotationsBuildItem subscriptionMetaAnnotationsBuildItem) throws Exception {
SubscriptionMetaAnnotationsBuildItem subscriptionMetaAnnotationsBuildItem,
List<JdbcDataSourceBuildItem> datasources) throws Exception {
String moduleName = moduleNameBuildItem.getModuleName();
Module.Builder moduleBuilder = Module.newBuilder()
.setName(moduleName)
Expand All @@ -233,6 +241,19 @@ public void registerVerbs(CombinedIndexBuildItem index,
new HashSet<>(), new HashSet<>(), topics.getTopics(), verbClients.getVerbClients());
var beans = AdditionalBeanBuildItem.builder().setUnremovable();

//postgres datasources
//this very new, needs a lot of cleanup
for (var ds : datasources) {
if (!ds.isDefault()) {
throw new RuntimeException("only the default datasource is supported");
}
if (!ds.getDbKind().equals("postgresql")) {
throw new RuntimeException("only postgresql is supported not " + ds.getDbKind());
}
moduleBuilder.addDecls(
Decl.newBuilder().setDatabase(Database.newBuilder().setType("postgres").setName("default")).build());
}

//register all the topics we are defining in the module definition

for (var topic : topics.getTopics().values()) {
Expand Down
4 changes: 4 additions & 0 deletions jvm-runtime/ftl-runtime/common/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-credentials</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import com.google.protobuf.ByteString;

import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.quarkus.runtime.Startup;
import xyz.block.ftl.LeaseClient;
import xyz.block.ftl.LeaseFailedException;
import xyz.block.ftl.LeaseHandle;
Expand All @@ -31,61 +29,41 @@
import xyz.block.ftl.v1.VerbServiceGrpc;
import xyz.block.ftl.v1.schema.Ref;

@Singleton
@Startup
public class FTLController implements LeaseClient {
private static final Logger log = Logger.getLogger(FTLController.class);
final String moduleName;

private Throwable currentError;
private volatile ModuleContextResponse moduleContextResponse;
private boolean waiters = false;
private volatile boolean closed = false;

final VerbServiceGrpc.VerbServiceStub verbService;
final StreamObserver<ModuleContextResponse> moduleObserver = new StreamObserver<>() {
@Override
public void onNext(ModuleContextResponse moduleContextResponse) {
synchronized (this) {
currentError = null;
FTLController.this.moduleContextResponse = moduleContextResponse;
if (waiters) {
this.notifyAll();
waiters = false;
}
}
final StreamObserver<ModuleContextResponse> moduleObserver = new ModuleObserver();

}
private static volatile FTLController controller;

@Override
public void onError(Throwable throwable) {
log.error("GRPC connection error", throwable);
synchronized (this) {
currentError = throwable;
if (waiters) {
this.notifyAll();
waiters = false;
/**
* TODO: look at how init should work, this is terrible and will break dev mode
*/
public static FTLController instance() {
if (controller == null) {
synchronized (FTLController.class) {
if (controller == null) {
controller = new FTLController();
}
}
if (!closed) {
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
}
}

@Override
public void onCompleted() {
onError(new RuntimeException("connection closed"));
}
};

@PreDestroy
void shutdown() {

return controller;
}

public FTLController(@ConfigProperty(name = "ftl.endpoint", defaultValue = "http://localhost:8892") URI uri,
@ConfigProperty(name = "ftl.module.name") String moduleName) {
this.moduleName = moduleName;
FTLController() {
String endpoint = System.getenv("FTL_ENDPOINT");
String testEndpoint = System.getProperty("ftl.test.endpoint"); //set by the test framework
if (testEndpoint != null) {
endpoint = testEndpoint;
}
var uri = URI.create(endpoint);
this.moduleName = System.getProperty("ftl.module.name");
var channelBuilder = ManagedChannelBuilder.forAddress(uri.getHost(), uri.getPort());
if (uri.getScheme().equals("http")) {
channelBuilder.usePlaintext();
Expand All @@ -111,6 +89,16 @@ public byte[] getConfig(String secretName) {
throw new RuntimeException("Config not found: " + secretName);
}

public Datasource getDatasource(String name) {
List<ModuleContextResponse.DSN> databasesList = getModuleContext().getDatabasesList();
for (var i : databasesList) {
if (i.getName().equals(name)) {
return Datasource.fromDSN(i.getDsn());
}
}
throw new RuntimeException("no datasource with name " + name);
}

public byte[] callVerb(String name, String module, byte[] payload) {
CompletableFuture<byte[]> cf = new CompletableFuture<>();

Expand Down Expand Up @@ -234,4 +222,63 @@ private ModuleContextResponse getModuleContext() {
}
}

private class ModuleObserver implements StreamObserver<ModuleContextResponse> {

final AtomicInteger failCount = new AtomicInteger();

@Override
public void onNext(ModuleContextResponse moduleContextResponse) {
synchronized (this) {
currentError = null;
FTLController.this.moduleContextResponse = moduleContextResponse;
if (waiters) {
this.notifyAll();
waiters = false;
}
}

}

@Override
public void onError(Throwable throwable) {
log.error("GRPC connection error", throwable);
synchronized (this) {
currentError = throwable;
if (waiters) {
this.notifyAll();
waiters = false;
}
}
if (failCount.incrementAndGet() < 5) {
verbService.getModuleContext(ModuleContextRequest.newBuilder().setModule(moduleName).build(), moduleObserver);
}
}

@Override
public void onCompleted() {
onError(new RuntimeException("connection closed"));
}
}

public record Datasource(String connectionString, String username, String password) {

public static Datasource fromDSN(String dsn) {
//TODO: this is horrible, just quick hack for now
var matcher = Pattern.compile("[&?]user=([^?]*)").matcher(dsn);
String username = "";
if (matcher.find()) {
username = matcher.group(1);
}
dsn = matcher.replaceAll("");
matcher = Pattern.compile("[&?]password=([^?]*)").matcher(dsn);
String password = "";
if (matcher.find()) {
password = matcher.group(1);
}
dsn = matcher.replaceAll("");
dsn = dsn.replaceAll("postgresql://", "jdbc:postgresql://");
return new Datasource(dsn, username, password);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package xyz.block.ftl.runtime;

import java.util.Map;

import jakarta.inject.Singleton;

import io.quarkus.credentials.CredentialsProvider;

@Singleton
public class FTLDatasourceCredentials implements CredentialsProvider {

final FTLController controller;

public FTLDatasourceCredentials(FTLController controller) {
this.controller = controller;
}

@Override
public Map<String, String> getCredentials(String credentialsProviderName) {
var db = controller.getDatasource(credentialsProviderName);

return Map.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,15 @@
@Singleton
public class TopicHelper {

final FTLController controller;
final ObjectMapper mapper;

public TopicHelper(FTLController controller, ObjectMapper mapper) {
this.controller = controller;
public TopicHelper(ObjectMapper mapper) {
this.mapper = mapper;
}

public void publish(String topic, String verb, Object message) {
try {
controller.publishEvent(topic, verb, mapper.writeValueAsBytes(message));
FTLController.instance().publishEvent(topic, verb, mapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
@Singleton
public class VerbClientHelper {

final FTLController controller;
final ObjectMapper mapper;

public VerbClientHelper(FTLController controller, ObjectMapper mapper) {
this.controller = controller;
public VerbClientHelper(ObjectMapper mapper) {
this.mapper = mapper;
}

Expand All @@ -27,7 +25,7 @@ public Object call(String verb, String module, Object message, Class<?> returnTy
//TODO: what about optional?
message = Map.of();
}
var result = controller.callVerb(verb, module, mapper.writeValueAsBytes(message));
var result = FTLController.instance().callVerb(verb, module, mapper.writeValueAsBytes(message));
if (listReturnType) {
return mapper.readerForArrayOf(returnType).readValue(result);
} else if (mapReturnType) {
Expand Down
Loading

0 comments on commit 25f26fa

Please sign in to comment.