Python - Web Scraping: Scrapy Extensions (Part 4)


Before reading this article, see:

https://blog.csdn.net/MinggeQingchun/article/details/145904572

In Scrapy, extensions are plugins that add extra functionality to your crawling project. They can hook into different stages of the crawl, such as startup, shutdown, request processing, and response processing.

Official Extensions documentation: Extensions — Scrapy 2.12.0 documentation

Official Signals documentation: Signals — Scrapy 2.12.0 documentation

In Scrapy, an extension is an ordinary Python class; it does not have to subclass anything or implement a special interface. The only convention is a from_crawler class method, through which Scrapy instantiates the extension and hands it access to the crawler's settings, signals, and stats.

I. Creating and Using an Extension

1. Define the extension

First, define the extension class. For example, a simple extension that counts the URLs requested by the spider:

from scrapy import signals
 
class UrlLogExtension:
    def __init__(self, stats):
        self.stats = stats
 
    @classmethod
    def from_crawler(cls, crawler):
        # Grab the stats collector from the crawler
        stats = crawler.stats
        ext = cls(stats)
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.request_scheduled, signal=signals.request_scheduled)
        return ext
 
    def spider_opened(self, spider):
        self.stats.set_value('url_count', 0)
 
    def request_scheduled(self, request, spider):
        # Count every URL that gets scheduled
        self.stats.inc_value('url_count')
 
    def spider_closed(self, spider):
        url_count = self.stats.get_value('url_count')
        print(f"Total URLs processed: {url_count}")
2. Enable the extension in settings.py

In your Scrapy project's settings.py, add the extension to the EXTENSIONS setting:

EXTENSIONS = {
   'path.to.your.extension.UrlLogExtension': 500,  # the value sets the load order: lower numbers load earlier
}
3. Wire up signal handlers (if needed)

If your extension needs to react to specific signals (requests, responses, and so on), define the corresponding handler methods on the class and connect them with crawler.signals.connect. In the UrlLogExtension above, we connected the spider_opened, spider_closed, and request_scheduled signals.
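
A pattern used by nearly every built-in extension shown below is to gate the extension on a setting inside from_crawler and raise NotConfigured when it is disabled, so Scrapy skips loading it. A minimal sketch (the setting name MYEXT_ENABLED is a made-up example):

from scrapy import signals
from scrapy.exceptions import NotConfigured
 
class MyExtension:
    @classmethod
    def from_crawler(cls, crawler):
        # MYEXT_ENABLED is a hypothetical setting for this sketch
        if not crawler.settings.getbool('MYEXT_ENABLED'):
            raise NotConfigured
        ext = cls()
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        return ext
 
    def spider_opened(self, spider):
        spider.logger.info("MyExtension is active")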

Built-in extensions

Scrapy ships with nine built-in extensions:

  • scrapy.extensions.corestats.CoreStats: collects core crawl statistics

  • scrapy.extensions.telnet.TelnetConsole: opens a TCP service while Scrapy runs, so you can telnet in and inspect the crawler's live state

  • scrapy.extensions.memusage.MemoryUsage: memory-usage monitoring and alerting; not available on Windows

  • scrapy.extensions.memdebug.MemoryDebugger: triggers gc garbage collection and records the related statistics

  • scrapy.extensions.closespider.CloseSpider: closes the spider when a timeout, page count, item count, or error count threshold is reached

  • scrapy.extensions.feedexport.FeedExporter: exports scraped data to files; supports multiple serialization formats (JSON, CSV, XML, ...) and storage backends (local filesystem, FTP, S3, ...), so data can be saved in the desired format on the appropriate medium

  • scrapy.extensions.logstats.LogStats: periodically logs page and item counts and derives crawl rates from them

  • scrapy.extensions.spiderstate.SpiderState: persists spider state between runs

  • scrapy.extensions.throttle.AutoThrottle: adaptively adjusts the download delay

They are registered in Scrapy's default_settings.py file:

D:\xx\项目\env\Lib\site-packages\scrapy\settings\default_settings.py

EXTENSIONS = {}

EXTENSIONS_BASE = {
    "scrapy.extensions.corestats.CoreStats": 0,
    "scrapy.extensions.telnet.TelnetConsole": 0,
    "scrapy.extensions.memusage.MemoryUsage": 0,
    "scrapy.extensions.memdebug.MemoryDebugger": 0,
    "scrapy.extensions.closespider.CloseSpider": 0,
    "scrapy.extensions.feedexport.FeedExporter": 0,
    "scrapy.extensions.logstats.LogStats": 0,
    "scrapy.extensions.spiderstate.SpiderState": 0,
    "scrapy.extensions.throttle.AutoThrottle": 0,
}

These extensions can also be configured explicitly in settings.py, e.g.:

EXTENSIONS = {
    'scrapy.extensions.logstats.LogStats': 500,     # periodic stats logging
    'scrapy.extensions.telnet.TelnetConsole': 500,  # Telnet console
}
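
Conversely, a built-in entry from EXTENSIONS_BASE can be switched off by mapping it to None:

EXTENSIONS = {
    'scrapy.extensions.telnet.TelnetConsole': None,  # disable the telnet console
}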

II. A Closer Look at the Built-in Extensions

1. scrapy.extensions.corestats.CoreStats

"""
Extension for collecting core stats like items scraped and start/finish times
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from scrapy import Spider, signals

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector


class CoreStats:
    def __init__(self, stats: StatsCollector):
        self.stats: StatsCollector = stats
        self.start_time: datetime | None = None

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        assert crawler.stats
        o = cls(crawler.stats)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(o.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(o.item_dropped, signal=signals.item_dropped)
        crawler.signals.connect(o.response_received, signal=signals.response_received)
        return o

    def spider_opened(self, spider: Spider) -> None:
        self.start_time = datetime.now(tz=timezone.utc)
        self.stats.set_value("start_time", self.start_time, spider=spider)

    def spider_closed(self, spider: Spider, reason: str) -> None:
        assert self.start_time is not None
        finish_time = datetime.now(tz=timezone.utc)
        elapsed_time = finish_time - self.start_time
        elapsed_time_seconds = elapsed_time.total_seconds()
        self.stats.set_value(
            "elapsed_time_seconds", elapsed_time_seconds, spider=spider
        )
        self.stats.set_value("finish_time", finish_time, spider=spider)
        self.stats.set_value("finish_reason", reason, spider=spider)

    def item_scraped(self, item: Any, spider: Spider) -> None:
        self.stats.inc_value("item_scraped_count", spider=spider)

    def response_received(self, spider: Spider) -> None:
        self.stats.inc_value("response_received_count", spider=spider)

    def item_dropped(self, item: Any, spider: Spider, exception: BaseException) -> None:
        reason = exception.__class__.__name__
        self.stats.inc_value("item_dropped_count", spider=spider)
        self.stats.inc_value(f"item_dropped_reasons_count/{reason}", spider=spider)

It listens for the spider_opened, spider_closed, item_scraped, item_dropped, and response_received signals and records the corresponding statistics.
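
To see what CoreStats produces, you can run a crawl programmatically and dump the stats collector afterwards; a minimal sketch (the spider and URL are placeholders):

from scrapy import Spider
from scrapy.crawler import CrawlerProcess
 
class DemoSpider(Spider):
    name = "demo"
    start_urls = ["https://example.com"]  # placeholder
 
    def parse(self, response):
        yield {"title": response.css("title::text").get()}
 
process = CrawlerProcess()
crawler = process.create_crawler(DemoSpider)
process.crawl(crawler)
process.start()
 
# Includes start_time, finish_time, elapsed_time_seconds,
# item_scraped_count, response_received_count, ...
print(crawler.stats.get_stats())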

2. scrapy.extensions.telnet.TelnetConsole

"""
Scrapy Telnet Console extension

See documentation in docs/topics/telnetconsole.rst
"""

from __future__ import annotations

import binascii
import logging
import os
import pprint
from typing import TYPE_CHECKING, Any

from twisted.internet import protocol
from twisted.internet.tcp import Port

from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.decorators import defers
from scrapy.utils.engine import print_engine_status
from scrapy.utils.reactor import listen_tcp
from scrapy.utils.trackref import print_live_refs

if TYPE_CHECKING:
    from twisted.conch import telnet

    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler


logger = logging.getLogger(__name__)

# signal to update telnet variables
# args: telnet_vars
update_telnet_vars = object()


class TelnetConsole(protocol.ServerFactory):
    def __init__(self, crawler: Crawler):
        if not crawler.settings.getbool("TELNETCONSOLE_ENABLED"):
            raise NotConfigured

        self.crawler: Crawler = crawler
        self.noisy: bool = False
        self.portrange: list[int] = [
            int(x) for x in crawler.settings.getlist("TELNETCONSOLE_PORT")
        ]
        self.host: str = crawler.settings["TELNETCONSOLE_HOST"]
        self.username: str = crawler.settings["TELNETCONSOLE_USERNAME"]
        self.password: str = crawler.settings["TELNETCONSOLE_PASSWORD"]

        if not self.password:
            self.password = binascii.hexlify(os.urandom(8)).decode("utf8")
            logger.info("Telnet Password: %s", self.password)

        self.crawler.signals.connect(self.start_listening, signals.engine_started)
        self.crawler.signals.connect(self.stop_listening, signals.engine_stopped)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        return cls(crawler)

    def start_listening(self) -> None:
        self.port: Port = listen_tcp(self.portrange, self.host, self)
        h = self.port.getHost()
        logger.info(
            "Telnet console listening on %(host)s:%(port)d",
            {"host": h.host, "port": h.port},
            extra={"crawler": self.crawler},
        )

    def stop_listening(self) -> None:
        self.port.stopListening()

    def protocol(self) -> telnet.TelnetTransport:  # type: ignore[override]
        # these import twisted.internet.reactor
        from twisted.conch import manhole, telnet
        from twisted.conch.insults import insults

        class Portal:
            """An implementation of IPortal"""

            @defers
            def login(self_, credentials, mind, *interfaces):
                if not (
                    credentials.username == self.username.encode("utf8")
                    and credentials.checkPassword(self.password.encode("utf8"))
                ):
                    raise ValueError("Invalid credentials")

                protocol = telnet.TelnetBootstrapProtocol(
                    insults.ServerProtocol, manhole.Manhole, self._get_telnet_vars()
                )
                return (interfaces[0], protocol, lambda: None)

        return telnet.TelnetTransport(telnet.AuthenticatingTelnetProtocol, Portal())

    def _get_telnet_vars(self) -> dict[str, Any]:
        # Note: if you add entries here also update topics/telnetconsole.rst
        assert self.crawler.engine
        telnet_vars: dict[str, Any] = {
            "engine": self.crawler.engine,
            "spider": self.crawler.engine.spider,
            "slot": self.crawler.engine.slot,
            "crawler": self.crawler,
            "extensions": self.crawler.extensions,
            "stats": self.crawler.stats,
            "settings": self.crawler.settings,
            "est": lambda: print_engine_status(self.crawler.engine),
            "p": pprint.pprint,
            "prefs": print_live_refs,
            "help": "This is Scrapy telnet console. For more info see: "
            "https://docs.scrapy.org/en/latest/topics/telnetconsole.html",
        }
        self.crawler.signals.send_catch_log(update_telnet_vars, telnet_vars=telnet_vars)
        return telnet_vars

The local variables available inside the telnet session include engine, spider, slot, crawler, extensions, stats, settings, est, p, prefs, and help.
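
The console is enabled by default and listens on 127.0.0.1, taking the first free port in the 6023-6073 range. The relevant settings (shown with their default values) can be overridden in settings.py; once the crawler is running, you can connect with telnet 127.0.0.1 6023 and call est() or prefs():

TELNETCONSOLE_ENABLED = True
TELNETCONSOLE_HOST = '127.0.0.1'
TELNETCONSOLE_PORT = [6023, 6073]  # port range to try
TELNETCONSOLE_USERNAME = 'scrapy'
TELNETCONSOLE_PASSWORD = None      # None -> a random password is generated and logged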

3. scrapy.extensions.memusage.MemoryUsage (memory monitoring)

"""
MemoryUsage extension

See documentation in docs/topics/extensions.rst
"""

from __future__ import annotations

import logging
import socket
import sys
from importlib import import_module
from pprint import pformat
from typing import TYPE_CHECKING

from twisted.internet import task

from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.mail import MailSender
from scrapy.utils.engine import get_engine_status

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler


logger = logging.getLogger(__name__)


class MemoryUsage:
    def __init__(self, crawler: Crawler):
        if not crawler.settings.getbool("MEMUSAGE_ENABLED"):
            raise NotConfigured
        try:
            # stdlib's resource module is only available on unix platforms.
            self.resource = import_module("resource")
        except ImportError:
            raise NotConfigured

        self.crawler: Crawler = crawler
        self.warned: bool = False
        self.notify_mails: list[str] = crawler.settings.getlist("MEMUSAGE_NOTIFY_MAIL")
        self.limit: int = crawler.settings.getint("MEMUSAGE_LIMIT_MB") * 1024 * 1024
        self.warning: int = crawler.settings.getint("MEMUSAGE_WARNING_MB") * 1024 * 1024
        self.check_interval: float = crawler.settings.getfloat(
            "MEMUSAGE_CHECK_INTERVAL_SECONDS"
        )
        self.mail: MailSender = MailSender.from_crawler(crawler)
        crawler.signals.connect(self.engine_started, signal=signals.engine_started)
        crawler.signals.connect(self.engine_stopped, signal=signals.engine_stopped)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        return cls(crawler)

    def get_virtual_size(self) -> int:
        size: int = self.resource.getrusage(self.resource.RUSAGE_SELF).ru_maxrss
        if sys.platform != "darwin":
            # on macOS ru_maxrss is in bytes, on Linux it is in KB
            size *= 1024
        return size

    def engine_started(self) -> None:
        assert self.crawler.stats
        self.crawler.stats.set_value("memusage/startup", self.get_virtual_size())
        self.tasks: list[task.LoopingCall] = []
        tsk = task.LoopingCall(self.update)
        self.tasks.append(tsk)
        tsk.start(self.check_interval, now=True)
        if self.limit:
            tsk = task.LoopingCall(self._check_limit)
            self.tasks.append(tsk)
            tsk.start(self.check_interval, now=True)
        if self.warning:
            tsk = task.LoopingCall(self._check_warning)
            self.tasks.append(tsk)
            tsk.start(self.check_interval, now=True)

    def engine_stopped(self) -> None:
        for tsk in self.tasks:
            if tsk.running:
                tsk.stop()

    def update(self) -> None:
        assert self.crawler.stats
        self.crawler.stats.max_value("memusage/max", self.get_virtual_size())

    def _check_limit(self) -> None:
        assert self.crawler.engine
        assert self.crawler.stats
        peak_mem_usage = self.get_virtual_size()
        if peak_mem_usage > self.limit:
            self.crawler.stats.set_value("memusage/limit_reached", 1)
            mem = self.limit / 1024 / 1024
            logger.error(
                "Memory usage exceeded %(memusage)dMiB. Shutting down Scrapy...",
                {"memusage": mem},
                extra={"crawler": self.crawler},
            )
            if self.notify_mails:
                subj = (
                    f"{self.crawler.settings['BOT_NAME']} terminated: "
                    f"memory usage exceeded {mem}MiB at {socket.gethostname()}"
                )
                self._send_report(self.notify_mails, subj)
                self.crawler.stats.set_value("memusage/limit_notified", 1)

            if self.crawler.engine.spider is not None:
                self.crawler.engine.close_spider(
                    self.crawler.engine.spider, "memusage_exceeded"
                )
            else:
                self.crawler.stop()
        else:
            logger.info(
                "Peak memory usage is %(virtualsize)dMiB",
                {"virtualsize": peak_mem_usage / 1024 / 1024},
            )

    def _check_warning(self) -> None:
        if self.warned:  # warn only once
            return
        assert self.crawler.stats
        if self.get_virtual_size() > self.warning:
            self.crawler.stats.set_value("memusage/warning_reached", 1)
            mem = self.warning / 1024 / 1024
            logger.warning(
                "Memory usage reached %(memusage)dMiB",
                {"memusage": mem},
                extra={"crawler": self.crawler},
            )
            if self.notify_mails:
                subj = (
                    f"{self.crawler.settings['BOT_NAME']} warning: "
                    f"memory usage reached {mem}MiB at {socket.gethostname()}"
                )
                self._send_report(self.notify_mails, subj)
                self.crawler.stats.set_value("memusage/warning_notified", 1)
            self.warned = True

    def _send_report(self, rcpts: list[str], subject: str) -> None:
        """send notification mail with some additional useful info"""
        assert self.crawler.engine
        assert self.crawler.stats
        stats = self.crawler.stats
        s = f"Memory usage at engine startup : {stats.get_value('memusage/startup') / 1024 / 1024}M\r\n"
        s += f"Maximum memory usage          : {stats.get_value('memusage/max') / 1024 / 1024}M\r\n"
        s += f"Current memory usage          : {self.get_virtual_size() / 1024 / 1024}M\r\n"

        s += (
            "ENGINE STATUS ------------------------------------------------------- \r\n"
        )
        s += "\r\n"
        s += pformat(get_engine_status(self.crawler.engine))
        s += "\r\n"
        self.mail.send(rcpts, subject, s)

This extension only works on Unix-like platforms (it depends on the stdlib resource module, which Windows lacks). It can monitor memory usage and send warning/notification emails.

Mail settings:

MAIL_HOST = 'localhost'         # SMTP server
MAIL_PORT = 25                  # SMTP port
MAIL_FROM = 'scrapy@localhost'  # sender address
MAIL_PASS = None                # SMTP password
MAIL_USER = None                # SMTP user; authentication is skipped if unset

Monitoring settings:

MEMUSAGE_CHECK_INTERVAL_SECONDS = 60.0  # check every 60 seconds
MEMUSAGE_ENABLED = True                 # enable memory monitoring
MEMUSAGE_LIMIT_MB = 0                   # hard limit in MiB; the crawl is shut down when exceeded (0 = disabled)
MEMUSAGE_NOTIFY_MAIL = []               # recipients for notification mails
MEMUSAGE_WARNING_MB = 0                 # soft limit in MiB; a warning is issued when exceeded (0 = disabled)

When memory usage exceeds the limit or the warning threshold, the corresponding notification email is sent.
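
For example, to abort the crawl at 2 GiB, warn at 1 GiB, and notify an ops mailbox (the values and the address are illustrative):

MEMUSAGE_ENABLED = True
MEMUSAGE_LIMIT_MB = 2048
MEMUSAGE_WARNING_MB = 1024
MEMUSAGE_NOTIFY_MAIL = ['ops@example.com']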

4. scrapy.extensions.memdebug.MemoryDebugger

"""
MemoryDebugger extension

See documentation in docs/topics/extensions.rst
"""

from __future__ import annotations

import gc
from typing import TYPE_CHECKING

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.trackref import live_refs

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector


class MemoryDebugger:
    def __init__(self, stats: StatsCollector):
        self.stats: StatsCollector = stats

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        if not crawler.settings.getbool("MEMDEBUG_ENABLED"):
            raise NotConfigured
        assert crawler.stats
        o = cls(crawler.stats)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_closed(self, spider: Spider, reason: str) -> None:
        gc.collect()
        self.stats.set_value(
            "memdebug/gc_garbage_count", len(gc.garbage), spider=spider
        )
        for cls, wdict in live_refs.items():
            if not wdict:
                continue
            self.stats.set_value(
                f"memdebug/live_refs/{cls.__name__}", len(wdict), spider=spider
            )

Settings

MEMDEBUG_ENABLED = False  # enable memory debugging
MEMDEBUG_NOTIFY = []      # send the memory debugging report by mail at engine shutdown

Note that MEMDEBUG_NOTIFY is currently not used anywhere in the codebase.

The extension's whole job is to trigger a gc collection when the spider closes and record the resulting statistics.
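
With MEMDEBUG_ENABLED = True in settings.py, the final stats dump gains entries like the following (the spider class name and the values are illustrative):

'memdebug/gc_garbage_count': 0
'memdebug/live_refs/DemoSpider': 1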

5. scrapy.extensions.closespider.CloseSpider

"""CloseSpider is an extension that forces spiders to be closed after certain
conditions are met.

See documentation in docs/topics/extensions.rst
"""

from __future__ import annotations

import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any

from scrapy import Request, Spider, signals
from scrapy.exceptions import NotConfigured

if TYPE_CHECKING:
    from twisted.python.failure import Failure

    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.http import Response


logger = logging.getLogger(__name__)


class CloseSpider:
    def __init__(self, crawler: Crawler):
        self.crawler: Crawler = crawler

        self.close_on: dict[str, Any] = {
            "timeout": crawler.settings.getfloat("CLOSESPIDER_TIMEOUT"),
            "itemcount": crawler.settings.getint("CLOSESPIDER_ITEMCOUNT"),
            "pagecount": crawler.settings.getint("CLOSESPIDER_PAGECOUNT"),
            "errorcount": crawler.settings.getint("CLOSESPIDER_ERRORCOUNT"),
            "timeout_no_item": crawler.settings.getint("CLOSESPIDER_TIMEOUT_NO_ITEM"),
            "pagecount_no_item": crawler.settings.getint(
                "CLOSESPIDER_PAGECOUNT_NO_ITEM"
            ),
        }

        if not any(self.close_on.values()):
            raise NotConfigured

        self.counter: defaultdict[str, int] = defaultdict(int)

        if self.close_on.get("errorcount"):
            crawler.signals.connect(self.error_count, signal=signals.spider_error)
        if self.close_on.get("pagecount") or self.close_on.get("pagecount_no_item"):
            crawler.signals.connect(self.page_count, signal=signals.response_received)
        if self.close_on.get("timeout"):
            crawler.signals.connect(self.spider_opened, signal=signals.spider_opened)
        if self.close_on.get("itemcount") or self.close_on.get("pagecount_no_item"):
            crawler.signals.connect(self.item_scraped, signal=signals.item_scraped)
        if self.close_on.get("timeout_no_item"):
            self.timeout_no_item: int = self.close_on["timeout_no_item"]
            self.items_in_period: int = 0
            crawler.signals.connect(
                self.spider_opened_no_item, signal=signals.spider_opened
            )
            crawler.signals.connect(
                self.item_scraped_no_item, signal=signals.item_scraped
            )

        crawler.signals.connect(self.spider_closed, signal=signals.spider_closed)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        return cls(crawler)

    def error_count(self, failure: Failure, response: Response, spider: Spider) -> None:
        self.counter["errorcount"] += 1
        if self.counter["errorcount"] == self.close_on["errorcount"]:
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_errorcount")

    def page_count(self, response: Response, request: Request, spider: Spider) -> None:
        self.counter["pagecount"] += 1
        self.counter["pagecount_since_last_item"] += 1
        if self.counter["pagecount"] == self.close_on["pagecount"]:
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_pagecount")
            return
        if self.close_on["pagecount_no_item"] and (
            self.counter["pagecount_since_last_item"]
            >= self.close_on["pagecount_no_item"]
        ):
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_pagecount_no_item")

    def spider_opened(self, spider: Spider) -> None:
        from twisted.internet import reactor

        assert self.crawler.engine
        self.task = reactor.callLater(
            self.close_on["timeout"],
            self.crawler.engine.close_spider,
            spider,
            reason="closespider_timeout",
        )

    def item_scraped(self, item: Any, spider: Spider) -> None:
        self.counter["itemcount"] += 1
        self.counter["pagecount_since_last_item"] = 0
        if self.counter["itemcount"] == self.close_on["itemcount"]:
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_itemcount")

    def spider_closed(self, spider: Spider) -> None:
        task = getattr(self, "task", None)
        if task and task.active():
            task.cancel()

        task_no_item = getattr(self, "task_no_item", None)
        if task_no_item and task_no_item.running:
            task_no_item.stop()

    def spider_opened_no_item(self, spider: Spider) -> None:
        from twisted.internet import task

        self.task_no_item = task.LoopingCall(self._count_items_produced, spider)
        self.task_no_item.start(self.timeout_no_item, now=False)

        logger.info(
            f"Spider will stop when no items are produced after "
            f"{self.timeout_no_item} seconds."
        )

    def item_scraped_no_item(self, item: Any, spider: Spider) -> None:
        self.items_in_period += 1

    def _count_items_produced(self, spider: Spider) -> None:
        if self.items_in_period >= 1:
            self.items_in_period = 0
        else:
            logger.info(
                f"Closing spider since no items were produced in the last "
                f"{self.timeout_no_item} seconds."
            )
            assert self.crawler.engine
            self.crawler.engine.close_spider(spider, "closespider_timeout_no_item")

Settings

CLOSESPIDER_TIMEOUT = 0            # close the spider after it has been open for this many seconds (0 = disabled)
CLOSESPIDER_PAGECOUNT = 0          # close the spider after this many responses have been received
CLOSESPIDER_ITEMCOUNT = 0          # close the spider after this many items have been scraped
CLOSESPIDER_ERRORCOUNT = 0         # close the spider after this many spider errors
CLOSESPIDER_TIMEOUT_NO_ITEM = 0    # close the spider if no item has been scraped for this many seconds
CLOSESPIDER_PAGECOUNT_NO_ITEM = 0  # close the spider after this many consecutive pages without an item

Its job is to close the spider once a running-time, page-count, item-count, or error-count threshold is reached.
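
A typical use is capping a test crawl, e.g. stopping after 100 items or one hour of running time, whichever comes first (the values are illustrative):

CLOSESPIDER_ITEMCOUNT = 100
CLOSESPIDER_TIMEOUT = 3600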

6. scrapy.extensions.logstats.LogStats

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from twisted.internet import task

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler
    from scrapy.statscollectors import StatsCollector


logger = logging.getLogger(__name__)


class LogStats:
    """Log basic scraping stats periodically like:
    * RPM - Requests per Minute
    * IPM - Items per Minute
    """

    def __init__(self, stats: StatsCollector, interval: float = 60.0):
        self.stats: StatsCollector = stats
        self.interval: float = interval
        self.multiplier: float = 60.0 / self.interval
        self.task: task.LoopingCall | None = None

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        interval: float = crawler.settings.getfloat("LOGSTATS_INTERVAL")
        if not interval:
            raise NotConfigured
        assert crawler.stats
        o = cls(crawler.stats, interval)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider: Spider) -> None:
        self.pagesprev: int = 0
        self.itemsprev: int = 0

        self.task = task.LoopingCall(self.log, spider)
        self.task.start(self.interval)

    def log(self, spider: Spider) -> None:
        self.calculate_stats()

        msg = (
            "Crawled %(pages)d pages (at %(pagerate)d pages/min), "
            "scraped %(items)d items (at %(itemrate)d items/min)"
        )
        log_args = {
            "pages": self.pages,
            "pagerate": self.prate,
            "items": self.items,
            "itemrate": self.irate,
        }
        logger.info(msg, log_args, extra={"spider": spider})

    def calculate_stats(self) -> None:
        self.items: int = self.stats.get_value("item_scraped_count", 0)
        self.pages: int = self.stats.get_value("response_received_count", 0)
        self.irate: float = (self.items - self.itemsprev) * self.multiplier
        self.prate: float = (self.pages - self.pagesprev) * self.multiplier
        self.pagesprev, self.itemsprev = self.pages, self.items

    def spider_closed(self, spider: Spider, reason: str) -> None:
        if self.task and self.task.running:
            self.task.stop()

        rpm_final, ipm_final = self.calculate_final_stats(spider)
        self.stats.set_value("responses_per_minute", rpm_final)
        self.stats.set_value("items_per_minute", ipm_final)

    def calculate_final_stats(
        self, spider: Spider
    ) -> tuple[None, None] | tuple[float, float]:
        start_time = self.stats.get_value("start_time")
        finished_time = self.stats.get_value("finished_time")

        if not start_time or not finished_time:
            return None, None

        mins_elapsed = (finished_time - start_time).seconds / 60

        items = self.stats.get_value("item_scraped_count", 0)
        pages = self.stats.get_value("response_received_count", 0)

        return (pages / mins_elapsed), (items / mins_elapsed)

Settings

LOGSTATS_INTERVAL = 60.0  # log the stats every 60 seconds; 0 disables the extension

It tracks the page and item counts and derives per-minute rates from them.
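
With the default interval, a line like the following appears in the log once a minute (the numbers are illustrative):

2025-06-07 10:00:00 [scrapy.extensions.logstats] INFO: Crawled 120 pages (at 120 pages/min), scraped 60 items (at 60 items/min)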

7. scrapy.extensions.spiderstate.SpiderState

from __future__ import annotations

import pickle  # nosec
from pathlib import Path
from typing import TYPE_CHECKING

from scrapy import Spider, signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.job import job_dir

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

    from scrapy.crawler import Crawler


class SpiderState:
    """Store and load spider state during a scraping job"""

    def __init__(self, jobdir: str | None = None):
        self.jobdir: str | None = jobdir

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        jobdir = job_dir(crawler.settings)
        if not jobdir:
            raise NotConfigured

        obj = cls(jobdir)
        crawler.signals.connect(obj.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(obj.spider_opened, signal=signals.spider_opened)
        return obj

    def spider_closed(self, spider: Spider) -> None:
        if self.jobdir:
            with Path(self.statefn).open("wb") as f:
                assert hasattr(spider, "state")  # set in spider_opened
                pickle.dump(spider.state, f, protocol=4)

    def spider_opened(self, spider: Spider) -> None:
        if self.jobdir and Path(self.statefn).exists():
            with Path(self.statefn).open("rb") as f:
                spider.state = pickle.load(f)  # type: ignore[attr-defined]  # nosec
        else:
            spider.state = {}  # type: ignore[attr-defined]

    @property
    def statefn(self) -> str:
        assert self.jobdir
        return str(Path(self.jobdir, "spider.state"))

Settings

JOBDIR = ''  # directory where the spider state is persisted

When JOBDIR is configured, the directory is created automatically and spider.state is pickled into it when the spider closes, then restored on the next open. It is unset by default.
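
In practice you set JOBDIR per run and read/write self.state inside the spider; a minimal sketch (the spider name and counter key are made up, and self.state only exists when JOBDIR is configured):

# scrapy crawl demo -s JOBDIR=crawls/demo-1
from scrapy import Spider
 
class DemoSpider(Spider):
    name = "demo"
    start_urls = ["https://example.com"]  # placeholder
 
    def parse(self, response):
        # self.state is the dict persisted across runs by SpiderState
        self.state["pages_seen"] = self.state.get("pages_seen", 0) + 1
        yield {"url": response.url}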

8. scrapy.extensions.throttle.AutoThrottle

import logging

from scrapy import signals
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)


class AutoThrottle:
 
    def __init__(self, crawler):
        self.crawler = crawler
        if not crawler.settings.getbool('AUTOTHROTTLE_ENABLED'):
            raise NotConfigured
 
        self.debug = crawler.settings.getbool("AUTOTHROTTLE_DEBUG")
        self.target_concurrency = crawler.settings.getfloat("AUTOTHROTTLE_TARGET_CONCURRENCY")
        crawler.signals.connect(self._spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(self._response_downloaded, signal=signals.response_downloaded)
 
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler)
 
    def _spider_opened(self, spider):
        self.mindelay = self._min_delay(spider)
        self.maxdelay = self._max_delay(spider)
        spider.download_delay = self._start_delay(spider)
 
    def _min_delay(self, spider):
        s = self.crawler.settings
        return getattr(spider, 'download_delay', s.getfloat('DOWNLOAD_DELAY'))
 
    def _max_delay(self, spider):
        return self.crawler.settings.getfloat('AUTOTHROTTLE_MAX_DELAY')
 
    def _start_delay(self, spider):
        return max(self.mindelay, self.crawler.settings.getfloat('AUTOTHROTTLE_START_DELAY'))
 
    def _response_downloaded(self, response, request, spider):
        key, slot = self._get_slot(request, spider)
        latency = request.meta.get('download_latency')
        if latency is None or slot is None:
            return
 
        olddelay = slot.delay
        self._adjust_delay(slot, latency, response)
        if self.debug:
            diff = slot.delay - olddelay
            size = len(response.body)
            conc = len(slot.transferring)
            logger.info(
                "slot: %(slot)s | conc:%(concurrency)2d | "
                "delay:%(delay)5d ms (%(delaydiff)+d) | "
                "latency:%(latency)5d ms | size:%(size)6d bytes",
                {
                    'slot': key, 'concurrency': conc,
                    'delay': slot.delay * 1000, 'delaydiff': diff * 1000,
                    'latency': latency * 1000, 'size': size
                },
                extra={'spider': spider}
            )
 
    def _get_slot(self, request, spider):
        key = request.meta.get('download_slot')
        return key, self.crawler.engine.downloader.slots.get(key)
 
    def _adjust_delay(self, slot, latency, response):
        """Define delay adjustment policy"""
 
        # If a server needs `latency` seconds to respond then
        # we should send a request each `latency/N` seconds
        # to have N requests processed in parallel
        target_delay = latency / self.target_concurrency
 
        # Adjust the delay to make it closer to target_delay
        new_delay = (slot.delay + target_delay) / 2.0
 
        # If target delay is bigger than old delay, then use it instead of mean.
        # It works better with problematic sites.
        new_delay = max(target_delay, new_delay)
 
        # Make sure self.mindelay <= new_delay <= self.max_delay
        new_delay = min(max(self.mindelay, new_delay), self.maxdelay)
 
        # Dont adjust delay if response status != 200 and new delay is smaller
        # than old one, as error pages (and redirections) are usually small and
        # so tend to reduce latency, thus provoking a positive feedback by
        # reducing delay instead of increase.
        if response.status != 200 and new_delay <= slot.delay:
            return
 
        slot.delay = new_delay

Settings

AUTOTHROTTLE_ENABLED = False           # enable the adaptive download delay
AUTOTHROTTLE_DEBUG = False             # log every throttling decision
AUTOTHROTTLE_MAX_DELAY = 60.0          # upper bound for the delay, in seconds
AUTOTHROTTLE_START_DELAY = 5.0         # initial delay, in seconds
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # target average number of concurrent requests per remote server

The extension is disabled by default.
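
To see the adjustment policy in numbers, here is a worked example of _adjust_delay (all values are illustrative):

latency = 1.0             # seconds the server took to respond
target_concurrency = 2.0  # AUTOTHROTTLE_TARGET_CONCURRENCY
slot_delay = 0.3          # current delay of this download slot

target_delay = latency / target_concurrency    # 1.0 / 2.0 = 0.5
new_delay = (slot_delay + target_delay) / 2.0  # (0.3 + 0.5) / 2 = 0.4
new_delay = max(target_delay, new_delay)       # 0.5, the target wins when larger
# finally clamped into [mindelay, maxdelay] before being stored on the slot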

References

Scrapy 源码分析 4 extensions middlewares详解_scrapy.extensions.logstats-CSDN博客

