From c3b7467cb564cb4b60500155cddf34241c28532e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 10:01:16 +0200 Subject: [PATCH 1/8] Add persistent state directory to python agents --- .../agents/grpc/PythonGrpcAgentProcessor.java | 2 +- .../agents/grpc/PythonGrpcAgentSink.java | 2 +- .../agents/grpc/PythonGrpcAgentSource.java | 2 +- .../agents/grpc/PythonGrpcServer.java | 22 +++++++- .../apps/python-processor/python/example.py | 24 +++++++-- .../src/main/python/Pipfile | 2 +- .../src/main/python/langstream/api.py | 17 ++++++- .../main/python/langstream_grpc/__main__.py | 7 ++- .../src/main/python/langstream_grpc/api.py | 16 +++++- .../python/langstream_grpc/grpc_service.py | 28 +++++++++-- .../langstream_grpc/tests/server_and_stub.py | 13 +++-- .../tests/test_grpc_processor.py | 50 ++++++++++++++++++- 12 files changed, 159 insertions(+), 26 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java index 2c57c0c2c..bc896a2f8 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java @@ -30,7 +30,7 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration); + server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java index a9739c935..85a721b99 
100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java @@ -30,7 +30,7 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration); + server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java index 210b32561..c4b49f9bd 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java @@ -30,7 +30,7 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration); + server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java index 6948ec11b..314a097fa 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java @@ -15,6 +15,7 @@ */ package ai.langstream.agents.grpc; +import ai.langstream.api.runner.code.AgentContext; import 
com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Empty; import io.grpc.ManagedChannel; @@ -22,6 +23,7 @@ import java.net.ServerSocket; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -31,11 +33,13 @@ public class PythonGrpcServer { private final Path codeDirectory; private final Map configuration; + private final AgentContext agentContext; private Process pythonProcess; - public PythonGrpcServer(Path codeDirectory, Map configuration) { + public PythonGrpcServer(Path codeDirectory, Map configuration, AgentContext agentContext) { this.codeDirectory = codeDirectory; this.configuration = configuration; + this.agentContext = agentContext; } public ManagedChannel start() throws Exception { @@ -57,6 +61,8 @@ public ManagedChannel start() throws Exception { pythonCodeDirectory.toAbsolutePath(), pythonCodeDirectory.resolve("lib").toAbsolutePath()); + AgentContextConfiguration agentContextConfiguration = computeAgentContextConfiguration(); + // copy input/output to standard input/output of the java process // this allows to use "kubectl logs" easily ProcessBuilder processBuilder = @@ -65,7 +71,8 @@ public ManagedChannel start() throws Exception { "-m", "langstream_grpc", "[::]:%s".formatted(port), - MAPPER.writeValueAsString(configuration)) + MAPPER.writeValueAsString(configuration), + MAPPER.writeValueAsString(agentContextConfiguration)) .inheritIO() .redirectOutput(ProcessBuilder.Redirect.INHERIT) .redirectError(ProcessBuilder.Redirect.INHERIT); @@ -91,6 +98,15 @@ public ManagedChannel start() throws Exception { return channel; } + private AgentContextConfiguration computeAgentContextConfiguration() { + final Optional persistentStateDirectoryForAgent = + agentContext.getPersistentStateDirectoryForAgent(agentContext.getAgentId()); + + final String persistentStateDirectory = persistentStateDirectoryForAgent.map(p -> 
p.toFile().getAbsolutePath()).orElse(null); + AgentContextConfiguration agentContextConfiguration = new AgentContextConfiguration(persistentStateDirectory); + return agentContextConfiguration; + } + public void close() throws Exception { if (pythonProcess != null) { pythonProcess.destroy(); @@ -102,4 +118,6 @@ public void close() throws Exception { } } } + + public record AgentContextConfiguration(String persistentStateDirectory) {} } diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py index d572ac100..c8a8bfa90 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py @@ -14,17 +14,35 @@ # limitations under the License. # -from langstream import SimpleRecord, Processor -import logging +from langstream import SimpleRecord, Processor, AgentContext +import logging, os class Exclamation(Processor): - def init(self, config): + def init(self, config, context: AgentContext): print("init", config) self.secret_value = config["secret_value"] + self.context = context def process(self, record): logging.info("Processing record" + str(record)) + directory = self.context.get_persistent_state_directory() + counter_file = os.path.resolve(directory, "counter.txt") + counter = 0 + if os.path.exists(counter_file): + with open(counter_file, "r") as f: + counter = int(f.read()) + counter += 1 + else: + counter = 1 + with open(counter_file, 'w') as file: + file.write(str(counter)) + + + if self.secret_value == "super secret value - changed": + assert counter == 2 + else: + assert counter == 1 return [ SimpleRecord( record.value() + "!!" 
+ self.secret_value, headers=record.headers() diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile b/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile index b5b68745c..ef6d71489 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile @@ -1,4 +1,4 @@ -[packages] + [packages] grpcio = "*" fastavro = "*" diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py index 089462b37..4b10186b9 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py @@ -26,6 +26,7 @@ "Source", "Sink", "Processor", + "AgentContext" ] @@ -60,11 +61,25 @@ def headers(self) -> List[Tuple[str, Any]]: RecordType = Union[Record, dict, list, tuple] +class AgentContext(ABC): + """The Agent context interface""" + + def __init__(self): + """Initialize the agent context.""" + pass + + + @abstractmethod + def get_persistent_state_directory(self): + """Return a path pointing to the stateful agent directory. 
Return None if not configured in the agent.""" + pass + + class Agent(ABC): """The Agent interface""" - def init(self, config: Dict[str, Any]): + def init(self, config: Dict[str, Any], context: AgentContext): """Initialize the agent from the given configuration.""" pass diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py index d11dbc5b7..eddd7dcda 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py @@ -27,12 +27,15 @@ datefmt="%H:%M:%S", ) - if len(sys.argv) != 3: + if len(sys.argv) <= 3: print("Missing gRPC target and python class name") print("usage: python -m langstream_grpc ") sys.exit(1) - server = AgentServer(sys.argv[1], sys.argv[2]) + context_config = {} + if len(sys.argv) > 3: + context_config = sys.argv[3] + server = AgentServer(sys.argv[1], sys.argv[2], context_config) server.start() server.grpc_server.wait_for_termination() server.stop() diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py index 089462b37..dbc74ecec 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py @@ -26,6 +26,7 @@ "Source", "Sink", "Processor", + "AgentContext" ] @@ -61,10 +62,23 @@ def headers(self) -> List[Tuple[str, Any]]: RecordType = Union[Record, dict, list, tuple] +class AgentContext(ABC): + """The Agent context interface""" + + def __init__(self): + """Initialize the agent context.""" + pass + + + def get_persistent_state_directory(self): + """Return a path pointing to the stateful agent directory. 
Return None if not configured in the agent.""" + return None + + class Agent(ABC): """The Agent interface""" - def init(self, config: Dict[str, Any]): + def init(self, config: Dict[str, Any], context: AgentContext): """Initialize the agent from the given configuration.""" pass diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py index e25ac6f7f..20eb01fb4 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py @@ -26,6 +26,7 @@ import fastavro import grpc +import inspect from langstream_grpc.proto import agent_pb2_grpc from langstream_grpc.proto.agent_pb2 import ( @@ -49,6 +50,7 @@ Processor, Record, Agent, + AgentContext ) from .util import SimpleRecord, AvroValue @@ -324,27 +326,43 @@ def to_grpc_value(self, value) -> Tuple[Optional[Schema], Optional[Value]]: def call_method_if_exists(klass, method, *args, **kwargs): method = getattr(klass, method, None) if callable(method): - return method(*args, **kwargs) + defined_positional_parameters_count = len(inspect.signature(method).parameters) + if defined_positional_parameters_count >= len(args): + return method(*args, **kwargs) + else: + return method(*args[:defined_positional_parameters_count], **kwargs) return None -def init_agent(configuration) -> Agent: +def init_agent(configuration, context) -> Agent: full_class_name = configuration["className"] class_name = full_class_name.split(".")[-1] module_name = full_class_name[: -len(class_name) - 1] module = importlib.import_module(module_name) agent = getattr(module, class_name)() - call_method_if_exists(agent, "init", configuration) + context_impl = AgentContextImpl(configuration, context) + call_method_if_exists(agent, "init", configuration, context_impl) return agent +class 
AgentContextImpl(AgentContext): + def __init__(self, configuration: dict, context: dict): + self.configuration = configuration + self.context = context + + def get_persistent_state_directory(self) -> str: + dir = self.context.get("persistentStateDirectory", "") + if not dir: + return None + return dir + class AgentServer(object): - def __init__(self, target: str, config: str): + def __init__(self, target: str, config: str, context: str): self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) self.target = target self.grpc_server = grpc.server(self.thread_pool) self.port = self.grpc_server.add_insecure_port(target) - self.agent = init_agent(json.loads(config)) + self.agent = init_agent(json.loads(config), json.loads(context)) def start(self): call_method_if_exists(self.agent, "start") diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py index 1ce338ed3..90286e95c 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py @@ -16,24 +16,23 @@ from typing import Optional -import grpc +import grpc, json from langstream_grpc.grpc_service import AgentServer from langstream_grpc.proto.agent_pb2_grpc import AgentServiceStub class ServerAndStub(object): - def __init__(self, class_name): - self.class_name = class_name + def __init__(self, class_name, agent_config = {}, context = {}): + self.config = agent_config.copy() + self.config["className"] = class_name + self.context = context self.server: Optional[AgentServer] = None self.channel: Optional[grpc.Channel] = None self.stub: Optional[AgentServiceStub] = None def __enter__(self): - config = f"""{{ - "className": "{self.class_name}" - }}""" - self.server = AgentServer("[::]:0", config) + 
self.server = AgentServer("[::]:0", json.dumps(self.config), json.dumps(self.context)) self.server.start() self.channel = grpc.insecure_channel("localhost:%d" % self.server.port) self.stub = AgentServiceStub(channel=self.channel) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py index 25ae02ff9..bd42b6b00 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py @@ -23,7 +23,7 @@ import pytest from google.protobuf import empty_pb2 -from langstream_grpc.api import Record, RecordType, Processor +from langstream_grpc.api import Record, RecordType, Processor, AgentContext from langstream_grpc.proto.agent_pb2 import ( ProcessorRequest, Record as GrpcRecord, @@ -212,6 +212,33 @@ def test_info(): assert info.json_info == '{"test-info-key": "test-info-value"}' + +def test_init_one_parameter(): + with ServerAndStub( + "langstream_grpc.tests.test_grpc_processor.ProcessorInitOneParameter", + {"my-param": "my-value"} + ) as server_and_stub: + for response in server_and_stub.stub.process( + iter([ProcessorRequest(records=[GrpcRecord()])]) + ): + assert len(response.results) == 1 + result = response.results[0].records[0] + assert result.value.string_value == "my-value" + + +def test_processor_use_context(): + with ServerAndStub( + "langstream_grpc.tests.test_grpc_processor.ProcessorUseContext", + {"my-param": "my-value"}, + {"persistentStateDirectory": "/tmp/processor"} + ) as server_and_stub: + for response in server_and_stub.stub.process( + iter([ProcessorRequest(records=[GrpcRecord()])]) + ): + assert len(response.results) == 1 + result = response.results[0].records[0] + assert result.value.string_value == "directory is /tmp/processor" + 
class MyProcessor(Processor): def agent_info(self) -> Dict[str, Any]: return {"test-info-key": "test-info-value"} @@ -252,3 +279,24 @@ def __init__(self): def process(self, record: Record) -> Future[List[RecordType]]: return self.executor.submit(lambda r: [r], record) + + + +class ProcessorInitOneParameter(Processor): + def init(self, agent_config): + self.myparam = agent_config["my-param"] + + def process(self, record: Record) -> List[RecordType]: + return [{ + "value": self.myparam + }] + +class ProcessorUseContext(Processor): + def init(self, agent_config, context: AgentContext): + self.myparam = agent_config["my-param"] + self.context = context + + def process(self, record: Record) -> List[RecordType]: + return [{ + "value": "directory is " + str(self.context.get_persistent_state_directory()) + }] From cf84e7868e268958ddf1605e080550e0eebc076a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 12:29:38 +0200 Subject: [PATCH 2/8] fix compile --- .../ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java | 2 +- .../java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java | 2 +- .../ai/langstream/agents/grpc/PythonGrpcAgentSource.java | 2 +- .../java/ai/langstream/agents/grpc/PythonGrpcServer.java | 6 ++++-- .../langstream-runtime-impl/src/main/python/Pipfile | 2 +- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java index bc896a2f8..8709e59e7 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java @@ -30,7 +30,7 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - 
server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentContext); + server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentId(), agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java index 85a721b99..dcfdede3d 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java @@ -30,7 +30,7 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentContext); + server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentId(), agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java index c4b49f9bd..53a833d56 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java @@ -30,7 +30,7 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentContext); + server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentId(), agentContext); channel = server.start(); super.start(); } diff --git 
a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java index 314a097fa..aee7bc00e 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java @@ -33,12 +33,14 @@ public class PythonGrpcServer { private final Path codeDirectory; private final Map configuration; + private final String agentId; private final AgentContext agentContext; private Process pythonProcess; - public PythonGrpcServer(Path codeDirectory, Map configuration, AgentContext agentContext) { + public PythonGrpcServer(Path codeDirectory, Map configuration, String agentId, AgentContext agentContext) { this.codeDirectory = codeDirectory; this.configuration = configuration; + this.agentId = agentId; this.agentContext = agentContext; } @@ -100,7 +102,7 @@ public ManagedChannel start() throws Exception { private AgentContextConfiguration computeAgentContextConfiguration() { final Optional persistentStateDirectoryForAgent = - agentContext.getPersistentStateDirectoryForAgent(agentContext.getAgentId()); + agentContext.getPersistentStateDirectoryForAgent(agentId); final String persistentStateDirectory = persistentStateDirectoryForAgent.map(p -> p.toFile().getAbsolutePath()).orElse(null); AgentContextConfiguration agentContextConfiguration = new AgentContextConfiguration(persistentStateDirectory); diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile b/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile index ef6d71489..b5b68745c 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/Pipfile @@ -1,4 +1,4 @@ - [packages] +[packages] grpcio = "*" fastavro = "*" From 
bf1beb1ae9f9e00e4ae409b6ba80618c5098ecb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 14:53:18 +0200 Subject: [PATCH 3/8] fix end to end test --- .../ai/langstream/tests/util/BaseEndToEndTest.java | 11 +++++++---- .../resources/apps/python-processor/pipeline.yaml | 3 +++ .../resources/apps/python-processor/python/example.py | 4 ++-- .../src/main/python/langstream/__init__.py | 2 ++ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java index 97e161488..eb0d82d7d 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java @@ -156,9 +156,9 @@ public static KubernetesClient getClient() { } private static void dumpTest(String prefix) { - dumpAllPodsLogs(prefix); + dumpAllPodsLogs(prefix + ".logs"); dumpEvents(prefix); - dumpAllResources(prefix); + dumpAllResources(prefix + ".resource"); dumpProcessOutput(prefix, "kubectl-nodes", "kubectl describe nodes".split(" ")); } @@ -558,7 +558,8 @@ private void cleanupEnv() { @AfterEach public void cleanupAfterEach() { - cleanupAllEndToEndTestsNamespaces(); + // do not cleanup langstream tenant here otherwise we won't get the logs in case of test + // failed cleanupEnv(); } @@ -688,6 +689,8 @@ private static void installLangStream(boolean authentication) { agentResources: cpuPerUnit: %s memPerUnit: %s + storageClassesMapping: + default: standard client: image: repository: %s/langstream-cli @@ -941,7 +944,7 @@ protected static void dumpResource(String filePrefix, HasMetadata resource) { final File outputFile = new File( TEST_LOGS_DIR, - "%s-%s-%s.txt" + "%s.%s.%s.txt" .formatted( filePrefix, resource.getKind(), diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml 
b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml index 26dc40078..5add9e10e 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/pipeline.yaml @@ -35,6 +35,9 @@ pipeline: - name: "Process using Python" resources: size: 2 + disk: + enabled: true + size: 50M id: "test-python-processor" type: "python-processor" input: ls-test-topic0 diff --git a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py index c8a8bfa90..a87a6fe73 100644 --- a/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py +++ b/langstream-e2e-tests/src/test/resources/apps/python-processor/python/example.py @@ -20,14 +20,14 @@ class Exclamation(Processor): def init(self, config, context: AgentContext): - print("init", config) + print("init", config, context) self.secret_value = config["secret_value"] self.context = context def process(self, record): logging.info("Processing record" + str(record)) directory = self.context.get_persistent_state_directory() - counter_file = os.path.resolve(directory, "counter.txt") + counter_file = os.path.join(directory, "counter.txt") counter = 0 if os.path.exists(counter_file): with open(counter_file, "r") as f: diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py index c0a3e0bca..e24db692e 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py @@ -22,6 +22,7 @@ Sink, Source, Processor, + AgentContext ) from .util import SimpleRecord, AvroValue @@ -34,4 +35,5 @@ "Processor", "SimpleRecord", "AvroValue", + "AgentContext" ] From 
67fa32f75fac1b15503d2b2c7dc12655866412a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 14:53:46 +0200 Subject: [PATCH 4/8] fix --- .../agents/grpc/PythonGrpcAgentProcessor.java | 4 +++- .../agents/grpc/PythonGrpcAgentSink.java | 4 +++- .../agents/grpc/PythonGrpcAgentSource.java | 4 +++- .../langstream/agents/grpc/PythonGrpcServer.java | 14 +++++++++++--- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java index 8709e59e7..906e72eda 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java @@ -30,7 +30,9 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentId(), agentContext); + server = + new PythonGrpcServer( + agentContext.getCodeDirectory(), configuration, agentId(), agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java index dcfdede3d..f05d1eeca 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java @@ -30,7 +30,9 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), 
configuration, agentId(), agentContext); + server = + new PythonGrpcServer( + agentContext.getCodeDirectory(), configuration, agentId(), agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java index 53a833d56..26d871a20 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java @@ -30,7 +30,9 @@ public void init(Map configuration) throws Exception { @Override public void start() throws Exception { - server = new PythonGrpcServer(agentContext.getCodeDirectory(), configuration, agentId(), agentContext); + server = + new PythonGrpcServer( + agentContext.getCodeDirectory(), configuration, agentId(), agentContext); channel = server.start(); super.start(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java index aee7bc00e..ec72cb478 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java @@ -37,7 +37,11 @@ public class PythonGrpcServer { private final AgentContext agentContext; private Process pythonProcess; - public PythonGrpcServer(Path codeDirectory, Map configuration, String agentId, AgentContext agentContext) { + public PythonGrpcServer( + Path codeDirectory, + Map configuration, + String agentId, + AgentContext agentContext) { this.codeDirectory = codeDirectory; this.configuration = configuration; this.agentId = agentId; @@ -104,8 
+108,12 @@ private AgentContextConfiguration computeAgentContextConfiguration() { final Optional persistentStateDirectoryForAgent = agentContext.getPersistentStateDirectoryForAgent(agentId); - final String persistentStateDirectory = persistentStateDirectoryForAgent.map(p -> p.toFile().getAbsolutePath()).orElse(null); - AgentContextConfiguration agentContextConfiguration = new AgentContextConfiguration(persistentStateDirectory); + final String persistentStateDirectory = + persistentStateDirectoryForAgent + .map(p -> p.toFile().getAbsolutePath()) + .orElse(null); + AgentContextConfiguration agentContextConfiguration = + new AgentContextConfiguration(persistentStateDirectory); return agentContextConfiguration; } From 418500307a47374a0f00d83a74b7b7ba68dc01fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 14:57:23 +0200 Subject: [PATCH 5/8] fix --- .../src/main/python/langstream_grpc/__main__.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py index eddd7dcda..f0c3b6e87 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py @@ -27,15 +27,12 @@ datefmt="%H:%M:%S", ) - if len(sys.argv) <= 3: - print("Missing gRPC target and python class name") - print("usage: python -m langstream_grpc ") + if len(sys.argv) != 4: + print("Missing gRPC target or config or agent context") + print("usage: python -m langstream_grpc ") sys.exit(1) - context_config = {} - if len(sys.argv) > 3: - context_config = sys.argv[3] - server = AgentServer(sys.argv[1], sys.argv[2], context_config) + server = AgentServer(sys.argv[1], sys.argv[2], sys.argv[3]) server.start() server.grpc_server.wait_for_termination() server.stop() 
From ce5caeff277bcb8010e93082463419d59ab5119b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 15:03:18 +0200 Subject: [PATCH 6/8] lint --- .../src/main/python/langstream/__init__.py | 12 ++------- .../src/main/python/langstream/api.py | 5 ++-- .../main/python/langstream_grpc/__main__.py | 4 ++- .../src/main/python/langstream_grpc/api.py | 3 +-- .../python/langstream_grpc/grpc_service.py | 10 ++----- .../langstream_grpc/tests/server_and_stub.py | 9 ++++--- .../tests/test_grpc_processor.py | 27 ++++++++++--------- 7 files changed, 30 insertions(+), 40 deletions(-) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py index e24db692e..c1a9ca0b8 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/__init__.py @@ -15,15 +15,7 @@ # limitations under the License. 
# -from .api import ( - Agent, - Record, - RecordType, - Sink, - Source, - Processor, - AgentContext -) +from .api import Agent, Record, RecordType, Sink, Source, Processor, AgentContext from .util import SimpleRecord, AvroValue __all__ = [ @@ -35,5 +27,5 @@ "Processor", "SimpleRecord", "AvroValue", - "AgentContext" + "AgentContext", ] diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py index 4b10186b9..02b541f03 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py @@ -26,7 +26,7 @@ "Source", "Sink", "Processor", - "AgentContext" + "AgentContext", ] @@ -61,6 +61,7 @@ def headers(self) -> List[Tuple[str, Any]]: RecordType = Union[Record, dict, list, tuple] + class AgentContext(ABC): """The Agent context interface""" @@ -68,14 +69,12 @@ def __init__(self): """Initialize the agent context.""" pass - @abstractmethod def get_persistent_state_directory(self): """Return a path pointing to the stateful agent directory. 
Return None if not configured in the agent.""" pass - class Agent(ABC): """The Agent interface""" diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py index f0c3b6e87..e39d6dddf 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/__main__.py @@ -29,7 +29,9 @@ if len(sys.argv) != 4: print("Missing gRPC target or config or agent context") - print("usage: python -m langstream_grpc ") + print( + "usage: python -m langstream_grpc " + ) sys.exit(1) server = AgentServer(sys.argv[1], sys.argv[2], sys.argv[3]) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py index dbc74ecec..db22cb9a5 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py @@ -26,7 +26,7 @@ "Source", "Sink", "Processor", - "AgentContext" + "AgentContext", ] @@ -69,7 +69,6 @@ def __init__(self): """Initialize the agent context.""" pass - def get_persistent_state_directory(self): """Return a path pointing to the stateful agent directory. 
Return None if not configured in the agent.""" return None diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py index 20eb01fb4..fb7eb19b4 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py @@ -44,14 +44,7 @@ SinkResponse, ) from langstream_grpc.proto.agent_pb2_grpc import AgentServiceServicer -from .api import ( - Source, - Sink, - Processor, - Record, - Agent, - AgentContext -) +from .api import Source, Sink, Processor, Record, Agent, AgentContext from .util import SimpleRecord, AvroValue @@ -344,6 +337,7 @@ def init_agent(configuration, context) -> Agent: call_method_if_exists(agent, "init", configuration, context_impl) return agent + class AgentContextImpl(AgentContext): def __init__(self, configuration: dict, context: dict): self.configuration = configuration diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py index 90286e95c..77e360780 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/server_and_stub.py @@ -16,14 +16,15 @@ from typing import Optional -import grpc, json +import grpc +import json from langstream_grpc.grpc_service import AgentServer from langstream_grpc.proto.agent_pb2_grpc import AgentServiceStub class ServerAndStub(object): - def __init__(self, class_name, agent_config = {}, context = {}): + def __init__(self, class_name, agent_config={}, context={}): self.config = agent_config.copy() self.config["className"] = class_name self.context = context @@ 
-32,7 +33,9 @@ def __init__(self, class_name, agent_config = {}, context = {}): self.stub: Optional[AgentServiceStub] = None def __enter__(self): - self.server = AgentServer("[::]:0", json.dumps(self.config), json.dumps(self.context)) + self.server = AgentServer( + "[::]:0", json.dumps(self.config), json.dumps(self.context) + ) self.server.start() self.channel = grpc.insecure_channel("localhost:%d" % self.server.port) self.stub = AgentServiceStub(channel=self.channel) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py index bd42b6b00..25ae85b93 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/tests/test_grpc_processor.py @@ -212,11 +212,10 @@ def test_info(): assert info.json_info == '{"test-info-key": "test-info-value"}' - def test_init_one_parameter(): with ServerAndStub( "langstream_grpc.tests.test_grpc_processor.ProcessorInitOneParameter", - {"my-param": "my-value"} + {"my-param": "my-value"}, ) as server_and_stub: for response in server_and_stub.stub.process( iter([ProcessorRequest(records=[GrpcRecord()])]) @@ -228,17 +227,18 @@ def test_init_one_parameter(): def test_processor_use_context(): with ServerAndStub( - "langstream_grpc.tests.test_grpc_processor.ProcessorUseContext", - {"my-param": "my-value"}, - {"persistentStateDirectory": "/tmp/processor"} + "langstream_grpc.tests.test_grpc_processor.ProcessorUseContext", + {"my-param": "my-value"}, + {"persistentStateDirectory": "/tmp/processor"}, ) as server_and_stub: for response in server_and_stub.stub.process( - iter([ProcessorRequest(records=[GrpcRecord()])]) + iter([ProcessorRequest(records=[GrpcRecord()])]) ): assert len(response.results) == 1 result = response.results[0].records[0] assert 
result.value.string_value == "directory is /tmp/processor" + class MyProcessor(Processor): def agent_info(self) -> Dict[str, Any]: return {"test-info-key": "test-info-value"} @@ -281,15 +281,13 @@ def process(self, record: Record) -> Future[List[RecordType]]: return self.executor.submit(lambda r: [r], record) - class ProcessorInitOneParameter(Processor): def init(self, agent_config): self.myparam = agent_config["my-param"] def process(self, record: Record) -> List[RecordType]: - return [{ - "value": self.myparam - }] + return [{"value": self.myparam}] + class ProcessorUseContext(Processor): def init(self, agent_config, context: AgentContext): @@ -297,6 +295,9 @@ def init(self, agent_config, context: AgentContext): self.context = context def process(self, record: Record) -> List[RecordType]: - return [{ - "value": "directory is " + str(self.context.get_persistent_state_directory()) - }] + return [ + { + "value": "directory is " + + str(self.context.get_persistent_state_directory()) + } + ] From f1b01e294bd8a26bcf9c6127dfb815fefe205da7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 15:20:40 +0200 Subject: [PATCH 7/8] comments --- .../src/main/python/langstream/api.py | 4 ---- .../src/main/python/langstream_grpc/api.py | 5 +---- .../src/main/python/langstream_grpc/grpc_service.py | 4 ++-- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py index 02b541f03..350318f7c 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream/api.py @@ -65,10 +65,6 @@ def headers(self) -> List[Tuple[str, Any]]: class AgentContext(ABC): """The Agent context interface""" - def __init__(self): - """Initialize the agent context.""" - pass - @abstractmethod def 
get_persistent_state_directory(self): """Return a path pointing to the stateful agent directory. Return None if not configured in the agent.""" diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py index db22cb9a5..47fc37824 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/api.py @@ -65,10 +65,7 @@ def headers(self) -> List[Tuple[str, Any]]: class AgentContext(ABC): """The Agent context interface""" - def __init__(self): - """Initialize the agent context.""" - pass - + @abstractmethod def get_persistent_state_directory(self): """Return a path pointing to the stateful agent directory. Return None if not configured in the agent.""" return None diff --git a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py index fb7eb19b4..a51548e6d 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py +++ b/langstream-runtime/langstream-runtime-impl/src/main/python/langstream_grpc/grpc_service.py @@ -333,12 +333,12 @@ def init_agent(configuration, context) -> Agent: module_name = full_class_name[: -len(class_name) - 1] module = importlib.import_module(module_name) agent = getattr(module, class_name)() - context_impl = AgentContextImpl(configuration, context) + context_impl = DefaultAgentContext(configuration, context) call_method_if_exists(agent, "init", configuration, context_impl) return agent -class AgentContextImpl(AgentContext): +class DefaultAgentContext(AgentContext): def __init__(self, configuration: dict, context: dict): self.configuration = configuration self.context = context From 53c4d7f475ca627e8e51a3ae9c7a5358937228c9 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Wed, 25 Oct 2023 15:31:40 +0200 Subject: [PATCH 8/8] fix cci --- .github/workflows/ci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bd98ef8aa..7716ceb99 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -198,7 +198,9 @@ jobs: - name: Run tests run: | set -e - ./docker/build.sh runtime-base-docker-image && ./docker/build.sh runtime && ./docker/build.sh runtime-tester + ./docker/build.sh runtime-base-docker-image + ./docker/build.sh runtime + ./docker/build.sh runtime-tester ./mvnw -ntp install -pl langstream-e2e-tests -DskipTests -Dspotless.skip -Dlicense.skip export LANGSTREAM_TESTS_CLI_BIN=$(pwd)/bin/langstream ./mvnw -ntp verify -pl langstream-e2e-tests -De2eTests -Dgroups="cli"