apisix etcd机制

基于 3.10.0 版本

机制

0. 入口

在 apisix 的ngx_tpl.lua中

    init_worker_by_lua_block {
        apisix.http_init_worker()
    }

apisix/init.lua

local router          = require("apisix.router")

function _M.http_init_worker()
    .....
    router.http_init_worker()
    .....
end

1. http_init_worker

apisix/router.lua

function _M.http_init_worker()
    local conf = core.config.local_conf()
    local router_http_name = "radixtree_uri"

    if conf and conf.apisix and conf.apisix.router then
        router_http_name = conf.apisix.router.http or router_http_name
        ......
    end

    local router_http = require("apisix.http.router." .. router_http_name)
    attach_http_router_common_methods(router_http) -- 这里 attach 两个方法给到模块
    router_http.init_worker(filter) -- 这里调用 init_worker 完成初始化
    _M.router_http = router_http

    ......
end

其中 attach_http_router_common_methods

-- Fill in default `routes` and `init_worker` functions on a router module.
-- Each function is only installed when the module does not already define
-- one (the field is nil), so a module's own implementation is kept.
-- @param http_router  the router module table to extend in place
local function attach_http_router_common_methods(http_router)
    if http_router.routes == nil then
        http_router.routes = function ()
            -- nothing to report until init_worker has populated user_routes
            if not http_router.user_routes then
                return nil, nil
            end
            -- return the values and the conf_version of the module's user_routes
            local user_routes = http_router.user_routes
            return user_routes.values, user_routes.conf_version
        end
    end

    if http_router.init_worker == nil then
        http_router.init_worker = function (filter)
            http_router.user_routes = http_route.init_worker(filter)
            -- http_route here is `local http_route = require("apisix.http.route")`;
            -- the object returned by its init_worker is stored in the module's
            -- user_routes field
        end
    end
end

apisix/http/route.lua

http_route.init_worker 内部调用了 core.config.new("/routes", {})

-- Create the config object for the `/routes` key.
-- @param filter  passed through unchanged as the `filter` option
-- @return the object returned by core.config.new("/routes", ...)
-- Raises an error when core.config.new returns a falsy first value.
function _M.init_worker(filter)
    local user_routes, err = core.config.new("/routes", {
            automatic = true,
            item_schema = core.schema.route,
            checker = check_route,
            filter = filter,
        })
    if not user_routes then
        -- abort instead of returning nil: callers use the result unconditionally
        error("failed to create etcd instance for fetching /routes : " .. err)
    end

    return user_routes
end

2. 具体某个router.lua

apisix/http/router
├── radixtree_host_uri.lua
├── radixtree_uri.lua
└── radixtree_uri_with_parameter.lua

即,如果配置文件中有配置 apisix.router.http, 那么将会使用配置文件中的,否则使用 radixtree_uri

假设我们配置了 apisix.router.http=radixtree_uri_with_parameter

local router_http = require("apisix.http.router.radixtree_uri_with_parameter")

对应代码

-- Router module loaded as "apisix.http.router.radixtree_uri_with_parameter".
-- It keeps one radixtree URI router and rebuilds it inside match() whenever
-- the route or service version it was built from is out of date.
local require = require
local core = require("apisix.core")
local base_router = require("apisix.http.route")
local get_services = require("apisix.http.service").services
-- route / service versions the current uri_router was built from
-- (nil until the first build)
local cached_router_version
local cached_service_version

local _M = {}

-- table handed to create_radixtree_uri_router on every rebuild
local uri_routes = {}
-- the router currently used for matching (nil until the first build)
local uri_router

-- Match the request described by api_ctx against the URI router.
-- The router is rebuilt first when no version has been cached yet or when
-- the cached route/service version differs from the current one.
-- _M.user_routes is not assigned anywhere in this module; it has to be set
-- from outside before match() runs, because it is indexed unconditionally.
-- @param api_ctx  request context forwarded to _M.matching
-- @return true when no router is available; otherwise whatever
--         _M.matching(api_ctx) returns
function _M.match(api_ctx)
    local user_routes = _M.user_routes
    local _, service_version = get_services()
    if not cached_router_version or cached_router_version ~= user_routes.conf_version
        or not cached_service_version or cached_service_version ~= service_version
    then
        -- NOTE(review): the trailing `true` presumably enables parameter
        -- matching -- confirm against create_radixtree_uri_router
        uri_router = base_router.create_radixtree_uri_router(user_routes.values,
                                                             uri_routes, true)
        -- remember which versions this router corresponds to
        cached_router_version = user_routes.conf_version
        cached_service_version = service_version
    end

    if not uri_router then
        core.log.error("failed to fetch valid `uri_with_parameter` router: ")
        return true
    end

    return _M.matching(api_ctx)
