从 FastAPI 说起,理解 Python 异步 IO 的原理

因为 Python 是解释型语言,当使用 Python 做后端开发时,如 Python + Django,相比 Java + Spring,其响应时间会长一点,但只要代码合理,差别也不太大。但 Django 即使使用多进程模式,其并发处理能力还是会差不少。Python 有一些提升并发处理能力的方案,比如使用异步框架 FastAPI,借助其异步能力,可以大大提升 IO 密集型任务的并发处理能力。FastAPI 算是最快的 Python 框架之一

FastAPI

我们先简单看看 FastAPI 怎么用。

示例 1:默认网络异步 IO

安装:

python -m venv venv
. ./venv/bin/activate
pip install fastapi

简单的 Server 端代码:

# main.py
from typing import Union

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
return {"Hello": "World"}

启动:

uvicorn main:app --reload

效果:

我们可以看出,FastAPI 的接口相比其它框架,写法只是多了个 async 关键字,async 定义接口是异步的。

单从返回结果中看不出来 FastAPI 与其它 Python 框架的区别。区别在于并发访问时,FastAPI 的服务器线程处理路由请求,如 http://127.0.0.1:8000/,如果遇到网络 IO,不再等待网络 IO,而是去处理其它请求,当网络 IO 完成时,再恢复继续执行,这个异步能力提升了对 IO 密集型任务的处理能力。

示例 2:显式网络异步 IO

再看另一个示例,在业务代码中,显式的发起异步网络请求,这个网络 IO,如同路由请求一样,FastAPI 也会异步的处理。

# main.py
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

# 异步 GET 请求示例
@app.get("/external-api")
async def call_external_api():
url = "https://jsonplaceholder.typicode.com/posts/1"
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
return response.json()

如果想数据库 IO 异步,需要数据库驱动或 ORM 支持异步操作。

异步 IO

FastAPI 异步的核心实现是「异步 IO」,我们可以不用 FastAPI,直接使用异步 IO 来启动一个有异步处理能力的 Server。

import asyncio

from aiohttp import web

async def index(request):
await asyncio.sleep(1) # 模拟 I/O 操作
return web.Response(text='{"Hello": "World"}', content_type='application/json')

async def init(loop):
# 使用事件循环监控 web 请求
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
# 启动 server,事件循环监控处理 web 请求
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv

# 显式获取一个事件循环
loop = asyncio.get_event_loop()
# 启动事件循环
loop.run_until_complete(init(loop))
loop.run_forever()

启动这个示例, http://127.0.0.1:8000/ 返回结果跟示例 1 一样。

异步 IO 的底层实现原理是「协程」与「事件循环」。

协程

async def index(request):
await asyncio.sleep(1) # 模拟 I/O 操作
return web.Response(text='{"Hello": "World"}', content_type='application/json')

index 使用 async def 定义,代表它是一个协程。await 用于 I/O 操作前,告诉执行线程不用等待这个 IO 操作。正常函数的调用是通过栈实现,函数只能依次调用执行。而协程 (coroutine)是一种特殊的函数(不是协作的线程),它可以让线程在 await 标记处暂停执行,转而执行其它任务,当 IO 操作完全时,再继续执行。

我们看看多个协程并发执行的效果。

import asyncio
from datetime import datetime

async def coroutine3():
print(f"Coroutine 3 started at {datetime.now()}")
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"Coroutine 3 finished at {datetime.now()}")

async def coroutine2():
print(f"Coroutine 2 started at {datetime.now()}")
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"Coroutine 2 finished at {datetime.now()}")

async def coroutine1():
print(f"Coroutine 1 started at {datetime.now()}")
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"Coroutine 1 finished at {datetime.now()}")

async def main():
print("Main started")

# 创建任务,让协程并发执行
task1 = asyncio.create_task(coroutine1())
task2 = asyncio.create_task(coroutine2())
task3 = asyncio.create_task(coroutine3())

# 等待所有任务完成
await task1
await task2
await task3

print("Main finished")

# 运行主协程
asyncio.run(main())

输出:

