实时数据是现代许多软件运行的基础:实时的股票价格、聊天应用程序、体育赛事比分以及各种协作工具。要构建这些系统,就必须了解实时通信的运作原理——而这并不总是那么容易理解。

我在尝试开发一个实时期权信息面板时亲身体会到了这一点。使用HTTP请求是无法满足需求的,而且我读到的所有相关资料看起来都过于复杂,直到我重新回归基础知识。这篇文章就是这个学习过程的成果。

我们将从零开始学习Python的`websockets`库,然后进一步了解FastAPI——许多Python后端框架都是基于FastAPI开发的。需要指出的是,WebSocket并不是实现实时通信的唯一解决方案。根据具体的使用场景,WebRTC可能会更适合某些需求,但在深入研究其他技术之前,掌握WebSocket的基本知识才是正确的起点。

目录

  1. WebSocket连接与相关方法

  2. 如何用Python构建第一个WebSocket应用

  3. 通过WebSocket进行文件传输

  4. 如何连接外部WebSocket服务器

  5. FastAPI中的WebSocket技术

  6. 如何在FastAPI中处理WebSocket连接中断

  7. 总结

WebSocket连接与相关方法

WebSocket连接允许客户端与服务器之间进行双向通信。一旦连接建立,双方就可以自由地交换信息,而无需事先发起请求。这与普通的HTTP请求不同,在HTTP请求中,客户端总是需要先发送请求,服务器才会做出响应。

其连接过程大致如下:

        客户端 ←→ 建立连接 →→ 服务器

需要注意的是,WebSocket地址并不是普通的网页地址,因此你不能像访问普通网站那样来“浏览”它。你需要使用专门的客户端程序来与WebSocket服务器进行交互。

不同的开发框架提供了不同的方式来处理WebSocket连接。以Python的`websockets`库为例,当客户端建立连接时,系统会自动接受该连接;而对于FastAPI这样的框架,则需要明确调用`await websocket.accept()`函数,否则连接将会被拒绝。

接下来让我们来看看Python的`websockets`库提供的一些核心方法:

  1. websockets.serve(...):启动WebSocket服务器。

  2. websockets.connect(...):连接到一个WebSocket服务器。

  3. websockets.send(...):向对方发送消息。

  4. websockets.recv():从客户端或服务器接收消息。

recv()方法不接受任何参数,因为它仅仅是一个用于等待操作的函数。它会等待下一条消息的到来,并将其返回:
message = await websocket.recv()

如何用Python创建你的第一个WebSocket连接

在深入学习各种框架之前,我们先来了解一下Python的websockets库。你将搭建一个简单的服务器和客户端,并通过WebSocket连接交换消息,这样就能扎实地掌握WebSocket的工作原理了。

环境配置

在你的虚拟环境中运行以下命令,以安装或验证WebSockets包:

pip install websockets
# 或者,用来检查它是否已经安装:
pip show websockets

创建WebSocket服务器

在项目文件夹中创建一个名为server.py的文件,然后将以下代码粘贴到其中:

import asyncio
import websockets

async def handler(connection):
print("客户端已连接")

