Before reading this article, see:
https://blog.csdn.net/MinggeQingchun/article/details/145904572
In Scrapy, extensions are a plugin mechanism for adding extra functionality to a crawling project. They can hook into the different stages of a crawl, such as startup, shutdown, request processing and response processing.
Official Extensions documentation: Extensions — Scrapy 2.12.0 documentation
Official Signals documentation:
In Scrapy, an extension is just an ordinary Python class; it does not have to subclass any particular base class or implement a formal interface. The only convention it needs to follow is a from_crawler classmethod, through which it receives the Crawler object and can read settings, access the stats collector and connect to signals.
I. Creating and Using Extensions
1. Define the extension
First, define an extension class. It does not need to inherit from anything special; it only needs the from_crawler classmethod. For example, here is a simple extension that records the URL of every scheduled request and reports the total when the spider closes:
from scrapy import signals


class UrlLogExtension:
    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        # Grab the stats collector from the crawler
        stats = crawler.stats
        ext = cls(stats)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        # Count every scheduled request so the final total is meaningful
        crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
        return ext

    def spider_opened(self, spider):
        self.stats.set_value('url_count', 0)

    def request_scheduled(self, request, spider):
        # Log the URL and bump the counter
        spider.logger.debug("Scheduled URL: %s", request.url)
        self.stats.inc_value('url_count')

    def spider_closed(self, spider):
        url_count = self.stats.get_value('url_count')
        print(f"Total URLs processed: {url_count}")
2. Enable the extension in settings.py
In your Scrapy project's settings.py, add the extension to the EXTENSIONS setting:
EXTENSIONS = {
    'path.to.your.extension.UrlLogExtension': 500,  # order value; extensions usually don't depend on each other, so the exact number rarely matters
}
3. Connect signal handlers (if needed)
If your extension needs to react to particular signals (requests, responses, and so on), define the corresponding methods on the extension class and hook them up with crawler.signals.connect. In the UrlLogExtension above, for example, we connected the spider_opened, spider_closed and request_scheduled signals; the sketch below shows the same pattern with another signal.
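As a small hedged sketch (the extension name and the STATUSCOUNT_ENABLED setting below are invented for this post, not part of Scrapy), the same pattern works for response-level signals:

from scrapy import signals
from scrapy.exceptions import NotConfigured


class StatusCountExtension:
    # Hypothetical extension: counts received responses per HTTP status code.

    def __init__(self, stats):
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        # Extensions conventionally opt out by raising NotConfigured
        if not crawler.settings.getbool("STATUSCOUNT_ENABLED", True):  # hypothetical setting
            raise NotConfigured
        ext = cls(crawler.stats)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        return ext

    def response_received(self, response, request, spider):
        # One counter per HTTP status, e.g. status_count/200
        self.stats.inc_value(f"status_count/{response.status}")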
Built-in extensions
Scrapy ships with nine built-in extensions:
- scrapy.extensions.corestats.CoreStats: collects core Scrapy statistics
- scrapy.extensions.telnet.TelnetConsole: opens a TCP service while Scrapy runs so you can telnet in and inspect the crawler's live state
- scrapy.extensions.memusage.MemoryUsage: memory-usage monitoring and alerts; not available on Windows
- scrapy.extensions.memdebug.MemoryDebugger: enables gc (garbage-collection) debugging and records the resulting statistics
- scrapy.extensions.closespider.CloseSpider: closes the spider once a timeout, page count, item count or error count threshold is reached
- scrapy.extensions.feedexport.FeedExporter: exports scraped data to files; it supports several serialization formats (JSON, CSV, XML, ...) and storage backends (local filesystem, FTP, S3, ...), so data can be exported in the desired format to the appropriate storage
- scrapy.extensions.logstats.LogStats: counts pages and items and logs the crawl/scrape rate
- scrapy.extensions.spiderstate.SpiderState: persists spider state across runs
- scrapy.extensions.throttle.AutoThrottle: adaptively adjusts the download delay
These defaults are registered in Scrapy's default_settings.py, e.g.
D:\xx\项目\env\Lib\site-packages\scrapy\settings\default_settings.py
EXTENSIONS = {}

EXTENSIONS_BASE = {
    "scrapy.extensions.corestats.CoreStats": 0,
    "scrapy.extensions.telnet.TelnetConsole": 0,
    "scrapy.extensions.memusage.MemoryUsage": 0,
    "scrapy.extensions.memdebug.MemoryDebugger": 0,
    "scrapy.extensions.closespider.CloseSpider": 0,
    "scrapy.extensions.feedexport.FeedExporter": 0,
    "scrapy.extensions.logstats.LogStats": 0,
    "scrapy.extensions.spiderstate.SpiderState": 0,
    "scrapy.extensions.throttle.AutoThrottle": 0,
}
These extensions can be enabled or re-prioritized in settings.py, for example:
EXTENSIONS = {
    'scrapy.extensions.logstats.LogStats': 500,      # periodic crawl-rate logging
    'scrapy.extensions.telnet.TelnetConsole': 500,   # telnet console
}
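Conversely, a built-in extension from EXTENSIONS_BASE can be disabled by mapping it to None; for example, to switch off the telnet console:

EXTENSIONS = {
    'scrapy.extensions.telnet.TelnetConsole': None,
}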
II. The Built-in Extensions in Detail
1. scrapy.extensions.corestats.CoreStats
"""
Extension for collecting core stats like items scraped and start/finish times
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
from scrapy import Spider, signals
if TYPE_CHECKING:
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy.crawler import Crawler
from scrapy.statscollectors import StatsCollector
[docs]class CoreStats:
def __init__(self, stats: StatsCollector):
self.stats: StatsCollector = stats
self.start_time: datetime | None = None
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
assert crawler.stats
o = cls(crawler.stats)
crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(o.item_scraped, signal=signals.item_scraped)
crawler.signals.connect(o.item_dropped, signal=signals.item_dropped)
crawler.signals.connect(o.response_received, signal=signals.response_received)
return o
def spider_opened(self, spider: Spider) -> None:
self.start_time = datetime.now(tz=timezone.utc)
self.stats.set_value("start_time", self.start_time, spider=spider)
def spider_closed(self, spider: Spider, reason: str) -> None:
assert self.start_time is not None
finish_time = datetime.now(tz=timezone.utc)
elapsed_time = finish_time - self.start_time
elapsed_time_seconds = elapsed_time.total_seconds()
self.stats.set_value(
"elapsed_time_seconds", elapsed_time_seconds, spider=spider
)
self.stats.set_value("finish_time", finish_time, spider=spider)
self.stats.set_value("finish_reason", reason, spider=spider)
def item_scraped(self, item: Any, spider: Spider) -> None:
self.stats.inc_value("item_scraped_count", spider=spider)
def response_received(self, spider: Spider) -> None:
self.stats.inc_value("response_received_count", spider=spider)
def item_dropped(self, item: Any, spider: Spider, exception: BaseException) -> None:
reason = exception.__class__.__name__
self.stats.inc_value("item_dropped_count", spider=spider)
self.stats.inc_value(f"item_dropped_reasons_count/{reason}", spider=spider)
CoreStats listens for the spider_opened, spider_closed, item_scraped, item_dropped and response_received signals and records the corresponding statistics.
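These values are what you see in the "Dumping Scrapy stats" block at the end of every crawl. A minimal sketch of reading them from a script after the crawl finishes (the spider name "my_spider" is a placeholder for one of your project's spiders):

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
crawler = process.create_crawler("my_spider")  # placeholder spider name
process.crawl(crawler)
process.start()  # blocks until the crawl is finished

# Values written by CoreStats
stats = crawler.stats.get_stats()
print(stats.get("start_time"), stats.get("finish_time"))
print(stats.get("finish_reason"), stats.get("elapsed_time_seconds"))
print(stats.get("item_scraped_count"), stats.get("response_received_count"))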
2. scrapy.extensions.telnet.TelnetConsole
"""
Scrapy Telnet Console extension
See documentation in docs/topics/telnetconsole.rst
"""
from __future__ import annotations
import binascii
import logging
import os
import pprint
from typing import TYPE_CHECKING, Any
from twisted.internet import protocol
from twisted.internet.tcp import Port
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.decorators import defers
from scrapy.utils.engine import print_engine_status
from scrapy.utils.reactor import listen_tcp
from scrapy.utils.trackref import print_live_refs
if TYPE_CHECKING:
from twisted.conch import telnet
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy.crawler import Crawler
logger = logging.getLogger(__name__)
# signal to update telnet variables
# args: telnet_vars
update_telnet_vars = object()
[docs]class TelnetConsole(protocol.ServerFactory):
def __init__(self, crawler: Crawler):
if not crawler.settings.getbool("TELNETCONSOLE_ENABLED"):
raise NotConfigured
self.crawler: Crawler = crawler
self.noisy: bool = False
self.portrange: list[int] = [
int(x) for x in crawler.settings.getlist("TELNETCONSOLE_PORT")
]
self.host: str = crawler.settings["TELNETCONSOLE_HOST"]
self.username: str = crawler.settings["TELNETCONSOLE_USERNAME"]
self.password: str = crawler.settings["TELNETCONSOLE_PASSWORD"]
if not self.password:
self.password = binascii.hexlify(os.urandom(8)).decode("utf8")
logger.info("Telnet Password: %s", self.password)
self.crawler.signals.connect(self.start_listening, signals.engine_started)
self.crawler.signals.connect(self.stop_listening, signals.engine_stopped)
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
return cls(crawler)
def start_listening(self) -> None:
self.port: Port = listen_tcp(self.portrange, self.host, self)
h = self.port.getHost()
logger.info(
"Telnet console listening on %(host)s:%(port)d",
{"host": h.host, "port": h.port},
extra={"crawler": self.crawler},
)
def stop_listening(self) -> None:
self.port.stopListening()
def protocol(self) -> telnet.TelnetTransport: # type: ignore[override]
# these import twisted.internet.reactor
from twisted.conch import manhole, telnet
from twisted.conch.insults import insults
class Portal:
"""An implementation of IPortal"""
@defers
def login(self_, credentials, mind, *interfaces):
if not (
credentials.username == self.username.encode("utf8")
and credentials.checkPassword(self.password.encode("utf8"))
):
raise ValueError("Invalid credentials")
protocol = telnet.TelnetBootstrapProtocol(
insults.ServerProtocol, manhole.Manhole, self._get_telnet_vars()
)
return (interfaces[0], protocol, lambda: None)
return telnet.TelnetTransport(telnet.AuthenticatingTelnetProtocol, Portal())
def _get_telnet_vars(self) -> dict[str, Any]:
# Note: if you add entries here also update topics/telnetconsole.rst
assert self.crawler.engine
telnet_vars: dict[str, Any] = {
"engine": self.crawler.engine,
"spider": self.crawler.engine.spider,
"slot": self.crawler.engine.slot,
"crawler": self.crawler,
"extensions": self.crawler.extensions,
"stats": self.crawler.stats,
"settings": self.crawler.settings,
"est": lambda: print_engine_status(self.crawler.engine),
"p": pprint.pprint,
"prefs": print_live_refs,
"help": "This is Scrapy telnet console. For more info see: "
"https://docs.scrapy.org/en/latest/topics/telnetconsole.html",
}
self.crawler.signals.send_catch_log(update_telnet_vars, telnet_vars=telnet_vars)
return telnet_vars
The local variables available in the telnet console include engine, spider, slot, crawler, extensions, stats, settings, est, p, prefs and help.
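Because _get_telnet_vars sends the update_telnet_vars signal, another extension can inject its own variables into the console. A hedged sketch (the extension and the bot_name variable are invented for illustration; register it in EXTENSIONS like any other extension):

from scrapy.extensions.telnet import update_telnet_vars


class TelnetVarsExtension:
    def __init__(self, crawler):
        self.crawler = crawler
        crawler.signals.connect(self.add_telnet_vars, signal=update_telnet_vars)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def add_telnet_vars(self, telnet_vars):
        # Becomes available inside the telnet session as `bot_name`
        telnet_vars["bot_name"] = self.crawler.settings["BOT_NAME"]

To use the console itself, connect with telnet localhost 6023 (the default port range starts at 6023), log in with user scrapy and the password printed in the log, then call est(), prefs() or inspect stats.get_stats().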
3. scrapy.extensions.memusage.MemoryUsage (memory usage monitoring)
"""
MemoryUsage extension
See documentation in docs/topics/extensions.rst
"""
from __future__ import annotations
import logging
import socket
import sys
from importlib import import_module
from pprint import pformat
from typing import TYPE_CHECKING
from twisted.internet import task
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.mail import MailSender
from scrapy.utils.engine import get_engine_status
if TYPE_CHECKING:
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy.crawler import Crawler
logger = logging.getLogger(__name__)
[docs]class MemoryUsage:
def __init__(self, crawler: Crawler):
if not crawler.settings.getbool("MEMUSAGE_ENABLED"):
raise NotConfigured
try:
# stdlib's resource module is only available on unix platforms.
self.resource = import_module("resource")
except ImportError:
raise NotConfigured
self.crawler: Crawler = crawler
self.warned: bool = False
self.notify_mails: list[str] = crawler.settings.getlist("MEMUSAGE_NOTIFY_MAIL")
self.limit: int = crawler.settings.getint("MEMUSAGE_LIMIT_MB") * 1024 * 1024
self.warning: int = crawler.settings.getint("MEMUSAGE_WARNING_MB") * 1024 * 1024
self.check_interval: float = crawler.settings.getfloat(
"MEMUSAGE_CHECK_INTERVAL_SECONDS"
)
self.mail: MailSender = MailSender.from_crawler(crawler)
crawler.signals.connect(self.engine_started, signal=signals.engine_started)
crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
return cls(crawler)
def get_virtual_size(self) -> int:
size: int = self.resource.getrusage(self.resource.RUSAGE_SELF).ru_maxrss
if sys.platform != "darwin":
# on macOS ru_maxrss is in bytes, on Linux it is in KB
size *= 1024
return size
def engine_started(self) -> None:
assert self.crawler.stats
self.crawler.stats.set_value("memusage/startup", self.get_virtual_size())
self.tasks: list[task.LoopingCall] = []
tsk = task.LoopingCall(self.update)
self.tasks.append(tsk)
tsk.start(self.check_interval, now=True)
if self.limit:
tsk = task.LoopingCall(self._check_limit)
self.tasks.append(tsk)
tsk.start(self.check_interval, now=True)
if self.warning:
tsk = task.LoopingCall(self._check_warning)
self.tasks.append(tsk)
tsk.start(self.check_interval, now=True)
def engine_stopped(self) -> None:
for tsk in self.tasks:
if tsk.running:
tsk.stop()
def update(self) -> None:
assert self.crawler.stats
self.crawler.stats.max_value("memusage/max", self.get_virtual_size())
def _check_limit(self) -> None:
assert self.crawler.engine
assert self.crawler.stats
peak_mem_usage = self.get_virtual_size()
if peak_mem_usage > self.limit:
self.crawler.stats.set_value("memusage/limit_reached", 1)
mem = self.limit / 1024 / 1024
logger.error(
"Memory usage exceeded %(memusage)dMiB. Shutting down Scrapy...",
{"memusage": mem},
extra={"crawler": self.crawler},
)
if self.notify_mails:
subj = (
f"{self.crawler.settings['BOT_NAME']} terminated: "
f"memory usage exceeded {mem}MiB at {socket.gethostname()}"
)
self._send_report(self.notify_mails, subj)
self.crawler.stats.set_value("memusage/limit_notified", 1)
if self.crawler.engine.spider is not None:
self.crawler.engine.close_spider(
self.crawler.engine.spider, "memusage_exceeded"
)
else:
self.crawler.stop()
else:
logger.info(
"Peak memory usage is %(virtualsize)dMiB",
{"virtualsize": peak_mem_usage / 1024 / 1024},
)
def _check_warning(self) -> None:
if self.warned: # warn only once
return
assert self.crawler.stats
if self.get_virtual_size() > self.warning:
self.crawler.stats.set_value("memusage/warning_reached", 1)
mem = self.warning / 1024 / 1024
logger.warning(
"Memory usage reached %(memusage)dMiB",
{"memusage": mem},
extra={"crawler": self.crawler},
)
if self.notify_mails:
subj = (
f"{self.crawler.settings['BOT_NAME']} warning: "
f"memory usage reached {mem}MiB at {socket.gethostname()}"
)
self._send_report(self.notify_mails, subj)
self.crawler.stats.set_value("memusage/warning_notified", 1)
self.warned = True
def _send_report(self, rcpts: list[str], subject: str) -> None:
"""send notification mail with some additional useful info"""
assert self.crawler.engine
assert self.crawler.stats
stats = self.crawler.stats
s = f"Memory usage at engine startup : {stats.get_value('memusage/startup') / 1024 / 1024}M\r\n"
s += f"Maximum memory usage : {stats.get_value('memusage/max') / 1024 / 1024}M\r\n"
s += f"Current memory usage : {self.get_virtual_size() / 1024 / 1024}M\r\n"
s += (
"ENGINE STATUS ------------------------------------------------------- \r\n"
)
s += "\r\n"
s += pformat(get_engine_status(self.crawler.engine))
s += "\r\n"
self.mail.send(rcpts, subject, s)
This extension only works on Linux/Unix (it depends on the stdlib resource module); it can be configured to monitor memory usage, raise warnings and send alert mails.
Mail settings:
MAIL_HOST = 'localhost'  # mail server
MAIL_PORT = 25  # SMTP port
MAIL_FROM = 'scrapy@localhost'  # sender address
MAIL_PASS = None  # mail password
MAIL_USER = None  # mail user
The memory-monitoring settings are:
MEMUSAGE_CHECK_INTERVAL_SECONDS = 60.0  # check every 60 seconds
MEMUSAGE_ENABLED = True  # enable memory monitoring
MEMUSAGE_LIMIT_MB = 0  # hard memory limit, in MB (0 disables the limit check)
MEMUSAGE_NOTIFY_MAIL = []  # addresses that receive alert mails
MEMUSAGE_WARNING_MB = 0  # warning threshold, in MB (0 disables the warning check)
When memory usage exceeds the warning or limit threshold, the corresponding alert mail is sent (and in the limit case the crawl is shut down).
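A hedged settings.py sketch for a Linux host (the thresholds and addresses are made-up example values, not recommendations):

# settings.py -- example values only
MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048                     # shut the crawl down above ~2 GiB
MEMUSAGE_WARNING_MB = 1536                   # warn (and optionally mail) above ~1.5 GiB
MEMUSAGE_CHECK_INTERVAL_SECONDS = 30.0
MEMUSAGE_NOTIFY_MAIL = ["ops@example.com"]   # hypothetical recipient

MAIL_HOST = "smtp.example.com"               # hypothetical SMTP server
MAIL_PORT = 587
MAIL_FROM = "scrapy@example.com"
MAIL_USER = "scrapy@example.com"
MAIL_PASS = "secret"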
4. scrapy.extensions.memdebug.MemoryDebugger
"""
MemoryDebugger extension
See documentation in docs/topics/extensions.rst
"""
from __future__ import annotations
import gc
from typing import TYPE_CHECKING
from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.trackref import live_refs
if TYPE_CHECKING:
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy.crawler import Crawler
from scrapy.statscollectors import StatsCollector
[docs]class MemoryDebugger:
def __init__(self, stats: StatsCollector):
self.stats: StatsCollector = stats
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
if not crawler.settings.getbool("MEMDEBUG_ENABLED"):
raise NotConfigured
assert crawler.stats
o = cls(crawler.stats)
crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
return o
def spider_closed(self, spider: Spider, reason: str) -> None:
gc.collect()
self.stats.set_value(
"memdebug/gc_garbage_count", len(gc.garbage), spider=spider
)
for cls, wdict in live_refs.items():
if not wdict:
continue
self.stats.set_value(
f"memdebug/live_refs/{cls.__name__}", len(wdict), spider=spider
)
Settings
MEMDEBUG_ENABLED = False  # enable memory debugging
MEMDEBUG_NOTIFY = []  # send memory debugging report by mail at engine shutdown
MEMDEBUG_NOTIFY is currently not used anywhere in the code.
The extension's job is to force a gc (garbage-collection) pass when the spider closes and record the resulting statistics.
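To try it, it is enough to flip the setting and check the final stats dump; a minimal sketch:

# settings.py
MEMDEBUG_ENABLED = True

# After the crawl, the stats dump should contain entries such as:
#   memdebug/gc_garbage_count
#   memdebug/live_refs/MySpider   (the class names depend on your project)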
5. scrapy.extensions.closespider.CloseSpider
"""CloseSpider is an extension that forces spiders to be closed after certain
conditions are met.
See documentation in docs/topics/extensions.rst
"""
from __future__ import annotations
import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from scrapy import Request, Spider, signals
from scrapy.exceptions import NotConfigured
if TYPE_CHECKING:
from twisted.python.failure import Failure
# typing.Self requires Python 3.11
from typing_extensions import Self
from scrapy.crawler import Crawler
from scrapy.http import Response
logger = logging.getLogger(__name__)
[docs]class CloseSpider:
def __init__(self, crawler: Crawler):
self.crawler: Crawler = crawler
self.close_on: dict[str, Any] = {
"timeout": crawler.settings.getfloat("CLOSESPIDER_TIMEOUT"),
"itemcount": crawler.settings.getint("CLOSESPIDER_ITEMCOUNT"),
"pagecount": crawler.settings.getint("CLOSESPIDER_PAGECOUNT"),
"errorcount": crawler.settings.getint("CLOSESPIDER_ERRORCOUNT"),
"timeout_no_item": crawler.settings.getint("CLOSESPIDER_TIMEOUT_NO_ITEM"),
"pagecount_no_item": crawler.settings.getint(
"CLOSESPIDER_PAGECOUNT_NO_ITEM"
),
}
if not any(self.close_on.values()):
raise NotConfigured
self.counter: defaultdict[str, int] = defaultdict(int)
if self.close_on.get("errorcount"):
crawler.signals.connect(self.error_count, signal=signals.spider_error)
if self.close_on.get("pagecount") or self.close_on.get("pagecount_no_item"):
crawler.signals.connect(self.page_count, signal=signals.response_received)
if self.close_on.get("timeout"):
crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
if self.close_on.get("itemcount") or self.close_on.get("pagecount_no_item"):
crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
if self.close_on.get("timeout_no_item"):
self.timeout_no_item: int = self.close_on["timeout_no_item"]
self.items_in_period: int = 0
crawler.signals.connect(
self.spider_opened_no_item, signal=signals.spider_opened
)
crawler.signals.connect(
self.item_scraped_no_item, signal=signals.item_scraped
)
crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)
@classmethod
def from_crawler(cls, crawler: Crawler) -> Self:
return cls(crawler)
def error_count(self, failure: Failure, response: Response, spider: Spider) -> None:
self.counter["errorcount"] += 1
if self.counter["errorcount"] == self.close_on["errorcount"]:
assert self.crawler.engine
self.crawler.engine.close_spider(spider, "closespider_errorcount")
def page_count(self, response: Response, request: Request, spider: Spider) -> None:
self.counter["pagecount"] += 1
self.counter["pagecount_since_last_item"] += 1
if self.counter["pagecount"] == self.close_on["pagecount"]:
assert self.crawler.engine
self.crawler.engine.close_spider(spider, "closespider_pagecount")
return
if self.close_on["pagecount_no_item"] and (
self.counter["pagecount_since_last_item"]
>= self.close_on["pagecount_no_item"]
):
assert self.crawler.engine
self.crawler.engine.close_spider(spider, "closespider_pagecount_no_item")
def spider_opened(self, spider: Spider) -> None:
from twisted.internet import reactor
assert self.crawler.engine
self.task = reactor.callLater(
self.close_on["timeout"],
self.crawler.engine.close_spider,
spider,
reason="closespider_timeout",
)
def item_scraped(self, item: Any, spider: Spider) -> None:
self.counter["itemcount"] += 1
self.counter["pagecount_since_last_item"] = 0
if self.counter["itemcount"] == self.close_on["itemcount"]:
assert self.crawler.engine
self.crawler.engine.close_spider(spider, "closespider_itemcount")
def spider_closed(self, spider: Spider) -> None:
task = getattr(self, "task", None)
if task and task.active():
task.cancel()
task_no_item = getattr(self, "task_no_item", None)
if task_no_item and task_no_item.running:
task_no_item.stop()
def spider_opened_no_item(self, spider: Spider) -> None:
from twisted.internet import task
self.task_no_item = task.LoopingCall(self._count_items_produced, spider)
self.task_no_item.start(self.timeout_no_item, now=False)
logger.info(
f"Spider will stop when no items are produced after "
f"{self.timeout_no_item} seconds."
)
def item_scraped_no_item(self, item: Any, spider: Spider) -> None:
self.items_in_period += 1
def _count_items_produced(self, spider: Spider) -> None:
if self.items_in_period >= 1:
self.items_in_period = 0
else:
logger.info(
f"Closing spider since no items were produced in the last "
f"{self.timeout_no_item} seconds."
)
assert self.crawler.engine
self.crawler.engine.close_spider(spider, "closespider_timeout_no_item")
Settings
CLOSESPIDER_TIMEOUT = 0  # close the spider after this many seconds of crawling
CLOSESPIDER_PAGECOUNT = 0  # close the spider after this many responses have been received
CLOSESPIDER_ITEMCOUNT = 0  # close the spider after this many items have been scraped
CLOSESPIDER_ERRORCOUNT = 0  # close the spider after this many errors
Its main job is to close the spider once a timeout, page count, item count or error count threshold is reached (a value of 0 disables that condition).
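A hedged sketch of combining these thresholds, either in settings.py or per run with -s (the numbers are arbitrary examples):

# settings.py -- the spider closes on whichever condition triggers first
CLOSESPIDER_TIMEOUT = 3600      # stop after one hour
CLOSESPIDER_PAGECOUNT = 10000   # ...or after 10,000 responses
CLOSESPIDER_ITEMCOUNT = 5000    # ...or after 5,000 items
CLOSESPIDER_ERRORCOUNT = 50     # ...or after 50 spider errors

The same settings can be passed on the command line, e.g. scrapy crawl myspider -s CLOSESPIDER_ITEMCOUNT=100.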
6. scrapy.extensions.logstats.LogStats
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from twisted.internet import task

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector

logger = logging.getLogger(__name__)


class LogStats:
    """Log basic scraping stats periodically like:
    * RPM - Requests per Minute
    * IPM - Items per Minute
    """

    def __init__(self, stats: StatsCollector, interval: float = 60.0):
        self.stats: StatsCollector = stats
        self.interval: float = interval
        self.multiplier: float = 60.0 / self.interval
        self.task: task.LoopingCall | None = None

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        interval: float = crawler.settings.getfloat("LOGSTATS_INTERVAL")
        if not interval:
            raise NotConfigured
        assert crawler.stats
        o = cls(crawler.stats, interval)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider: Spider) -> None:
        self.pagesprev: int = 0
        self.itemsprev: int = 0

        self.task = task.LoopingCall(self.log, spider)
        self.task.start(self.interval)

    def log(self, spider: Spider) -> None:
        self.calculate_stats()

        msg = (
            "Crawled %(pages)d pages (at %(pagerate)d pages/min), "
            "scraped %(items)d items (at %(itemrate)d items/min)"
        )
        log_args = {
            "pages": self.pages,
            "pagerate": self.prate,
            "items": self.items,
            "itemrate": self.irate,
        }
        logger.info(msg, log_args, extra={"spider": spider})

    def calculate_stats(self) -> None:
        self.items: int = self.stats.get_value("item_scraped_count", 0)
        self.pages: int = self.stats.get_value("response_received_count", 0)
        self.irate: float = (self.items - self.itemsprev) * self.multiplier
        self.prate: float = (self.pages - self.pagesprev) * self.multiplier
        self.pagesprev, self.itemsprev = self.pages, self.items

    def spider_closed(self, spider: Spider, reason: str) -> None:
        if self.task and self.task.running:
            self.task.stop()

        rpm_final, ipm_final = self.calculate_final_stats(spider)
        self.stats.set_value("responses_per_minute", rpm_final)
        self.stats.set_value("items_per_minute", ipm_final)

    def calculate_final_stats(
        self, spider: Spider
    ) -> tuple[None, None] | tuple[float, float]:
        start_time = self.stats.get_value("start_time")
        finished_time = self.stats.get_value("finished_time")

        if not start_time or not finished_time:
            return None, None

        mins_elapsed = (finished_time - start_time).seconds / 60

        items = self.stats.get_value("item_scraped_count", 0)
        pages = self.stats.get_value("response_received_count", 0)

        return (pages / mins_elapsed), (items / mins_elapsed)
Settings
LOGSTATS_INTERVAL = 60.0  # log the stats every 60 seconds; 0 disables the extension
It counts pages and items and derives the per-minute crawl and scrape rates from those counters.
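For example, to log the rates every 30 seconds instead of once a minute (a plain settings tweak; the sample log line follows the format string shown above):

# settings.py
LOGSTATS_INTERVAL = 30.0

# Typical line produced by LogStats:
# Crawled 120 pages (at 240 pages/min), scraped 87 items (at 174 items/min)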
7. scrapy.extensions.spiderstate.SpiderState
from __future__ import annotations

import pickle  # nosec
from pathlib import Path
from typing import TYPE_CHECKING

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.job import job_dir

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler


class SpiderState:
    """Store and load spider state during a scraping job"""

    def __init__(self, jobdir: str | None = None):
        self.jobdir: str | None = jobdir

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        jobdir = job_dir(crawler.settings)
        if not jobdir:
            raise NotConfigured

        obj = cls(jobdir)
        crawler.signals.connect(obj.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(obj.spider_opened, signal=signals.spider_opened)
        return obj

    def spider_closed(self, spider: Spider) -> None:
        if self.jobdir:
            with Path(self.statefn).open("wb") as f:
                assert hasattr(spider, "state")  # set in spider_opened
                pickle.dump(spider.state, f, protocol=4)

    def spider_opened(self, spider: Spider) -> None:
        if self.jobdir and Path(self.statefn).exists():
            with Path(self.statefn).open("rb") as f:
                spider.state = pickle.load(f)  # type: ignore[attr-defined]  # nosec
        else:
            spider.state = {}  # type: ignore[attr-defined]

    @property
    def statefn(self) -> str:
        assert self.jobdir
        return str(Path(self.jobdir, "spider.state"))
Settings
JOBDIR = ''  # directory where the spider state is persisted
When JOBDIR is set, the directory is created automatically and the spider state is pickled there between runs; it is empty by default.
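A minimal sketch of using the persisted state from inside a spider (the spider name and URL are placeholders); run it with a JOBDIR so the state survives restarts:

import scrapy


class StatefulSpider(scrapy.Spider):
    name = "stateful"                     # placeholder name
    start_urls = ["https://example.com"]  # placeholder URL

    def parse(self, response):
        # self.state is loaded/saved by the SpiderState extension
        pages_seen = self.state.get("pages_seen", 0)
        self.state["pages_seen"] = pages_seen + 1
        self.logger.info("pages_seen so far: %d", self.state["pages_seen"])

Run it as scrapy crawl stateful -s JOBDIR=crawls/stateful-1; on the next run with the same JOBDIR the counter picks up where it left off.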
8. scrapy.extensions.throttle.AutoThrottle
import logging

from scrapy import signals
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)


class AutoThrottle:
    def __init__(self, crawler):
        self.crawler = crawler
        if not crawler.settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured
        self.debug = crawler.settings.getbool("AUTOTHROTTLE_DEBUG")
        self.target_concurrency = crawler.settings.getfloat("AUTOTHROTTLE_TARGET_CONCURRENCY")
        crawler.signals.connect(self._spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)

    def _spider_opened(self, spider):
        self.mindelay = self._min_delay(spider)
        self.maxdelay = self._max_delay(spider)
        spider.download_delay = self._start_delay(spider)

    def _min_delay(self, spider):
        s = self.crawler.settings
        return getattr(spider, 'download_delay', s.getfloat('DOWNLOAD_DELAY'))

    def _max_delay(self, spider):
        return self.crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY')

    def _start_delay(self, spider):
        return max(self.mindelay, self.crawler.settings.getfloat('AUTOTHROTTLE_START_DELAY'))

    def _response_downloaded(self, response, request, spider):
        key, slot = self._get_slot(request, spider)
        latency = request.meta.get('download_latency')
        if latency is None or slot is None:
            return

        olddelay = slot.delay
        self._adjust_delay(slot, latency, response)
        if self.debug:
            diff = slot.delay - olddelay
            size = len(response.body)
            conc = len(slot.transferring)
            logger.info(
                "slot: %(slot)s | conc:%(concurrency)2d | "
                "delay:%(delay)5d ms (%(delaydiff)+d) | "
                "latency:%(latency)5d ms | size:%(size)6d bytes",
                {
                    'slot': key, 'concurrency': conc,
                    'delay': slot.delay * 1000, 'delaydiff': diff * 1000,
                    'latency': latency * 1000, 'size': size
                },
                extra={'spider': spider}
            )

    def _get_slot(self, request, spider):
        key = request.meta.get('download_slot')
        return key, self.crawler.engine.downloader.slots.get(key)

    def _adjust_delay(self, slot, latency, response):
        """Define delay adjustment policy"""

        # If a server needs `latency` seconds to respond then
        # we should send a request each `latency/N` seconds
        # to have N requests processed in parallel
        target_delay = latency / self.target_concurrency

        # Adjust the delay to make it closer to target_delay
        new_delay = (slot.delay + target_delay) / 2.0

        # If target delay is bigger than old delay, then use it instead of mean.
        # It works better with problematic sites.
        new_delay = max(target_delay, new_delay)

        # Make sure self.mindelay <= new_delay <= self.max_delay
        new_delay = min(max(self.mindelay, new_delay), self.maxdelay)

        # Dont adjust delay if response status != 200 and new delay is smaller
        # than old one, as error pages (and redirections) are usually small and
        # so tend to reduce latency, thus provoking a positive feedback by
        # reducing delay instead of increase.
        if response.status != 200 and new_delay <= slot.delay:
            return

        slot.delay = new_delay
Settings
AUTOTHROTTLE_ENABLED = False  # enable adaptive download delays
AUTOTHROTTLE_DEBUG = False  # log every delay adjustment
AUTOTHROTTLE_MAX_DELAY = 60.0  # maximum download delay, in seconds
AUTOTHROTTLE_START_DELAY = 5.0  # initial download delay, in seconds
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # average number of requests to keep in flight per remote site
The extension is disabled by default.
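A hedged example configuration (the values are illustrative): because the target delay is roughly latency / AUTOTHROTTLE_TARGET_CONCURRENCY, raising the target concurrency lowers the delay for fast, healthy sites, while AUTOTHROTTLE_MAX_DELAY caps it for slow ones.

# settings.py
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 5.0          # first delay, before any latency is measured
AUTOTHROTTLE_MAX_DELAY = 60.0           # never wait longer than this
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0   # aim for ~2 requests in flight per slot
AUTOTHROTTLE_DEBUG = True               # log every delay adjustment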
Reference
Scrapy 源码分析 4 extensions middlewares详解_scrapy.extensions.logstats-CSDN博客