apisix etcd机制
基于 3.10.0 版本
机制
0. 入口
在 apisix 的ngx_tpl.lua中
init_worker_by_lua_block {
apisix.http_init_worker()
}
apisix/init.lua
local router = require("apisix.router")
function _M.http_init_worker()
.....
router.http_init_worker()
.....
end
1. http_init_worker
function _M.http_init_worker()
local conf = core.config.local_conf()
local router_http_name = "radixtree_uri"
if conf and conf.apisix and conf.apisix.router then
router_http_name = conf.apisix.router.http or router_http_name
......
end
local router_http = require("apisix.http.router." .. router_http_name)
attach_http_router_common_methods(router_http) -- 这里 attach 两个方法给到模块
router_http.init_worker(filter) -- 这里调用 init_worker 完成初始化
_M.router_http = router_http
......
end
其中 attach_http_router_common_methods
-- Fill in default `routes` and `init_worker` functions on an HTTP router
-- module, each only when the module does not already define it.
-- @param http_router the router module table, extended in place
local function attach_http_router_common_methods(http_router)
if http_router.routes == nil then
http_router.routes = function ()
-- nothing to report until init_worker has stored user_routes
if not http_router.user_routes then
return nil, nil
end
-- return the values and the version held by the module's user_routes
local user_routes = http_router.user_routes
return user_routes.values, user_routes.conf_version
end
end
if http_router.init_worker == nil then
http_router.init_worker = function (filter)
http_router.user_routes = http_route.init_worker(filter)
-- http_route here is `local http_route = require("apisix.http.route")`;
-- its init_worker result (the routes object) is stored on the module as user_routes
end
end
end
http_route.init_worker
通过 core.config.new("/routes", {...}) 创建 /routes 对应的配置对象并返回:
-- Create the configuration object for the "/routes" key and return it.
-- @param filter passed through unchanged as the `filter` option
-- @return the object returned by core.config.new
-- Raises an error (via error()) when core.config.new returns a falsy object.
function _M.init_worker(filter)
local user_routes, err = core.config.new("/routes", {
automatic = true,
item_schema = core.schema.route,
checker = check_route,
filter = filter,
})
-- no object was created: abort, appending the reason carried in err
if not user_routes then
error("failed to create etcd instance for fetching /routes : " .. err)
end
return user_routes
end
2. 具体某个router.lua
apisix/http/router
├── radixtree_host_uri.lua
├── radixtree_uri.lua
└── radixtree_uri_with_parameter.lua
即,如果配置文件中有配置 apisix.router.http
, 那么将会使用配置文件中的,否则使用 radixtree_uri
假设我们配置了 apisix.router.http=radixtree_uri_with_parameter
local router_http = require("apisix.http.router.radixtree_uri_with_parameter")
对应代码
-- HTTP router module "radixtree_uri_with_parameter".
-- Keeps one radixtree router per worker and rebuilds it inside match()
-- whenever the route or service version it was built from has changed.
local require = require
local core = require("apisix.core")
local base_router = require("apisix.http.route")
local get_services = require("apisix.http.service").services
-- versions the current uri_router was built from (nil until the first build)
local cached_router_version
local cached_service_version
local _M = {}
local uri_routes = {}
local uri_router
-- Match the request described by api_ctx, rebuilding the router first when
-- it is missing or stale.
-- @param api_ctx request context, forwarded to matching()
-- @return true when no router is available, otherwise the result of matching()
function _M.match(api_ctx)
-- _M.user_routes is not assigned in this module; it must be set on the
-- module table from outside before match() is called
local user_routes = _M.user_routes
local _, service_version = get_services()
-- rebuild when nothing is cached yet or either version differs
if not cached_router_version or cached_router_version ~= user_routes.conf_version
or not cached_service_version or cached_service_version ~= service_version
then
-- NOTE(review): the trailing `true` presumably turns on URI parameter
-- matching -- confirm against create_radixtree_uri_router
uri_router = base_router.create_radixtree_uri_router(user_routes.values,
uri_routes, true)
cached_router_version = user_routes.conf_version
cached_service_version = service_version
end
-- the build produced no router: log and return true without matching
if not uri_router then
core.log.error("failed to fetch valid `uri_with_parameter` router: ")
return true
end
return _M.matching(api_ctx)
end
-- Run the actual lookup against the current uri_router.
-- @param api_ctx request context, passed to base_router.match_uri
-- @return whatever base_router.match_uri returns
function _M.matching(api_ctx)
core.log.info("route match mode: radixtree_uri_with_parameter")
return base_router.match_uri(uri_router, api_ctx)
end
return _M
说明:
- 声明了 match(api_ctx) 和 matching(api_ctx), 并且前者调用后者
- 声明了模块变量 user_routes, 在启动时通过 http_route.init_worker(filter) 被初始化
- 声明了模块变量 cached_router_version/cached_service_version, 用于判断当前 router 是否最新;如果 etcd watch 到变更,那么会导致这里判断版本不一致,进而调用 base_router.create_radixtree_uri_router(user_routes.values, uri_routes, true) 重建 radixtree
- 这个过程是在 match 中,意味着数据面在处理请求的时候执行 match, 发现 service 或 route 有更新,此时会发生 radixtree 重建
这个可能会导致问题,见下面【问题】中的「实时更新的问题」
我们回到原点,关注 user_routes 的初始化
3. user_routes
引用 local http_route = require("apisix.http.route")
的 init_worker
-- Create the configuration object for the "/routes" key and return it.
-- @param filter passed through unchanged as the `filter` option
-- @return the object returned by core.config.new
-- Raises an error (via error()) when core.config.new returns a falsy object.
function _M.init_worker(filter)
local user_routes, err = core.config.new("/routes", {
automatic = true, -- note this flag
item_schema = core.schema.route,
checker = check_route,
filter = filter,
})
-- no object was created: abort, appending the reason carried in err
if not user_routes then
error("failed to create etcd instance for fetching /routes : " .. err)
end
return user_routes
end
-- Select the configuration backend: use deployment.config_provider from the
-- local configuration when it is set, otherwise fall back to "etcd".
local config_provider = local_conf.deployment and local_conf.deployment.config_provider
or "etcd"
log.info("use config_provider: ", config_provider)
-- load the matching module, e.g. "apisix.core.config_etcd" for the fallback
local config = require("apisix.core.config_" .. config_provider)
-- record the chosen provider name on the loaded module
config.type = config_provider
-- expose the loaded module as the `config` field
return {
config = config,
}
配置的config_provider=etcd
, 则
local config = require("apisix.core.config_etcd")
则调用的new方法是 apisix/core/config_etcd.lua
---
-- Create a new connection to communicate with the control plane.
-- This function should be used in the `init_worker_by_lua` phase.
--
function _M.new(key, opts)
......
-- etcd 相关配置
local etcd_conf = local_conf.etcd
local prefix = etcd_conf.prefix
local resync_delay = etcd_conf.resync_delay
if not resync_delay or resync_delay < 0 then
resync_delay = 5
end
local health_check_timeout = etcd_conf.health_check_timeout
if not health_check_timeout or health_check_timeout < 0 then
health_check_timeout = 10
end
local automatic = opts and opts.automatic
-- 初始化对象
local obj = setmetatable({
etcd_cli = nil,
key = key and prefix .. key,
automatic = automatic, -- true
item_schema = item_schema,
checker = checker,
sync_times = 0, -- 同步次数
running = true,
conf_version = 0, -- 当前数据版本号,可以通过这个判断是否有更新
values = nil, -- 所有同步过来的数据
need_reload = true, -- 注意这里,首次 new 的时候,need_reload 是 true
watching_stream = nil,
routes_hash = nil,
prev_index = 0, -- 上一次执行同步etcd 返回的 modifiedIndex
last_err = nil,
last_err_time = nil,
resync_delay = resync_delay,
health_check_timeout = health_check_timeout,
timeout = timeout, -- 注意,这里没有设置timeout, 那么在连接的时候,http_waitdir 将使用代码中的默认值 ` local ok, err = self.watch_sema:wait(timeout or 60)`
single_item = single_item,
filter = filter_fun,
}, mt)
if automatic then
......
-- 如果 之前加载过,直接拿上一次数据全量加载
if loaded_configuration[key] then
local res = loaded_configuration[key]
loaded_configuration[key] = nil -- tried to load
log.notice("use loaded configuration ", key)
local dir_res, headers = res.body, res.headers
load_full_data(obj, dir_res, headers)
end
-- 启动一个定时器, local ngx_timer_at = ngx.timer.at
ngx_timer_at(0, _automatic_fetch, obj)
else
-- 初始化 etcd_cli
local etcd_cli, err = get_etcd()
if not etcd_cli then
return nil, "failed to start an etcd instance: " .. err
end
obj.etcd_cli = etcd_cli
end
-- 将 `/routes` 放入 created_obj
if key then
created_obj[key] = obj
end
return obj
end
在 _automatic_fetch 中, 执行结束前判断如果 worker 没有退出且对象仍在运行,则通过 ngx.timer.at 启动下一个定时器, 从而达到持续同步的目的。
关于 ngx.timer.at 的更多资料可以阅读 OpenResty最佳实践: 定时任务
local function _automatic_fetch(premature, self)
......
local ok, err = sync_data(self)
......
if not exiting() and self.running then
ngx_timer_at(0, _automatic_fetch, self)
end
end
然后 sync_data 进行数据同步。注意 core.config.new("/routes", {}) 之后,首次调用 sync_data 时, need_reload=true
所以这里会涉及两个关键函数
- 如果 need_reload=true, 调用 readdir(self.etcd_cli, self.key), 全量拉取数据, 最终调用 load_full_data(self, dir_res, headers)
- 否则, 调用 waitdir(self) watch 增量变更数据, 更新 self.values 以及更新版本 self.prev_index = new_ver
  - 如果 self.sync_times>100, 会重建 self.values 和 self.values_hash
  - self.conf_version = self.conf_version + 1 (外部可以通过这个判断是否存在更新)
