Skip to content

Commit

Permalink
[python] Add a CLI command to import pip requirements.txt (#652)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Oct 26, 2023
1 parent 3223c02 commit c6af8f7
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 39 deletions.
28 changes: 6 additions & 22 deletions examples/applications/python-processor-exclamation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The code in `example.py` adds an exclamation mark to the end of a string message
## Deploy the LangStream application

```
./bin/langstream apps deploy test -app examples/applications/python-processor-exclamation -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml
./bin/langstream docker run test -app examples/applications/python-processor-exclamation
```

## Talk with the Chat bot using the CLI
Expand All @@ -16,29 +16,13 @@ Since the application opens a gateway, we can use the gateway API to send and co
./bin/langstream gateway chat test -cg consume-output -pg produce-input -p sessionId=$(uuidgen)
```

## Start a Producer
```
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic
```
# How to import the dependencies

Insert a String:
If you want to try to import the requirements.txt file you can use this command

```
> Hello World
```


## Start a Consumer

Start a Kafka Consumer on a terminal

```
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic output-topic --from-beginning
```

You should see the message with an exclamation mark at the end:

./bin/langstream python load-pip-requirements -app examples/applications/python-processor-exclamation
```
Hello World!
```

This step is not needed to run the sample application, but you can use use this sample application
to get started with your own Python processor code.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
uvicorn==0.12.2
fastapi==0.63.0
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
RootGatewayCmd.class,
RootProfileCmd.class,
RootDockerCmd.class,
RootPythonCmd.class,
AutoComplete.GenerateCompletion.class
})
public class RootCmd {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.cli.commands;

import ai.langstream.cli.commands.python.LoadPythonDependenciesCmd;
import lombok.Getter;
import picocli.CommandLine;

@CommandLine.Command(
name = "python",
header = "Tools for Python developers",
subcommands = {LoadPythonDependenciesCmd.class})
@Getter
public class RootPythonCmd {
@CommandLine.ParentCommand private RootCmd rootCmd;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import ai.langstream.cli.commands.VersionProvider;
import ai.langstream.cli.commands.applications.MermaidAppDiagramGenerator;
import ai.langstream.cli.commands.applications.UIAppCmd;
import ai.langstream.cli.util.DockerImageUtils;
import ai.langstream.cli.util.LocalFileReferenceResolver;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -156,20 +157,9 @@ public class LocalRunApplicationCmd extends BaseDockerCmd {
@SneakyThrows
public void run() {

if (dockerImageVersion != null && dockerImageVersion.endsWith("-SNAPSHOT")) {
// built-from-sources, not a release
dockerImageVersion = "latest-dev";
}
DockerImageUtils.DockerImage dockerImage =
DockerImageUtils.computeDockerImage(dockerImageVersion, dockerImageName);

if (dockerImageName == null) {
if (dockerImageVersion != null && dockerImageVersion.equals("latest-dev")) {
// built-from-sources, not a release
dockerImageName = "langstream/langstream-runtime-tester";
} else {
// default to latest
dockerImageName = "ghcr.io/langstream/langstream-runtime-tester";
}
}
startBroker = !dryRun && startBroker;
startDatabase = !dryRun && startDatabase;
startS3 = !dryRun && startS3;
Expand Down Expand Up @@ -201,7 +191,7 @@ public void run() {
log("Start S3: " + startS3);
log("Start Database: " + startDatabase);
log("Start Webservices " + startWebservices);
log("Using docker image: " + dockerImageName + ":" + dockerImageVersion);
log("Using docker image: " + dockerImage.getFullName());

if (appDirectory == null) {
throw new IllegalArgumentException("application files are required");
Expand Down Expand Up @@ -271,7 +261,8 @@ public void run() {
startS3,
startWebservices,
startDatabase,
dryRun);
dryRun,
dockerImage);
}

private void cleanEnvironment() {
Expand Down Expand Up @@ -312,12 +303,13 @@ private void executeOnDocker(
boolean startS3,
boolean startWebservices,
boolean startDatabase,
boolean dryRun)
boolean dryRun,
DockerImageUtils.DockerImage dockerImage)
throws Exception {
final File appTmp = prepareAppDirectory(appDirectory);
File tmpInstanceFile = prepareInstanceFile(instanceContents);
File tmpSecretsFile = prepareSecretsFile(secretsContents);
String imageName = dockerImageName + ":" + dockerImageVersion;
String imageName = dockerImage.getFullName();
List<String> commandLine = new ArrayList<>();
commandLine.add(dockerCommand);
commandLine.add("run");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.cli.commands.python;

import ai.langstream.cli.commands.BaseCmd;
import ai.langstream.cli.commands.RootCmd;
import ai.langstream.cli.commands.RootPythonCmd;
import picocli.CommandLine;

public abstract class BasePythonCmd extends BaseCmd {

@CommandLine.ParentCommand private RootPythonCmd rootPythonCmd;

@Override
protected RootCmd getRootCmd() {
return rootPythonCmd.getRootCmd();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.cli.commands.python;

import ai.langstream.cli.commands.VersionProvider;
import ai.langstream.cli.util.DockerImageUtils;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
import org.apache.commons.io.input.Tailer;
import org.apache.commons.io.input.TailerListener;
import picocli.CommandLine;

@CommandLine.Command(
name = "load-pip-requirements",
header = "Process python dependencies in requirements.txt")
public class LoadPythonDependenciesCmd extends BasePythonCmd {

private static final AtomicReference<ProcessHandle> dockerProcess = new AtomicReference<>();

@CommandLine.Option(
names = {"-app", "--application"},
description = "Application directory path",
required = true)
private String appPath;

@CommandLine.Option(
names = {"--docker-args"},
description = "Additional docker arguments")
private List<String> dockerAdditionalArgs = new ArrayList<>();

@CommandLine.Option(
names = {"--docker-command"},
description = "Command to run docker")
private String dockerCommand = "docker";

@CommandLine.Option(
names = {"--langstream-runtime-version"},
description = "Version of the LangStream runtime to use")
private String dockerImageVersion = VersionProvider.getMavenVersion();

@CommandLine.Option(
names = {"--langstream-runtime-docker-image"},
description = "Docker image of the LangStream runtime to use")
private String dockerImageName;

@Override
@SneakyThrows
public void run() {

DockerImageUtils.DockerImage dockerImage =
DockerImageUtils.computeDockerImage(dockerImageVersion, dockerImageName);

if (appPath == null || appPath.isEmpty()) {
throw new IllegalArgumentException("application files are required");
}

final File appDirectory = new File(appPath);

log("Using docker image: " + dockerImage.getFullName());

downloadDependencies(appDirectory.toPath(), getClient(), this::log);

Runtime.getRuntime().addShutdownHook(new Thread(this::cleanEnvironment));

executeOnDocker(appDirectory, dockerImage);
}

private void cleanEnvironment() {
if (dockerProcess.get() != null) {
dockerProcess.get().destroyForcibly();
}
}

private void executeOnDocker(File appDirectory, DockerImageUtils.DockerImage dockerImage)
throws Exception {
final File appTmp = appDirectory;

File pythonDirectory = new File(appDirectory, "python");
if (!pythonDirectory.isDirectory()) {
throw new IllegalArgumentException(
"Directory " + pythonDirectory.getAbsolutePath() + " not found");
}
File requirementsFile = new File(pythonDirectory, "requirements.txt");
if (!requirementsFile.isFile()) {
throw new IllegalArgumentException(
"File "
+ requirementsFile.getAbsolutePath()
+ " not found in "
+ pythonDirectory);
}

String imageName = dockerImage.getFullName();
List<String> commandLine = new ArrayList<>();
commandLine.add(dockerCommand);

/*
docker run --rm \
-v $(pwd):/app-code-download \
--entrypoint "" \
-w /app-code-download/python ghcr.io/langstream/langstream-runtime:0.1.0 \
/bin/bash -c 'pip3 install --target ./lib --upgrade --prefer-binary -r requirements.txt'
*/

commandLine.add("run");
commandLine.add("--rm");
commandLine.add("--entrypoint");
commandLine.add("/bin/bash");
commandLine.add("-w");
commandLine.add("/code/application/python");

commandLine.add("-v");
commandLine.add(appTmp.getAbsolutePath() + ":/code/application");

if (dockerAdditionalArgs != null) {
commandLine.addAll(dockerAdditionalArgs);
}

commandLine.add(imageName);

if (getRootCmd().isVerbose()) {
System.out.println("Executing:");
System.out.println(String.join(" ", commandLine));
}

commandLine.add("-c");
commandLine.add(
"pip3 install --target ./lib --upgrade --prefer-binary -r requirements.txt");

final Path outputLog = Files.createTempFile("langstream", ".log");
log("Logging to file: " + outputLog.toAbsolutePath());
ProcessBuilder processBuilder =
new ProcessBuilder(commandLine)
.redirectErrorStream(true)
.redirectOutput(outputLog.toFile());
Process process = processBuilder.start();
dockerProcess.set(process.toHandle());
CompletableFuture.runAsync(
() -> tailLogSysOut(outputLog), Executors.newSingleThreadExecutor());

final int exited = process.waitFor();
// wait for the log to be printed
Thread.sleep(1000);
if (exited != 0) {
throw new RuntimeException("Process exited with code " + exited);
}
}

private void tailLogSysOut(Path outputLog) {

TailerListener listener =
new TailerListener() {
@Override
public void fileNotFound() {}

@Override
public void fileRotated() {}

@Override
public void handle(Exception e) {}

@Override
public void handle(String s) {
log(s);
}

@Override
public void init(Tailer tailer) {}
};
try (final Tailer tailer =
Tailer.builder()
.setTailerListener(listener)
.setStartThread(false)
.setDelayDuration(Duration.ofMillis(100))
.setFile(outputLog.toFile())
.get(); ) {
while (true) {
tailer.run();
}
}
}
}
Loading

0 comments on commit c6af8f7

Please sign in to comment.