Skip to content

Commit

Permalink
Merge pull request redpanda-data#15967 from rockwotj/more-wasm-config
Browse files Browse the repository at this point in the history
wasm: add runtime and max binary configs
  • Loading branch information
piyushredpanda authored Jan 8, 2024
2 parents 049e2f2 + 162effc commit 26c7b9b
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 21 deletions.
18 changes: 18 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,24 @@ configuration::configuration()
2_MiB,
// WebAssembly uses 64KiB pages and has a 32bit address space
{.min = 64_KiB, .max = 4_GiB})
, data_transforms_runtime_limit_ms(
*this,
"data_transforms_runtime_limit_ms",
"The maximum amount of runtime for startup time of a data transform, and "
"the time it takes for a single record to be transformed.",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
3s)
, data_transforms_binary_max_size(
*this,
"data_transforms_binary_max_size",
"The maximum size for a deployable WebAssembly binary that the broker "
"can store.",
{
.needs_restart = needs_restart::no,
.visibility = visibility::tunable,
},
10_MiB,
{.min = 1_MiB, .max = 128_MiB})
, topic_memory_per_partition(
*this,
"topic_memory_per_partition",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ struct configuration final : public config_store {
property<std::chrono::milliseconds> data_transforms_commit_interval_ms;
bounded_property<size_t> data_transforms_per_core_memory_reservation;
bounded_property<size_t> data_transforms_per_function_memory_limit;
property<std::chrono::milliseconds> data_transforms_runtime_limit_ms;
bounded_property<size_t> data_transforms_binary_max_size;

// Controller
bounded_property<std::optional<std::size_t>> topic_memory_per_partition;
Expand Down
11 changes: 10 additions & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,11 @@ void application::wire_up_runtime_services(
&controller->get_members_table());
}),
&_connection_cache,
&_transform_rpc_service)
&_transform_rpc_service,
ss::sharded_parameter([] {
return config::shard_local_cfg()
.data_transforms_binary_max_size.bind();
}))
.get();

construct_service(
Expand Down Expand Up @@ -2418,8 +2422,13 @@ void application::wire_up_and_start(::stop_signal& app_signal, bool test_mode) {
.stack_memory = {
.debug_host_stack_usage = false,
},
.cpu = {
.per_invocation_timeout = cluster.data_transforms_runtime_limit_ms.value(),
},
};
_wasm_runtime->start(config).get();
_transform_rpc_client.invoke_on_all(&transform::rpc::client::start)
.get();
_transform_service.invoke_on_all(&transform::service::start).get();
}

Expand Down
68 changes: 62 additions & 6 deletions src/v/transform/rpc/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "cluster/errc.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/scheduling/constraints.h"
#include "cluster/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
Expand Down Expand Up @@ -189,14 +190,16 @@ client::client(
std::unique_ptr<topic_creator> t,
std::unique_ptr<cluster_members_cache> m,
ss::sharded<::rpc::connection_cache>* c,
ss::sharded<local_service>* s)
ss::sharded<local_service>* s,
config::binding<size_t> b)
: _self(self)
, _cluster_members(std::move(m))
, _leaders(std::move(l))
, _topic_metadata(std::move(md_cache))
, _topic_creator(std::move(t))
, _connections(c)
, _local_service(s) {}
, _local_service(s)
, _max_wasm_binary_size(std::move(b)) {}