message = await connection.recv()
print("从客户端收到的消息:", message)
await connection.send("Hello client!"

async def main():
async with websockets.serve(handler, "localhost", 8000):
print("服务器正在运行,地址为ws://localhost:8000")
#await asyncio.Future() # 这行代码会无限等待
await asyncio.sleep(30)

asyncio.run(main())

当执行到这一行代码时:

async with websockets.serve(handler, "localhost", 8000):

该库会在指定的主机和端口上打开一个TCP套接字,并等待有客户端连接进来。一旦有客户端连接成功,就会创建一个连接对象,并将其传递给你的处理函数。

handler函数是必不可少的,因为它决定了服务器应该如何处理每个连接请求。hostport参数也很重要。它们的默认值都是None——如果不指定这两个参数,程序也不会出现错误,因为操作系统在没有指定端口号的情况下是无法创建网络服务器的。

你也可以将port=0作为参数传入,这样操作系统就会自动为你分配一个空闲的端口号。不过这样一来,你就需要额外编写代码来获取这个端口号的具体数值,以便客户端能够连接到服务器:

server.sockets[0].getsockname()

直接明确指定主机和端口号会更为方便,这样客户端就能清楚地知道服务器运行在哪个地址上了。

配置客户端

在同一文件夹中创建一个名为client.py的文件,然后添加以下代码:

import asyncio
import websockets

async def client():
async with websockets.connect("ws://localhost:8000") as websocket:
await websocket.send("Hello server!")
response = await websocket.recv()
print("服务器回复的内容:", response)

asyncio.run(client())

测试连接

首先,打开终端并运行server.py。你应该会看到这样的输出:

服务器正在运行,地址为ws://localhost:8000

在另一个终端中运行client.py。两个终端中都应该会出现确认消息,说明连接已经建立,双方正在进行通信。

请注意,在启动客户端之前,服务器必须已经运行起来——否则客户端将无法找到可连接的服务器,从而导致连接失败。

保持服务器持续运行:关于`asyncio.Future()`的注意事项

server.py中,有一行代码目前被注释掉了:

await asyncio.Future()

这样可以让服务器无限期地运行下去。然而,在进行本地开发和测试时,使用await asyncio.sleep(30)会更为简便。这种方法能让服务器在固定时间内保持运行状态,而不会永远持续运行。

通过WebSockets传输文件

WebSockets不仅支持文本数据,也支持原始字节数据,这意味着你可以直接通过这种连接方式发送文件。以下是客户端如何通过WebSockets将文件发送给服务器的示例:

更新server.py

async def file_handler(ws):
    print("客户端已连接,正在等待文件传输...")
    file_bytes = await ws.recv()  # 接收字节数据
    with open("received_file.png", "wb") as f:
        f.write(file_bytes)
    print("文件已接收并保存!")
    await ws.send("文件发送成功!")

async def main():
    async with websockets.serve(file_handler, "localhost", 8000):
        print("服务器正在运行,地址为ws://localhost:8000")
        await asyncio.sleep(50)  # 保持服务器持续运行

asyncio.run(main())

处理程序会使用await ws.recv()来接收传入的字节数据;websockets库会自动判断接收到的数据是文本还是字节,因此不需要进行额外的配置。接收到文件数据后,会以二进制模式将其写入磁盘(使用"wb"),然后服务器会向客户端发送确认消息。

更新client.py

import asyncio
import websockets

async def send_file():
    uri = "ws://localhost:8000"
    async with websockets.connect(uri) as ws:
        with open("portfolio-image.png", "rb") as f:  # 以二进制模式打开文件
            file_bytes = f.read()
        await ws.send(file_bytes)  # 发送字节数据
        response = await ws.recv()
        print("服务器的回复:", response)

asyncio.run(send_file())

客户端会以二进制模式打开文件,将整个文件内容读入内存中,然后通过一次ws.send()调用将其发送给服务器。在收到服务器的确认消息后,客户端才会关闭连接。

进行测试

在你的项目文件夹中添加一张图片,并确保client.py中的文件名与实际文件的名称一致。首先运行server.py,然后在另一个终端中运行client.py

文件传输完成后,服务器会将接收到的文件保存为received_file.png,保存在同一目录下。你应该会立即看到这个文件出现在你的工作区中。

这种方法会在发送数据之前将整个文件加载到内存中。对于较大的文件来说,分块读取并发送会更为合适。但无论如何,这种方式都是理解WebSocket字节传输机制的最简单途径。

如何连接外部WebSocket服务器

到目前为止,你连接的都是自己构建的服务器。但实际上,WebSocket客户端也可以连接到公共服务器。例如,客户端可以连接到Postman的回显服务器:

import asyncio
import websockets

async def connect_external():
    uri = "wss://ws.postman-echo.com/raw"  # 公共WebSocket服务器
    async with websockets.connect(uri) as ws:
        print("已连接到外部服务器!")
        
        # 发送消息
        await ws.send("Hello external server!")
        print("消息发送成功")
        
        # 接收响应
        response = await ws.recv()
        print("从服务器收到的回复:", response)
asyncio.run(connect_external())

请注意,客户端是使用wss://这个URI方案来连接Postman的回显服务器的,而不是ws://。这说明该连接是通过TLS进行加密的,这与https://用于保护普通Web请求的方式类似。

回显服务器会直接返回你发送给它的任何内容。因此,当你发送“Hello external server!”时,它也会以同样的形式作为响应返回给你。这种服务器非常适合用来测试你的客户端WebSocket代码,而无需你自己搭建服务器。

FastAPI中的WebSocket

FastAPI提供了WebSocket对象(实际上是通过Starlette实现的),用于管理实时连接。你可以像定义HTTP路由一样来定义WebSocket端点,而Uvicorn会负责处理事件循环——因此完全不需要手动管理asyncio服务器。正因为如此,FastAPI非常适合用于开发各种实时应用,无论是聊天应用程序还是实时数据展示界面。

在开始编写代码之前,先来看看你会经常使用的一些核心方法吧。

接收消息:

  • await websocket.accept():必须首先调用accept()方法,否则连接将会被拒绝。

发送数据:

  • await websocket.send_text(data):用于发送字符串。
  • await websocket.send_bytes(data):用于发送二进制数据。
  • await websocket.send_json(data):用于将数据序列化为JSON格式后再发送。

接收响应:

  • await websocket.receive_text():用于接收文本消息。
  • await websocket.receive_bytes():用于接收二进制数据。
  • await websocket.receive_json():用于接收并解析JSON格式的数据。
  • async for msg in websocket.iter_text():用于遍历接收到的所有消息,在连接断开时会自动退出循环。

关闭连接:

  • await websocket.close(code=1000):这是标准意义上的关闭连接操作。该方法允许传入一个可选的“原因”参数。

以下是FastAPI中WebSocket生命周期的运作方式:

FastAPI中的WebSocket

使用FastAPI构建一个简单的回声服务器

正如你在Postman示例中看到的那样,回声服务器会将客户端发送的消息原封不动地返回给客户端。现在让我们用FastAPI来构建这样一个服务器吧。

1. 安装FastAPI:

pip install "fastapi[standard]"

2. 更新`server.py`文件:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@appwebsocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    data = await websocket.receive_text()
   
    await websocket.send_text(f"您输入的内容是:{data}")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

与普通的`websockets`库相比,这里有几点需要注意:

  • WebSocket端点的定义方式与HTTP路由相同,使用`@appwebsocket(“/ws”)`即可。

  • 在开始其他操作之前,必须先执行`await websocket.accept()`。如果不这样做,FastAPI将无法接受连接请求。

  • 通过`if name == “__main__”`这块代码,Uvicorn会自动处理事件循环和服务器启动过程。因此不需要使用`asyncio.run()`或`asyncio.Future()`。

3. 更新`client.py`文件:

async def test_client():
    uri = "ws://127.0.0.1:8000/ws"
    async with websockets.connect(uri) as ws:
        await ws.send("Hello FastAPI server!")
        response = await ws.recv()
        print("服务器的回复是:", response)

asyncio.run(test_client())

由于FastAPI服务器没有使用TLS进行加密,因此客户端使用的URI地址为`ws://`而非`wss://`。请确保你的服务器代码中指定的主机和端口信息是正确的。

4. 测试回声服务器的功能:

首先运行`server.py`,然后在另一个终端中执行`client.py`。服务器端应该会显示出客户端发送的消息。

88629d79-eb91-4af5-9752-f9596ff5e5a4

如何在FastAPI中处理WebSocket连接断开的情况

在实时应用中,客户端有时会主动断开连接,有时则会意外中断连接。如果处理不当,这可能会导致服务器崩溃或处于异常状态。

当客户端突然关闭连接时,FastAPI会抛出`WebSocketDisconnect`异常,这样服务器就可以优雅地处理这种断开情况,记录相关事件,并及时释放资源,从而避免系统出现故障。

这里有一个例子:

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        while True:
            data = await ws.receive_text()
   
            if "bye" in data or "quit" in data:
                await ws.send_text("关闭连接")
                await ws.close(code=1000, reason="服务器请求关闭")  
                break
            await ws.send_text(f“我收到了您的请求:{data}”
    except WebSocketDisconnect:
        print("客户端已断开连接")  # 连接已经关闭

服务器会进入一个持续运行的循环,等待接收来自客户端的消息。如果客户端发送的消息中包含“bye”或“quit”,服务器会立即响应,执行await ws.close(code=1000)命令,从而正常退出循环。

但如果客户端意外断开连接,WebSocketDisconnect异常会被捕获,服务器也会继续正常运行而不会崩溃。由于此时客户端的连接已经关闭,因此在异常处理块中调用ws.close()其实是多余的。

总结

WebSockets通过保持客户端与服务器之间的持久连接,使得实时通信成为可能。使用Python的websockets库可以帮助我们了解这一协议的具体工作原理,而像FastAPI这样的框架则能为生产环境中的应用程序提供所需的架构支持。

初学者最容易遇到困惑的地方通常是与asyncio以及FastAPI中显式需要的websocket.accept()方法相关的内容。对于使用asyncio的人来说,往往会疑惑为什么需要这个方法,以及为什么如果不执行它服务器就会立即关闭。而如果之前只接触过普通的websockets库,那么可能会忽略掉websocket.accept()这一步骤,因为在该库中这个操作是自动完成的。一旦弄明白了这些原理,其他相关内容就会自然而然地被理解了。

Comments are closed.