# Practical Async Web Scraping in Python: Concurrent Crawling with aiohttp and Async CAPTCHA Handling
## Preface

Crawler efficiency is a concern for every data engineer. When you need to scrape tens of thousands of pages, synchronous requests that queue up one by one are simply too slow. Python's asyncio + aiohttp combination can speed up your crawler 10-50x, with surprisingly few code changes. This article explains async crawling from scratch, covering concurrency control, error handling, and how to deal with CAPTCHAs inside an async workflow.

## Sync vs. Async: Why Such a Big Gap?

### The synchronous bottleneck

```python
import requests
import time

urls = ["https://httpbin.org/delay/1" for _ in range(10)]

start = time.time()
for url in urls:
    resp = requests.get(url)
    print(f"{resp.status_code}", end=" ")
print(f"\nSync elapsed: {time.time() - start:.1f}s")
# Output: Sync elapsed: 10.3s (each request waits 1s, queued serially)
```

The problem is obvious: every request sits waiting for network I/O while the CPU is idle.

### The async advantage

```python
import aiohttp
import asyncio
import time

async def fetch(session, url):
    async with session.get(url) as resp:
        return resp.status

async def main():
    urls = ["https://httpbin.org/delay/1" for _ in range(10)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Status codes: {results}")

start = time.time()
asyncio.run(main())
print(f"Async elapsed: {time.time() - start:.1f}s")
# Output: Async elapsed: 1.2s (10 concurrent requests finish almost simultaneously)
```

Ten requests drop from 10 seconds to about 1 second. That is the power of async.

## aiohttp Basics

### Installation

```bash
pip install aiohttp
```

### Session management

```python
import aiohttp
import asyncio

async def main():
    # Create one session and reuse its TCP connections for better performance
    async with aiohttp.ClientSession() as session:
        # GET request
        async with session.get("https://httpbin.org/get") as resp:
            data = await resp.json()
            print(data)

        # POST request
        async with session.post("https://httpbin.org/post", json={"key": "value"}) as resp:
            data = await resp.json()
            print(data)

        # Custom request headers
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/124.0.0.0",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        }
        async with session.get("https://httpbin.org/headers", headers=headers) as resp:
            print(await resp.json())

asyncio.run(main())
```

### Timeouts and proxies

```python
import aiohttp
import asyncio

async def main():
    # Configure timeouts
    timeout = aiohttp.ClientTimeout(total=30, connect=10)
    # Configure a proxy
    proxy = "http://user:pass@proxy:8080"

    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get("https://httpbin.org/ip", proxy=proxy) as resp:
            print(await resp.json())

asyncio.run(main())
```

## Concurrency Control with Semaphore

Unbounded concurrency can make the target server refuse connections, or even get your IP banned. Use a semaphore to cap the number of in-flight requests:

```python
import aiohttp
import asyncio

async def fetch_with_limit(sem, session, url):
    async with sem:  # the semaphore caps concurrency
        try:
            async with session.get(url) as resp:
                text = await resp.text()
                return {"url": url, "status": resp.status, "length": len(text)}
        except Exception as e:
            return {"url": url, "error": str(e)}

async def main():
    urls = [f"https://httpbin.org/get?page={i}" for i in range(100)]
    sem = asyncio.Semaphore(10)                  # at most 10 requests at once
    connector = aiohttp.TCPConnector(limit=20)   # TCP connection pool cap

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_with_limit(sem, session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    success = [r for r in results if "error" not in r]
    failed = [r for r in results if "error" in r]
    print(f"Succeeded: {len(success)}, failed: {len(failed)}")

asyncio.run(main())
```

## Hands-On: A Complete Async Scraper Template

```python
import aiohttp
import asyncio
from dataclasses import dataclass
import logging
import random

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ScrapedItem:
    url: str
    title: str
    content: str
    status: int

class AsyncScraper:
    def __init__(self, concurrency=10, delay_range=(0.5, 2.0), proxy=None):
        self.concurrency = concurrency
        self.delay_range = delay_range
        self.proxy = proxy
        self.sem = asyncio.Semaphore(concurrency)
        self.results = []
        self.errors = []

    async def fetch_page(self, session, url):
        async with self.sem:
            # Random delay so the request pattern is not too regular
            await asyncio.sleep(random.uniform(*self.delay_range))
            try:
                async with session.get(url, proxy=self.proxy) as resp:
                    if resp.status == 200:
                        html = await resp.text()
                        item = self.parse(url, html, resp.status)
                        self.results.append(item)
                        logger.info(f"[OK] {url}")
                        return item
                    elif resp.status == 403:
                        logger.warning(f"[403] {url} - may need CAPTCHA handling")
                        self.errors.append({"url": url, "status": 403})
                    else:
                        logger.warning(f"[{resp.status}] {url}")
                        self.errors.append({"url": url, "status": resp.status})
            except asyncio.TimeoutError:
                logger.error(f"[TIMEOUT] {url}")
                self.errors.append({"url": url, "error": "timeout"})
            except Exception as e:
                logger.error(f"[ERROR] {url}: {e}")
                self.errors.append({"url": url, "error": str(e)})

    def parse(self, url, html, status):
        # Replace this with your own parsing logic
        from html.parser import HTMLParser
        title = ""

        class TitleParser(HTMLParser):
            def handle_starttag(self, tag, attrs):
                self._in_title = (tag == "title")

            def handle_data(self, data):
                nonlocal title
                if getattr(self, "_in_title", False):
                    title = data
                    self._in_title = False

        parser = TitleParser()
        parser.feed(html[:5000])
        return ScrapedItem(url=url, title=title, content=html[:500], status=status)

    async def run(self, urls):
        timeout = aiohttp.ClientTimeout(total=30)
        connector = aiohttp.TCPConnector(limit=self.concurrency * 2)
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/124.0.0.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        }
        async with aiohttp.ClientSession(
            timeout=timeout, connector=connector, headers=headers
        ) as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            await asyncio.gather(*tasks, return_exceptions=True)
        logger.info(f"Done: {len(self.results)} succeeded, {len(self.errors)} failed")
        return self.results

# Usage
async def main():
    scraper = AsyncScraper(concurrency=5, delay_range=(1.0, 3.0))
    urls = [f"https://httpbin.org/get?id={i}" for i in range(50)]
    results = await scraper.run(urls)
    for item in results[:3]:
        print(f"{item.title} - {item.url}")

asyncio.run(main())
```

## Handling CAPTCHAs in an Async Workflow

When an async crawler hits a CAPTCHA, it must not block the whole event loop. The right approach is to make CAPTCHA solving asynchronous too:

```python
import aiohttp
import asyncio
from passxapi import AsyncPassXAPI

class AsyncScraperWithCaptcha:
    def __init__(self, captcha_api_key, concurrency=10):
        self.sem = asyncio.Semaphore(concurrency)
        self.solver = AsyncPassXAPI(api_key=captcha_api_key)

    async def fetch_with_captcha(self, session, url):
        async with self.sem:
            async with session.get(url) as resp:
                html = await resp.text()

            # Detect a CAPTCHA challenge
            if "data-sitekey" in html:
                token = await self._solve_captcha(html, url)
                if token:
                    async with session.post(url, data={
                        "cf-turnstile-response": token,
                        "g-recaptcha-response": token,
                    }) as retry_resp:
                        return await retry_resp.text()
            return html

    async def _solve_captcha(self, html, url):
        import re
        match = re.search(r'data-sitekey="([^"]+)"', html)
        if not match:
            return None
        sitekey = match.group(1)

        if "cf-turnstile" in html:
            result = await self.solver.solve_turnstile(sitekey=sitekey, url=url)
        elif "h-captcha" in html:
            result = await self.solver.solve_hcaptcha(sitekey=sitekey, url=url)
        else:
            result = await self.solver.solve_recaptcha(sitekey=sitekey, url=url)
        return result.get("token")

    async def run(self, urls):
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_with_captcha(session, url) for url in urls]
            return await asyncio.gather(*tasks, return_exceptions=True)

# Usage
async def main():
    scraper = AsyncScraperWithCaptcha(captcha_api_key="your_passxapi_key", concurrency=10)
    urls = [
        "https://protected-site.com/page/1",
        "https://protected-site.com/page/2",
    ]
    results = await scraper.run(urls)

asyncio.run(main())
```

## Producer-Consumer Pattern

For large-scale crawls, the producer-consumer pattern built on `asyncio.Queue` is recommended:

```python
import asyncio
import aiohttp

async def producer(queue, urls):
    for url in urls:
        await queue.put(url)
    # Send one stop signal per worker
    for _ in range(5):  # number of workers
        await queue.put(None)

async def consumer(queue, session, results, worker_id):
    while True:
        url = await queue.get()
        if url is None:
            break
        try:
            async with session.get(url) as resp:
                data = await resp.text()
                results.append({"url": url, "length": len(data)})
                print(f"[Worker-{worker_id}] {url} - {len(data)} bytes")
        except Exception as e:
            print(f"[Worker-{worker_id}] Error: {url} - {e}")
        queue.task_done()

async def main():
    urls = [f"https://httpbin.org/get?id={i}" for i in range(30)]
    queue = asyncio.Queue(maxsize=20)
    results = []

    async with aiohttp.ClientSession() as session:
        # Start 1 producer and 5 consumers
        producer_task = asyncio.create_task(producer(queue, urls))
        workers = [
            asyncio.create_task(consumer(queue, session, results, i))
            for i in range(5)
        ]
        await producer_task
        await asyncio.gather(*workers)

    print(f"Total scraped: {len(results)} pages")

asyncio.run(main())
```

## Performance Optimization Tips

### 1. Reuse the connection pool

```python
# Configure the TCP connection pool
connector = aiohttp.TCPConnector(
    limit=100,                  # total connection cap
    limit_per_host=10,          # connections per host
    ttl_dns_cache=300,          # cache DNS results for 5 minutes
    enable_cleanup_closed=True,
)
```

### 2. Stream large downloads

```python
async def download_file(session, url, filepath):
    async with session.get(url) as resp:
        with open(filepath, "wb") as f:
            async for chunk in resp.content.iter_chunked(8192):
                f.write(chunk)
```

### 3. Graceful shutdown

```python
import signal

async def graceful_shutdown(scraper):
    print("Received exit signal, shutting down gracefully...")
    scraper.running = False
    # Give in-flight tasks a moment to finish
    await asyncio.sleep(2)

# `scraper` is your running scraper instance
loop = asyncio.get_event_loop()
loop.add_signal_handler(
    signal.SIGINT,
    lambda: asyncio.create_task(graceful_shutdown(scraper)),
)
```

## Common Pitfalls

- **Never call `time.sleep()` in an async function.** It blocks the entire event loop; use `await asyncio.sleep()` instead.
- **Reuse the session.** Creating a new `ClientSession` for every request wastes TCP connections.
- **More concurrency is not always better.** Excessive concurrency triggers anti-bot defenses; 5-20 is a sensible range.
- **Catch exceptions per task.** One task's failure should not take down the others.

## Summary

- Async crawling is the most effective way to boost scraping throughput; aiohttp + asyncio can easily deliver a 10x or greater speedup.
- Use a Semaphore to cap concurrency and avoid getting your IP banned.
- CAPTCHA solving should also be asynchronous so it never blocks the event loop.
- The producer-consumer pattern fits large-scale crawling scenarios.

For async CAPTCHA solving, see passxapi-python, whose AsyncPassXAPI client integrates cleanly into an asyncio workflow.

If you found this helpful, please like and bookmark; questions are welcome in the comments.
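As a closing example, the per-task exception-handling advice can be verified without touching the network. The sketch below uses a made-up `flaky_fetch` coroutine (a stand-in for `session.get()`, not part of aiohttp) and a hypothetical `fetch_with_retry` wrapper to show two things at once: `asyncio.gather(..., return_exceptions=True)` keeps one task's failure from cancelling its siblings, and a small exponential-backoff retry recovers transient errors.

```python
import asyncio

attempts: dict[str, int] = {}  # call counter per URL, used to simulate transient failures

async def flaky_fetch(url: str) -> str:
    """Stand-in for a real request: URLs containing 'bad' fail on their first attempt."""
    attempts[url] = attempts.get(url, 0) + 1
    if "bad" in url and attempts[url] == 1:
        raise ConnectionError(f"transient error for {url}")
    return f"content of {url}"

async def fetch_with_retry(url: str, retries: int = 3, base_delay: float = 0.01) -> str:
    # Exponential backoff between attempts: base_delay * 2**attempt
    for attempt in range(retries):
        try:
            return await flaky_fetch(url)
        except ConnectionError:
            if attempt == retries - 1:
                raise  # out of retries: let gather() capture the exception
            await asyncio.sleep(base_delay * 2 ** attempt)

async def main() -> list:
    urls = [
        "https://example.com/ok",
        "https://example.com/bad",
        "https://example.com/other",
    ]
    # return_exceptions=True turns a task's exception into a result value,
    # so one failure never cancels the sibling tasks
    return await asyncio.gather(
        *(fetch_with_retry(u) for u in urls), return_exceptions=True
    )

results = asyncio.run(main())
print(results)  # the 'bad' URL failed once, was retried, and succeeded
```

With `return_exceptions=True`, a task that still fails after all retries shows up in the result list as the exception object itself, so you can filter failures out afterwards instead of losing the whole batch.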