apisix 中的负载均衡

基于 3.10.0 版本

机制

0. 入口

在 apisix 的 nginx 配置模板 apisix/cli/ngx_tpl.lua 中, 与负载均衡相关的入口如下:

    -- 初始化
    init_worker_by_lua_block {
        apisix.http_init_worker()
    }

        -- balance
        balancer_by_lua_block {
            apisix.http_balancer_phase()
        }

1. 初始化

apisix/init.lua

local router          = require("apisix.router")

local load_balancer

-- Worker-level initialisation, invoked from init_worker_by_lua_block.
-- Excerpt: the `.....` lines stand for code omitted from these notes.
function _M.http_init_worker()
    .....
    -- Per these notes, the balancer module's init_worker() does nothing in this version.
    require("apisix.balancer").init_worker()
    -- Key step: keep the balancer module in the file-level `load_balancer` local,
    -- which the access and balancer phase handlers call later.
    load_balancer = require("apisix.balancer")
  

    .....
end

2. 执行入口

在 balancer 阶段(balancer_by_lua_block), 进入 http_balancer_phase

apisix/init.lua

-- Balancer-phase handler, invoked from balancer_by_lua_block.
-- Excerpt: `......` stands for omitted code (api_ctx is obtained there).
-- Hands the matched route, the request context and common_phase to the balancer module.
function _M.http_balancer_phase()
    ......

    load_balancer.run(api_ctx.matched_route, api_ctx, common_phase)
end

进入

apisix/balancer.lua

-- Sets the upstream peer for the current request during the balancer phase.
-- route, ctx: the matched route and request context passed in by http_balancer_phase.
-- plugin_funcs: not referenced in the portion shown here.
-- On failure of set_current_peer the request is answered with 502.
function _M.run(route, ctx, plugin_funcs)
    local server, err

    if ctx.picked_server then
        -- use the server picked in the access phase
        server = ctx.picked_server
        -- cleared, so a later call to run() for the same ctx takes the else branch
        ctx.picked_server = nil

        set_balancer_opts(route, ctx)

    else
        -- retry handling, left out of these notes for now
        ......
    end

    -- NOTE(review): this `err` shadows the one declared above and is not used in this excerpt
    local ok, err = set_current_peer(server, ctx)
    if not ok then
        return core.response.exit(502)
    end

    -- flag recorded on the context once set_current_peer has succeeded
    ctx.proxy_passed = true
end

3. api_ctx.picked_server

apisix/init.lua

实际上,在 http_access_phase 就已经做了 pick_server

-- Access-phase handler. Excerpt: `......` stands for omitted code.
-- The part relevant to load balancing is the call to handle_upstream.
function _M.http_access_phase()
      ......
      _M.handle_upstream(api_ctx, route, enable_websocket)
end

-- Excerpt: `......` stands for omitted code; the closing `end` is not part of these notes.
-- Picks an upstream server during the access phase and stores it on the request
-- context as `picked_server`.
function _M.handle_upstream(api_ctx, route, enable_websocket)
    ......
    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        -- no server could be picked: log the reason and answer 502
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end

    -- load_balancer.run() reads this back as ctx.picked_server in the balancer phase
    api_ctx.picked_server = server
    ......
  

4. load_balancer.pick_server

apisix/balancer.lua

