From b03f6d51ac9d22d4180f269f6da555f2c3fac138 Mon Sep 17 00:00:00 2001
From: Enrico Olivelli
Date: Thu, 9 Nov 2023 17:47:55 +0100
Subject: [PATCH] [completions and embeddings] Add support for Ollama (local LLMs) (#711)
---
 .../applications/ollama-chatbot/.gitignore    |   1 +
 .../applications/ollama-chatbot/README.md     |  39 +++
 .../applications/ollama-chatbot/assets.yaml   |  33 ++
 .../applications/ollama-chatbot/chatbot.yaml  | 102 ++++++
 .../ollama-chatbot/configuration.yaml         |  36 ++
 .../applications/ollama-chatbot/crawler.yaml  | 115 ++++++
 .../applications/ollama-chatbot/gateways.yaml |  43 +++
 examples/secrets/secrets.yaml                 |   5 +
 .../agents/services/impl/OllamaProvider.java  | 330 ++++++++++++++++++
 .../ai/model/config/OllamaConfig.java         |  26 ++
 .../ai/model/config/TransformStepConfig.java  |   2 +
 ...ai.agents.services.ServiceProviderProvider |   3 +-
 .../services/impl/OllamaProviderTest.java     | 120 +++++++
 .../MermaidAppDiagramGenerator.java           |   5 +-
 .../ai/GenAIToolKitFunctionAgentProvider.java |  10 +-
 .../AIProvidersResourceProvider.java          |  22 +-
 ...GenAIToolKitFunctionAgentProviderTest.java |   2 +-
 17 files changed, 888 insertions(+), 6 deletions(-)
 create mode 100644 examples/applications/ollama-chatbot/.gitignore
 create mode 100644 examples/applications/ollama-chatbot/README.md
 create mode 100644 examples/applications/ollama-chatbot/assets.yaml
 create mode 100644 examples/applications/ollama-chatbot/chatbot.yaml
 create mode 100644 examples/applications/ollama-chatbot/configuration.yaml
 create mode 100644 examples/applications/ollama-chatbot/crawler.yaml
 create mode 100644 examples/applications/ollama-chatbot/gateways.yaml
 create mode 100644 langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OllamaProvider.java
 create mode 100644 langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/OllamaConfig.java
 create mode 100644 langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OllamaProviderTest.java

diff --git a/examples/applications/ollama-chatbot/.gitignore b/examples/applications/ollama-chatbot/.gitignore
new file mode 100644
index 000000000..55dea2dd3
--- /dev/null
+++ b/examples/applications/ollama-chatbot/.gitignore
@@ -0,0 +1 @@
java/lib/*
\ No newline at end of file
diff --git a/examples/applications/ollama-chatbot/README.md b/examples/applications/ollama-chatbot/README.md
new file mode 100644
index 000000000..5eb5770cb
--- /dev/null
+++ b/examples/applications/ollama-chatbot/README.md
@@ -0,0 +1,39 @@
# Running your own Chat bot using Ollama.ai

This sample application shows how to build a chat bot over the content of a website.
In this case you are going to crawl the LangStream.ai documentation website.

The Chat bot will be able to help you with LangStream.

In this example we are using [HerdDB](https://github.com/diennea/herddb) as a vector database, accessed through its JDBC driver,
but you can use any vector database.

As the LLM we are using [Ollama](https://ollama.ai), a service that runs language models on your own machine.


## Install Ollama


Follow the instructions on the Ollama.ai website to install Ollama.

Then start Ollama with the llama2 model:

```
ollama run llama2
```


## Deploy the LangStream application in docker

The default docker runner starts Minio, Kafka and HerdDB, so you can run the application locally.
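Before deploying, you can check that Ollama is actually reachable. Below is a minimal probe in Java; it is only a sketch, assuming Ollama on its default port 11434, and the hypothetical `OllamaProbe` class name is ours. `/api/tags` is the Ollama endpoint that lists the locally installed models.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class OllamaProbe {
    public static void main(String[] args) throws Exception {
        // Ollama listens on http://localhost:11434 by default
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:11434/api/tags"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // expect HTTP 200 and a JSON document listing the models (e.g. llama2)
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Once Ollama responds, start the application: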
+ +``` +./bin/langstream docker run test -app examples/applications/ollama-chatbot -s examples/secrets/secrets.yaml +``` + +## Talk with the Chat bot using the CLI +Since the application opens a gateway, we can use the gateway API to send and consume messages. + +``` +./bin/langstream gateway chat test -cg bot-output -pg user-input -p sessionId=$(uuidgen) +``` diff --git a/examples/applications/ollama-chatbot/assets.yaml b/examples/applications/ollama-chatbot/assets.yaml new file mode 100644 index 000000000..7162d6e12 --- /dev/null +++ b/examples/applications/ollama-chatbot/assets.yaml @@ -0,0 +1,33 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +assets: + - name: "documents-table" + asset-type: "jdbc-table" + creation-mode: create-if-not-exists + config: + table-name: "documents" + datasource: "JdbcDatasource" + create-statements: + - | + CREATE TABLE documents ( + filename TEXT, + chunk_id int, + num_tokens int, + lang TEXT, + text TEXT, + embeddings_vector FLOATA, + PRIMARY KEY (filename, chunk_id)); \ No newline at end of file diff --git a/examples/applications/ollama-chatbot/chatbot.yaml b/examples/applications/ollama-chatbot/chatbot.yaml new file mode 100644 index 000000000..2904a05ff --- /dev/null +++ b/examples/applications/ollama-chatbot/chatbot.yaml @@ -0,0 +1,102 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +topics: + - name: "questions-topic" + creation-mode: create-if-not-exists + - name: "answers-topic" + creation-mode: create-if-not-exists + - name: "log-topic" + creation-mode: create-if-not-exists +errors: + on-failure: "skip" +pipeline: + - name: "convert-to-structure" + type: "document-to-json" + input: "questions-topic" + configuration: + text-field: "question" + - name: "compute-embeddings" + type: "compute-ai-embeddings" + configuration: + model: "${secrets.ollama.model}" + embeddings-field: "value.question_embeddings" + text: "{{ value.question }}" + flush-interval: 0 + - name: "lookup-related-documents" + type: "query-vector-db" + configuration: + datasource: "JdbcDatasource" + query: "SELECT text,embeddings_vector FROM documents ORDER BY cosine_similarity(embeddings_vector, CAST(? 
as FLOAT ARRAY)) DESC LIMIT 20"
        fields:
          - "value.question_embeddings"
        output-field: "value.related_documents"
  - name: "re-rank documents with MMR"
    type: "re-rank"
    configuration:
      max: 5 # keep only the top 5 documents, because we have a hard limit on the prompt size
      field: "value.related_documents"
      query-text: "value.question"
      query-embeddings: "value.question_embeddings"
      output-field: "value.related_documents"
      text-field: "record.text"
      embeddings-field: "record.embeddings_vector"
      algorithm: "MMR"
      lambda: 0.5
      k1: 1.2
      b: 0.75
  - name: "ai-chat-completions"
    type: "ai-chat-completions"
    configuration:
      model: "${secrets.ollama.model}"
      # on the log-topic we add a field with the answer
      completion-field: "value.answer"
      # we are also logging the prompt we sent to the LLM
      log-field: "value.prompt"
      # here we configure the streaming behavior
      # as soon as the LLM answers with a chunk we send it to the answers-topic
      stream-to-topic: "answers-topic"
      # on the streaming answer we send the answer as a whole message
      # the 'value' syntax is used to refer to the whole value of the message
      stream-response-completion-field: "value"
      # we want to stream the answer as soon as we have 20 chunks
      # to reduce latency for the first messages, the agent sends the first message
      # with 1 chunk, then with 2 chunks... up to the min-chunks-per-message value
      # eventually we send bigger messages to reduce the overhead of each message on the topic
      min-chunks-per-message: 20
      messages:
        - role: system
          content: |
            A user is going to ask you questions. The documents below may help you answer them.
            Please try to leverage them in your answer as much as possible.
            Take into consideration that the user is always asking questions about the LangStream project.
            If you provide code or YAML snippets, please explicitly state that they are examples.
            Do not provide information that is not related to the LangStream project.

            Documents:
            {{# value.related_documents}}
            {{ text}}
            {{/ value.related_documents}}
        - role: user
          content: "{{ value.question}}"
  - name: "cleanup-response"
    type: "drop-fields"
    output: "log-topic"
    configuration:
      fields:
        - "question_embeddings"
        - "related_documents"
\ No newline at end of file
diff --git a/examples/applications/ollama-chatbot/configuration.yaml b/examples/applications/ollama-chatbot/configuration.yaml
new file mode 100644
index 000000000..fa8401f4e
--- /dev/null
+++ b/examples/applications/ollama-chatbot/configuration.yaml
@@ -0,0 +1,36 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
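#
# Note: the "ollama" resource below only needs the base url of the Ollama service.
# It is read from secrets.ollama.url; the default in examples/secrets/secrets.yaml
# (http://host.docker.internal:11434) lets the docker runner reach an Ollama
# server running on the host machine.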
+# + +configuration: + resources: + - type: "datasource" + name: "JdbcDatasource" + configuration: + service: "jdbc" + driverClass: "herddb.jdbc.Driver" + url: "${secrets.herddb.url}" + user: "${secrets.herddb.user}" + password: "${secrets.herddb.password}" + - type: "ollama-configuration" + name: "ollama" + configuration: + url: "${secrets.ollama.url}" + dependencies: + - name: "HerdDB.org JDBC Driver" + url: "https://repo1.maven.org/maven2/org/herddb/herddb-jdbc/0.28.0/herddb-jdbc-0.28.0-thin.jar" + sha512sum: "d8ea8fbb12eada8f860ed660cbc63d66659ab3506bc165c85c420889aa8a1dac53dab7906ef61c4415a038c5a034f0d75900543dd0013bdae50feafd46f51c8e" + type: "java-library" \ No newline at end of file diff --git a/examples/applications/ollama-chatbot/crawler.yaml b/examples/applications/ollama-chatbot/crawler.yaml new file mode 100644 index 000000000..3630385dc --- /dev/null +++ b/examples/applications/ollama-chatbot/crawler.yaml @@ -0,0 +1,115 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Crawl a website" +resources: + size: 2 +pipeline: + - name: "Crawl the WebSite" + type: "webcrawler-source" + configuration: + seed-urls: ["https://docs.langstream.ai/"] + allowed-domains: ["https://docs.langstream.ai"] + forbidden-paths: [] + min-time-between-requests: 500 + max-error-count: 5 + max-urls: 1000 + max-depth: 50 + handle-robots-file: true + scan-html-documents: true + state-storage: disk + - name: "Extract text" + type: "text-extractor" + - name: "Normalise text" + type: "text-normaliser" + configuration: + make-lowercase: true + trim-spaces: true + - name: "Detect language" + type: "language-detector" + configuration: + allowedLanguages: ["en", "fr"] + property: "language" + - name: "Split into chunks" + type: "text-splitter" + configuration: + splitter_type: "RecursiveCharacterTextSplitter" + chunk_size: 400 + separators: ["\n\n", "\n", " ", ""] + keep_separator: false + chunk_overlap: 100 + length_function: "cl100k_base" + - name: "Convert to structured data" + type: "document-to-json" + configuration: + text-field: text + copy-properties: true + - name: "prepare-structure" + type: "compute" + configuration: + fields: + - name: "value.filename" + expression: "properties.url" + type: STRING + - name: "value.chunk_id" + expression: "properties.chunk_id" + type: STRING + - name: "value.language" + expression: "properties.language" + type: STRING + - name: "value.chunk_num_tokens" + expression: "properties.chunk_num_tokens" + type: STRING + - name: "compute-embeddings" + id: "step1" + type: "compute-ai-embeddings" + configuration: + model: "${secrets.ollama.model}" + embeddings-field: "value.embeddings_vector" + text: "{{ value.text }}" + batch-size: 10 + flush-interval: 500 + - name: "Delete stale chunks" + type: "query" + configuration: + datasource: "JdbcDatasource" + when: "fn:toInt(properties.text_num_chunks) == (fn:toInt(properties.chunk_id) + 1)" + mode: "execute" + query: "DELETE FROM documents WHERE filename = ? AND chunk_id > ?" 
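      # the "when" condition matches only the last chunk of each document
      # (text_num_chunks == chunk_id + 1); at that point any rows with a higher
      # chunk_id are leftovers from a previous, longer version of the same page,
      # so they are deleted before the fresh chunks are written by the sink below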
+ output-field: "value.delete-results" + fields: + - "value.filename" + - "fn:toInt(value.chunk_id)" + - name: "Write" + type: "vector-db-sink" + configuration: + datasource: "JdbcDatasource" + table-name: "documents" + fields: + - name: "filename" + expression: "value.filename" + primary-key: true + - name: "chunk_id" + expression: "value.chunk_id" + primary-key: true + - name: "embeddings_vector" + expression: "fn:toListOfFloat(value.embeddings_vector)" + - name: "lang" + expression: "value.language" + - name: "text" + expression: "value.text" + - name: "num_tokens" + expression: "value.chunk_num_tokens" \ No newline at end of file diff --git a/examples/applications/ollama-chatbot/gateways.yaml b/examples/applications/ollama-chatbot/gateways.yaml new file mode 100644 index 000000000..132788270 --- /dev/null +++ b/examples/applications/ollama-chatbot/gateways.yaml @@ -0,0 +1,43 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +gateways: + - id: "user-input" + type: produce + topic: "questions-topic" + parameters: + - sessionId + produceOptions: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + - id: "bot-output" + type: consume + topic: "answers-topic" + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + + - id: "llm-debug" + type: consume + topic: "log-topic" \ No newline at end of file diff --git a/examples/secrets/secrets.yaml b/examples/secrets/secrets.yaml index f93be05eb..cc9de5fb9 100644 --- a/examples/secrets/secrets.yaml +++ b/examples/secrets/secrets.yaml @@ -144,6 +144,11 @@ secrets: secret-key: "${BEDROCK_SECRET_KEY}" region: "${REGION:-us-east-1}" completions-model: "${BEDROCK_COMPLETIONS_MODEL}" + - name: ollama + id: ollama + data: + url: "${OLLAMA_URL:-http://host.docker.internal:11434}" + model: "${OLLAMA_MODEL:-mistral}" - name: camel-github-source id: camel-github-source data: diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OllamaProvider.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OllamaProvider.java new file mode 100644 index 000000000..588f01c38 --- /dev/null +++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OllamaProvider.java @@ -0,0 +1,330 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
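 *
 * OllamaProvider implements both LangStream AI services on top of Ollama's
 * HTTP API: POST /api/generate for (streaming) chat completions and
 * POST /api/embeddings for embeddings, as implemented below.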
+ */ +package ai.langstream.ai.agents.services.impl; + +import ai.langstream.ai.agents.services.ServiceProviderProvider; +import ai.langstream.api.runner.code.MetricsReporter; +import ai.langstream.api.util.ConfigurationUtils; +import com.datastax.oss.streaming.ai.completions.ChatChoice; +import com.datastax.oss.streaming.ai.completions.ChatCompletions; +import com.datastax.oss.streaming.ai.completions.ChatMessage; +import com.datastax.oss.streaming.ai.completions.CompletionsService; +import com.datastax.oss.streaming.ai.completions.TextCompletionResult; +import com.datastax.oss.streaming.ai.embeddings.EmbeddingsService; +import com.datastax.oss.streaming.ai.services.ServiceProvider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.StringWriter; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class OllamaProvider implements ServiceProviderProvider { + + static ObjectMapper mapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Override + public boolean supports(Map agentConfiguration) { + return agentConfiguration.containsKey("ollama"); + } + + @Override + public ServiceProvider createImplementation( + Map agentConfiguration, MetricsReporter metricsReporter) { + + Map config = (Map) agentConfiguration.get("ollama"); + String url = (String) config.get("url"); + return new OllamaServiceProvider(url); + } + + private static class OllamaServiceProvider implements ServiceProvider { + + final HttpClient httpClient; + private final String url; + + @SneakyThrows + public OllamaServiceProvider(String url) { + this.url = url; + this.httpClient = HttpClient.newHttpClient(); + } + + @Override + public CompletionsService getCompletionsService(Map map) throws Exception { + String model = (String) map.get("model"); + if (model == null) { + throw new IllegalArgumentException("'model' is required for completions service"); + } + int minChunksPerMessage = ConfigurationUtils.getInt("min-chunks-per-message", 20, map); + return new OllamaCompletionsService(model, minChunksPerMessage); + } + + @Override + public EmbeddingsService getEmbeddingsService(Map map) throws Exception { + String model = (String) map.get("model"); + if (model == null) { + throw new IllegalArgumentException("'model' is required for embedding service"); + } + return new OllamaEmbeddingsService(model); + } + + private class OllamaEmbeddingsService implements EmbeddingsService { + private final String model; + + public OllamaEmbeddingsService(String model) { + this.model = model; + } + + @Override + @SneakyThrows + public CompletableFuture>> computeEmbeddings(List list) { + List>> futures = new ArrayList<>(); + for (String text : list) { + futures.add(computeEmbeddings(text)); + } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .thenApply( 
+ ___ -> + futures.stream() + .map(CompletableFuture::join) + .collect(Collectors.toList())); + } + + record EmbeddingResponse(List embedding, String error) {} + + private CompletableFuture> computeEmbeddings(String prompt) { + String request; + try { + request = MAPPER.writeValueAsString(new Request(model, prompt)); + final HttpRequest.BodyPublisher bodyPublisher = + HttpRequest.BodyPublishers.ofString(request); + + final HttpRequest.Builder requestBuilder = + HttpRequest.newBuilder() + .uri(new URI(url + "/api/embeddings")) + .version(HttpClient.Version.HTTP_1_1) + .method("POST", bodyPublisher); + final HttpRequest httpRequest = requestBuilder.build(); + + return httpClient + .sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString()) + .thenApply( + response -> { + if (response.statusCode() != 200) { + throw new RuntimeException( + "HTTP Error: " + response.statusCode()); + } + try { + String body = response.body(); + EmbeddingResponse embeddings = + mapper.readValue(body, EmbeddingResponse.class); + if (embeddings.error != null) { + throw new RuntimeException(embeddings.error); + } + return embeddings.embedding(); + } catch (JsonProcessingException error) { + throw new CompletionException(error); + } + }); + } catch (Throwable error) { + log.error("IO Error while calling Ollama", error); + return CompletableFuture.failedFuture(error); + } + } + } + + private class StreamResponseProcessor extends CompletableFuture + implements Flow.Subscriber { + + Flow.Subscription subscription; + + private final StringWriter totalAnswer = new StringWriter(); + + private final StringWriter writer = new StringWriter(); + private final AtomicInteger numberOfChunks = new AtomicInteger(); + private final int minChunksPerMessage; + + private final AtomicInteger currentChunkSize = new AtomicInteger(1); + private final AtomicInteger index = new AtomicInteger(); + + private final CompletionsService.StreamingChunksConsumer streamingChunksConsumer; + + private final String answerId = java.util.UUID.randomUUID().toString(); + + public StreamResponseProcessor( + int minChunksPerMessage, + CompletionsService.StreamingChunksConsumer streamingChunksConsumer) { + this.minChunksPerMessage = minChunksPerMessage; + this.streamingChunksConsumer = streamingChunksConsumer; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + record ResponseLine( + String model, String create_at, String response, boolean done, String error) {} + + @Override + @SneakyThrows + public synchronized void onNext(String body) { + ResponseLine responseLine = mapper.readValue(body, ResponseLine.class); + String content = responseLine.response(); + boolean last = responseLine.done(); + + if (responseLine.error != null) { + log.error("Error: {}", responseLine.error); + this.completeExceptionally(new RuntimeException(responseLine.error)); + return; + } + + if (content != null && !content.isEmpty()) { + writer.write(content); + totalAnswer.write(content); + numberOfChunks.incrementAndGet(); + } + + // start from 1 chunk, then double the size until we reach the minChunksPerMessage + // this gives better latencies for the first message + int currentMinChunksPerMessage = currentChunkSize.get(); + + if (numberOfChunks.get() >= currentMinChunksPerMessage || last) { + currentChunkSize.set( + Math.min(currentMinChunksPerMessage * 2, minChunksPerMessage)); + streamingChunksConsumer.consumeChunk( + answerId, + index.incrementAndGet(), + new ChatChoice(new 
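                                    // the flushed chunk is wrapped in the same
                                    // ChatMessage type used for complete answers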
ChatMessage("system", writer.toString())), + last); + writer.getBuffer().setLength(0); + numberOfChunks.set(0); + } + if (last) { + this.complete(buildTotalAnswerMessage()); + } + + subscription.request(1); + } + + @Override + public void onError(Throwable error) { + log.error("IO Error while calling Ollama", error); + this.completeExceptionally(error); + } + + @Override + public void onComplete() { + if (!this.isDone()) { + this.complete(buildTotalAnswerMessage()); + } + } + + public String buildTotalAnswerMessage() { + return totalAnswer.toString(); + } + } + + record Request(String model, String prompt) {} + + @Override + public void close() {} + + private class OllamaCompletionsService implements CompletionsService { + private final String model; + private final int minChunksPerMessage; + + public OllamaCompletionsService(String model, int minChunksPerMessage) { + this.model = model; + this.minChunksPerMessage = minChunksPerMessage; + } + + @Override + public CompletableFuture getChatCompletions( + List list, + StreamingChunksConsumer streamingChunksConsumer, + Map additionalConfiguration) { + + String prompt = + list.stream().map(c -> c.getContent()).collect(Collectors.joining("\"")); + + String request; + try { + request = MAPPER.writeValueAsString(new Request(model, prompt)); + final HttpRequest.BodyPublisher bodyPublisher = + HttpRequest.BodyPublishers.ofString(request); + + final HttpRequest.Builder requestBuilder = + HttpRequest.newBuilder() + .uri(new URI(url + "/api/generate")) + .version(HttpClient.Version.HTTP_1_1) + .method("POST", bodyPublisher); + final HttpRequest httpRequest = requestBuilder.build(); + + StreamResponseProcessor streamResponseProcessor = + new StreamResponseProcessor( + minChunksPerMessage, streamingChunksConsumer); + httpClient.sendAsync( + httpRequest, + HttpResponse.BodyHandlers.fromLineSubscriber(streamResponseProcessor)); + + return streamResponseProcessor.thenApply( + s -> { + ChatCompletions result = new ChatCompletions(); + result.setChoices( + List.of(new ChatChoice(new ChatMessage("system", s)))); + return result; + }); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + @Override + public CompletableFuture getTextCompletions( + List prompt, + StreamingChunksConsumer streamingChunksConsumer, + Map options) { + return getChatCompletions( + prompt.stream().map(p -> new ChatMessage(null, p)).toList(), + streamingChunksConsumer, + options) + .thenApply( + c -> { + return new TextCompletionResult( + c.getChoices().get(0).content(), null); + }); + } + } + } +} diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/OllamaConfig.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/OllamaConfig.java new file mode 100644 index 000000000..d180e7b5a --- /dev/null +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/OllamaConfig.java @@ -0,0 +1,26 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.streaming.ai.model.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; + +@Getter +public class OllamaConfig { + + @JsonProperty(value = "url") + private String url; +} diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java index 13392ead5..88e3befbd 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TransformStepConfig.java @@ -31,6 +31,8 @@ public class TransformStepConfig { @JsonProperty private BedrockConfig bedrock; + @JsonProperty private OllamaConfig ollama; + @JsonProperty private Map datasource; @JsonProperty private boolean attemptJsonConversion = true; diff --git a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider index a455c9da4..bdde5f0e7 100644 --- a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider +++ b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.services.ServiceProviderProvider @@ -1,4 +1,5 @@ ai.langstream.ai.agents.services.impl.OpenAIServiceProvider ai.langstream.ai.agents.services.impl.VertexAIProvider ai.langstream.ai.agents.services.impl.HuggingFaceProvider -ai.langstream.ai.agents.services.impl.BedrockServiceProvider \ No newline at end of file +ai.langstream.ai.agents.services.impl.BedrockServiceProvider +ai.langstream.ai.agents.services.impl.OllamaProvider \ No newline at end of file diff --git a/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OllamaProviderTest.java b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OllamaProviderTest.java new file mode 100644 index 000000000..d63e67073 --- /dev/null +++ b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OllamaProviderTest.java @@ -0,0 +1,120 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
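 *
 * The tests below emulate the Ollama HTTP API with WireMock: /api/generate
 * answers with one JSON object per line (the streaming protocol), while
 * /api/embeddings answers with a single JSON object carrying the vector.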
+ */ +package ai.langstream.ai.agents.services.impl; + +import static com.github.tomakehurst.wiremock.client.WireMock.ok; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import ai.langstream.api.runner.code.MetricsReporter; +import com.datastax.oss.streaming.ai.completions.ChatMessage; +import com.datastax.oss.streaming.ai.completions.Chunk; +import com.datastax.oss.streaming.ai.completions.CompletionsService; +import com.datastax.oss.streaming.ai.embeddings.EmbeddingsService; +import com.datastax.oss.streaming.ai.services.ServiceProvider; +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +@Slf4j +@WireMockTest +class OllamaProviderTest { + + @Test + void testCompletions(WireMockRuntimeInfo wmRuntimeInfo) throws Exception { + + stubFor( + post("/api/generate") + .willReturn( + ok( + """ + {"model":"llama2","created_at":"2023-11-09T13:48:51.788062Z","response":"one","done":false} + {"model":"llama2","created_at":"2023-11-09T13:48:51.788062Z","response":" two","done":false} + {"model":"llama2","created_at":"2023-11-09T13:48:51.788062Z","response":" three","done":true} + """))); + OllamaProvider provider = new OllamaProvider(); + ServiceProvider implementation = + provider.createImplementation( + Map.of("ollama", Map.of("url", wmRuntimeInfo.getHttpBaseUrl())), + MetricsReporter.DISABLED); + + List chunks = new CopyOnWriteArrayList<>(); + CompletionsService service = + implementation.getCompletionsService(Map.of("model", "llama2")); + String result = + service.getChatCompletions( + List.of( + new ChatMessage("user") + .setContent("Tell me three numberss")), + new CompletionsService.StreamingChunksConsumer() { + @Override + public void consumeChunk( + String answerId, int index, Chunk chunk, boolean last) { + log.info( + "answerId: {}, index: {}, chunk: {}, last: {}", + answerId, + index, + chunk, + last); + chunks.add(chunk.content()); + } + }, + Map.of()) + .get() + .getChoices() + .get(0) + .content(); + log.info("result: {}", result); + assertEquals("one two three", result); + assertEquals(List.of("one", " two three"), chunks); + } + + @Test + void testEmbeddings(WireMockRuntimeInfo wmRuntimeInfo) throws Exception { + stubFor( + post("/api/embeddings") + .willReturn( + ok( + """ + {"embedding":[-0.9004754424095154,1.2847540378570557,1.1102418899536133,-0.18884147703647614]} + + """))); + + OllamaProvider provider = new OllamaProvider(); + ServiceProvider implementation = + provider.createImplementation( + Map.of("ollama", Map.of("url", wmRuntimeInfo.getHttpBaseUrl())), + MetricsReporter.DISABLED); + + EmbeddingsService service = implementation.getEmbeddingsService(Map.of("model", "llama2")); + + List> result = service.computeEmbeddings(List.of("test")).get(); + log.info("result: {}", result); + assertEquals( + List.of( + List.of( + -0.9004754424095154, + 1.2847540378570557, + 1.1102418899536133, + -0.18884147703647614)), + result); + } +} diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/MermaidAppDiagramGenerator.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/MermaidAppDiagramGenerator.java index 2a6c5fc83..67868708f 100644 --- 
a/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/MermaidAppDiagramGenerator.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/applications/MermaidAppDiagramGenerator.java @@ -348,8 +348,9 @@ private static Agent parseAgent(ApplicationModel model, Map agentMap) { .equals( "hugging-face-configuration") || r.getOriginalType() - .equals( - "bedrock-configuration")) + .equals("bedrock-configuration") + || r.getOriginalType() + .equals("ollama-configuration")) .findFirst() .orElse(null); if (resource != null) { diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java index 7c88739b8..71f0fce2c 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java @@ -55,8 +55,15 @@ public class GenAIToolKitFunctionAgentProvider extends AbstractAgentProvider { protected static final String SERVICE_HUGGING_FACE = "hugging-face-configuration"; protected static final String SERVICE_OPEN_AI = "open-ai-configuration"; protected static final String SERVICE_BEDROCK = "bedrock-configuration"; + protected static final String SERVICE_OLLAMA = "ollama-configuration"; + protected static final List AI_SERVICES = - List.of(SERVICE_VERTEX, SERVICE_HUGGING_FACE, SERVICE_OPEN_AI, SERVICE_BEDROCK); + List.of( + SERVICE_VERTEX, + SERVICE_HUGGING_FACE, + SERVICE_OPEN_AI, + SERVICE_BEDROCK, + SERVICE_OLLAMA); static { final Map steps = new HashMap<>(); @@ -239,6 +246,7 @@ private static String getConfigKey(String type) { case SERVICE_HUGGING_FACE -> "huggingface"; case SERVICE_OPEN_AI -> "openai"; case SERVICE_BEDROCK -> "bedrock"; + case SERVICE_OLLAMA -> "ollama"; default -> null; }; } diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java index 86d3ce584..7be72acd3 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java @@ -36,12 +36,15 @@ public class AIProvidersResourceProvider extends AbstractResourceProvider { protected static final String HUGGING_FACE_CONFIGURATION = "hugging-face-configuration"; protected static final String VERTEX_CONFIGURATION = "vertex-configuration"; protected static final String BEDROCK_CONFIGURATION = "bedrock-configuration"; + + protected static final String OLLAMA_CONFIGURATION = "ollama-configuration"; private static final Set SUPPORTED_TYPES = Set.of( OPEN_AI_CONFIGURATION, HUGGING_FACE_CONFIGURATION, VERTEX_CONFIGURATION, - BEDROCK_CONFIGURATION); + BEDROCK_CONFIGURATION, + OLLAMA_CONFIGURATION); protected static final ObjectMapper MAPPER = new ObjectMapper(); public AIProvidersResourceProvider() { @@ -107,6 +110,9 @@ protected Class getResourceConfigModelClass(String type) { case BEDROCK_CONFIGURATION -> { return BedrockConfig.class; } + case OLLAMA_CONFIGURATION -> { + return OllamaConfig.class; + } default -> throw new IllegalStateException(); } } @@ -232,6 +238,20 @@ public enum Provider { private String accessKey; } + @Data + @ResourceConfig(name = "Ollama", description = "Connect to Ollama API.") + public static class OllamaConfig { + + @ConfigProperty( + 
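                // the resource only declares the service endpoint; the model
                // is chosen per-agent via the "model" property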
description = + """ + URL for the Ollama service. + """, + required = true) + @JsonProperty("url") + private String url; + } + @Data @ResourceConfig(name = "AWS Bedrock", description = "Connect to AWS Bedrock API.") public static class BedrockConfig { diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java index a7d2e4a20..d12b9f8d9 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/k8s/agents/KubernetesGenAIToolKitFunctionAgentProviderTest.java @@ -89,7 +89,7 @@ public void testValidationAiChatCompletions() { - role: system content: "Hello" """, - "Found error on agent configuration (agent: 'chat', type: 'ai-chat-completions'). No ai service resource found in application configuration. One of vertex-configuration, hugging-face-configuration, open-ai-configuration, bedrock-configuration must be defined."); + "Found error on agent configuration (agent: 'chat', type: 'ai-chat-completions'). No ai service resource found in application configuration. One of vertex-configuration, hugging-face-configuration, open-ai-configuration, bedrock-configuration, ollama-configuration must be defined."); AgentValidationTestUtil.validate( """