From cf221e3bfd8cfb852507a840a569ba05b6afc0a4 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 13 Apr 2024 02:18:14 +0800 Subject: [PATCH 01/26] style: improve source test and risedev-env style Signed-off-by: xxchan --- Makefile.toml | 38 +++++----- ci/scripts/e2e-source-test.sh | 51 ++++++------- .../scripts/e2e-full-standalone-demo.sh | 4 +- src/risedevtool/src/risedev_env.rs | 71 +++++++++---------- 4 files changed, 77 insertions(+), 87 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index 37d11266f6d6..295c506081d7 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -558,8 +558,8 @@ env_files = ["${PREFIX_CONFIG}/risedev-env"] script = ''' #!/usr/bin/env bash cat < "${PREFIX_CONFIG}/psql-env" -export PGHOST=$RW_FRONTEND_LISTEN_ADDRESS -export PGPORT=$RW_FRONTEND_PORT +export PGHOST=$RISEDEV_RW_FRONTEND_LISTEN_ADDRESS +export PGPORT=$RISEDEV_RW_FRONTEND_PORT export PGUSER=root export PGDATABASE=dev EOF @@ -576,7 +576,7 @@ dependencies = ["check-risedev-env-file"] env_files = ["${PREFIX_CONFIG}/risedev-env"] script = ''' #!/usr/bin/env bash -psql -h $RW_FRONTEND_LISTEN_ADDRESS -p $RW_FRONTEND_PORT -U root -d dev "$@" +psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@" ''' [tasks.ctl] @@ -883,7 +883,7 @@ cargo build \ """ [tasks.stest] -category = "RiseDev - Deterministic Simulation" +category = "RiseDev - Test - Deterministic Simulation" description = "Run unit tests in deterministic simulation mode" dependencies = ["install-nextest"] env = { CARGO_TARGET_DIR = "target/sim" } @@ -908,7 +908,7 @@ cargo nextest run \ """ [tasks.sit-test] -category = "RiseDev - Deterministic Simulation" +category = "RiseDev - Test - Deterministic Simulation" description = "Run integration tests in deterministic simulation mode" dependencies = ["install-nextest"] env = { CARGO_TARGET_DIR = "target/sim" } @@ -923,7 +923,7 @@ cargo nextest run \ """ [tasks.sarchive-it-test] -category = "RiseDev - Deterministic Simulation" +category = "RiseDev - Test - Deterministic Simulation" description = "Archive integration tests in deterministic simulation mode" dependencies = ["install-nextest"] env = { CARGO_TARGET_DIR = "target/sim" } @@ -939,7 +939,7 @@ cargo nextest archive \ """ [tasks.scheck] -category = "RiseDev - Deterministic Simulation" +category = "RiseDev - Test - Deterministic Simulation" description = "Run cargo check in deterministic simulation mode" dependencies = ["warn-on-missing-tools"] env = { CARGO_TARGET_DIR = "target/sim" } @@ -964,7 +964,7 @@ cargo check \ """ [tasks.sslt] -category = "RiseDev - Deterministic Simulation" +category = "RiseDev - Test - Deterministic Simulation" description = "Run e2e tests in deterministic simulation mode" dependencies = ["warn-on-missing-tools"] env = { CARGO_TARGET_DIR = "target/sim" } @@ -978,7 +978,7 @@ cargo run \ """ [tasks.sslt-build-all] -category = "RiseDev - Deterministic Simulation" +category = "RiseDev - Test - Deterministic Simulation" description = "Build deterministic simulation runner and tests" dependencies = ["warn-on-missing-tools"] env = { CARGO_TARGET_DIR = "target/sim" } @@ -993,7 +993,7 @@ cargo build \ """ [tasks.sslt-cov] -category = "RiseDev - Deterministic Simulation" +category = "RiseDev - Test - Deterministic Simulation" description = "Run e2e tests in deterministic simulation mode and report code coverage" dependencies = ["install-llvm-cov"] env = { CARGO_TARGET_DIR = "target/sim-cov" } @@ -1290,42 +1290,42 @@ echo "All processes has exited." """ [tasks.slt] -category = "RiseDev - SQLLogicTest" -install_crate = { version = "0.19.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev", RISEDEV_HIDE_CARGO_MAKE_LOG = "true" } +category = "RiseDev - Test - SQLLogicTest" +install_crate = { version = "0.20.0", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-risedev-env-file"] env_files = ["${PREFIX_CONFIG}/risedev-env"] -env = { SLT_HOST = "${RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RW_FRONTEND_PORT}", SLT_DB = "dev" } command = "sqllogictest" args = ["${@}"] description = "🌟 Run SQLLogicTest" [tasks.slt-streaming] -category = "RiseDev - SQLLogicTest" +category = "RiseDev - Test - SQLLogicTest" extend = "slt" args = ["${@}", "./e2e_test/streaming/**/*.slt"] description = "Run all streaming e2e tests" [tasks.slt-batch] -category = "RiseDev - SQLLogicTest" +category = "RiseDev - Test - SQLLogicTest" extend = "slt" args = ["${@}", "./e2e_test/batch/*.slt"] description = "Run all batch e2e tests" [tasks.slt-generated] -category = "RiseDev - SQLLogicTest" +category = "RiseDev - Test - SQLLogicTest" extend = "slt" args = ["${@}", "./e2e_test/generated/**/*.slt"] description = "Run all generated e2e tests" [tasks.slt-all] -category = "RiseDev - SQLLogicTest" +category = "RiseDev - Test - SQLLogicTest" run_task = { name = ["slt-streaming", "slt-batch", "slt-generated"] } description = "Run all e2e tests" [tasks.docslt] -category = "RiseDev - SQLLogicTest" +category = "RiseDev - Test - SQLLogicTest" description = "Extract SQL examples written in SQLLogicTest syntax from Rust doc comments" script = ''' #!/usr/bin/env bash @@ -1400,7 +1400,7 @@ UPDATE_EXPECT=1 cargo test -p risingwave_connector tests::test_with_options_yaml ''' [tasks.backwards-compat-test] -category = "RiseDev - Backwards Compatibility Test" +category = "RiseDev - Test - Backwards Compatibility Test" description = "Run backwards compatibility test" script = "./backwards-compat-tests/scripts/run_local.sh" diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index f3c99c404f6e..8e1ea0fa2b4b 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -5,15 +5,6 @@ set -euo pipefail source ci/scripts/common.sh -# Arguments: -# $1: subject name -# $2: schema file path -function register_schema_registry() { - curl -X POST http://message_queue:8081/subjects/"$1"/versions \ - -H ‘Content-Type: application/vnd.schemaregistry.v1+json’ \ - --data-binary @<(jq -n --arg schema “$(cat "$2")” ‘{schemaType: “PROTOBUF”, schema: "$schema"}’) -} - # prepare environment export CONNECTOR_LIBS_PATH="./connector-node/libs" @@ -73,26 +64,26 @@ echo 'db.runCommand({ping: 1})' | mongo mongodb://mongodb:27017 echo '> rs config' echo 'rs.conf()' | mongo mongodb://mongodb:27017 echo '> run test..' -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/mongodb/**/*.slt' +risedev slt './e2e_test/source/cdc/mongodb/**/*.slt' echo "--- inline cdc test" export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 -sqllogictest -p 4566 -d dev './e2e_test/source/cdc_inline/**/*.slt' +risedev slt './e2e_test/source/cdc_inline/**/*.slt' echo "--- opendal source test" -sqllogictest -p 4566 -d dev './e2e_test/source/opendal/**/*.slt' +risedev slt './e2e_test/source/opendal/**/*.slt' echo "--- mysql & postgres cdc validate test" -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt' -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt' +risedev slt './e2e_test/source/cdc/cdc.validate.mysql.slt' +risedev slt './e2e_test/source/cdc/cdc.validate.postgres.slt' echo "--- cdc share source test" # cdc share stream test cases export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456 -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt' +risedev slt './e2e_test/source/cdc/cdc.share_stream.slt' # create a share source and check whether heartbeat message is received -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.create_source_job.slt' +risedev slt './e2e_test/source/cdc/cdc.create_source_job.slt' table_id=$(psql -U root -h localhost -p 4566 -d dev -t -c "select id from rw_internal_tables where name like '%mysql_source%';" | xargs); table_count=$(psql -U root -h localhost -p 4566 -d dev -t -c "select count(*) from rw_table(${table_id}, public);" | xargs); if [ "$table_count" -eq 0 ]; then @@ -101,10 +92,10 @@ if [ "$table_count" -eq 0 ]; then fi echo "--- mysql & postgres load and check" -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt' +risedev slt './e2e_test/source/cdc/cdc.load.slt' # wait for cdc loading sleep 10 -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check.slt' +risedev slt './e2e_test/source/cdc/cdc.check.slt' # kill cluster risedev kill @@ -135,10 +126,10 @@ echo "> wait for cluster recovery finish" sleep 20 echo "> check mviews after cluster recovery" # check results -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.check_new_rows.slt' +risedev slt './e2e_test/source/cdc/cdc.check_new_rows.slt' # drop relations -sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc_share_stream_drop.slt' +risedev slt './e2e_test/source/cdc/cdc_share_stream_drop.slt' echo "--- Kill cluster" risedev ci-kill @@ -153,30 +144,30 @@ echo "make sure google/protobuf/source_context.proto is NOT in schema registry" curl --silent 'http://message_queue:8081/subjects'; echo # curl --silent --head -X GET 'http://message_queue:8081/subjects/google%2Fprotobuf%2Fsource_context.proto/versions' | grep 404 curl --silent 'http://message_queue:8081/subjects' | grep -v 'google/protobuf/source_context.proto' -sqllogictest -p 4566 -d dev './e2e_test/schema_registry/pb.slt' -sqllogictest -p 4566 -d dev './e2e_test/schema_registry/alter_sr.slt' +risedev slt './e2e_test/schema_registry/pb.slt' +risedev slt './e2e_test/schema_registry/alter_sr.slt' echo "--- Kill cluster" risedev ci-kill echo "--- e2e, ci-kafka-plus-pubsub, kafka and pubsub source" RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ -risedev ci-start ci-pubsub +risedev ci-start ci-pubsub-kafka ./scripts/source/prepare_ci_kafka.sh cargo run --bin prepare_ci_pubsub -sqllogictest -p 4566 -d dev './e2e_test/source/basic/*.slt' -sqllogictest -p 4566 -d dev './e2e_test/source/basic/old_row_format_syntax/*.slt' -sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka.slt' +risedev slt './e2e_test/source/basic/*.slt' +risedev slt './e2e_test/source/basic/old_row_format_syntax/*.slt' +risedev slt './e2e_test/source/basic/alter/kafka.slt' echo "--- e2e, kafka alter source" chmod +x ./scripts/source/prepare_data_after_alter.sh ./scripts/source/prepare_data_after_alter.sh 2 -sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data.slt' +risedev slt './e2e_test/source/basic/alter/kafka_after_new_data.slt' echo "--- e2e, kafka alter source again" ./scripts/source/prepare_data_after_alter.sh 3 -sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' +risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' echo "--- Run CH-benCHmark" -risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/batch/ch_benchmark.slt' -risedev slt -p 4566 -d dev './e2e_test/ch_benchmark/streaming/*.slt' +risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' +risedev slt './e2e_test/ch_benchmark/streaming/*.slt' diff --git a/src/cmd_all/scripts/e2e-full-standalone-demo.sh b/src/cmd_all/scripts/e2e-full-standalone-demo.sh index 6c8c01740731..28469aaddbe7 100755 --- a/src/cmd_all/scripts/e2e-full-standalone-demo.sh +++ b/src/cmd_all/scripts/e2e-full-standalone-demo.sh @@ -67,8 +67,8 @@ sleep 15 # FIXME: Integrate standalone into risedev, so we can reuse risedev-env functionality here. cat << EOF > "$RW_PREFIX"/config/risedev-env RW_META_ADDR="http://0.0.0.0:5690" -RW_FRONTEND_LISTEN_ADDRESS="0.0.0.0" -RW_FRONTEND_PORT="4566" +RISEDEV_RW_FRONTEND_LISTEN_ADDRESS="0.0.0.0" +RISEDEV_RW_FRONTEND_PORT="4566" EOF echo "--- Setting up table" diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index f4866353dff7..bae57b275b5d 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -25,50 +25,49 @@ use crate::{add_hummock_backend, HummockInMemoryStrategy, ServiceConfig}; pub fn generate_risedev_env(services: &Vec) -> String { let mut env = String::new(); for item in services { - if let ServiceConfig::ComputeNode(c) = item { - // RW_HUMMOCK_URL - // If the cluster is launched without a shared storage, we will skip this. - { - let mut cmd = Command::new("compute-node"); - if add_hummock_backend( - "dummy", - c.provide_opendal.as_ref().unwrap(), - c.provide_minio.as_ref().unwrap(), - c.provide_aws_s3.as_ref().unwrap(), - HummockInMemoryStrategy::Disallowed, - &mut cmd, - ) - .is_ok() + match item { + ServiceConfig::ComputeNode(c) => { + // RW_HUMMOCK_URL + // If the cluster is launched without a shared storage, we will skip this. { + let mut cmd = Command::new("compute-node"); + if add_hummock_backend( + "dummy", + c.provide_opendal.as_ref().unwrap(), + c.provide_minio.as_ref().unwrap(), + c.provide_aws_s3.as_ref().unwrap(), + HummockInMemoryStrategy::Disallowed, + &mut cmd, + ) + .is_ok() + { + writeln!( + env, + "RW_HUMMOCK_URL=\"{}\"", + cmd.get_args().nth(1).unwrap().to_str().unwrap() + ) + .unwrap(); + } + } + + // RW_META_ADDR + { + let meta_node = &c.provide_meta_node.as_ref().unwrap()[0]; writeln!( env, - "RW_HUMMOCK_URL=\"{}\"", - cmd.get_args().nth(1).unwrap().to_str().unwrap() + "RW_META_ADDR=\"http://{}:{}\"", + meta_node.address, meta_node.port ) .unwrap(); } } - - // RW_META_ADDR - { - let meta_node = &c.provide_meta_node.as_ref().unwrap()[0]; - writeln!( - env, - "RW_META_ADDR=\"http://{}:{}\"", - meta_node.address, meta_node.port - ) - .unwrap(); + ServiceConfig::Frontend(c) => { + let listen_address = &c.listen_address; + writeln!(env, "RISEDEV_RW_FRONTEND_LISTEN_ADDRESS=\"{listen_address}\"",).unwrap(); + let port = &c.port; + writeln!(env, "RISEDEV_RW_FRONTEND_PORT=\"{port}\"",).unwrap(); } - break; - } - } - for item in services { - if let ServiceConfig::Frontend(c) = item { - let listen_address = &c.listen_address; - writeln!(env, "RW_FRONTEND_LISTEN_ADDRESS=\"{listen_address}\"",).unwrap(); - let port = &c.port; - writeln!(env, "RW_FRONTEND_PORT=\"{port}\"",).unwrap(); - break; + _ => {} } } env From c98dfd5bda1fbda8618bcf24c1429a599f9e9449 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 13 Apr 2024 02:22:05 +0800 Subject: [PATCH 02/26] fix Signed-off-by: xxchan --- Makefile.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile.toml b/Makefile.toml index 295c506081d7..604b7b2b4e44 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1290,7 +1290,7 @@ echo "All processes has exited." """ [tasks.slt] -env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev", RISEDEV_HIDE_CARGO_MAKE_LOG = "true" } +env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev" } category = "RiseDev - Test - SQLLogicTest" install_crate = { version = "0.20.0", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", From e11de323b9f17b78d2c53611c03438ce4316f8dd Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 13 Apr 2024 03:22:00 +0800 Subject: [PATCH 03/26] fmt Signed-off-by: xxchan --- src/risedevtool/src/risedev_env.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index bae57b275b5d..077f1ce51f82 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -63,7 +63,11 @@ pub fn generate_risedev_env(services: &Vec) -> String { } ServiceConfig::Frontend(c) => { let listen_address = &c.listen_address; - writeln!(env, "RISEDEV_RW_FRONTEND_LISTEN_ADDRESS=\"{listen_address}\"",).unwrap(); + writeln!( + env, + "RISEDEV_RW_FRONTEND_LISTEN_ADDRESS=\"{listen_address}\"", + ) + .unwrap(); let port = &c.port; writeln!(env, "RISEDEV_RW_FRONTEND_PORT=\"{port}\"",).unwrap(); } From 143d5692150fba293afb70845ba6ea7967bb35b3 Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 13 Apr 2024 03:28:13 +0800 Subject: [PATCH 04/26] Squashed commit of the following: commit b3c8e4e4f2ae1607697ec42828c8ac3f1d3e7fb8 Author: xxchan Date: Sat Apr 13 02:39:13 2024 +0800 fix Signed-off-by: xxchan commit a575c3a3e1b2e3ef76008b768d5e03f842942d9c Author: xxchan Date: Sat Apr 13 02:38:15 2024 +0800 fix Signed-off-by: xxchan commit df9f9053f1a21baf893ba05652da476af5b4cd3e Author: xxchan Date: Sat Apr 13 02:26:00 2024 +0800 update Signed-off-by: xxchan commit b8859ca6d575017f43324c956014e20a922529a9 Author: xxchan Date: Sat Apr 13 02:25:14 2024 +0800 fix Signed-off-by: xxchan commit f001e31d2be245b17ecc2f6a4f6100b1a267fd0f Author: xxchan Date: Sat Apr 13 02:23:06 2024 +0800 revert risedev Signed-off-by: xxchan commit 33aa17883e19db0619329b44c439149842e87935 Merge: 530eb8678e c98dfd5bda Author: xxchan Date: Sat Apr 13 02:22:13 2024 +0800 Merge branch 'xxchan/latin-tyrannosaurus' into xxchan/source-test commit 530eb8678e56b64fa7e3c1d2889568aeb3da4633 Merge: 0b6f74c0b8 cf221e3bfd Author: xxchan Date: Sat Apr 13 02:21:42 2024 +0800 Merge branch 'xxchan/latin-tyrannosaurus' into xxchan/source-test Signed-off-by: xxchan commit 0b6f74c0b8611ce651bdc81a12e01da95934e3ba Author: xxchan Date: Sat Apr 13 02:11:40 2024 +0800 sleep more? Signed-off-by: xxchan commit 47c2c61c16e45e33d3012e45f87c1734ab06ed23 Author: xxchan Date: Fri Apr 12 22:30:09 2024 +0800 sleep more Signed-off-by: xxchan commit 1853adf016441409edee8d7eed8756ead2217749 Author: xxchan Date: Fri Apr 12 21:59:31 2024 +0800 fix Signed-off-by: xxchan commit 8711cf5aaea7f617bdddd61255de489ee0f529be Merge: f1c0185070 4f53f8927a Author: xxchan Date: Fri Apr 12 17:52:38 2024 +0800 Merge remote-tracking branch 'origin/main' into xxchan/source-test Signed-off-by: xxchan commit f1c0185070c129eb0564e967bbbf98eeda312dfe Author: xxchan Date: Fri Apr 12 17:52:12 2024 +0800 fix Signed-off-by: xxchan commit e9e2dc45c9f6def717e42147228abdf03ef96dc3 Author: xxchan Date: Fri Apr 12 16:49:20 2024 +0800 install rpk Signed-off-by: xxchan commit e2f2a8e6923948f620f8ad5a7d6b2b91fd964054 Author: xxchan Date: Fri Apr 12 15:16:44 2024 +0800 f Signed-off-by: xxchan commit bae6495a20a746df0ad2cfc87314ab219069b734 Merge: 58de398dd1 4ac029cde9 Author: xxchan Date: Fri Apr 12 15:11:54 2024 +0800 Merge remote-tracking branch 'origin/main' into xxchan/source-test commit 58de398dd1418a643654b13e6092af758dd50480 Author: xxchan Date: Fri Apr 12 15:11:49 2024 +0800 fix Signed-off-by: xxchan commit 9ba97282c95faea0b611148164e22af04cd7e0e8 Author: xxchan Date: Thu Apr 11 17:45:24 2024 +0800 fix Signed-off-by: xxchan commit d8a2489b3ffc00ce2e390cf94a6cc107081deaa4 Author: xxchan Date: Thu Apr 11 17:18:38 2024 +0800 fix Signed-off-by: xxchan commit c4b4b162d9a5d6eb1aa0f551a7b2de886773dec9 Author: xxchan Date: Thu Apr 11 17:16:25 2024 +0800 fix Signed-off-by: xxchan commit 6dddbf3aeeac2fb1862be488ab8c19a7e7d67c74 Author: xxchan Date: Thu Apr 11 17:15:00 2024 +0800 rename new to inline Signed-off-by: xxchan commit cf50f5e3d524482c5a2a392667e4ec3663463d72 Author: xxchan Date: Thu Apr 11 17:04:54 2024 +0800 bump Signed-off-by: xxchan commit 9ca9d553c3c42cd6a681cb93d0c8687ce7344942 Author: xxchan Date: Thu Apr 11 16:00:07 2024 +0800 support user-managed kafka in risedev Signed-off-by: xxchan commit d4d405d050aa185c8f0733113fb5c10eaf47df68 Author: xxchan Date: Thu Apr 11 15:27:20 2024 +0800 update Signed-off-by: xxchan commit 6ed6cfcd03c5a2baa73b739541ec6d51f5f68686 Author: xxchan Date: Thu Apr 11 10:21:25 2024 +0800 update commit f7a3dd594d8e3c30862fc2da68b4c06d58d171b2 Merge: 8b6c48203b 254ad0cc74 Author: xxchan Date: Thu Apr 11 09:45:50 2024 +0800 Merge remote-tracking branch 'origin/main' into xxchan/source-test commit 8b6c48203b8e72be123e921e9b866d5e46801282 Author: xxchan Date: Tue Apr 9 14:12:37 2024 +0800 update commit f2806032819028bc82c5b6315cb9b12952494392 Author: xxchan Date: Fri Apr 5 23:14:30 2024 +0800 add new source tests commit 0e3ace190bf54a72481b67b669fc22d3bbd2807a Author: xxchan Date: Fri Apr 5 23:13:05 2024 +0800 revert unrelated change commit a3f34095cef20ed68e58cf9a5f6526eeeee0a3e3 Author: xxchan Date: Fri Apr 5 17:58:48 2024 +0800 fix commit 0a2ded00a1d3df433a7893884b8cee64a1c67b3a Author: xxchan Date: Fri Apr 5 17:27:45 2024 +0800 fix commit 8daa64d9c717fdee8f76a6c0ee2601b7c1daf494 Author: xxchan Date: Fri Apr 5 17:19:38 2024 +0800 debug commit 8628c1fbae02be8b76f2d2a822b764465d49e706 Author: xxchan Date: Fri Apr 5 17:15:32 2024 +0800 fix commit c8bde2038e928599b0e72daede6eed02cab77035 Author: xxchan Date: Fri Apr 5 17:00:09 2024 +0800 ci: install risedev to ci image Signed-off-by: xxchan --- Makefile.toml | 1 + ci/Dockerfile | 23 ++-- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +- ci/scripts/e2e-source-test.sh | 3 + e2e_test/source/README.md | 4 + e2e_test/source_inline/README.md | 16 +++ e2e_test/source_inline/commands.toml | 108 ++++++++++++++++ .../source_inline/kafka/add_partition.slt | 74 +++++++++++ .../source_inline/kafka/consumer_group.slt | 119 ++++++++++++++++++ risedev.yml | 8 +- scripts/source/prepare_ci_kafka.sh | 6 +- src/risedevtool/src/risedev_env.rs | 6 + src/risedevtool/src/service_config.rs | 2 + .../src/task/ensure_stop_service.rs | 4 + src/risedevtool/src/task/kafka_service.rs | 11 +- .../src/task/task_kafka_ready_check.rs | 8 +- 17 files changed, 384 insertions(+), 21 deletions(-) create mode 100644 e2e_test/source_inline/README.md create mode 100644 e2e_test/source_inline/commands.toml create mode 100644 e2e_test/source_inline/kafka/add_partition.slt create mode 100644 e2e_test/source_inline/kafka/consumer_group.slt diff --git a/Makefile.toml b/Makefile.toml index 604b7b2b4e44..e9085a870804 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -15,6 +15,7 @@ extend = [ { path = "src/storage/backup/integration_tests/Makefile.toml" }, { path = "src/java_binding/make-java-binding.toml" }, { path = "src/stream/tests/integration_tests/integration_test.toml" }, + { path = "e2e_test/source_inline/commands.toml" }, ] env_files = ["./risedev-components.user.env"] diff --git a/ci/Dockerfile b/ci/Dockerfile index 78c48f0c648d..1955538b599d 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -34,8 +34,10 @@ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-mo ENV PATH /root/.cargo/bin/:$PATH -RUN rustup show -RUN rustup default `rustup show active-toolchain | awk '{print $1}'` +RUN rustup show && \ + rustup default `rustup show active-toolchain | awk '{print $1}'` && \ + rustup component add rustfmt llvm-tools-preview clippy && \ + rustup target add wasm32-wasi RUN curl -sSL "https://github.com/bufbuild/buf/releases/download/v1.29.0/buf-$(uname -s)-$(uname -m).tar.gz" | \ tar -xvzf - -C /usr/local --strip-components 1 @@ -45,10 +47,17 @@ RUN pip3 install pyarrow pytest # Install poetry RUN curl -sSL https://install.python-poetry.org | python3 - - -# add required rustup components -RUN rustup component add rustfmt llvm-tools-preview clippy -RUN rustup target add wasm32-wasi +# Install rpk +RUN if [ "$(uname -m)" = "amd64" ] || [ "$(uname -m)" = "x86_64" ]; then \ + curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip && \ + unzip rpk-linux-amd64.zip -d ~/.local/bin/ && \ + rm rpk-linux-amd64.zip; \ + else \ + curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-arm64.zip && \ + unzip rpk-linux-arm64.zip -d ~/.local/bin/ && \ + rm rpk-linux-arm64.zip; \ + fi +ENV PATH /root/.local/bin:$PATH ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse @@ -56,7 +65,7 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \ cargo-make@0.37.9 \ - sqllogictest-bin@0.19.1 \ + sqllogictest-bin@0.20.0 \ sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 1a23144a8e8f..532de3cb1831 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240405_1 +export BUILD_ENV_VERSION=v20240412 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 3b99cf1082df..283ee336113e 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 depends_on: - mysql - db @@ -103,12 +103,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -119,7 +119,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240405_1 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 depends_on: db: condition: service_healthy diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 8e1ea0fa2b4b..8f21cf67cd8a 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -168,6 +168,9 @@ echo "--- e2e, kafka alter source again" ./scripts/source/prepare_data_after_alter.sh 3 risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt' +echo "--- e2e, inline test" +risedev slt './e2e_test/source_inline/**/*.slt' + echo "--- Run CH-benCHmark" risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt' risedev slt './e2e_test/ch_benchmark/streaming/*.slt' diff --git a/e2e_test/source/README.md b/e2e_test/source/README.md index b6e9dfa30816..4152ab3dc973 100644 --- a/e2e_test/source/README.md +++ b/e2e_test/source/README.md @@ -1,3 +1,7 @@ +> [!NOTE] +> +> Please write new tests according to the style in `e2e_test/source_inline`. + Test in this directory needs some prior setup. See also `ci/scripts/e2e-source-test.sh`, and `scripts/source` diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md new file mode 100644 index 000000000000..01918d30d526 --- /dev/null +++ b/e2e_test/source_inline/README.md @@ -0,0 +1,16 @@ +# "Inline" style source e2e tests + +Compared with prior source tests (`e2e_test/source`), tests in this directory are expected to be easy to run locally and easy to write. + +To run locally, use `risdev d` to start services (including external systems like Kafka and Postgres, or specify `user-managed` to use your own service). +Then use `risedev slt` to run the tests, which will load the environment variables (ports, etc.) +according to the services started by `risdev d`. + +```sh +risedev slt 'e2e_test/source-inline/**/*.slt' +``` + +To write tests, please ensure each file is self-contained and does not depend on external scripts to setup the environment. +Use `system` command to setup instead. + +Refer to https://github.com/risingwavelabs/risingwave/issues/12451#issuecomment-2051861048 for more details. diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml new file mode 100644 index 000000000000..aab89f1fc612 --- /dev/null +++ b/e2e_test/source_inline/commands.toml @@ -0,0 +1,108 @@ +# This file contains commands used by the tests. + +[tasks.source-test-hook] +private = true +dependencies = ["check-risedev-env-file"] +env_files = ["${PREFIX_CONFIG}/risedev-env"] + +# Note about the Kafka CLI tooling: +# - Built-in Kafka console tools: +# Java based. +# Style example: kafka-topics.sh --bootstrap-server localhost:9092 --topic t --create +# Some limitations: cannot disable logging easily, cannot consume to end and then exit. +# - kcat: +# C based (rdkafka) +# Some limitations: cannot do admin operations, only consume/produce. +# - rpk: +# Golang based. +# Style example: RPK_BROKERS=localhost:9092 rpk topic create t +[tasks.kafka-hook] +private = true +description = "Check if Kafka is started by RiseDev" +dependencies = ["source-test-hook"] +script = ''' +#!/usr/bin/env sh +set -e + +if [ ! -d "${PREFIX_BIN}/kafka" ]; then + echo "Kafka is not installed in ${PREFIX_BIN}/kafka. Did you enable Kafka using $(tput setaf 4)\`./risedev configure\`$(tput sgr0)?" + exit 1 +fi + +# TODO: we may support risedev-env.override so that we can connect to a Kafka not started by risedev-dev. +if [ -z "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then + echo "RISEDEV_KAFKA_BOOTSTRAP_SERVERS is not set in risedev-env file. Did you start Kafka using $(tput setaf 4)\`./risedev d\`$(tput sgr0)?" + exit 1 +fi +''' + +[tasks.clean-kafka] +category = "RiseDev - Test - Source Test - Kafka" +description = "Delete all kafka topics." +dependencies = ["kafka-hook"] +command = "rpk" +args = ["topic", "delete", "-r", "*"] + +[tasks.kafka-topics] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-topics.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-produce] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-console-producer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-consume] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +[tasks.kafka-consumer-groups] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +script = """ +#!/usr/bin/env sh +set -e +${PREFIX_BIN}/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${RISEDEV_KAFKA_BOOTSTRAP_SERVERS} "$@" +""" + +# rpk tools +[tasks.rpk] +category = "RiseDev - Test - Source Test - Kafka" +dependencies = ["kafka-hook"] +# check https://docs.redpanda.com/current/reference/rpk/rpk-x-options/ or rpk -X help/list for options +script = """ +#!/usr/bin/env sh +set -e + +if [ -z "$(which rpk)" ]; then + echo "rpk is not installed. Install it via https://docs.redpanda.com/current/get-started/rpk-install/" + exit 1 +fi + +rpk "$@" +""" + +[tasks.redpanda-console] +category = "RiseDev - Test - Source Test - Kafka" +description = "Start Redpanda console (Kafka GUI) at localhost:8080." +dependencies = ["kafka-hook"] +script = ''' +#!/usr/bin/env sh +set -e +echo "$(tput setaf 2)Start Redpanda console at http://localhost:8080$(tput sgr0)" +docker run --network host -e KAFKA_BROKERS=$RPK_BROKERS docker.redpanda.com/redpandadata/console:latest +''' diff --git a/e2e_test/source_inline/kafka/add_partition.slt b/e2e_test/source_inline/kafka/add_partition.slt new file mode 100644 index 000000000000..9399cf732b97 --- /dev/null +++ b/e2e_test/source_inline/kafka/add_partition.slt @@ -0,0 +1,74 @@ +# Note: control substitution on will force us to use "\\n" instead of "\n" in commands +control substitution on + +system ok +rpk topic create test_add_partition -p 3 + +system ok +cat </dev/null 2>&1 && pwd)" cd "$SCRIPT_PATH/.." || exit 1 -KAFKA_BIN="$SCRIPT_PATH/../../.risingwave/bin/kafka/bin" - echo "$SCRIPT_PATH" if [ "$1" == "compress" ]; then @@ -46,10 +44,10 @@ for filename in $kafka_data_files; do # always ok echo "Drop topic $topic" - "$KAFKA_BIN"/kafka-topics.sh --bootstrap-server message_queue:29092 --topic "$topic" --delete || true + risedev kafka-topics --topic "$topic" --delete || true echo "Recreate topic $topic with partition $partition" - "$KAFKA_BIN"/kafka-topics.sh --bootstrap-server message_queue:29092 --topic "$topic" --create --partitions "$partition") & + risedev kafka-topics --topic "$topic" --create --partitions "$partition") & done wait diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 077f1ce51f82..1efdf1470998 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -71,6 +71,12 @@ pub fn generate_risedev_env(services: &Vec) -> String { let port = &c.port; writeln!(env, "RISEDEV_RW_FRONTEND_PORT=\"{port}\"",).unwrap(); } + ServiceConfig::Kafka(c) => { + let brokers = format!("{}:{}", c.address, c.port); + writeln!(env, r#"RISEDEV_KAFKA_BOOTSTRAP_SERVERS="{brokers}""#,).unwrap(); + writeln!(env, r#"RISEDEV_KAFKA_WITH_OPTIONS_COMMON="connector='kafka',properties.bootstrap.server='{brokers}'""#).unwrap(); + writeln!(env, r#"RPK_BROKERS="{brokers}""#).unwrap(); + } _ => {} } } diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 97996bbeb510..83e90147b1b6 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -277,6 +277,8 @@ pub struct KafkaConfig { pub provide_zookeeper: Option>, pub persist_data: bool, pub broker_id: u32, + + pub user_managed: bool, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] diff --git a/src/risedevtool/src/task/ensure_stop_service.rs b/src/risedevtool/src/task/ensure_stop_service.rs index 24de2d32e0bb..62c2aa7de7c3 100644 --- a/src/risedevtool/src/task/ensure_stop_service.rs +++ b/src/risedevtool/src/task/ensure_stop_service.rs @@ -31,6 +31,10 @@ impl Task for EnsureStopService { ctx.service(self); for (port, service) in &self.ports { + // Do not require stopping kafka services + if service.starts_with("kafka") { + continue; + } let address = format!("127.0.0.1:{}", port); ctx.pb.set_message(format!( diff --git a/src/risedevtool/src/task/kafka_service.rs b/src/risedevtool/src/task/kafka_service.rs index 9bbdd3ac5efa..df0eb4a0fa31 100644 --- a/src/risedevtool/src/task/kafka_service.rs +++ b/src/risedevtool/src/task/kafka_service.rs @@ -74,7 +74,16 @@ impl Task for KafkaService { cmd.arg(config_path); - ctx.run_command(ctx.tmux_run(cmd)?)?; + if !self.config.user_managed { + ctx.run_command(ctx.tmux_run(cmd)?)?; + } else { + ctx.pb.set_message("user managed"); + writeln!( + &mut ctx.log, + "Please start your Kafka at {}:{}\n\n", + self.config.listen_address, self.config.port + )?; + } ctx.pb.set_message("started"); diff --git a/src/risedevtool/src/task/task_kafka_ready_check.rs b/src/risedevtool/src/task/task_kafka_ready_check.rs index ef8823956206..79838bf8eca6 100644 --- a/src/risedevtool/src/task/task_kafka_ready_check.rs +++ b/src/risedevtool/src/task/task_kafka_ready_check.rs @@ -33,8 +33,12 @@ impl KafkaReadyCheckTask { impl Task for KafkaReadyCheckTask { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { - ctx.pb.set_message("waiting for online..."); - + if self.config.user_managed { + ctx.pb + .set_message("waiting for user-managed service online..."); + } else { + ctx.pb.set_message("waiting for online..."); + } let mut config = ClientConfig::new(); config.set( "bootstrap.servers", From 9504bbacb8fbb73cd3f4e4545537a48993addcab Mon Sep 17 00:00:00 2001 From: xxchan Date: Sat, 13 Apr 2024 03:52:09 +0800 Subject: [PATCH 05/26] why? would sleep longer help? Signed-off-by: xxchan --- e2e_test/source_inline/kafka/consumer_group.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index 4f326f441ea2..28bd55a7cc5b 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -44,7 +44,7 @@ c # There are 4 consumer groups, 1 for batch query, 3 for MV. All of them are "Empty" state with 0 members, because we manually `assign` partitions to them. # At the begginning, the MV's consumer group will not occur. They will be created after committing offset to Kafka. # (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s) -sleep 8s +sleep 15s system ok rpk group list | tail -n +2 | cut -w -f2 | xargs -n1 -I {} sh -c "rpk group describe -s {} | grep "MEMBERS"" From bc1cce25158c2bbc9edfab838354fc3dc084129f Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 15 Apr 2024 09:44:39 +0800 Subject: [PATCH 06/26] debug --- e2e_test/source_inline/kafka/consumer_group.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index 28bd55a7cc5b..6a40dc78c9f8 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -47,7 +47,7 @@ c sleep 15s system ok -rpk group list | tail -n +2 | cut -w -f2 | xargs -n1 -I {} sh -c "rpk group describe -s {} | grep "MEMBERS"" +rpk group list && (rpk group list | tail -n +2 | cut -w -f2 | xargs -n1 -I {} sh -c "rpk group describe -s {} ") ---- MEMBERS 0 MEMBERS 0 From 6abc1418b07d5ec5644bc4eb9a614d75ae1ad163 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 14:23:11 +0800 Subject: [PATCH 07/26] fix Signed-off-by: xxchan --- src/risedevtool/src/risedev_env.rs | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/risedevtool/src/risedev_env.rs b/src/risedevtool/src/risedev_env.rs index 0b66aa8511ac..1efdf1470998 100644 --- a/src/risedevtool/src/risedev_env.rs +++ b/src/risedevtool/src/risedev_env.rs @@ -25,30 +25,6 @@ use crate::{add_hummock_backend, HummockInMemoryStrategy, ServiceConfig}; pub fn generate_risedev_env(services: &Vec) -> String { let mut env = String::new(); for item in services { - match item { - ServiceConfig::ComputeNode(c) => { - // RW_HUMMOCK_URL - // If the cluster is launched without a shared storage, we will skip this. - { - let mut cmd = Command::new("compute-node"); - if add_hummock_backend( - "dummy", - c.provide_opendal.as_ref().unwrap(), - c.provide_minio.as_ref().unwrap(), - c.provide_aws_s3.as_ref().unwrap(), - HummockInMemoryStrategy::Disallowed, - &mut cmd, - ) - .is_ok() - { - writeln!( - env, - "RW_HUMMOCK_URL=\"{}\"", - cmd.get_args().nth(1).unwrap().to_str().unwrap() - ) - .unwrap(); - } - } match item { ServiceConfig::ComputeNode(c) => { // RW_HUMMOCK_URL From dcb3cd744f88dd813e1b03d8078ffc7920467ba7 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 18:03:12 +0800 Subject: [PATCH 08/26] fix Signed-off-by: xxchan --- ci/scripts/e2e-source-test.sh | 3 ++ e2e_test/source_inline/README.md | 2 +- .../source_inline/kafka/consumer_group.mjs | 54 +++++++++++++++++++ .../source_inline/kafka/consumer_group.slt | 19 +++---- 4 files changed, 66 insertions(+), 12 deletions(-) create mode 100755 e2e_test/source_inline/kafka/consumer_group.mjs diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 8f21cf67cd8a..2c77873d95ab 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -24,6 +24,9 @@ while getopts 'p:' opt; do done shift $((OPTIND -1)) +apt install -y nodejs npm +npm i -g npx + download_and_prepare_rw "$profile" source echo "--- Download connector node package" diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md index 01918d30d526..7b049e42e2c8 100644 --- a/e2e_test/source_inline/README.md +++ b/e2e_test/source_inline/README.md @@ -10,7 +10,7 @@ according to the services started by `risdev d`. risedev slt 'e2e_test/source-inline/**/*.slt' ``` -To write tests, please ensure each file is self-contained and does not depend on external scripts to setup the environment. +To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment. Use `system` command to setup instead. Refer to https://github.com/risingwavelabs/risingwave/issues/12451#issuecomment-2051861048 for more details. diff --git a/e2e_test/source_inline/kafka/consumer_group.mjs b/e2e_test/source_inline/kafka/consumer_group.mjs new file mode 100755 index 000000000000..05a66f4c9947 --- /dev/null +++ b/e2e_test/source_inline/kafka/consumer_group.mjs @@ -0,0 +1,54 @@ +#!npx zx + +// zx: A tool for writing better scripts +// https://google.github.io/zx/ + +const { mv: mv, topic: topic, _: _command } = minimist(process.argv.slice(3), { + string: ['mv', 'topic'], + _: ['list-members', 'list-lags'], +}) +const command = _command[0]; + +async function get_fragment_id_of_mv(mv_name) { + const id = ( + await $` + psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev \ + --csv -t -c "select fragment_id from rw_materialized_views JOIN rw_fragments on rw_materialized_views.id = rw_fragments.table_id where name='${mv_name}';" + ` + ) + .toString() + .trim(); + if (id == "") { + throw new Error(`Materialized view ${mv_name} not found`); + } + return id; +} + +async function list_consumer_groups(fragment_id) { + const res = + await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}"`; + return res; +} + +async function list_consumer_group_members(fragment_id) { + const res = + await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}" | xargs -n1 -I {} sh -c "rpk group describe -s {} | grep "MEMBERS""`; + return res; +} + +async function list_consumer_group_lags(fragment_id, topic_name) { + const res = + await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}" | xargs -n1 -I {} sh -c "rpk group describe -t {} | grep "${topic_name}""`; + return res; +} + +const fragment_id = await get_fragment_id_of_mv(mv); +if (command == "list-groups") { + echo`${await list_consumer_groups(fragment_id)}`; +} else if (command == "list-members") { + echo`${await list_consumer_group_members(fragment_id)}`; +} else if (command == "list-lags") { + echo`${await list_consumer_group_lags(fragment_id, topic)}`; +} else { + throw new Error(`Invalid command: ${command}`); +} diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index 6a40dc78c9f8..3a93bf1ca54f 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -41,25 +41,24 @@ b c -# There are 4 consumer groups, 1 for batch query, 3 for MV. All of them are "Empty" state with 0 members, because we manually `assign` partitions to them. +# There are 4 consumer groups, 1 for batch query (not listed below), 3 for MV. +# All of them are "Empty" state with 0 members, because we manually `assign` partitions to them. # At the begginning, the MV's consumer group will not occur. They will be created after committing offset to Kafka. # (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s) -sleep 15s +sleep 5s system ok -rpk group list && (rpk group list | tail -n +2 | cut -w -f2 | xargs -n1 -I {} sh -c "rpk group describe -s {} ") +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-members ---- MEMBERS 0 MEMBERS 0 MEMBERS 0 -MEMBERS 0 # The lag for batch query's group is 0, and each MV parition's group is 2 (1 of 3 consumed). system ok -rpk group list | tail -n +2 | cut -w -f2 | xargs -n1 -I {} sh -c "rpk group describe -t {} | grep "test_consumer_group"" | sort +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-lags ---- -test_consumer_group 0 test_consumer_group 2 test_consumer_group 2 test_consumer_group 2 @@ -67,19 +66,18 @@ test_consumer_group 2 # We try to interfere by creating consumers that subscribing to the topic with the RW's group id. system ok -rpk group list | tail -n +2 | cut -w -f2 | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-groups | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & # Wait a while for them to subscribe to the topic. sleep 5s # The lag is changed to 0 system ok -rpk group list | tail -n +2 | cut -w -f2 | xargs -n1 -I {} sh -c "rpk group describe -t {} | grep "test_consumer_group"" +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-lags ---- test_consumer_group 0 test_consumer_group 0 test_consumer_group 0 -test_consumer_group 0 system ok @@ -107,12 +105,11 @@ DROP SOURCE s CASCADE; # consumer groups are not deleted after MV is dropped. system ok -rpk group list | tail -n +2 | cut -w -f2 | xargs -n1 -I {} sh -c "rpk group describe -s {} | grep "MEMBERS"" +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-members ---- MEMBERS 0 MEMBERS 0 MEMBERS 0 -MEMBERS 0 system ok From 8f3d92772e32e2987a000a02cede31c2b10726b8 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 19:46:54 +0800 Subject: [PATCH 09/26] fix Signed-off-by: xxchan --- e2e_test/source_inline/kafka/consumer_group.mjs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.mjs b/e2e_test/source_inline/kafka/consumer_group.mjs index 05a66f4c9947..fabf84d25359 100755 --- a/e2e_test/source_inline/kafka/consumer_group.mjs +++ b/e2e_test/source_inline/kafka/consumer_group.mjs @@ -1,4 +1,4 @@ -#!npx zx +#!/usr/bin/env zx // zx: A tool for writing better scripts // https://google.github.io/zx/ From 0a98016b5a40368ad436b35eec1ac3a109dfed59 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 20:11:40 +0800 Subject: [PATCH 10/26] fixx Signed-off-by: xxchan --- ci/scripts/e2e-source-test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 2c77873d95ab..2c69acf78064 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -25,7 +25,7 @@ done shift $((OPTIND -1)) apt install -y nodejs npm -npm i -g npx +npm i -g zx download_and_prepare_rw "$profile" source From b92c94f1231446b441d995f90043a8f7b13ecafa Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 20:56:46 +0800 Subject: [PATCH 11/26] f Signed-off-by: xxchan --- ci/Dockerfile | 9 +++++++++ ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +++++----- ci/scripts/e2e-source-test.sh | 3 --- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/ci/Dockerfile b/ci/Dockerfile index 1955538b599d..9335b4c81ac6 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -22,6 +22,15 @@ RUN add-apt-repository ppa:deadsnakes/ppa -y && \ rm -rf /var/lib/{apt,dpkg,cache,log}/ ENV PYO3_PYTHON=python3.12 +# Install nvm and zx +ENV NVM_DIR /root/.nvm +ENV NODE_VERSION 20.11.1 +RUN curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash \ + && . $NVM_DIR/nvm.sh \ + && nvm install $NODE_VERSION +ENV PATH $NVM_DIR/versions/node/v$NODE_VERSION/bin:$PATH +RUN npm install -g zx + SHELL ["/bin/bash", "-c"] RUN mkdir -p /risingwave diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 532de3cb1831..074f60aa97e0 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240412 +export BUILD_ENV_VERSION=v20240413 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 283ee336113e..1d67188b1c4f 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 depends_on: - mysql - db @@ -103,12 +103,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -119,7 +119,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240412 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 depends_on: db: condition: service_healthy diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 2c69acf78064..8f21cf67cd8a 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -24,9 +24,6 @@ while getopts 'p:' opt; do done shift $((OPTIND -1)) -apt install -y nodejs npm -npm i -g zx - download_and_prepare_rw "$profile" source echo "--- Download connector node package" From e731307c9bb7caf75007aa8dd8e37c23baf84bee Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 21:46:58 +0800 Subject: [PATCH 12/26] fix Signed-off-by: xxchan --- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index 074f60aa97e0..d42926cd61d4 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240413 +export BUILD_ENV_VERSION=v20240413_x export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 1d67188b1c4f..d05613267342 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x depends_on: - mysql - db @@ -103,12 +103,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -119,7 +119,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x depends_on: db: condition: service_healthy From 2bef8a761050582a1bbcefbe72e34a4e072a8151 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 23:02:06 +0800 Subject: [PATCH 13/26] simplify script Signed-off-by: xxchan --- .../source_inline/kafka/consumer_group.mjs | 65 ++++++++++++++----- .../source_inline/kafka/consumer_group.slt | 18 ++--- 2 files changed, 55 insertions(+), 28 deletions(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.mjs b/e2e_test/source_inline/kafka/consumer_group.mjs index fabf84d25359..7002f0e0d9d6 100755 --- a/e2e_test/source_inline/kafka/consumer_group.mjs +++ b/e2e_test/source_inline/kafka/consumer_group.mjs @@ -3,10 +3,14 @@ // zx: A tool for writing better scripts // https://google.github.io/zx/ -const { mv: mv, topic: topic, _: _command } = minimist(process.argv.slice(3), { - string: ['mv', 'topic'], - _: ['list-members', 'list-lags'], -}) +const { + mv: mv, + topic: topic, + _: _command, +} = minimist(process.argv.slice(3), { + string: ["mv", "topic"], + _: ["list-members", "list-lags"], +}); const command = _command[0]; async function get_fragment_id_of_mv(mv_name) { @@ -25,26 +29,57 @@ async function get_fragment_id_of_mv(mv_name) { } async function list_consumer_groups(fragment_id) { - const res = - await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}"`; - return res; + return (await $`rpk group list`) + .toString() + .trim() + .split("\n") + .slice(1) + .map((line) => { + const [_broker_id, group_name] = line.split(/\s+/); + return group_name; + }) + .filter((group_name) => { + return group_name.startsWith(`rw-consumer-${fragment_id}`); + }); +} + +async function describe_consumer_group(group_name) { + const res = await $`rpk group describe -s ${group_name}`; + // GROUP rw-consumer-1-1 + // COORDINATOR 0 + // STATE Empty + // BALANCER + // MEMBERS 0 + // TOTAL-LAG 2 + const obj = {}; + for (const line of res.toString().trim().split("\n")) { + const [key, value] = line.split(/\s+/); + obj[key] = value; + } + return obj; } async function list_consumer_group_members(fragment_id) { - const res = - await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}" | xargs -n1 -I {} sh -c "rpk group describe -s {} | grep "MEMBERS""`; - return res; + const groups = await list_consumer_groups(fragment_id); + return Promise.all( + groups.map(async (group_name) => { + return (await describe_consumer_group(group_name))["MEMBERS"] + }) + ); } -async function list_consumer_group_lags(fragment_id, topic_name) { - const res = - await $`rpk group list | tail -n +2 | cut -w -f2 | grep "rw-consumer-${fragment_id}" | xargs -n1 -I {} sh -c "rpk group describe -t {} | grep "${topic_name}""`; - return res; +async function list_consumer_group_lags(fragment_id) { + const groups = await list_consumer_groups(fragment_id); + return Promise.all( + groups.map(async (group_name) => { + return (await describe_consumer_group(group_name))["TOTAL-LAG"] + }) + ); } const fragment_id = await get_fragment_id_of_mv(mv); if (command == "list-groups") { - echo`${await list_consumer_groups(fragment_id)}`; + echo`${(await list_consumer_groups(fragment_id))}`; } else if (command == "list-members") { echo`${await list_consumer_group_members(fragment_id)}`; } else if (command == "list-lags") { diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index 3a93bf1ca54f..cac2fbf64962 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -50,23 +50,19 @@ sleep 5s system ok ./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-members ---- -MEMBERS 0 -MEMBERS 0 -MEMBERS 0 +0,0,0 # The lag for batch query's group is 0, and each MV parition's group is 2 (1 of 3 consumed). system ok ./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-lags ---- -test_consumer_group 2 -test_consumer_group 2 -test_consumer_group 2 +2,2,2 # We try to interfere by creating consumers that subscribing to the topic with the RW's group id. system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-groups | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-groups | tr ',' '\n' | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & # Wait a while for them to subscribe to the topic. sleep 5s @@ -75,9 +71,7 @@ sleep 5s system ok ./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-lags ---- -test_consumer_group 0 -test_consumer_group 0 -test_consumer_group 0 +0,0,0 system ok @@ -107,9 +101,7 @@ DROP SOURCE s CASCADE; system ok ./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-members ---- -MEMBERS 0 -MEMBERS 0 -MEMBERS 0 +0,0,0 system ok From d6566788fc0575706ac3ef15b4f99c6b07b23043 Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 23:39:40 +0800 Subject: [PATCH 14/26] fix Signed-off-by: xxchan --- e2e_test/source_inline/kafka/consumer_group.mjs | 3 +-- e2e_test/source_inline/kafka/consumer_group.slt | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.mjs b/e2e_test/source_inline/kafka/consumer_group.mjs index 7002f0e0d9d6..5487b0dfffcd 100755 --- a/e2e_test/source_inline/kafka/consumer_group.mjs +++ b/e2e_test/source_inline/kafka/consumer_group.mjs @@ -5,7 +5,6 @@ const { mv: mv, - topic: topic, _: _command, } = minimist(process.argv.slice(3), { string: ["mv", "topic"], @@ -83,7 +82,7 @@ if (command == "list-groups") { } else if (command == "list-members") { echo`${await list_consumer_group_members(fragment_id)}`; } else if (command == "list-lags") { - echo`${await list_consumer_group_lags(fragment_id, topic)}`; + echo`${await list_consumer_group_lags(fragment_id)}`; } else { throw new Error(`Invalid command: ${command}`); } diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index cac2fbf64962..7c2ca300b155 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -62,7 +62,7 @@ system ok # We try to interfere by creating consumers that subscribing to the topic with the RW's group id. system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-groups | tr ',' '\n' | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-groups | tr ',' '\\n' | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & # Wait a while for them to subscribe to the topic. sleep 5s From e5fb6209baef9f164e67e00557ac9683a9be1e7c Mon Sep 17 00:00:00 2001 From: xxchan Date: Tue, 16 Apr 2024 23:46:38 +0800 Subject: [PATCH 15/26] fix Signed-off-by: xxchan --- e2e_test/source_inline/kafka/consumer_group.mjs | 2 +- e2e_test/source_inline/kafka/consumer_group.slt | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.mjs b/e2e_test/source_inline/kafka/consumer_group.mjs index 5487b0dfffcd..12e906631319 100755 --- a/e2e_test/source_inline/kafka/consumer_group.mjs +++ b/e2e_test/source_inline/kafka/consumer_group.mjs @@ -47,7 +47,7 @@ async function describe_consumer_group(group_name) { // GROUP rw-consumer-1-1 // COORDINATOR 0 // STATE Empty - // BALANCER + // BALANCER // MEMBERS 0 // TOTAL-LAG 2 const obj = {}; diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index 7c2ca300b155..6c10fb64304e 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -48,28 +48,28 @@ c sleep 5s system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-members +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members ---- 0,0,0 # The lag for batch query's group is 0, and each MV parition's group is 2 (1 of 3 consumed). system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-lags +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags ---- 2,2,2 # We try to interfere by creating consumers that subscribing to the topic with the RW's group id. system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-groups | tr ',' '\\n' | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-groups | tr ',' '\\n' | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & # Wait a while for them to subscribe to the topic. sleep 5s # The lag is changed to 0 system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-lags +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags ---- 0,0,0 @@ -99,7 +99,7 @@ DROP SOURCE s CASCADE; # consumer groups are not deleted after MV is dropped. system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv --topic test_consumer_group list-members +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members ---- 0,0,0 From fdf05fbb64a6ea96ae67103406a10f2790b25e05 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 00:29:05 +0800 Subject: [PATCH 16/26] remove a bad test..... Signed-off-by: xxchan --- e2e_test/source_inline/kafka/consumer_group.slt | 7 ------- 1 file changed, 7 deletions(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index 6c10fb64304e..4d3e62dc14e3 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -97,12 +97,5 @@ f statement ok DROP SOURCE s CASCADE; -# consumer groups are not deleted after MV is dropped. -system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members ----- -0,0,0 - - system ok rpk topic delete test_consumer_group From 95581a9c33ff1aaa3c99f825daebdb22c1db09d1 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 11:26:14 +0800 Subject: [PATCH 17/26] bump ci image version Signed-off-by: xxchan --- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index d42926cd61d4..dafae048d64c 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240413_x +export BUILD_ENV_VERSION=v20240414 export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index d05613267342..9d6fbc1728a4 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 depends_on: - mysql - db @@ -103,12 +103,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -119,7 +119,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240413_x + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 depends_on: db: condition: service_healthy From dba976201b53aafe91ce49b4c8b330e7369ab50a Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 11:27:52 +0800 Subject: [PATCH 18/26] fix conflict Signed-off-by: xxchan --- ci/Dockerfile | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ci/Dockerfile b/ci/Dockerfile index ed640887f848..9f8d9af9178e 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -39,10 +39,6 @@ RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --no-mo ENV PATH /root/.cargo/bin/:$PATH -RUN rustup show && \ - rustup default `rustup show active-toolchain | awk '{print $1}'` && \ - rustup component add rustfmt llvm-tools-preview clippy && \ - rustup target add wasm32-wasi RUN rustup show && \ rustup default `rustup show active-toolchain | awk '{print $1}'` && \ rustup component add rustfmt llvm-tools-preview clippy && \ From 374106fab1a07c7002f341de45abfe190cbe314b Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 11:29:35 +0800 Subject: [PATCH 19/26] fix typo Signed-off-by: xxchan --- e2e_test/source_inline/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md index 7b049e42e2c8..64500a55bc28 100644 --- a/e2e_test/source_inline/README.md +++ b/e2e_test/source_inline/README.md @@ -2,12 +2,12 @@ Compared with prior source tests (`e2e_test/source`), tests in this directory are expected to be easy to run locally and easy to write. -To run locally, use `risdev d` to start services (including external systems like Kafka and Postgres, or specify `user-managed` to use your own service). +To run locally, use `risedev d` to start services (including external systems like Kafka and Postgres, or specify `user-managed` to use your own service). Then use `risedev slt` to run the tests, which will load the environment variables (ports, etc.) -according to the services started by `risdev d`. +according to the services started by `risedev d`. ```sh -risedev slt 'e2e_test/source-inline/**/*.slt' +risedev slt 'e2e_test/source_inline/**/*.slt' ``` To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment. From 39a95a37497a422ec3b3854ad7d44ec0988c9d13 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 11:49:28 +0800 Subject: [PATCH 20/26] minor Signed-off-by: xxchan --- e2e_test/source_inline/commands.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/e2e_test/source_inline/commands.toml b/e2e_test/source_inline/commands.toml index aab89f1fc612..8af865099ac7 100644 --- a/e2e_test/source_inline/commands.toml +++ b/e2e_test/source_inline/commands.toml @@ -29,7 +29,6 @@ if [ ! -d "${PREFIX_BIN}/kafka" ]; then exit 1 fi -# TODO: we may support risedev-env.override so that we can connect to a Kafka not started by risedev-dev. if [ -z "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then echo "RISEDEV_KAFKA_BOOTSTRAP_SERVERS is not set in risedev-env file. Did you start Kafka using $(tput setaf 4)\`./risedev d\`$(tput sgr0)?" exit 1 From 181a06c29597c8261bd280834ac73678b84c86ec Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 14:32:56 +0800 Subject: [PATCH 21/26] debug Signed-off-by: xxchan --- ci/scripts/run-e2e-test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index 84b336362613..d26d59b72528 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -102,6 +102,7 @@ sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile echo "--- e2e, $mode, external python udf" python3 e2e_test/udf/test.py & sleep 1 +export RUST_LOG="debug" sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt' pkill python3 From 1636afaf30df70742cd8f011cfdc03f16f67f0cb Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 19:37:39 +0800 Subject: [PATCH 22/26] bump slt Signed-off-by: xxchan --- Cargo.lock | 4 ++-- Makefile.toml | 2 +- ci/Dockerfile | 2 +- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +++++----- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d86418775220..14542a84a323 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12079,9 +12079,9 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8518892e5e36bfa90163e53c4e4f36a388e0afa1cd6a3de0614253b3c9029c7" +checksum = "7e7c6a33098cd55e4fead1bd1f85c1d2064f02bafdb9fe004ca39fd94aee36e6" dependencies = [ "async-trait", "educe 0.4.23", diff --git a/Makefile.toml b/Makefile.toml index e9085a870804..f95ed99e5ec4 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1293,7 +1293,7 @@ echo "All processes has exited." [tasks.slt] env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev" } category = "RiseDev - Test - SQLLogicTest" -install_crate = { version = "0.20.0", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-risedev-env-file"] diff --git a/ci/Dockerfile b/ci/Dockerfile index 9f8d9af9178e..cffa1a026be3 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -70,7 +70,7 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \ cargo-make@0.37.9 \ - sqllogictest-bin@0.20.0 \ + sqllogictest-bin@0.20.1 \ sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ diff --git a/ci/build-ci-image.sh b/ci/build-ci-image.sh index dafae048d64c..1ec12359d896 100755 --- a/ci/build-ci-image.sh +++ b/ci/build-ci-image.sh @@ -10,7 +10,7 @@ cat ../rust-toolchain # shellcheck disable=SC2155 # REMEMBER TO ALSO UPDATE ci/docker-compose.yml -export BUILD_ENV_VERSION=v20240414 +export BUILD_ENV_VERSION=v20240414_x export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}" diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 9d6fbc1728a4..c754dcc174ed 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -71,7 +71,7 @@ services: retries: 5 source-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x depends_on: - mysql - db @@ -84,7 +84,7 @@ services: - ..:/risingwave sink-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x depends_on: - mysql - db @@ -103,12 +103,12 @@ services: rw-build-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x volumes: - ..:/risingwave ci-flamegraph-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x # NOTE(kwannoel): This is used in order to permit # syscalls for `nperf` (perf_event_open), # so it can do CPU profiling. @@ -119,7 +119,7 @@ services: - ..:/risingwave regress-test-env: - image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414 + image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240414_x depends_on: db: condition: service_healthy From 3757c4f569e5809649aab40d3c08c2fe62d49bd5 Mon Sep 17 00:00:00 2001 From: xxchan Date: Wed, 17 Apr 2024 23:12:29 +0800 Subject: [PATCH 23/26] sleep longer Signed-off-by: xxchan --- e2e_test/source_inline/kafka/consumer_group.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e_test/source_inline/kafka/consumer_group.slt b/e2e_test/source_inline/kafka/consumer_group.slt index 4d3e62dc14e3..ed97dec558f3 100644 --- a/e2e_test/source_inline/kafka/consumer_group.slt +++ b/e2e_test/source_inline/kafka/consumer_group.slt @@ -62,10 +62,10 @@ system ok # We try to interfere by creating consumers that subscribing to the topic with the RW's group id. system ok -./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-groups | tr ',' '\\n' | xargs -P4 -n1 -I {} sh -c "timeout 10s rpk topic consume test_consumer_group -g {}" & +./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-groups | tr ',' '\\n' | xargs -P4 -I {} sh -c "timeout 40s rpk topic consume test_consumer_group -g {}" & # Wait a while for them to subscribe to the topic. -sleep 5s +sleep 15s # The lag is changed to 0 system ok From 54543430e69d596aa21bda019f3cbbf5fafcdc37 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 18 Apr 2024 12:47:35 +0800 Subject: [PATCH 24/26] remove debug log Signed-off-by: xxchan --- ci/scripts/run-e2e-test.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/ci/scripts/run-e2e-test.sh b/ci/scripts/run-e2e-test.sh index d26d59b72528..84b336362613 100755 --- a/ci/scripts/run-e2e-test.sh +++ b/ci/scripts/run-e2e-test.sh @@ -102,7 +102,6 @@ sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile echo "--- e2e, $mode, external python udf" python3 e2e_test/udf/test.py & sleep 1 -export RUST_LOG="debug" sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt' pkill python3 From 8c4be2877e645f78ce406d733f64f36986ac6d47 Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 18 Apr 2024 13:00:27 +0800 Subject: [PATCH 25/26] update readme Signed-off-by: xxchan --- e2e_test/source_inline/README.md | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md index 64500a55bc28..b827a5d3801c 100644 --- a/e2e_test/source_inline/README.md +++ b/e2e_test/source_inline/README.md @@ -1,16 +1,30 @@ # "Inline" style source e2e tests -Compared with prior source tests (`e2e_test/source`), tests in this directory are expected to be easy to run locally and easy to write. +Compared with prior source tests ( `e2e_test/source` ), tests in this directory are expected to be easy to run locally and easy to write. + +Refer to https://github.com/risingwavelabs/risingwave/issues/12451#issuecomment-2051861048 for more details. + +## Install Dependencies + +Some additional tools are needed to run the `system` commands in tests. + +- `rpk`: Redpanda (Kafka) CLI toolbox. https://docs.redpanda.com/current/get-started/rpk-install/ +- `zx`: A tool for writing better scripts. `npm install -g zx` + +## Run tests To run locally, use `risedev d` to start services (including external systems like Kafka and Postgres, or specify `user-managed` to use your own service). Then use `risedev slt` to run the tests, which will load the environment variables (ports, etc.) -according to the services started by `risedev d`. +according to the services started by `risedev d` . ```sh risedev slt 'e2e_test/source_inline/**/*.slt' ``` +## Write tests + To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment. -Use `system` command to setup instead. -Refer to https://github.com/risingwavelabs/risingwave/issues/12451#issuecomment-2051861048 for more details. +Use `system` command to setup instead. +For simple cases, you can directly write a bash command; +For more complex cases, you can write a test script (with any language like bash, python, zx), and invoke it in the `system` command. From 9ab5045be10e8092536d2c00c1322680c827080f Mon Sep 17 00:00:00 2001 From: xxchan Date: Thu, 18 Apr 2024 13:12:37 +0800 Subject: [PATCH 26/26] refine EnsureStopService Signed-off-by: xxchan --- e2e_test/source_inline/README.md | 2 +- src/risedevtool/src/bin/risedev-dev.rs | 24 +--------- src/risedevtool/src/service_config.rs | 44 +++++++++++++++++++ .../src/task/ensure_stop_service.rs | 13 +++--- 4 files changed, 54 insertions(+), 29 deletions(-) diff --git a/e2e_test/source_inline/README.md b/e2e_test/source_inline/README.md index b827a5d3801c..3a9070639b8c 100644 --- a/e2e_test/source_inline/README.md +++ b/e2e_test/source_inline/README.md @@ -26,5 +26,5 @@ risedev slt 'e2e_test/source_inline/**/*.slt' To write tests, please ensure each file is self-contained and does not depend on running external scripts to setup the environment. Use `system` command to setup instead. -For simple cases, you can directly write a bash command; +For simple cases, you can directly write a bash command; For more complex cases, you can write a test script (with any language like bash, python, zx), and invoke it in the `system` command. diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index 1e4df4b43a80..d5a14cbf8c1b 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -98,28 +98,8 @@ fn task_main( let mut ports = vec![]; for service in services { - let listen_info = match service { - ServiceConfig::Minio(c) => Some((c.port, c.id.clone())), - ServiceConfig::Etcd(c) => Some((c.port, c.id.clone())), - ServiceConfig::Sqlite(_) => None, - ServiceConfig::Prometheus(c) => Some((c.port, c.id.clone())), - ServiceConfig::ComputeNode(c) => Some((c.port, c.id.clone())), - ServiceConfig::MetaNode(c) => Some((c.port, c.id.clone())), - ServiceConfig::Frontend(c) => Some((c.port, c.id.clone())), - ServiceConfig::Compactor(c) => Some((c.port, c.id.clone())), - ServiceConfig::Grafana(c) => Some((c.port, c.id.clone())), - ServiceConfig::Tempo(c) => Some((c.port, c.id.clone())), - ServiceConfig::Kafka(c) => Some((c.port, c.id.clone())), - ServiceConfig::Pubsub(c) => Some((c.port, c.id.clone())), - ServiceConfig::Redis(c) => Some((c.port, c.id.clone())), - ServiceConfig::ZooKeeper(c) => Some((c.port, c.id.clone())), - ServiceConfig::AwsS3(_) => None, - ServiceConfig::Opendal(_) => None, - ServiceConfig::RedPanda(_) => None, - }; - - if let Some(x) = listen_info { - ports.push(x); + if let Some(port) = service.port() { + ports.push((port, service.id().to_string(), service.user_managed())); } } diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs index 83e90147b1b6..e5f149b8d10c 100644 --- a/src/risedevtool/src/service_config.rs +++ b/src/risedevtool/src/service_config.rs @@ -380,6 +380,50 @@ impl ServiceConfig { Self::Opendal(c) => &c.id, } } + + pub fn port(&self) -> Option { + match self { + Self::ComputeNode(c) => Some(c.port), + Self::MetaNode(c) => Some(c.port), + Self::Frontend(c) => Some(c.port), + Self::Compactor(c) => Some(c.port), + Self::Minio(c) => Some(c.port), + Self::Etcd(c) => Some(c.port), + Self::Sqlite(_) => None, + Self::Prometheus(c) => Some(c.port), + Self::Grafana(c) => Some(c.port), + Self::Tempo(c) => Some(c.port), + Self::AwsS3(_) => None, + Self::ZooKeeper(c) => Some(c.port), + Self::Kafka(c) => Some(c.port), + Self::Pubsub(c) => Some(c.port), + Self::Redis(c) => Some(c.port), + Self::RedPanda(_c) => None, + Self::Opendal(_) => None, + } + } + + pub fn user_managed(&self) -> bool { + match self { + Self::ComputeNode(c) => c.user_managed, + Self::MetaNode(c) => c.user_managed, + Self::Frontend(c) => c.user_managed, + Self::Compactor(c) => c.user_managed, + Self::Minio(_c) => false, + Self::Etcd(_c) => false, + Self::Sqlite(_c) => false, + Self::Prometheus(_c) => false, + Self::Grafana(_c) => false, + Self::Tempo(_c) => false, + Self::AwsS3(_c) => false, + Self::ZooKeeper(_c) => false, + Self::Kafka(c) => c.user_managed, + Self::Pubsub(_c) => false, + Self::Redis(_c) => false, + Self::RedPanda(_c) => false, + Self::Opendal(_c) => false, + } + } } mod string { diff --git a/src/risedevtool/src/task/ensure_stop_service.rs b/src/risedevtool/src/task/ensure_stop_service.rs index 62c2aa7de7c3..519804d11688 100644 --- a/src/risedevtool/src/task/ensure_stop_service.rs +++ b/src/risedevtool/src/task/ensure_stop_service.rs @@ -17,11 +17,12 @@ use anyhow::Result; use super::{ExecuteContext, Task}; pub struct EnsureStopService { - ports: Vec<(u16, String)>, + /// `(port, id, user_managed)` + ports: Vec<(u16, String, bool)>, } impl EnsureStopService { - pub fn new(ports: Vec<(u16, String)>) -> Result { + pub fn new(ports: Vec<(u16, String, bool)>) -> Result { Ok(Self { ports }) } } @@ -30,16 +31,16 @@ impl Task for EnsureStopService { fn execute(&mut self, ctx: &mut ExecuteContext) -> anyhow::Result<()> { ctx.service(self); - for (port, service) in &self.ports { - // Do not require stopping kafka services - if service.starts_with("kafka") { + for (port, service_id, user_managed) in &self.ports { + // Do not require stopping user-managed services + if *user_managed { continue; } let address = format!("127.0.0.1:{}", port); ctx.pb.set_message(format!( "waiting for port close - {} (will be used by {})", - address, service + address, service_id )); ctx.wait_tcp_close(&address)?; }