Environment

  • python 2.7
  • scrapy 1.3.0

Background

When writing extra functionality for Scrapy, we inevitably end up using its extension mechanism, and the official documentation provides plenty of examples. For instance, when we create a new project by hand, the template generates a middlewares.py file that already contains code interacting with signals, such as the snippet below.

Code link: https://github.com/scrapy/scrapy/blob/1.3/scrapy/templates/project/module/middlewares.py.tmpl

from scrapy import signals


class CnblogsSpiderMiddleware(object):
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.

        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.

        # Must return an iterable of Request, dict or Item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.

        # Should return either None or an iterable of Response, dict
        # or Item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn’t have a response associated.

        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

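    # Note: this extra handler is an addition on top of the official template;
    # it is meant to be connected to the response_received signal (see the
    # sketch after this code block).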
    def process_get_response(self, *args, **kwargs):
        print 'response_received'

Here the spider_opened signal (the "event") is connected directly to the middleware's own spider_opened method. But what should we do if we also need to handle some other, more specific signals?
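Before digging into the internals, here is a minimal sketch of the answer, reusing the process_get_response handler from the snippet above: one extra connect call in from_crawler is all it takes to subscribe to another signal such as response_received.

    @classmethod
    def from_crawler(cls, crawler):
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        # extra subscription: fire process_get_response for every downloaded response
        crawler.signals.connect(s.process_get_response, signal=signals.response_received)
        return s

The keyword arguments sent along with the signal (response, request, spider) then show up in the handler's **kwargs.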

An unexpected error

When creating a new project with Scrapy 1.3.0, the code above actually fails with TypeError: process_start_requests() takes exactly 2 arguments (3 given). This kind of error usually means the self parameter is missing, and indeed, inspecting the official Scrapy 1.3 template code reveals exactly that mistake.
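As a reconstructed illustration (not a verbatim copy of the broken template): without self the method only accepts 2 arguments, but Python passes the middleware instance as an extra positional argument when the method is called on it, so 3 are given.

    # buggy: missing self, so the instance itself becomes the third argument
    def process_start_requests(start_requests, spider):
        for r in start_requests:
            yield r

    # fixed
    def process_start_requests(self, start_requests, spider):
        for r in start_requests:
            yield r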

A PR for this issue has already been submitted to the Scrapy team: https://github.com/scrapy/scrapy/pull/2882

Analysis

Signal handling mainly involves three components:

  • the signal objects
  • the sender (the side that fires the signal)
  • the subscribers (the receivers)

Signal objects

Code location: https://github.com/scrapy/scrapy/blob/1.3/scrapy/signals.py

"""
Scrapy signals
These signals are documented in docs/topics/signals.rst. Please don't add new
signals here without documenting them there.
"""

engine_started = object()
engine_stopped = object()
spider_opened = object()
spider_idle = object()
spider_closed = object()
spider_error = object()
request_scheduled = object()
request_dropped = object()
response_received = object()
response_downloaded = object()
item_scraped = object()
item_dropped = object()

# for backwards compatibility
stats_spider_opened = spider_opened
stats_spider_closing = spider_closed
stats_spider_closed = spider_closed

item_passed = item_scraped

request_received = request_scheduled

These are the common signals (or events) in the current version. Take item_scraped as an example: how does it get delivered to the subscribers of that event?
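Before looking at the sending side, a hedged sketch of the subscriber side (the class name and module path are made up): any component that receives the crawler in from_crawler can subscribe to item_scraped, and the handler signature follows the documented signal arguments (item, response, spider).

from scrapy import signals


class ItemCounter(object):
    """Tiny extension that counts scraped items via the item_scraped signal."""

    def __init__(self):
        self.count = 0

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        return ext

    def item_scraped(self, item, response, spider):
        self.count += 1
        spider.logger.info('scraped %d items so far' % self.count)

Like any extension it would be enabled via the EXTENSIONS setting, e.g. EXTENSIONS = {'myproject.extensions.ItemCounter': 500}.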

Sender

Code location: https://github.com/scrapy/scrapy/blob/eb5d396527c2d63fa6475dfd9b6372a19bce5488/scrapy/core/scraper.py

def _itemproc_finished(self, output, item, response, spider):
    """ItemProcessor finished for the given ``item`` and returned ``output``
    """
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s', {'item': item},
                            exc_info=failure_to_exc_info(output),
                            extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)

As we can see, the event is sent out with the self.signals.send_catch_log_deferred method. So what exactly is self.signals? Let's keep reading the code.

class Scraper(object):

    def __init__(self, crawler):
        self.slot = None
        self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
        self.itemproc = itemproc_cls.from_crawler(crawler)
        self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
        self.crawler = crawler
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter

self.signals = crawler.signals hands the crawler's signals object over to the scraper. A small gripe up front: this signals attribute is really a signal manager, so the name is a bit confusing when reading the code. Now let's look at the Crawler code.

Code location: https://github.com/scrapy/scrapy/blob/master/scrapy/crawler.py

import logging

from scrapy.settings import Settings
from scrapy.signalmanager import SignalManager
from scrapy.exceptions import ScrapyDeprecationWarning
from scrapy.utils.ossignal import install_shutdown_handlers, signal_names
from scrapy.utils.misc import load_object
from scrapy.utils.log import (
    LogCounterHandler, configure_logging, log_scrapy_info,
    get_scrapy_root_handler, install_scrapy_root_handler)
from scrapy import signals

logger = logging.getLogger(__name__)


class Crawler(object):

    def __init__(self, spidercls, settings=None):
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)

        self.spidercls = spidercls
        self.settings = settings.copy()
        self.spidercls.update_settings(self.settings)

        self.signals = SignalManager(self)

Now the meaning is clear: the signals attribute is actually an instance of SignalManager, and when the Scraper is created it picks up the relevant attributes from the crawler. So what does SignalManager actually do? Let's look at its code.

Code location: https://github.com/scrapy/scrapy/blob/1.3/scrapy/signalmanager.py

from __future__ import absolute_import
from pydispatch import dispatcher
from scrapy.utils import signal as _signal


class SignalManager(object):

    def __init__(self, sender=dispatcher.Anonymous):
        self.sender = sender

    def connect(self, receiver, signal, **kwargs):
        """
        Connect a receiver function to a signal.
        The signal can be any object, although Scrapy comes with some
        predefined signals that are documented in the :ref:`topics-signals`
        section.
        :param receiver: the function to be connected
        :type receiver: callable
        :param signal: the signal to connect to
        :type signal: object
        """
        kwargs.setdefault('sender', self.sender)
        return dispatcher.connect(receiver, signal, **kwargs)

    def disconnect(self, receiver, signal, **kwargs):
        """
        Disconnect a receiver function from a signal. This has the
        opposite effect of the :meth:`connect` method, and the arguments
        are the same.
        """
        kwargs.setdefault('sender', self.sender)
        return dispatcher.disconnect(receiver, signal, **kwargs)

    def send_catch_log(self, signal, **kwargs):
        """
        Send a signal, catch exceptions and log them.
        The keyword arguments are passed to the signal handlers (connected
        through the :meth:`connect` method).
        """
        kwargs.setdefault('sender', self.sender)
        return _signal.send_catch_log(signal, **kwargs)

    def send_catch_log_deferred(self, signal, **kwargs):
        """
        Like :meth:`send_catch_log` but supports returning `deferreds`_ from
        signal handlers.
        Returns a Deferred that gets fired once all signal handlers
        deferreds were fired. Send a signal, catch exceptions and log them.
        The keyword arguments are passed to the signal handlers (connected
        through the :meth:`connect` method).
        .. _deferreds: http://twistedmatrix.com/documents/current/core/howto/defer.html
        """
        kwargs.setdefault('sender', self.sender)
        return _signal.send_catch_log_deferred(signal, **kwargs)

    def disconnect_all(self, signal, **kwargs):
        """
        Disconnect all receivers from the given signal.
        :param signal: the signal to disconnect from
        :type signal: object
        """
        kwargs.setdefault('sender', self.sender)
        return _signal.disconnect_all(signal, **kwargs)

At this point we basically understand how the whole signal mechanism works:

    dispatcher.connect(receiver, signal, **kwargs)

Using pydispatch, the signal mechanism lets us easily deliver the sender's data to the interested receivers.
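As a quick standalone sketch (not taken from any Scrapy file, names are made up), the same machinery can be exercised outside of a crawler: a signal is just a unique object, and SignalManager wires it to a receiver through pydispatch.

from scrapy.signalmanager import SignalManager

# a custom signal is simply a unique object, just like those in scrapy/signals.py
item_exported = object()


def on_item_exported(item, **kwargs):
    # kwargs also carries 'signal' and 'sender', injected by the dispatcher
    print 'item exported: %s' % item


manager = SignalManager()
manager.connect(on_item_exported, signal=item_exported)
manager.send_catch_log(signal=item_exported, item={'title': 'demo'})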

Conclusion

If we need custom signals of our own, we can follow exactly the same pattern to handle our specific events or notifications.
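A hedged sketch of what that could look like (all names are hypothetical): an extension declares its own signal object, fires it with crawler.signals.send_catch_log when the spider closes, and subscribes its own handler to it.

from scrapy import signals

# declared exactly like the built-in signals in scrapy/signals.py
index_updated = object()


class IndexUpdateExtension(object):

    def __init__(self, crawler):
        self.crawler = crawler

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls(crawler)
        crawler.signals.connect(ext.on_index_updated, signal=index_updated)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_closed(self, spider):
        # keyword arguments become the arguments of the connected handlers
        self.crawler.signals.send_catch_log(signal=index_updated,
                                            spider=spider, count=42)  # dummy payload

    def on_index_updated(self, spider, count, **kwargs):
        spider.logger.info('index updated for %s: %d entries' % (spider.name, count))

If the handlers return Twisted deferreds, send_catch_log_deferred would be used instead, mirroring what _itemproc_finished does above.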

Code

There is a small demo showing a signal handler for received responses.
Code location: https://github.com/BruceDone/scrapy_demo/tree/master/cnblogs
