Skip to content

Commit

Permalink
Merge pull request #18236 from oleiman/xform/core-1653/pause-transform
Browse files Browse the repository at this point in the history
PUT /v1/transform/{name}/meta
  • Loading branch information
oleiman authored May 15, 2024
2 parents bd81a79 + e9fd450 commit d0eedf7
Show file tree
Hide file tree
Showing 17 changed files with 483 additions and 13 deletions.
25 changes: 25 additions & 0 deletions src/go/rpk/pkg/adminapi/api_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,28 @@ func (a *AdminAPI) ListWasmTransforms(ctx context.Context) ([]TransformMetadata,
err := a.sendAny(ctx, http.MethodGet, baseTransformEndpoint, nil, &resp)
return resp, err
}

type patchMetadataRequest struct {
IsPaused *bool `json:"is_paused,omitempty"`
Environment *[]EnvironmentVariable `json:"env,omitempty"`
}

// PauseTransform patches transform metadata to set paused = true, with no effect on the transform's env
func (a *AdminAPI) PauseTransform(ctx context.Context, transformName string) error {
paused := true
body := patchMetadataRequest{
IsPaused: &paused,
Environment: nil,
}
return a.sendAny(ctx, http.MethodPut, baseTransformEndpoint+url.PathEscape(transformName)+"/meta", body, nil)
}

// ResumeTransform patches transform metadata to set paused = false, with no effect on the transform's env
func (a *AdminAPI) ResumeTransform(ctx context.Context, transformName string) error {
paused := false
body := patchMetadataRequest{
IsPaused: &paused,
Environment: nil,
}
return a.sendAny(ctx, http.MethodPut, baseTransformEndpoint+url.PathEscape(transformName)+"/meta", body, nil)
}
83 changes: 83 additions & 0 deletions src/go/rpk/pkg/cli/transform/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package transform

import (
"fmt"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newPauseCommand(fs afero.Fs, p *config.Params) *cobra.Command {
cmd := &cobra.Command{
Use: "pause [NAME]",
Short: "Pause a data transform",
Long: `Pause a data transform.
This command suspends execution of the specified transform without removing
it from the system. In this way, a transform may resume at a later time, with
each new processor picking up processing from the last committed offset on the
corresponding input partition.
Subsequent 'rpk transform list' operations will show transform processors as
"inactive".
To resume a paused transform, use 'rpk transform resume'.
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitServerlessAdmin(p)
api, err := adminapi.NewClient(fs, p)
out.MaybeDie(err, "unable to initialize admin api client: %v", err)
functionName := args[0]
err = api.PauseTransform(cmd.Context(), functionName)
out.MaybeDie(err, "unable to pause transform %q: %v", functionName, err)
fmt.Println("Transform paused!")
},
}
return cmd
}

func newResumeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
cmd := &cobra.Command{
Use: "resume [NAME]",
Short: "Resume a data transform",
Long: `Resume a data transform.
This command resumes execution of the specified data transform, if it was
previously paused. Transform processors are restarted and resume processing
from the last committed offset on the corresponding input partition.
Subsequent 'rpk transform list' operations will show transform processors as
"running".
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitServerlessAdmin(p)
api, err := adminapi.NewClient(fs, p)
out.MaybeDie(err, "unable to initialize admin api client: %v", err)
functionName := args[0]
err = api.ResumeTransform(cmd.Context(), functionName)
out.MaybeDie(err, "unable to resume transform %q: %v", functionName, err)
fmt.Println("Transform resumed!")
},
}
return cmd
}
2 changes: 2 additions & 0 deletions src/go/rpk/pkg/cli/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func NewCommand(fs afero.Fs, p *config.Params, execFn func(string, []string) err
newInitializeCommand(fs),
newBuildCommand(fs, execFn),
newLogsCommand(fs, p),
newPauseCommand(fs, p),
newResumeCommand(fs, p),
)
return cmd
}
5 changes: 5 additions & 0 deletions src/v/cluster/plugin_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "model/transform.h"
#include "strings/utf8.h"