end


-- Run the actual lookup with the already-built uri_router.
-- @param api_ctx  request context passed to base_router.match_uri
-- @return the result of base_router.match_uri(uri_router, api_ctx)
function _M.matching(api_ctx)
    core.log.info("route match mode: radixtree_uri_with_parameter")
    return base_router.match_uri(uri_router, api_ctx)
end


return _M

说明:

  1. 声明了 match(api_ctx) 和 matching(api_ctx), 并且前者调用后者
  2. 声明了模块变量 user_routes,在启动时,通过 http_route.init_worker(filter) 被初始化
  3. 声明了模块变量 cached_router_version/cached_service_version, 用于判断当前 router 是否最新,如果 etcd watch到变更,那么会导致这里判断版本不一致,进而调用 base_router.create_radixtree_uri_router(user_routes.values, uri_routes, true)重建radixtree
  4. 这个过程是在 match中,意味着,数据面在处理请求的时候,执行match,发现service或route有更新,此时会发生radixtree重建

这个可能会导致问题,见下面【问题 - 实时更新的问题】

我们回到原点,关注 user_routes 的初始化

3. user_routes

apisix/http/route.lua

引用 local http_route = require("apisix.http.route") 的 init_worker

-- Create the config object for the `/routes` key.
-- @param filter  passed through unchanged as the `filter` option
-- @return the object returned by core.config.new("/routes", ...)
-- Raises an error when core.config.new returns a falsy first value.
function _M.init_worker(filter)
    local user_routes, err = core.config.new("/routes", {
            automatic = true,  -- note this flag: it is read by core.config.new as opts.automatic
            item_schema = core.schema.route,
            checker = check_route,
            filter = filter,
        })
    if not user_routes then
        -- abort instead of returning nil: callers use the result unconditionally
        error("failed to create etcd instance for fetching /routes : " .. err)
    end

    return user_routes
end

apisix/core.lua

-- Pick the config backend: use deployment.config_provider from the local
-- configuration when it is set, otherwise fall back to "etcd".
local config_provider = local_conf.deployment and local_conf.deployment.config_provider
                      or "etcd"
log.info("use config_provider: ", config_provider)
-- the provider name selects the module, e.g. "etcd" -> apisix.core.config_etcd
local config = require("apisix.core.config_" .. config_provider)
-- record the chosen provider name on the module table
config.type = config_provider

-- expose the selected backend as the `config` field of the returned table
return {
  config      = config,
}

配置的config_provider=etcd, 则

local config = require("apisix.core.config_etcd")

则调用的new方法是 apisix/core/config_etcd.lua

---
-- Create a new connection to communicate with the control plane.
-- This function should be used in the `init_worker_by_lua` phase.
--

function _M.new(key, opts)
    ......
    -- etcd 相关配置
    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local resync_delay = etcd_conf.resync_delay
    if not resync_delay or resync_delay < 0 then
        resync_delay = 5
    end
    local health_check_timeout = etcd_conf.health_check_timeout
    if not health_check_timeout or health_check_timeout < 0 then
        health_check_timeout = 10
    end

    local automatic = opts and opts.automatic
    -- 初始化对象
    local obj = setmetatable({
        etcd_cli = nil,
        key = key and prefix .. key,
        automatic = automatic,  -- true
        item_schema = item_schema,
        checker = checker,
        sync_times = 0,   -- 同步次数
        running = true,
        conf_version = 0,  -- 当前数据版本号,可以通过这个判断是否有更新
        values = nil,      -- 所有同步过来的数据
        need_reload = true,  -- 注意这里,首次 new 的时候,need_reload 是 true
        watching_stream = nil,
        routes_hash = nil,
        prev_index = 0,     -- 上一次执行同步etcd 返回的 modifiedIndex
        last_err = nil,
        last_err_time = nil,
        resync_delay = resync_delay,
        health_check_timeout = health_check_timeout,
        timeout = timeout,  -- 注意,这里没有设置timeout, 那么在连接的时候,http_waitdir 将使用代码中的默认值 ` local ok, err = self.watch_sema:wait(timeout or 60)`
        single_item = single_item,
        filter = filter_fun,
    }, mt)

    if automatic then
        ......
        -- 如果 之前加载过,直接拿上一次数据全量加载
        if loaded_configuration[key] then
            local res = loaded_configuration[key]
            loaded_configuration[key] = nil -- tried to load

            log.notice("use loaded configuration ", key)

            local dir_res, headers = res.body, res.headers
            load_full_data(obj, dir_res, headers)
        end
   
        -- 启动一个定时器, local ngx_timer_at = ngx.timer.at
        ngx_timer_at(0, _automatic_fetch, obj)

    else
        -- 初始化 etcd_cli
        local etcd_cli, err = get_etcd()
        if not etcd_cli then
            return nil, "failed to start an etcd instance: " .. err
        end
        obj.etcd_cli = etcd_cli
    end

    -- 将 `/routes` 放入 created_obj
    if key then
        created_obj[key] = obj
    end

    return obj
