Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lua cluster_specifier: give access to cluster connection/request counts #36998

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ new_features:
- area: lua
change: |
Added ssl :ref:`parsedSubjectPeerCertificate() <config_http_filters_lua_parsed_name>` API.
- area: lua cluster specifier
change: |
Added ability for a Lua script to query clusters for current requests and connections.
- area: udp_proxy
change: |
Added support for dynamic cluster selection in UDP proxy. The cluster can be set by one of the session filters
Expand Down
42 changes: 42 additions & 0 deletions docs/root/configuration/http/cluster_specifier/lua.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ Returns the stream's headers. The headers can be used to select a specific clust

Returns a :ref:`header object <config_lua_cluster_specifier_header_wrapper>`.

``getCluster()``
++++++++++++++++

.. code-block:: lua

local cluster = route_header:getCluster('my_cluster')

Returns a :ref:`handle <config_lua_cluster_specifier_cluster>` to the specified cluster, or nil if it isn't found.

.. _config_lua_cluster_specifier_header_wrapper:

Header object API
Expand All @@ -90,3 +99,36 @@ This method gets a header.
Returns either a string containing the header value, or ``nil`` if the header does not exist.

If there are multiple headers in the same case-insensitive key, their values will be concatenated to a string separated by ``,``.

.. _config_lua_cluster_specifier_cluster:

Cluster API
-----------

``numConnections()``
++++++++++++++++++++

.. code-block:: lua

cluster:numConnections()

This method gets the current number of connections for this cluster.

``numRequests()``
+++++++++++++++++

.. code-block:: lua

cluster:numRequests()

This method gets the current number of requests for this cluster.

``numPendingRequests()``
++++++++++++++++++++++++

.. code-block:: lua

cluster:numPendingRequests()

