From 4a532b7bd262618e87677e903aa5253000d5427f Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 15 Sep 2023 10:05:35 +0200 Subject: [PATCH 1/3] Upgrade OpenAI client --- dev/stop-port-forward.sh | 20 +++++++++++++++++++ .../langstream-ai-agents/pom.xml | 5 ----- .../impl/OpenAICompletionService.java | 6 +++--- .../ai/util/TransformFunctionUtil.java | 5 ++--- .../services/impl/OpenAIProviderTest.java | 2 +- .../functions/transforms/GenAITest.java | 2 +- .../langstream/kafka/ChatCompletionsIT.java | 2 +- pom.xml | 11 +++++++++- 8 files changed, 38 insertions(+), 15 deletions(-) create mode 100755 dev/stop-port-forward.sh diff --git a/dev/stop-port-forward.sh b/dev/stop-port-forward.sh new file mode 100755 index 000000000..79b131203 --- /dev/null +++ b/dev/stop-port-forward.sh @@ -0,0 +1,20 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Port forward LangStream control plane and Gateway +pkill -f "kubectl port-forward svc/langstream-control-plane 8090:8090" || true +pkill -f "kubectl port-forward svc/langstream-api-gateway 8091:8091" || true diff --git a/langstream-agents/langstream-ai-agents/pom.xml b/langstream-agents/langstream-ai-agents/pom.xml index 97750871e..8d4ece573 100644 --- a/langstream-agents/langstream-ai-agents/pom.xml +++ b/langstream-agents/langstream-ai-agents/pom.xml @@ -105,11 +105,6 @@ netty-common ${netty.version} - - io.netty - netty-handler - ${netty.version} - io.netty netty-handler-proxy diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java index ff3fed242..5d3b73eb9 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java @@ -59,9 +59,9 @@ public CompletableFuture getChatCompletions( .map( message -> new com.azure.ai.openai.models.ChatMessage( - ChatRole.fromString( - message.getRole())) - .setContent(message.getContent())) + ChatRole.fromString( + message.getRole()), + message.getContent())) .collect(Collectors.toList())) .setMaxTokens(getInteger("max-tokens", null, options)) .setTemperature(getDouble("temperature", null, options)) diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java index 47b844b34..defebaf97 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java @@ -17,8 +17,8 @@ import com.azure.ai.openai.OpenAIClient; import com.azure.ai.openai.OpenAIClientBuilder; -import com.azure.ai.openai.models.NonAzureOpenAIKeyCredential; import com.azure.core.credential.AzureKeyCredential; +import com.azure.core.credential.KeyCredential; import com.azure.core.http.HttpClient; import com.azure.core.http.HttpHeaders; import com.azure.core.http.HttpPipeline; @@ -107,8 +107,7 @@ public static OpenAIClient buildOpenAIClient(OpenAIConfig openAIConfig) { if (openAIConfig.getProvider() == OpenAIProvider.AZURE) { openAIClientBuilder.credential(new AzureKeyCredential(openAIConfig.getAccessKey())); } else { - openAIClientBuilder.credential( - new NonAzureOpenAIKeyCredential(openAIConfig.getAccessKey())); + openAIClientBuilder.credential(new KeyCredential(openAIConfig.getAccessKey())); } if (openAIConfig.getUrl() != null) { openAIClientBuilder.endpoint(openAIConfig.getUrl()); diff --git a/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java index c6a3c3769..834d7cff8 100644 --- a/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java +++ b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java @@ -42,7 +42,7 @@ class OpenAIProviderTest { void testStreamingCompletion(WireMockRuntimeInfo vmRuntimeInfo) throws Exception { stubFor( - post("/openai/deployments/gpt-35-turbo/chat/completions?api-version=2023-03-15-preview") + post("/openai/deployments/gpt-35-turbo/chat/completions?api-version=2023-08-01-preview") .willReturn( okJson( """ diff --git a/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/pulsar/functions/transforms/GenAITest.java b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/pulsar/functions/transforms/GenAITest.java index 7d03aa68d..46ce4265b 100644 --- a/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/pulsar/functions/transforms/GenAITest.java +++ b/langstream-agents/langstream-ai-agents/src/test/java/com/datastax/oss/pulsar/functions/transforms/GenAITest.java @@ -291,7 +291,7 @@ void testChatCompletionsWithLogField() throws Exception { assertEquals("result", valueAvroRecord.get("completion").toString()); assertEquals( valueAvroRecord.get("log").toString(), - "{\"options\":{\"max_tokens\":null,\"temperature\":null,\"top_p\":null,\"logit_bias\":null,\"user\":null,\"n\":null,\"stop\":null,\"presence_penalty\":null,\"frequency_penalty\":null,\"stream\":true,\"model\":\"test-model\",\"min-chunks-per-message\":20},\"messages\":[{\"role\":\"user\",\"content\":\"value1 key2\"}],\"model\":\"test-model\"}"); + "{\"options\":{\"max_tokens\":null,\"temperature\":null,\"top_p\":null,\"logit_bias\":null,\"user\":null,\"n\":null,\"stop\":null,\"presence_penalty\":null,\"frequency_penalty\":null,\"stream\":true,\"model\":\"test-model\",\"functions\":null,\"function_call\":null,\"dataSources\":null,\"min-chunks-per-message\":20},\"messages\":[{\"role\":\"user\",\"content\":\"value1 key2\"}],\"model\":\"test-model\"}"); } @Test diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java index 7862f4d99..e5a51a364 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java @@ -61,7 +61,7 @@ public void testChatCompletionWithStreaming() throws Exception { String model = "gpt-35-turbo"; stubFor( - post("/openai/deployments/gpt-35-turbo/chat/completions?api-version=2023-03-15-preview") + post("/openai/deployments/gpt-35-turbo/chat/completions?api-version=2023-08-01-preview") .willReturn( okJson( """ diff --git a/pom.xml b/pom.xml index 1ec7d902b..b02ad0528 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,8 @@ 7.4.0 1.10.2 - 1.0.0-beta.2 + 1.0.0-beta.4 + 1.2.16 4.1.97.Final 2.0.61.Final @@ -98,6 +99,14 @@ ${netty.version} pom + + com.azure + azure-sdk-bom + ${azure-sdk-bom.version} + pom + import + + io.netty.incubator netty-incubator-transport-native-io_uring From 5bd55c12771010ed71255e3000ee64d42d0c5997 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 15 Sep 2023 12:11:47 +0200 Subject: [PATCH 2/3] Fix docker run --- .../ai/langstream/impl/deploy/ApplicationDeployer.java | 8 ++++++-- .../langstream/runtime/tester/LocalApplicationRunner.java | 4 ++++ .../src/main/resources/gateway.application.properties | 1 + 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java index ba83169bf..e3a731eb7 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java +++ b/langstream-core/src/main/java/ai/langstream/impl/deploy/ApplicationDeployer.java @@ -60,10 +60,14 @@ public ExecutionPlan createImplementation( StreamingClusterRuntime streamingClusterRuntime = registry.getStreamingClusterRuntime( applicationInstance.getInstance().streamingCluster()); - log.info("Building execution plan for application {}", applicationInstance); + if (log.isDebugEnabled()) { + log.debug("Building execution plan for application {}", applicationInstance); + } final Application resolvedApplicationInstance = ApplicationPlaceholderResolver.resolvePlaceholders(applicationInstance); - log.info("After resolving the placeholders {}", resolvedApplicationInstance); + if (log.isDebugEnabled()) { + log.debug("After resolving the placeholders {}", resolvedApplicationInstance); + } return clusterRuntime.buildExecutionPlan( applicationId, resolvedApplicationInstance, diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java index d94748dd8..e9f778d86 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java @@ -16,6 +16,7 @@ package ai.langstream.runtime.tester; import ai.langstream.api.model.Application; +import ai.langstream.api.runner.assets.AssetManagerRegistry; import ai.langstream.api.runner.topics.TopicConnectionsRuntimeRegistry; import ai.langstream.api.runtime.ClusterRuntimeRegistry; import ai.langstream.api.runtime.ExecutionPlan; @@ -75,11 +76,14 @@ public LocalApplicationRunner(Path agentsDirectory, Path codeDirectory) throws E new TopicConnectionsRuntimeRegistry(); narFileHandler.scan(); topicConnectionsRuntimeRegistry.setPackageLoader(narFileHandler); + AssetManagerRegistry assetManagerRegistry = new AssetManagerRegistry(); + assetManagerRegistry.setAssetManagerPackageLoader(narFileHandler); this.applicationDeployer = ApplicationDeployer.builder() .registry(new ClusterRuntimeRegistry()) .pluginsRegistry(new PluginsRegistry()) .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry) + .assetManagerRegistry(assetManagerRegistry) .build(); } diff --git a/langstream-runtime/langstream-runtime-tester/src/main/resources/gateway.application.properties b/langstream-runtime/langstream-runtime-tester/src/main/resources/gateway.application.properties index 5d0d66874..b89e155cf 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/resources/gateway.application.properties +++ b/langstream-runtime/langstream-runtime-tester/src/main/resources/gateway.application.properties @@ -1,2 +1,3 @@ application.storage.apps.type=memory server.port=8091 +application.gateways.code.path=/app/agents From a4e1e1d74ff2c448e52a0dff5a5406df22a5ad9c Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 15 Sep 2023 14:08:31 +0200 Subject: [PATCH 3/3] Fix tests --- .../src/test/java/ai/langstream/kafka/ChatCompletionsIT.java | 2 +- .../test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java index e5a51a364..7121b99b9 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java @@ -182,7 +182,7 @@ tenant, appId, application, buildInstanceYaml(), expectedAgents)) { waitForMessages( consumer, List.of( - "{\"question\":\"the car\",\"session-id\":\"2139847128764192\",\"answer\":\"A car is a vehicle\",\"prompt\":\"{\\\"options\\\":{\\\"max_tokens\\\":null,\\\"temperature\\\":null,\\\"top_p\\\":null,\\\"logit_bias\\\":null,\\\"user\\\":null,\\\"n\\\":null,\\\"stop\\\":null,\\\"presence_penalty\\\":null,\\\"frequency_penalty\\\":null,\\\"stream\\\":true,\\\"model\\\":\\\"gpt-35-turbo\\\",\\\"min-chunks-per-message\\\":3},\\\"messages\\\":[{\\\"role\\\":\\\"user\\\",\\\"content\\\":\\\"What can you tell me about the car ?\\\"}],\\\"model\\\":\\\"gpt-35-turbo\\\"}\"}")); + "{\"question\":\"the car\",\"session-id\":\"2139847128764192\",\"answer\":\"A car is a vehicle\",\"prompt\":\"{\\\"options\\\":{\\\"max_tokens\\\":null,\\\"temperature\\\":null,\\\"top_p\\\":null,\\\"logit_bias\\\":null,\\\"user\\\":null,\\\"n\\\":null,\\\"stop\\\":null,\\\"presence_penalty\\\":null,\\\"frequency_penalty\\\":null,\\\"stream\\\":true,\\\"model\\\":\\\"gpt-35-turbo\\\",\\\"functions\\\":null,\\\"function_call\\\":null,\\\"dataSources\\\":null,\\\"min-chunks-per-message\\\":3},\\\"messages\\\":[{\\\"role\\\":\\\"user\\\",\\\"content\\\":\\\"What can you tell me about the car ?\\\"}],\\\"model\\\":\\\"gpt-35-turbo\\\"}\"}")); ConsumerRecord record = mainOutputRecords.get(0); assertNull(record.headers().lastHeader("stream-id")); diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java index fd91422d1..5359e1016 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ComputeEmbeddingsIT.java @@ -125,7 +125,7 @@ private static Stream providers() { .formatted(wireMockRuntimeInfo.getHttpBaseUrl()), () -> stubFor( - post("/openai/deployments/text-embedding-ada-002/embeddings?api-version=2023-03-15-preview") + post("/openai/deployments/text-embedding-ada-002/embeddings?api-version=2023-08-01-preview") .willReturn( okJson( """ @@ -278,7 +278,7 @@ public void testComputeBatchEmbeddings() throws Exception { wireMockRuntimeInfo.getWireMock().removeStubMapping(stubMapping); }); stubFor( - post("/openai/deployments/text-embedding-ada-002/embeddings?api-version=2023-03-15-preview") + post("/openai/deployments/text-embedding-ada-002/embeddings?api-version=2023-08-01-preview") .willReturn( okJson( """