diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java index 40ce49147..7129318cd 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/BaseEndToEndTest.java @@ -281,7 +281,11 @@ protected static ConsumeGatewayMessage consumeOneMessageFromGateway( "bin/langstream gateway consume %s %s %s -n 1" .formatted(applicationId, gatewayId, String.join(" ", extraArgs)); final String response = executeCommandOnClient(command); - final String secondLine = response.lines().collect(Collectors.toList()).get(1); + final List lines = response.lines().collect(Collectors.toList()); + if (lines.size() <= 1) { + return null; + } + final String secondLine = lines.get(1); return ConsumeGatewayMessage.readValue(secondLine); } @@ -1024,6 +1028,13 @@ protected static boolean isApplicationReady( .filter(s -> !s.isBlank()) .collect(Collectors.toList()); System.out.println("app line " + lineAsList); + final String status = lineAsList.get(3); + if (status != null && status.equals("ERROR_DEPLOYING")) { + log.info("application {} is in ERROR_DEPLOYING state, dumping status", applicationId); + executeCommandOnClient( + "bin/langstream apps get %s -o yaml".formatted(applicationId).split(" ")); + throw new IllegalStateException("application is in ERROR_DEPLOYING state"); + } if (lineAsList.size() <= 5) { return false; } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java new file mode 100644 index 000000000..2e88e905f --- /dev/null +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java @@ -0,0 +1,99 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.tests; + +import ai.langstream.tests.util.ConsumeGatewayMessage; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@Slf4j +@ExtendWith(BaseEndToEndTest.class) +@Tag(BaseEndToEndTest.CATEGORY_NEEDS_CREDENTIALS) +public class WebCrawlerToVectorIT extends BaseEndToEndTest { + + static Map appEnv; + + @BeforeAll + public static void checkCredentials() { + appEnv = + getAppEnvMapFromSystem( + List.of( + "OPEN_AI_ACCESS_KEY", + "OPEN_AI_URL", + "OPEN_AI_EMBEDDINGS_MODEL", + "OPEN_AI_CHAT_COMPLETIONS_MODEL", + "OPEN_AI_PROVIDER", + "ASTRA_TOKEN", + "ASTRA_CLIENT_ID", + "ASTRA_SECRET", + "ASTRA_SECURE_BUNDLE", + "ASTRA_ENVIRONMENT", + "ASTRA_DATABASE")); + } + + @Test + public void test() throws Exception { + installLangStreamCluster(false); + final String tenant = "ten-" + System.currentTimeMillis(); + setupTenant(tenant); + + final String applicationId = "app"; + + deployLocalApplication(applicationId, "webcrawler-to-vector", appEnv); + awaitApplicationReady(applicationId, 3); + + final String sessionId = UUID.randomUUID().toString(); + + executeCommandOnClient( + "bin/langstream gateway produce %s user-input -v 'When was released LangStream 0.0.20? Write it in format yyyy-dd-mm.' -p sessionId=%s" + .formatted(applicationId, sessionId) + .split(" ")); + + boolean ok = false; + for (int i = 0; i < 10; i++) { + final ConsumeGatewayMessage message = + consumeOneMessageFromGateway( + applicationId, + "llm-debug", + "-p sessionId=%s --connect-timeout 30".formatted(sessionId).split(" ")); + if (message == null) { + Thread.sleep(5000); + continue; + } + + log.info("Output: {}", message); + final String asString = (String) message.getRecord().getValue(); + final String answer = (String) JSON_MAPPER.readValue(asString, Map.class).get("answer"); + log.info("Answer: {}", answer); + if (answer.contains("2023-09-19")) { + ok = true; + break; + } + Thread.sleep(5000); + } + if (!ok) { + Assertions.fail( + "the chatbot did not answer correctly, maybe the crawler didn't finished yet?"); + } + } +} diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/chatbot.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/chatbot.yaml new file mode 100644 index 000000000..b1775c504 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/chatbot.yaml @@ -0,0 +1,90 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +topics: + - name: "questions-topic" + creation-mode: create-if-not-exists + - name: "answers-topic" + creation-mode: create-if-not-exists + - name: "log-topic" + creation-mode: create-if-not-exists +errors: + on-failure: "skip" +resources: + size: 2 +pipeline: + - name: "convert-to-structure" + type: "document-to-json" + input: "questions-topic" + configuration: + text-field: "question" + - name: "compute-embeddings" + type: "compute-ai-embeddings" + configuration: + model: "{{{secrets.open-ai.embeddings-model}}}" + embeddings-field: "value.question_embeddings" + text: "{{% value.question }}" + flush-interval: 0 + - name: "lookup-related-documents-in-llm" + type: "query" + configuration: + datasource: "AstraDatasource" + query: "SELECT text FROM langstreamtest.documents ORDER BY embeddings_vector ANN OF ? LIMIT 5" + fields: + - "value.question_embeddings" + output-field: "value.related_documents" + - name: "ai-chat-completions" + type: "ai-chat-completions" + + configuration: + model: "{{{secrets.open-ai.chat-completions-model}}}" # This needs to be set to the model deployment name, not the base name + # on the log-topic we add a field with the answer + completion-field: "value.answer" + # we are also logging the prompt we sent to the LLM + log-field: "value.prompt" + # here we configure the streaming behavior + # as soon as the LLM answers with a chunk we send it to the answers-topic + stream-to-topic: "answers-topic" + # on the streaming answer we send the answer as whole message + # the 'value' syntax is used to refer to the whole value of the message + stream-response-completion-field: "value" + # we want to stream the answer as soon as we have 20 chunks + # in order to reduce latency for the first message the agent sends the first message + # with 1 chunk, then with 2 chunks....up to the min-chunks-per-message value + # eventually we want to send bigger messages to reduce the overhead of each message on the topic + min-chunks-per-message: 20 + messages: + - role: system + content: | + An user is going to perform a questions, he documents below may help you in answering to their questions. + Please try to leverage them in your answer as much as possible. + Take into consideration that the user is always asking questions about the LangStream project. + If you provide code or YAML snippets, please explicitly state that they are examples. + Do not provide information that is not related to the LangStream project. + + Documents: + {{%# value.related_documents}} + {{% text}} + {{%/ value.related_documents}} + - role: user + content: "{{% value.question}}" + - name: "cleanup-response" + type: "drop-fields" + output: "log-topic" + configuration: + fields: + - "question_embeddings" + - "related_documents" \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/configuration.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/configuration.yaml new file mode 100644 index 000000000..525e68629 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/configuration.yaml @@ -0,0 +1,35 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +configuration: + resources: + - type: "open-ai-configuration" + name: "OpenAI Azure configuration" + configuration: + url: "{{ secrets.open-ai.url }}" + access-key: "{{ secrets.open-ai.access-key }}" + provider: "{{ secrets.open-ai.provider }}" + - type: "datasource" + name: "AstraDatasource" + configuration: + service: "astra" + clientId: "{{{ secrets.astra.clientId }}}" + secret: "{{{ secrets.astra.secret }}}" + secureBundle: "{{{ secrets.astra.secureBundle }}}" + database: "{{{ secrets.astra.database }}}" + token: "{{{ secrets.astra.token }}}" + environment: "{{{ secrets.astra.environment }}}" \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml new file mode 100644 index 000000000..852914c1d --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml @@ -0,0 +1,96 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Crawl a website" +topics: + - name: "chunks-topic" + creation-mode: create-if-not-exists +resources: + size: 2 +pipeline: + - name: "Crawl the WebSite" + type: "webcrawler-source" + configuration: + seed-urls: ["https://langstream.ai/changelog/"] + allowed-domains: ["https://langstream.ai"] + forbidden-paths: [] + min-time-between-requests: 500 + reindex-interval-seconds: 3600 + max-error-count: 5 + max-urls: 1 + max-depth: 1 + handle-robots-file: true + user-agent: "" + scan-html-documents: true + http-timeout: 10000 + handle-cookies: true + max-unflushed-pages: 100 + bucketName: "langstream-test-crawler-to-vector" + endpoint: http://minio.minio-dev.svc.cluster.local:9000 + access-key: minioadmin + secret-key: minioadmin + - name: "Extract text" + type: "text-extractor" + - name: "Normalise text" + type: "text-normaliser" + configuration: + make-lowercase: true + trim-spaces: true + - name: "Detect language" + type: "language-detector" + configuration: + allowedLanguages: ["en"] + property: "language" + - name: "Split into chunks" + type: "text-splitter" + configuration: + splitter_type: "RecursiveCharacterTextSplitter" + chunk_size: 400 + separators: ["\n\n", "\n", " ", ""] + keep_separator: false + chunk_overlap: 100 + length_function: "cl100k_base" + - name: "Convert to structured data" + type: "document-to-json" + configuration: + text-field: text + copy-properties: true + - name: "prepare-structure" + type: "compute" + configuration: + fields: + - name: "value.filename" + expression: "properties.url" + type: STRING + - name: "value.chunk_id" + expression: "properties.chunk_id" + type: STRING + - name: "value.language" + expression: "properties.language" + type: STRING + - name: "value.chunk_num_tokens" + expression: "properties.chunk_num_tokens" + type: STRING + - name: "compute-embeddings" + id: "step1" + type: "compute-ai-embeddings" + output: "chunks-topic" + configuration: + model: "{{{secrets.open-ai.embeddings-model}}}" + embeddings-field: "value.embeddings_vector" + text: "{{% value.text }}" + batch-size: 10 + flush-interval: 500 \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/gateways.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/gateways.yaml new file mode 100644 index 000000000..739289814 --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/gateways.yaml @@ -0,0 +1,50 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: "user-input" + type: produce + topic: "questions-topic" + parameters: + - sessionId + produceOptions: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + - id: "bot-output" + type: consume + topic: "answers-topic" + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + + - id: "llm-debug" + type: consume + topic: "log-topic" + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/write-to-vector.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/write-to-vector.yaml new file mode 100644 index 000000000..f324f7e4f --- /dev/null +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/write-to-vector.yaml @@ -0,0 +1,65 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Write to vector" +topics: + - name: "chunks-topic" + creation-mode: create-if-not-exists +assets: + - name: "langstream-keyspace" + asset-type: "astra-keyspace" + creation-mode: create-if-not-exists + deletion-mode: delete + config: + keyspace: "langstreamtest" + datasource: "AstraDatasource" + delete-statements: + - | + DROP KEYSPACE IF EXISTS langstreamtest; + - name: "documents-table" + asset-type: "cassandra-table" + creation-mode: create-if-not-exists + deletion-mode: delete + config: + table-name: "documents" + keyspace: "langstreamtest" + datasource: "AstraDatasource" + delete-statements: + - | + DROP TABLE IF EXISTS langstreamtest.documents; + create-statements: + - | + CREATE TABLE IF NOT EXISTS langstreamtest.documents ( + filename TEXT, + chunk_id int, + num_tokens int, + language TEXT, + text TEXT, + embeddings_vector VECTOR, + PRIMARY KEY (filename, chunk_id)); + - | + CREATE CUSTOM INDEX IF NOT EXISTS documents_ann_index ON langstreamtest.documents(embeddings_vector) USING 'StorageAttachedIndex'; +pipeline: + - name: "Write to Astra" + type: "vector-db-sink" + input: "chunks-topic" + resources: + size: 2 + configuration: + datasource: "AstraDatasource" + table-name: "documents" + keyspace: "langstreamtest" + mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens" \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml b/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml index b9aff840a..369f6fd76 100644 --- a/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml +++ b/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml @@ -32,4 +32,12 @@ secrets: url: "${OPEN_AI_URL}" provider: "${OPEN_AI_PROVIDER}" embeddings-model: "${OPEN_AI_EMBEDDINGS_MODEL}" - chat-completions-model: "${OPEN_AI_CHAT_COMPLETIONS_MODEL}" \ No newline at end of file + chat-completions-model: "${OPEN_AI_CHAT_COMPLETIONS_MODEL}" + - id: astra + data: + clientId: "${ASTRA_CLIENT_ID}" + secret: "${ASTRA_SECRET}" + token: "${ASTRA_TOKEN}" + database: "${ASTRA_DATABASE}" + secureBundle: "${ASTRA_SECURE_BUNDLE}" + environment: "${ASTRA_ENVIRONMENT}" \ No newline at end of file