Main started
Coroutine 1 started at 2024-06-03 10:28:00.665051
Coroutine 2 started at 2024-06-03 10:28:00.665076
Coroutine 3 started at 2024-06-03 10:28:00.665087
Coroutine 1 finished at 2024-06-03 10:28:01.665265
Coroutine 2 finished at 2024-06-03 10:28:01.665310
Coroutine 3 finished at 2024-06-03 10:28:01.665319
Main finished

我们可以看出,线程没有依次执行 3 个任务,遇到 IO 操作时,转而执行了其它任务。等 IO 操作完成后,再接着执行。也可以看出线程 3 个协程基本是同时开启等待 I/O 操作的,所以最终执行完成时间基本相同。

虽然这里没有显式使用事件循环,但 asyncio.run 会隐式的使用使用事件循环。

生成器

协程是通过生成器实现的。生成器可以让函数执行暂停,也可以让函数执行恢复。也就是协程的特点。

def simple_generator():
print("First value")
yield 1
print("Second value")
yield 2
print("Third value")
yield 3

# simple_generator 是生成器函数, gen 是生成器
gen = simple_generator()

print(next(gen)) # 输出: First value \n 1
print(next(gen)) # 输出: Second value \n 2
print(next(gen)) # 输出: Third value \n 3

使用 next() 运行生成器时,遇到 yield 时,它将暂停,next() 再次运行时,会接着上次暂停的 yield 处继续运行。Python 3.5 以前的协程写法也是使用「注解」+ yeild,3.5 开始使用 async def + await

import asyncio
from datetime import datetime

@asyncio.coroutine
def my_coroutine():
print("Start coroutine", datetime.now())
# 异步调用 asyncio.sleep(1):
yield from asyncio.sleep(1)
print("End coroutine", datetime.now())

# 获取 EventLoop
loop = asyncio.get_event_loop()
# 执行 coroutine
loop.run_until_complete(my_coroutine())
loop.close()

生成器的运行暂停与恢复的特点,除了做协程,还可以做不少事,可以边循环边计算,存放算法,比如实现一个杨辉三角(每一行的两端都是 1,其他位置的数字是它上方两个数字之和)。

def yanghui_triangle():
row = [1]
while True:
yield row
new_row = [1] # 每一行的第一个元素总是 1
for i in range(1, len(row)):
new_row.append(row[i - 1] + row[i])
new_row.append(1) # 每一行的最后一个元素总是 1
row = new_row

# 生成并打印前 5 行杨辉三角
triangle = yanghui_triangle()
for _ in range(5):
print(next(triangle))

输出:

[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]

事件循环

协程执行可以暂停,那协程何时恢复执行,这就需要使用事件循环来告诉执行线程。

# 获取 EventLoop
loop = asyncio.get_event_loop()
# 事件循环执行 coroutine
loop.run_until_complete(my_coroutine())
loop.close()

事件循环是使用 IO 多路复用技术,一直循环监控协程可以继续执行的事件,当可以执行时,线程继续执行协程。

IO 多路复用技术

通俗理解 IO 多路复用:我是快递站点老板,我不用主动问每个快递员的任务完成情况,而是快递员自己完成任务后,主动通过我。这提高了我处理任务的能力,我这个老板可以做更多的事。

+--------+  +--------+  +--------+
|快递员 A|-->|快递员 B|-->|快递员 C|
|任务完成| |任务完成| |任务完成|
+--------+ +--------+ +--------+
\ | /
\ | /
\ | /
\ | /
\ | /
\ | /
\ | /
\ | /
\| /
V
+-------+
|等待通知|
+-------+
|
V
+-------+
|处理任务|
+-------+

select、poll、epoll 都能实现 IO 多路复用,相比 select、poll,epoll 的性能更好。Linux 一般默认使用 epoll,macOS 使用 kqueue,类似于 epoll,跟 epoll 性能差不多。

Socket 服务器使用事件循环

import selectors
import socket

# 创建一个 selectors 对象,相于 epoll 的实现,Linux 中运行的话
sel = selectors.DefaultSelector()

