APISIX 中的负载均衡
基于 3.10.0 版本
机制
0. 入口
在 APISIX 的 nginx 配置模板 ngx_tpl.lua（apisix/cli/ngx_tpl.lua）中，注册了与负载均衡相关的两个入口：
# 初始化：每个 worker 启动时执行
init_worker_by_lua_block {
    apisix.http_init_worker()
}
# 负载均衡：balancer 阶段执行
balancer_by_lua_block {
    apisix.http_balancer_phase()
}
1. 初始化（apisix/init.lua 中的 http_init_worker）
-- Excerpt from apisix/init.lua (APISIX 3.10.0); "....." marks code omitted in these notes.
local router = require("apisix.router")
-- Module-level handle to apisix.balancer. It is assigned in http_init_worker and
-- later used by http_balancer_phase (load_balancer.run) and by
-- handle_upstream (load_balancer.pick_server).
local load_balancer
-- Per-worker initialization, invoked from init_worker_by_lua_block.
function _M.http_init_worker()
    .....
    -- Effectively a no-op: the balancer module's init_worker() does nothing.
    require("apisix.balancer").init_worker()
    -- This is the core piece: keep a reference to the balancer module so the
    -- request phases can call into it.
    load_balancer = require("apisix.balancer")
    .....
end
2. 执行入口
在 balancer 阶段（balancer_by_lua_block）进入 http_balancer_phase：
-- Entry point of the nginx balancer phase (called from balancer_by_lua_block).
-- Delegates peer selection to the balancer module's run().
-- api_ctx and common_phase are defined outside this excerpt ("......" = omitted code).
function _M.http_balancer_phase()
    ......
    load_balancer.run(api_ctx.matched_route, api_ctx, common_phase)
end
进入 load_balancer.run（即 apisix/balancer.lua 中的 _M.run）：
-- apisix.balancer run(): sets the upstream peer for the current request.
-- Normally the server was already chosen in the access phase and stored in
-- ctx.picked_server; the retry path (else branch) is omitted in this excerpt.
-- On failure to set the peer, the request is finished with HTTP 502.
function _M.run(route, ctx, plugin_funcs)
    local server, err

    if ctx.picked_server then
        -- use the server picked in the access phase
        server = ctx.picked_server
        -- Cleared after use; presumably so that a later retry goes through the
        -- else branch and picks again — TODO confirm against the omitted code.
        ctx.picked_server = nil
        set_balancer_opts(route, ctx)
    else
        -- Retry-related path; ignored for now in these notes.
        ......
    end

    local ok, err = set_current_peer(server, ctx)
    if not ok then
        return core.response.exit(502)
    end

    -- Mark that the request was handed to an upstream peer.
    ctx.proxy_passed = true
end
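3. api_ctx.picked_server
实际上，在 access 阶段的 http_access_phase 中就已经执行了 pick_server，选出的节点保存在 api_ctx.picked_server 中；balancer 阶段的 run 会直接使用它（即 run 中 if ctx.picked_server 的分支）。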
-- Access-phase entry point. Among other (omitted) work, it hands the matched
-- route to handle_upstream, which picks the upstream server ahead of the
-- balancer phase. api_ctx, route and enable_websocket come from the omitted code.
function _M.http_access_phase()
    ......
    _M.handle_upstream(api_ctx, route, enable_websocket)
end
-- Picks an upstream server during the access phase and stores it in
-- api_ctx.picked_server, where the balancer phase (load_balancer.run) picks it up.
-- If no server can be picked, the error is logged and the request ends with 502.
-- Excerpt: the rest of the function, including its closing `end`, is omitted.
function _M.handle_upstream(api_ctx, route, enable_websocket)
    ......
    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end

    api_ctx.picked_server = server
    ......
4. load_balancer.pick_server（apisix/balancer.lua）
-- pick_server will be called:
-- 1. in the access phase so that we can set headers according to the picked server
-- 2. each time we need to retry upstream
--
-- Returns the picked node/server, or nil plus an error message.
-- Excerpt: `key`, `version` and `checker` used below are defined in code not shown
-- here, and the tail of the function (after ctx.balancer_server is set) is omitted.
local function pick_server(route, ctx)
    local up_conf = ctx.upstream_conf
    local nodes_count = #up_conf.nodes
    -- Fast path: with a single node there is nothing to balance, so return it directly.
    if nodes_count == 1 then
        local node = up_conf.nodes[1]
        ctx.balancer_ip = node.host
        ctx.balancer_port = node.port
        node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme)
        return node
    end

    -- the same picker will be used in the whole request, especially during the retry
    local server_picker = ctx.server_picker
    if not server_picker then
        -- Fetch the picker from the LRU cache; if absent, one is created via
        -- create_server_picker(up_conf, checker).
        server_picker = lrucache_server_picker(key, version,
                                               create_server_picker, up_conf, checker)
    end
    if not server_picker then
        return nil, "failed to fetch server picker"
    end

    -- Ask the algorithm-specific picker for one server.
    local server, err = server_picker.get(ctx)
    if not server then
        err = err or "no valid upstream node"
        return nil, "failed to find valid upstream server, " .. err
    end
    -- Remembered so the picker's after_balance(ctx, true) can mark it as tried
    -- before a retry.
    ctx.balancer_server = server
-- Factory handed to the LRU cache in pick_server: builds a server picker for
-- `upstream` using the balancing module apisix.balancer.<upstream.type>.
-- Excerpt: `up_nodes` is defined in the omitted "......" part, and the tail of
-- the function (including its closing `end`) is omitted.
local function create_server_picker(upstream, checker)
    -- Load the balancing-algorithm module matching the configured type,
    -- memoizing it in the `pickers` table.
    local picker = pickers[upstream.type]
    if not picker then
        -- NOTE(review): require raises an error if no such module exists; it is
        -- not wrapped in pcall here.
        pickers[upstream.type] = require("apisix.balancer." .. upstream.type)
        picker = pickers[upstream.type]
    end

    if picker then
        ......
        -- Builds the picker over the first priority group only
        -- (up_nodes._priority_index[1]); handling of multiple priority levels
        -- presumably lives in the omitted code — verify against upstream.
        local server_picker = picker.new(up_nodes[up_nodes._priority_index[1]], upstream)
        return server_picker
    end
负载均衡算法的实现位于 apisix/balancer/ 目录下：chash / ewma / least_conn / priority / roundrobin。
以 roundrobin 为例（apisix/balancer/roundrobin.lua）：
-- apisix/balancer/roundrobin.lua: creates a weighted round-robin server picker.
-- @param up_nodes table whose values are node weights (keys are presumably the
--        server identifiers returned by picker:find() — TODO confirm)
-- @param upstream upstream configuration, stored on the returned picker
-- @return picker table with get / after_balance / before_retry_next_priority
function _M.new(up_nodes, upstream)
    -- Upper bound on find() attempts per get(): sum of (weight + 1) over all
    -- nodes; the +1 keeps the bound positive even when weights are zero.
    local safe_limit = 0
    for _, weight in pairs(up_nodes) do
        -- the weight can be zero
        safe_limit = safe_limit + weight + 1
    end

    local picker = roundrobin:new(up_nodes)
    local nodes_count = nkeys(up_nodes)
    return {
        upstream = upstream,
        -- Returns the next server, skipping servers already tried in this request.
        -- Returns nil, err when every node has been tried or find() fails.
        get = function (ctx)
            if ctx.balancer_tried_servers and ctx.balancer_tried_servers_count == nodes_count then
                return nil, "all upstream servers tried"
            end

            local server, err
            for i = 1, safe_limit do
                server, err = picker:find()
                if not server then
                    return nil, err
                end
                -- Accept the first server not yet tried (or any server when
                -- nothing has been tried yet).
                if ctx.balancer_tried_servers then
                    if not ctx.balancer_tried_servers[server] then
                        break
                    end
                else
                    break
                end
            end

            -- NOTE(review): if all safe_limit attempts hit already-tried servers,
            -- the loop ends without breaking and the last (already tried) server
            -- is returned anyway.
            return server
        end,
        -- before_retry == false (request finished, log phase): release the
        -- tried-servers table back to the pool.
        -- before_retry == true (about to retry): record ctx.balancer_server as
        -- tried and increment the tried counter.
        after_balance = function (ctx, before_retry)
            if not before_retry then
                if ctx.balancer_tried_servers then
                    core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                    ctx.balancer_tried_servers = nil
                end

                -- NOTE(review): balancer_tried_servers_count is not reset here;
                -- get() only compares it while balancer_tried_servers is non-nil.
                return nil
            end

            if not ctx.balancer_tried_servers then
                ctx.balancer_tried_servers = core.tablepool.fetch("balancer_tried_servers", 0, 2)
            end

            -- Mark the current server as tried and count it.
            ctx.balancer_tried_servers[ctx.balancer_server] = true
            ctx.balancer_tried_servers_count = (ctx.balancer_tried_servers_count or 0) + 1
        end,
        -- Resets the tried-servers bookkeeping (table and counter) before moving
        -- on to the next priority level; called from balancer/priority.lua.
        before_retry_next_priority = function (ctx)
            if ctx.balancer_tried_servers then
                core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
                ctx.balancer_tried_servers = nil
            end

            ctx.balancer_tried_servers_count = 0
        end,
    }
end
这几个函数在哪里被调用？
- get：在 pick_server(route, ctx) 中调用。pick_server 从 lrucache 取得 picker（未命中时由 create_server_picker 调用 new 创建）后，立即执行 local server, err = server_picker.get(ctx)，并赋值 ctx.balancer_server = server。
- after_balance：请求结束后，在 http_log_phase 阶段调用 api_ctx.server_picker.after_balance(api_ctx, false)，释放已尝试节点表；另外，在 pick_server 的 retry 过程中也会调用 ctx.server_picker.after_balance(ctx, true)，把当前的 ctx.balancer_server 标记为已尝试并计数。
- before_retry_next_priority：在 balancer/priority.lua 中调用，调用链为 priority.get(ctx) -> picker.before_retry_next_priority(ctx)。在按优先级逐级尝试、切换到下一个优先级之前调用，用于清空已尝试节点的记录和计数。