#include <absl/container/flat_hash_set.h>
Expand Down Expand Up @@ -632,6 +633,10 @@ std::optional<transform_metadata>
plugin_frontend::lookup_transform(transform_id id) const {
return _table->find_by_id(id);
}
std::optional<transform_metadata>
plugin_frontend::lookup_transform(const transform_name& name) const {
return _table->find_by_name(name);
}
absl::btree_map<transform_id, transform_metadata>
plugin_frontend::lookup_transforms_by_input_topic(
model::topic_namespace_view tp_ns) const {
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/plugin_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class plugin_frontend : public ss::peering_sharded_service<plugin_frontend> {
std::optional<model::transform_metadata>
lookup_transform(model::transform_id) const;

// Lookup a transform by name.
std::optional<model::transform_metadata>
lookup_transform(const model::transform_name&) const;

// Lookup transforms for input topics.
absl::btree_map<model::transform_id, model::transform_metadata>
lookup_transforms_by_input_topic(model::topic_namespace_view) const;
Expand Down
5 changes: 3 additions & 2 deletions src/v/model/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,14 @@ std::ostream& operator<<(std::ostream& os, const transform_metadata& meta) {
fmt::print(
os,
"{{name: \"{}\", input: {}, outputs: {}, "
"env: <redacted>, uuid: {}, source_ptr: {} }}",
"env: <redacted>, uuid: {}, source_ptr: {}, is_paused: {} }}",
meta.name,
meta.input_topic,
meta.output_topics,
// skip env becuase of pii
meta.uuid,
meta.source_ptr);
meta.source_ptr,
meta.paused);
return os;
}

Expand Down
26 changes: 24 additions & 2 deletions src/v/model/transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ using transform_name = named_type<ss::sstring, struct transform_name_tag>;
using transform_name_view
= named_type<std::string_view, struct transform_name_view_tag>;

/**
* Whether a transform is or should be paused (i.e. stopped but not removed from
* the system).
*/
using is_transform_paused = ss::bool_class<struct is_paused_tag>;

/**
* The options related to the offset at which transforms are at.
*
Expand Down Expand Up @@ -112,7 +118,7 @@ struct transform_offset_options
struct transform_metadata
: serde::envelope<
transform_metadata,
serde::version<1>,
serde::version<2>,
serde::compat_version<0>> {
// The user specified name of the transform.
transform_name name;
Expand All @@ -133,6 +139,8 @@ struct transform_metadata
// The options related to the offset that the transform processor.
transform_offset_options offset_options;

model::is_transform_paused paused{false};

friend bool operator==(const transform_metadata&, const transform_metadata&)
= default;

Expand All @@ -146,10 +154,24 @@ struct transform_metadata
environment,
uuid,
source_ptr,
offset_options);
offset_options,
paused);
}
};

// A patch update for transform metadata.
//
// This is used by the Admin API handler for `PUT /v1/transform/{name}/meta`
// See `redpanda/admin/transform.cc` or `redpanda/admin/api-doc/transform.json`
// for detail.
struct transform_metadata_patch {
// This has PUT semantics, such that the existing env values will be
// completely overwritten by the contents of this map.
std::optional<absl::flat_hash_map<ss::sstring, ss::sstring>> env;
// Desired paused state for the transform
std::optional<is_transform_paused> paused;
};

using output_topic_index = named_type<uint32_t, struct output_topic_index_tag>;

// key / value types used to track consumption offsets by transforms.
Expand Down
50 changes: 50 additions & 0 deletions src/v/redpanda/admin/api-doc/transform.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,37 @@
}
]
},
{
"path": "/v1/transform/{name}/meta",
"operations": [
{
"method": "PUT",
"summary": "Patch transform metadata.",
"type": "void",
"nickname": "patch_transform_metadata",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"parameters": [
{
"name": "name",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "metadata",
"in": "body",
"required": true,
"type": "transform_metadata_patch"
}
]
}
]
},
{
"path": "/v1/transform",
"operations": [
Expand Down Expand Up @@ -205,6 +236,25 @@
"description": "The offset within the input topic that has been committed"
}
}
},
"transform_metadata_patch": {
"id": "transform_metadata_patch",
"description": "A partial update to transform metadata. See individual field descriptions for semantics.",
"properties": {
"env": {
"required": false,
"description": "If present, overrides the transform's existing environment map, wholesale. If not present, the existing map is unchanged.",
"type": "array",
"items": {
"type": "environment_variable"
}
},
"is_paused": {
"required": false,
"description": "If present, sets the transform's 'paused' state. If not present, paused state is unchanged.",
"type": "boolean"
}
}
}
}
}
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ class admin_server {
list_committed_offsets(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
garbage_collect_committed_offsets(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
patch_transform_metadata(std::unique_ptr<ss::http::request>);

ss::future<> throw_on_error(
ss::http::request& req,
Expand Down
Loading

0 comments on commit d0eedf7

Please sign in to comment.