apisix 中的服务发现机制

基于 3.10.0 版本

机制

0. 入口

在 apisix 的ngx_tpl.lua中

    init_worker_by_lua_block {
        apisix.http_init_worker()
    }

apisix/init.lua

local router          = require("apisix.router")

function _M.http_init_worker()
    .....
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    .....
end

1. discovery.init_worker

apisix/discovery/init.lua

local discovery_type = local_conf.discovery
local discovery = {}

if discovery_type then
    for discovery_name, _ in pairs(discovery_type) do
        log.info("use discovery: ", discovery_name)
        discovery[discovery_name] = require("apisix.discovery." .. discovery_name)
    end
end

function discovery.init_worker()
    if discovery_type then
        for discovery_name, _ in pairs(discovery_type) do
            discovery[discovery_name].init_worker()
        end
    end
end

根据配置文件中配置的服务发现类型,加载对应的模块,在 init_worker中调用对应模块的init_worker

假设配置文件 config.yaml 中我们配置的是 dns (最简单的实现,其他的实现原理上一一样的)

discovery:                      # Service Discovery
  dns:
    servers:
      - "127.0.0.1:8600"         # Replace with the address of your DNS server.
    resolv_conf: /etc/resolv.conf # Replace with the path to the local DNS resolv config. Configure either "servers" or "resolv_conf".
    order:                       # Resolve DNS records this order.
      - last                     # Try the latest successful type for a hostname.
      - SRV
      - A
      - AAAA
      - CNAME

2. apisix.discovery.dns

apisix/discovery/dns/init.lua

一共两个方法 (实现其他类型服务发现同理需要实现这两个方法)

  1. init_worker() 初始化,根据配置,构建一个 client
  2. nodes(service_name) 根据 service_name, 使用client做服务发现,返回 service_name 对应可用的 nodes
function _M.init_worker()
    ......
    local client, err = core.dns_client.new(opts)
    if not client then
        error("failed to init the dns client: ", err)
        return
    end

    dns_client = client
end

unction _M.nodes(service_name)
    local host, port = core.utils.parse_addr(service_name)
    core.log.info("discovery dns with host ", host, ", port ", port)

    local records, err = dns_client:resolve(host, core.dns_client.RETURN_ALL)
    if not records then
        return nil, err
    end

    local nodes = core.table.new(#records, 0)
    local index = 1
    for _, r in ipairs(records) do
        ......
    end

    return nodes
end

nodes示例

[
  {
    "host": "192.168.1.100",
    "port": 8761,
    "weight": 100,
    "metadata": {
      "management.port": "8761"
    }
  }
]

3. 调用点在哪里?

apisix/init.lua

local set_upstream    = apisix_upstream.set_by_route

-- from access phase
function _M.http_access_phase()
      ......
      _M.handle_upstream(api_ctx, route, enable_websocket)
end

function _M.handle_upstream(api_ctx, route, enable_websocket)
    ......
    local code, err = set_upstream(route, api_ctx)
    ......
end

apisix/upstream.lua

function _M.set_by_route(route, api_ctx)
    ......
    -- 如果 upstream 中有配置 service_name, 则需要服务发现
    if up_conf.service_name then
        ......
        -- 获取服务发现类型对应的实例
        local dis = discovery[up_conf.discovery_type]
        -- 根据service_name 获取 nodes
        local new_nodes, err = dis.nodes(up_conf.service_name, up_conf.discovery_args)
        -- 跟之前的比较是否一致
        local same = upstream_util.compare_upstream_node(up_conf, new_nodes)
        if not same then
            ......
            -- 这里设置了 新的节点
            up_conf.nodes = new_nodes
            up_conf.original_nodes = up_conf.nodes

            local new_up_conf = core.table.clone(up_conf)

            local parent = up_conf.parent
            if parent.value.upstream then
                -- the up_conf comes from route or service
                parent.value.upstream = new_up_conf
            else
                parent.value = new_up_conf
            end
            up_conf = new_up_conf
        end
    end

    local id = up_conf.parent.value.id
    local conf_version = up_conf.parent.modifiedIndex
    -- include the upstream object as part of the version, because the upstream will be changed
    -- by service discovery or dns resolver.
    set_directly(api_ctx, id, conf_version .. "#" .. tostring(up_conf), up_conf)
local function set_directly(ctx, key, ver, conf)
    ......
    -- 注意这里,更新了 api_ctx 的这几个字段
    ctx.upstream_conf = conf
    ctx.upstream_version = ver
    ctx.upstream_key = key
    return
end

apisix/balancer.lua

-- pick_server will be called:
-- 1. in the access phase so that we can set headers according to the picked server
-- 2. each time we need to retry upstream
local function pick_server(route, ctx)
    ......
    local version = ctx.upstream_version
    local key = ctx.upstream_key
    local checker = ctx.up_checker

    -- the same picker will be used in the whole request, especially during the retry
    local server_picker = ctx.server_picker
    if not server_picker then
        server_picker = lrucache_server_picker(key, version,
                                               create_server_picker, up_conf, checker)
    end
    ......

    local server, err = server_picker.get(ctx)

这里,如果服务发现导致 upstream_key/upstream_version 变化,那么意味着 server_picker 对应的 lrucache 会失效,进入 create_server_picker的逻辑

数据面服务发现的问题

如果服务发现获取上游的 node 数据变更非常频繁

  1. 会被对比出来,有差异
  2. 会多一些赋值
  3. 负载均衡lrucache会失效重建,重建时,如果配置有主动健康检查,还会触发主动健康检查获取健康的节点列表

这样会导致服务出现抖动。

基于控制面的服务发现

官方有一个基于控制面的服务发现项目 api7/apisix-seed: Do service discovery on the CP side

通过订阅 service_name的变更, 获取服务发现的变更,写入到etcd

相关文档