# 请求接收事件处理函数。接受新的连接并注册读事件
def accept(sock, mask):
conn, addr = sock.accept() # 接受连接
print('Accepted connection from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # 注册读取事件

# 请求读取事件处理函数。读取请求数据并发送 HTTP 响应,之后关闭连接。
def read(conn, mask):
data = conn.recv(100) # 从连接中读取数据
print('response to')
response = "HTTP/1.1 200 OK\r\n" \
"Content-Type: application/json\r\n" \
"Content-Length: 18\r\n" \
"Connection: close\r\n" \
"\r\n" \
"{\"Hello\": \"World\"}"
conn.send(response.encode()) # 回显数据
print('Closing connection')
sel.unregister(conn) # 注销事件
conn.close() # 关闭连接

# 创建一个服务器 socket
sock = socket.socket()
sock.bind(('localhost', 8000))
sock.listen()
sock.setblocking(False)

# 注册 accept 事件
sel.register(sock, selectors.EVENT_READ, accept)

print("Server is running on port 8000...")

# 事件循环
while True:
# 当没有请求时,这里会阻塞
events = sel.select() # 选择已准备好的文件描述符(事件)
print("events length: ", len(events))
for key, mask in events:
callback = key.data # 获取事件处理函数
print("事件函数名:", callback.__name__)
callback(key.fileobj, mask) # 调用事件处理函数

启动服务器 Socket,监控指定端口。如果运行在 Linux 系统上,selectors 默认使用 epoll 作为其实现。代码使用 epoll 注册一个请求接收事件(accept 事件)。当有新的请求到来时,epoll 会触发并执行该事件处理函数,同时注册一个读取事件(read 事件)用于处理和响应请求数据。

当 WEB 端使用 http://127.0.0.1:8000/ 访问时,返回结果与示例 1 一样,Server 运行日志:

Server is running on port 8000...
events length: 1
事件函数名: accept
Accepted connection from ('127.0.0.1', 60941)
events length: 1
事件函数名: read
response to
Closing connection

Socket 服务器

直接使用 Socket 启动一个 Server,浏览器使用 http://127.0.0.1:8080/ 访问或使用 curl http://127.0.0.1:8080/ 返回 {"Hello": "World"}

import socket
from datetime import datetime

# 创建一个 TCP socket
server_socket = socket.socket()

# 将 socket 绑定到指定的IP地址和端口号
server_socket.bind(('127.0.0.1', 8001))

# 开始监听传入连接
server_socket.listen(5)

# 循环接受客户端连接
while True:
print("%s Waiting for a connection..." % datetime.now())
client_socket, addr = server_socket.accept() # 这里会阻塞,等待客户端连接
print(f"{datetime.now()} Got connection from {addr}")

# 接收客户端数据
data = client_socket.recv(1024)
print(f"Received: {data.decode()}")

# 发送响应数据
response = "HTTP/1.1 200 OK\r\n" \
"Content-Type: application/json\r\n" \
"Content-Length: 18\r\n" \
"Connection: close\r\n" \
"\r\n" \
"{\"Hello\": \"World\"}"

client_socket.sendall(response.encode())

# 关闭客户端套接字
client_socket.close()

curl http://127.0.0.1:8001/ 访问,Server 运行日志:

2024-06-03 09:53:36.711732 Waiting for a connection...
2024-06-03 09:54:30.715928 Got connection from ('127.0.0.1', 64361)
Received: GET / HTTP/1.1
Host: 127.0.0.1:8001
User-Agent: curl/8.4.0
Accept: */*


2024-06-03 09:54:30.716046 Waiting for a connection...

一句话总结

异步 IO 底层使用「协程」与「事件循环」实现。「协程」保证当线程执行时遇到标记的 IO 操作时,可以不用等待 IO 完成,而是暂停,让线程可以执行其他任务,不阻塞线程。「事件循环」使用 IO 多路复用技术,一直循环监控 IO 事件,当某个 IO 事件完成时,触发对应的回调,使协程继续执行。

评论默认使用 ,你也可以切换到 来留言。