local function sync_data(self)
if not self.key then
return nil, "missing 'key' arguments"
end
init_watch_ctx(self.key)
if self.need_reload then
local res, err = readdir(self.etcd_cli, self.key)
if not res then
return false, err
end
local dir_res, headers = res.body.list or res.body.node or {}, res.headers
log.debug("readdir key: ", self.key, " res: ",
json.delay_encode(dir_res))
if self.values then
for i, val in ipairs(self.values) do
config_util.fire_all_clean_handlers(val)
end
self.values = nil
self.values_hash = nil
end
load_full_data(self, dir_res, headers)
return true
end
local dir_res, err = waitdir(self)
if not dir_res then
if err == "compacted" then
self.need_reload = true
log.error("waitdir [", self.key, "] err: ", err,
", will read the configuration again via readdir")
return false
end
return false, err
end
另外,如果 waitdir
的时候,发现 etcd 发生了 compacted
, 将会设置 self.need_reload = true
, 触发全量同步, 可能会带来问题,见下面【问题】
问题
etcd compacted
apisix 同步 etcd, 如果 etcd 发生了 compact, 此时apisix watch 的revision 小于 etcd compact revision, apisix watch 到事件变更会直接触发全量同步, 具体机制参考文档
带来的问题:
- apisix本身的性能抖动(全量同步后, 会重建radixtree)
  - 会导致请求 499 (相当于请求进来,在等待radixtree重建,客户端等不及或配置了timeout, 主动关闭掉了连接)