-- pick_server will be called:
-- 1. in the access phase so that we can set headers according to the picked server
-- 2. each time we need to retry upstream
--
-- Excerpt: `key`, `version` and `checker` come from code omitted from these notes,
-- and the tail of the function is not shown.
-- Visible return paths: the single node on the fast path, or nil plus an error
-- message when no picker or no server is available.
local function pick_server(route, ctx)
    local up_conf = ctx.upstream_conf
    
    local nodes_count = #up_conf.nodes
    -- fast path: with exactly one node, return it directly without using a picker
    if nodes_count == 1 then
        local node = up_conf.nodes[1]
        ctx.balancer_ip = node.host
        ctx.balancer_port = node.port
        node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme)
        return node
    end

    -- the same picker will be used in the whole request, especially during the retry
    local server_picker = ctx.server_picker
    if not server_picker then
        -- fetch the picker from the LRU cache; per these notes, create_server_picker
        -- is used to build one when the cache has none
        server_picker = lrucache_server_picker(key, version,
                                               create_server_picker, up_conf, checker)
    end
    if not server_picker then
        return nil, "failed to fetch server picker"
    end

    -- ask the picker for one server
    local server, err = server_picker.get(ctx)
    if not server then
        err = err or "no valid upstream node"
        return nil, "failed to find valid upstream server, " .. err
    end
    -- remember the pick on the context (the roundrobin picker's after_balance reads it)
    ctx.balancer_server = server
  
  
-- Excerpt: `......` stands for omitted code (`up_nodes` is built there); the closing
-- `end` of the function is not part of these notes.
-- Builds a server picker for `upstream` using the algorithm module named by upstream.type.
-- `checker` is not referenced in the portion shown here.
local function create_server_picker(upstream, checker)
    -- load the balancing-algorithm module that matches the configured type,
    -- requiring it on first use and caching it in `pickers`
    local picker = pickers[upstream.type]
    if not picker then
        pickers[upstream.type] = require("apisix.balancer." .. upstream.type)
        picker = pickers[upstream.type]
    end
  
    if picker then
        ......
        -- the picker is created from the node group selected by the first entry of
        -- up_nodes._priority_index (presumably the highest priority - TODO confirm)
        local server_picker = picker.new(up_nodes[up_nodes._priority_index[1]], upstream)

        return server_picker
    end  
  

可选的负载均衡算法: chash / ewma / least_conn / priority / roundrobin, 分别对应 apisix/balancer/ 目录下的同名模块 (即上面 require("apisix.balancer." .. upstream.type) 加载的模块)

以 roundrobin 为例 apisix/balancer/roundrobin.lua


-- Creates a round-robin server picker.
-- up_nodes: table whose values are numeric weights (keys are presumably server
--           identifiers - TODO confirm against the caller).
-- upstream: stored as-is in the returned table.
-- Returns a table with `upstream` plus three callbacks: get, after_balance and
-- before_retry_next_priority.
function _M.new(up_nodes, upstream)
    -- upper bound on the number of picker:find() attempts per get() call
    local safe_limit = 0
    for _, weight in pairs(up_nodes) do
        -- the weight can be zero
        safe_limit = safe_limit + weight + 1
    end

    local picker = roundrobin:new(up_nodes)
    local nodes_count = nkeys(up_nodes)
    return {
        upstream = upstream,
        -- Returns the next server, or nil plus an error message.
        get = function (ctx)
            -- every node has already been tried for this request: give up
            if ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nodes_count then
                return nil, "all upstream servers tried"
            end

            local server, err
            for i = 1, safe_limit do
                server, err = picker:find()
                if not server then
                    return nil, err
                end
                if ctx.balancer_tried_servers then
                    -- accept only a server not yet marked as tried
                    if not ctx.balancer_tried_servers[server] then
                        break
                    end
                else
                    -- nothing has been tried yet, so the first candidate is accepted
                    break
                end
            end

            -- NOTE(review): if all safe_limit attempts yield already-tried servers,
            -- the last candidate is still returned here
            return server
        end,
        -- before_retry falsy: release the tried-servers table.
        -- before_retry truthy: mark ctx.balancer_server as tried.
        after_balance = function (ctx, before_retry)
            if not before_retry then
                if ctx.balancer_tried_servers then
                    core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                    ctx.balancer_tried_servers = nil
                end

                return nil
            end

            if not ctx.balancer_tried_servers then
                ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
            end
            -- mark the server as tried and bump the counter
            ctx.balancer_tried_servers[ctx.balancer_server] = true
            ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
        end,
        -- Drops the tried-servers table and resets the counter to 0.
        before_retry_next_priority = function (ctx)
            if ctx.balancer_tried_servers then
                core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                ctx.balancer_tried_servers = nil
            end

            ctx.balancer_tried_servers_count = 0
        end,
    }
end

这几个函数在哪里被调用的?

  • get: 在 pick_server(route, ctx) 中拿到 server_picker 实例后 (优先复用 ctx.server_picker, 否则从 lrucache 获取, 没有则通过 create_server_picker 新建), 立即调用 local server, err = server_picker.get(ctx), 并赋值 ctx.balancer_server = server
  • after_balance: 在 http_log_phase 阶段, 请求结束后调用 api_ctx.server_picker.after_balance(api_ctx, false); 另外, 在 retry 重新 pick_server 的过程中, 也会调用 ctx.server_picker.after_balance(ctx, true), 用于把刚才失败的 server 标记为已尝试
  • before_retry_next_priority: 在 apisix/balancer/priority.lua 中调用, 调用链为 priority 的 get(ctx) -> picker.before_retry_next_priority(ctx); 即在按 priority 逐级尝试的过程中调用, 用于清空已尝试记录并把计数重置为 0