-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE-2736] Adds output batch compression for Data Transforms #18514
[CORE-2736] Adds output batch compression for Data Transforms #18514
Conversation
4adf296
to
58df36c
Compare
/ci-repeat 1 |
/ci-repeat 1 |
new failures in https://buildkite.com/redpanda/redpanda/builds/49248#018f82f5-d2fc-4bd5-b204-a1774afd79bf:
new failures in https://buildkite.com/redpanda/redpanda/builds/49248#018f82f5-d2ff-46d2-8840-d9e063a709e2:
new failures in https://buildkite.com/redpanda/redpanda/builds/50134#019009db-8de9-47d5-a29d-ba3e1ac43ce8:
|
d9f6691
to
f13207f
Compare
src/go/rpk/pkg/cli/transform/meta.go
Outdated
at a cost. So, while it does occur asynchronously with respect to transform | ||
execution, compression may introduce latency on the output topic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at a cost. So, while it does occur asynchronously with respect to transform | |
execution, compression may introduce latency on the output topic. | |
at a cost. So, while it occurs asynchronously with respect to transform | |
execution, compression may introduce latency on the output topic. |
f13207f
to
1e48f71
Compare
force push to sync w/ dev after a couple weeks of inactivity |
/ci-repeat 1 |
src/v/model/model.cc
Outdated
.match("producer", compression::producer); | ||
} catch (std::runtime_error& e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe could avoid the try/catch with
string_switch<std::optional>(c)
...
.default_match(std::nullopt);
if (!c.has_value()) {
i.setstate(...)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair point. force of habit from regular switch, but static analysis is no help here obviously.
@@ -170,6 +173,10 @@ struct transform_metadata_patch { | |||
std::optional<absl::flat_hash_map<ss::sstring, ss::sstring>> env; | |||
// Desired paused state for the transform | |||
std::optional<is_transform_paused> paused; | |||
// Desired compression mode for the transform | |||
std::optional<compression> compression_mode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the difference between compression_mode:none and nullopt here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in this context, whether or not to update the setting on persisted metadata. comments on these fields could be better.
d6b3bb0
to
4c2460b
Compare
force push contents:
|
CI Failure is some pandatriage issue |
@@ -233,12 +240,16 @@ func mergeProjectConfigs(lhs project.Config, rhs project.Config) (out project.Co | |||
if len(rhs.OutputTopics) > 0 { | |||
out.OutputTopics = rhs.OutputTopics | |||
} | |||
if rhs.Compression != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is why the flag should default to empty string 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's what I was harping on up here 🤷
I have to push again to fix a conflict, so I think I'll change it back. But I (clearly) don't have a strong opinion about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I was agreeing with you 😄
boost::lexical_cast<compression>("bogus) should raise bad_lexical cast. Previous behavior: runtime_error("Fell off the end of a string-switch") Failure to match the source string to one of the compression lexical cases should set the fail bit on the istream instead of allowing the runtime_error to escape. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Also adds bool transform_metadata_patch::empty() Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Respecting transform_metadata::compression_mode Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
- tranform_from_json - deploy Signed-off-by: Oren Leiman <oren.leiman@redpanda.com> dt/rpk
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
4c2460b
to
04cd18e
Compare
force push rebase dev to fix merge conflict |
CI Failure:
|
This PR adds the ability to configure output batch compression on WASM transforms at deploy time or via metadata patch request. Includes rpk experience for the former. The latter case (metadata patch) is available via direct admin API invocation only.
TODO:
Closes CORE-2736
Backports Required
Release Notes
Improvements