- etcd 读 IO 波峰 / 内存波峰(所有连接的 worker 都进行了全量拉取)
解决方案:
默认etcd的配置 --auto-compaction-mode=periodic --auto-compaction-retention=5m
, 如果apisix实例很多, 无论怎么配置, apisix 全量同步的概率还是很大;
建议将配置改成 --auto-compaction-mode=revision --auto-compaction-retention=1000
, 即按 revision 压缩, 保留最近 1000 个 revision
实时更新的问题
压测可以看到,如果持续变更,那么所有 worker 会在同一个时间 rebuild radixtree(下面日志中的 37~40 是不同的 worker 进程)
2023/10/09 12:10:26 [info] 38#38: *231868 [lua] radixtree.lua:355: pre_insert_route():
2023/10/09 12:10:26 [info] 40#40: *231883 [lua] radixtree.lua:355: pre_insert_route():
2023/10/09 12:10:26 [info] 37#37: *231820 [lua] radixtree.lua:355: pre_insert_route():
2023/10/09 12:10:26 [info] 39#39: *231882 [lua] radixtree.lua:355: pre_insert_route():
如果路由比较多,并且存在路由频繁更新,那么可能带来性能抖动。
因为 watch 变更后,下一次请求对比版本的时候发现不一致,会重建 radixtree, 而请求需要等到重建之后才继续执行;
我提过一个issue help request: Is the radix tree rebuilt every time any route is updated? 讨论这个事情。
最终,我们暂时是通过将重建打散到一个时间范围内,确保所有线上实例不会在同一时间rebuild radixtree, 有需要可以参考 patch
特性追踪
etcd http
在 APISIX’s V3 (2022) Roadmap 中提到 Connect to etcd via gRPC and reduce the number of etcd connection
最早使用 http api 连接 etcd, 意味着每个worker中的每一种资源类型都会存在连接(实例数 * worker 数 * 资源类型数)。
在 apisix 3.2.2 合入了一个 PR feat(config_etcd): use a single long http connection to watch all resources, 使用一个 http connection 来 watch 所有资源类型,这样能有效降低连接数(实例数 * worker 数, 当然还是可能很大)。但是,这个 PR 引入了一个 bug(bug: route 404 after upgrade to 3.2.2): 当 etcd prefix 中带了 - 时会 watch 不到,这个问题在 3.5.0 及以上的版本中被修复(fix: can’t sync etcd data if key has special character)
etcd grpc
原先在配置 etcd 连接的时候是支持配置 use_grpc: true
通过 grpc 连接 etcd 的
后来 [DISCUSS] Proposal: APISIX: remove etcd grpc and conf server 的讨论中, this module has too many bugs , 最终在 PR:fix: remove etcd.use_grpc中移除了。
增量更新
社区曾经有个 pr feat: increment route update for radixtree host uri, radixtree uri and radi… ,但是最终被close 了(个人认为这个 PR 还是很重要的)。
PS:近期 apisix和 apisix-ingress-controller github 仓库的 issue 陆续被无差别close(机器人根据时间、活跃自动化执行的), 但是其中有一些是比较重要的 issue。并且比较影响用户的积极性和活跃度,再持续下去可能后续就没多少 issue 了。
依赖库
lua-resty-etcd Nonblocking Lua etcd driver library for OpenResty, this module supports etcd API v3.
涉及函数
readdir:
- syntax: res, err = cli:readdir(dir:string [, opts:table])
- opts
  - timeout: (int) request timeout seconds. Set to 0 would use lua_socket_connect_timeout as timeout. (5 seconds)
  - revision: (int) revision is the point-in-time of the key-value store to use for the range. If revision is less than or equal to zero, the range is over the newest key-value store. If the revision has been compacted, ErrCompacted is returned as a response.
  - limit: (int) limit is a limit on the number of keys returned for the request. When limit is set to 0, it is treated as no limit.
  - sort_order: (int [SortNone:0, SortAscend:1, SortDescend:2]) sort_order is the order for returned sorted results.
  - sort_target: (int [SortByKey:0, SortByVersion:1, SortByCreateRevision:2, SortByModRevision:3, SortByValue:4]) sort_target is the key-value field to use for sorting.
  - keys_only: (bool) keys_only when set returns only the keys and not the values.
  - count_only: (bool) count_only when set returns only the count of the keys in the range.
特别注意,当revision被compacted
,将会返回异常 ErrCompacted
watchdir
- syntax: res, err = cli:watchdir(dir:string [, opts:table])
- opts: optional options.
  - timeout: (int) request timeout seconds. Set to 0 would use lua_socket_connect_timeout as timeout. (5 seconds)
  - start_revision: (int) start_revision is an optional revision to watch from (inclusive). No start_revision is “now”.
  - progress_notify: (bool) progress_notify is set so that the etcd server will periodically send a WatchResponse with no events to the new watcher if there are no recent events.
  - filters: (slice of [enum FilterType {NOPUT = 0;NODELETE = 1;}]) filters filter the events at server side before it sends back to the watcher.
  - prev_kv: (bool) If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned.
  - watch_id: (int) If watch_id is provided and non-zero, it will be assigned to this watcher. Since creating a watcher in etcd is not a synchronous operation, this can be used to ensure that ordering is correct when creating multiple watchers on the same stream. Creating a watcher with an ID already in use on the stream will cause an error to be returned.
  - fragment: (bool) fragment enables splitting large revisions into multiple watch responses.
注意, watchdir 默认如果超过 5s
没有结束,会直接timeout异常