API为从移动应用到企业平台等各种系统提供支持,每天要默默处理数以百万计的请求。如果没有相应的保护机制,哪怕是一个配置错误的客户端,或者突然出现的大量自动化请求,都可能使你的服务不堪重负,从而导致所有用户的体验下降。

限流机制可以避免这种情况。它能够控制客户端在指定时间内所能发出的请求数量,从而保护你的基础设施免受恶意攻击或意外超负荷的影响。

在各种用于限流的算法中,令牌桶算法因其简洁性与灵活性而备受青睐。与那些会突然重置的固定时间窗口计数器不同,令牌桶算法允许短时间内发出大量请求,同时仍能确保长期内的请求速率得到有效控制。因此,对于那些需要客户端偶尔快速发送少量请求的场景来说,这种算法是非常实用的选择。

在本指南中,你将学习如何在FastAPI应用中实现令牌桶限流机制。你会从零开始编写这个算法,将其作为Python类来实现;然后将其配置为中间件,以便能够针对不同用户进行请求限制;同时还会在响应中添加标准的限流头部信息,并通过简单的测试脚本来验证整个系统的功能。最终,你将获得一个可以直接应用于任何FastAPI项目的实用限流工具。

我们将涵盖的内容:

  1. 先决条件

  2. 了解令牌桶算法

  3. 配置FastAPI项目

  4. 实现令牌桶类

  5. 添加针对用户的限流中间件

  6. 测试限流机制

  7. 了解限流在系统架构中的位置

  8. 总结

先决条件

要完成本教程的学习,你需要满足以下要求:

  • 你的计算机上必须安装了Python 3.9或更高版本。你可以通过运行`python –version`来验证自己的Python版本。

  • 你需要对Python有一定的了解,并掌握HTTP API的基本工作原理。

  • 你需要准备一个文本编辑器,比如VS Code、Vim或任何你喜欢的编辑工具。

了解令牌桶算法

在开始编写代码之前,先了解所要实现的限流机制是非常有帮助的。

令牌桶算法通过两个简单的概念来实现限流功能:一个是用来存储令牌的令牌桶,另一个则是以恒定速率向令牌桶中添加令牌的补充机制

其工作原理如下:

  1. 该“令牌桶”最初是满的,其中存储着固定数量的令牌(即其最大容量)。

  2. 每个传入的请求都会消耗一个令牌。如果令牌桶中有足够的令牌,请求就会被允许通过,同时会扣除一个令牌。

  3. 如果令牌桶为空,请求将会被拒绝,并返回429 Too Many Requests错误响应。

  4. 无论是否有新的请求传入,令牌都会以固定的速率被重新添加到令牌桶中。因此,令牌桶的容量永远不会超过其最大值。

“容量”决定了系统能够短时间内处理多少请求;“补充速率”则决定了系统的持续吞吐能力。例如,如果一个令牌桶的最大容量为10个令牌,且每秒有2个令牌被添加到其中,那么客户端就可以一次性发送10个请求;但在之后,他们每秒只能发送2个请求,直到令牌桶再次被补充满为止。

这种由两个参数构成的设计方式能够让你对系统的行为进行精确的控制:

>

参数 控制作用 示例
容量(最大令牌数) 系统允许短时间内处理的请求数量上限 10个令牌 = 一次性可以发送10个请求
补充速率 系统的持续吞吐能力 2个令牌/秒 = 长期而言,每秒只能发送2个请求
补充间隔时间 令牌被重新添加的频率 1.0秒 = 每秒会添加一个令牌

与其他速率限制算法相比:

  • 固定窗口算法的计数器会在固定的时间点重置,这可能会导致在窗口边界处系统的处理速度突然加倍。而令牌桶算法并没有这样的限制。

  • 滑动窗口算法虽然更精确,但实现和维护起来更为复杂。

  • 漏桶算法会以固定的速率处理请求,其余的请求则会被排队等待。令牌桶算法与之类似,但它允许短时间内集中处理大量请求,而不是强制保持恒定的处理速度。

令牌桶算法在许多生产环境中都被广泛使用。AWS API Gateway、NGINX以及Stripe等工具都采用了这种算法的变体。

配置FastAPI项目

首先创建一个项目目录,然后安装所需的依赖项:

mkdir fastapi-ratelimit && cd fastapi-ratelimit

接下来创建并激活虚拟环境:

python -m venv venv

在Linux/macOS系统中:

source venv/bin/activate

在Windows系统中:

venv\Scripts\activate

然后安装FastAPI和Uvicorn:

pip install fastapi uvicorn

最后创建项目文件的结构:

