Skip to content

[Feature] 添加LLM Token限流功能 | Add LLM Token Rate Limit #596

@dancing-ui

Description

@dancing-ui

Issue Description

Type: feature request

Describe what feature you want

背景

Sentinel 作为面向分布式应用场景、多语言异构化服务架构的流量治理组件,以丰富的流量防护能力满足了各种应用场景的限流需求。当下 AI 应用成为广大开发者关注的领域,但是想要将 AI 应用真正用于生产,高可用的能力是必不可少的,由此也出现了很多 AI 应用场景下新的流量防护需求,例如 Token 限流,Token 这个 AI 场景下的常用单位,在作为限流的统计维度时存在着限流时机与统计时机不一致,强需求集群限流等特点。

Token限流现状

AI网关Token限流能力

现在已经有多个AI网关(如Kong、Higress等)以插件的方式实现了LLM Token限流能力,限流算法大多使用固定窗口或滑动窗口,限流过程与传统限流的区别在于:限流信息的更新依赖于下游服务的调用结果

竞品分析

项目名称 Kong Higress APISIX Envoy AI Gateway
简介 企业级 API 网关,提供商业化 AI 限流能力 开源云原生网关,源自阿里云实践 轻量级开源 API 网关 基于 Envoy 的可扩展 AI 网关
Token 维度 总/输入/输出/自定义成本 Token 仅总 Token 输入/输出/总 Token CEL 表达式
存储策略 内存/内置 DB/Redis Redis 强依赖 本地 LRU 缓存 内存/Redis
请求维度 IP/凭证/消费者/服务/请求头等 IP/参数/请求头/Consumer 仅模型名称 请求头/URL 参数/IP
限流算法 固定+滑动窗口(秒级) 固定窗口(秒至天) 固定窗口(秒级) Envoy 原生限流
响应头部 丰富 X-AI-RateLimit-* 信息 基础限流头部 基础限流头部
核心优势 多维度控制;成本限流;数据一致性 长周期限流;开源免费 轻量部署;实时反馈 CEL 灵活定制;存储可选
主要不足 闭源付费;无降级策略 无输入输出区分;强依赖 Redis;无降级策略 无集群同步;扩展性差;无降级策略 配置复杂;无响应头部;无降级策略
开源情况 企业版闭源 开源 开源 开源

计划功能点对比Image

考虑到LLM应用的强集群需求,所以本issue不实现单机模式

总结

尽管当前AI网关实现的Token限流能力已经非常多样化,但是仍存在功能点分散、流量临界、无法应对突发流量等问题,无法满足LLM开发框架(如LangChainGoEino等)实际需要的Token限流需求。

因此,本issue希望在 Sentinel 中实现基础的 Token 限流能力,并在此基础上,提供多样化的限流策略,限制 AI 应用的 Token 消耗,解决流量临界问题,应对突发流量,保障 AI 应用的整体稳定性。

预期提供的API

本issue预期提供Token限流相关的功能点如下:

  • 核心功能:集群模式下的基础Token限流能力
  • 实现1~2个LLM开发框架的适配器
  • 扩展功能:预测误差时序分摊(Predictive Error Temporal Amortized,PETA)、LLM用量及限流响应信息

详细设计

总体流程图Image

LLM应用实例主要是指以LangChainGo为代表的LLM应用框架,计划Sentinel以包装函数的方式适配到LLM开发框架中,对模型调用Token进行限流。

基础Token限流

总体时序图Image
功能点列表

基础Token限流方案将包含下列功能点:

  • 支持集群模式
  • 支持多请求维度(优先支持header维度)
  • 支持多时间维度
  • 适配1~2个LLM开发框架
  • 限流算法:固定窗口限流
初始化

在Token限流中,Sentinel需要读取配置文件完成以下初始化工作:

  • 连接Redis
  • 初始化Token限流规则(支持配置文件初始化、提供LoadRules API动态加载)
Entry设计

Sentinel使用Entry函数将业务逻辑封装起来,这一过程称为“埋点”,每个埋点都关联一个资源名称,它标识了触发该资源的调用或访问。函数原型如下:

func Entry(resource string, opts ...Option) (*base.SentinelEntry, *base.BlockError)

