diff --git a/src/go/rpk/pkg/adminapi/api_transform.go b/src/go/rpk/pkg/adminapi/api_transform.go index cb69f6d271793..5d860e8a111e8 100644 --- a/src/go/rpk/pkg/adminapi/api_transform.go +++ b/src/go/rpk/pkg/adminapi/api_transform.go @@ -69,3 +69,28 @@ func (a *AdminAPI) ListWasmTransforms(ctx context.Context) ([]TransformMetadata, err := a.sendAny(ctx, http.MethodGet, baseTransformEndpoint, nil, &resp) return resp, err } + +type patchMetadataRequest struct { + IsPaused *bool `json:"is_paused,omitempty"` + Environment *[]EnvironmentVariable `json:"env,omitempty"` +} + +// PauseTransform patches transform metadata to set paused = true, with no effect on the transform's env +func (a *AdminAPI) PauseTransform(ctx context.Context, transformName string) error { + paused := true + body := patchMetadataRequest{ + IsPaused: &paused, + Environment: nil, + } + return a.sendAny(ctx, http.MethodPut, baseTransformEndpoint+url.PathEscape(transformName)+"/meta", body, nil) +} + +// ResumeTransform patches transform metadata to set paused = false, with no effect on the transform's env +func (a *AdminAPI) ResumeTransform(ctx context.Context, transformName string) error { + paused := false + body := patchMetadataRequest{ + IsPaused: &paused, + Environment: nil, + } + return a.sendAny(ctx, http.MethodPut, baseTransformEndpoint+url.PathEscape(transformName)+"/meta", body, nil) +} diff --git a/src/go/rpk/pkg/cli/transform/meta.go b/src/go/rpk/pkg/cli/transform/meta.go new file mode 100644 index 0000000000000..c42951368d9d2 --- /dev/null +++ b/src/go/rpk/pkg/cli/transform/meta.go @@ -0,0 +1,83 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package transform + +import ( + "fmt" + + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" + "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" + "github.com/spf13/afero" + "github.com/spf13/cobra" +) + +func newPauseCommand(fs afero.Fs, p *config.Params) *cobra.Command { + cmd := &cobra.Command{ + Use: "pause [NAME]", + Short: "Pause a data transform", + Long: `Pause a data transform. + +This command suspends execution of the specified transform without removing +it from the system. In this way, a transform may resume at a later time, with +each new processor picking up processing from the last committed offset on the +corresponding input partition. + +Subsequent 'rpk transform list' operations will show transform processors as +"inactive". + +To resume a paused transform, use 'rpk transform resume'. +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + p, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitServerlessAdmin(p) + api, err := adminapi.NewClient(fs, p) + out.MaybeDie(err, "unable to initialize admin api client: %v", err) + functionName := args[0] + err = api.PauseTransform(cmd.Context(), functionName) + out.MaybeDie(err, "unable to pause transform %q: %v", functionName, err) + fmt.Println("Transform paused!") + }, + } + return cmd +} + +func newResumeCommand(fs afero.Fs, p *config.Params) *cobra.Command { + cmd := &cobra.Command{ + Use: "resume [NAME]", + Short: "Resume a data transform", + Long: `Resume a data transform. + +This command resumes execution of the specified data transform, if it was +previously paused. Transform processors are restarted and resume processing +from the last committed offset on the corresponding input partition. + +Subsequent 'rpk transform list' operations will show transform processors as +"running". +`, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + p, err := p.LoadVirtualProfile(fs) + out.MaybeDie(err, "rpk unable to load config: %v", err) + config.CheckExitServerlessAdmin(p) + api, err := adminapi.NewClient(fs, p) + out.MaybeDie(err, "unable to initialize admin api client: %v", err) + functionName := args[0] + err = api.ResumeTransform(cmd.Context(), functionName) + out.MaybeDie(err, "unable to resume transform %q: %v", functionName, err) + fmt.Println("Transform resumed!") + }, + } + return cmd +} diff --git a/src/go/rpk/pkg/cli/transform/transform.go b/src/go/rpk/pkg/cli/transform/transform.go index dbd90608e9e5c..f2691414b2df2 100644 --- a/src/go/rpk/pkg/cli/transform/transform.go +++ b/src/go/rpk/pkg/cli/transform/transform.go @@ -29,6 +29,8 @@ func NewCommand(fs afero.Fs, p *config.Params, execFn func(string, []string) err newInitializeCommand(fs), newBuildCommand(fs, execFn), newLogsCommand(fs, p), + newPauseCommand(fs, p), + newResumeCommand(fs, p), ) return cmd } diff --git a/src/v/cluster/plugin_frontend.cc b/src/v/cluster/plugin_frontend.cc index c57e92f331347..b110656fb1527 100644 --- a/src/v/cluster/plugin_frontend.cc +++ b/src/v/cluster/plugin_frontend.cc @@ -18,6 +18,7 @@ #include "model/fundamental.h" #include "model/metadata.h" #include "model/namespace.h" +#include "model/transform.h" #include "strings/utf8.h" #include @@ -632,6 +633,10 @@ std::optional plugin_frontend::lookup_transform(transform_id id) const { return _table->find_by_id(id); } +std::optional +plugin_frontend::lookup_transform(const transform_name& name) const { + return _table->find_by_name(name); +} absl::btree_map plugin_frontend::lookup_transforms_by_input_topic( model::topic_namespace_view tp_ns) const { diff --git a/src/v/cluster/plugin_frontend.h b/src/v/cluster/plugin_frontend.h index 838089c9bb67e..8a0071aeeb85a 100644 --- a/src/v/cluster/plugin_frontend.h +++ b/src/v/cluster/plugin_frontend.h @@ -84,6 +84,10 @@ class plugin_frontend : public ss::peering_sharded_service { std::optional lookup_transform(model::transform_id) const; + // Lookup a transform by name. + std::optional + lookup_transform(const model::transform_name&) const; + // Lookup transforms for input topics. absl::btree_map lookup_transforms_by_input_topic(model::topic_namespace_view) const; diff --git a/src/v/model/transform.cc b/src/v/model/transform.cc index efcc8965a3edd..6da590523e394 100644 --- a/src/v/model/transform.cc +++ b/src/v/model/transform.cc @@ -91,13 +91,14 @@ std::ostream& operator<<(std::ostream& os, const transform_metadata& meta) { fmt::print( os, "{{name: \"{}\", input: {}, outputs: {}, " - "env: , uuid: {}, source_ptr: {} }}", + "env: , uuid: {}, source_ptr: {}, is_paused: {} }}", meta.name, meta.input_topic, meta.output_topics, // skip env becuase of pii meta.uuid, - meta.source_ptr); + meta.source_ptr, + meta.paused); return os; } diff --git a/src/v/model/transform.h b/src/v/model/transform.h index dfe02de94c6de..70f381076d7c6 100644 --- a/src/v/model/transform.h +++ b/src/v/model/transform.h @@ -70,6 +70,12 @@ using transform_name = named_type; using transform_name_view = named_type; +/** + * Whether a transform is or should be paused (i.e. stopped but not removed from + * the system). + */ +using is_transform_paused = ss::bool_class; + /** * The options related to the offset at which transforms are at. * @@ -112,7 +118,7 @@ struct transform_offset_options struct transform_metadata : serde::envelope< transform_metadata, - serde::version<1>, + serde::version<2>, serde::compat_version<0>> { // The user specified name of the transform. transform_name name; @@ -133,6 +139,8 @@ struct transform_metadata // The options related to the offset that the transform processor. transform_offset_options offset_options; + model::is_transform_paused paused{false}; + friend bool operator==(const transform_metadata&, const transform_metadata&) = default; @@ -146,10 +154,24 @@ struct transform_metadata environment, uuid, source_ptr, - offset_options); + offset_options, + paused); } }; +// A patch update for transform metadata. +// +// This is used by the Admin API handler for `PUT /v1/transform/{name}/meta` +// See `redpanda/admin/transform.cc` or `redpanda/admin/api-doc/transform.json` +// for detail. +struct transform_metadata_patch { + // This has PUT semantics, such that the existing env values will be + // completely overwritten by the contents of this map. + std::optional> env; + // Desired paused state for the transform + std::optional paused; +}; + using output_topic_index = named_type; // key / value types used to track consumption offsets by transforms. diff --git a/src/v/redpanda/admin/api-doc/transform.json b/src/v/redpanda/admin/api-doc/transform.json index 4ce117ca701da..4954432e24264 100644 --- a/src/v/redpanda/admin/api-doc/transform.json +++ b/src/v/redpanda/admin/api-doc/transform.json @@ -45,6 +45,37 @@ } ] }, + { + "path": "/v1/transform/{name}/meta", + "operations": [ + { + "method": "PUT", + "summary": "Patch transform metadata.", + "type": "void", + "nickname": "patch_transform_metadata", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "name", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "metadata", + "in": "body", + "required": true, + "type": "transform_metadata_patch" + } + ] + } + ] + }, { "path": "/v1/transform", "operations": [ @@ -205,6 +236,25 @@ "description": "The offset within the input topic that has been committed" } } + }, + "transform_metadata_patch": { + "id": "transform_metadata_patch", + "description": "A partial update to transform metadata. See individual field descriptions for semantics.", + "properties": { + "env": { + "required": false, + "description": "If present, overrides the transform's existing environment map, wholesale. If not present, the existing map is unchanged.", + "type": "array", + "items": { + "type": "environment_variable" + } + }, + "is_paused": { + "required": false, + "description": "If present, sets the transform's 'paused' state. If not present, paused state is unchanged.", + "type": "boolean" + } + } } } } diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h index c49f6956a2c13..a5ffb9bcbd5b0 100644 --- a/src/v/redpanda/admin/server.h +++ b/src/v/redpanda/admin/server.h @@ -636,6 +636,8 @@ class admin_server { list_committed_offsets(std::unique_ptr); ss::future garbage_collect_committed_offsets(std::unique_ptr); + ss::future + patch_transform_metadata(std::unique_ptr); ss::future<> throw_on_error( ss::http::request& req, diff --git a/src/v/redpanda/admin/transform.cc b/src/v/redpanda/admin/transform.cc index 582b4ceba411d..7df8bf7498528 100644 --- a/src/v/redpanda/admin/transform.cc +++ b/src/v/redpanda/admin/transform.cc @@ -21,6 +21,8 @@ #include "redpanda/admin/util.h" #include "transform/api.h" +#include + #include namespace { @@ -50,6 +52,9 @@ void admin_server::register_wasm_transform_routes() { [this](auto req) { return garbage_collect_committed_offsets(std::move(req)); }); + register_route( + ss::httpd::transform_json::patch_transform_metadata, + [this](auto req) { return patch_transform_metadata(std::move(req)); }); } ss::future @@ -269,3 +274,101 @@ admin_server::garbage_collect_committed_offsets( co_await throw_on_error(*req, ec, model::controller_ntp); co_return ss::json::json_void(); } + +void validate_transform_patch_document(const json::Document& doc) { + const std::string schema = R"( +{ + "type": "object", + "properties": { + "env": { + "type": "array", + "items": { + "type": "object", + "properties": { + "key": { + "type": "string" + }, + "value": { + "type": "string" + } + }, + "required": [ + "key", + "value" + ], + "additionalProperties": false + } + }, + "is_paused": { + "type": "boolean" + } + }, + "required": [], + "additionalProperties": false +} +)"; + + auto validator = json::validator(schema); + try { + json::validate(validator, doc); + } catch (json::json_validation_error& err) { + throw ss::httpd::bad_request_exception( + fmt::format("invalid JSON request body: {}", err.what())); + } +} + +model::transform_metadata_patch +parse_json_metadata_patch(const json::Document& doc) { + validate_transform_patch_document(doc); + + model::transform_metadata_patch result; + + if (doc.HasMember("env")) { + result.env.emplace(); + absl::c_transform( + doc["env"].GetArray(), + std::inserter(result.env.value(), result.env.value().end()), + [](const auto& p) { + return std::make_pair( + p["key"].GetString(), p["value"].GetString()); + }); + } + + if (doc.HasMember("is_paused")) { + result.paused.emplace(doc["is_paused"].GetBool()); + } + + return result; +} + +ss::future +admin_server::patch_transform_metadata(std::unique_ptr req) { + if (!_transform_service->local_is_initialized()) { + throw transforms_not_enabled(); + } + + ss::sstring raw_name = req->get_path_param("name"); + if (raw_name.empty()) { + throw seastar::httpd::bad_request_exception("invalid transform name"); + } + model::transform_name name{std::move(raw_name)}; + + auto doc = co_await parse_json_body(req.get()); + if (!doc.IsObject()) { + vlog(adminlog.debug, "Request body is not a JSON object"); + throw seastar::httpd::bad_request_exception( + "Request body is not a JSON object"); + } + + auto patch = parse_json_metadata_patch(doc); + if (!patch.env.has_value() && !patch.paused.has_value()) { + vlog(adminlog.debug, "Empty metadata patch ...ignoring"); + co_return ss::json::json_void(); + } + + std::error_code ec + = co_await _transform_service->local().patch_transform_metadata( + std::move(name), std::move(patch)); + co_await throw_on_error(*req, ec, model::controller_ntp); + co_return ss::json::json_void(); +} diff --git a/src/v/transform/api.cc b/src/v/transform/api.cc index b0898ac0b734b..3f3ef5308b78e 100644 --- a/src/v/transform/api.cc +++ b/src/v/transform/api.cc @@ -816,7 +816,28 @@ service::compute_node_local_report() { model::cluster_transform_report service::compute_default_report() { using state = model::transform_report::processor::state; model::cluster_transform_report report; - // Mark all transforms in an unknown state if they don't get an update + + // Mark all partitions for each transform in an unknown or inactive state if + // they don't get an update. + // + // This pattern arises from the way we model state in transform_manager. + // Namely, when the plugin_table reports an update to some transform, the + // manager responds by removing ALL processors associated with that + // transform ID from the processor_table, irrespective of the exact details + // of the update, susequently restarting those processors in all but two + // cases: + // a. The ID no longer exists in the plugin table (i.e. the transform + // has been removed from the system) + // b. The transform is "paused" + // If either (a) or (b) is true (i.e. processors were NOT restarted), the + // report entries below will not get an update, and the processor entries + // in the final report carry the "default" status noted below. + // + // Therefore, since, by design, the processor_table does not contain any + // entries for a paused transform at rest and this is only reflected in + // the _absence_ of processor entries in the cluster-wide report, we must + // account for the "paused"ness of each transform in the default report, + // below. for (auto [id, transform] : _plugin_frontend->local().all_transforms()) { auto cfg = _topic_table->local().get_topic_cfg(transform.input_topic); if (!cfg) { @@ -828,7 +849,7 @@ model::cluster_transform_report service::compute_default_report() { transform, { .id = model::partition_id(i), - .status = state::unknown, + .status = transform.paused ? state::inactive : state::unknown, .node = _self, .lag = 0, }); @@ -890,4 +911,29 @@ ss::future service::garbage_collect_committed_offsets() { std::move(ids)); } +ss::future service::patch_transform_metadata( + model::transform_name name, model::transform_metadata_patch patch) { + if (!_feature_table->local().is_active( + features::feature::wasm_transforms)) { + co_return cluster::make_error_code(cluster::errc::feature_disabled); + } + auto _ = _gate.hold(); + auto transform = _plugin_frontend->local().lookup_transform(name); + if (!transform.has_value()) { + co_return cluster::make_error_code( + cluster::errc::transform_does_not_exist); + } + + transform->paused = patch.paused.value_or(transform->paused); + if (patch.env.has_value()) { + std::exchange(transform->environment, std::move(patch.env).value()); + } + cluster::errc ec = co_await _plugin_frontend->local().upsert_transform( + transform.value(), model::timeout_clock::now() + metadata_timeout); + + vlog(tlog.info, "patching transform metadata {}", transform.value()); + + co_return cluster::make_error_code(ec); +} + } // namespace transform diff --git a/src/v/transform/include/transform/api.h b/src/v/transform/include/transform/api.h index 0788166214c0f..c714c295bd7a3 100644 --- a/src/v/transform/include/transform/api.h +++ b/src/v/transform/include/transform/api.h @@ -111,6 +111,12 @@ class service : public ss::peering_sharded_service { */ ss::future garbage_collect_committed_offsets(); + /** + * Patch the metadata for the given transform. + */ + ss::future patch_transform_metadata( + model::transform_name, model::transform_metadata_patch data); + /** * Create a reporter of the transform subsystem. */ diff --git a/src/v/transform/tests/transform_manager_test.cc b/src/v/transform/tests/transform_manager_test.cc index 6199dd8cae3c8..75008b03035de 100644 --- a/src/v/transform/tests/transform_manager_test.cc +++ b/src/v/transform/tests/transform_manager_test.cc @@ -331,6 +331,13 @@ class TransformManagerTest : public ::testing::Test { auto id = _registry->delete_transform(meta.name); _manager->on_plugin_change(id); } + void + pause_transform(std::string_view name, model::is_transform_paused pause) { + auto meta = parse_transform(name); + meta.paused = pause; + auto id = _registry->put_transform(meta); + _manager->on_plugin_change(id); + } void report_error(std::string_view str) { auto [ntp, meta] = transform_and_partition(str); auto entry = _registry->lookup_by_name(meta.name); @@ -467,6 +474,19 @@ TEST_F(TransformManagerTest, FullLifecycle) { EXPECT_THAT(status(), status_is("foo->bar/1", lifecycle_status::destroyed)); } +TEST_F(TransformManagerTest, PauseUnpause) { + become_leader("foo/1"); + deploy_transform("foo->bar"); + drain_queue(); + EXPECT_THAT(status(), status_is("foo->bar/1", lifecycle_status::active)); + pause_transform("foo->bar", model::is_transform_paused::yes); + drain_queue(); + EXPECT_THAT(status(), status_is("foo->bar/1", lifecycle_status::destroyed)); + pause_transform("foo->bar", model::is_transform_paused::no); + drain_queue(); + EXPECT_THAT(status(), status_is("foo->bar/1", lifecycle_status::active)); +} + TEST_F(TransformManagerTest, DeleteTransform) { become_leader("foo/1"); deploy_transform("foo->bar"); diff --git a/src/v/transform/transform_manager.cc b/src/v/transform/transform_manager.cc index e689c740d9096..fecae8d17b6a3 100644 --- a/src/v/transform/transform_manager.cc +++ b/src/v/transform/transform_manager.cc @@ -340,11 +340,16 @@ ss::future<> manager::handle_plugin_change(model::transform_id id) { co_await _processors->erase_by_id(id); auto transform = _registry->lookup_by_id(id); - // If there is no transform we're good to go, everything is shutdown if - // needed. - if (!transform) { + // If there is no transform OR the transform is paused, we're good to go, + // everything is shutdown if needed. + // + // Note that this has implications for how we model processor state in the + // cluster-wide transform report. + // see `transform::service::compute_default_report` for detail. + if (!transform || transform->paused) { co_return; } + // Otherwise, start a processor for every partition we're a leader of. auto partitions = _registry->get_leader_partitions(transform->input_topic); for (model::partition_id partition : partitions) { diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index b21575343e962..9d8661db681ee 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -1738,14 +1738,17 @@ def transform_from_json(loaded): input_topic=loaded["input_topic"], output_topics=loaded["output_topics"], status=[status_from_json(s) for s in loaded["status"]], - environment={ - obj["key"]: obj["value"] - for obj in loaded["environment"] - }, + environment=loaded["environment"], ) return [transform_from_json(o) for o in loaded] + def pause_wasm(self, name): + self._run_wasm(["pause", name]) + + def resume_wasm(self, name): + self._run_wasm(["resume", name]) + def describe_txn_producers(self, topics, partitions, all=False): cmd = [ "describe-producers", "--topics", ",".join(topics), "--partitions", diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index d1d0f6b52f3c1..7b3e87cb558b0 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -1540,3 +1540,15 @@ def transforms_gc_committed_offsets(self, node: Optional[ClusterNode] = None): path = "transform/debug/committed_offsets/garbage_collect" return self._request("POST", path, node=node) + + def transforms_patch_meta(self, + name: str, + pause: bool | None = None, + env: dict[str, str] | None = None): + path = f"transform/{name}/meta" + body = {} + if pause is not None: + body["is_paused"] = pause + if env is not None: + body["env"] = [dict(key=k, value=env[k]) for k in env] + return self._request("PUT", path, json=body) diff --git a/tests/rptest/tests/data_transforms_test.py b/tests/rptest/tests/data_transforms_test.py index 0b4eee308987e..838eafca8ae37 100644 --- a/tests/rptest/tests/data_transforms_test.py +++ b/tests/rptest/tests/data_transforms_test.py @@ -23,6 +23,7 @@ from rptest.services.cluster import cluster from rptest.services.redpanda import MetricSamples, MetricsEndpoint from ducktape.utils.util import wait_until +from ducktape.errors import TimeoutError from rptest.services.transform_verifier_service import TransformVerifierProduceConfig, TransformVerifierProduceStatus, TransformVerifierService, TransformVerifierConsumeConfig, TransformVerifierConsumeStatus from rptest.services.admin import Admin, CommittedWasmOffset @@ -317,6 +318,86 @@ def all_offsets_removed(): retry_on_exc=True, ) + @cluster(num_nodes=3) + @matrix(use_rpk=[False, True]) + def test_patch(self, use_rpk): + input_topic = self.topics[0] + output_topic = self.topics[1] + self._deploy_wasm(name="identity-xform", + input_topic=input_topic, + output_topic=output_topic, + wait_running=True) + + def all_partitions_status(stat: str): + report = self._rpk.list_wasm() + return all(s.status == stat for s in report[0].status) + + def env_is(env: dict[str, str]): + report = self._rpk.list_wasm() + return report[0].environment == env + + if use_rpk: + self._rpk.pause_wasm("identity-xform") + else: + rsp = self._admin.transforms_patch_meta("identity-xform", + pause=True) + assert rsp.status_code == 200, f"/meta request failed, status: {rsp.status_code}" + + wait_until(lambda: all_partitions_status("inactive"), + timeout_sec=30, + backoff_sec=1, + err_msg=f"some partitions didn't become inactive", + retry_on_exc=True) + + with expect_exception(TimeoutError, lambda _: True): + wait_until(lambda: all_partitions_status("running"), + timeout_sec=15, + backoff_sec=1, + retry_on_exc=True) + + if use_rpk: + self._rpk.resume_wasm("identity-xform") + wait_until(lambda: all_partitions_status("running"), + timeout_sec=30, + backoff_sec=1, + err_msg=f"some partitions didn't become active", + retry_on_exc=True) + else: # NOTE: patching ENV is not yet implemented in rpk + env1 = { + "FOO": "bar", + "BAZ": "quux", + } + env2 = {"FOO": "bells"} + + rsp = self._admin.transforms_patch_meta("identity-xform", + pause=False, + env=env1) + assert rsp.status_code == 200, f"/meta request failed, status: {rsp.status_code}" + + wait_until( + lambda: all_partitions_status("running") and env_is(env1), + timeout_sec=30, + backoff_sec=1, + err_msg=f"some partitions didn't come back", + retry_on_exc=True) + + rsp = self._admin.transforms_patch_meta("identity-xform", env=env2) + + wait_until( + lambda: all_partitions_status("running") and env_is(env2), + timeout_sec=30, + backoff_sec=1, + err_msg="some partitions didn't take the env update", + retry_on_exc=True) + + rsp = self._admin.transforms_patch_meta("identity-xform", env={}) + + wait_until(lambda: all_partitions_status("running") and env_is({}), + timeout_sec=30, + backoff_sec=1, + err_msg="some partitions did not clear their envs", + retry_on_exc=True) + class DataTransformsChainingTest(BaseDataTransformsTest): """