ss::future<cluster::errc> client::produce(
model::topic_partition tp, ss::chunked_fifo<model::record_batch> batches) {
Expand Down Expand Up @@ -235,9 +238,20 @@ ss::future<cluster::errc> client::do_produce_once(produce_request req) {
co_return reply.results.front().err;
}

ss::future<> client::start() {
if (ss::this_shard_id() != 0) {
co_return;
}
ssx::spawn_with_gate(_gate, [this] { return update_wasm_binary_size(); });
_max_wasm_binary_size.watch([this] {
ssx::spawn_with_gate(
_gate, [this] { return update_wasm_binary_size(); });
});
}

ss::future<> client::stop() {
_as.request_abort();
return ss::now();
co_await _gate.close();
}

ss::future<produce_reply> client::do_local_produce(produce_request req) {
Expand Down Expand Up @@ -457,9 +471,7 @@ ss::future<result<iobuf, cluster::errc>> client::do_remote_load_wasm_binary(

ss::future<bool> client::try_create_wasm_binary_ntp() {
cluster::topic_properties topic_props;
// TODO: This should be configurable
constexpr size_t wasm_binaries_max_bytes = 10_MiB;
topic_props.batch_max_bytes = wasm_binaries_max_bytes;
topic_props.batch_max_bytes = _max_wasm_binary_size();
// Mark all these as disabled
topic_props.retention_bytes = tristate<size_t>();
topic_props.retention_local_target_bytes = tristate<size_t>();
Expand Down Expand Up @@ -799,4 +811,48 @@ std::invoke_result_t<Func> client::retry(Func&& func) {
return retry_with_backoff(std::forward<Func>(func), &_as);
}

ss::future<> client::update_wasm_binary_size() {
mutex::units _ = co_await _wasm_binary_max_size_updater_mu.get_units();
auto tn = model::topic_namespace_view(model::wasm_binaries_internal_ntp);
auto config = _topic_metadata->find_topic_cfg(tn);
if (!config) {
// Topic hasn't been created yet.
co_return;
}
if (
config->properties.batch_max_bytes.has_value()
&& config->properties.batch_max_bytes.value()
== uint32_t(_max_wasm_binary_size())) {
// Nothing to do.
co_return;
}
// We need to update the size.
ss::future<cluster::errc> fut
= co_await ss::coroutine::as_future<cluster::errc>(retry([this, tn] {
auto updates = cluster::incremental_topic_updates();
updates.batch_max_bytes.value = _max_wasm_binary_size();
updates.batch_max_bytes.op
= cluster::incremental_update_operation::set;
return _topic_creator->update_topic(
cluster::topic_properties_update(
model::topic_namespace(tn),
updates,
cluster::incremental_topic_custom_updates()));
}));
if (fut.failed()) {
vlog(
log.warn,
"unable to update internal wasm binary topic size: {}",
fut.get_exception());
}
cluster::errc ec = fut.get();
if (ec == cluster::errc::success) {
co_return;
}
vlog(
log.warn,
"unable to update internal wasm binary topic size: {}",
cluster::error_category().message(int(ec)));
}

} // namespace transform::rpc
10 changes: 9 additions & 1 deletion src/v/transform/rpc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "cluster/errc.h"
#include "cluster/fwd.h"
#include "config/property.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record.h"
Expand Down Expand Up @@ -48,7 +49,8 @@ class client {
std::unique_ptr<topic_creator>,
std::unique_ptr<cluster_members_cache>,
ss::sharded<::rpc::connection_cache>*,
ss::sharded<local_service>*);
ss::sharded<local_service>*,
config::binding<size_t>);
client(client&&) = delete;
client& operator=(client&&) = delete;
client(const client&) = delete;
Expand Down Expand Up @@ -82,6 +84,7 @@ class client {

ss::future<model::cluster_transform_report> generate_report();

ss::future<> start();
ss::future<> stop();

private:
Expand Down Expand Up @@ -149,6 +152,8 @@ class client {
template<typename Func>
std::invoke_result_t<Func> retry(Func&&);

ss::future<> update_wasm_binary_size();

model::node_id _self;
std::unique_ptr<cluster_members_cache> _cluster_members;
// need partition_leaders_table to know which node owns the partitions
Expand All @@ -158,6 +163,9 @@ class client {
ss::sharded<::rpc::connection_cache>* _connections;
ss::sharded<local_service>* _local_service;
ss::abort_source _as;
ss::gate _gate;
mutex _wasm_binary_max_size_updater_mu;
config::binding<size_t> _max_wasm_binary_size;
};

} // namespace transform::rpc
19 changes: 19 additions & 0 deletions src/v/transform/rpc/deps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>

#include <memory>
#include <type_traits>
Expand Down Expand Up @@ -272,6 +273,24 @@ class topic_creator_impl : public topic_creator {
}
}

ss::future<cluster::errc>
update_topic(cluster::topic_properties_update update) final {
try {
auto res
= co_await _controller->get_topics_frontend()
.local()
.update_topic_properties(
{update},
ss::lowres_clock::now()
+ config::shard_local_cfg().alter_topic_cfg_timeout_ms());
vassert(res.size() == 1, "expected a single result");
co_return res[0].ec;
} catch (const std::exception& ex) {
vlog(log.warn, "unable to update topic {}: {}", update.tp_ns, ex);
co_return cluster::errc::topic_operation_error;
}
}

private:
cluster::controller* _controller;
};
Expand Down
6 changes: 6 additions & 0 deletions src/v/transform/rpc/deps.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ class topic_creator {
int32_t partition_count,
cluster::topic_properties)
= 0;

/**
* Update a topic.
*/
virtual ss::future<cluster::errc>
update_topic(cluster::topic_properties_update) = 0;
};

/**
Expand Down
Loading

0 comments on commit 26c7b9b

Please sign in to comment.