apisix 中的服务发现机制
基于 3.10.0 版本
机制
0. 入口
在 apisix 的ngx_tpl.lua中
init_worker_by_lua_block {
apisix.http_init_worker()
}
apisix/init.lua
local router = require("apisix.router")
-- Worker init entry point, called from init_worker_by_lua_block.
-- Excerpt: the "....." lines mark code omitted by this article.
function _M.http_init_worker()
.....
-- read the `discovery` field exported by the apisix.discovery.init module
local discovery = require("apisix.discovery.init").discovery
-- only call into discovery when the field exists and exposes init_worker
if discovery and discovery.init_worker then
discovery.init_worker()
end
.....
end
1. discovery.init_worker
-- `discovery` section of the local configuration; its keys are used as
-- discovery names (e.g. `dns` in the sample config.yaml of this article)
local discovery_type = local_conf.discovery
-- registry filled below: discovery name -> loaded module
local discovery = {}
if discovery_type then
for discovery_name, _ in pairs(discovery_type) do
log.info("use discovery: ", discovery_name)
-- the module path is built from the config key, e.g. "apisix.discovery.dns"
discovery[discovery_name] = require("apisix.discovery." .. discovery_name)
end
end
-- Calls init_worker() on every discovery module named in the configuration.
-- Does nothing when the configuration has no `discovery` section.
function discovery.init_worker()
if discovery_type then
for discovery_name, _ in pairs(discovery_type) do
-- each module was stored in `discovery` under its config key when loaded
discovery[discovery_name].init_worker()
end
end
end
根据配置文件中配置的服务发现类型,加载对应的模块,并在 init_worker 中调用各模块自身的 init_worker。
假设配置文件 config.yaml 中我们配置的是 dns (最简单的实现,其他实现的原理是一样的):
discovery:                        # Service Discovery
  dns:
    servers:
      - "127.0.0.1:8600"          # Replace with the address of your DNS server.
    resolv_conf: /etc/resolv.conf # Replace with the path to the local DNS resolv config. Configure either "servers" or "resolv_conf".
    order:                        # Resolve DNS records in this order.
      - last                      # Try the latest successful type for a hostname.
      - SRV
      - A
      - AAAA
      - CNAME