在微服务场景下,调用Entry函数时,传入的resource一般是作为接口路径或名称。

一般来说,由于Token限流总是作为网关接收外部请求,Token限流方案的流量类型TrafficType固定为Inbound,其中Inbound代表入口流量,Outbound代表出口流量。实际上,集群模式下的Token限流用不到TrafficType字段。

下面是Entry函数调用示例:

// Wrapper request
reqInfos := llmtokenratelimit.GenerateRequestInfos(
    llmtokenratelimit.WithHeader(header),
    llmtokenratelimit.WithPrompts(prompts),
)
// Check
e, b := sentinel.Entry(resource, sentinel.WithTrafficType(base.Inbound), sentinel.WithArgs(reqInfos))
if b != nil {
    // Blocked
    println("Blocked")
    continue
}
// Pass
println("Pass")
// Simulate llm service
time.Sleep(llmRunningTime)
// Update used token info
entry.SetPair(llmtokenratelimit.KeyUsedTokenInfos, usedTokenInfos)
// Must be executed immediately after the SetPair function
e.Exit()
请求信息包装

为了规范化传入Entry的请求参数信息,计划向用户提供一个参数结构体,用户需将本次请求的相关信息填写到结构体里面,再通过WithArgs方法传入到Entry中。

type RequestInfos struct {
	Headers map[string][]string `json:"headers"`
	Prompts []string            `json:"prompts"`
}
LLM Token消耗信息包装

为了规范化LLM Token消耗信息,计划向用户提供一个参数结构体,用户需将本次LLM请求Token消耗的相关信息填写到结构体里面,再通过提供的API方法更新Token信息。

type UsedTokenInfos struct {
	InputTokens  int64 `json:"inputTokens"`
	OutputTokens int64 `json:"outputTokens"`
	TotalTokens  int64 `json:"totalTokens"`
}

除此之外,为方便使用,计划以模型厂商维度提供一个包装函数,便于用户使用,如下为包装OpenAI token消耗的辅助函数:

func OpenAITokenExtractor(response interface{}) *UsedTokenInfos {
	if response == nil {
		return nil
	}

	resp, ok := response.(map[string]any)
	if !ok {
		return nil
	}

	inputTokens, ok := resp["prompt_tokens"].(int)
	if !ok {
		return nil
	}
	outputTokens, ok := resp["completion_tokens"].(int)
	if !ok {
		return nil
	}
	totalTokens, ok := resp["total_tokens"].(int)
	if !ok {
		return nil
	}

	return GenerateUsedTokenInfos(
		WithInputTokens(inputTokens),
		WithOutputTokens(outputTokens),
		WithTotalTokens(totalTokens),
	)
}
限流算法

下面描述如何使用固定窗口算法实现Token限流的完整过程。

对于用户的每个LLM调用请求,都需要先经过Sentinel的Entry函数,该函数使用责任链模式依次执行提前注册好的限流、熔断等组件初始化、规则检查、调用统计功能(称其为StatPrepareSlot、RuleCheckSlot、StatSlot)。所以,Token限流也需要以同样的方式实现这三类方法。

  • StatPrepareSlot:Token限流不需要在该方法中完成任何事情。
  • RuleCheckSlot:限流规则检查采用分层匹配设计。首先该方法会读取Entry传入的resource,然后将resource与所有已配置的限流规则(配置文件中对应的是resource字段)进行正则匹配。针对每条匹配命中的规则,将会提取每条具体规则项(配置文件中对应的是specificItems字段)与当前请求的相关信息进行具体规则项正则匹配,若匹配未命中,则跳过;否则,将规则项信息通过固定的格式组成Redis Key,然后利用Lua脚本原子读取Redis中剩余的Token数量,若剩余Token数量大于等于0,则放行请求,返回True;否则拒绝请求,返回False。Lua脚本实现如下:
-- KEYS[1]: Fixed Window Key ("<redisRatelimitKey>")

-- ARGV[1]: Maximum Token capacity
-- ARGV[2]: Window size (milliseconds)

local fixed_window_key = KEYS[1]

local max_token_capacity = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])

