feature: support discovery center (apache#1440)
qiujiayu authored and SaberMaster committed Jun 30, 2020
1 parent 77646fd commit 673c074
Showing 25 changed files with 2,033 additions and 114 deletions.
2 changes: 2 additions & 0 deletions .travis/linux_openresty_runner.sh
@@ -44,6 +44,8 @@ before_install() {
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
docker pull bitinit/eureka
docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}
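For context, the container started above registers itself in the test registry under the application name apisix-eureka and serves the standard Eureka REST API on port 8761. As a quick sanity check (not part of this commit, and assuming lua-resty-http is available in the test environment), the registered applications can be listed from Lua:

    -- sanity check only: query the test registry started by the CI script above
    local http = require("resty.http")      -- assumes lua-resty-http is installed
    local httpc = http.new()
    local res, err = httpc:request_uri("http://127.0.0.1:8761/eureka/apps", {
        method = "GET",
        headers = { ["Accept"] = "application/json" },
    })
    if not res then
        ngx.log(ngx.ERR, "eureka registry not reachable: ", err)
    else
        ngx.say(res.body)   -- JSON listing of the registered applications
    end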
2 changes: 2 additions & 0 deletions .travis/linux_tengine_runner.sh
@@ -45,6 +45,8 @@ before_install() {
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
docker pull bitinit/eureka
docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}
2 changes: 1 addition & 1 deletion .travis/osx_openresty_runner.sh
@@ -43,7 +43,7 @@ do_install() {
git clone https://github.com/iresty/test-nginx.git test-nginx

wget -P utils https://raw.githubusercontent.com/openresty/openresty-devel-utils/master/lj-releng
chmod a+x utils/lj-releng
chmod a+x utils/lj-releng

wget https://github.com/iresty/grpc_server_example/releases/download/20200314/grpc_server_example-darwin-amd64.tar.gz
tar -xvf grpc_server_example-darwin-amd64.tar.gz
3 changes: 3 additions & 0 deletions Makefile
@@ -133,6 +133,9 @@ install:
$(INSTALL) -d $(INST_LUADIR)/apisix/http/router
$(INSTALL) apisix/http/router/*.lua $(INST_LUADIR)/apisix/http/router/

$(INSTALL) -d $(INST_LUADIR)/apisix/discovery
$(INSTALL) apisix/discovery/*.lua $(INST_LUADIR)/apisix/discovery/

$(INSTALL) -d $(INST_LUADIR)/apisix/plugins
$(INSTALL) apisix/plugins/*.lua $(INST_LUADIR)/apisix/plugins/

96 changes: 64 additions & 32 deletions apisix/balancer.lua
@@ -16,13 +16,15 @@
--
local healthcheck = require("resty.healthcheck")
local roundrobin = require("resty.roundrobin")
local discovery = require("apisix.discovery.init").discovery
local resty_chash = require("resty.chash")
local balancer = require("ngx.balancer")
local core = require("apisix.core")
local error = error
local str_char = string.char
local str_gsub = string.gsub
local pairs = pairs
local ipairs = ipairs
local tostring = tostring
local set_more_tries = balancer.set_more_tries
local get_last_failure = balancer.get_last_failure
@@ -42,30 +44,37 @@ local lrucache_checker = core.lrucache.new({


local _M = {
version = 0.1,
version = 0.2,
name = module_name,
}


local function fetch_health_nodes(upstream, checker)
local nodes = upstream.nodes
if not checker then
return upstream.nodes
local new_nodes = core.table.new(0, #nodes)
for _, node in ipairs(nodes) do
-- TODO filter with metadata
new_nodes[node.host .. ":" .. node.port] = node.weight
end
return new_nodes
end

local host = upstream.checks and upstream.checks.host
local up_nodes = core.table.new(0, core.table.nkeys(upstream.nodes))

for addr, weight in pairs(upstream.nodes) do
local ip, port = core.utils.parse_addr(addr)
local ok = checker:get_target_status(ip, port, host)
local up_nodes = core.table.new(0, #nodes)
for _, node in ipairs(nodes) do
local ok = checker:get_target_status(node.host, node.port, host)
if ok then
up_nodes[addr] = weight
-- TODO filter with metadata
up_nodes[node.host .. ":" .. node.port] = node.weight
end
end

if core.table.nkeys(up_nodes) == 0 then
core.log.warn("all upstream nodes is unhealth, use default")
up_nodes = upstream.nodes
for _, node in ipairs(nodes) do
up_nodes[node.host .. ":" .. node.port] = node.weight
end
end

return up_nodes
@@ -78,13 +87,11 @@ local function create_checker(upstream, healthcheck_parent)
shm_name = "upstream-healthcheck",
checks = upstream.checks,
})

for addr, weight in pairs(upstream.nodes) do
local ip, port = core.utils.parse_addr(addr)
local ok, err = checker:add_target(ip, port, upstream.checks.host)
for _, node in ipairs(upstream.nodes) do
local ok, err = checker:add_target(node.host, node.port, upstream.checks.host)
if not ok then
core.log.error("failed to add new health check target: ", addr,
" err: ", err)
core.log.error("failed to add new health check target: ", node.host, ":", node.port,
" err: ", err)
end
end
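These two hunks reflect the new node representation: upstream.nodes is now an array of tables with explicit host, port and weight fields, while the health-check and load-balancing code still keys its internal tables by "host:port". A minimal illustration of the two shapes (the addresses and weights are made-up values):

    -- legacy hash form, as stored in existing upstream configs
    local legacy_nodes = {
        ["127.0.0.1:1980"] = 1,
        ["127.0.0.1:1981"] = 2,
    }

    -- new array form, as consumed by fetch_health_nodes/create_checker above
    local nodes = {
        { host = "127.0.0.1", port = 1980, weight = 1 },
        { host = "127.0.0.1", port = 1981, weight = 2 },
    }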

Expand Down Expand Up @@ -230,7 +237,14 @@ local function pick_server(route, ctx)
key = up_conf.type .. "#route_" .. route.value.id
end

if core.table.nkeys(up_conf.nodes) == 0 then
if up_conf.service_name then
if not discovery then
return nil, nil, "discovery is uninitialized"
end
up_conf.nodes = discovery.nodes(up_conf.service_name)
end

if not up_conf.nodes or #up_conf.nodes == 0 then
return nil, nil, "no valid upstream node"
end
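With this hunk an upstream no longer has to carry static nodes: if it declares a service_name, the node list is resolved at request time through the discovery module. A hypothetical upstream configuration (the service name is only an example value) that would take this code path:

    -- hypothetical upstream resolved through the discovery center
    local up_conf = {
        type = "roundrobin",
        service_name = "APISIX-EUREKA",   -- name under which the service registered itself
        -- no static `nodes`; pick_server() fills them via discovery.nodes(service_name)
    }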

@@ -256,11 +270,10 @@

if ctx.balancer_try_count == 1 then
local retries = up_conf.retries
if retries and retries > 0 then
set_more_tries(retries)
else
set_more_tries(core.table.nkeys(up_conf.nodes))
if not retries or retries <= 0 then
retries = #up_conf.nodes
end
set_more_tries(retries)
end

if checker then
@@ -281,8 +294,7 @@

if up_conf.timeout then
local timeout = up_conf.timeout
local ok, err = set_timeouts(timeout.connect, timeout.send,
timeout.read)
local ok, err = set_timeouts(timeout.connect, timeout.send, timeout.read)
if not ok then
core.log.error("could not set upstream timeouts: ", err)
end
@@ -291,9 +303,11 @@
local ip, port, err = core.utils.parse_addr(server)
ctx.balancer_ip = ip
ctx.balancer_port = port

core.log.info("proxy to ", ip, ":", port)
return ip, port, err
end


-- for test
_M.pick_server = pick_server

@@ -323,21 +337,39 @@ function _M.init_worker()
item_schema = core.schema.upstream,
filter = function(upstream)
upstream.has_domain = false
if not upstream.value then
if not upstream.value or not upstream.value.nodes then
return
end

for addr, _ in pairs(upstream.value.nodes or {}) do
local host = core.utils.parse_addr(addr)
if not core.utils.parse_ipv4(host) and
not core.utils.parse_ipv6(host) then
upstream.has_domain = true
break
local nodes = upstream.value.nodes
if core.table.isarray(nodes) then
for _, node in ipairs(nodes) do
local host = node.host
if not core.utils.parse_ipv4(host) and
not core.utils.parse_ipv6(host) then
upstream.has_domain = true
break
end
end
else
local new_nodes = core.table.new(core.table.nkeys(nodes), 0)
for addr, weight in pairs(nodes) do
local host, port = core.utils.parse_addr(addr)
if not core.utils.parse_ipv4(host) and
not core.utils.parse_ipv6(host) then
upstream.has_domain = true
end
local node = {
host = host,
port = port,
weight = weight,
}
core.table.insert(new_nodes, node)
end
upstream.value.nodes = new_nodes
end

core.log.info("filter upstream: ",
core.json.delay_encode(upstream))
core.log.info("filter upstream: ", core.json.delay_encode(upstream))
end,
})
if not upstreams_etcd then
3 changes: 2 additions & 1 deletion apisix/core/table.lua
@@ -25,13 +25,14 @@ local type = type


local _M = {
version = 0.1,
version = 0.2,
new = new_tab,
clear = require("table.clear"),
nkeys = nkeys,
insert = table.insert,
concat = table.concat,
clone = require("table.clone"),
isarray = require("table.isarray"),
}


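table.isarray is provided by OpenResty's LuaJIT fork, and balancer.lua uses the new core.table.isarray wrapper in its init_worker filter above to tell the legacy hash form of nodes from the new array form. A small usage sketch:

    local isarray = require("table.isarray")   -- OpenResty LuaJIT extension

    print(isarray({ "a", "b", "c" }))            -- true: positive integer keys only
    print(isarray({ ["127.0.0.1:80"] = 1 }))     -- false: legacy hash-style nodes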
(The remaining changed files, including the new apisix/discovery modules, are not shown here.)
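Among the files not shown is the Eureka client itself. Purely as a hypothetical sketch (the module layout, polling interval and fetch helper are assumptions, not the commit's actual code), a discovery module only needs to expose the interface the balancer relies on, nodes(service_name), plus an init_worker() hook to keep its cache fresh:

    -- hypothetical sketch only; the real apisix/discovery/eureka.lua is not shown in this diff
    local http = require("resty.http")
    local core = require("apisix.core")

    local applications = {}   -- service_name -> array of { host, port, weight }

    local _M = { version = 0.1 }

    function _M.nodes(service_name)
        -- called from balancer.pick_server() when up_conf.service_name is set
        return applications[service_name]
    end

    local function fetch_full_registry()
        local httpc = http.new()
        -- the test registry started by the CI scripts above
        local res, err = httpc:request_uri("http://127.0.0.1:8761/eureka/apps", {
            headers = { ["Accept"] = "application/json" },
        })
        if not res or res.status ~= 200 then
            core.log.error("failed to fetch eureka registry: ", err or res.status)
            return
        end
        -- decode res.body and rebuild `applications` here (omitted for brevity)
    end

    function _M.init_worker()
        ngx.timer.every(30, fetch_full_registry)   -- 30s is an arbitrary example interval
    end

    return _M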