2. apisix.discovery.dns
一共两个方法 (实现其他类型服务发现同理需要实现这两个方法)
init_worker()
初始化,根据配置,构建一个client
nodes(service_name)
根据 service_name, 使用client做服务发现,返回 service_name 对应可用的 nodes
-- Creates the DNS client with core.dns_client.new(opts) and stores it for later
-- lookups. Excerpt: "......" marks code omitted by this article, including how
-- `opts` is assembled.
function _M.init_worker()
......
local client, err = core.dns_client.new(opts)
if not client then
-- NOTE(review): Lua's error() signature is (message, level); `err` is passed
-- as the second argument here instead of being concatenated into the
-- message -- confirm against upstream whether this is intended.
error("failed to init the dns client: ", err)
return
end
-- store the client in `dns_client`, which _M.nodes calls resolve() on
dns_client = client
end
--- Resolve the nodes behind a service name through the DNS client.
-- Excerpt: "......" marks the per-record conversion omitted by this article.
-- @param service_name service address; split into host and port by
--        core.utils.parse_addr
-- @return the nodes table built from the DNS records
-- @return nil plus the resolver error when no records are returned
function _M.nodes(service_name)
    local host, port = core.utils.parse_addr(service_name)
    core.log.info("discovery dns with host ", host, ", port ", port)

    local records, err = dns_client:resolve(host, core.dns_client.RETURN_ALL)
    if not records then
        return nil, err
    end

    -- size the result table from the number of returned records
    local nodes = core.table.new(#records, 0)
    local index = 1
    for _, r in ipairs(records) do
        ......
    end

    return nodes
end
nodes示例
[
{
"host": "192.168.1.100",
"port": 8761,
"weight": 100,
"metadata": {
"management.port": "8761"
}
}
]
3. 调用点在哪里?
local set_upstream = apisix_upstream.set_by_route
-- from access phase
-- Access-phase handler. Excerpt: "......" marks code omitted by this article;
-- the part shown only forwards to _M.handle_upstream.
function _M.http_access_phase()
......
_M.handle_upstream(api_ctx, route, enable_websocket)
end
-- Excerpt: "......" marks code omitted by this article. The part shown calls
-- set_upstream, which is the local alias of apisix_upstream.set_by_route.
function _M.handle_upstream(api_ctx, route, enable_websocket)
......
local code, err = set_upstream(route, api_ctx)
......
end
-- Resolves the upstream for a route and hands it to set_directly.
-- Excerpt: "......" marks code omitted by this article; the closing `end` of the
-- function is omitted as well.
function _M.set_by_route(route, api_ctx)
......
-- if the upstream configures service_name, service discovery is required
if up_conf.service_name then
......
-- look up the discovery instance registered for this discovery type
local dis = discovery[up_conf.discovery_type]
-- resolve the nodes for service_name
local new_nodes, err = dis.nodes(up_conf.service_name, up_conf.discovery_args)
-- compare with the previous nodes to see whether they are the same
local same = upstream_util.compare_upstream_node(up_conf, new_nodes)
if not same then
......
-- the new nodes are set here
up_conf.nodes = new_nodes
up_conf.original_nodes = up_conf.nodes
-- NOTE(review): presumably clone() returns a new table, so tostring(up_conf)
-- in the version string below changes after a node update -- confirm
local new_up_conf = core.table.clone(up_conf)
local parent = up_conf.parent
if parent.value.upstream then
-- the up_conf comes from route or service
parent.value.upstream = new_up_conf
else
parent.value = new_up_conf
end
up_conf = new_up_conf
end
end
local id = up_conf.parent.value.id
local conf_version = up_conf.parent.modifiedIndex
-- include the upstream object as part of the version, because the upstream will be changed
-- by service discovery or dns resolver.
set_directly(api_ctx, id, conf_version .. "#" .. tostring(up_conf), up_conf)
-- Stores the resolved upstream on the request context.
-- Excerpt: "......" marks code omitted by this article.
local function set_directly(ctx, key, ver, conf)
......
-- note: these fields of api_ctx are updated here
ctx.upstream_conf = conf
ctx.upstream_version = ver
ctx.upstream_key = key
return
end
-- pick_server will be called:
-- 1. in the access phase so that we can set headers according to the picked server
-- 2. each time we need to retry upstream
-- Excerpt: "......" marks code omitted by this article; the closing `end` of the
-- function is omitted as well.
local function pick_server(route, ctx)
......
-- upstream_version / upstream_key are the ctx fields written by set_directly
local version = ctx.upstream_version
local key = ctx.upstream_key
local checker = ctx.up_checker
-- the same picker will be used in the whole request, especially during the retry
local server_picker = ctx.server_picker
if not server_picker then
-- cache lookup by key and version; create_server_picker is passed along as the
-- builder -- presumably invoked when no entry matches, confirm in lrucache
server_picker = lrucache_server_picker(key, version,
create_server_picker, up_conf, checker)
end
......
local server, err = server_picker.get(ctx)
这里,如果服务发现导致 upstream_key/upstream_version 变化,那么 server_picker 对应的 lrucache 就会失效,进入 create_server_picker 的逻辑。
数据面服务发现的问题
如果服务发现获取上游的 node 数据变更非常频繁
- 会被对比出来,有差异
- 会多一些赋值
- 负载均衡lrucache会失效重建,重建时,如果配置有主动健康检查,还会触发主动健康检查获取健康的节点列表
这样会导致服务出现抖动。
基于控制面的服务发现
官方有一个基于控制面的服务发现项目 api7/apisix-seed (Do service discovery on the CP side):
通过订阅 service_name 的变更,获取服务发现的变更,并写入到 etcd。