Python 异步编程完全指南(二):深入 asyncio 核心概念
Python 异步编程完全指南二深入 asyncio 核心概念系列导航入门篇 → [核心概念篇] → 实战案例篇 → 高级技巧篇 → 避坑指南篇前言上一篇我们学习了异步编程的基础知识。本篇将深入 asyncio 的核心概念帮你建立完整的知识体系。一、事件循环 (Event Loop)事件循环是异步编程的心脏负责调度和执行协程。1.1 事件循环工作原理┌────────────────────────────────────────────────────────────────┐ │ 事件循环工作原理 │ ├────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ 协程 A │ │ 协程 B │ │ 协程 C │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌───────────────────────────────────────────────────┐ │ │ │ 任务队列 (Task Queue) │ │ │ └───────────────────────┬───────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────┐ │ │ │ 事件循环 (Event Loop) │ │ │ │ • 检查就绪的任务 │ │ │ │ • 执行任务直到遇到 await │ │ │ │ • 切换到下一个就绪任务 │ │ │ │ • 处理 I/O 事件 │ │ │ └───────────────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────────────────────┘简单理解事件循环就像一个调度员不断检查哪些任务可以执行执行到需要等待时遇到 await就切换去执行其他任务。1.2 事件循环的基本用法importasyncioasyncdefmain():print(Hello, asyncio!)# 方式 1推荐Python 3.7asyncio.run(main())# 方式 2手动管理更多控制loopasyncio.new_event_loop()asyncio.set_event_loop(loop)try:loop.run_until_complete(main())finally:loop.close()# 方式 3获取当前运行的循环在协程内部使用asyncdefinside_coroutine():loopasyncio.get_running_loop()print(f当前循环:{loop})1.3 事件循环的生命周期importasyncioasyncdeftask(name):print(f任务{name}开始)awaitasyncio.sleep(1)print(f任务{name}结束)asyncdefmain():# 获取当前事件循环loopasyncio.get_running_loop()print(f事件循环状态: running{loop.is_running()})awaittask(A)# asyncio.run() 会# 1. 创建新的事件循环# 2. 运行传入的协程# 3. 关闭事件循环asyncio.run(main())二、协程 (Coroutine)协程是可以暂停和恢复的函数是异步编程的基本单元。2.1 协程的三种状态创建 ──────► 挂起 ──────► 完成 │ ▲ │ │ │ │ └───────────┘ │ 等待 (await) │ ▼ 返回结果2.2 协程对象详解importasyncioasyncdefmy_coroutine():print(协程执行中)awaitasyncio.sleep(1)return完成# 调用协程函数返回协程对象coromy_coroutine()print(f类型:{type(coro)})# class coroutine# 协程对象必须被 await 或作为 Task 运行asyncdefmain():resultawaitcoroprint(f结果:{result})asyncio.run(main())2.3 协程 vs 普通函数特性普通函数协程函数定义方式def func()async def func()调用结果直接执行返回结果返回协程对象执行方式同步执行需要 await 或事件循环可暂停否是在 await 处三、Task任务Task 是对协程的包装用于并发执行多个协程。3.1 创建 Taskimportasyncioasyncdefworker(name,delay):print(fWorker{name}开始)awaitasyncio.sleep(delay)print(fWorker{name}完成)returnfresult_{name}asyncdefmain():# 创建 Task会立即开始调度task1asyncio.create_task(worker(A,2))task2asyncio.create_task(worker(B,1))task3asyncio.create_task(worker(C,3))print(所有任务已创建)# 等待所有任务完成result1awaittask1 result2awaittask2 result3awaittask3print(f结果:{result1},{result2},{result3})asyncio.run(main())输出所有任务已创建 Worker A 开始 Worker B 开始 Worker C 开始 Worker B 完成 Worker A 完成 Worker C 完成 结果: result_A, result_B, result_C3.2 Task 的属性和方法importasyncioasyncdefmy_task():awaitasyncio.sleep(2)returndoneasyncdefmain():taskasyncio.create_task(my_task(),nameMyTask)# 任务属性print(f任务名称:{task.get_name()})# MyTaskprint(f是否完成:{task.done()})# Falseprint(f是否取消:{task.cancelled()})# False# 等待完成awaittaskprint(f是否完成:{task.done()})# Trueprint(f结果:{task.result()})# doneasyncio.run(main())3.3 协程执行流程图┌─────────────────────────────────────────────────────────────────────┐ │ 协程执行流程 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ async def fetch(): 事件循环调度器 │ │ print(start) ┌─────────────┐ │ │ await sleep(1) ──────────►│ 暂停 fetch │ │ │ print(end) │ 执行其他任务 │ │ │ return data │ sleep完成后 │ │ │ ◄──────────│ 恢复 fetch │ │ │ └─────────────┘ │ │ │ │ 时间线: │ │ ───────────────────────────────────────────────────────────► │ │ │ print │ 其他任务执行 │ print │ │ │ │ start │ (1秒等待) │ end │ │ │ │ └─────────────────────────────────────────────────────────────────────┘四、并发控制gather vs wait vs TaskGroup4.1asyncio.gather()- 最常用importasyncioasyncdeftask(id,delay):awaitasyncio.sleep(delay)returnftask_{id}asyncdefmain():# 并发执行按传入顺序返回结果resultsawaitasyncio.gather(task(1,2),task(2,1),task(3,3),)print(results)# [task_1, task_2, task_3]asyncio.run(main())特点按传入顺序返回结果不是完成顺序任一任务失败会中断所有任务除非设置return_exceptionsTrue4.2 异常处理asyncdefrisky_task(id):ifid2:raiseValueError(f任务{id}出错!)awaitasyncio.sleep(1)returnftask_{id}asyncdefmain():# 不处理异常一个失败gather 就抛出异常# results await asyncio.gather(# risky_task(1),# risky_task(2), # 这个会失败# risky_task(3),# )# 处理异常异常作为结果返回不中断其他任务resultsawaitasyncio.gather(risky_task(1),risky_task(2),# 返回 ValueError 对象risky_task(3),return_exceptionsTrue)fori,resultinenumerate(results,1):ifisinstance(result,Exception):print(f任务{i}失败:{result})else:print(f任务{i}成功:{result})asyncio.run(main())4.3asyncio.wait()- 更多控制importasyncioasyncdeftask(id,delay):awaitasyncio.sleep(delay)returnftask_{id}asyncdefmain():tasks[asyncio.create_task(task(1,2)),asyncio.create_task(task(2,1)),asyncio.create_task(task(3,3)),]# 等待第一个完成done,pendingawaitasyncio.wait(tasks,return_whenasyncio.FIRST_COMPLETED)print(f完成:{len(done)}, 待处理:{len(pending)})fortaskindone:print(f结果:{task.result()})# 可以选择取消剩余任务fortaskinpending:task.cancel()asyncio.run(main())return_when参数FIRST_COMPLETED- 第一个完成时返回FIRST_EXCEPTION- 第一个异常时返回ALL_COMPLETED- 全部完成时返回默认4.4asyncio.TaskGroup- Python 3.11 推荐importasyncioasyncdeftask(id,delay):awaitasyncio.sleep(delay)ifid2:raiseValueError(f任务{id}失败)returnftask_{id}asyncdefmain():try:asyncwithasyncio.TaskGroup()astg:task1tg.create_task(task(1,1))task2tg.create_task(task(2,2))# 这个会失败task3tg.create_task(task(3,1))# TaskGroup 自动等待所有任务完成print(task1.result(),task3.result())except*ValueErroraseg:# Python 3.11 的 ExceptionGroup 处理forexcineg.exceptions:print(f捕获异常:{exc})asyncio.run(main())TaskGroup 优势结构化并发自动管理任务生命周期任何任务失败自动取消其他任务更清晰的异常处理4.5 三种方式对比特性gatherwaitTaskGroupPython 版本3.43.43.11返回格式结果列表(done, pending) 集合无直接返回异常处理return_exceptions手动处理ExceptionGroup取消传播手动手动自动推荐场景简单并发需要细粒度控制现代 Python五、Future 对象Future 是一个低级的可等待对象表示异步操作的最终结果。5.1 Future 基础importasyncioasyncdefmain():loopasyncio.get_running_loop()# 创建 Futurefutureloop.create_future()# 模拟异步设置结果asyncdefset_result():awaitasyncio.sleep(1)future.set_result(Hello from Future!)# 启动设置结果的任务asyncio.create_task(set_result())# 等待 Future 完成resultawaitfutureprint(result)asyncio.run(main())5.2 Task vs Future┌─────────────────────────────────────────────────────────────┐ │ Task 与 Future 的关系 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ Future │ │ ├── 表示异步操作的最终结果 │ │ ├── 低级 API很少直接使用 │ │ └── 可以手动设置结果 │ │ │ │ Task (继承自 Future) │ │ ├── 包装协程的 Future │ │ ├── 高级 API常用 │ │ └── 结果由协程返回值自动设置 │ │ │ │ 简单理解Task Future 协程执行器 │ │ │ └─────────────────────────────────────────────────────────────┘六、本篇小结本篇我们深入学习了✅事件循环异步编程的心脏负责调度协程✅协程可暂停和恢复的函数通过async def定义✅Task协程的包装用于并发执行✅gather/wait/TaskGroup三种并发控制方式及其区别✅Future低级可等待对象Task 的基类核心要点速查# 1. 运行协程asyncio.run(main())# 2. 创建任务taskasyncio.create_task(coroutine())# 3. 并发执行resultsawaitasyncio.gather(coro1(),coro2(),coro3())# 4. 获取当前循环loopasyncio.get_running_loop()下篇预告在下一篇实战案例篇中我们将通过 4 个完整项目实战批量下载图片异步 API 客户端实时数据处理管道WebSocket 实时通信系列导航入门篇 → [核心概念篇] → 实战案例篇 → 高级技巧篇 → 避坑指南篇如果这篇文章对你有帮助欢迎点赞、收藏、关注有问题欢迎评论区讨论。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2413939.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!