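I have accumulated quite a few crawlers lately, so I want to design a monitoring program for Python crawlers. Its main functions are the following:
1. Monitor the crawler's run state (whether it is running, how long it has been running, and so on)
2. Monitor the crawler's performance (request rate, response time, error rate, and so on)
3. Track resource usage (CPU, memory, network, and so on)
4. Capture exceptions and raise alerts (send a timely notification when the crawler hits an exception)
Following these ideas, a monitoring program for a Python crawler can be built in the steps below, covering run state, performance metrics, exception alerts, and visualization: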
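Core monitoring design
- Run-state monitoring
  - Heartbeat detection: periodically record that the crawler is alive
  - Process check: verify that the crawler process is running
- Performance metrics
  - Request statistics: counts of successful and failed requests
  - Data processing: number of items fetched and parsed
  - Resource usage: CPU and memory consumption
  - Timing metrics: request response time, uptime
- Exception alerts
  - Error capture: network errors, parse failures, and so on
  - Threshold alerts: consecutive failures, resource limits exceeded
  - Notification channels: email, Slack, DingTalk
- Data persistence
  - Log storage: run logs and error logs
  - Metric recording: stored in a time-series database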
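The "consecutive failures" threshold alert in the list above can be sketched in a few lines. This is only an illustration: the class name `FailureThreshold` and the default limit of 3 are assumptions, not part of the original design.

```python
class FailureThreshold:
    """Tracks consecutive failures and signals when a limit is first reached."""

    def __init__(self, limit=3):
        self.limit = limit        # hypothetical default; tune per crawler
        self.consecutive = 0

    def record(self, success):
        """Record one outcome; return True exactly when the limit is reached."""
        if success:
            self.consecutive = 0  # any success resets the streak
            return False
        self.consecutive += 1
        return self.consecutive == self.limit


threshold = FailureThreshold(limit=3)
outcomes = [False, False, True, False, False, False, False]
fired = [threshold.record(ok) for ok in outcomes]
print(fired)  # [False, False, False, False, False, True, False]
```

Returning True only on the request that reaches the limit means a long outage triggers one alert instead of one per failed request.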
Implementation: code examples
1. Base monitor class (monitor.py)
import logging
import time

import psutil
from prometheus_client import Counter, Gauge, Histogram, start_http_server


class SpiderMonitor:
    def __init__(self, spider_name):
        self.spider_name = spider_name
        self.start_time = time.time()

        # Initialize the metrics. prometheus_client exposes Counters with a
        # _total suffix, e.g. my_spider_requests_failed_total.
        self.requests_total = Counter(f'{spider_name}_requests_total', 'Total requests')
        self.requests_failed = Counter(f'{spider_name}_requests_failed', 'Failed requests')
        self.items_scraped = Counter(f'{spider_name}_items_scraped', 'Items scraped')
        self.memory_usage = Gauge(f'{spider_name}_memory_usage', 'Memory usage (MB)')
        self.request_latency = Histogram(f'{spider_name}_request_latency', 'Request latency (seconds)')

        # Start the metrics HTTP server
        start_http_server(8000)

        # Log only the message, so every line starts with the "[time.ctime()]"
        # stamp that monitor_daemon.py parses
        logging.basicConfig(filename=f'{spider_name}.log', level=logging.INFO,
                            format='%(message)s')

    def record_request(self, success=True, latency=0):
        self.requests_total.inc()
        if not success:
            self.requests_failed.inc()
        if latency > 0:
            self.request_latency.observe(latency)

    def record_item(self, count=1):
        self.items_scraped.inc(count)

    def update_resources(self):
        process = psutil.Process()
        self.memory_usage.set(process.memory_info().rss / 1024 / 1024)  # MB
        # Heartbeat line, so the log shows activity even when nothing fails
        logging.info(f"[{time.ctime()}] HEARTBEAT")

    def log_error(self, error):
        logging.error(f"[{time.ctime()}] ERROR: {error}")

    def uptime(self):
        """Seconds elapsed since the monitor was created."""
        return time.time() - self.start_time
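Timing each request by hand is easy to get wrong, so one option is to wrap the call in a context manager that reports to `record_request`. The sketch below is an assumption-laden illustration: `timed_request` and `RecordingMonitor` are hypothetical names, and the stand-in class only mimics the `record_request(success, latency)` signature of `SpiderMonitor` so the example runs without Prometheus.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_request(monitor):
    """Time the wrapped block and report the outcome to monitor.record_request."""
    start = time.perf_counter()
    try:
        yield
    except Exception:
        # Failed requests are counted without a latency sample
        monitor.record_request(success=False)
        raise
    else:
        monitor.record_request(success=True, latency=time.perf_counter() - start)


class RecordingMonitor:
    """Hypothetical stand-in that stores calls instead of updating metrics."""

    def __init__(self):
        self.calls = []

    def record_request(self, success=True, latency=0):
        self.calls.append((success, latency))


demo = RecordingMonitor()
with timed_request(demo):
    time.sleep(0.01)  # stands in for session.get(url)
print(demo.calls[0][0])  # True
```

The exception is re-raised after recording, so the caller still decides how to log or retry the failure.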
2. Crawler integration example (my_spider.py)
import time

import requests

from monitor import SpiderMonitor


class MySpider:
    def __init__(self):
        self.monitor = SpiderMonitor("my_spider")
        self.session = requests.Session()

    def crawl(self, url):
        start = time.time()
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()

            # Process the data
            items = self.parse(response)
            self.monitor.record_item(len(items))
            self.monitor.record_request(success=True, latency=time.time() - start)
            return items
        except Exception as e:
            # Network errors and parse failures both count as a failed request
            self.monitor.record_request(success=False)
            self.monitor.log_error(f"URL: {url} - Error: {e}")
            return []

    def parse(self, response):
        # Parsing logic goes here
        return [{"data": "sample"}]

    def run(self):
        while True:
            self.crawl("https://example.com/data")
            self.monitor.update_resources()
            time.sleep(5)


if __name__ == "__main__":
    spider = MySpider()
    spider.run()
3. Standalone monitor process (monitor_daemon.py)
import smtplib
import time
from email.mime.text import MIMEText


def check_heartbeat(spider_name):
    """Return True if the log shows activity within the last 15 minutes."""
    try:
        with open(f"{spider_name}.log") as f:
            logs = f.readlines()[-100:]
    except FileNotFoundError:
        return False
    now = time.time()
    for line in logs:
        log_time = get_log_time(line)
        if log_time is not None and now - log_time < 900:
            return True
    return False


def get_log_time(log_line):
    """Extract the time.ctime() stamp between the first '[' and ']'.

    Returns None for lines without a parsable stamp (e.g. traceback lines).
    """
    try:
        timestamp_str = log_line.split("[", 1)[1].split("]", 1)[0]
        return time.mktime(time.strptime(timestamp_str))
    except (IndexError, ValueError):
        return None


def send_alert(subject, message):
    """Send an alert by email."""
    msg = MIMEText(message)
    msg['Subject'] = f"[SPIDER ALERT] {subject}"
    msg['From'] = 'monitor@example.com'
    msg['To'] = 'admin@example.com'

    with smtplib.SMTP('smtp.example.com') as server:
        server.send_message(msg)


def monitor_daemon():
    spider_name = "my_spider"
    consecutive_failures = 0

    while True:
        if not check_heartbeat(spider_name):
            consecutive_failures += 1
            if consecutive_failures >= 3:
                send_alert("Spider Down",
                           f"{spider_name} showed no log activity in "
                           f"{consecutive_failures} consecutive checks")
        else:
            consecutive_failures = 0

        time.sleep(300)  # Check every 5 minutes


if __name__ == "__main__":
    monitor_daemon()
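Once three checks have failed, the daemon above calls `send_alert` on every later check as well, so a long outage produces an email every five minutes. A minimal sketch of a cooldown that limits repeats follows; the class name `AlertCooldown`, the one-hour interval, and the injectable `clock` parameter are all assumptions made for illustration.

```python
import time


class AlertCooldown:
    """Allows an alert only if enough time has passed since the last one."""

    def __init__(self, min_interval_seconds=3600, clock=time.monotonic):
        self.min_interval = min_interval_seconds
        self.clock = clock            # injectable so the logic can be tested
        self._last_sent = None

    def should_send(self):
        now = self.clock()
        if self._last_sent is None or now - self._last_sent >= self.min_interval:
            self._last_sent = now
            return True
        return False


# Simulate checks at 0, 5, 10, 60 and 65 minutes with a fake clock
fake_now = [0.0]
cooldown = AlertCooldown(min_interval_seconds=3600, clock=lambda: fake_now[0])
decisions = []
for t in (0, 300, 600, 3600, 3900):
    fake_now[0] = t
    decisions.append(cooldown.should_send())
print(decisions)  # [True, False, False, True, False]
```

In the daemon, the call to `send_alert` would then be guarded by `cooldown.should_send()`.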
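Monitoring deployment
- Metric visualization
  - Use Prometheus to scrape the metrics (exposed on port 8000 by start_http_server(8000))
  - Configure a Grafana dashboard showing:
    - Request success rate = (1 - requests_failed/requests_total) * 100
    - Memory usage trend
    - Error logs from the last hour
- Alert configuration
    # Prometheus alert.rules
    # prometheus_client appends _total to Counter names when exposing them
    groups:
      - name: spider_alerts
        rules:
          - alert: HighFailureRate
            expr: rate(my_spider_requests_failed_total[5m]) / rate(my_spider_requests_total[5m]) > 0.1
            for: 10m
            labels:
              severity: critical
            annotations:
              description: "More than 10% of requests are failing"
- Process management
  - Use Supervisor to manage the process:
    [program:my_spider]
    command=python /path/to/my_spider.py
    autostart=true
    autorestart=true
    stderr_logfile=/var/log/spider.err.log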
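Advanced extensions
- Distributed monitoring
  - Use Redis to share monitoring data:
    import redis

    r = redis.Redis()
    r.incr('global_requests_count')
- Web status panel
    # Add a Flask status page
    from flask import Flask

    app = Flask(__name__)

    # `monitor` is assumed to be the crawler's SpiderMonitor instance
    @app.route('/status')
    def status():
        return {
            "uptime": monitor.uptime(),
            # _value is a private prometheus_client attribute and may change between versions
            "items": monitor.items_scraped._value.get(),
        }
- Cloud service integration
  - Error tracking: Sentry
  - Log management: ELK Stack
  - Cloud monitoring: Datadog/Prometheus Cloud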
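For the distributed-monitoring item above, a slightly fuller sketch of shared counters might look like this. The per-spider key names and the `InMemoryClient` stand-in are assumptions; in real use the `client` argument would be a `redis.Redis()` instance, whose `incr` method the stand-in imitates so the example runs without a Redis server.

```python
def record_request(client, spider_name, success):
    """Increment shared counters; client is expected to behave like redis.Redis."""
    client.incr("global_requests_count")
    client.incr(f"{spider_name}:requests_total")       # hypothetical key layout
    if not success:
        client.incr(f"{spider_name}:requests_failed")  # hypothetical key layout


class InMemoryClient:
    """Hypothetical stand-in implementing only incr, for local experiments."""

    def __init__(self):
        self.data = {}

    def incr(self, key, amount=1):
        self.data[key] = self.data.get(key, 0) + amount
        return self.data[key]


client = InMemoryClient()
record_request(client, "my_spider", success=True)
record_request(client, "my_spider", success=False)
print(client.data)
```

Because Redis increments are atomic, several crawler processes can call this concurrently without losing counts.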
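Example monitoring dashboard (Grafana)
- Core panels
  - Request success rate (percent)
  - Requests per minute
  - Memory/CPU usage curves
  - Recent errors list
- Alert thresholds
  - Success rate < 95% (warning)
  - Memory > 500MB (warning)
  - No activity for 1 hour (critical)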
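The thresholds above, together with the success-rate formula (1 - requests_failed/requests_total) * 100, can be checked in plain Python before wiring them into Grafana. This is a sketch under assumptions: the function names are hypothetical, and treating zero requests as a 100% success rate is a choice made here to avoid division by zero.

```python
def success_rate(requests_total, requests_failed):
    """Success rate in percent; assumes 100% when no requests were made."""
    if requests_total == 0:
        return 100.0
    return (1 - requests_failed / requests_total) * 100


def evaluate_thresholds(requests_total, requests_failed, memory_mb, idle_seconds):
    """Return (severity, reason) pairs for every threshold that is crossed."""
    alerts = []
    if success_rate(requests_total, requests_failed) < 95:
        alerts.append(("warning", "success rate below 95%"))
    if memory_mb > 500:
        alerts.append(("warning", "memory above 500MB"))
    if idle_seconds >= 3600:
        alerts.append(("critical", "no activity for 1 hour"))
    return alerts


print(success_rate(200, 50))  # 75.0
print(evaluate_thresholds(200, 50, memory_mb=320, idle_seconds=120))
# [('warning', 'success rate below 95%')]
```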
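This monitoring setup provides real-time performance tracking, automatic alerting, and visual dashboards, which helps keep crawlers stable and maintainable. Adjust the monitoring granularity and alert thresholds to fit your own needs. If you have any questions, feel free to leave a comment.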