end
    

_automatic_fetch 中, 执行结束前判断如果还在运行时,启动下一个定时器, 从而达到持续同步的目的,ngx.timer.at更多资料可以阅读 OpenResty最佳实践: 定时任务

local function _automatic_fetch(premature, self)
            ......
            local ok, err = sync_data(self)
            ......
            
    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)
    end
end

然后 sync_data 进行数据同步, 注意 core.config.new("/routes", {}) 的时候, 首次调用 sync_data 时, need_reload=true

所以这里会涉及两个关键函数

  • 如果 need_reload=true, 调用 readdir(self.etcd_cli, self.key), 全量拉取数据, 最终调用 load_full_data(self, dir_res, headers)
  • 否则, 调用 waitdir(self) watch增量变更数据, 更新 self.values 以及更新版本 self.prev_index = new_ver
    • 如果 self.sync_times>100, 会重建 self.values 和 self.values_hash
    • self.conf_version = self.conf_version + 1 (外部可以通过这个判断是否存在更新)
local function sync_data(self)
    if not self.key then
        return nil, "missing 'key' arguments"
    end

    init_watch_ctx(self.key)

    if self.need_reload then
        local res, err = readdir(self.etcd_cli, self.key)
        if not res then
            return false, err
        end

        local dir_res, headers = res.body.list or res.body.node or {}, res.headers
        log.debug("readdir key: ", self.key, " res: ",
                  json.delay_encode(dir_res))

        if self.values then
            for i, val in ipairs(self.values) do
                config_util.fire_all_clean_handlers(val)
            end

            self.values = nil
            self.values_hash = nil
        end

        load_full_data(self, dir_res, headers)

        return true
    end

    local dir_res, err = waitdir(self)

    if not dir_res then
        if err == "compacted" then
            self.need_reload = true
            log.error("waitdir [", self.key, "] err: ", err,
                     ", will read the configuration again via readdir")
            return false
        end

        return false, err
    end

另外,如果 waitdir的时候,发现 etcd 发生了 compacted, 将会设置 self.need_reload = true, 触发全量同步, 可能会带来问题,见下面【问题】

问题

etcd compacted

apisix 同步 etcd, 如果 etcd 发生了 compact, 此时apisix watch 的revision 小于 etcd compact revision, apisix watch 到事件变更会直接触发全量同步, 具体机制参考文档

带来的问题:

  • apisix本身的性能抖动(全量同步后, 会重建radixtree)

    • 会导致请求 499(相当于请求进来,在等待radixtree重建,客户端等不及或配置了timeout, 主动关闭掉了连接)
  • etcd 读 IO 波峰 / 内存波峰(所有连接的 worker 都进行了全量拉取)

解决方案: 默认etcd的配置 --auto-compaction-mode=periodic --auto-compaction-retention=5m, 如果apisix实例很多, 无论怎么配置, apisix 全量同步的概率还是很大;

建议将配置改成 --auto-compaction-mode=revision --auto-compaction-retention=1000, 默认会保留 1000 个有效的revision

实时更新的问题

压测可以看到,如果持续变更,那么所有请求在同一个时间 rebuild radixtree

2023/10/09 12:10:26 [info] 38#38: *231868 [lua] radixtree.lua:355: pre_insert_route(): 
2023/10/09 12:10:26 [info] 40#40: *231883 [lua] radixtree.lua:355: pre_insert_route(): 
2023/10/09 12:10:26 [info] 37#37: *231820 [lua] radixtree.lua:355: pre_insert_route(): 
2023/10/09 12:10:26 [info] 39#39: *231882 [lua] radixtree.lua:355: pre_insert_route(): 

