diff --git a/langstream-api/src/main/java/ai/langstream/api/runtime/ComputeClusterRuntime.java b/langstream-api/src/main/java/ai/langstream/api/runtime/ComputeClusterRuntime.java index b4c2f1a2d..d82d8b2c0 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runtime/ComputeClusterRuntime.java +++ b/langstream-api/src/main/java/ai/langstream/api/runtime/ComputeClusterRuntime.java @@ -20,6 +20,8 @@ import ai.langstream.api.model.Connection; import ai.langstream.api.model.Module; import ai.langstream.api.model.Pipeline; +import ai.langstream.api.model.Resource; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,5 +86,10 @@ default AgentNodeMetadata computeAgentMetadata( return null; } + default Map getResourceImplementation( + Resource resource, PluginsRegistry pluginsRegistry) { + return new HashMap<>(resource.configuration()); + } + default void close() {} } diff --git a/langstream-api/src/main/java/ai/langstream/api/runtime/PluginsRegistry.java b/langstream-api/src/main/java/ai/langstream/api/runtime/PluginsRegistry.java index c7dfcead7..37bfe6fae 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runtime/PluginsRegistry.java +++ b/langstream-api/src/main/java/ai/langstream/api/runtime/PluginsRegistry.java @@ -69,4 +69,29 @@ public AssetNodeProvider lookupAssetImplementation( + clusterRuntime.getClusterType())); return assetRuntimeProviderProvider.get(); } + + public ResourceNodeProvider lookupResourceImplementation( + String type, ComputeClusterRuntime clusterRuntime) { + log.info( + "Looking for an implementation of resource type {} on {}", + type, + clusterRuntime.getClusterType()); + ServiceLoader loader = ServiceLoader.load(ResourceNodeProvider.class); + ServiceLoader.Provider runtimeProviderProvider = + loader.stream() + .filter( + p -> { + ResourceNodeProvider nodeProvider = p.get(); + return nodeProvider.supports(type, clusterRuntime); + }) + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "No ResourceNodeProvider found for resource type " + + type + + " for cluster type " + + clusterRuntime.getClusterType())); + return runtimeProviderProvider.get(); + } } diff --git a/langstream-api/src/main/java/ai/langstream/api/runtime/ResourceNodeProvider.java b/langstream-api/src/main/java/ai/langstream/api/runtime/ResourceNodeProvider.java new file mode 100644 index 000000000..235869dcd --- /dev/null +++ b/langstream-api/src/main/java/ai/langstream/api/runtime/ResourceNodeProvider.java @@ -0,0 +1,48 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.api.runtime; + +import ai.langstream.api.model.Module; +import ai.langstream.api.model.Resource; +import java.util.Map; + +public interface ResourceNodeProvider { + + /** + * Create an Implementation of a Resource. + * + * @param module the module + * @param executionPlan the physical application instance + * @param clusterRuntime the cluster runtime + * @param pluginsRegistry the plugins registry + * @return the Agent + */ + Map createImplementation( + Resource resource, + Module module, + ExecutionPlan executionPlan, + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry); + + /** + * Returns the ability of a Resource to be deployed on the give runtimes. + * + * @param type the type of implementation + * @param clusterRuntime the compute cluster runtime + * @return true if this provider can create the implementation + */ + boolean supports(String type, ComputeClusterRuntime clusterRuntime); +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java index 0ed6d773d..dcce509d5 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java @@ -24,6 +24,7 @@ import ai.langstream.api.runtime.ComponentType; import ai.langstream.api.runtime.ComputeClusterRuntime; import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.impl.common.AbstractAgentProvider; import java.util.ArrayList; import java.util.Collection; @@ -390,7 +391,9 @@ protected void generateSteps( Map originalConfiguration, Map configuration, Application application, - AgentConfiguration agentConfiguration) { + AgentConfiguration agentConfiguration, + ComputeClusterRuntime computeClusterRuntime, + PluginsRegistry pluginsRegistry) { List> steps = new ArrayList<>(); configuration.put("steps", steps); Map step = new HashMap<>(); @@ -403,7 +406,12 @@ protected void generateSteps( DataSourceConfigurationGenerator dataSourceConfigurationInjector = (resourceId) -> - generateDataSourceConfiguration(resourceId, application, configuration); + generateDataSourceConfiguration( + resourceId, + application, + configuration, + computeClusterRuntime, + pluginsRegistry); TopicConfigurationGenerator topicConfigurationGenerator = (topicName) -> { @@ -427,11 +435,15 @@ interface DataSourceConfigurationGenerator { } private void generateAIProvidersConfiguration( - Application applicationInstance, Map configuration) { + Application applicationInstance, + Map configuration, + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { // let the user force the provider or detect it automatically String service = (String) configuration.remove("service"); for (Resource resource : applicationInstance.getResources().values()) { - HashMap configurationCopy = new HashMap<>(resource.configuration()); + Map configurationCopy = + clusterRuntime.getResourceImplementation(resource, pluginsRegistry); switch (resource.type()) { case "vertex-configuration": if (service == null || service.equals("vertex")) { @@ -456,7 +468,11 @@ private void generateAIProvidersConfiguration( } private void generateDataSourceConfiguration( - String resourceId, Application applicationInstance, Map configuration) { + String resourceId, + Application applicationInstance, + Map configuration, + ComputeClusterRuntime computeClusterRuntime, + PluginsRegistry pluginsRegistry) { Resource resource = applicationInstance.getResources().get(resourceId); log.info("Generating datasource configuration for {}", resourceId); if (resource != null) { @@ -467,7 +483,9 @@ private void generateDataSourceConfiguration( if (configuration.containsKey("datasource")) { throw new IllegalArgumentException("Only one datasource is supported"); } - configuration.put("datasource", new HashMap<>(resource.configuration())); + Map resourceImplementation = + computeClusterRuntime.getResourceImplementation(resource, pluginsRegistry); + configuration.put("datasource", resourceImplementation); } else { throw new IllegalArgumentException("Resource " + resourceId + " not found"); } @@ -479,20 +497,29 @@ protected Map computeAgentConfiguration( Module module, Pipeline pipeline, ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime) { + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { Map originalConfiguration = super.computeAgentConfiguration( - agentConfiguration, module, pipeline, executionPlan, clusterRuntime); + agentConfiguration, + module, + pipeline, + executionPlan, + clusterRuntime, + pluginsRegistry); Map configuration = new HashMap<>(); - generateAIProvidersConfiguration(executionPlan.getApplication(), configuration); + generateAIProvidersConfiguration( + executionPlan.getApplication(), configuration, clusterRuntime, pluginsRegistry); generateSteps( module, originalConfiguration, configuration, executionPlan.getApplication(), - agentConfiguration); + agentConfiguration, + clusterRuntime, + pluginsRegistry); return configuration; } diff --git a/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java b/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java index 2e9c836cb..ce416edf5 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAgentProvider.java @@ -121,7 +121,8 @@ protected Map computeAgentConfiguration( Module module, Pipeline pipeline, ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime) { + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { return new HashMap<>(agentConfiguration.getConfiguration()); } @@ -141,7 +142,12 @@ public AgentNode createImplementation( ComponentType componentType = getComponentType(agentConfiguration); Map configuration = computeAgentConfiguration( - agentConfiguration, module, pipeline, executionPlan, clusterRuntime); + agentConfiguration, + module, + pipeline, + executionPlan, + clusterRuntime, + pluginsRegistry); // we create the output connection first to make sure that the topic is created ConnectionImplementation output = computeOutput( diff --git a/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAssetProvider.java b/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAssetProvider.java index d150cdbc0..22d5da70c 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAssetProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/common/AbstractAssetProvider.java @@ -45,7 +45,12 @@ public final AssetNode createImplementation( ExecutionPlan executionPlan, ComputeClusterRuntime clusterRuntime, PluginsRegistry pluginsRegistry) { - Map asset = planAsset(assetDefinition, executionPlan.getApplication()); + Map asset = + planAsset( + assetDefinition, + executionPlan.getApplication(), + clusterRuntime, + pluginsRegistry); validateAsset(assetDefinition, asset); return new AssetNode(asset); } @@ -54,7 +59,10 @@ protected abstract void validateAsset( AssetDefinition assetDefinition, Map asset); private Map planAsset( - AssetDefinition assetDefinition, Application application) { + AssetDefinition assetDefinition, + Application application, + ComputeClusterRuntime computeClusterRuntime, + PluginsRegistry pluginsRegistry) { if (!supportedType.contains(assetDefinition.getAssetType())) { throw new IllegalStateException(); @@ -81,7 +89,10 @@ private Map planAsset( key); Resource resource = resources.get(resourceId); if (resource != null) { - value = Map.of("configuration", resource.configuration()); + Map resourceImplementation = + computeClusterRuntime.getResourceImplementation( + resource, pluginsRegistry); + value = Map.of("configuration", resourceImplementation); } else { throw new IllegalArgumentException( "Resource with name=" diff --git a/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java b/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java index e37a174c6..0f900a783 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java +++ b/langstream-core/src/main/java/ai/langstream/impl/common/BasicClusterRuntime.java @@ -21,6 +21,7 @@ import ai.langstream.api.model.Connection; import ai.langstream.api.model.Module; import ai.langstream.api.model.Pipeline; +import ai.langstream.api.model.Resource; import ai.langstream.api.model.TopicDefinition; import ai.langstream.api.runtime.AgentNode; import ai.langstream.api.runtime.AgentNodeProvider; @@ -32,8 +33,10 @@ import ai.langstream.api.runtime.ExecutionPlan; import ai.langstream.api.runtime.ExecutionPlanOptimiser; import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.api.runtime.ResourceNodeProvider; import ai.langstream.api.runtime.StreamingClusterRuntime; import ai.langstream.api.runtime.Topic; +import java.util.HashMap; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -143,6 +146,15 @@ protected void detectAgents( } } + @Override + public Map getResourceImplementation( + Resource resource, PluginsRegistry pluginsRegistry) { + ResourceNodeProvider nodeProvider = + pluginsRegistry.lookupResourceImplementation(resource.type(), this); + // TODO: validate resource + return new HashMap<>(resource.configuration()); + } + protected AgentNode buildAgent( Module module, Pipeline pipeline, diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java new file mode 100644 index 000000000..bd29dd371 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/AIProvidersResourceProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.resources; + +import ai.langstream.api.model.Module; +import ai.langstream.api.model.Resource; +import ai.langstream.api.runtime.ComputeClusterRuntime; +import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.api.runtime.ResourceNodeProvider; +import java.util.Map; +import java.util.Set; + +public class AIProvidersResourceProvider implements ResourceNodeProvider { + + private static final Set SUPPORTED_TYPES = + Set.of("open-ai-configuration", "hugging-face-configuration", "vertex-configuration"); + + @Override + public Map createImplementation( + Resource resource, + Module module, + ExecutionPlan executionPlan, + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { + Map configuration = resource.configuration(); + return resource.configuration(); + } + + @Override + public boolean supports(String type, ComputeClusterRuntime clusterRuntime) { + return SUPPORTED_TYPES.contains(type); + } +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java new file mode 100644 index 000000000..ec73cbef7 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/DataSourceResourceProvider.java @@ -0,0 +1,48 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.resources; + +import ai.langstream.api.model.Module; +import ai.langstream.api.model.Resource; +import ai.langstream.api.runtime.ComputeClusterRuntime; +import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.api.runtime.ResourceNodeProvider; +import ai.langstream.api.util.ConfigurationUtils; +import java.util.Map; + +public class DataSourceResourceProvider implements ResourceNodeProvider { + @Override + public Map createImplementation( + Resource resource, + Module module, + ExecutionPlan executionPlan, + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { + Map configuration = resource.configuration(); + String service = ConfigurationUtils.getString("service", "", configuration); + if (service.isEmpty()) { + throw new IllegalArgumentException( + "Missing required field 'service' in a datasource resource definition"); + } + return resource.configuration(); + } + + @Override + public boolean supports(String type, ComputeClusterRuntime clusterRuntime) { + return "datasource".equals(type); + } +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java new file mode 100644 index 000000000..65b2c17a9 --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java @@ -0,0 +1,48 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.impl.resources; + +import ai.langstream.api.model.Module; +import ai.langstream.api.model.Resource; +import ai.langstream.api.runtime.ComputeClusterRuntime; +import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; +import ai.langstream.api.runtime.ResourceNodeProvider; +import ai.langstream.api.util.ConfigurationUtils; +import java.util.Map; + +public class VectorDatabaseResourceProvider implements ResourceNodeProvider { + @Override + public Map createImplementation( + Resource resource, + Module module, + ExecutionPlan executionPlan, + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { + Map configuration = resource.configuration(); + String service = ConfigurationUtils.getString("service", "", configuration); + if (service.isEmpty()) { + throw new IllegalArgumentException( + "Missing required field 'service' in a vector-database resource definition"); + } + return resource.configuration(); + } + + @Override + public boolean supports(String type, ComputeClusterRuntime clusterRuntime) { + return "vector-database".equals(type); + } +} diff --git a/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.ResourceNodeProvider b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.ResourceNodeProvider new file mode 100644 index 000000000..75066c228 --- /dev/null +++ b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.ResourceNodeProvider @@ -0,0 +1,3 @@ +ai.langstream.impl.resources.VectorDatabaseResourceProvider +ai.langstream.impl.resources.DataSourceResourceProvider +ai.langstream.impl.resources.AIProvidersResourceProvider \ No newline at end of file diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java index a7e50d0f2..70b417ed2 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/main/java/ai/langstream/runtime/impl/k8s/agents/QueryVectorDBAgentProvider.java @@ -23,9 +23,9 @@ import ai.langstream.api.runtime.ComponentType; import ai.langstream.api.runtime.ComputeClusterRuntime; import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.impl.agents.AbstractComposableAgentProvider; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -55,10 +55,16 @@ protected Map computeAgentConfiguration( Module module, Pipeline pipeline, ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime) { + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { Map originalConfiguration = super.computeAgentConfiguration( - agentConfiguration, module, pipeline, executionPlan, clusterRuntime); + agentConfiguration, + module, + pipeline, + executionPlan, + clusterRuntime, + pluginsRegistry); // get the datasource configuration and inject it into the agent configuration String resourceId = (String) originalConfiguration.remove("datasource"); @@ -72,17 +78,27 @@ protected Map computeAgentConfiguration( + agentConfiguration.getId()); } generateDataSourceConfiguration( - resourceId, executionPlan.getApplication(), originalConfiguration); + resourceId, + executionPlan.getApplication(), + originalConfiguration, + clusterRuntime, + pluginsRegistry); return originalConfiguration; } private void generateDataSourceConfiguration( - String resourceId, Application applicationInstance, Map configuration) { + String resourceId, + Application applicationInstance, + Map configuration, + ComputeClusterRuntime computeClusterRuntime, + PluginsRegistry pluginsRegistry) { Resource resource = applicationInstance.getResources().get(resourceId); log.info("Generating datasource configuration for {}", resourceId); if (resource != null) { + Map resourceImplementation = + computeClusterRuntime.getResourceImplementation(resource, pluginsRegistry); if (!resource.type().equals("datasource") && !resource.type().equals("vector-database")) { throw new IllegalArgumentException( @@ -93,7 +109,7 @@ private void generateDataSourceConfiguration( if (configuration.containsKey("datasource")) { throw new IllegalArgumentException("Only one datasource is supported"); } - configuration.put("datasource", new HashMap<>(resource.configuration())); + configuration.put("datasource", resourceImplementation); } else { throw new IllegalArgumentException("Resource " + resourceId + " not found"); } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericAgentProvider.java index 4bff51c3b..e5eb5b24d 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericAgentProvider.java @@ -21,6 +21,7 @@ import ai.langstream.api.runtime.ComponentType; import ai.langstream.api.runtime.ComputeClusterRuntime; import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.impl.common.AbstractAgentProvider; import java.util.List; import java.util.Map; @@ -38,10 +39,16 @@ protected Map computeAgentConfiguration( Module module, Pipeline pipeline, ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime) { + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { Map copy = super.computeAgentConfiguration( - agentConfiguration, module, pipeline, executionPlan, clusterRuntime); + agentConfiguration, + module, + pipeline, + executionPlan, + clusterRuntime, + pluginsRegistry); // TODO..... return copy; } diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java index 1a1d9e7d1..68d8f8091 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSinkAgentProvider.java @@ -22,6 +22,7 @@ import ai.langstream.api.runtime.ComputeClusterRuntime; import ai.langstream.api.runtime.ConnectionImplementation; import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.api.runtime.Topic; import ai.langstream.impl.common.AbstractAgentProvider; import ai.langstream.runtime.impl.k8s.KubernetesClusterRuntime; @@ -46,10 +47,16 @@ protected Map computeAgentConfiguration( Module module, Pipeline pipeline, ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime) { + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { Map copy = super.computeAgentConfiguration( - agentConfiguration, module, pipeline, executionPlan, clusterRuntime); + agentConfiguration, + module, + pipeline, + executionPlan, + clusterRuntime, + pluginsRegistry); // we can auto-wire the "topics" configuration property ConnectionImplementation connectionImplementation = diff --git a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java index 1a0a58ab1..b1e613aee 100644 --- a/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java +++ b/langstream-k8s-runtime/langstream-k8s-runtime-core/src/test/java/ai/langstream/runtime/impl/testagents/TestGenericSourceAgentProvider.java @@ -22,6 +22,7 @@ import ai.langstream.api.runtime.ComputeClusterRuntime; import ai.langstream.api.runtime.ConnectionImplementation; import ai.langstream.api.runtime.ExecutionPlan; +import ai.langstream.api.runtime.PluginsRegistry; import ai.langstream.api.runtime.Topic; import ai.langstream.impl.common.AbstractAgentProvider; import java.util.List; @@ -45,10 +46,16 @@ protected Map computeAgentConfiguration( Module module, Pipeline pipeline, ExecutionPlan executionPlan, - ComputeClusterRuntime clusterRuntime) { + ComputeClusterRuntime clusterRuntime, + PluginsRegistry pluginsRegistry) { Map copy = super.computeAgentConfiguration( - agentConfiguration, module, pipeline, executionPlan, clusterRuntime); + agentConfiguration, + module, + pipeline, + executionPlan, + clusterRuntime, + pluginsRegistry); // we can auto-wire the "topic" configuration property ConnectionImplementation connectionImplementation =