36. 协程和异步
36. 协程和异步
协程是用于异步编程的可恢复计算。他们让 Python 代码暂停在await点,将控制权返回到事件循环,然后在等待的操作有结果时恢复。
协程类似于生成器,因为两者都在挂起期间保留执行状态。区别在于协议和目的。
生成器向迭代器使用者产生值。
协程等待其他异步操作并最终返回一个最终结果。python id="o2u8bi" async def fetch(): data = await read() return data 呼唤fetch()不运行主体完成。它创建一个协程对象。```python id="9i5ai1"
coro = fetch()
## 36.1 协程函数与协程对象
安`async def`语句创建一个协程函数。
调用协程函数会创建一个协程对象。```python id="rt8ky9"
async def work():
return 42
coro = work()
```从概念上讲:```text id="q71j65"
work
coroutine function object
work()
coroutine object
suspended execution state
code object
frame or frame-like state
```这与普通函数不同:```python id="nv1t8j"
def work():
return 42
work()
```普通函数调用立即运行并返回`42`。
异步函数调用返回一个稍后必须驱动的协程对象。
## 36.2`await`
`await`暂停当前协程,直到另一个等待完成。```python id="wjue72"
async def outer():
value = await inner()
return value + 1
```表达式:```python id="pbwts5"
await inner()
```从概念上讲,做了几件事:```text id="n2ycfi"
call inner()
obtain awaitable object
suspend current coroutine
let event loop drive awaitable
resume current coroutine with result
```如果引发等待的操作,则异常将被注入回等待的协程中。```python id="yc56f3"
async def outer():
try:
value = await inner()
except ValueError:
value = 0
return value
```## 36.3 等待事项`await`适用于可等待的对象。
常见的等待事项包括:```text id="o2vwvk"
coroutine objects
asyncio Task objects
asyncio Future objects
objects implementing __await__
```自定义对象可以通过定义来等待`__await__`。```python id="f4sd9q"
class Immediate:
def __await__(self):
yield
return 42
```在实践中,大多数应用程序代码等待异步框架创建的协程、任务和 future。
## 36.4 协程状态
协程可以处于类似于生成器状态的状态:```text id="qcur3i"
created
running
suspended
closed
```使用`inspect`:
```python id="84zov8"
import inspect
async def work():
await other()
coro = work()
print(inspect.getcoroutinestate(coro))
```常见的状态包括:```text id="h0yb3u"
CORO_CREATED
CORO_RUNNING
CORO_SUSPENDED
CORO_CLOSED
```协程不能同时被多个消费者等待。它代表一次执行。
## 36.5 协程框架
挂起的协程保持其执行状态。
该状态包括:```text id="o61wj7"
code object
instruction position
local variables
value stack
exception state
currently awaited object
running state
closed state
```例子:```python id="1tow3u"
async def process():
data = await read()
return transform(data)
```暂停期间`await read()`,协程必须记住:```text id="985lzm"
current frame
local variables
await target
next instruction after await
```什么时候`read()`完成后,协程恢复并将返回值分配给`data`。
## 36.6 协程使用相同的框架模型
协程不使用单独的解释器。它们由相同的 CPython 评估循环执行。
循环运行直到协程:```text id="p7w1wv"
awaits and suspends
returns
raises
is cancelled
```从概念上讲:```text id="xzr16h"
coroutine object
frame state
bytecode
stack
locals
event loop
resumes coroutine
evaluation loop runs frame
stops at await or completion
```事件循环进行调度。 CPython 提供了可恢复执行机制。
## 36.7 事件循环
事件循环协调异步任务。
在`asyncio`,事件循环管理:```text id="n7xdfh"
ready tasks
sleep timers
socket readiness
future completion
callbacks
cancellation
exception reporting
```例子:```python id="0chlb7"
import asyncio
async def main():
await asyncio.sleep(1)
return 42
result = asyncio.run(main())
print(result)
asyncio.run创建并驱动事件循环,直到main()完成。
CPython 本身提供了协程对象,并且await语义。asyncio提供一种标准事件循环实现。
36.8 任务
单独的协程对象是被动的。任务安排它执行。```python id="8et1tq" import asyncio
async def work(): await asyncio.sleep(1) return 42
async def main():
task = asyncio.create_task(work())
result = await task
return result
从概念上讲:text id="p93t67"
coroutine object
passive resumable computation
task event-loop managed wrapper resumes coroutine stores result or exception supports cancellation
## 36.9 合作并发
异步 Python 使用协作并发。
协程运行直到到达等待点。```python id="8ux1ur"
async def work():
step1()
await wait()
step2()
```期间`step1()`,该事件循环上没有其他任务运行,除非`step1()`本身等待或返回控制权。
在`await wait()`,协程挂起,事件循环可以运行另一个任务。
这意味着协程内的 CPU 密集型代码可能会阻塞事件循环:```python id="g8fwp9"
async def bad():
while True:
compute()
```没有`await`,该协程不会产生控制。
## 36.10 异步不是并行
异步并发并不自动意味着 CPU 并行。
一个事件循环通常在一个操作系统线程中运行。它仅在暂停点在任务之间切换。
异步适用于:```text id="oell4m"
network I/O
many open connections
timeouts
sleeping
waiting for subprocesses
streaming protocols
cooperative task orchestration
```它不会使受 CPU 限制的 Python 字节码并行运行。
对于 CPU 密集型工作,请使用:```text id="r5575k"
native extensions
multiprocessing
process pools
thread pools for blocking calls
free-threaded builds where appropriate
external workers
```## 36.11 等待协程
鉴于:```python id="h5auqh"
async def inner():
return 10
async def outer():
x = await inner()
return x + 1
```流程是:```text id="fi924o"
outer starts
outer calls inner(), producing inner coroutine
outer awaits inner coroutine
event loop drives inner
inner returns 10
outer resumes with x = 10
outer returns 11
```返回值穿过等待边界回到挂起的框架中。
## 36.12 等待未来
未来代表着可能尚不存在的结果。```python id="6xf2w8"
future = loop.create_future()
```协程可以等待它:```python id="qze0h0"
value = await future
```如果 future 不完整,协程就会挂起。
之后:```python id="nrkg2x"
future.set_result(42)
```事件循环将等待任务标记为就绪。等待的协程恢复`42`。
如果未来有例外:```python id="xzr05h"
future.set_exception(ValueError("bad"))
```等待的协程通过在等待点引发异常来恢复。
## 36.13 等待任务
还有一个任务正在等待。```python id="w7s6x8"
task = asyncio.create_task(work())
result = await task
```如果任务正常完成的话`await task`返回其结果。
如果任务提出来,`await task`引发同样的异常。
如果任务被取消,`await task`提高`CancelledError`。
## 36.14 取消
取消要求任务停止。```python id="j5ce0q"
task.cancel()
```在`asyncio`,取消是通过注入传递的`CancelledError`在等待点进入协程。```python id="lczbrw"
async def work():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
cleanup()
raise
```协程可能会捕获取消以进行清理,但通常应该重新引发它,除非它故意抑制取消。
取消是合作的。从不等待的协程可能不会立即观察到取消。
## 36.15 清理`try/finally`使用`try/finally`用于异步清理。```python id="j83mx6"
async def work():
resource = await acquire()
try:
await use(resource)
finally:
await release(resource)
```这`finally`块运行于:```text id="m2ocv7"
normal completion
exception
cancellation
```如果清理本身正在等待,协程可能会在清理期间挂起。
这使得取消和清理变得微妙。协程可以处于取消状态,同时仍在执行终结代码。
## 36.16 异步上下文管理器
异步上下文管理器使用`async with`。```python id="v2e2wy"
async with lock:
await work()
```该对象必须提供:```text id="5scc5n"
__aenter__
__aexit__
```从概念上讲:```python id="dkm7oc"
mgr = lock
value = await mgr.__aenter__()
try:
await work()
except BaseException as exc:
suppress = await mgr.__aexit__(type(exc), exc, exc.__traceback__)
if not suppress:
raise
else:
await mgr.__aexit__(None, None, None)
```异步上下文管理器允许暂停资源获取和释放。
## 36.17 异步迭代器
异步迭代使用`async for`。```python id="hvh27p"
async for item in stream:
process(item)
```该对象必须提供:```text id="u37g80"
__aiter__
__anext__
```从概念上讲:```python id="z1gd9g"
ait = stream.__aiter__()
while True:
try:
item = await ait.__anext__()
except StopAsyncIteration:
break
process(item)
```异步迭代对于每个下一项可能需要 I/O 的流很有用。
## 36.18 异步生成器
异步生成器定义为`async def`和`yield`。```python id="65vpyy"
async def lines(reader):
async for line in reader:
yield line.strip()
```它产生一个异步迭代器。
使用:```python id="5lhj9g"
async for line in lines(reader):
print(line)
```异步生成器既可以等待也可以让出。```python id="8c9u5e"
async def gen():
await asyncio.sleep(1)
yield 1
```它们将协程悬挂与生成器式价值生产相结合。
## 36.19`anext`
`anext`从异步迭代器中检索下一个项目。```python id="vs0dl0"
item = await anext(ait)
```这等待着`ait.__anext__()`。
如果异步迭代器耗尽,它会引发`StopAsyncIteration`。
可以提供默认值:```python id="8d97ug"
item = await anext(ait, default)
```## 36.20`StopAsyncIteration`异步迭代器发出完成信号`StopAsyncIteration`。
这与`StopIteration`。```text id="prf78x"
normal iterator ends with StopIteration
async iterator ends with StopAsyncIteration
async for捕获物StopAsyncIteration退出循环。
提高StopIteration来自异步迭代是不正确的。
36.21 协程字节码
异步函数编译为标记为协程代码的代码对象。```python id="m36blp" async def f(): return 1
带有await的异步函数:```python id="vb752s"
async def f():
value = await g()
return value
```从概念上讲,字节码包括:```text id="aw12fn"
call g
get awaitable iterator
send into awaitable
suspend if not done
resume with result
store value
return value
```确切的字节码因 Python 版本而异。模型仍然是:`await`被实现为可恢复帧执行。
## 36.22`SEND`并等待
现代 CPython 字节码使用类似发送的机制来驱动等待。
从概念上讲:```text id="c07p30"
await awaitable:
get awaitable iterator
send None or value
if it yields:
suspend current coroutine
if it returns:
resume with result
if it raises:
propagate exception
```这与生成器委托有关。协程基于将值发送到挂起计算的相同想法。
## 36.23 协程返回
协程返回一个最终值。```python id="11vkrn"
async def f():
return 42
```等待时:```python id="q0irxd"
value = await f()
```等待的协程接收到`42`。
在内部,完成是通过可等待协议表示的。在用户层面,`await`将协程完成转换为普通的表达式结果。
## 36.24 协程异常
如果协程引发,则等待它引发。```python id="ufon8f"
async def f():
raise ValueError("bad")
async def main():
await f()
```异常传播到`main`在等待点。
你可以抓住它:```python id="vi414s"
async def main():
try:
await f()
except ValueError:
return 0
```回溯包括异步调用链中涉及的协程帧。
## 36.25 协程警告
创建协程但从不等待它通常是一个错误。```python id="j9rvzc"
async def f():
return 1
f()
```这将创建一个协程对象并丢弃它。
Python 可能会警告:```text id="e1rugz"
RuntimeWarning: coroutine was never awaited
```身体从来没有跑过。
正确用法:```python id="5e8f4p"
await f()
```或者:```python id="daxsm1"
asyncio.create_task(f())
```## 36.26`asyncio.run`
`asyncio.run`是运行异步主函数的标准入口点。```python id="45lxp3"
import asyncio
async def main():
await asyncio.sleep(1)
return 42
print(asyncio.run(main()))
```它创建一个事件循环,运行协程直至完成,处理最终的异步生成器清理,并关闭循环。
您通常在程序的顶层调用它一次。
## 36.27 在异步代码中阻止调用
阻塞调用会停止事件循环。
坏的:```python id="upggpj"
import time
async def main():
time.sleep(5)
```期间`time.sleep`,事件循环不能运行其他任务。
更好的:```python id="j76gyt"
import asyncio
async def main():
await asyncio.sleep(5)
```对于无法异步的阻塞函数,请使用执行器或线程帮助器:```python id="bo70ck"
result = await asyncio.to_thread(blocking_function, arg)
```这使事件循环保持响应。
## 36.28 任务组
结构化并发组相关任务。```python id="jrw68m"
import asyncio
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(fetch_one())
tg.create_task(fetch_two())
```任务组等待其子任务。如果一项任务失败,该组会协调取消并根据需要引发分组异常。
这比创建任务而不跟踪它们更安全。
## 36.29 异步代码中的异常组
并发任务可能会一起失败。
任务组可以使用异常组报告多个故障。```python id="75j6zk"
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(a())
tg.create_task(b())
except* ValueError as group:
handle_value_errors(group)
except*可以按类型拆分分组的异常。
这对于异步系统很重要,因为在相同的结构化操作期间独立任务可能会失败。
36.30 异步生成器和清理
异步生成器需要异步清理。python id="6lt8y0" async def gen(): try: yield 1 finally: await cleanup() 如果异步生成器未耗尽,则应将其关闭aclose()。```python id="hupibv"
agen = gen()
item = await anext(agen)
await agen.aclose()
## 36.31 异步生成器方法
异步生成器支持与生成器类似的方法,但可等待:
|方法|意义|
|---|---|
|`__anext__()`|异步获取下一项 |
|`asend(value)`|将值发送到异步生成器 |
|`athrow(exc)`|将异常抛出到异步生成器 |
|`aclose()`|关闭异步生成器 |
这些方法返回等待对象。
例子:```python id="iuoltd"
value = await agen.__anext__()
await agen.aclose()
```## 36.32 协程和参考生命周期
挂起的协程使局部变量保持活动状态。```python id="3yoa52"
async def work():
data = bytearray(100_000_000)
await asyncio.sleep(10)
return len(data)
```睡觉时,`data`保持活动状态,因为协程可以恢复并使用它。
保留链:```text id="fhhddc"
task
coroutine
suspended frame
local data
```如果可能的话,请尽快清除大量本地人:```python id="7hxthq"
async def work():
data = bytearray(100_000_000)
result = process(data)
data = None
await asyncio.sleep(10)
return result
```## 36.33 任务生命周期
任务使其协程保持活动状态。```python id="4ffgq1"
task = asyncio.create_task(work())
```在任务完成或取消并最终确定之前,它会引用协程及其帧状态。
如果您创建任务并失去对它们的跟踪,则可能会稍后记录异常,并且资源保持活动状态的时间可能会比预期长。
任务组等结构化模式有助于控制生命周期。
## 36.34 异步堆栈跟踪
异步堆栈跟踪包括挂起的协程链。
当异常跨越时`await`边界,回溯显示它在哪里被提出以及在哪里等待。
例子:```python id="uyj5ih"
async def a():
await b()
async def b():
await c()
async def c():
1 / 0
```回溯可以显示异步链`a`到`b`到`c`。
这是可能的,因为协程框架保留代码对象和指令位置信息。
## 36.35`contextvars`异步代码经常使用`contextvars`对于上下文本地状态。```python id="ulzcon"
from contextvars import ContextVar
request_id = ContextVar("request_id")
async def handle():
print(request_id.get())
```与线程本地存储不同,上下文变量旨在跨异步任务切换正常工作。
每个任务都可以带有自己的上下文。
这对于以下方面很重要:```text id="ucdnv4"
request IDs
tracing
logging context
database transaction context
auth state
locale settings
```## 36.36 异步和 GIL
异步代码不会删除 GIL。
在传统的 CPython 中:```text id="vow7l4"
one thread executes Python bytecode at a time per interpreter
async tasks switch cooperatively at await points
```异步代码可以有效地处理许多 I/O 密集型任务,因为大部分时间都花在等待外部事件上。
它不会使 CPU 密集型 Python 循环并行执行。
## 36.37 异步与线程
异步和线程解决不同的问题。
|特色|异步任务 |主题 |
|---|---|---|
|日程安排 |合作|通过操作系统和解释器检查抢先|
|开关点|`await`|线程调度和GIL切换|
|最适合 |许多 I/O 任务 |阻止 API,与同步代码集成 |
|共享内存|通常相同的线程 |跨线程共享 |
|种族风险|更低但仍有可能|更高 |
|传统 CPython 中的 CPU 并行性 |没有 | Python 字节码受 GIL 限制 |
异步代码对于暂停是明确的。线程可以在不太明显的点切换。
## 36.38 异步与生成器
协程和生成器共享可恢复执行,但协议不同。
|特色|发电机|协程 |
|---|---|---|
|定义|`def`和`yield` | `async def`|
|生产 |多个产生值 |最终结果之一|
|消费者 |`for`, `next`, `send` | `await`, 事件循环 |
|完成 |`StopIteration`|等待结果或异常 |
|主要用途|惰性迭代 |异步 I/O 协调 |
|可以等待|没有 |是的 |
|可以产生简单的值 |是的 |否,异步生成器除外 |
协程不是迭代器。除非经过修改,否则生成器不能直接等待。
## 36.39 最小协程调度程序
玩具调度程序可以展示核心思想。```python id="zytwib"
from collections import deque
class Sleep:
def __await__(self):
yield self
return None
async def task(name):
print(name, "start")
await Sleep()
print(name, "end")
def run(coros):
ready = deque(c.__await__() for c in coros)
while ready:
aw = ready.popleft()
try:
next(aw)
except StopIteration:
continue
ready.append(aw)
run([task("a"), task("b")])
```这不是`asyncio`。它忽略真实的 I/O、计时器、未来、取消、异常和任务状态。但它显示了关键机制:```text id="6z9pem"
coroutine runs
await yields control
scheduler later resumes it
```## 36.40 常见误解
|误会 |正确型号 |
|---|---|
|调用异步函数来运行它 |它创建一个协程对象 |
|`await`开始一个新线程 |它暂停当前的协程,直到等待完成为止 |
|异步使 CPU 代码并行 |主要帮助协作I/O并发 |
|协程可以重复等待 |一个协程对象代表一次执行 |
|`asyncio.create_task`是一样的`await`|它同时安排;`await`等待完成 |
|阻塞调用在异步代码中是没问题的 |它们阻塞了事件循环 |
|取消会立即终止代码 |它协同注入异常 |
|异步使用不同的解释器 |它使用 CPython 框架和评估循环 |
## 36.41 阅读策略
从以下开始:```python id="bltl80"
async def f():
return 1
```然后:```python id="zf4zrn"
async def g():
value = await f()
return value + 1
```检查:```python id="60r9ah"
import dis
import inspect
print(inspect.iscoroutinefunction(f))
coro = f()
print(inspect.getcoroutinestate(coro))
dis.dis(g)
```然后学习:```text id="gpxkbm"
await
async with
async for
async generators
tasks
cancellation
TaskGroup
contextvars
blocking calls inside async code
```对于每种情况,跟踪:```text id="xhs827"
when the coroutine object is created
when the body starts
where it suspends
what object it awaits
how it resumes
what happens on exception or cancellation
when its frame is cleared
```## 36.42 章节总结
协程是用于异步编程的可恢复计算。一个`async def`call 创建一个协程对象。协程主体仅在等待或计划时运行。在每个`await`,协程可以挂起,保留其帧状态,然后使用值或异常恢复。
核心模型是:```text id="r29f1z"
async function call
↓
create coroutine object
↓
event loop or await resumes coroutine
↓
run until await, return, raise, or cancellation
↓
await suspends and yields control
↓
resume later with result or exception
```异步执行与普通执行建立在相同的 CPython 基础之上:代码对象、帧、字节码、异常和对象协议。事件循环提供调度; CPython 提供可恢复执行。