This method gets the current number of pending requests for this cluster.

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ PerLuaCodeSetup::PerLuaCodeSetup(const std::string& lua_code, ThreadLocal::SlotA
: lua_state_(lua_code, tls) {
lua_state_.registerType<HeaderMapWrapper>();
lua_state_.registerType<RouteHandleWrapper>();
lua_state_.registerType<ClusterWrapper>();

const Filters::Common::Lua::InitializerList initializers;

Expand All @@ -34,6 +35,30 @@ int HeaderMapWrapper::luaGet(lua_State* state) {
}
}

int ClusterWrapper::luaNumConnections(lua_State* state) {
uint64_t count =
cluster_->resourceManager(Upstream::ResourcePriority::Default).connections().count() +
cluster_->resourceManager(Upstream::ResourcePriority::High).connections().count();
lua_pushinteger(state, count);
return 1;
}

int ClusterWrapper::luaNumRequests(lua_State* state) {
uint64_t count =
cluster_->resourceManager(Upstream::ResourcePriority::Default).requests().count() +
cluster_->resourceManager(Upstream::ResourcePriority::High).requests().count();
lua_pushinteger(state, count);
return 1;
}

int ClusterWrapper::luaNumPendingRequests(lua_State* state) {
uint64_t count =
cluster_->resourceManager(Upstream::ResourcePriority::Default).pendingRequests().count() +
cluster_->resourceManager(Upstream::ResourcePriority::High).pendingRequests().count();
lua_pushinteger(state, count);
return 1;
}

int RouteHandleWrapper::luaHeaders(lua_State* state) {
if (headers_wrapper_.get() != nullptr) {
headers_wrapper_.pushStack();
Expand All @@ -43,10 +68,24 @@ int RouteHandleWrapper::luaHeaders(lua_State* state) {
return 1;
}

int RouteHandleWrapper::luaGetCluster(lua_State* state) {
size_t cluster_name_len = 0;
const char* cluster_name = luaL_checklstring(state, 2, &cluster_name_len);
Upstream::ThreadLocalCluster* cluster =
cm_.getThreadLocalCluster(absl::string_view(cluster_name, cluster_name_len));
if (cluster == nullptr) {
return 0;
}

clusters_.emplace_back(ClusterWrapper::create(state, cluster->info()), true);

return 1;
}

LuaClusterSpecifierConfig::LuaClusterSpecifierConfig(
const LuaClusterSpecifierConfigProto& config,
Server::Configuration::CommonFactoryContext& context)
: main_thread_dispatcher_(context.mainThreadDispatcher()),
: main_thread_dispatcher_(context.mainThreadDispatcher()), cm_(context.clusterManager()),
default_cluster_(config.default_cluster()) {
const std::string code_str = THROW_OR_RETURN_VALUE(
Config::DataSource::read(config.source_code(), true, context.api()), std::string);
Expand All @@ -65,7 +104,8 @@ std::string LuaClusterSpecifierPlugin::startLua(const Http::HeaderMap& headers)
Filters::Common::Lua::CoroutinePtr coroutine = config_->perLuaCodeSetup()->createCoroutine();

RouteHandleRef handle;
handle.reset(RouteHandleWrapper::create(coroutine->luaState(), headers), true);
handle.reset(
RouteHandleWrapper::create(coroutine->luaState(), headers, config_->clusterManager()), true);

TRY_NEEDS_AUDIT {
coroutine->start(function_ref_, 1, []() {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,39 @@ class HeaderMapWrapper : public Filters::Common::Lua::BaseLuaObject<HeaderMapWra

using HeaderMapRef = Filters::Common::Lua::LuaDeathRef<HeaderMapWrapper>;

class RouteHandleWrapper : public Filters::Common::Lua::BaseLuaObject<RouteHandleWrapper> {
class ClusterWrapper : public Filters::Common::Lua::BaseLuaObject<ClusterWrapper> {
public:
RouteHandleWrapper(const Http::HeaderMap& headers) : headers_(headers) {}
ClusterWrapper(Upstream::ClusterInfoConstSharedPtr cluster) : cluster_(cluster) {}

static ExportedFunctions exportedFunctions() {
return {
{"numConnections", static_luaNumConnections},
{"numRequests", static_luaNumRequests},
{"numPendingRequests", static_luaNumPendingRequests},
};
}

private:
DECLARE_LUA_FUNCTION(ClusterWrapper, luaNumConnections);
DECLARE_LUA_FUNCTION(ClusterWrapper, luaNumRequests);
DECLARE_LUA_FUNCTION(ClusterWrapper, luaNumPendingRequests);

Upstream::ClusterInfoConstSharedPtr cluster_;
};

using ClusterRef = Filters::Common::Lua::LuaRef<ClusterWrapper>;

static ExportedFunctions exportedFunctions() { return {{"headers", static_luaHeaders}}; }
class RouteHandleWrapper : public Filters::Common::Lua::BaseLuaObject<RouteHandleWrapper> {
public:
RouteHandleWrapper(const Http::HeaderMap& headers, Upstream::ClusterManager& cm)
: headers_(headers), cm_(cm) {}

static ExportedFunctions exportedFunctions() {
return {
{"headers", static_luaHeaders},
{"getCluster", static_luaGetCluster},
};
}

// All embedded references should be reset when the object is marked dead. This is to ensure that
// we won't do the resetting in the destructor, which may be called after the referenced
Expand All @@ -70,9 +98,12 @@ class RouteHandleWrapper : public Filters::Common::Lua::BaseLuaObject<RouteHandl
* @return a handle to the headers.
*/
DECLARE_LUA_FUNCTION(RouteHandleWrapper, luaHeaders);
DECLARE_LUA_FUNCTION(RouteHandleWrapper, luaGetCluster);

const Http::HeaderMap& headers_;
Upstream::ClusterManager& cm_;
HeaderMapRef headers_wrapper_;
std::vector<ClusterRef> clusters_;
};

using RouteHandleRef = Filters::Common::Lua::LuaDeathRef<RouteHandleWrapper>;
Expand Down Expand Up @@ -103,9 +134,11 @@ class LuaClusterSpecifierConfig : Logger::Loggable<Logger::Id::lua> {

PerLuaCodeSetup* perLuaCodeSetup() const { return per_lua_code_setup_ptr_.get(); }
const std::string& defaultCluster() const { return default_cluster_; }
Upstream::ClusterManager& clusterManager() { return cm_; }

private:
Event::Dispatcher& main_thread_dispatcher_;
Upstream::ClusterManager& cm_;
PerLuaCodeSetupPtr per_lua_code_setup_ptr_;
const std::string default_cluster_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Lua {
using testing::InSequence;
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;

class LuaClusterSpecifierPluginTest : public testing::Test {
public:
Expand Down Expand Up @@ -172,6 +173,100 @@ TEST_F(LuaClusterSpecifierPluginTest, DestructLuaClusterSpecifierConfigDisableRu
config_.reset();
}

TEST_F(LuaClusterSpecifierPluginTest, GetClustersBadArg) {
const std::string config = R"EOF(
source_code:
inline_string: |
function envoy_on_route(route_handle)
local wrong_type = true
route_handle:getCluster(wrong_type)
return "completed"
end
default_cluster: default_service
)EOF";
setUpTest(config);

auto mock_route = std::make_shared<NiceMock<Envoy::Router::MockRoute>>();
Http::TestRequestHeaderMapImpl headers{{":path", "/"}};
auto route = plugin_->route(mock_route, headers);
EXPECT_EQ("default_service", route->routeEntry()->clusterName());
}

TEST_F(LuaClusterSpecifierPluginTest, GetClustersMissing) {
const std::string config = R"EOF(
source_code:
inline_string: |
function envoy_on_route(route_handle)
local result = route_handle:getCluster("not found cluster name")
if result == nil then
return "nil"
end
return "not-nil"
end
default_cluster: default_service
)EOF";
setUpTest(config);

EXPECT_CALL(server_factory_context_.cluster_manager_,
getThreadLocalCluster(absl::string_view("not found cluster name")))
.WillOnce(Return(nullptr));

auto mock_route = std::make_shared<NiceMock<Envoy::Router::MockRoute>>();
Http::TestRequestHeaderMapImpl headers{{":path", "/"}};
auto route = plugin_->route(mock_route, headers);
EXPECT_EQ("nil", route->routeEntry()->clusterName());
}

TEST_F(LuaClusterSpecifierPluginTest, ClusterMethods) {
const std::string config = R"EOF(
source_code:
inline_string: |
function envoy_on_route(route_handle)
local result = route_handle:getCluster("my_cluster")
if result == nil then
return "fail: nil cluster"
end
if result:numConnections() ~= 2 then
return "fail: wrong num connections"
end
if result:numRequests() ~= 4 then
return "fail: wrong num requests"
end
if result:numPendingRequests() ~= 6 then
return "fail: wrong num pending requests"
end
return "pass"
end
default_cluster: default_service
)EOF";
setUpTest(config);

NiceMock<Upstream::MockThreadLocalCluster> cluster;
EXPECT_CALL(server_factory_context_.cluster_manager_,
getThreadLocalCluster(absl::string_view("my_cluster")))
.WillOnce(Return(&cluster));

// The mock object returns the same resource manager for both the default and high priority,
// so the counts in this test seen by lua are all doubled.

// 1 connection.
cluster.cluster_.info_->resource_manager_->connections().inc();

// 2 requests.
cluster.cluster_.info_->resource_manager_->requests().inc();
cluster.cluster_.info_->resource_manager_->requests().inc();

// 3 pending requests.
cluster.cluster_.info_->resource_manager_->pendingRequests().inc();
cluster.cluster_.info_->resource_manager_->pendingRequests().inc();
cluster.cluster_.info_->resource_manager_->pendingRequests().inc();

auto mock_route = std::make_shared<NiceMock<Envoy::Router::MockRoute>>();
Http::TestRequestHeaderMapImpl headers{{":path", "/"}};
auto route = plugin_->route(mock_route, headers);
EXPECT_EQ("pass", route->routeEntry()->clusterName());
}

} // namespace Lua
} // namespace Router
} // namespace Extensions
Expand Down