Skip to content

Commit

Permalink
Add python-kafka integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 22, 2024
1 parent 61b5a98 commit d950df6
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 20 deletions.
18 changes: 18 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ async fn passthrough_nodejs() {
.expect("Shotover did not shutdown within 10s");
}

#[tokio::test]
async fn passthrough_python() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

test_helpers::connection::kafka::python::run_python_smoke_test("127.0.0.1:9192").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
Expand Down
1 change: 1 addition & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
pub mod cpp;
pub mod java;
pub mod node;
pub mod python;

use anyhow::Result;
#[cfg(feature = "kafka-cpp-driver-tests")]
Expand Down
26 changes: 6 additions & 20 deletions test-helpers/src/connection/kafka/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::path::Path;

use crate::run_command_async;

pub async fn run_node_smoke_test(address: &str) {
let dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/node");
let config = format!(
Expand All @@ -8,8 +10,8 @@ pub async fn run_node_smoke_test(address: &str) {
brokers: ["{address}"],
}})"#
);
run_command(&dir, "npm", &["install"]).await;
run_command(&dir, "npm", &["start", &config]).await;
run_command_async(&dir, "npm", &["install"]).await;
run_command_async(&dir, "npm", &["start", &config]).await;
}

pub async fn run_node_smoke_test_scram(address: &str, user: &str, password: &str) {
Expand All @@ -25,22 +27,6 @@ pub async fn run_node_smoke_test_scram(address: &str, user: &str, password: &str
}}
}})"#
);
run_command(&dir, "npm", &["install"]).await;
run_command(&dir, "npm", &["start", &config]).await;
}

async fn run_command(current_dir: &Path, command: &str, args: &[&str]) -> String {
let output = tokio::process::Command::new(command)
.args(args)
.current_dir(current_dir)
.output()
.await
.unwrap();

let stdout = String::from_utf8(output.stdout).unwrap();
let stderr = String::from_utf8(output.stderr).unwrap();
if !output.status.success() {
panic!("command {command} {args:?} failed:\nstdout:\n{stdout}\nstderr:\n{stderr}")
}
stdout
run_command_async(&dir, "npm", &["install"]).await;
run_command_async(&dir, "npm", &["start", &config]).await;
}
64 changes: 64 additions & 0 deletions test-helpers/src/connection/kafka/python.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use tokio::process::Command;

use crate::run_command_async;
use std::{
path::{Path, PathBuf},
time::Duration,
};

pub async fn run_python_smoke_test(address: &str) {
ensure_uv_is_installed().await;

let project_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("src/connection/kafka/python");
let uv_binary = uv_binary_path();
let config = format!(
r#"{{
'bootstrap_servers': ["{address}"],
}}"#
);
tokio::time::timeout(
Duration::from_secs(30),
run_command_async(
&project_dir,
uv_binary.to_str().unwrap(),
&["run", "main.py", &config],
),
)
.await
.unwrap();
}

/// Install a specific version of UV to:
/// * avoid developers having to manually install an external tool
/// * avoid issues due to a different version being installed
pub async fn ensure_uv_is_installed() {
let uv_binary = uv_binary_path();

if let Ok(output) = Command::new(uv_binary).arg("--help").output().await {
if output.status.success() {
// already correctly installed
return;
}
}

// Install to this custom path to avoid overwriting any UV already installed by the user.
// Specifically uses `..` instead of absolute path to avoid spaces messing up the bash script
let path = "../target/uv";

run_command_async(
Path::new("."),
"bash",
&[
"-c",
&format!("curl -LsSf https://astral.sh/uv/0.4.6/install.sh | env INSTALLER_NO_MODIFY_PATH=1 UV_INSTALL_DIR={path} sh"),
],
)
.await
}

fn uv_binary_path() -> PathBuf {
Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.join("target/uv/bin/uv")
}
1 change: 1 addition & 0 deletions test-helpers/src/connection/kafka/python/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12
30 changes: 30 additions & 0 deletions test-helpers/src/connection/kafka/python/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from kafka import KafkaConsumer
from kafka import KafkaProducer
import sys

def main():
config = eval(sys.argv[1])
print("Running kafka-python script with config:")
print(config)

producer = KafkaProducer(**config)
producer.send('test_topic', b'some_message_bytes').get(timeout=10)
producer.send('test_topic', b'another_message').get(timeout=10)

consumer = KafkaConsumer('test_topic', auto_offset_reset='earliest', **config)

msg = next(consumer)
assert(msg.topic == "test_topic")
assert(msg.value == b"some_message_bytes")
assert(msg.offset == 0)

msg = next(consumer)
assert(msg.topic == "test_topic")
assert(msg.value == b"another_message")
assert(msg.offset == 1)

print("kafka-python script passed all test cases")


if __name__ == "__main__":
main()
9 changes: 9 additions & 0 deletions test-helpers/src/connection/kafka/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[project]
name = "python"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"kafka-python-ng>=2.2.3",
]
22 changes: 22 additions & 0 deletions test-helpers/src/connection/kafka/python/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod shotover_process;
mod test_tracing;

use anyhow::{anyhow, Result};
use std::path::Path;
use subprocess::{Exec, Redirection};

/// Runs a command and returns the output as a string.
Expand Down Expand Up @@ -36,3 +37,16 @@ pub fn run_command(command: &str, args: &[&str]) -> Result<String> {
))
}
}

pub async fn run_command_async(current_dir: &Path, command: &str, args: &[&str]) {
let output = tokio::process::Command::new(command)
.args(args)
.current_dir(current_dir)
.status()
.await
.unwrap();

if !output.success() {
panic!("command {command} {args:?} failed. See above output.")
}
}

0 comments on commit d950df6

Please sign in to comment.