local ttl = redis.call('PTTL', fixed_window_key)
if ttl < 0 then
    redis.call('SET', fixed_window_key, max_token_capacity, 'PX', window_size)
    return {max_token_capacity, window_size}
end
return {redis.call('GET', fixed_window_key), ttl}
  • StatSlot:放行请求后,读取实际消耗的Token数,通过SentinelEntry.SetPair方法记录Token消耗,然后在OnCompleted方法中依次遍历命中的具体规则项,根据当前具体规则项的Token计算策略,计算本次消耗的Token数,最后利用Lua脚本和decrby命令原子更新Redis中剩余的Token数量(细节:SetPair方法必须在SentinelEntry.Exit方法前执行,且Exit必须被立即执行,否则会更新失败)。Lua脚本实现如下:
-- KEYS[1]: Fixed Window Key ("<redisRatelimitKey>")

-- ARGV[1]: Maximum Token capacity
-- ARGV[2]: Window size (milliseconds)
-- ARGV[3]: Actual token consumption

local fixed_window_key = KEYS[1]

local max_token_capacity = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local actual = tonumber(ARGV[3])

local ttl = redis.call('PTTL', fixed_window_key)
if ttl < 0 then
    redis.call('SET', fixed_window_key, max_token_capacity-actual, 'PX', window_size)
    return {max_token_capacity-actual, window_size}
end
return {redis.call('DECRBY', fixed_window_key, actual), ttl}
限流规则检查补充例子说明

现在有规则配置如下:

  • 接口前缀为/a/,检查请求中所有header键的前缀是X-CA-、值的前缀是123的键值对在60秒内的总token不超过900,下简称该规则配置为rules-A,对应配置文件如下:
- resource: "/a/*"
  specificItems:
    - identifier:
        type: header
        value: "X-CA-*"
      keyItems:
        - key: "123*"
          token: 
            number: 900
            countStrategy: "total-tokens"
          time:
            unit: second
            value: 60

