Skip to content

Commit

Permalink
Add e2e test with WebCrawler + embeddings + vector sink + completions (
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Sep 19, 2023
1 parent 27ed749 commit 9bba4f0
Show file tree
Hide file tree
Showing 8 changed files with 456 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ protected static ConsumeGatewayMessage consumeOneMessageFromGateway(
"bin/langstream gateway consume %s %s %s -n 1"
.formatted(applicationId, gatewayId, String.join(" ", extraArgs));
final String response = executeCommandOnClient(command);
final String secondLine = response.lines().collect(Collectors.toList()).get(1);
final List<String> lines = response.lines().collect(Collectors.toList());
if (lines.size() <= 1) {
return null;
}
final String secondLine = lines.get(1);
return ConsumeGatewayMessage.readValue(secondLine);
}

Expand Down Expand Up @@ -1024,6 +1028,13 @@ protected static boolean isApplicationReady(
.filter(s -> !s.isBlank())
.collect(Collectors.toList());
System.out.println("app line " + lineAsList);
final String status = lineAsList.get(3);
if (status != null && status.equals("ERROR_DEPLOYING")) {
log.info("application {} is in ERROR_DEPLOYING state, dumping status", applicationId);
executeCommandOnClient(
"bin/langstream apps get %s -o yaml".formatted(applicationId).split(" "));
throw new IllegalStateException("application is in ERROR_DEPLOYING state");
}
if (lineAsList.size() <= 5) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.tests;

import ai.langstream.tests.util.ConsumeGatewayMessage;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@Slf4j
@ExtendWith(BaseEndToEndTest.class)
@Tag(BaseEndToEndTest.CATEGORY_NEEDS_CREDENTIALS)
public class WebCrawlerToVectorIT extends BaseEndToEndTest {

static Map<String, String> appEnv;

@BeforeAll
public static void checkCredentials() {
appEnv =
getAppEnvMapFromSystem(
List.of(
"OPEN_AI_ACCESS_KEY",
"OPEN_AI_URL",
"OPEN_AI_EMBEDDINGS_MODEL",
"OPEN_AI_CHAT_COMPLETIONS_MODEL",
"OPEN_AI_PROVIDER",
"ASTRA_TOKEN",
"ASTRA_CLIENT_ID",
"ASTRA_SECRET",
"ASTRA_SECURE_BUNDLE",
"ASTRA_ENVIRONMENT",
"ASTRA_DATABASE"));
}

@Test
public void test() throws Exception {
installLangStreamCluster(false);
final String tenant = "ten-" + System.currentTimeMillis();
setupTenant(tenant);

final String applicationId = "app";

deployLocalApplication(applicationId, "webcrawler-to-vector", appEnv);
awaitApplicationReady(applicationId, 3);

final String sessionId = UUID.randomUUID().toString();

executeCommandOnClient(
"bin/langstream gateway produce %s user-input -v 'When was released LangStream 0.0.20? Write it in format yyyy-dd-mm.' -p sessionId=%s"
.formatted(applicationId, sessionId)
.split(" "));

boolean ok = false;
for (int i = 0; i < 10; i++) {
final ConsumeGatewayMessage message =
consumeOneMessageFromGateway(
applicationId,
"llm-debug",
"-p sessionId=%s --connect-timeout 30".formatted(sessionId).split(" "));
if (message == null) {
Thread.sleep(5000);
continue;
}

log.info("Output: {}", message);
final String asString = (String) message.getRecord().getValue();
final String answer = (String) JSON_MAPPER.readValue(asString, Map.class).get("answer");
log.info("Answer: {}", answer);
if (answer.contains("2023-09-19")) {
ok = true;
break;
}
Thread.sleep(5000);
}
if (!ok) {
Assertions.fail(
"the chatbot did not answer correctly, maybe the crawler didn't finished yet?");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

topics:
- name: "questions-topic"
creation-mode: create-if-not-exists
- name: "answers-topic"
creation-mode: create-if-not-exists
- name: "log-topic"
creation-mode: create-if-not-exists
errors:
on-failure: "skip"
resources:
size: 2
pipeline:
- name: "convert-to-structure"
type: "document-to-json"
input: "questions-topic"
configuration:
text-field: "question"
- name: "compute-embeddings"
type: "compute-ai-embeddings"
configuration:
model: "{{{secrets.open-ai.embeddings-model}}}"
embeddings-field: "value.question_embeddings"
text: "{{% value.question }}"
flush-interval: 0
- name: "lookup-related-documents-in-llm"
type: "query"
configuration:
datasource: "AstraDatasource"
query: "SELECT text FROM langstreamtest.documents ORDER BY embeddings_vector ANN OF ? LIMIT 5"
fields:
- "value.question_embeddings"
output-field: "value.related_documents"
- name: "ai-chat-completions"
type: "ai-chat-completions"

configuration:
model: "{{{secrets.open-ai.chat-completions-model}}}" # This needs to be set to the model deployment name, not the base name
# on the log-topic we add a field with the answer
completion-field: "value.answer"
# we are also logging the prompt we sent to the LLM
log-field: "value.prompt"
# here we configure the streaming behavior
# as soon as the LLM answers with a chunk we send it to the answers-topic
stream-to-topic: "answers-topic"
# on the streaming answer we send the answer as whole message
# the 'value' syntax is used to refer to the whole value of the message
stream-response-completion-field: "value"
# we want to stream the answer as soon as we have 20 chunks
# in order to reduce latency for the first message the agent sends the first message
# with 1 chunk, then with 2 chunks....up to the min-chunks-per-message value
# eventually we want to send bigger messages to reduce the overhead of each message on the topic
min-chunks-per-message: 20
messages:
- role: system
content: |
An user is going to perform a questions, he documents below may help you in answering to their questions.
Please try to leverage them in your answer as much as possible.
Take into consideration that the user is always asking questions about the LangStream project.
If you provide code or YAML snippets, please explicitly state that they are examples.
Do not provide information that is not related to the LangStream project.
Documents:
{{%# value.related_documents}}
{{% text}}
{{%/ value.related_documents}}
- role: user
content: "{{% value.question}}"
- name: "cleanup-response"
type: "drop-fields"
output: "log-topic"
configuration:
fields:
- "question_embeddings"
- "related_documents"
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

configuration:
resources:
- type: "open-ai-configuration"
name: "OpenAI Azure configuration"
configuration:
url: "{{ secrets.open-ai.url }}"
access-key: "{{ secrets.open-ai.access-key }}"
provider: "{{ secrets.open-ai.provider }}"
- type: "datasource"
name: "AstraDatasource"
configuration:
service: "astra"
clientId: "{{{ secrets.astra.clientId }}}"
secret: "{{{ secrets.astra.secret }}}"
secureBundle: "{{{ secrets.astra.secureBundle }}}"
database: "{{{ secrets.astra.database }}}"
token: "{{{ secrets.astra.token }}}"
environment: "{{{ secrets.astra.environment }}}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: "Crawl a website"
topics:
- name: "chunks-topic"
creation-mode: create-if-not-exists
resources:
size: 2
pipeline:
- name: "Crawl the WebSite"
type: "webcrawler-source"
configuration:
seed-urls: ["https://langstream.ai/changelog/"]
allowed-domains: ["https://langstream.ai"]
forbidden-paths: []
min-time-between-requests: 500
reindex-interval-seconds: 3600
max-error-count: 5
max-urls: 1
max-depth: 1
handle-robots-file: true
user-agent: ""
scan-html-documents: true
http-timeout: 10000
handle-cookies: true
max-unflushed-pages: 100
bucketName: "langstream-test-crawler-to-vector"
endpoint: http://minio.minio-dev.svc.cluster.local:9000
access-key: minioadmin
secret-key: minioadmin
- name: "Extract text"
type: "text-extractor"
- name: "Normalise text"
type: "text-normaliser"
configuration:
make-lowercase: true
trim-spaces: true
- name: "Detect language"
type: "language-detector"
configuration:
allowedLanguages: ["en"]
property: "language"
- name: "Split into chunks"
type: "text-splitter"
configuration:
splitter_type: "RecursiveCharacterTextSplitter"
chunk_size: 400
separators: ["\n\n", "\n", " ", ""]
keep_separator: false
chunk_overlap: 100
length_function: "cl100k_base"
- name: "Convert to structured data"
type: "document-to-json"
configuration:
text-field: text
copy-properties: true
- name: "prepare-structure"
type: "compute"
configuration:
fields:
- name: "value.filename"
expression: "properties.url"
type: STRING
- name: "value.chunk_id"
expression: "properties.chunk_id"
type: STRING
- name: "value.language"
expression: "properties.language"
type: STRING
- name: "value.chunk_num_tokens"
expression: "properties.chunk_num_tokens"
type: STRING
- name: "compute-embeddings"
id: "step1"
type: "compute-ai-embeddings"
output: "chunks-topic"
configuration:
model: "{{{secrets.open-ai.embeddings-model}}}"
embeddings-field: "value.embeddings_vector"
text: "{{% value.text }}"
batch-size: 10
flush-interval: 500
Loading

0 comments on commit 9bba4f0

Please sign in to comment.