Claude-generated Python examples of multithreaded and asynchronous code, simulating a crawl of 20 web pages, where each page is assumed to take 0.5-2 seconds to fetch.
Python Multithreaded Crawler Tutorial
Core Concepts
Multithreading: lets a program work on several tasks at once, improving the efficiency of IO-bound work such as network requests.
Thread pool: creates a fixed number of threads up front, avoiding the overhead of repeatedly creating and destroying threads.
1. Basic Single-Threaded Version
import time
import random

def simulate_fetch(url):
    """Simulate a network request with a random 0.5-2 second delay."""
    delay = random.uniform(0.5, 2.0)
    time.sleep(delay)
    return f"Data from {url} (took {delay:.2f}s)"

def single_thread_crawler(urls):
    """Single-threaded crawler."""
    results = []
    start_time = time.time()
    for url in urls:
        result = simulate_fetch(url)
        results.append(result)
        print(f"✓ {result}")
    print(f"Single-threaded total time: {time.time() - start_time:.2f}s")
    return results
2. Multithreaded Version (Thread class)
import threading

class CrawlerThread(threading.Thread):
    def __init__(self, url, results, lock):
        super().__init__()
        self.url = url
        self.results = results
        self.lock = lock  # thread lock protecting the shared results list

    def run(self):
        result = simulate_fetch(self.url)
        # Protect shared data with the lock
        with self.lock:
            self.results.append(result)
            print(f"✓ {result}")

def multi_thread_crawler_basic(urls):
    """Basic multithreaded crawler."""
    results = []
    lock = threading.Lock()
    threads = []
    start_time = time.time()
    # Create and start the threads
    for url in urls:
        thread = CrawlerThread(url, results, lock)
        threads.append(thread)
        thread.start()
    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    print(f"Multithreaded total time: {time.time() - start_time:.2f}s")
    return results
3. Thread Pool Version (Recommended)
from concurrent.futures import ThreadPoolExecutor, as_completed

def multi_thread_crawler_pool(urls, max_workers=5):
    """Multithreaded crawler using a thread pool."""
    results = []
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_url = {executor.submit(simulate_fetch, url): url
                         for url in urls}
        # Collect results as they complete
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results.append(result)
                print(f"✓ {result}")
            except Exception as e:
                print(f"✗ {url} failed: {e}")
    print(f"Thread pool total time: {time.time() - start_time:.2f}s")
    return results
4. Asynchronous Version (asyncio)
import asyncio

async def async_fetch(session, url):
    """Simulate a network request asynchronously."""
    delay = random.uniform(0.5, 2.0)
    await asyncio.sleep(delay)  # non-blocking wait
    return f"Data from {url} (took {delay:.2f}s)"

async def async_crawler(urls, max_concurrent=5):
    """Asynchronous crawler."""
    results = []
    start_time = time.time()
    # A semaphore caps the number of concurrent requests
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_semaphore(session, url):
        async with semaphore:
            return await async_fetch(session, url)

    # Dummy session for the simulation (use aiohttp.ClientSession in real code)
    session = None
    # Create all tasks
    tasks = [fetch_with_semaphore(session, url) for url in urls]
    # Run them concurrently, handling each result as it completes
    for coro in asyncio.as_completed(tasks):
        result = await coro
        results.append(result)
        print(f"✓ {result}")
    print(f"Async total time: {time.time() - start_time:.2f}s")
    return results

def run_async_crawler(urls):
    """Run the asynchronous crawler."""
    return asyncio.run(async_crawler(urls))
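For real HTTP requests, the dummy session above would be replaced by an aiohttp.ClientSession. Below is a minimal sketch of what that might look like; the timeout value, error handling, and gather-based collection are illustrative choices rather than part of the original example (requires aiohttp to be installed).

import asyncio
import aiohttp

async def real_async_fetch(session, url):
    """Fetch a page with aiohttp and report its size."""
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        resp.raise_for_status()
        text = await resp.text()
        return f"Data from {url} ({len(text)} chars)"

async def real_async_crawler(urls, max_concurrent=5):
    """Async crawler using a real HTTP client; failures are returned, not raised."""
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with semaphore:
                return await real_async_fetch(session, url)
        # gather preserves input order; return_exceptions keeps one failure
        # from cancelling the remaining requests
        return await asyncio.gather(*(fetch(u) for u in urls),
                                    return_exceptions=True)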
5. Complete Test Code
def main():
    # Test data: 20 URLs
    urls = [f"https://example.com/page{i}" for i in range(1, 21)]

    print("=" * 50)
    print("Single-threaded crawler test")
    print("=" * 50)
    single_thread_crawler(urls[:5])  # only 5 URLs so the wait stays short

    print("\n" + "=" * 50)
    print("Multithreaded crawler test (Thread class)")
    print("=" * 50)
    multi_thread_crawler_basic(urls[:10])

    print("\n" + "=" * 50)
    print("Thread pool crawler test (recommended)")
    print("=" * 50)
    multi_thread_crawler_pool(urls, max_workers=5)

    print("\n" + "=" * 50)
    print("Async crawler test")
    print("=" * 50)
    run_async_crawler(urls)

if __name__ == "__main__":
    main()
Key Points
Thread lock (Lock)
lock = threading.Lock()
with lock:  # the lock is acquired and released automatically
    # critical section
    shared_data.append(result)
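To see why the lock matters, here is a toy example (not part of the original code) in which four threads increment a shared counter; counter += 1 is not atomic, so without the lock some updates can be lost.

import threading

counter = 0
lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        with lock:  # makes the read-modify-write atomic
            counter += 1

threads = [threading.Thread(target=add_many, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # always 400000 with the lock; may fall short without it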
Thread pool advantages
- Manages thread lifecycles automatically
- Caps concurrency, preventing resource exhaustion
- Cleaner exception handling
- More concise code
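When results should simply come back in the same order as the input URLs and per-task exception handling is not needed, executor.map is an even shorter alternative to submit/as_completed. A sketch reusing the simulate_fetch function defined earlier:

from concurrent.futures import ThreadPoolExecutor

def multi_thread_crawler_map(urls, max_workers=5):
    """Thread pool crawler that returns results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in the same order as the input iterable
        return list(executor.map(simulate_fetch, urls))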
Async vs multithreading
- Async: a single thread; an event loop switches between tasks while they wait on IO
- Multithreading: several OS threads run concurrently; in CPython the GIL lets only one thread execute Python bytecode at a time, but threads still overlap while blocked on IO
- Choosing: prefer async (or a thread pool) for IO-bound work; for CPU-bound work use multiprocessing rather than threads, because of the GIL
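A quick way to see the difference for yourself (an illustrative snippet, not part of the original code): print the current thread name from inside async tasks and from inside thread pool tasks.

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

async def async_task(i):
    await asyncio.sleep(0.1)
    return f"async task {i} on {threading.current_thread().name}"

def thread_task(i):
    return f"thread task {i} on {threading.current_thread().name}"

async def demo():
    print(await asyncio.gather(*(async_task(i) for i in range(3))))

asyncio.run(demo())  # every async task reports MainThread
with ThreadPoolExecutor(max_workers=3) as ex:
    print(list(ex.map(thread_task, range(3))))  # tasks report worker threads, not MainThread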
Performance comparison
For the 20-URL test:
- Single-threaded: roughly 20-40 seconds (the sum of all delays, averaging about 1.25s per page)
- Multithreaded: roughly 4-8 seconds
- Async: roughly 4-8 seconds
With 5 concurrent workers, the 20 requests run in about four waves, so the total is bounded by roughly four of the slower delays rather than the sum of all twenty.
Practical recommendations
- Simple scenarios: use ThreadPoolExecutor
- Large-scale crawling: use asyncio + aiohttp
- Mixed workloads: combine threads with async
- Caveats:
  - Limit concurrency so the target site does not block you
  - Add retries and exception handling (see the sketch below)
  - Respect robots.txt and the site's terms of use
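As a starting point for the retry advice above, here is a minimal sketch that wraps the simulated async_fetch from section 4 with a fixed number of attempts and a growing backoff; max_retries and the backoff delay are illustrative choices, not from the original.

import asyncio

async def fetch_with_retry(session, url, max_retries=3):
    """Retry a failing fetch a few times, waiting a little longer each time."""
    for attempt in range(1, max_retries + 1):
        try:
            return await async_fetch(session, url)
        except Exception:
            if attempt == max_retries:
                raise  # give up after the last attempt
            await asyncio.sleep(0.5 * attempt)  # simple linear backoff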