如果路由比较多,并且存在路由频繁更新,那么可能带来性能抖动。

因为 watch 变更后,下一次请求对比版本的时候发现不一致,会重建 radixtree, 而请求需要等到重建之后才继续执行;

我提过一个issue help request: Is the radix tree rebuilt every time any route is updated? 讨论这个事情。

最终,我们暂时是通过将重建打散到一个时间范围内,确保所有线上实例不会在同一时间rebuild radixtree, 有需要可以参考 patch

特性追踪

etcd http

APISIX’s V3 (2022) Roadmap 中提到 Connect to etcd via gRPC and reduce the number of etcd connection

最早使用 http api 连接 etcd, 意味着每个worker中的每一种资源类型都会存在连接(实例数 * worker 数 * 资源类型数)。

在 apisix 3.2.2 合入了一个 PR「feat(config_etcd): use a single long http connection to watch all resources」, 使用一个 http connection 来 watch 所有资源类型,这样能有效降低连接数(实例数 * worker 数, 当然还是可能很大)。但是,这个 PR 引入了一个 bug「bug: route 404 after upgrade to 3.2.2」: 当 etcd prefix 中带了 - 时会 watch 不到。这个问题在 3.5.0 及以上的版本中被修复, 见「fix: can’t sync etcd data if key has special character」

etcd grpc

原先在配置 etcd 连接的时候是支持配置 use_grpc: true 通过 grpc 连接 etcd 的

后来在「[DISCUSS] Proposal: APISIX: remove etcd grpc and conf server」的讨论中, 由于 "this module has too many bugs", 最终在 PR「fix: remove etcd.use_grpc」中移除了。

增量更新

社区曾经有个 pr feat: increment route update for radixtree host uri, radixtree uri and radi… ,但是最终被close 了(个人认为这个 PR 还是很重要的)。

PS:近期 apisix和 apisix-ingress-controller github 仓库的 issue 陆续被无差别close(机器人根据时间、活跃自动化执行的), 但是其中有一些是比较重要的 issue。并且比较影响用户的积极性和活跃度,再持续下去可能后续就没多少 issue 了。

依赖库

lua-resty-etcd Nonblocking Lua etcd driver library for OpenResty, this module supports etcd API v3.

涉及函数

readdir:

  • syntax: res, err = cli:readdir(dir:string [, opts:table])
  • opts
    • timeout: (int) request timeout seconds. Set to 0 would use lua_socket_connect_timeout as timeout. (5 seconds)
    • revision: (int) revision is the point-in-time of the key-value store to use for the range. If revision is less than or equal to zero, the range is over the newest key-value store. If the revision has been compacted, ErrCompacted is returned as a response.
    • limit: (int) limit is a limit on the number of keys returned for the request. When limit is set to 0, it is treated as no limit.
    • sort_order: (int [SortNone:0, SortAscend:1, SortDescend:2]) sort_order is the order for returned sorted results.
    • sort_target: (int [SortByKey:0, SortByVersion:1, SortByCreateRevision:2, SortByModRevision:3, SortByValue:4]) sort_target is the key-value field to use for sorting.
    • keys_only: (bool) keys_only when set returns only the keys and not the values.
    • count_only: (bool) count_only when set returns only the count of the keys in the range.

特别注意,当revision被compacted,将会返回异常 ErrCompacted

watchdir

  • syntax: res, err = cli:watchdir(dir:string [, opts:table])
  • opts: optional options.
    • timeout: (int) request timeout seconds. Set to 0 would use lua_socket_connect_timeout as timeout. (5 seconds)
    • start_revision: (int) start_revision is an optional revision to watch from (inclusive). No start_revision is “now”.
    • progress_notify: (bool) progress_notify is set so that the etcd server will periodically send a WatchResponse with no events to the new watcher if there are no recent events.
    • filters: (slice of [enum FilterType {NOPUT = 0;NODELETE = 1;}]) filters filter the events at server side before it sends back to the watcher.
    • prev_kv: (bool) If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned.
    • watch_id: (int) If watch_id is provided and non-zero, it will be assigned to this watcher. Since creating a watcher in etcd is not a synchronous operation, this can be used to ensure that ordering is correct when creating multiple watchers on the same stream. Creating a watcher with an ID already in use on the stream will cause an error to be returned.
    • fragment: (bool) fragment enables splitting large revisions into multiple watch responses.

注意, watchdir 默认如果超过 5s没有结束,会直接timeout异常