From dc14d12598273d19d1df3dd1311048945221b936 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 26 Oct 2023 13:50:13 +0200 Subject: [PATCH 1/7] [python] Add support for hot reload of code in docker mode --- .../agents/grpc/AbstractGrpcAgent.java | 17 ++- .../agents/grpc/GrpcAgentProcessor.java | 12 +- .../agents/grpc/PythonGrpcAgentProcessor.java | 15 ++- .../langstream/api/runner/code/AgentCode.java | 10 ++ .../api/runner/code/AgentCodeAndLoader.java | 15 +++ .../commands/docker/ApplicationWatcher.java | 107 ++++++++++++++++++ .../docker/LocalRunApplicationCmd.java | 4 + .../langstream/runtime/agent/AgentRunner.java | 25 ++-- .../runtime/agent/AgentRunnerStarter.java | 4 +- .../agent/CompositeAgentProcessor.java | 7 ++ ...AgentInfo.java => AgentAPIController.java} | 16 ++- .../runtime/agent/api/AgentInfoServlet.java | 27 ++++- .../langstream/AbstractApplicationRunner.java | 19 ++-- .../kafka/AbstractKafkaApplicationRunner.java | 6 +- .../tester/InMemoryApplicationStore.java | 14 +-- .../tester/LocalApplicationRunner.java | 16 +-- .../ai/langstream/runtime/tester/Main.java | 82 ++++++++++++++ 17 files changed, 342 insertions(+), 54 deletions(-) create mode 100644 langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java rename langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/{AgentInfo.java => AgentAPIController.java} (83%) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java index 7bfabb427..34835d879 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java @@ -101,13 +101,24 @@ protected Map buildAdditionalInfo() { } } - @Override - public synchronized void close() throws Exception { + protected synchronized void stop() throws Exception { + stopChannel(true); + } + + public synchronized void stopChannel(boolean wait) throws Exception { if (channel != null) { - channel.shutdown(); + ManagedChannel shutdown = channel.shutdown(); + if (wait) { + shutdown.awaitTermination(1, TimeUnit.MINUTES); + } } } + @Override + public synchronized void close() throws Exception { + stopChannel(false); + } + protected Object fromGrpc(Value value) throws IOException { if (value == null) { return null; diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java index e85e8506a..8ecb07a5c 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java @@ -86,10 +86,7 @@ public synchronized void process( @Override public synchronized void close() throws Exception { - if (request != null) { - request.onCompleted(); - } - super.close(); + stop(); } private SourceRecordAndResult fromGrpc( @@ -163,4 +160,11 @@ public void onCompleted() { } }; } + + protected synchronized void stop() throws Exception { + if (request != null) { + request.onCompleted(); + } + super.stop(); + } } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java index 906e72eda..20a930516 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java @@ -39,7 +39,20 @@ public void start() throws Exception { @Override public synchronized void close() throws Exception { + stop(); + } + + @Override + protected synchronized void stop() throws Exception { if (server != null) server.close(); - super.close(); + } + + @Override + public void restart() throws Exception { + stop(); + super.stop(); + + super.start(); + start(); } } diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java index 44c24e364..75fb2434f 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java @@ -58,4 +58,14 @@ default void close() throws Exception {} * @return information about the agent */ List getAgentStatus(); + + /** + * Gracefully restart the agent. + * + * @throws Exception + */ + default void restart() throws Exception { + throw new UnsupportedOperationException( + "Restart is not supported for agent type " + agentType()); + } } diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java index 4943dfcf6..3353ee702 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCodeAndLoader.java @@ -134,6 +134,11 @@ public void close() throws Exception { executeWithContextClassloader(AgentCode::close); } + @Override + public void restart() throws Exception { + executeWithContextClassloader(AgentCode::restart); + } + @Override public ComponentType componentType() { return callNoExceptionWithContextClassloader(AgentCode::componentType); @@ -192,6 +197,11 @@ public void close() throws Exception { executeWithContextClassloader(AgentCode::close); } + @Override + public void restart() throws Exception { + executeWithContextClassloader(AgentCode::restart); + } + @Override public ComponentType componentType() { return callNoExceptionWithContextClassloader(AgentCode::componentType); @@ -249,6 +259,11 @@ public void close() throws Exception { executeWithContextClassloader(AgentCode::close); } + @Override + public void restart() throws Exception { + executeWithContextClassloader(AgentCode::restart); + } + @Override public ComponentType componentType() { return callNoExceptionWithContextClassloader(AgentCode::componentType); diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java new file mode 100644 index 000000000..0dacf6304 --- /dev/null +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java @@ -0,0 +1,107 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.cli.commands.docker; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.OVERFLOW; + +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ApplicationWatcher { + + public static void watchApplication(Path codeDirectory) throws Exception { + try (WatchService watcher = FileSystems.getDefault().newWatchService(); ) { + + Thread watchThread = + new Thread( + () -> { + try { + watchFiles(watcher, codeDirectory); + } catch (Exception e) { + log.error("Error while watching files", e); + } + }); + + watchThread.start(); + } + } + + private static void watchFiles(WatchService watcher, Path dir) throws Exception { + WatchKey register = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + + log.info("Watching files in {}, key {}", dir, register); + for (; ; ) { + + // wait for key to be signaled + WatchKey key; + try { + key = watcher.take(); + log.info("Received key {}", key); + } catch (InterruptedException x) { + return; + } + + for (WatchEvent event : key.pollEvents()) { + WatchEvent.Kind kind = event.kind(); + log.info("Event kind {}", kind); + + // This key is registered only + // for ENTRY_CREATE events, + // but an OVERFLOW event can + // occur regardless if events + // are lost or discarded. + if (kind == OVERFLOW) { + continue; + } + + // The filename is the + // context of the event. + WatchEvent ev = (WatchEvent) event; + Path filename = ev.context(); + + // Verify that the new + // file is a text file. + try { + // Resolve the filename against the directory. + // If the filename is "test" and the directory is "foo", + // the resolved name is "test/foo". + Path child = dir.resolve(filename); + log.info("File {} changed", child); + } catch (Exception x) { + log.error("Error while watching files", x); + continue; + } + } + + // Reset the key -- this step is critical if you want to + // receive further watch events. If the key is no longer valid, + // the directory is inaccessible so exit the loop. + boolean valid = key.reset(); + if (!valid) { + log.info("Key is not valid, exiting"); + break; + } + } + } +} diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java index 93d7c63ca..77b542a0b 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java @@ -359,6 +359,10 @@ private void executeOnDocker( commandLine.add("-p"); commandLine.add("8090:8090"); + // agent control + commandLine.add("-p"); + commandLine.add("8790:8790"); + if (memory != null && !memory.isEmpty()) { commandLine.add("--memory"); commandLine.add(memory); diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java index e1f30be9d..16fd2b9ff 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java @@ -39,7 +39,7 @@ import ai.langstream.api.runner.topics.TopicProducer; import ai.langstream.api.runtime.ComponentType; import ai.langstream.impl.nar.NarFileHandler; -import ai.langstream.runtime.agent.api.AgentInfo; +import ai.langstream.runtime.agent.api.AgentAPIController; import ai.langstream.runtime.agent.api.AgentInfoServlet; import ai.langstream.runtime.agent.api.MetricsHttpServlet; import ai.langstream.runtime.agent.simple.IdentityAgentProvider; @@ -91,7 +91,8 @@ public interface MainErrorHandler { void handleError(Throwable error); } - private static Server bootstrapHttpServer(AgentInfo agentInfo) throws Exception { + private static Server bootstrapHttpServer(AgentAPIController agentAPIController) + throws Exception { DefaultExports.initialize(); Server server = new Server(8080); log.info("Started metrics and agent server on port 8080"); @@ -101,7 +102,7 @@ private static Server bootstrapHttpServer(AgentInfo agentInfo) throws Exception context.setContextPath("/"); server.setHandler(context); context.addServlet(new ServletHolder(new MetricsHttpServlet()), "/metrics"); - context.addServlet(new ServletHolder(new AgentInfoServlet(agentInfo)), "/info"); + context.addServlet(new ServletHolder(new AgentInfoServlet(agentAPIController)), "/info"); server.start(); return server; } @@ -111,7 +112,7 @@ public static void runAgent( Path codeDirectory, Path agentsDirectory, Path basePersistentStateDirectory, - AgentInfo agentInfo, + AgentAPIController agentAPIController, Supplier continueLoop, Runnable beforeStopSource, boolean startHttpServer, @@ -123,7 +124,7 @@ public static void runAgent( codeDirectory, agentsDirectory, basePersistentStateDirectory, - agentInfo, + agentAPIController, continueLoop, beforeStopSource, startHttpServer, @@ -135,7 +136,7 @@ public void run( Path codeDirectory, Path agentsDirectory, Path basePersistentStateDirectory, - AgentInfo agentInfo, + AgentAPIController agentAPIController, Supplier continueLoop, Runnable beforeStopSource, boolean startHttpServer, @@ -181,14 +182,14 @@ public void run( AgentCodeAndLoader agentCode = initAgent(configuration, agentCodeRegistry); Server server = null; try { - server = startHttpServer ? bootstrapHttpServer(agentInfo) : null; + server = startHttpServer ? bootstrapHttpServer(agentAPIController) : null; runJavaAgent( configuration, continueLoop, agentId, topicConnectionsRuntime, agentCode, - agentInfo, + agentAPIController, beforeStopSource, codeDirectory, basePersistentStateDirectory); @@ -243,7 +244,7 @@ private static void runJavaAgent( String agentId, TopicConnectionsRuntime topicConnectionsRuntime, AgentCodeAndLoader agentCodeWithLoader, - AgentInfo agentInfo, + AgentAPIController agentAPIController, Runnable beforeStopSource, Path codeDirectory, Path basePersistentStateDirectory) @@ -302,7 +303,7 @@ private static void runJavaAgent( mainProcessor = new IdentityAgentProvider.IdentityAgentCode(); mainProcessor.setMetadata("identity", "identity", System.currentTimeMillis()); } - agentInfo.watchProcessor(mainProcessor); + agentAPIController.watchProcessor(mainProcessor); AgentSource source = null; if (agentCodeWithLoader.isSource()) { @@ -318,7 +319,7 @@ private static void runJavaAgent( source.setMetadata("topic-source", "topic-source", System.currentTimeMillis()); source.init(Map.of()); } - agentInfo.watchSource(source); + agentAPIController.watchSource(source); AgentSink sink = null; if (agentCodeWithLoader.isSink()) { @@ -332,7 +333,7 @@ private static void runJavaAgent( sink.setMetadata("topic-sink", "topic-sink", System.currentTimeMillis()); sink.init(Map.of()); } - agentInfo.watchSink(sink); + agentAPIController.watchSink(sink); String onBadRecord = configuration diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunnerStarter.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunnerStarter.java index 24461603d..6919229f6 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunnerStarter.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunnerStarter.java @@ -25,7 +25,7 @@ import static ai.langstream.runtime.api.agent.AgentRunnerConstants.POD_CONFIG_ENV_DEFAULT; import ai.langstream.runtime.RuntimeStarter; -import ai.langstream.runtime.agent.api.AgentInfo; +import ai.langstream.runtime.agent.api.AgentAPIController; import ai.langstream.runtime.api.agent.RuntimePodConfiguration; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; @@ -122,7 +122,7 @@ public void start(String... args) throws Exception { codeDirectory, agentsDirectory, basePersistentStateDirectory, - new AgentInfo(), + new AgentAPIController(), continueLoop::get, null, true, diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java index 0d34d5208..ba47c9df4 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/CompositeAgentProcessor.java @@ -240,4 +240,11 @@ public final List getAgentStatus() { } return result; } + + @Override + public void restart() throws Exception { + for (AgentProcessor processor : processors) { + processor.restart(); + } + } } diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentInfo.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentAPIController.java similarity index 83% rename from langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentInfo.java rename to langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentAPIController.java index cb3c92f53..dbb60eed4 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentInfo.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentAPIController.java @@ -20,8 +20,9 @@ import ai.langstream.api.runner.code.AgentStatusResponse; import java.util.ArrayList; import java.util.List; +import java.util.Map; -public class AgentInfo { +public class AgentAPIController { private AgentProcessor processor; private AgentCode source; private AgentCode sink; @@ -57,4 +58,17 @@ public List serveWorkerStatus() { } return result; } + + public Map restart() throws Exception { + if (source != null) { + source.restart(); + } + if (processor != null) { + processor.restart(); + } + if (sink != null) { + sink.restart(); + } + return Map.of(); + } } diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentInfoServlet.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentInfoServlet.java index 2b07e86de..4f10ef496 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentInfoServlet.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/api/AgentInfoServlet.java @@ -20,19 +20,38 @@ import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class AgentInfoServlet extends HttpServlet { private static final ObjectMapper MAPPER = new ObjectMapper(); - private final AgentInfo agentInfo; + private final AgentAPIController agentAPIController; - public AgentInfoServlet(AgentInfo agentInfo) { - this.agentInfo = agentInfo; + public AgentInfoServlet(AgentAPIController agentAPIController) { + this.agentAPIController = agentAPIController; } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { resp.setContentType("application/json"); - MAPPER.writeValue(resp.getOutputStream(), agentInfo.serveWorkerStatus()); + MAPPER.writeValue(resp.getOutputStream(), agentAPIController.serveWorkerStatus()); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + resp.setContentType("application/json"); + String uri = req.getRequestURI(); + if (uri.endsWith("/restart")) { + try { + MAPPER.writeValue(resp.getOutputStream(), agentAPIController.restart()); + } catch (Throwable error) { + log.error("Error while restarting the agents"); + resp.getOutputStream().write((error + "").getBytes()); + resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } else { + resp.setStatus(HttpServletResponse.SC_NOT_FOUND); + } } } diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java index 26bcb0f85..7b0765773 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java @@ -29,7 +29,7 @@ import ai.langstream.impl.nar.NarFileHandler; import ai.langstream.impl.parser.ModelBuilder; import ai.langstream.runtime.agent.AgentRunner; -import ai.langstream.runtime.agent.api.AgentInfo; +import ai.langstream.runtime.agent.api.AgentAPIController; import ai.langstream.runtime.api.agent.RuntimePodConfiguration; import io.fabric8.kubernetes.api.model.Secret; import java.nio.file.Files; @@ -169,7 +169,7 @@ public static void setup() throws Exception { .build(); } - public record AgentRunResult(Map info) {} + public record AgentRunResult(Map info) {} protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws Exception { String runnerExecutionId = UUID.randomUUID().toString(); @@ -177,7 +177,7 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws "{} Starting Agent Runners. Running {} pods", runnerExecutionId, runtime.secrets.size()); - Map allAgentsInfo = new ConcurrentHashMap<>(); + Map allAgentsInfo = new ConcurrentHashMap<>(); try { List pods = new ArrayList<>(); runtime.secrets() @@ -212,8 +212,9 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws "{} AgentPod {} Started", runnerExecutionId, podConfiguration.agent().agentId()); - AgentInfo agentInfo = new AgentInfo(); - allAgentsInfo.put(podConfiguration.agent().agentId(), agentInfo); + AgentAPIController agentAPIController = new AgentAPIController(); + allAgentsInfo.put( + podConfiguration.agent().agentId(), agentAPIController); AtomicInteger numLoops = new AtomicInteger(); for (String agentWithDisk : podConfiguration.agent().agentsWithDisk()) { @@ -232,16 +233,16 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws codeDirectory, agentsDirectory, basePersistenceDirectory, - agentInfo, + agentAPIController, () -> { log.info( "Num loops {}/{}", numLoops.get(), maxNumLoops); return numLoops.incrementAndGet() <= maxNumLoops; }, - () -> validateAgentInfoBeforeStop(agentInfo), + () -> validateAgentInfoBeforeStop(agentAPIController), false, narFileHandler); - List infos = agentInfo.serveWorkerStatus(); + List infos = agentAPIController.serveWorkerStatus(); log.info( "{} AgentPod {} AgentInfo {}", runnerExecutionId, @@ -287,7 +288,7 @@ protected AgentRunResult executeAgentRunners(ApplicationRuntime runtime) throws return new AgentRunResult(allAgentsInfo); } - protected void validateAgentInfoBeforeStop(AgentInfo agentInfo) {} + protected void validateAgentInfoBeforeStop(AgentAPIController agentAPIController) {} @AfterEach public void resetNumLoops() { diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java index d6307a601..1cdbdbf57 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/AbstractKafkaApplicationRunner.java @@ -23,7 +23,7 @@ import ai.langstream.AbstractApplicationRunner; import ai.langstream.kafka.extensions.KafkaContainerExtension; -import ai.langstream.runtime.agent.api.AgentInfo; +import ai.langstream.runtime.agent.api.AgentAPIController; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -230,11 +230,11 @@ protected static AdminClient getKafkaAdmin() { } @Override - protected void validateAgentInfoBeforeStop(AgentInfo agentInfo) { + protected void validateAgentInfoBeforeStop(AgentAPIController agentAPIController) { if (!validateConsumerOffsets) { return; } - agentInfo + agentAPIController .serveWorkerStatus() .forEach( workerStatus -> { diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java index e558bc9e7..89aa300a9 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/InMemoryApplicationStore.java @@ -27,7 +27,7 @@ import ai.langstream.api.storage.ApplicationStore; import ai.langstream.deployer.k8s.agents.AgentResourcesFactory; import ai.langstream.deployer.k8s.api.crds.apps.SerializedApplicationInstance; -import ai.langstream.runtime.agent.api.AgentInfo; +import ai.langstream.runtime.agent.api.AgentAPIController; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -46,7 +46,7 @@ public class InMemoryApplicationStore implements ApplicationStore { private static final ConcurrentHashMap SECRETS = new ConcurrentHashMap<>(); public interface AgentInfoCollector { - public Map collectAgentsStatus(); + public Map collectAgentsStatus(); } private static AgentInfoCollector agentsInfoCollector = Map::of; @@ -202,7 +202,7 @@ private static ApplicationStatus buildMockApplicationStatus( Map agents = new HashMap<>(); - Map agentsInfo = agentsInfoCollector.collectAgentsStatus(); + Map agentsInfo = agentsInfoCollector.collectAgentsStatus(); for (String declaredAgent : declaredAgents) { ApplicationStatus.AgentStatus agentStatus = new ApplicationStatus.AgentStatus(); @@ -217,9 +217,9 @@ private static ApplicationStatus buildMockApplicationStatus( throw new IllegalStateException("No agent runner spec for agent " + declaredAgent); } - AgentInfo agentInfo = agentsInfo.get(declaredAgent); + AgentAPIController agentAPIController = agentsInfo.get(declaredAgent); Map podStatuses = - getPodStatuses(agentInfo, agentRunnerSpec, queryPods); + getPodStatuses(agentAPIController, agentRunnerSpec, queryPods); agentStatus.setWorkers(podStatuses); agents.put(declaredAgent, agentStatus); @@ -232,7 +232,7 @@ private static ApplicationStatus buildMockApplicationStatus( } private static Map getPodStatuses( - AgentInfo agentInfo, + AgentAPIController agentAPIController, final AgentResourcesFactory.AgentRunnerSpec agentRunnerSpec, boolean queryPods) { @@ -254,7 +254,7 @@ private static Map getPodStatuses( if (queryPods) { // status of the agents - List agentStatusResponses = agentInfo.serveWorkerStatus(); + List agentStatusResponses = agentAPIController.serveWorkerStatus(); agentWorkerStatus = agentWorkerStatus.applyAgentStatus(agentStatusResponses); } diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java index 92da0d40d..fe9dc60ca 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/LocalApplicationRunner.java @@ -27,7 +27,7 @@ import ai.langstream.impl.nar.NarFileHandler; import ai.langstream.impl.parser.ModelBuilder; import ai.langstream.runtime.agent.AgentRunner; -import ai.langstream.runtime.agent.api.AgentInfo; +import ai.langstream.runtime.agent.api.AgentAPIController; import ai.langstream.runtime.api.agent.RuntimePodConfiguration; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; @@ -75,7 +75,7 @@ public class LocalApplicationRunner final AtomicBoolean started = new AtomicBoolean(); - final Map allAgentsInfo = new ConcurrentHashMap<>(); + final Map allAgentsInfo = new ConcurrentHashMap<>(); public LocalApplicationRunner( Path agentsDirectory, Path codeDirectory, Path basePersistentStateDirectory) @@ -160,11 +160,11 @@ private void ensureDiskDirectories(ExecutionPlan implementation) throws IOExcept } @Override - public Map collectAgentsStatus() { + public Map collectAgentsStatus() { return new HashMap<>(allAgentsInfo); } - public record AgentRunResult(Map info) {} + public record AgentRunResult(Map info) {} public void start() { kubeServer.start(); @@ -222,19 +222,19 @@ public AgentRunResult executeAgentRunners(ApplicationRuntime runtime, List {}, false, narFileHandler); - List infos = agentInfo.serveWorkerStatus(); + List infos = agentAPIController.serveWorkerStatus(); log.info( "{} AgentPod {} AgentInfo {}", runnerExecutionId, diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java index 04e26d6ba..a5d894028 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java @@ -24,22 +24,48 @@ import ai.langstream.impl.common.ApplicationPlaceholderResolver; import ai.langstream.impl.parser.ModelBuilder; import ai.langstream.impl.storage.GlobalMetadataStoreManager; +import ai.langstream.runtime.agent.api.AgentAPIController; +import ai.langstream.runtime.agent.api.AgentInfoServlet; +import ai.langstream.runtime.agent.api.MetricsHttpServlet; import ai.langstream.webservice.LangStreamControlPlaneWebApplication; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; + +import io.prometheus.client.hotspot.DefaultExports; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; + +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; +import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; +import static java.nio.file.StandardWatchEventKinds.OVERFLOW; @Slf4j public class Main { public static void main(String... args) { try { + + String tenant = System.getenv().getOrDefault("LANSGSTREAM_TESTER_TENANT", "tenant"); String applicationId = System.getenv().getOrDefault("LANSGSTREAM_TESTER_APPLICATIONID", "app"); @@ -137,10 +163,15 @@ public static void main(String... args) { agentsIdToKeepInStats.add(singleAgentId); } + + + Server server = null; try (LocalApplicationRunner runner = new LocalApplicationRunner( Paths.get(agentsDirectory), codeDirectory, basePersistentStatePath); ) { + server = bootstrapHttpServer(runner); + server.start(); InMemoryApplicationStore.setAgentsInfoCollector(runner); InMemoryApplicationStore.setFilterAgents(agentsIdToKeepInStats); @@ -176,6 +207,10 @@ public static void main(String... args) { runner.executeAgentRunners(applicationRuntime, agentsToRun); } + } finally { + if (server != null) { + server.stop(); + } } } catch (Throwable error) { @@ -189,4 +224,51 @@ private static ObjectMapper yamlPrinter() { .enable(SerializationFeature.INDENT_OUTPUT) .enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); } + + private static Server bootstrapHttpServer(LocalApplicationRunner runner) throws Exception { + DefaultExports.initialize(); + Server server = new Server(8790); + log.info("Started local agent controller service at port 8790"); + String url = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":8790"; + log.info("The addresses should be {}/metrics and {}/info}", url, url); + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + server.setHandler(context); + context.addServlet(new ServletHolder(new MetricsHttpServlet()), "/metrics"); + context.addServlet(new ServletHolder(new AgentControllerServlet(runner)), "/info"); + server.start(); + return server; + } + + private static class AgentControllerServlet extends HttpServlet { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final LocalApplicationRunner agentAPIController; + + public AgentControllerServlet(LocalApplicationRunner runner) { + this.agentAPIController = runner; + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + resp.setContentType("application/json"); + String uri = req.getRequestURI(); + if (uri.endsWith("/restart")) { + try { + Collection agents = agentAPIController.collectAgentsStatus().values(); + List> result = new ArrayList<>(); + for (AgentAPIController controller : agents) { + result.add(controller.restart()); + } + MAPPER.writeValue(resp.getOutputStream(), result); + } catch (Throwable error) { + log.error("Error while restarting the agents"); + resp.getOutputStream().write((error + "").getBytes()); + resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } else { + resp.setStatus(HttpServletResponse.SC_NOT_FOUND); + } + } + } } From c443533b954683d6fb280d4abe74d1603306d7d6 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 26 Oct 2023 14:35:07 +0200 Subject: [PATCH 2/7] More work --- .../agents/grpc/GrpcAgentProcessor.java | 24 +++++-- .../agents/grpc/PythonGrpcAgentProcessor.java | 9 ++- .../agents/grpc/PythonGrpcServer.java | 8 ++- .../langstream/api/runner/code/AgentCode.java | 9 ++- .../commands/docker/ApplicationWatcher.java | 37 ++++++----- .../docker/LocalRunApplicationCmd.java | 62 +++++++++++++------ .../ai/langstream/runtime/tester/Main.java | 3 +- 7 files changed, 101 insertions(+), 51 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java index 8ecb07a5c..295c1a72c 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; @@ -40,6 +41,8 @@ public class GrpcAgentProcessor extends AbstractGrpcAgent implements AgentProces private final StreamObserver responseObserver = getResponseObserver(); + private final AtomicBoolean restarting = new AtomicBoolean(false); + private record RecordAndSink( ai.langstream.api.runner.code.Record sourceRecord, RecordSink sink) {} @@ -58,6 +61,7 @@ public synchronized void onNewSchemaToSend(Schema schema) { @Override public void start() throws Exception { + restarting.set(false); super.start(); request = AgentServiceGrpc.newStub(channel).withWaitForReady().process(responseObserver); } @@ -147,21 +151,31 @@ public void onNext(ProcessorResponse response) { @Override public void onError(Throwable throwable) { - agentContext.criticalFailure( - new RuntimeException( - "gRPC server sent error: %s".formatted(throwable.getMessage()), - throwable)); + if (!restarting.get()) { + agentContext.criticalFailure( + new RuntimeException( + "gRPC server sent error: %s".formatted(throwable.getMessage()), + throwable)); + } else { + log.info("Ignoring error during restart {}", throwable + ""); + } } @Override public void onCompleted() { - agentContext.criticalFailure( + if (!restarting.get()) { + agentContext.criticalFailure( new RuntimeException("gRPC server completed the stream unexpectedly")); + } else { + log.info("Ignoring error server stop during restart"); + } } }; } protected synchronized void stop() throws Exception { + log.info("Restarting..."); + restarting.set(true); if (request != null) { request.onCompleted(); } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java index 20a930516..63213f668 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentProcessor.java @@ -39,20 +39,19 @@ public void start() throws Exception { @Override public synchronized void close() throws Exception { - stop(); + if (server != null) server.close(false); } @Override protected synchronized void stop() throws Exception { - if (server != null) server.close(); + if (server != null) server.close(true); } @Override public void restart() throws Exception { - stop(); super.stop(); - - super.start(); + stop(); start(); + super.start(); } } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java index ec72cb478..46c1ca0a2 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcServer.java @@ -117,14 +117,16 @@ private AgentContextConfiguration computeAgentContextConfiguration() { return agentContextConfiguration; } - public void close() throws Exception { + public void close(boolean ignoreErrors) throws Exception { if (pythonProcess != null) { pythonProcess.destroy(); int exitCode = pythonProcess.waitFor(); log.info("Python process exited with code {}", exitCode); - if (exitCode != 0) { - throw new RuntimeException("Python code exited with code " + exitCode); + if (!ignoreErrors) { + if (exitCode != 0) { + throw new RuntimeException("Python code exited with code " + exitCode); + } } } } diff --git a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java index 75fb2434f..3be317e90 100644 --- a/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java +++ b/langstream-api/src/main/java/ai/langstream/api/runner/code/AgentCode.java @@ -16,12 +16,18 @@ package ai.langstream.api.runner.code; import ai.langstream.api.runtime.ComponentType; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; import java.util.Map; /** Body of the agent */ public interface AgentCode extends AutoCloseable { + static final Logger log = LoggerFactory.getLogger(AgentCode.class); + String agentId(); /** @@ -65,7 +71,6 @@ default void close() throws Exception {} * @throws Exception */ default void restart() throws Exception { - throw new UnsupportedOperationException( - "Restart is not supported for agent type " + agentType()); + log.info("Restart is not supported for agent type {}", agentType()); } } diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java index 0dacf6304..8440f6de4 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java @@ -20,37 +20,41 @@ import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; import static java.nio.file.StandardWatchEventKinds.OVERFLOW; -import java.nio.file.FileSystems; import java.nio.file.Path; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; @Slf4j public class ApplicationWatcher { - public static void watchApplication(Path codeDirectory) throws Exception { - try (WatchService watcher = FileSystems.getDefault().newWatchService(); ) { + public static void watchApplication( + Path codeDirectory, Consumer changedFiles, WatchService watcher) + throws Exception { - Thread watchThread = - new Thread( - () -> { - try { - watchFiles(watcher, codeDirectory); - } catch (Exception e) { - log.error("Error while watching files", e); - } - }); + Thread watchThread = + new Thread( + () -> { + try { + System.out.println("Watching files in " + codeDirectory); + watchFiles(watcher, codeDirectory, changedFiles); + } catch (Throwable e) { + e.printStackTrace(); + log.error("Error while watching files", e); + } + }); - watchThread.start(); - } + watchThread.start(); } - private static void watchFiles(WatchService watcher, Path dir) throws Exception { + private static void watchFiles(WatchService watcher, Path dir, Consumer changedFiles) + throws Exception { WatchKey register = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); log.info("Watching files in {}, key {}", dir, register); + System.out.println("Watching files in " + dir + ", key " + register); for (; ; ) { // wait for key to be signaled @@ -58,6 +62,7 @@ private static void watchFiles(WatchService watcher, Path dir) throws Exception try { key = watcher.take(); log.info("Received key {}", key); + System.out.println("Received key " + key); } catch (InterruptedException x) { return; } @@ -65,6 +70,7 @@ private static void watchFiles(WatchService watcher, Path dir) throws Exception for (WatchEvent event : key.pollEvents()) { WatchEvent.Kind kind = event.kind(); log.info("Event kind {}", kind); + System.out.println("Event kind " + kind); // This key is registered only // for ENTRY_CREATE events, @@ -88,6 +94,7 @@ private static void watchFiles(WatchService watcher, Path dir) throws Exception // the resolved name is "test/foo". Path child = dir.resolve(filename); log.info("File {} changed", child); + changedFiles.accept(filename.toAbsolutePath().toString()); } catch (Exception x) { log.error("Error while watching files", x); continue; diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java index 77b542a0b..cdd717614 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java @@ -31,8 +31,10 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.WatchService; import java.nio.file.attribute.PosixFilePermissions; import java.time.Duration; import java.util.ArrayList; @@ -384,29 +386,49 @@ private void executeOnDocker( System.out.println(String.join(" ", commandLine)); } - final Path outputLog = Files.createTempFile("langstream", ".log"); - log("Logging to file: " + outputLog.toAbsolutePath()); - ProcessBuilder processBuilder = - new ProcessBuilder(commandLine) - .redirectErrorStream(true) - .redirectOutput(outputLog.toFile()); - Process process = processBuilder.start(); - dockerProcess.set(process.toHandle()); - CompletableFuture.runAsync( - () -> tailLogSysOut(outputLog), Executors.newSingleThreadExecutor()); - - if (startUI) { - Executors.newSingleThreadExecutor() - .execute(() -> startUI(tenant, applicationId, outputLog, process)); - } - final int exited = process.waitFor(); - // wait for the log to be printed - Thread.sleep(1000); - if (exited != 0) { - throw new RuntimeException("Process exited with code " + exited); + try (WatchService watcher = FileSystems.getDefault().newWatchService(); ) { + startWatchService(appTmp.toPath(), watcher); + + final Path outputLog = Files.createTempFile("langstream", ".log"); + log("Logging to file: " + outputLog.toAbsolutePath()); + ProcessBuilder processBuilder = + new ProcessBuilder(commandLine) + .redirectErrorStream(true) + .redirectOutput(outputLog.toFile()); + Process process = processBuilder.start(); + dockerProcess.set(process.toHandle()); + CompletableFuture.runAsync( + () -> tailLogSysOut(outputLog), Executors.newSingleThreadExecutor()); + + if (startUI) { + Executors.newSingleThreadExecutor() + .execute(() -> startUI(tenant, applicationId, outputLog, process)); + } + + final int exited = process.waitFor(); + // wait for the log to be printed + Thread.sleep(1000); + if (exited != 0) { + throw new RuntimeException("Process exited with code " + exited); + } } } + private void startWatchService(Path applicationDirectory, WatchService watcher) + throws Exception { + + ApplicationWatcher.watchApplication( + applicationDirectory, + file -> { + if (file.endsWith(".py")) { + log("A python file has changed, restarting the application"); + } else { + log("A file has changed"); + } + }, + watcher); + } + private File prepareSecretsFile(String secretsContents) throws IOException { File tmpSecretsFile = null; if (secretsContents != null) { diff --git a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java index a5d894028..a08857160 100644 --- a/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java +++ b/langstream-runtime/langstream-runtime-tester/src/main/java/ai/langstream/runtime/tester/Main.java @@ -235,7 +235,7 @@ private static Server bootstrapHttpServer(LocalApplicationRunner runner) throws context.setContextPath("/"); server.setHandler(context); context.addServlet(new ServletHolder(new MetricsHttpServlet()), "/metrics"); - context.addServlet(new ServletHolder(new AgentControllerServlet(runner)), "/info"); + context.addServlet(new ServletHolder(new AgentControllerServlet(runner)), "/commands/*"); server.start(); return server; } @@ -252,6 +252,7 @@ public AgentControllerServlet(LocalApplicationRunner runner) { @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { resp.setContentType("application/json"); + log.info("Received request {}", req.getRequestURI()); String uri = req.getRequestURI(); if (uri.endsWith("/restart")) { try { From 5d50962fba3bb4104f58ed979cc637ad23ae7f0f Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 26 Oct 2023 14:35:55 +0200 Subject: [PATCH 3/7] Fix build --- .../java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java | 2 +- .../java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java index f05d1eeca..edec6cf5c 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java @@ -39,7 +39,7 @@ public void start() throws Exception { @Override public synchronized void close() throws Exception { - if (server != null) server.close(); + if (server != null) server.close(false); super.close(); } } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java index 26d871a20..8789ff075 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java @@ -39,7 +39,7 @@ public void start() throws Exception { @Override public synchronized void close() throws Exception { - if (server != null) server.close(); + if (server != null) server.close(false); super.close(); } } From 7b84a1de52b61bebfa9d95b4258d8d1f4678b8a9 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 26 Oct 2023 14:54:27 +0200 Subject: [PATCH 4/7] More work --- .../agents/grpc/AbstractGrpcAgent.java | 16 +++++---- .../agents/grpc/GrpcAgentProcessor.java | 18 +++++++--- .../docker/LocalRunApplicationCmd.java | 34 +++++++++++++++++-- 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java index 34835d879..f31fdbbb5 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/AbstractGrpcAgent.java @@ -102,21 +102,25 @@ protected Map buildAdditionalInfo() { } protected synchronized void stop() throws Exception { - stopChannel(true); + stopChannel(false); } - public synchronized void stopChannel(boolean wait) throws Exception { - if (channel != null) { - ManagedChannel shutdown = channel.shutdown(); + public void stopChannel(boolean wait) throws Exception { + ManagedChannel currentChannel; + synchronized(this) { + currentChannel = channel; + } + if (currentChannel != null) { + ManagedChannel shutdown = currentChannel.shutdown(); if (wait) { - shutdown.awaitTermination(1, TimeUnit.MINUTES); + shutdown.awaitTermination(10, TimeUnit.SECONDS); } } } @Override public synchronized void close() throws Exception { - stopChannel(false); + stopChannel(true); } protected Object fromGrpc(Value value) throws IOException { diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java index 295c1a72c..62616c94e 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/GrpcAgentProcessor.java @@ -84,7 +84,15 @@ public synchronized void process( } } if (requestBuilder.getRecordsCount() > 0) { - request.onNext(requestBuilder.build()); + try { + request.onNext(requestBuilder.build()); + } catch (IllegalStateException stopped) { + if (restarting.get()) { + log.info("Ignoring error during restart {}", stopped + ""); + } else { + throw stopped; + } + } } } @@ -173,11 +181,13 @@ public void onCompleted() { }; } - protected synchronized void stop() throws Exception { + protected void stop() throws Exception { log.info("Restarting..."); restarting.set(true); - if (request != null) { - request.onCompleted(); + synchronized(this) { + if (request != null) { + request.onCompleted(); + } } super.stop(); } diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java index cdd717614..2a9390bfa 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java @@ -30,6 +30,8 @@ import ai.langstream.cli.util.LocalFileReferenceResolver; import java.io.File; import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; import java.nio.file.Files; @@ -98,6 +100,11 @@ public class LocalRunApplicationCmd extends BaseDockerCmd { description = "Start the UI") private boolean startUI = true; + @CommandLine.Option( + names = {"--watch-files"}, + description = "Start the UI") + private boolean watchFiles = true; + @CommandLine.Option( names = {"-up", "--ui-port"}, description = "Port for the local webserver and UI. If 0, a random port will be used.", @@ -387,7 +394,9 @@ private void executeOnDocker( } try (WatchService watcher = FileSystems.getDefault().newWatchService(); ) { - startWatchService(appTmp.toPath(), watcher); + if (watchFiles) { + startWatchService(appTmp.toPath(), watcher); + } final Path outputLog = Files.createTempFile("langstream", ".log"); log("Logging to file: " + outputLog.toAbsolutePath()); @@ -420,15 +429,34 @@ private void startWatchService(Path applicationDirectory, WatchService watcher) ApplicationWatcher.watchApplication( applicationDirectory, file -> { - if (file.endsWith(".py")) { + if (file.endsWith("/python") || file.endsWith(".py")) { log("A python file has changed, restarting the application"); + restartAgents(); } else { - log("A file has changed"); + log("A file has changed: " + file); } }, watcher); } + private static void restartAgents() { + HttpURLConnection urlConnection = null; + try { + URL url = new URL("http://localhost:7890/commands/restart"); + System.out.println("Calling " + url + " to restart the agents"); + urlConnection = (HttpURLConnection) url.openConnection(); + urlConnection.setRequestMethod("POST"); + Object content = urlConnection.getContent(); + System.out.println("Response: " + content); + } catch (Exception e) { + System.err.println("Could not restart the agents: " + e.getMessage()); + } finally { + if (urlConnection != null) { + urlConnection.disconnect(); + } + } + } + private File prepareSecretsFile(String secretsContents) throws IOException { File tmpSecretsFile = null; if (secretsContents != null) { From ed4b9b857c003215432d5b5fb22b29c89ce40160 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 26 Oct 2023 15:03:59 +0200 Subject: [PATCH 5/7] Last fix --- .../cli/commands/docker/LocalRunApplicationCmd.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java index 2a9390bfa..316ff7416 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java @@ -395,7 +395,15 @@ private void executeOnDocker( try (WatchService watcher = FileSystems.getDefault().newWatchService(); ) { if (watchFiles) { - startWatchService(appTmp.toPath(), watcher); + Path python = appTmp.toPath().resolve("python"); + if (Files.isDirectory(python)) { // only watch if python is present + startWatchService(appTmp.toPath(), watcher); + } else { + log( + "Python directory " + + python.toAbsolutePath() + + " not found, not watching files"); + } } final Path outputLog = Files.createTempFile("langstream", ".log"); @@ -442,7 +450,7 @@ private void startWatchService(Path applicationDirectory, WatchService watcher) private static void restartAgents() { HttpURLConnection urlConnection = null; try { - URL url = new URL("http://localhost:7890/commands/restart"); + URL url = new URL("http://localhost:8790/commands/restart"); System.out.println("Calling " + url + " to restart the agents"); urlConnection = (HttpURLConnection) url.openConnection(); urlConnection.setRequestMethod("POST"); From aca9f223f411fa2e06241800c2e506680f1206dc Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 26 Oct 2023 15:41:14 +0200 Subject: [PATCH 6/7] Fix tests --- .../langstream/agents/grpc/PythonGrpcAgentSink.java | 13 +++++++++++++ .../agents/grpc/PythonGrpcAgentSource.java | 13 +++++++++++++ .../commands/docker/LocalRunApplicationCmdTest.java | 5 +++++ 3 files changed, 31 insertions(+) diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java index edec6cf5c..0662290b8 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSink.java @@ -42,4 +42,17 @@ public synchronized void close() throws Exception { if (server != null) server.close(false); super.close(); } + + @Override + protected synchronized void stop() throws Exception { + if (server != null) server.close(true); + } + + @Override + public void restart() throws Exception { + super.stop(); + stop(); + start(); + super.start(); + } } diff --git a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java index 8789ff075..9e8d671af 100644 --- a/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java +++ b/langstream-agents/langstream-agent-grpc/src/main/java/ai/langstream/agents/grpc/PythonGrpcAgentSource.java @@ -42,4 +42,17 @@ public synchronized void close() throws Exception { if (server != null) server.close(false); super.close(); } + + @Override + protected synchronized void stop() throws Exception { + if (server != null) server.close(true); + } + + @Override + public void restart() throws Exception { + super.stop(); + stop(); + start(); + super.start(); + } } diff --git a/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java b/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java index ff9a16651..ba66f8653 100644 --- a/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java +++ b/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java @@ -29,9 +29,12 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SystemUtils; import org.junit.jupiter.api.Test; +@Slf4j class LocalRunApplicationCmdTest extends CommandTestBase { @Test @@ -60,6 +63,7 @@ void testArgs() throws Exception { final List lines = result.out().lines().collect(Collectors.toList()); final String lastLine = lines.get(lines.size() - 1); + log.info("Last line: {}", lastLine); assertTrue( lastLine.contains( "run --rm -i -e START_BROKER=true -e START_MINIO=true -e START_HERDDB=true " @@ -72,6 +76,7 @@ void testArgs() throws Exception { + "--add-host my-cluster-kafka-bootstrap.kafka:127.0.0.1 " + "-p 8091:8091 " + "-p 8090:8090 " + + "-p 8790:8790 " + "ghcr.io/langstream/langstream-runtime-tester:unknown")); final List volumes = extractVolumes(lastLine); From 4a81c59f75e8fe8ac5d608847462ad927d4ed4c0 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 26 Oct 2023 15:44:22 +0200 Subject: [PATCH 7/7] Remove debug --- .../cli/commands/docker/ApplicationWatcher.java | 8 -------- .../cli/commands/docker/LocalRunApplicationCmd.java | 10 ++++------ .../commands/docker/LocalRunApplicationCmdTest.java | 1 - 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java index 8440f6de4..f3d8718de 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/ApplicationWatcher.java @@ -38,11 +38,9 @@ public static void watchApplication( new Thread( () -> { try { - System.out.println("Watching files in " + codeDirectory); watchFiles(watcher, codeDirectory, changedFiles); } catch (Throwable e) { e.printStackTrace(); - log.error("Error while watching files", e); } }); @@ -54,23 +52,18 @@ private static void watchFiles(WatchService watcher, Path dir, Consumer WatchKey register = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); log.info("Watching files in {}, key {}", dir, register); - System.out.println("Watching files in " + dir + ", key " + register); for (; ; ) { // wait for key to be signaled WatchKey key; try { key = watcher.take(); - log.info("Received key {}", key); - System.out.println("Received key " + key); } catch (InterruptedException x) { return; } for (WatchEvent event : key.pollEvents()) { WatchEvent.Kind kind = event.kind(); - log.info("Event kind {}", kind); - System.out.println("Event kind " + kind); // This key is registered only // for ENTRY_CREATE events, @@ -93,7 +86,6 @@ private static void watchFiles(WatchService watcher, Path dir, Consumer // If the filename is "test" and the directory is "foo", // the resolved name is "test/foo". Path child = dir.resolve(filename); - log.info("File {} changed", child); changedFiles.accept(filename.toAbsolutePath().toString()); } catch (Exception x) { log.error("Error while watching files", x); diff --git a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java index 316ff7416..8f67f4938 100644 --- a/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java +++ b/langstream-cli/src/main/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmd.java @@ -102,7 +102,7 @@ public class LocalRunApplicationCmd extends BaseDockerCmd { @CommandLine.Option( names = {"--watch-files"}, - description = "Start the UI") + description = "Watch files and apply the changes automatically (only Python files)") private boolean watchFiles = true; @CommandLine.Option( @@ -447,17 +447,15 @@ private void startWatchService(Path applicationDirectory, WatchService watcher) watcher); } - private static void restartAgents() { + private void restartAgents() { HttpURLConnection urlConnection = null; try { URL url = new URL("http://localhost:8790/commands/restart"); - System.out.println("Calling " + url + " to restart the agents"); urlConnection = (HttpURLConnection) url.openConnection(); urlConnection.setRequestMethod("POST"); - Object content = urlConnection.getContent(); - System.out.println("Response: " + content); + urlConnection.getContent(); } catch (Exception e) { - System.err.println("Could not restart the agents: " + e.getMessage()); + log("Could not reload the agents: " + e); } finally { if (urlConnection != null) { urlConnection.disconnect(); diff --git a/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java b/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java index ba66f8653..011aa30cb 100644 --- a/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java +++ b/langstream-cli/src/test/java/ai/langstream/cli/commands/docker/LocalRunApplicationCmdTest.java @@ -29,7 +29,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SystemUtils; import org.junit.jupiter.api.Test;