diff --git a/langstream-agents/langstream-ai-agents/pom.xml b/langstream-agents/langstream-ai-agents/pom.xml index b8d9474ba..0fc60bd9b 100644 --- a/langstream-agents/langstream-ai-agents/pom.xml +++ b/langstream-agents/langstream-ai-agents/pom.xml @@ -66,7 +66,17 @@ com.datastax.astra astra-sdk-devops - 0.6.9 + 1.0 + + + com.datastax.astra + astra-db-client + 1.0 + + + com.datastax.astra + astra-sdk + 1.0 io.netty.incubator diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/AstraDataSource.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/CassandraDataSourceProvider.java similarity index 94% rename from langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/AstraDataSource.java rename to langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/CassandraDataSourceProvider.java index 70fcfb0b7..99cbce9ab 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/AstraDataSource.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/datasource/impl/CassandraDataSourceProvider.java @@ -20,7 +20,7 @@ import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; import java.util.Map; -public class AstraDataSource implements DataSourceProvider { +public class CassandraDataSourceProvider implements DataSourceProvider { @Override public boolean supports(Map dataSourceConfig) { diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java index a3d2808e2..61e667765 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/datasource/CassandraDataSource.java @@ -31,9 +31,9 @@ import com.datastax.oss.driver.api.core.type.reflect.GenericType; import com.datastax.oss.driver.internal.core.type.codec.CqlVectorCodec; import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry; -import com.dtsx.astra.sdk.db.AstraDbClient; -import com.dtsx.astra.sdk.db.DatabaseClient; -import com.dtsx.astra.sdk.utils.ApiLocator; +import com.dtsx.astra.sdk.db.AstraDBOpsClient; +import com.dtsx.astra.sdk.db.DbOpsClient; +import com.dtsx.astra.sdk.utils.AstraEnvironment; import edu.umd.cs.findbugs.annotations.Nullable; import java.io.ByteArrayInputStream; import java.net.InetSocketAddress; @@ -241,13 +241,13 @@ private CqlSession buildCqlSession(Map dataSourceConfig) { log.info( "Automatically downloading the secure bundle for database name {} from AstraDB", astraDatabase); - DatabaseClient databaseClient = this.buildAstraClient(); + DbOpsClient databaseClient = this.buildAstraClient(); secureBundleDecoded = downloadSecureBundle(databaseClient); } else if (!astraDatabaseId.isEmpty() && !astraToken.isEmpty()) { log.info( "Automatically downloading the secure bundle for database id {} from AstraDB", astraDatabaseId); - DatabaseClient databaseClient = this.buildAstraClient(); + DbOpsClient databaseClient = this.buildAstraClient(); secureBundleDecoded = downloadSecureBundle(databaseClient); } else { log.info("No secure bundle provided, using the default CQL driver for Cassandra"); @@ -281,11 +281,11 @@ public CqlSession getSession() { return session; } - public DatabaseClient buildAstraClient() { + public DbOpsClient buildAstraClient() { return buildAstraClient(astraToken, astraDatabase, astraDatabaseId, astraEnvironment); } - public static DatabaseClient buildAstraClient( + public static DbOpsClient buildAstraClient( String astraToken, String astraDatabase, String astraDatabaseId, @@ -293,9 +293,8 @@ public static DatabaseClient buildAstraClient( if (astraToken.isEmpty()) { throw new IllegalArgumentException("You must configure the AstraDB token"); } - AstraDbClient astraDbClient = - new AstraDbClient( - astraToken, ApiLocator.AstraEnvironment.valueOf(astraEnvironment)); + AstraDBOpsClient astraDbClient = + new AstraDBOpsClient(astraToken, AstraEnvironment.valueOf(astraEnvironment)); if (!astraDatabase.isEmpty()) { return astraDbClient.databaseByName(astraDatabase); } else if (!astraDatabaseId.isEmpty()) { @@ -306,7 +305,7 @@ public static DatabaseClient buildAstraClient( } } - public static byte[] downloadSecureBundle(DatabaseClient databaseClient) { + public static byte[] downloadSecureBundle(DbOpsClient databaseClient) { long start = System.currentTimeMillis(); byte[] secureBundleDecoded = databaseClient.downloadDefaultSecureConnectBundle(); long delta = System.currentTimeMillis() - start; diff --git a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider index 3c023fedb..233a8ee8a 100644 --- a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider +++ b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider @@ -1,2 +1,2 @@ -ai.langstream.ai.agents.datasource.impl.AstraDataSource +ai.langstream.ai.agents.datasource.impl.CassandraDataSourceProvider ai.langstream.ai.agents.datasource.impl.JdbcDataSourceProvider \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsAssetsManagerProvider.java new file mode 100644 index 000000000..c915b4ab3 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsAssetsManagerProvider.java @@ -0,0 +1,117 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.astra; + +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.runner.assets.AssetManager; +import ai.langstream.api.runner.assets.AssetManagerProvider; +import ai.langstream.api.util.ConfigurationUtils; +import io.stargate.sdk.doc.exception.CollectionNotFoundException; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AstraCollectionsAssetsManagerProvider implements AssetManagerProvider { + + @Override + public boolean supports(String assetType) { + return "astra-collection".equals(assetType); + } + + @Override + public AssetManager createInstance(String assetType) { + + switch (assetType) { + case "astra-collection": + return new AstraDBCollectionAssetManager(); + default: + throw new IllegalArgumentException(); + } + } + + private abstract static class BaseAstraAssetManager implements AssetManager { + + AstraCollectionsDataSource datasource; + AssetDefinition assetDefinition; + + @Override + public void initialize(AssetDefinition assetDefinition) { + this.datasource = buildDataSource(assetDefinition); + this.assetDefinition = assetDefinition; + } + + @Override + public void close() throws Exception { + if (datasource != null) { + datasource.close(); + } + } + } + + private static class AstraDBCollectionAssetManager extends BaseAstraAssetManager { + + @Override + public boolean assetExists() throws Exception { + String collection = getCollection(); + log.info("Checking if collection {} exists", collection); + return datasource.getAstraDB().isCollectionExists(collection); + } + + @Override + public void deployAsset() throws Exception { + int vectorDimension = getVectorDimension(); + + String collection = getCollection(); + log.info("Create collection {} with vector dimension {}", collection, vectorDimension); + datasource.getAstraDB().createCollection(collection, vectorDimension); + } + + private String getCollection() { + return ConfigurationUtils.getString("collection", null, assetDefinition.getConfig()); + } + + private int getVectorDimension() { + return ConfigurationUtils.getInt("vector-dimension", 1536, assetDefinition.getConfig()); + } + + @Override + public boolean deleteAssetIfExists() throws Exception { + String collection = getCollection(); + + log.info("Deleting collection {}", collection); + + try { + datasource.getAstraDB().deleteCollection(collection); + return true; + } catch (CollectionNotFoundException e) { + log.info( + "collection does not exist, maybe it was deleted by another agent ({})", + e.toString()); + return false; + } + } + } + + private static AstraCollectionsDataSource buildDataSource(AssetDefinition assetDefinition) { + AstraCollectionsDataSource dataSource = new AstraCollectionsDataSource(); + Map datasourceDefinition = + ConfigurationUtils.getMap("datasource", Map.of(), assetDefinition.getConfig()); + Map configuration = + ConfigurationUtils.getMap("configuration", Map.of(), datasourceDefinition); + dataSource.initialize(configuration); + return dataSource; + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsDataSource.java new file mode 100644 index 000000000..1ed0caee2 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsDataSource.java @@ -0,0 +1,76 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.astra; + +import ai.langstream.api.util.ConfigurationUtils; +import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; +import com.dtsx.astra.sdk.AstraDB; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AstraCollectionsDataSource implements QueryStepDataSource { + + AstraDB astraDB; + + @Override + public void initialize(Map dataSourceConfig) { + log.info( + "Initializing CassandraDataSource with config {}", + ConfigurationUtils.redactSecrets(dataSourceConfig)); + String astraToken = ConfigurationUtils.getString("token", "", dataSourceConfig); + String astraEndpoint = ConfigurationUtils.getString("endpoint", "", dataSourceConfig); + this.astraDB = new AstraDB(astraToken, astraEndpoint); + } + + @Override + public void close() {} + + @Override + public List> fetchData(String query, List params) { + if (log.isDebugEnabled()) { + log.debug( + "Executing query {} with params {} ({})", + query, + params, + params.stream() + .map(v -> v == null ? "null" : v.getClass().toString()) + .collect(Collectors.joining(","))); + } + throw new UnsupportedOperationException(); + } + + @Override + public Map executeStatement( + String query, List generatedKeys, List params) { + if (log.isDebugEnabled()) { + log.debug( + "Executing statement {} with params {} ({})", + query, + params, + params.stream() + .map(v -> v == null ? "null" : v.getClass().toString()) + .collect(Collectors.joining(","))); + } + throw new UnsupportedOperationException(); + } + + public AstraDB getAstraDB() { + return astraDB; + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsDataSourceProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsDataSourceProvider.java new file mode 100644 index 000000000..8971b8162 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsDataSourceProvider.java @@ -0,0 +1,37 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.astra; + +import ai.langstream.ai.agents.datasource.DataSourceProvider; +import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AstraCollectionsDataSourceProvider implements DataSourceProvider { + + @Override + public boolean supports(Map dataSourceConfig) { + String service = (String) dataSourceConfig.get("service"); + return "astra-collections".equals(service); + } + + @Override + public QueryStepDataSource createDataSourceImplementation( + Map dataSourceConfig) { + return new AstraCollectionsDataSource(); + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsWriter.java new file mode 100644 index 000000000..366dbadf3 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/astra/AstraCollectionsWriter.java @@ -0,0 +1,57 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.astra; + +import ai.langstream.api.database.VectorDatabaseWriter; +import ai.langstream.api.database.VectorDatabaseWriterProvider; +import ai.langstream.api.runner.code.Record; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AstraCollectionsWriter implements VectorDatabaseWriterProvider { + + @Override + public boolean supports(Map dataSourceConfig) { + return "astra-collections".equals(dataSourceConfig.get("service")); + } + + @Override + public VectorDatabaseWriter createImplementation(Map datasourceConfig) { + return new AstraCollectionsDatabaseWriter(datasourceConfig); + } + + private static class AstraCollectionsDatabaseWriter implements VectorDatabaseWriter { + + private final Map datasourceConfig; + + public AstraCollectionsDatabaseWriter(Map datasourceConfig) { + this.datasourceConfig = datasourceConfig; + } + + @Override + public void initialise(Map agentConfiguration) {} + + @Override + public CompletableFuture upsert(Record record, Map context) { + return CompletableFuture.failedFuture(new UnsupportedOperationException()); + } + + @Override + public void close() {} + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java index a7d2651fe..116502749 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraAssetsManagerProvider.java @@ -25,7 +25,7 @@ import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.api.core.servererrors.AlreadyExistsException; import com.datastax.oss.streaming.ai.datasource.CassandraDataSource; -import com.dtsx.astra.sdk.db.DatabaseClient; +import com.dtsx.astra.sdk.db.DbOpsClient; import java.util.List; import java.util.Map; import java.util.Optional; @@ -211,7 +211,7 @@ private static class AstraDBKeyspaceAssetManager extends BaseCassandraAssetManag public boolean assetExists() throws Exception { String keySpace = getKeyspace(); log.info("Checking if keyspace {} exists", keySpace); - DatabaseClient astraDbClient = datasource.buildAstraClient(); + DbOpsClient astraDbClient = datasource.buildAstraClient(); boolean exist = astraDbClient.keyspaces().exist(keySpace); log.info("Result: {}", exist); return exist; @@ -220,7 +220,7 @@ public boolean assetExists() throws Exception { @Override public void deployAsset() throws Exception { String keySpace = getKeyspace(); - DatabaseClient astraDbClient = datasource.buildAstraClient(); + DbOpsClient astraDbClient = datasource.buildAstraClient(); try { astraDbClient.keyspaces().create(keySpace); } catch (com.dtsx.astra.sdk.db.exception.KeyspaceAlreadyExistException e) { @@ -250,7 +250,7 @@ public boolean deleteAssetIfExists() throws Exception { String keySpace = getKeyspace(); log.info("Deleting keyspace {}", keySpace); - DatabaseClient astraDbClient = datasource.buildAstraClient(); + DbOpsClient astraDbClient = datasource.buildAstraClient(); try { astraDbClient.keyspaces().delete(keySpace); return true; diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java index b7508d45d..89e21afce 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/cassandra/CassandraWriter.java @@ -27,7 +27,7 @@ import com.datastax.oss.common.sink.config.CassandraSinkConfig; import com.datastax.oss.common.sink.util.SinkUtil; import com.datastax.oss.streaming.ai.datasource.CassandraDataSource; -import com.dtsx.astra.sdk.db.DatabaseClient; +import com.dtsx.astra.sdk.db.DbOpsClient; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashMap; @@ -121,7 +121,7 @@ public void initialise(Map agentConfiguration) { "environment", "PROD", datasource); if (!token.isEmpty() && (!database.isEmpty() || !databaseId.isEmpty())) { - DatabaseClient databaseClient = + DbOpsClient databaseClient = CassandraDataSource.buildAstraClient( token, database, diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider index 81ead541b..e227398ea 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider @@ -1,4 +1,5 @@ ai.langstream.agents.vector.pinecone.PineconeDataSource ai.langstream.agents.vector.milvus.MilvusDataSource ai.langstream.agents.vector.solr.SolrDataSource -ai.langstream.agents.vector.opensearch.OpenSearchDataSource \ No newline at end of file +ai.langstream.agents.vector.opensearch.OpenSearchDataSource +ai.langstream.agents.vector.astra.AstraCollectionsDataSourceProvider \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider index 45925c4fa..fd6eb76be 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider @@ -2,4 +2,5 @@ ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider ai.langstream.agents.vector.milvus.MilvusAssetsManagerProvider ai.langstream.agents.vector.jdbc.JdbcAssetsManagerProvider ai.langstream.agents.vector.solr.SolrAssetsManagerProvider -ai.langstream.agents.vector.opensearch.OpenSearchAssetsManagerProvider \ No newline at end of file +ai.langstream.agents.vector.opensearch.OpenSearchAssetsManagerProvider +ai.langstream.agents.vector.astra.AstraCollectionsAssetsManagerProvider \ No newline at end of file