实时数据是现代许多软件运行的基础:实时的股票价格、聊天应用程序、体育赛事比分以及各种协作工具。要构建这些系统,就必须了解实时通信的运作原理——而这并不总是那么容易理解。
我在尝试开发一个实时期权信息面板时亲身体会到了这一点。使用HTTP请求是无法满足需求的,而且我读到的所有相关资料看起来都过于复杂,直到我重新回归基础知识。这篇文章就是这个学习过程的成果。
我们将从零开始学习Python的`websockets`库,然后进一步了解FastAPI——许多Python后端框架都是基于FastAPI开发的。需要指出的是,WebSocket并不是实现实时通信的唯一解决方案。根据具体的使用场景,WebRTC可能会更适合某些需求,但在深入研究其他技术之前,掌握WebSocket的基本知识才是正确的起点。
目录
WebSocket连接与相关方法
WebSocket连接允许客户端与服务器之间进行双向通信。一旦连接建立,双方就可以自由地交换信息,而无需事先发起请求。这与普通的HTTP请求不同,在HTTP请求中,客户端总是需要先发送请求,服务器才会做出响应。
其连接过程大致如下:
客户端 ←→ 建立连接 →→ 服务器
需要注意的是,WebSocket地址并不是普通的网页地址,因此你不能像访问普通网站那样来“浏览”它。你需要使用专门的客户端程序来与WebSocket服务器进行交互。
不同的开发框架提供了不同的方式来处理WebSocket连接。以Python的`websockets`库为例,当客户端建立连接时,系统会自动接受该连接;而对于FastAPI这样的框架,则需要明确调用`await websocket.accept()`函数,否则连接将会被拒绝。
接下来让我们来看看Python的`websockets`库提供的一些核心方法:
-
websockets.serve(...):启动WebSocket服务器。 -
websockets.connect(...):连接到一个WebSocket服务器。 -
websockets.send(...):向对方发送消息。 -
websockets.recv():从客户端或服务器接收消息。
recv()方法不接受任何参数,因为它仅仅是一个用于等待操作的函数。它会等待下一条消息的到来,并将其返回:
message = await websocket.recv()
如何用Python创建你的第一个WebSocket连接
在深入学习各种框架之前,我们先来了解一下Python的websockets库。你将搭建一个简单的服务器和客户端,并通过WebSocket连接交换消息,这样就能扎实地掌握WebSocket的工作原理了。
环境配置
在你的虚拟环境中运行以下命令,以安装或验证WebSockets包:
pip install websockets
# 或者,用来检查它是否已经安装:
pip show websockets
创建WebSocket服务器
在项目文件夹中创建一个名为server.py的文件,然后将以下代码粘贴到其中:
import asyncio
import websockets
async def handler(connection):
print("客户端已连接")
message = await connection.recv()
print("从客户端收到的消息:", message)
await connection.send("Hello client!"
async def main():
async with websockets.serve(handler, "localhost", 8000):
print("服务器正在运行,地址为ws://localhost:8000")
#await asyncio.Future() # 这行代码会无限等待
await asyncio.sleep(30)
asyncio.run(main())
当执行到这一行代码时:
async with websockets.serve(handler, "localhost", 8000):
该库会在指定的主机和端口上打开一个TCP套接字,并等待有客户端连接进来。一旦有客户端连接成功,就会创建一个连接对象,并将其传递给你的处理函数。
handler函数是必不可少的,因为它决定了服务器应该如何处理每个连接请求。host和port参数也很重要。它们的默认值都是None——如果不指定这两个参数,程序也不会出现错误,因为操作系统在没有指定端口号的情况下是无法创建网络服务器的。
你也可以将port=0作为参数传入,这样操作系统就会自动为你分配一个空闲的端口号。不过这样一来,你就需要额外编写代码来获取这个端口号的具体数值,以便客户端能够连接到服务器:
server.sockets[0].getsockname()
直接明确指定主机和端口号会更为方便,这样客户端就能清楚地知道服务器运行在哪个地址上了。
配置客户端
在同一文件夹中创建一个名为client.py的文件,然后添加以下代码:
import asyncio
import websockets
async def client():
async with websockets.connect("ws://localhost:8000") as websocket:
await websocket.send("Hello server!")
response = await websocket.recv()
print("服务器回复的内容:", response)
asyncio.run(client())
测试连接
首先,打开终端并运行server.py。你应该会看到这样的输出:
服务器正在运行,地址为ws://localhost:8000
在另一个终端中运行client.py。两个终端中都应该会出现确认消息,说明连接已经建立,双方正在进行通信。
请注意,在启动客户端之前,服务器必须已经运行起来——否则客户端将无法找到可连接的服务器,从而导致连接失败。
保持服务器持续运行:关于`asyncio.Future()`的注意事项
在server.py中,有一行代码目前被注释掉了:
await asyncio.Future()
这样可以让服务器无限期地运行下去。然而,在进行本地开发和测试时,使用await asyncio.sleep(30)会更为简便。这种方法能让服务器在固定时间内保持运行状态,而不会永远持续运行。
通过WebSockets传输文件
WebSockets不仅支持文本数据,也支持原始字节数据,这意味着你可以直接通过这种连接方式发送文件。以下是客户端如何通过WebSockets将文件发送给服务器的示例:
更新server.py
async def file_handler(ws):
print("客户端已连接,正在等待文件传输...")
file_bytes = await ws.recv() # 接收字节数据
with open("received_file.png", "wb") as f:
f.write(file_bytes)
print("文件已接收并保存!")
await ws.send("文件发送成功!")
async def main():
async with websockets.serve(file_handler, "localhost", 8000):
print("服务器正在运行,地址为ws://localhost:8000")
await asyncio.sleep(50) # 保持服务器持续运行
asyncio.run(main())
处理程序会使用await ws.recv()来接收传入的字节数据;websockets库会自动判断接收到的数据是文本还是字节,因此不需要进行额外的配置。接收到文件数据后,会以二进制模式将其写入磁盘(使用"wb"),然后服务器会向客户端发送确认消息。
更新client.py
import asyncio
import websockets
async def send_file():
uri = "ws://localhost:8000"
async with websockets.connect(uri) as ws:
with open("portfolio-image.png", "rb") as f: # 以二进制模式打开文件
file_bytes = f.read()
await ws.send(file_bytes) # 发送字节数据
response = await ws.recv()
print("服务器的回复:", response)
asyncio.run(send_file())
客户端会以二进制模式打开文件,将整个文件内容读入内存中,然后通过一次ws.send()调用将其发送给服务器。在收到服务器的确认消息后,客户端才会关闭连接。
进行测试
在你的项目文件夹中添加一张图片,并确保client.py中的文件名与实际文件的名称一致。首先运行server.py,然后在另一个终端中运行client.py。
文件传输完成后,服务器会将接收到的文件保存为received_file.png,保存在同一目录下。你应该会立即看到这个文件出现在你的工作区中。
这种方法会在发送数据之前将整个文件加载到内存中。对于较大的文件来说,分块读取并发送会更为合适。但无论如何,这种方式都是理解WebSocket字节传输机制的最简单途径。
如何连接外部WebSocket服务器
到目前为止,你连接的都是自己构建的服务器。但实际上,WebSocket客户端也可以连接到公共服务器。例如,客户端可以连接到Postman的回显服务器:
import asyncio
import websockets
async def connect_external():
uri = "wss://ws.postman-echo.com/raw" # 公共WebSocket服务器
async with websockets.connect(uri) as ws:
print("已连接到外部服务器!")
# 发送消息
await ws.send("Hello external server!")
print("消息发送成功")
# 接收响应
response = await ws.recv()
print("从服务器收到的回复:", response)
asyncio.run(connect_external())
请注意,客户端是使用wss://这个URI方案来连接Postman的回显服务器的,而不是ws://。这说明该连接是通过TLS进行加密的,这与https://用于保护普通Web请求的方式类似。
回显服务器会直接返回你发送给它的任何内容。因此,当你发送“Hello external server!”时,它也会以同样的形式作为响应返回给你。这种服务器非常适合用来测试你的客户端WebSocket代码,而无需你自己搭建服务器。
FastAPI中的WebSocket
FastAPI提供了WebSocket对象(实际上是通过Starlette实现的),用于管理实时连接。你可以像定义HTTP路由一样来定义WebSocket端点,而Uvicorn会负责处理事件循环——因此完全不需要手动管理asyncio服务器。正因为如此,FastAPI非常适合用于开发各种实时应用,无论是聊天应用程序还是实时数据展示界面。
在开始编写代码之前,先来看看你会经常使用的一些核心方法吧。
接收消息:
await websocket.accept():必须首先调用accept()方法,否则连接将会被拒绝。
发送数据:
await websocket.send_text(data):用于发送字符串。await websocket.send_bytes(data):用于发送二进制数据。await websocket.send_json(data):用于将数据序列化为JSON格式后再发送。
接收响应:
await websocket.receive_text():用于接收文本消息。await websocket.receive_bytes():用于接收二进制数据。await websocket.receive_json():用于接收并解析JSON格式的数据。async for msg in websocket.iter_text():用于遍历接收到的所有消息,在连接断开时会自动退出循环。
关闭连接:
await websocket.close(code=1000):这是标准意义上的关闭连接操作。该方法允许传入一个可选的“原因”参数。
以下是FastAPI中WebSocket生命周期的运作方式:

使用FastAPI构建一个简单的回声服务器
正如你在Postman示例中看到的那样,回声服务器会将客户端发送的消息原封不动地返回给客户端。现在让我们用FastAPI来构建这样一个服务器吧。
1. 安装FastAPI:
pip install "fastapi[standard]"
2. 更新`server.py`文件:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@appwebsocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
data = await websocket.receive_text()
await websocket.send_text(f"您输入的内容是:{data}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
与普通的`websockets`库相比,这里有几点需要注意:
-
WebSocket端点的定义方式与HTTP路由相同,使用`@appwebsocket(“/ws”)`即可。
-
在开始其他操作之前,必须先执行`await websocket.accept()`。如果不这样做,FastAPI将无法接受连接请求。
-
通过`if name == “__main__”`这块代码,Uvicorn会自动处理事件循环和服务器启动过程。因此不需要使用`asyncio.run()`或`asyncio.Future()`。
3. 更新`client.py`文件:
async def test_client():
uri = "ws://127.0.0.1:8000/ws"
async with websockets.connect(uri) as ws:
await ws.send("Hello FastAPI server!")
response = await ws.recv()
print("服务器的回复是:", response)
asyncio.run(test_client())
由于FastAPI服务器没有使用TLS进行加密,因此客户端使用的URI地址为`ws://`而非`wss://`。请确保你的服务器代码中指定的主机和端口信息是正确的。
4. 测试回声服务器的功能:
首先运行`server.py`,然后在另一个终端中执行`client.py`。服务器端应该会显示出客户端发送的消息。

如何在FastAPI中处理WebSocket连接断开的情况
在实时应用中,客户端有时会主动断开连接,有时则会意外中断连接。如果处理不当,这可能会导致服务器崩溃或处于异常状态。
当客户端突然关闭连接时,FastAPI会抛出`WebSocketDisconnect`异常,这样服务器就可以优雅地处理这种断开情况,记录相关事件,并及时释放资源,从而避免系统出现故障。
这里有一个例子:
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await ws.accept()
try:
while True:
data = await ws.receive_text()
if "bye" in data or "quit" in data:
await ws.send_text("关闭连接")
await ws.close(code=1000, reason="服务器请求关闭")
break
await ws.send_text(f“我收到了您的请求:{data}”
except WebSocketDisconnect:
print("客户端已断开连接") # 连接已经关闭
服务器会进入一个持续运行的循环,等待接收来自客户端的消息。如果客户端发送的消息中包含“bye”或“quit”,服务器会立即响应,执行await ws.close(code=1000)命令,从而正常退出循环。
但如果客户端意外断开连接,WebSocketDisconnect异常会被捕获,服务器也会继续正常运行而不会崩溃。由于此时客户端的连接已经关闭,因此在异常处理块中调用ws.close()其实是多余的。
总结
WebSockets通过保持客户端与服务器之间的持久连接,使得实时通信成为可能。使用Python的websockets库可以帮助我们了解这一协议的具体工作原理,而像FastAPI这样的框架则能为生产环境中的应用程序提供所需的架构支持。
初学者最容易遇到困惑的地方通常是与asyncio以及FastAPI中显式需要的websocket.accept()>方法相关的内容。对于使用asyncio的人来说,往往会疑惑为什么需要这个方法,以及为什么如果不执行它服务器就会立即关闭。而如果之前只接触过普通的websockets库,那么可能会忽略掉websocket.accept()>这一步骤,因为在该库中这个操作是自动完成的。一旦弄明白了这些原理,其他相关内容就会自然而然地被理解了。