fastapi-ratelimit/
├── main.py
└── ratelimiter.py

接下来编写main.py文件,创建一个最基本的FastAPI应用程序:

from fastapi import FastAPI

app = FastAPI()

@app.get/)
async def root():
    return {"message": "Hello, world!"}

启动服务器以验证配置是否正确:

uvicorn main:app --reload

你应该会看到类似的输出结果:

INFO:     Uvicorn正在运行,地址为http://127.0.0.1:8000(按CTRL+C可退出)
INFO:     重载进程已启动

在浏览器中访问 http://127.0.0.1:8000,或者使用curl命令访问 http://127.0.0.1:8000。你应该会收到如下响应:

{"message": "Hello, world!"}

当项目运行正常后,你就可以继续构建速率限制器了。

实现令牌桶算法

在编辑器中打开 ratelimiter.py 文件,并添加以下代码。这个类实现了令牌桶算法,同时确保操作是线程安全的:

import time
import threading

class TokenBucket:
    """
    令牌桶速率限制器。

    每个令牌桶最初都包含 `max_tokens` 个令牌,每隔 `interval` 秒会补充 `refill_rate` 个令牌,直到达到最大容量为止。

def __init__(self, max_tokens: int, refill_rate: int, interval: float): """ 初始化一个新的令牌桶对象。 :param max_tokens: 令牌桶最多可以容纳的令牌数量。 :param refill_rate: 每次补充时添加的令牌数量。 :param interval: 补充令牌之间的时间间隔(以秒为单位)。 """ assert max_tokens > 0, "max_tokens必须为正数" assert refill_rate > 0, "refill_rate必须为正数" assert interval > 0, "interval必须为正数" self.max_tokens = max_tokens self.refill_rate = refill_rate self.interval = interval self.tokens = max_tokens self/refilled_at = time.time() self.lock = threading.Lock() def _refill(self): """根据自上次补充令牌以来经过的时间来添加新的令牌。""" now = time.time() elapsed = now - self.refilled_at if elapsed >= self.interval: num_refills = int(elapsed // self.interval) self.tokens = min( self.max_tokens, selftokens + num_refills * self/refill_rate ) # 将时间戳更新为当前时间加上已消耗的间隔时间, # 这样就不会丢失部分间隔时间。 self.refilled_at += num_refills * self.interval def allow_request(self, tokens: int = 1) -> bool: """ 尝试从令牌桶中获取 `tokens` 个令牌。 如果令牌桶中有足够的令牌,则返回 True;否则返回 False。 """ with self.lock: self._refill() if self.tokens >= tokens: self.tokens -= tokens return True return False def get_remaining(self) -> int: """返回当前剩余的令牌数量。""" with self.lock: self._refill() return selftokens def get_reset_time(self) -> float: """返回下一次补充令牌时的 Unix 时间戳。""" with self.lock: return self.refilled_at + self.interval

该类包含三个公共方法:

  • allow_request()是核心方法。它会根据经过的时间来补充令牌数量,然后尝试使用其中一个令牌。如果请求被允许,该方法会返回True;如果令牌池为空,则返回False

  • get_remaining()用于返回客户端剩余的令牌数量。你可以将这个值添加到响应头中。

  • get_reset_time()用于告知下一次何时会补充新的令牌。这一信息也会被包含在响应头中。

threading.Lock确保了在同时有多个请求读取或修改令牌数量时,不会发生竞争条件。这一点非常重要,因为FastAPI是并发执行请求处理函数的。

注意:当前的实现是将令牌池的状态存储在内存中。如果你重新启动服务器,所有令牌池的状态都会被重置。如果需要在重启后或在不同服务器实例之间保持数据持久性,你应该将令牌数量存储在Redis或其他外部存储系统中。对于单实例部署来说,这种内存存储方式已经足够使用了。

添加针对每个用户的速率限制中间件

如果使用一个全局的令牌池来控制所有用户的请求,那么一个高负载用户可能会耗尽所有人的限额。因此,应该为每个用户分配一个独立的令牌池,而这些令牌池是按照用户的IP地址来区分的。

ratelimiter.py文件中,在TokenBucket类之后添加以下代码:

from collections import defaultdict


class RateLimiterStore:
    """
    用于管理针对每个用户的令牌池。

    每个唯一的客户端标识符(例如IP地址)都会对应一个具有相同配置的令牌池。
    """

    def __init__(self, max_tokens: int, refill_rate: int, interval: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.interval = interval
        self._buckets: dict[str, TokenBucket] = {}
        self._lock = threading.Lock()

    def get_bucket(self, key: str) -> TokenBucket:
        """
        返回与指定客户端标识符对应的令牌池。
        如果该令牌池还不存在,就会创建一个新的。
        """
        with self._lock:
            if key not in self._buckets:
                self._buckets[key] = TokenBucket(
                    max_tokens=self.max_tokens,
                    refill_rate=self.refill_rate,
                    interval=self.interval,
                )
            return self._buckets[key]

现在打开main.py文件,将其内容替换为完整的应用程序代码,包括速率限制中间件:

import time

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from ratelimiter import RateLimiterStore

app = FastAPI()

# 配置速率限制:允许10次连续请求,每1秒补充2个令牌。
limiter = RateLimiterStore(max_tokens=10, refill_rate=2, interval=1.0)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """
    该中间件会对每个请求实施基于IP地址的速率限制,并在所有响应中添加相应的速率限制头部信息。
    """

    # 根据客户端的IP地址来识别其对应的令牌池。
    client_ip = request.client.host
    bucket = limiter.get_bucket(client_ip)

    # 检查客户端是否有可用的令牌。
    if not bucket.allow_request():
        retry_after = bucket.get_reset_time() - time.time()
        return JSONResponse(
            status_code=429,
            content={"detail": "请求次数过多,请稍后再试"},
            headers={
                "Retry-After": str(max(1, int(retry_after))),
                "X-RateLimit-Limit": str(bucket.max_tokens),
                "X-RateLimit-Remaining": str(bucket.get_remaining()),
                "X-RateLimit-Reset": str(int(bucket.get_reset_time')),
            },
        )

    # 请求被允许,继续处理并在响应中添加速率限制头部信息。
    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(bucket.max_tokens)
    response.headers["X-RateLimit-Remaining"] = str(bucket.get_remaining())
    responseheaders["X-RateLimit-Reset"] = str(int(bucket.get_reset_time()))
    return response


@app.get/)
async def root():
    return {"message": "Hello, world!"}


@app.get("/data")
async def get_data():
    return {"data": "一些重要信息"}


@app.get("/health")
async def health():
    return {"status": "ok"}

该中间件会对每一个传入的请求执行以下操作:

  1. request.client.host中提取客户的IP地址。

  2. 从存储系统中检索(或创建)该客户的令牌桶。

  3. 调用allow_request()函数。如果令牌桶为空,它会返回一个429响应,并在响应头中设置Retry-After字段,告知客户需要等待多长时间。

  4. 如果令牌可用,它就会正常处理请求,并在响应中添加速率限制相关的头部信息。

这三个X-RateLimit-*头部信息遵循广泛采用的规范

头部信息 含义
X-RateLimit-Limit 最大并发请求量(即最大可使用的令牌数)
X-RateLimit-Remaining 当前令牌桶中剩余的令牌数量
X-RateLimit-Reset 下次令牌补充发生的Unix时间戳

这些头部信息能够让行为规范的客户端在达到速率限制之前自行控制请求频率。

测试速率限制功能

如果服务器尚未运行,请先重启它:

uvicorn main:app --reload

使用curl进行手动测试

在开发过程中,使用curl进行手动测试非常有用,因为这样你可以快速验证中间件的功能是否正常。通过发送一个请求,你可以确认速率限制头部信息是否存在、其数值是否正确,以及是否确实消耗了一个令牌。

这种测试方法速度很快,也不需要额外的配置,因此非常适合在修改配置后快速检查其效果。

发送一个请求并查看响应结果:

curl -i http://127.0.0.1:8000/data

你应该会看到一个200状态的响应,其中包含以下头部信息:

HTTP/1.1 200 OK
x-ratelimit-limit: 10
x-ratelimit-remaining: 9
x-ratelimit-reset: 1739836801

自动化突发测试

虽然curl可以确认速率限制功能是否处于激活状态,但它无法验证当令牌桶为空时该功能是否真的会阻止请求的发送。为此,你需要以超过令牌补充频率的速度发送请求,然后观察系统是否会返回429响应。在将中间件部署到生产环境之前、在修改令牌桶相关参数之后,或者当需要验证其限制请求和补充令牌的功能时,进行自动化突发测试是非常必要的。

在项目目录中创建一个名为test_ratelimit.py的文件:

import requests
import time

def test_burst():
"""发送15次快速请求,以触发速率限制机制。」
url = "http://127.0.0.1:8000/data"
results = []

for i in range(15):
response = requests.get(url)
remaining = response.headers.get("X-RateLimit-Remaining", "N/A")
results.append((i + 1, response.status_code, remaining))
print(f"请求 {i+1:2d} | 状态:{response.status_code} | 剩余次数:{remaining}")

print()

allowed = sum(1 for _, status, _ in results if status == 200)
blocked = sum(1 for _, status, _ in results if status == 429)
print(f"允许的请求次数:{allowed}, 被阻止的请求次数:{blocked}")

def test_refill():
"""耗尽所有令牌,等待令牌重新补充,然后再次验证请求是否能够成功发送。」
url = "http://127.0.0.1:8000/data"

print("\n--- 正在消耗令牌 ---")
for i in range(12):
response = requests.get(url)
print(f"请求 {i+1:2d} | 状态:{response.status_code}")

print("\n--- 正在等待3秒以补充令牌 ---")
time.sleep(3)

print("\n--- 令牌补充完成后,开始发送请求 ---")
for i in range(5):
response = requests.get(url)
remaining = response.headers.get("X-RateLimit-Remaining", "N/A")
print(f"请求 {i+1:2d} | 状态:{response.status_code} | 剩余次数:{remaining}")

if __name__ == "__main__":
print("=== 快速请求测试 ===")
test_burst()

# 在进行下一次测试之前,让令牌系统有时间重新补充资源
time.sleep(6)

print("\n=== 令牌补充测试 ===")
test_refill()

如果还没有安装 requests 库,请先进行安装:

pip install requests

然后运行测试程序:

python test_ratelimit.py

你应该会看到类似以下的输出结果:

=== 突发请求测试 ===
请求 1 | 状态:200 | 剩余令牌数:9
请求 2 | 状态:200 | 剩余令牌数:8
请求 3 | 状态:200 | 剩余令牌数:7
...
请求 10 | 状态:200 | 剩余令牌数:0
请求 11 | 状态:429 | 剩余令牌数:0
请求 12 | 状态:429 | 剩余令牌数:0
...
请求 15 | 状态:429 | 剩余令牌数:0

允许通过的请求数量:10次;被拒绝的请求数量:5次

前10次请求均成功执行(每次请求都使用了桶中的令牌)。而第11到第15次请求因为桶中的令牌已经用完而被拒绝。后续的测试验证了:在等待一段时间后,令牌会重新被补充到桶中,因此这些请求也能再次成功执行。

注意:由于时间因素的影响,允许通过的请求与被拒绝的请求之间的具体比例可能会略有变化。在连续发送请求的情况下,令牌数量也有可能在短时间内得到补充,这种行为是正常的。

限流机制在您的架构中的应用位置

本教程中介绍的实现方法是在应用程序的内部运行的,这种方法最为简单,也非常适合单实例环境。而在大型系统中,限流机制通常会在多个层次上得到应用:

  • API网关层(如NGINX、Kong、Traefik、Envoy):在所有请求到达应用程序之前,会对这些请求施加全局性的限流限制。这种机制可以有效防止大规模的滥用行为以及DDoS攻击。

  • 应用程序层(本教程中的实现方式):在服务内部为每个用户或每个端点设置详细的限流规则。这种方式有助于针对不同的API接口层级实施不同的配额限制。

  • 两者结合使用:许多生产环境会同时使用网关层的全球性限流机制和应用程序内部的用户级限流机制。网关层用于拦截过多的请求,而应用程序层则负责执行具体的业务规则。

对于多实例部署环境(即在负载均衡器后运行多个服务器进程的情况),内存中的 RateLimiterStore 无法在各个实例之间共享状态数据。在这种情况下,应该将这种内存存储方式替换为Redis。令牌桶的逻辑本身并没有发生变化,只是存储机制发生了改变而已。

总结

通过本教程的学习,您从零开始构建了一个基于令牌桶的限流系统,并将其集成到了FastAPI应用程序中,实现了针对用户的个性化限流功能以及标准的响应头设置。同时,您还对这一实现方式进行了测试,确认其突发处理能力和令牌补充机制确实能够按照预期工作。

令牌桶算法为您提供了两种直观的控制手段:一是用于应对突发请求的容量控制,二是用于保证持续吞吐量的令牌补充速率。这两种机制几乎可以满足所有限流场景的需求。

在此基础上,您还可以进一步扩展这一技术框架……

  • 在多实例部署环境中,用 Redis 替代内存存储系统。

  • 通过创建不同的 RateLimiterStore 实例,为每个接口设置不同的速率限制规则。

  • 使用经过身份验证的用户 ID 而不是 IP 地址,以便更准确地识别客户端。

  • 添加相应的指标监控和日志记录功能,以便了解客户端被限制访问的频率。

因此,

Comments are closed.