Skip to content

Commit

Permalink
Merge pull request #19975 from oleiman/xform/core-2841/specify-start-…
Browse files Browse the repository at this point in the history
…offset

[CORE-2841] Transforms: Start consuming from an arbitrary offset (numeric from start/end or timestamp)
  • Loading branch information
aanthony-rp authored Jul 3, 2024
2 parents 146b83d + 5d03998 commit 64090fc
Show file tree
Hide file tree
Showing 17 changed files with 461 additions and 15 deletions.
7 changes: 7 additions & 0 deletions src/go/rpk/pkg/adminapi/api_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type EnvironmentVariable struct {
Value string `json:"value"`
}

// Offset describes an offset relative to some timestamp or the start/end of a topic partition
//
// It is serialized as a JSON object with the keys "format" and "value".
type Offset struct {
// Format names how Value is to be interpreted. rpk's transform deploy
// command sends one of "timestamp", "from_start" or "from_end".
Format string `json:"format"`
// Value is the numeric part of the offset; its meaning depends on Format.
Value int64 `json:"value"`
}

// TransformMetadata is the metadata for a live running transform on a cluster.
type TransformMetadata struct {
Name string `json:"name"`
Expand All @@ -62,6 +68,7 @@ type TransformMetadata struct {
Status []PartitionTransformStatus `json:"status,omitempty"`
Environment []EnvironmentVariable `json:"environment,omitempty"`
CompressionMode string `json:"compression,omitempty"`
FromOffset *Offset `json:"offset,omitempty"`
}

// ListWasmTransforms lists the transforms that are running on a cluster.
Expand Down
48 changes: 48 additions & 0 deletions src/go/rpk/pkg/cli/transform/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -60,6 +61,17 @@ are separated by an equals for example: --var=KEY=VALUE
The --var flag can be repeated to specify multiple variables like so:
rpk transform deploy --var FOO=BAR --var FIZZ=BUZZ
The --from-offset flag can be used to specify where on the input topic the transform
should begin processing. Expressed as:
- @T - Begin reading records with committed timestamp >= T (UNIX time, ms from epoch)
- +N - Begin reading N records from the start of each input partition
- -N - Begin reading N records prior to the end of each input partition
Note that the broker will only respect from-offset on the first deploy for a given
transform. Re-deploying the transform will cause processing to pick up at the last
committed offset. Recall that this state is maintained until the transform is deleted.
`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -108,13 +120,17 @@ The --var flag can be repeated to specify multiple variables like so:
}
out.MaybeDieErr(err)

offset, err := parseOffset(cfg.FromOffset)
out.MaybeDieErr(err)

t := adminapi.TransformMetadata{
InputTopic: cfg.InputTopic,
OutputTopics: cfg.OutputTopics,
Name: cfg.Name,
Status: nil,
Environment: mapToEnvVars(cfg.Env),
CompressionMode: cfg.Compression,
FromOffset: offset,
}
if p.FromCloud && !p.CloudCluster.IsServerless() {
url, err := p.CloudCluster.CheckClusterURL()
Expand Down Expand Up @@ -154,6 +170,7 @@ The --var flag can be repeated to specify multiple variables like so:
cmd.Flags().StringVar(&fc.functionName, "name", "", "The name of the transform")
cmd.Flags().Var(&fc.env, "var", "Specify an environment variable in the form of KEY=VALUE")
cmd.Flags().StringVar(&fc.compression, "compression", "", "Output batch compression type")
cmd.Flags().StringVar(&fc.fromOffset, "from-offset", "", "Process an input topic partition from a relative offset; check help text for more information")
return cmd
}

Expand Down Expand Up @@ -202,6 +219,7 @@ type deployFlagConfig struct {
functionName string
env environment
compression string
fromOffset string
}

// ToProjectConfig creates a project.Config from the specified command line flags.
Expand All @@ -211,6 +229,7 @@ func (fc deployFlagConfig) ToProjectConfig() (out project.Config) {
out.OutputTopics = fc.outputTopics
out.Env = fc.env.vars
out.Compression = fc.compression
out.FromOffset = fc.fromOffset
return out
}

Expand Down Expand Up @@ -244,6 +263,10 @@ func mergeProjectConfigs(lhs project.Config, rhs project.Config) (out project.Co
out.Compression = rhs.Compression
}

if rhs.FromOffset != "" {
out.FromOffset = rhs.FromOffset
}

return out
}

Expand Down Expand Up @@ -310,6 +333,31 @@ func mapToEnvVars(env map[string]string) (vars []adminapi.EnvironmentVariable) {
return
}

// parseOffset converts a string formatted offset to the adminapi offset type.
//
// The first byte of formatted_offset selects the format and the remainder is
// an unsigned base-10 integer:
//
//	"@T" -> Format "timestamp",  Value T
//	"+N" -> Format "from_start", Value N
//	"-N" -> Format "from_end",   Value N
//
// An empty string means that no offset was requested and yields (nil, nil).
// An unknown prefix, a non-numeric remainder, or a remainder that carries its
// own sign (for example "+-5" or "@-1") is reported as an error.
func parseOffset(formatted_offset string) (*adminapi.Offset, error) {
	if formatted_offset == "" {
		return nil, nil
	}
	format := ""
	switch pfx := formatted_offset[0:1]; pfx {
	case "@":
		format = "timestamp"
	case "+":
		format = "from_start"
	case "-":
		format = "from_end"
	default:
		return nil, fmt.Errorf("Bad prefix: expected one of ['@','+','-'], got: %q", pfx)
	}

	digits := formatted_offset[1:]
	// strconv.ParseInt accepts a leading sign of its own. The direction is
	// already encoded in the prefix, so a second sign would let a negative
	// delta or timestamp slip through; reject it explicitly.
	if digits != "" && (digits[0] == '+' || digits[0] == '-') {
		return nil, fmt.Errorf("Bad offset: expected an unsigned integer after the prefix, got: %q", digits)
	}

	val, err := strconv.ParseInt(digits, 10, 64)
	if err != nil {
		// %w keeps the rendered message identical while allowing callers to
		// unwrap the underlying strconv error.
		return nil, fmt.Errorf("Bad offset: parse error '%w'", err)
	}

	return &adminapi.Offset{Format: format, Value: val}, nil
}

func adminApiToDataplaneMetadata(m adminapi.TransformMetadata) *dataplanev1alpha1.DeployTransformRequest {
var envs []*dataplanev1alpha1.TransformMetadata_EnvironmentVariable
for _, e := range m.Environment {
Expand Down
1 change: 1 addition & 0 deletions src/go/rpk/pkg/cli/transform/project/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Config struct {
Language WasmLang `yaml:"language"`
Env map[string]string `yaml:"env,omitempty"`
Compression string `yaml:"compression,omitempty"`
FromOffset string `yaml:"from-offset,omitempty"`
}

var ConfigFileName = "transform.yaml"
Expand Down
2 changes: 2 additions & 0 deletions src/v/features/feature_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ std::string_view to_string_view(feature f) {
return "data_migrations";
case feature::group_tx_fence_dedicated_batch_type:
return "group_tx_fence_dedicated_batch_type";
case feature::transforms_specify_offset:
return "transforms_specify_offset";

/*
* testing features
Expand Down
7 changes: 7 additions & 0 deletions src/v/features/feature_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ enum class feature : std::uint64_t {
unified_tx_state = 1ULL << 47U,
data_migrations = 1ULL << 48U,
group_tx_fence_dedicated_batch_type = 1ULL << 49U,
transforms_specify_offset = 1ULL << 50U,

// Dummy features for testing only
test_alpha = 1ULL << 61U,
Expand Down Expand Up @@ -402,6 +403,12 @@ constexpr static std::array feature_schema{
feature::group_tx_fence_dedicated_batch_type,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster::cluster_version{13},
"transforms_specify_offset",
feature::transforms_specify_offset,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
};

std::string_view to_string_view(feature);
Expand Down
19 changes: 19 additions & 0 deletions src/v/model/fundamental.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
namespace kafka {

using offset = named_type<int64_t, struct kafka_offset_type>;
using offset_delta = named_type<int64_t, struct kafka_offset_delta_type>;

inline offset next_offset(offset p) {
if (p < offset{0}) {
Expand All @@ -53,6 +54,24 @@ inline constexpr offset prev_offset(offset o) {
return o - offset{1};
}

/**
 * Adds a delta to an offset, clamping instead of overflowing.
 *
 * For a non-negative offset only the upper bound can be exceeded, so the sum
 * is clamped to offset::max(). For a negative offset only the lower bound can
 * be exceeded, so the sum is clamped to offset::min().
 */
inline constexpr offset operator+(offset o, offset_delta d) {
    if (o >= offset{0}) {
        // offset::max() - o cannot overflow here because o is non-negative.
        return d() <= offset::max() - o ? offset{o() + d()} : offset::max();
    }
    // offset::min() - o cannot overflow here because o is negative.
    return d() >= offset::min() - o ? offset{o() + d()} : offset::min();
}

/**
 * Subtracts a delta from an offset by adding the negated delta, so the result
 * is clamped the same way as offset + offset_delta.
 *
 * offset_delta::min() is the one delta whose negation is not representable in
 * the underlying signed type; negating it would be signed overflow. Since
 * -min() == max() + 1, that case is computed as (o + max()) + 1 instead.
 */
inline constexpr offset operator-(offset o, offset_delta d) {
    if (d == offset_delta::min()) {
        return (o + offset_delta::max()) + offset_delta{1};
    }
    return o + (-d);
}

} // namespace kafka

namespace cloud_storage_clients {
Expand Down
88 changes: 88 additions & 0 deletions src/v/model/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
#include "model/fundamental.h"
#include "model/record.h"
#include "model/record_batch_types.h"
#include "serde/rw/bool_class.h"
#include "serde/rw/map.h"
#include "serde/rw/rw.h"
#include "serde/rw/uuid.h"
#include "serde/rw/vector.h"
#include "utils/vint.h"

#include <seastar/core/print.hh>
Expand Down Expand Up @@ -74,6 +78,16 @@ void append_vint_to_iobuf(iobuf& b, int64_t v) {

} // namespace

/// Writes a transform_from_start to `os` in the form "{ start + <delta> }".
std::ostream& operator<<(std::ostream& os, const transform_from_start& o) {
    const auto& delta_from_start = o.delta;
    fmt::print(os, "{{ start + {} }}", delta_from_start);
    return os;
}

/// Writes a transform_from_end to `os` in the form "{ end - <delta> }".
std::ostream& operator<<(std::ostream& os, const transform_from_end& o) {
    const auto& delta_from_end = o.delta;
    fmt::print(os, "{{ end - {} }}", delta_from_end);
    return os;
}

std::ostream&
operator<<(std::ostream& os, const transform_offset_options& opts) {
ss::visit(
Expand All @@ -83,10 +97,64 @@ operator<<(std::ostream& os, const transform_offset_options& opts) {
},
[&os](model::timestamp ts) {
fmt::print(os, "{{ timequery: {} }}", ts.value());
},
[&os](model::transform_from_start off) {
fmt::print(os, "{{ offset: {} }}", off);
},
[&os](model::transform_from_end off) {
fmt::print(os, "{{ offset: {} }}", off);
});
return os;
}

/**
 * Legacy version of transform_offset_options
 *
 * When writing out transform metadata, we write this version if and
 * only if the position variant contains one of the legacy alternatives.
 * This allows us to feature-gate the use of the new alternatives
 * (see model/transform.h) without completely blocking transform deploys
 * during an upgrade.
 *
 * The envelope is declared with version 0 and compat version 0, and the
 * position variant is restricted to the two legacy alternatives
 * (latest_offset and timestamp).
 *
 * NOTE(review): the envelope's first template argument names
 * transform_offset_options rather than this struct. Presumably this is
 * deliberate so the written bytes are treated as a transform_offset_options
 * envelope - confirm against serde::envelope.
 */
struct legacy_transform_offset_options
: serde::envelope<
transform_offset_options,
serde::version<0>,
serde::compat_version<0>> {
// Starting position; only the legacy alternatives are representable.
serde::variant<transform_offset_options::latest_offset, model::timestamp>
position;
bool operator==(const legacy_transform_offset_options&) const = default;

// Custom writer: the envelope body consists of the position variant only.
void serde_write(iobuf& out) const { serde::write(out, position); }
};

/// Returns true when `position` currently holds one of the two legacy
/// alternatives (latest_offset or model::timestamp), false otherwise.
bool transform_offset_options::is_legacy_compat() const {
    if (std::holds_alternative<transform_offset_options::latest_offset>(
          position)) {
        return true;
    }
    return std::holds_alternative<model::timestamp>(position);
}

/**
 * Reads `position` from `in`, honouring the envelope version in `h`.
 *
 * Version 0 envelopes carry the two-alternative legacy variant
 * (latest_offset or timestamp), which is read and then assigned into the
 * current variant. Any other version carries the current variant directly.
 * Bytes of the envelope that remain unread afterwards are skipped.
 */
void transform_offset_options::serde_read(
  iobuf_parser& in, const serde::header& h) {
    using serde::read_nested;
    using legacy_position_t = serde::variant<latest_offset, model::timestamp>;

    const auto limit = h._bytes_left_limit;

    if (h._version != 0) {
        position = read_nested<decltype(position)>(in, limit);
    } else {
        auto legacy = read_nested<legacy_position_t>(in, limit);
        ss::visit(legacy, [this](auto alt) { position = alt; });
    }

    // Skip whatever is left of this envelope.
    if (const auto remaining = in.bytes_left(); remaining > limit) {
        in.skip(remaining - limit);
    }
}

// Custom serde writer: the envelope body consists of the position variant
// only, written with all of its current alternatives.
void transform_offset_options::serde_write(iobuf& out) const {
serde::write(out, position);
}

std::ostream& operator<<(std::ostream& os, const transform_metadata& meta) {
fmt::print(
os,
Expand All @@ -102,6 +170,26 @@ std::ostream& operator<<(std::ostream& os, const transform_metadata& meta) {
return os;
}

// Custom serde writer for transform_metadata.
//
// Fields are written in a fixed order: name, input_topic, output_topics,
// environment, uuid, source_ptr, offset options, paused, compression_mode.
//
// The offset options are the only field with special handling: when the
// position holds latest_offset or a timestamp it is wrapped in
// legacy_transform_offset_options before being written; any other
// alternative is written as the full offset_options object.
void transform_metadata::serde_write(iobuf& out) const {
serde::write(out, name);
// NOTE(review): unqualified write(), unlike the serde::write calls around
// it - presumably resolved via ADL; confirm which overload is selected.
write(out, input_topic);
serde::write(out, output_topics);
serde::write(out, environment);
serde::write(out, uuid);
serde::write(out, source_ptr);
ss::visit(
offset_options.position,
// Legacy alternative: emit the legacy envelope.
[&out](transform_offset_options::latest_offset v) {
serde::write(out, legacy_transform_offset_options{.position = v});
},
// Legacy alternative: emit the legacy envelope.
[&out](model::timestamp v) {
serde::write(out, legacy_transform_offset_options{.position = v});
},
// Every other alternative: emit offset_options unchanged.
[this, &out](auto) { serde::write(out, offset_options); });
serde::write(out, paused);
serde::write(out, compression_mode);
}

bool transform_metadata_patch::empty() const noexcept {
return !env.has_value() && !paused.has_value()
&& !compression_mode.has_value();
Expand Down
Loading

0 comments on commit 64090fc

Please sign in to comment.