现在,假设某个请求的接口路径或名称为/a/b,该请求包含了1个header键值对{X-CA-A:123},将/a/b作为resource传入Entry,首先会正则匹配到rules-A.resource/a/*,再往下X-CA-A二次正则匹配到了rules-A.specificItems[0].identifier.valueX-CA-*,再往下123三次正则匹配到了rules-A.specificItems[0].keyItems[0].key123*

到这里为止,我们就认为该请求命中了该具体规则项,对应RedisKey=sentinel-go:llm-token-ratelimit:resource-<hashedResource>:fixed-window:header:60:total-tokens,初始Value=900,由于Value>=0,请求放行。

接着,假设该请求消耗了500 input-tokens500 output-tokens,那么total-tokens=1000,更新Value=-100(允许Value为负)。

再然后,假设又来一个请求的接口名称为/a/c,该请求同样包含了1个header键值对{X-CA-B:123456},依然能够匹配到rules-A,对应RedisKey=sentinel-go:llm-token-ratelimit:resource-<hashedResource>:fixed-window:header:60:total-tokens,但此时Value=-100,由于Value<0,所以拒绝该请求。

综上,我们通过将Rule中的所有统计指标都作为Redis Key的一部分,能够区分同类请求并进行限流。

  • 注:hashedResource = murmur3(resource)
LLM框架适配
获取消耗的Token数

获取Completion方法消耗的Token数,可读取llms.ContentChoice中的GenerationInfo字段,得到消耗的输入、输出、推理(可选)、总Token数。该字段是llm.GenerateContent方法的返回值。

经调研,LangChainGo的embeddings.CreateEmbedding接口返回结果不包含Token消耗结果,以langchaingo/llms/openai/internal/openaiclient/openaiclient.go中实现的CreateEmbedding接口为例,该方法仅返回 [][]float32 作为词向量结果,且其中调用的createEmbedding属于私有方法,不好包装。综上所述,本issue不考虑支持Embedding方法

框架适配

为了LLM框架更方便使用Sentinel提供Token限流能力,计划对部分知名LLM框架提供适配器。

由于未发现LangChainGo存在函数注入的Middleware接口,且LangChainGo提供的回调方法并不专用于GenerateContent方法。综合考虑,采取包装函数方式,传入大模型实例,包装llms.Model接口的GenerateContent、Call(Deprecated)这类文本补全、生成方法

下面以GenerateContent为例,说明如何包装和调用。

func (t *LLMWrapper) GenerateContent(
	ctx context.Context,
	messages []langchaingo.MessageContent,
	options ...langchaingo.CallOption,
) (*langchaingo.ContentResponse, error) {
    // 1.初始化
	// 2. Sentinel限流检查
	// 3. 正常调用模型
	response, err := t.llm.GenerateContent(ctx, messages, options...)
	if err != nil {
		return nil, err
	} 
	// 4. 根据调用结果更新Token数量
    // 5. 返回模型结果
	return response, nil
}

扩展功能项

预测误差时序分摊(Predictive Error Temporal Amortized,PETA)
总体时序图Image
算法描述

为了解决基础Token限流中面临的Token统计滞后性导致的非法请求错放问题,计划提出PETA,改善错放情况。

  • 所谓时序分摊是指将低估的token分摊到未来的窗口中,进而影响后续请求
  • 该算法结合了滑动窗口和类似于令牌桶(没有生成速率,而是回收过期窗口)的方法

基本过程:Token预扣使用外部库(tiktoken-go)计算出可能消耗的Token数量,然后提前更新Redis限流信息,等到实际调用后,再修正结果。

  • 可以影响到 input-tokens、total-tokens
    • 由于total-tokens依赖output-tokens,随机性大,预测效果差,不建议对total-tokens使用该策略
  • output-tokens暂时无法预测,所以影响不到,预估token默认是0
  • 注意:即使命中input-tokens规则,也并不会同时命中total-tokens规则

token计算策略:使用tiktoken-go计算初始输入内容预计消耗token数量;为了能够适应token消耗变化,需要每次Set更新预测值和真实值的差值

  • 使用redis存储差值
    • key格式:"<redisRatelimitKey>:token-encoder:<model-provider>:<model-name>"
      • redisRatelimitKey格式:"sentinel-go:llm-token-ratelimit:<ruleName>:<strategy>:<identifierType>:<timeWindow>:<tokenCountStrategy>"
    • 过期时间与限流策略一致
  • 总结token预测计算策略如下
    • estimatedToken = tiktoken(raw_contents)+query_difference(redis_key)
    • 若estimatedToken 结果为负数,则重置difference为0,estimatedToken = tiktoken(raw_contents)

Token预扣:

-- KEYS[1]: Sliding Window Key ("{shard-<hashtag>}:sliding-window:<redisRatelimitKey>")
-- KEYS[2]: Token Bucket Key ("{shard-<hashtag>}:token-bucket:<redisRatelimitKey>")
-- KEYS[3]: Token Encoder Key ("{shard-<hashtag>}:token-encoder:<provider>:<model>:<redisRatelimitKey>")
-- ARGV[1]: Estimated token consumption
-- ARGV[2]: Current timestamp (milliseconds)
-- ARGV[3]: Token bucket capacity
-- ARGV[4]: Window size (milliseconds)
-- ARGV[5]: Random string for sliding window unique value (length less than or equal to 255)
local function calculate_tokens_in_range(key, start_time, end_time)
    local valid_list = redis.call('ZRANGEBYSCORE', key, start_time, end_time)
    local valid_tokens = 0
    for _, v in ipairs(valid_list) do
        local _, tokens = struct.unpack('Bc0L', v)
        valid_tokens = valid_tokens + tokens
    end
    return valid_tokens
end

local sliding_window_key = tostring(KEYS[1])
local token_bucket_key = tostring(KEYS[2])
local token_encoder_key = tostring(KEYS[3])

local estimated = tonumber(ARGV[1])
local current_timestamp = tonumber(ARGV[2])
local bucket_capacity = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
local random_string = tostring(ARGV[5])

-- Valid window start time
local window_start = current_timestamp - window_size
-- Waiting time
local waiting_time = 0
-- Get bucket
local bucket = redis.call('HMGET', token_bucket_key, 'capacity', 'max_capacity')
local current_capacity = tonumber(bucket[1])
local max_capacity = tonumber(bucket[2])
-- Initialize bucket manually if it doesn't exist
if not current_capacity then
    current_capacity = bucket_capacity
    max_capacity = bucket_capacity
    redis.call('HMSET', token_bucket_key, 'capacity', bucket_capacity, 'max_capacity', bucket_capacity)
    redis.call('ZADD', sliding_window_key, current_timestamp,
        struct.pack('Bc0L', string.len(random_string), random_string, 0))
end
-- Calculate expired tokens
local released_tokens = calculate_tokens_in_range(sliding_window_key, 0, window_start)
if released_tokens > 0 then -- Expired tokens exist, attempt to replenish new tokens
    -- Clean up expired data
    redis.call('ZREMRANGEBYSCORE', sliding_window_key, 0, window_start)
    -- Calculate valid tokens
    local valid_tokens = calculate_tokens_in_range(sliding_window_key, '-inf', '+inf')
    -- Update token count
    if current_capacity + released_tokens > max_capacity then -- If current capacity plus released tokens exceeds max capacity, reset to max capacity minus valid tokens
        current_capacity = max_capacity - valid_tokens
    else -- Otherwise, directly add the released tokens
        current_capacity = current_capacity + released_tokens
    end
    -- Immediately replenish new tokens
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end
-- Plus the difference from the token encoder if it exists
local ttl = redis.call('PTTL', token_encoder_key)
local difference = tonumber(redis.call('GET', token_encoder_key))
if ttl < 0 then
    difference = 0
else
    if difference + estimated >= 0 then
        estimated = estimated + difference
    else
        redis.call('SET', token_encoder_key, 0)
    end
end
-- Check if the request can be satisfied
if max_capacity < estimated or estimated <= 0 then -- If max capacity is less than estimated consumption or estimated is less than or equal to 0, return -1 indicating rejection
    waiting_time = -1
elseif current_capacity < estimated then -- If current capacity is insufficient to satisfy estimated consumption, calculate waiting time
    -- Get the earliest valid timestamp
    local first_valid_window = redis.call('ZRANGE', sliding_window_key, 0, 0, 'WITHSCORES')
    local first_valid_start = tonumber(first_valid_window[2])
    if not first_valid_start then
        first_valid_start = current_timestamp
    end
    -- Waiting time = fixed delay + window size - valid window interval
    waiting_time = 3 + window_size - (current_timestamp - first_valid_start)
else -- Otherwise, capacity satisfies estimated consumption, no waiting required, update data
    redis.call('ZADD', sliding_window_key, current_timestamp,
        struct.pack('Bc0L', string.len(random_string), random_string, estimated))
    current_capacity = current_capacity - estimated
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end

-- Set expiration time to window size plus 5 seconds buffer
redis.call('PEXPIRE', sliding_window_key, window_size + 5000)
redis.call('PEXPIRE', token_bucket_key, window_size + 5000)
redis.call('PEXPIRE', token_encoder_key, window_size + 5000)

return {current_capacity, waiting_time, estimated, difference}

Token修正:

-- KEYS[1]: Sliding Window Key ("{shard-<hashtag>}:sliding-window:<redisRatelimitKey>")
-- KEYS[2]: Token Bucket Key ("{shard-<hashtag>}:token-bucket:<redisRatelimitKey>")
-- KEYS[3]: Token Encoder Key ("{shard-<hashtag>}:token-encoder:<provider>:<model>:<redisRatelimitKey>")
-- ARGV[1]: Estimated token consumption
-- ARGV[2]: Current timestamp (milliseconds)
-- ARGV[3]: Token bucket capacity
-- ARGV[4]: Window size (milliseconds)
-- ARGV[5]: Actual token consumption
-- ARGV[6]: Random string for sliding window value (length less than or equal to 255)
local MAX_SEARCH_ITRATIONS = 64

local function calculate_tokens_in_range(key, start_time, end_time)
    local valid_list = redis.call('ZRANGEBYSCORE', key, start_time, end_time)
    local valid_tokens = 0
    for _, v in ipairs(valid_list) do
        local _, tokens = struct.unpack('Bc0L', v)
        valid_tokens = valid_tokens + tokens
    end
    return valid_tokens
end

local function binary_search_compensation_time(key, L, R, window_size, max_capacity, predicted_error)
    local iter = 0
    while L < R and iter < MAX_SEARCH_ITRATIONS do
        iter = iter + 1
        local mid = math.floor((L + R) / 2)
        local valid_tokens = calculate_tokens_in_range(key, mid - window_size, mid)
        if valid_tokens + predicted_error <= max_capacity then
            R = mid
        else
            L = mid + 1
        end
    end
    return L
end

local sliding_window_key = tostring(KEYS[1])
local token_bucket_key = tostring(KEYS[2])
local token_encoder_key = tostring(KEYS[3])

local estimated = tonumber(ARGV[1])
local current_timestamp = tonumber(ARGV[2])
local bucket_capacity = tonumber(ARGV[3])
local window_size = tonumber(ARGV[4])
local actual = tonumber(ARGV[5])
local random_string = tostring(ARGV[6])

-- Valid window start time
local window_start = current_timestamp - window_size
-- Get bucket
local bucket = redis.call('HMGET', token_bucket_key, 'capacity', 'max_capacity')
local current_capacity = tonumber(bucket[1])
local max_capacity = tonumber(bucket[2])
-- Initialize bucket manually if it doesn't exist
if not current_capacity then
    current_capacity = bucket_capacity
    max_capacity = bucket_capacity
    redis.call('HMSET', token_bucket_key, 'capacity', bucket_capacity, 'max_capacity', bucket_capacity)
    redis.call('ZADD', sliding_window_key, current_timestamp,
        struct.pack('Bc0L', string.len(random_string), random_string, 0))
end
-- Calculate expired tokens
local released_tokens = calculate_tokens_in_range(sliding_window_key, 0, window_start)
if released_tokens > 0 then -- Expired tokens exist, attempt to replenish new tokens
    -- Clean up expired data
    redis.call('ZREMRANGEBYSCORE', sliding_window_key, 0, window_start)
    -- Calculate valid tokens
    local valid_tokens = calculate_tokens_in_range(sliding_window_key, '-inf', '+inf')
    -- Update token count
    if current_capacity + released_tokens > max_capacity then -- If current capacity plus released tokens exceeds max capacity, reset to max capacity minus valid tokens
        current_capacity = max_capacity - valid_tokens
    else -- Otherwise, directly add the released tokens
        current_capacity = current_capacity + released_tokens
    end
    -- Immediately replenish new tokens
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
end
-- Update the difference from the token encoder
local difference = actual - estimated
redis.call('SET', token_encoder_key, difference)
-- Correction result for reservation
local correct_result = 0
if estimated < 0 or actual < 0 then
    correct_result = 3 -- Invalid value
elseif estimated < actual then -- Underestimation
    -- Mainly handle underestimation cases to properly limit actual usage; overestimation may reject requests but won't affect downstream services
    -- Calculate prediction error
    local predicted_error = math.abs(actual - estimated)
    -- directly deduct all underestimated tokens
    current_capacity = current_capacity - predicted_error
    redis.call('HSET', token_bucket_key, 'capacity', current_capacity)
    -- Get the latest valid timestamp
    local last_valid_window = redis.call('ZRANGE', sliding_window_key, -1, -1, 'WITHSCORES')
    local compensation_start = tonumber(last_valid_window[2])
    if not compensation_start then -- Possibly all data just expired, use current timestamp minus window size as start
        compensation_start = current_timestamp
    end
    while predicted_error ~= 0 do -- Distribute to future windows until all error is distributed
        if max_capacity >= predicted_error then
            local compensation_time = binary_search_compensation_time(sliding_window_key, compensation_start,
                compensation_start + window_size, window_size, max_capacity, predicted_error)
            if calculate_tokens_in_range(sliding_window_key, compensation_time - window_size, compensation_time) +
                predicted_error > max_capacity then
                correct_result = 1 -- If the compensation time exceeds max capacity, return 1 to indicate failure
                break
            end
            redis.call('ZADD', sliding_window_key, compensation_time,
                struct.pack('Bc0L', string.len(random_string), random_string, predicted_error))
            predicted_error = 0
        else
            redis.call('ZADD', sliding_window_key, compensation_start,
                struct.pack('Bc0L', string.len(random_string), random_string, max_capacity))
            predicted_error = predicted_error - max_capacity
            compensation_start = compensation_start + window_size
        end
    end
elseif estimated > actual then -- Overestimation
    correct_result = 2
end

-- Set expiration time to window size plus 5 seconds buffer
redis.call('PEXPIRE', sliding_window_key, window_size + 5000)
redis.call('PEXPIRE', token_bucket_key, window_size + 5000)
redis.call('PEXPIRE', token_encoder_key, window_size + 5000)

return {correct_result}
Metric日志

默认会独立打印预扣与修正信息保存在sentinel-record.log相同目录中,支持日志翻滚与日志覆盖,打印结构体信息如下:

type MetricItem struct {
	Timestamp uint64 `json:"timestamp"`
	RequestID string `json:"request_id"`
	LimitKey  string `json:"limit_key"`

	// PETA.Withhold
	CurrentCapacity    int64 `json:"current_capacity"`
	EstimatedToken     int64 `json:"estimated_token"`
	Difference         int64 `json:"difference"`
	TokenizationLength int   `json:"tokenization_length"`
	WaitingTime        int64 `json:"waiting_time"`

	// PETA.Correct
	ActualToken   int   `json:"actual_token"`
	CorrectResult int64 `json:"correct_result"`
}
  • 单个请求最多同时包含预扣与修正2条日志记录,可根据需要读取相关日志文件内容做可视化展示,以跟踪PETA策略性能。
LLM用量及限流响应信息

计划在请求成功后的响应header中,添加LLM用量及限流信息。响应结构体声明如下:

type ResponseHeader struct {
	headers      map[string]string
	ErrorCode    int32
	ErrorMessage string
}
  • 请求被拒绝时,返回以下内容
    • headers
      • 请求id(X-Sentinel-LLM-Token-Ratelimit-RequestID)
      • token剩余信息(X-Sentinel-LLM-Token-Ratelimit-RemainingTokens)
      • 等待时间(X-Sentinel-LLM-Token-Ratelimit-WaitingTime)
    • 错误码(ErrorCode)
    • 错误信息(ErrorMessage)
  • 请求被接收时,返回以下内容
    • headers
      • 请求id(X-Sentinel-LLM-Token-Ratelimit-RequestID)
      • token剩余信息(X-Sentinel-LLM-Token-Ratelimit-RemainingTokens)

其中错误码和错误信息从配置文件中获取。

用户接入

接入步骤

从用户角度,接入Sentinel提供的Token限流功能,需要以下几步:

  1. 准备Redis实例

  2. 对 Sentinel 的运行环境进行相关配置并初始化。

    1. 仅支持从yaml文件初始化
  3. 埋点(定义资源),固定ResourceType=ResTypeCommonTrafficType=Inbound的资源类型

  4. 根据下面的配置文件加载规则,规则配置项包括:资源名称、限流策略、具体规则项、redis配置、错误码、错误信息。如下是配置规则的示例,具体字段含义在下文的“配置文件描述”中有具体说明。

    _, err = llmtokenratelimit.LoadRules([]*llmtokenratelimit.Rule{
        {
    
            Resource: ".*",
            Strategy: llmtokenratelimit.FixedWindow,
            SpecificItems: []llmtokenratelimit.SpecificItem{
                {
                    Identifier: llmtokenratelimit.Identifier{
                        Type:  llmtokenratelimit.Header,
                        Value: ".*",
                    },
                    KeyItems: []llmtokenratelimit.KeyItem{
                        {
                            Key:      ".*",
                            Token: llmtokenratelimit.Token{
                                Number:        1000,
                                CountStrategy: llmtokenratelimit.TotalTokens,
                            },
                            Time: llmtokenratelimit.Time{
                                Unit:  llmtokenratelimit.Second,
                                Value: 60,
                            },
                        },
                    },
                },
            },
        },
    })
  5. 可选:创建LLM实例嵌入到提供的适配器中即可

配置描述

配置文件
配置项 类型 必填 默认值 说明
enabled bool false 是否启用LLM Token限流功能,取值:false(不启用)、true(启用)
redis object redis实例连接信息
errorCode int 429 错误码,设置为0时会修改为429
errorMessage string "Too Many Requests" 错误信息

redis配置

配置项 类型 必填 默认值 说明
addrs array of addr object [{name: "127.0.0.1", port: 6379}] redis节点服务,见注意事项说明
username string 空字符串 redis用户名
password string 空字符串 redis密码
dialTimeout int 0 建立redis连接的最长等待时间,单位:毫秒
readTimeout int 0 等待Redis服务器响应的最长时间,单位:毫秒
writeTimeout int 0 向网络连接发送命令数据的最长时间,单位:毫秒
poolTimeout int 0 从连接池获取一个空闲连接的最大等待时间,单位:毫秒
poolSize int 10 连接池中的连接数量
minIdleConns int 5 连接池闲置连接的最少数量
maxRetries int 3 操作失败,最大尝试次数

addr配置

配置项 类型 必填 默认值 说明
name string "127.0.0.1" redis节点服务名称,带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local
port int 6379 redis节点服务端口
规则配置

特点:支持LoadRules动态加载

配置项 类型 必填 默认值 说明
resource string ".*" 规则资源名称,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式
strategy string "fixed-window" 限流策略,取值:fixed-window(固定窗口)、peta(预测误差时序分摊)
encoding object token编码方式,专用于peta限流策略
specificItems array of specificItem object 具体规则项

encoding配置

配置项 类型 必填 默认值 说明
provider string "openai" 模型厂商
model string "gpt-4" 模型名称

specificItem配置

配置项 类型 必填 默认值 说明
identifier object 请求标识符
keyItems array of keyItem object 规则匹配的键值信息

identifier配置

配置项 类型 必填 默认值 说明
type string "all" 请求标识符类型,取值:all(全局限流)、header
value string ".*" 请求标识符取值,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式

keyItem配置

配置项 类型 必填 默认值 说明
key string ".*" 具体规则项取值,支持正则表达式,取值:".*"(全局匹配)、用户自定义正则表达式
token object token数量和计算策略配置
time object 时间单位和周期配置

token配置

配置项 类型 必填 默认值 说明
number int token数量,大于等于0
countStrategy string "total-tokens" token计算策略,取值:input-tokens、output-tokens、total-tokens

time配置

配置项 类型 必填 默认值 说明
unit string 时间单位,取值:second、minute、hour、day
value int 时间值,大于等于0

配置文件示例

version: "v1"
sentinel:
  app:
    name: sentinel-go-demo
  log:
    metric:
      maxFileCount: 7
  llmTokenRatelimit:
  	enabled: true
  	
    errorCode: 429
    errorMessage: "Too Many Requests"
    
    redis:
      addrs:
        - name: "127.0.0.1"
          port: 6379
      username: "redis"
      password: "redis"
      dialTimeout: 5000
      readTimeout: 5000
      writeTimeout: 5000
      poolTimeout: 5000
      poolSize: 10
      minIdleConns: 5
      maxRetries: 3

LLM框架适配

目前支持Langchaingo和Eino框架无侵入式接入Sentinel提供的Token限流能力,主要适用于文本生成方面,使用方法详见:

  • pkg/adapters/langchaingo/wrapper.go
  • pkg/adapters/eino/wrapper.go

注意事项

  • 由于目前仅可预知input tokens,所以建议使用PETA专对于input tokens进行限流
  • PETA使用tiktoken-go预估输入消耗token数,但是需要下载或预先配置字节对编码(Byte Pair Encoding,BPE)字典
    • 在线模式
      • 首次使用时,tiktoken-go需要联网下载编码文件
    • 离线模式
      • 预先准备缓存好的tiktoken-go的编码文件(非直接下载文件,而是经过tiktoken-go处理后的文件),并通过配置TIKTOKEN_CACHE_DIR环境变量指定文件目录位置
  • 规则去重说明
    • keyItems中,若仅number不同,会去重保留最新的number
    • specificItems中,仅保留去重后的keyItems
    • resource中,仅保留最新的resource
  • redis配置说明
    • 若连接的redis是集群模式,那么addrs里面的地址数量必须大于等于2个,否则会默认进入redis单点模式,导致限流失效

Metadata

Metadata

Assignees

No one assigned

    Labels

    area/flow-controlIssues or PRs related to flow controlkind/featureCategory issues or PRs related to feature request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions