当你的代码卡住了:聊聊Python里的“假同步真异步”
小李今天差点把电脑砸了。他写了一个爬虫要从一万个网站上抓数据。代码很简单请求网址、解析内容、存进数据库。跑了十分钟才抓了三百个。他打开任务管理器一看CPU占用率才5%网络流量几乎为零。“我这电脑是i9啊怎么就这水平”问题出在哪他的代码老老实实一个一个等请求发出去等服务器响应等数据传回来然后再发下一个。每个请求耗时0.5秒一万个就是5000秒一个多小时。但CPU大部分时间都在闲着因为它在等网络。这就是典型的I/O密集型任务。代码在等但CPU不干活。小李心想能不能让它不等发出去十个请求谁先回来就处理谁当然能。这就是“异步”。但问题是他的代码是同步写的。改写成异步几十个函数都得动一堆库要换想想就头大。于是他想知道有没有办法让同步代码“假装”在异步执行先搞清楚同步和异步到底差在哪举个例子。你点了三份外卖。同步的做法是站在第一家店门口等拿到第一份再去第二家等拿到第二份再去第三家等。第二家店如果忙你就干等着。全程啥也干不了。异步的做法是三家店都下单然后回家坐着。谁做好了给你打电话你去拿。中间你可以看电视、打游戏、甚至再点一份。在代码里“等”通常就是I/O操作——读文件、发HTTP请求、查数据库、等用户输入。这些操作的特点是慢但不太占CPU。同步代码遇到I/O就卡住。异步代码遇到I/O就去干别的等I/O完成了再回来继续。那么问题来了同步代码怎么异步执行Python里有个很直接的办法扔进线程池。说白了就是开几个“小弟”每个小弟跑一个同步任务。主程序不用等继续干自己的。看个例子import time import requests from concurrent.futures import ThreadPoolExecutor # 这是一个同步函数请求一个网址 def fetch(url): print(f开始抓取 {url}) response requests.get(url) # 这里会等 print(f抓取完成 {url}) return response.status_code # 十个网址 urls [fhttps://httpbin.org/delay/{i} for i in range(1, 11)] # 同步执行一个一个等 start time.time() for url in urls: fetch(url) print(f同步耗时: {time.time() - start:.2f}秒) # 异步执行用线程池 start time.time() with ThreadPoolExecutor(max_workers5) as executor: results executor.map(fetch, urls) print(f线程池耗时: {time.time() - start:.2f}秒)跑一下你会发现同步版本大概20秒每个请求等2秒线程池版本只要4秒左右。神奇吗不神奇。就是开了5个线程每个线程处理两个请求同时等。但这里有个坑线程池适合I/O任务不适合CPU密集任务。你如果开10个线程做计算比如循环一亿次反而会因为线程切换开销变慢。有没有更轻量的办法有asyncio线程池虽然好用但每个线程要占内存大概8MB开多了扛不住。而且线程切换有开销。Python 3.4之后引入了asyncio它是真正的异步不靠线程靠一个叫“事件循环”的东西。但问题是asyncio要求你的函数必须是异步的——也就是说你要把requests换成aiohttp把time.sleep换成asyncio.sleep代码几乎要重写。那有没有办法让同步代码跑在asyncio里有。asyncio.to_thread。import asyncio import requests def sync_fetch(url): # 这是一个同步函数没法直接await return requests.get(url).status_code async def main(): urls [...] # 十个网址 # 把同步函数扔到线程池里跑但用异步的方式等待 tasks [asyncio.to_thread(sync_fetch, url) for url in urls] results await asyncio.gather(*tasks) print(results) asyncio.run(main())这个方法的本质还是线程池但写法更优雅可以和真正的异步代码混用。再深一层事件循环是怎么骗过你的如果你想知道“异步到底是怎么做到的”我们得聊聊事件循环。事件循环就像一个调度中心。它手里维护一个任务列表。每个任务要么在运行要么在等某个事情比如网络数据。当一个任务说“我在等”事件循环就把它挂起去执行下一个任务。等那个网络数据到了事件循环再把任务唤醒从刚才停下的地方继续。听起来复杂但Python的asyncio已经帮你封装好了。你只需要把函数写成async def里面用await表示“这里要等”。但问题是我们手头有大量同步代码不可能全改写成async def。有没有一个黑科技能把同步函数直接变成异步的有但不太完美。asyncio提供了一个loop.run_in_executor本质上还是线程池。真正的“把同步代码变成纯异步”是不可能的因为同步代码里如果有time.sleep(10)那就是实打实地阻塞线程谁也救不了你。实战给一个同步爬虫提速假设你写了一个爬虫大概是这样的def crawl_one(url): # 发请求 r requests.get(url) # 解析 soup BeautifulSoup(r.text, html.parser) # 提取数据 title soup.find(title).text # 存数据库 db.insert({url: url, title: title}) return title def crawl_all(urls): results [] for url in urls: results.append(crawl_one(url)) return results要提速最简单的改动from concurrent.futures import ThreadPoolExecutor, as_completed def crawl_all_parallel(urls, workers10): results [] with ThreadPoolExecutor(max_workersworkers) as executor: # 提交所有任务 future_to_url {executor.submit(crawl_one, url): url for url in urls} # 谁先完成就处理谁 for future in as_completed(future_to_url): url future_to_url[future] try: result future.result() results.append(result) print(f完成: {url}) except Exception as e: print(f失败: {url}, 错误: {e}) return results就这么几行改动速度提升接近workers倍受限于网络带宽和对方服务器的承受能力。但要注意如果你的crawl_one里用了数据库连接得确保数据库连接是线程安全的。很多数据库驱动不是这时候你可能需要每个线程单独创建连接。真正的异步怎么把同步库改成异步有时候你不得不面对一个现实你想用的库只有同步版本比如requests、pymysql、redis-py老版本。三个办法方法一线程池包装async def async_get(url): loop asyncio.get_running_loop() return await loop.run_in_executor(None, requests.get, url)这个None表示使用默认的线程池。简单但每个调用都会占用一个线程。方法二找异步替代品requests→aiohttp或httpx支持异步pymysql→aiomysqlredis-py→aredis或redis.asyncio新版自带open文件读写 →aiofiles改代码是麻烦但一旦改完性能提升显著而且不占线程。方法三用anyio或trio这两个库提供了更高级的抽象可以让同步代码在异步环境中运行得更自然。但学习曲线比较陡不推荐新手尝试。一个容易踩的坑假异步很多人写异步代码写着写着就变成这样了async def fetch(url): response requests.get(url) # 同步操作 return response.text async def main(): tasks [fetch(url) for url in urls] await asyncio.gather(*tasks)你猜怎么着完全没有提速。因为requests.get是同步阻塞的。当你await一个任务时这个任务如果内部阻塞了整个事件循环都会被卡住。记住一句话异步的传染性。一旦你用了async从调用链的根到叶子所有涉及I/O的地方都必须是异步的。中间混了一个同步阻塞调用整个异步就废了。检查方法很简单在代码里搜requests、time.sleep、open这些同步操作看它们是否出现在async函数里。有没有更激进的方案有但不太推荐方案一geventgevent是一个第三方库它通过“打补丁”的方式把Python标准库里的同步I/O操作比如socket、time.sleep偷偷替换成异步版本。你不需要写async/await代码看起来完全是同步的但实际是异步执行的。from gevent import monkey monkey.patch_all() # 这行会替换标准库 import requests # 现在requests是异步的了 from gevent.pool import Pool def fetch(url): return requests.get(url).status_code pool Pool(10) urls [...] results pool.map(fetch, urls)看起来很美好但问题也不少调试困难堆栈信息乱七八糟很多C扩展库不兼容已经慢慢过时了社区活跃度下降方案二curio或trio这两个是比asyncio更现代、更易用的异步库。但生态不如asyncio第三方支持少。实际项目里怎么选我见过很多团队纠结这个问题。给你一个决策树你的任务主要是I/O密集网络请求、文件读写、数据库查询→ 考虑异步或并发代码量小愿意重写→ 直接用aiohttpasyncio性能最好代码量大不想大改→ThreadPoolExecutor简单粗暴有效既要又要部分异步部分同步→asyncio.to_thread混用CPU密集型任务→ 别折腾异步了用multiprocessing多进程一个完整的例子混合方案假设你有这样一个需求从1000个API抓数据然后对每个数据做一次CPU密集计算比如图像处理。混合方案最合适import asyncio from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor import requests import time # CPU密集函数 def process_data(data): # 假设这里有复杂的计算 time.sleep(0.1) # 模拟计算 return data * 2 # I/O密集函数 def fetch_data(url): return requests.get(url).json() async def main(): urls [...] # 1000个URL # 用线程池处理I/O with ThreadPoolExecutor(max_workers50) as io_executor: loop asyncio.get_running_loop() fetch_tasks [ loop.run_in_executor(io_executor, fetch_data, url) for url in urls ] raw_data await asyncio.gather(*fetch_tasks) # 用进程池处理CPU密集任务 with ProcessPoolExecutor(max_workers8) as cpu_executor: process_tasks [ loop.run_in_executor(cpu_executor, process_data, data) for data in raw_data ] results await asyncio.gather(*process_tasks) return results asyncio.run(main())这个方案里网络请求并发50个不浪费带宽计算部分用多进程避开GIL整体用异步协调代码清晰总结一句话同步代码想异步执行最简单的就是线程池。想要更高效更优雅就用asyncio配合to_thread。但记住没有银弹真正的异步需要你从底层改起。回到小李的爬虫。他最后选择了ThreadPoolExecutor改了10行代码速度提升了8倍。虽然不完美但够用了。“够用就好”这四个字在工程里往往比“最优”更重要。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2543788.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!