From 1deef92ee8f48dfec991d93f9fd8f8081959dcb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Mon, 9 Oct 2023 14:22:32 +0200 Subject: [PATCH] Add Azure blob storage source (#546) --- .../pom.xml | 115 ++++++++ .../source/AzureBlobStorageSource.java | 261 ++++++++++++++++++ .../AzureBlobStorageSourceCodeProvider.java | 31 +++ .../META-INF/ai.langstream.agents.index | 1 + ...ngstream.api.runner.code.AgentCodeProvider | 1 + .../source/AzureBlobStorageSourceTest.java | 48 ++++ .../source}/HttpRequestAgent.java | 2 +- .../source}/HttpRequestAgentProvider.java | 2 +- ...ngstream.api.runner.code.AgentCodeProvider | 2 +- langstream-agents/pom.xml | 1 + .../ObjectStorageSourceAgentProvider.java | 209 ++++++++++++++ .../k8s/agents/S3SourceAgentProvider.java | 119 -------- ...i.langstream.api.runtime.AgentNodeProvider | 2 +- .../KafkaConnectAgentsProviderTest.java | 76 ++--- .../QueryVectorDBAgentProviderTest.java | 112 ++++---- .../k8s/agents/S3SourceAgentProviderTest.java | 147 ++++++---- .../langstream-runtime-impl/pom.xml | 17 ++ 17 files changed, 873 insertions(+), 273 deletions(-) create mode 100644 langstream-agents/langstream-agent-azure-blob-storage-source/pom.xml create mode 100644 langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSource.java create mode 100644 langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceCodeProvider.java create mode 100644 langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/ai.langstream.agents.index create mode 100644 langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider create mode 100644 langstream-agents/langstream-agent-azure-blob-storage-source/src/test/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceTest.java rename langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/{queryhttp => azureblobstorage/source}/HttpRequestAgent.java (99%) rename langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/{queryhttp => azureblobstorage/source}/HttpRequestAgentProvider.java (95%) create mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageSourceAgentProvider.java delete mode 100644 langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProvider.java diff --git a/langstream-agents/langstream-agent-azure-blob-storage-source/pom.xml b/langstream-agents/langstream-agent-azure-blob-storage-source/pom.xml new file mode 100644 index 000000000..207c8cfab --- /dev/null +++ b/langstream-agents/langstream-agent-azure-blob-storage-source/pom.xml @@ -0,0 +1,115 @@ + + + + + langstream-agents + ai.langstream + 0.1.1-SNAPSHOT + + 4.0.0 + + + langstream-agent-azure-blob-storage-source + + + + com.azure + azure-sdk-bom + ${azure-sdk-bom.version} + pom + import + + + + + + + ${project.groupId} + langstream-api + ${project.version} + provided + + + ${project.groupId} + langstream-agents-commons + ${project.version} + + + org.projectlombok + lombok + provided + + + org.slf4j + slf4j-api + + + com.azure + azure-storage-blob + + + com.azure + azure-core-http-netty + + + + + com.azure + azure-core-http-okhttp + + + ch.qos.logback + logback-core + test + + + ch.qos.logback + logback-classic + test + + + org.junit.jupiter + junit-jupiter + test + + + + + + org.apache.nifi + nifi-nar-maven-plugin + true + + nar + + + + default-nar + package + + nar + + + + + + + \ No newline at end of file diff --git a/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSource.java b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSource.java new file mode 100644 index 000000000..87087a00d --- /dev/null +++ b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSource.java @@ -0,0 +1,261 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.azureblobstorage.source; + +import ai.langstream.api.runner.code.AbstractAgentCode; +import ai.langstream.api.runner.code.AgentSource; +import ai.langstream.api.runner.code.Header; +import ai.langstream.api.runner.code.Record; +import ai.langstream.api.util.ConfigurationUtils; +import com.azure.core.http.rest.PagedIterable; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobContainerClientBuilder; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.common.StorageSharedKeyCredential; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AzureBlobStorageSource extends AbstractAgentCode implements AgentSource { + private BlobContainerClient client; + private final Set blobsToCommit = ConcurrentHashMap.newKeySet(); + private int idleTime; + + public static final String ALL_FILES = "*"; + public static final String DEFAULT_EXTENSIONS_FILTER = "pdf,docx,html,htm,md,txt"; + private Set extensions = Set.of(); + + static BlobContainerClient createContainerClient(Map configuration) { + return createContainerClient( + ConfigurationUtils.getString("container", "langstream-azure-source", configuration), + ConfigurationUtils.requiredNonEmptyField( + configuration, "endpoint", () -> "azure blob storage source"), + ConfigurationUtils.getString("sas-token", null, configuration), + ConfigurationUtils.getString("storage-account-name", null, configuration), + ConfigurationUtils.getString("storage-account-key", null, configuration), + ConfigurationUtils.getString( + "storage-account-connection-string", null, configuration)); + } + + static BlobContainerClient createContainerClient( + String container, + String endpoint, + String sasToken, + String storageAccountName, + String storageAccountKey, + String storageAccountConnectionString) { + + BlobContainerClientBuilder containerClientBuilder = new BlobContainerClientBuilder(); + if (sasToken != null) { + containerClientBuilder.sasToken(sasToken); + log.info("Connecting to Azure at {} with SAS token", endpoint); + } else if (storageAccountName != null) { + containerClientBuilder.credential( + new StorageSharedKeyCredential(storageAccountName, storageAccountKey)); + log.info( + "Connecting to Azure at {} with account name {}", endpoint, storageAccountName); + } else if (storageAccountConnectionString != null) { + log.info("Connecting to Azure at {} with connection string", endpoint); + containerClientBuilder.credential( + StorageSharedKeyCredential.fromConnectionString( + storageAccountConnectionString)); + } else { + throw new IllegalArgumentException( + "Either sas-token, account-name/account-key or account-connection-string must be provided"); + } + + containerClientBuilder.endpoint(endpoint); + containerClientBuilder.containerName(container); + + final BlobContainerClient containerClient = containerClientBuilder.buildClient(); + log.info( + "Connected to Azure to account {}, container {}", + containerClient.getAccountName(), + containerClient.getBlobContainerName()); + + if (!containerClient.exists()) { + log.info("Creating container"); + containerClient.createIfNotExists(); + } else { + log.info("Container already exists"); + } + return containerClient; + } + + @Override + public void init(Map configuration) throws Exception { + client = createContainerClient(configuration); + idleTime = Integer.parseInt(configuration.getOrDefault("idle-time", 5).toString()); + extensions = + Set.of( + configuration + .getOrDefault("file-extensions", DEFAULT_EXTENSIONS_FILTER) + .toString() + .split(",")); + + log.info("Getting files with extensions {} (use '*' to no filter)", extensions); + } + + @Override + public List read() throws Exception { + List records = new ArrayList<>(); + final PagedIterable blobs; + try { + blobs = client.listBlobs(); + } catch (Exception e) { + log.error("Error listing blobs on container {}", client.getBlobContainerName(), e); + throw e; + } + boolean somethingFound = false; + for (BlobItem blob : blobs) { + final String name = blob.getName(); + if (blob.isDeleted()) { + log.debug("Skipping blob {}. deleted status", name); + continue; + } + boolean extensionAllowed = isExtensionAllowed(name, extensions); + if (!extensionAllowed) { + log.debug("Skipping blob with bad extension {}", name); + continue; + } + if (!blobsToCommit.contains(name)) { + log.info("Found new blob {}", name); + try { + byte[] read = client.getBlobClient(name).downloadContent().toBytes(); + ; + records.add(new BlobSourceRecord(read, name)); + somethingFound = true; + blobsToCommit.add(name); + } catch (Exception e) { + log.error("Error reading object {}", name, e); + throw e; + } + break; + } else { + log.info("Skipping already processed object {}", name); + } + } + if (!somethingFound) { + log.info("Nothing found, sleeping for {} seconds", idleTime); + Thread.sleep(idleTime * 1000L); + } else { + processed(0, 1); + } + return records; + } + + static boolean isExtensionAllowed(String name, Set extensions) { + if (extensions.contains(ALL_FILES)) { + return true; + } + String extension; + int extensionIndex = name.lastIndexOf('.'); + if (extensionIndex < 0 || extensionIndex == name.length() - 1) { + extension = ""; + } else { + extension = name.substring(extensionIndex + 1); + } + return extensions.contains(extension); + } + + @Override + protected Map buildAdditionalInfo() { + return Map.of("container", client.getBlobContainerName()); + } + + @Override + public void commit(List records) throws Exception { + for (Record record : records) { + BlobSourceRecord blobRecord = (BlobSourceRecord) record; + String name = blobRecord.name; + log.info("Removing blob {}", name); + client.getBlobClient(name).deleteIfExists(); + blobsToCommit.remove(name); + } + } + + private static class BlobSourceRecord implements Record { + private final byte[] read; + private final String name; + private final long timestamp = System.currentTimeMillis(); + + public BlobSourceRecord(byte[] read, String name) { + this.read = read; + this.name = name; + } + + /** + * the key is used for routing, so it is better to set it to something meaningful. In case + * of retransmission the message will be sent to the same partition. + * + * @return the key + */ + @Override + public Object key() { + return name; + } + + @Override + public Object value() { + return read; + } + + @Override + public String origin() { + return null; + } + + @Override + public Long timestamp() { + return timestamp; + } + + @Override + public Collection
headers() { + return List.of(new BlobHeader("name", name)); + } + + @AllArgsConstructor + @ToString + private static class BlobHeader implements Header { + + final String key; + final String value; + + @Override + public String key() { + return key; + } + + @Override + public String value() { + return value; + } + + @Override + public String valueAsString() { + return value; + } + } + } +} diff --git a/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceCodeProvider.java b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceCodeProvider.java new file mode 100644 index 000000000..36182aa9f --- /dev/null +++ b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceCodeProvider.java @@ -0,0 +1,31 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.azureblobstorage.source; + +import ai.langstream.api.runner.code.AgentCode; +import ai.langstream.api.runner.code.AgentCodeProvider; + +public class AzureBlobStorageSourceCodeProvider implements AgentCodeProvider { + @Override + public boolean supports(String agentType) { + return "azure-blob-storage-source".equals(agentType); + } + + @Override + public AgentCode createInstance(String agentType) { + return new AzureBlobStorageSource(); + } +} diff --git a/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/ai.langstream.agents.index b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/ai.langstream.agents.index new file mode 100644 index 000000000..57f87915b --- /dev/null +++ b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/ai.langstream.agents.index @@ -0,0 +1 @@ +azure-blob-storage-source \ No newline at end of file diff --git a/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider new file mode 100644 index 000000000..4f4a7838b --- /dev/null +++ b/langstream-agents/langstream-agent-azure-blob-storage-source/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider @@ -0,0 +1 @@ +ai.langstream.agents.azureblobstorage.source.AzureBlobStorageSourceCodeProvider \ No newline at end of file diff --git a/langstream-agents/langstream-agent-azure-blob-storage-source/src/test/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceTest.java b/langstream-agents/langstream-agent-azure-blob-storage-source/src/test/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceTest.java new file mode 100644 index 000000000..70c88c1b5 --- /dev/null +++ b/langstream-agents/langstream-agent-azure-blob-storage-source/src/test/java/ai/langstream/agents/azureblobstorage/source/AzureBlobStorageSourceTest.java @@ -0,0 +1,48 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.azureblobstorage.source; + +import static org.junit.jupiter.api.Assertions.*; + +import ai.langstream.api.runner.code.Record; +import com.azure.core.util.BinaryData; +import com.azure.storage.blob.BlobContainerClient; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +class AzureBlobStorageSourceTest { + + @Test + @Disabled + void test() throws Exception { + final String endpoint = ""; + final String connectionString = ""; + + AzureBlobStorageSource source = new AzureBlobStorageSource(); + final Map config = + Map.of("endpoint", endpoint, "storage-account-connection-string", connectionString); + final BlobContainerClient containerClient = + AzureBlobStorageSource.createContainerClient(config); + containerClient.getBlobClient("test.txt").deleteIfExists(); + containerClient.getBlobClient("test.txt").upload(BinaryData.fromString("test")); + source.init(config); + final List read = source.read(); + assertEquals(1, read.size()); + assertEquals("test", new String((byte[]) read.get(0).value())); + } +} diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/azureblobstorage/source/HttpRequestAgent.java similarity index 99% rename from langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java rename to langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/azureblobstorage/source/HttpRequestAgent.java index d2d2c6e07..43b638d7e 100644 --- a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgent.java +++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/azureblobstorage/source/HttpRequestAgent.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ai.langstream.agents.queryhttp; +package ai.langstream.agents.azureblobstorage.source; import ai.langstream.ai.agents.commons.JsonRecord; import ai.langstream.ai.agents.commons.TransformContext; diff --git a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/azureblobstorage/source/HttpRequestAgentProvider.java similarity index 95% rename from langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java rename to langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/azureblobstorage/source/HttpRequestAgentProvider.java index c0ca6f854..b8a228543 100644 --- a/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/queryhttp/HttpRequestAgentProvider.java +++ b/langstream-agents/langstream-agent-http-request/src/main/java/ai/langstream/agents/azureblobstorage/source/HttpRequestAgentProvider.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ai.langstream.agents.queryhttp; +package ai.langstream.agents.azureblobstorage.source; import ai.langstream.api.runner.code.AgentCodeProvider; import ai.langstream.api.runner.code.AgentProcessor; diff --git a/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider index 248a0d85f..afa4d7185 100644 --- a/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider +++ b/langstream-agents/langstream-agent-http-request/src/main/resources/META-INF/services/ai.langstream.api.runner.code.AgentCodeProvider @@ -1 +1 @@ -ai.langstream.agents.queryhttp.HttpRequestAgentProvider \ No newline at end of file +ai.langstream.agents.azureblobstorage.source.HttpRequestAgentProvider \ No newline at end of file diff --git a/langstream-agents/pom.xml b/langstream-agents/pom.xml index e362ad4c8..573bca172 100644 --- a/langstream-agents/pom.xml +++ b/langstream-agents/pom.xml @@ -41,5 +41,6 @@ langstream-vector-agents langstream-agents-flow-control langstream-agent-http-request + langstream-agent-azure-blob-storage-source diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageSourceAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageSourceAgentProvider.java new file mode 100644 index 000000000..59bf1bd5f --- /dev/null +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/ObjectStorageSourceAgentProvider.java @@ -0,0 +1,209 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.runtime.impl.k8s.agents; + +import ai.langstream.api.doc.AgentConfig; +import ai.langstream.api.doc.ConfigProperty; +import ai.langstream.api.model.AgentConfiguration; +import ai.langstream.api.runtime.ComponentType; +import ai.langstream.impl.agents.AbstractComposableAgentProvider; +import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Set; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +/** Implements support for S3/Azure Source Agents. */ +@Slf4j +public class ObjectStorageSourceAgentProvider extends AbstractComposableAgentProvider { + + protected static final String AZURE_BLOB_STORAGE_SOURCE = "azure-blob-storage-source"; + protected static final String S3_SOURCE = "s3-source"; + + public ObjectStorageSourceAgentProvider() { + super( + Set.of(S3_SOURCE, AZURE_BLOB_STORAGE_SOURCE), + List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); + } + + @Override + protected final ComponentType getComponentType(AgentConfiguration agentConfiguration) { + return ComponentType.SOURCE; + } + + @Override + protected Class getAgentConfigModelClass(String type) { + switch (type) { + case S3_SOURCE: + return S3SourceConfiguration.class; + case AZURE_BLOB_STORAGE_SOURCE: + return AzureBlobStorageConfiguration.class; + default: + throw new IllegalArgumentException("Unknown agent type: " + type); + } + } + + @AgentConfig(name = "S3 Source", description = "Reads data from S3 bucket") + @Data + public static class S3SourceConfiguration { + + protected static final String DEFAULT_BUCKET_NAME = "langstream-source"; + protected static final String DEFAULT_ENDPOINT = "http://minio-endpoint.-not-set:9090"; + protected static final String DEFAULT_ACCESSKEY = "minioadmin"; + protected static final String DEFAULT_SECRETKEY = "minioadmin"; + protected static final String DEFAULT_FILE_EXTENSIONS = "pdf,docx,html,htm,md,txt"; + + @ConfigProperty( + description = + """ + The name of the bucket to read from. + """, + defaultValue = DEFAULT_BUCKET_NAME) + private String bucketName = DEFAULT_BUCKET_NAME; + + @ConfigProperty( + description = + """ + The endpoint of the S3 server. + """, + defaultValue = DEFAULT_ENDPOINT) + private String endpoint = DEFAULT_ENDPOINT; + + @ConfigProperty( + description = + """ + Access key for the S3 server. + """, + defaultValue = DEFAULT_ACCESSKEY) + @JsonProperty("access-key") + private String accessKey = DEFAULT_ACCESSKEY; + + @ConfigProperty( + description = + """ + Secret key for the S3 server. + """, + defaultValue = DEFAULT_SECRETKEY) + @JsonProperty("secret-key") + private String secretKey = DEFAULT_SECRETKEY; + + @ConfigProperty( + required = false, + description = + """ + Region for the S3 server. + """) + private String region = ""; + + @ConfigProperty( + defaultValue = "5", + description = + """ + Time in seconds to sleep after polling for new files. + """) + @JsonProperty("idle-time") + private int idleTime; + + @ConfigProperty( + defaultValue = DEFAULT_FILE_EXTENSIONS, + description = + """ + Comma separated list of file extensions to filter by. + """) + @JsonProperty("file-extensions") + private String fileExtensions = DEFAULT_FILE_EXTENSIONS; + } + + @AgentConfig( + name = "Azure Blob Storage Source", + description = + """ + Reads data from Azure blobs. There are three supported ways to authenticate: + - SAS token + - Storage account name and key + - Storage account connection string + """) + @Data + public static class AzureBlobStorageConfiguration { + + @ConfigProperty( + defaultValue = "langstream-azure-source", + description = + """ + The name of the Azure econtainer to read from. + """) + private String container; + + @ConfigProperty( + required = true, + description = + """ + Endpoint to connect to. Usually it's https://.blob.core.windows.net. + """) + private String endpoint; + + @ConfigProperty( + description = + """ + Azure SAS token. If not provided, storage account name and key must be provided. + """) + @JsonProperty("sas-token") + private String sasToken; + + @ConfigProperty( + description = + """ + Azure storage account name. If not provided, SAS token must be provided. + """) + @JsonProperty("storage-account-name") + private String storageAccountName; + + @ConfigProperty( + description = + """ + Azure storage account key. If not provided, SAS token must be provided. + """) + @JsonProperty("storage-account-key") + private String storageAccountKey; + + @ConfigProperty( + description = + """ + Azure storage account connection string. If not provided, SAS token must be provided. + """) + @JsonProperty("storage-account-connection-string") + private String storageAccountConnectionString; + + @ConfigProperty( + defaultValue = "5", + description = + """ + Time in seconds to sleep after polling for new files. + """) + @JsonProperty("idle-time") + private int idleTime; + + @ConfigProperty( + defaultValue = "pdf,docx,html,htm,md,txt", + description = + """ + Comma separated list of file extensions to filter by. + """) + @JsonProperty("file-extensions") + private String fileExtensions; + } +} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProvider.java deleted file mode 100644 index 9e7065374..000000000 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProvider.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright DataStax, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ai.langstream.runtime.impl.k8s.agents; - -import ai.langstream.api.doc.AgentConfig; -import ai.langstream.api.doc.ConfigProperty; -import ai.langstream.api.model.AgentConfiguration; -import ai.langstream.api.runtime.ComponentType; -import ai.langstream.impl.agents.AbstractComposableAgentProvider; -import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; -import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.List; -import java.util.Set; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; - -/** Implements support for S3 Source Agents. */ -@Slf4j -public class S3SourceAgentProvider extends AbstractComposableAgentProvider { - - public S3SourceAgentProvider() { - super(Set.of("s3-source"), List.of(KubernetesClusterRuntime.CLUSTER_TYPE, "none")); - } - - @Override - protected final ComponentType getComponentType(AgentConfiguration agentConfiguration) { - return ComponentType.SOURCE; - } - - @Override - protected Class getAgentConfigModelClass(String type) { - switch (type) { - case "s3-source": - return S3SourceConfiguration.class; - default: - throw new IllegalArgumentException("Unknown agent type: " + type); - } - } - - @AgentConfig(name = "S3 Source", description = "Reads data from S3 bucket") - @Data - public static class S3SourceConfiguration { - - protected static final String DEFAULT_BUCKET_NAME = "langstream-source"; - protected static final String DEFAULT_ENDPOINT = "http://minio-endpoint.-not-set:9090"; - protected static final String DEFAULT_ACCESSKEY = "minioadmin"; - protected static final String DEFAULT_SECRETKEY = "minioadmin"; - protected static final String DEFAULT_FILE_EXTENSIONS = "pdf,docx,html,htm,md,txt"; - - @ConfigProperty( - description = """ - The name of the bucket to read from. - """, - defaultValue = DEFAULT_BUCKET_NAME) - private String bucketName = DEFAULT_BUCKET_NAME; - - @ConfigProperty( - description = """ - The endpoint of the S3 server. - """, - defaultValue = DEFAULT_ENDPOINT) - private String endpoint = DEFAULT_ENDPOINT; - - @ConfigProperty( - description = """ - Access key for the S3 server. - """, - defaultValue = DEFAULT_ACCESSKEY) - @JsonProperty("access-key") - private String accessKey = DEFAULT_ACCESSKEY; - - @ConfigProperty( - description = """ - Secret key for the S3 server. - """, - defaultValue = DEFAULT_SECRETKEY) - @JsonProperty("secret-key") - private String secretKey = DEFAULT_SECRETKEY; - - @ConfigProperty( - required = false, - description = - """ - Region for the S3 server. - """) - private String region = ""; - - @ConfigProperty( - defaultValue = "5", - description = - """ - Region for the S3 server. - """) - @JsonProperty("idle-time") - private int idleTime; - - @ConfigProperty( - defaultValue = DEFAULT_FILE_EXTENSIONS, - description = - """ - Comma separated list of file extensions to filter by. - """) - @JsonProperty("file-extensions") - private String fileExtensions = DEFAULT_FILE_EXTENSIONS; - } -} diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider index 61bbaaea6..71a308a35 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AgentNodeProvider @@ -1,6 +1,6 @@ ai.langstream.runtime.impl.k8s.agents.PythonCodeAgentProvider ai.langstream.runtime.impl.k8s.agents.KubernetesGenAIToolKitFunctionAgentProvider -ai.langstream.runtime.impl.k8s.agents.S3SourceAgentProvider +ai.langstream.runtime.impl.k8s.agents.ObjectStorageSourceAgentProvider ai.langstream.runtime.impl.k8s.agents.WebCrawlerSourceAgentProvider ai.langstream.runtime.impl.k8s.agents.TextProcessingAgentsProvider ai.langstream.runtime.impl.k8s.agents.KafkaConnectAgentsProvider diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java index 841f9247c..78985a3c5 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KafkaConnectAgentsProviderTest.java @@ -135,61 +135,35 @@ public void testDocumentation() { final Map model = new PluginsRegistry() .lookupAgentImplementation( - "s3-source", - new NoOpComputeClusterRuntimeProvider.NoOpClusterRuntime()) + "sink", new NoOpComputeClusterRuntimeProvider.NoOpClusterRuntime()) .generateSupportedTypesDocumentation(); Assertions.assertEquals( """ - { - "s3-source" : { - "name" : "S3 Source", - "description" : "Reads data from S3 bucket", - "properties" : { - "access-key" : { - "description" : "Access key for the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "minioadmin" - }, - "bucketName" : { - "description" : "The name of the bucket to read from.", - "required" : false, - "type" : "string", - "defaultValue" : "langstream-source" - }, - "endpoint" : { - "description" : "The endpoint of the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "http://minio-endpoint.-not-set:9090" - }, - "file-extensions" : { - "description" : "Comma separated list of file extensions to filter by.", - "required" : false, - "type" : "string", - "defaultValue" : "pdf,docx,html,htm,md,txt" - }, - "idle-time" : { - "description" : "Region for the S3 server.", - "required" : false, - "type" : "integer", - "defaultValue" : "5" - }, - "region" : { - "description" : "Region for the S3 server.", - "required" : false, - "type" : "string" - }, - "secret-key" : { - "description" : "Secret key for the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "minioadmin" - } - } - } - }""", + { + "sink" : { + "name" : "Kafka Connect Sink agent", + "description" : "Run any Kafka Connect Sink.\\n All the configuration properties are passed to the Kafka Connect Sink.", + "properties" : { + "connector.class" : { + "description" : "Java main class for the Kafka Sink connector.", + "required" : true, + "type" : "string" + } + } + }, + "source" : { + "name" : "Kafka Connect Source agent", + "description" : "Run any Kafka Connect Source.\\n All the configuration properties are passed to the Kafka Connect Source.", + "properties" : { + "connector.class" : { + "description" : "Java main class for the Kafka Source connector.", + "required" : true, + "type" : "string" + } + } + } + }""", SerializationUtil.prettyPrintJson(model)); } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java index 9de14dc65..1a52341bd 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProviderTest.java @@ -115,61 +115,73 @@ public void testDocumentation() { final Map model = new PluginsRegistry() .lookupAgentImplementation( - "s3-source", + "vector-db-sink", new NoOpComputeClusterRuntimeProvider.NoOpClusterRuntime()) .generateSupportedTypesDocumentation(); Assertions.assertEquals( """ - { - "s3-source" : { - "name" : "S3 Source", - "description" : "Reads data from S3 bucket", - "properties" : { - "access-key" : { - "description" : "Access key for the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "minioadmin" - }, - "bucketName" : { - "description" : "The name of the bucket to read from.", - "required" : false, - "type" : "string", - "defaultValue" : "langstream-source" - }, - "endpoint" : { - "description" : "The endpoint of the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "http://minio-endpoint.-not-set:9090" - }, - "file-extensions" : { - "description" : "Comma separated list of file extensions to filter by.", - "required" : false, - "type" : "string", - "defaultValue" : "pdf,docx,html,htm,md,txt" - }, - "idle-time" : { - "description" : "Region for the S3 server.", - "required" : false, - "type" : "integer", - "defaultValue" : "5" - }, - "region" : { - "description" : "Region for the S3 server.", - "required" : false, - "type" : "string" - }, - "secret-key" : { - "description" : "Secret key for the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "minioadmin" - } - } - } - }""", + { + "query-vector-db" : { + "name" : "Query a vector database", + "description" : "Query a vector database using Vector Search capabilities.", + "properties" : { + "composable" : { + "description" : "Whether this step can be composed with other steps.", + "required" : false, + "type" : "boolean", + "defaultValue" : "true" + }, + "datasource" : { + "description" : "Reference to a datasource id configured in the application.", + "required" : true, + "type" : "string" + }, + "fields" : { + "description" : "Fields of the record to use as input parameters for the query.", + "required" : false, + "type" : "array", + "items" : { + "description" : "Fields of the record to use as input parameters for the query.", + "required" : false, + "type" : "string" + } + }, + "only-first" : { + "description" : "If true, only the first result of the query is stored in the output field.", + "required" : false, + "type" : "boolean", + "defaultValue" : "false" + }, + "output-field" : { + "description" : "The name of the field to use to store the query result.", + "required" : true, + "type" : "string" + }, + "query" : { + "description" : "The query to use to extract the data.", + "required" : true, + "type" : "string" + }, + "when" : { + "description" : "Execute the step only when the condition is met.\\nYou can use the expression language to reference the message.\\nExample: when: \\"value.first == 'f1' && value.last.toUpperCase() == 'L1'\\"", + "required" : false, + "type" : "string" + } + } + }, + "vector-db-sink" : { + "name" : "Vector database sink", + "description" : "Store vectors in a vector database.\\nConfiguration properties depends on the vector database implementation, specified by the \\"datasource\\" property.", + "properties" : { + "datasource" : { + "description" : "The defined datasource ID to use to store the vectors.", + "required" : true, + "type" : "string" + } + } + } + }""", SerializationUtil.prettyPrintJson(model)); } } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java index 3ac911cd9..3a6e52be5 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/S3SourceAgentProviderTest.java @@ -103,55 +103,104 @@ public void testDocumentation() { Assertions.assertEquals( """ - { - "s3-source" : { - "name" : "S3 Source", - "description" : "Reads data from S3 bucket", - "properties" : { - "access-key" : { - "description" : "Access key for the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "minioadmin" - }, - "bucketName" : { - "description" : "The name of the bucket to read from.", - "required" : false, - "type" : "string", - "defaultValue" : "langstream-source" - }, - "endpoint" : { - "description" : "The endpoint of the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "http://minio-endpoint.-not-set:9090" - }, - "file-extensions" : { - "description" : "Comma separated list of file extensions to filter by.", - "required" : false, - "type" : "string", - "defaultValue" : "pdf,docx,html,htm,md,txt" - }, - "idle-time" : { - "description" : "Region for the S3 server.", - "required" : false, - "type" : "integer", - "defaultValue" : "5" - }, - "region" : { - "description" : "Region for the S3 server.", - "required" : false, - "type" : "string" - }, - "secret-key" : { - "description" : "Secret key for the S3 server.", - "required" : false, - "type" : "string", - "defaultValue" : "minioadmin" - } - } - } - }""", + { + "azure-blob-storage-source" : { + "name" : "Azure Blob Storage Source", + "description" : "Reads data from Azure blobs. There are three supported ways to authenticate:\\n- SAS token\\n- Storage account name and key\\n- Storage account connection string", + "properties" : { + "container" : { + "description" : "The name of the Azure econtainer to read from.", + "required" : false, + "type" : "string", + "defaultValue" : "langstream-azure-source" + }, + "endpoint" : { + "description" : "Endpoint to connect to. Usually it's https://.blob.core.windows.net.", + "required" : true, + "type" : "string" + }, + "file-extensions" : { + "description" : "Comma separated list of file extensions to filter by.", + "required" : false, + "type" : "string", + "defaultValue" : "pdf,docx,html,htm,md,txt" + }, + "idle-time" : { + "description" : "Time in seconds to sleep after polling for new files.", + "required" : false, + "type" : "integer", + "defaultValue" : "5" + }, + "sas-token" : { + "description" : "Azure SAS token. If not provided, storage account name and key must be provided.", + "required" : false, + "type" : "string" + }, + "storage-account-connection-string" : { + "description" : "Azure storage account connection string. If not provided, SAS token must be provided.", + "required" : false, + "type" : "string" + }, + "storage-account-key" : { + "description" : "Azure storage account key. If not provided, SAS token must be provided.", + "required" : false, + "type" : "string" + }, + "storage-account-name" : { + "description" : "Azure storage account name. If not provided, SAS token must be provided.", + "required" : false, + "type" : "string" + } + } + }, + "s3-source" : { + "name" : "S3 Source", + "description" : "Reads data from S3 bucket", + "properties" : { + "access-key" : { + "description" : "Access key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + }, + "bucketName" : { + "description" : "The name of the bucket to read from.", + "required" : false, + "type" : "string", + "defaultValue" : "langstream-source" + }, + "endpoint" : { + "description" : "The endpoint of the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "http://minio-endpoint.-not-set:9090" + }, + "file-extensions" : { + "description" : "Comma separated list of file extensions to filter by.", + "required" : false, + "type" : "string", + "defaultValue" : "pdf,docx,html,htm,md,txt" + }, + "idle-time" : { + "description" : "Time in seconds to sleep after polling for new files.", + "required" : false, + "type" : "integer", + "defaultValue" : "5" + }, + "region" : { + "description" : "Region for the S3 server.", + "required" : false, + "type" : "string" + }, + "secret-key" : { + "description" : "Secret key for the S3 server.", + "required" : false, + "type" : "string", + "defaultValue" : "minioadmin" + } + } + } + }""", SerializationUtil.prettyPrintJson(model)); } } diff --git a/langstream-runtime/langstream-runtime-impl/pom.xml b/langstream-runtime/langstream-runtime-impl/pom.xml index 9dc7cba36..2c4a6054d 100644 --- a/langstream-runtime/langstream-runtime-impl/pom.xml +++ b/langstream-runtime/langstream-runtime-impl/pom.xml @@ -269,6 +269,13 @@ provided + + ${project.groupId} + langstream-agent-azure-blob-storage-source + ${project.version} + provided + + ${project.groupId} langstream-agent-webcrawler @@ -512,6 +519,16 @@ ${project.build.directory}/agents + + ${project.groupId} + langstream-agent-azure-blob-storage-source + ${project.version} + nar + nar + false + ${project.build.directory}/agents + + ${project.groupId} langstream-agent-webcrawler