[scrapy] Scrapy Source Code Analysis: How Signal Events Are Loaded, and How to Define Custom Signals
Environment
- python 2.7
- scrapy 1.3.0
Background
When writing Scrapy extensions we inevitably run into Scrapy's extension mechanism, and the official docs provide plenty of examples. For instance, when we manually create a new project, the template generates a middlewares.py file that already contains several places where signals come into play, such as this code:
Source: https://github.com/scrapy/scrapy/blob/1.3/scrapy/templates/project/module/middlewares.py.tmpl
from scrapy import signals


class CnblogsSpiderMiddleware(object):
    # Not all methods need to be defined. If a method is not defined,
    # scrapy acts as if the spider middleware does not modify the
    # passed objects.

    @classmethod
    def from_crawler(cls, crawler):
        # This method is used by Scrapy to create your spiders.
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        return s

    def process_spider_input(self, response, spider):
        # Called for each response that goes through the spider
        # middleware and into the spider.
        # Should return None or raise an exception.
        return None

    def process_spider_output(self, response, result, spider):
        # Called with the results returned from the Spider, after
        # it has processed the response.
        # Must return an iterable of Request, dict or Item objects.
        for i in result:
            yield i

    def process_spider_exception(self, response, exception, spider):
        # Called when a spider or process_spider_input() method
        # (from other spider middleware) raises an exception.
        # Should return either None or an iterable of Response, dict
        # or Item objects.
        pass

    def process_start_requests(self, start_requests, spider):
        # Called with the start requests of the spider, and works
        # similarly to the process_spider_output() method, except
        # that it doesn't have a response associated.
        # Must return only requests (not items).
        for r in start_requests:
            yield r

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

    def process_get_response(self, *args, **kwargs):
        print 'response_received'
This wires the spider_opened "event" directly to the middleware's spider_opened method. But what if we also need some signals of our own?
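Before getting to custom signals, note that subscribing to more of the built-in ones follows exactly the same pattern. A minimal sketch, using only the stock signals (the class and handler names below are mine, not part of the template):

from scrapy import signals

class SignalAwareMiddleware(object):
    # Hypothetical middleware: registers handlers for several built-in signals.

    @classmethod
    def from_crawler(cls, crawler):
        s = cls()
        # Any number of (handler, signal) pairs can be registered here.
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(s.response_received, signal=signals.response_received)
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        return s

    def spider_opened(self, spider):
        spider.logger.info('Spider opened: %s' % spider.name)

    def response_received(self, response, request, spider):
        # response_received is documented to pass response, request and spider
        spider.logger.info('Got response for %s' % request.url)

    def spider_closed(self, spider, reason):
        spider.logger.info('Spider closed (%s)' % reason)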
A Surprise
The code above was written against scrapy 1.3.0. Creating a new project fails with TypeError: process_start_requests() takes exactly 2 arguments (3 given). An error like this usually means a missing self parameter, and inspecting the official scrapy 1.3 template code turned up exactly that bug.
A PR for this issue has been submitted to the Scrapy team: https://github.com/scrapy/scrapy/pull/2882
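The failure is easy to reproduce outside Scrapy. A minimal sketch of why a method declared without self raises on an instance call:

class BrokenMiddleware(object):
    # BUG: `self` is missing, just like in the scrapy 1.3 template
    def process_start_requests(start_requests, spider):
        for r in start_requests:
            yield r

mw = BrokenMiddleware()
# Python implicitly passes the instance as the first positional argument,
# so this call hands 3 arguments to a function declared with only 2:
# TypeError: process_start_requests() takes exactly 2 arguments (3 given)
mw.process_start_requests(iter([]), None)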
Analysis
Signal handling is built from three main components:
- the signal objects
- the sender
- the subscribers
Signal Objects
Source: https://github.com/scrapy/scrapy/blob/1.3/scrapy/signals.py
"""
Scrapy signals
These signals are documented in docs/topics/signals.rst. Please don't add new
signals here without documenting them there.
"""
engine_started = object()
engine_stopped = object()
spider_opened = object()
spider_idle = object()
spider_closed = object()
spider_error = object()
request_scheduled = object()
request_dropped = object()
response_received = object()
response_downloaded = object()
item_scraped = object()
item_dropped = object()
# for backwards compatibility
stats_spider_opened = spider_opened
stats_spider_closing = spider_closed
stats_spider_closed = spider_closed
item_passed = item_scraped
request_received = request_scheduled
These are the common signals (or events) in the current version. Take item_scraped as an example: how does it get delivered to the subscribers of that event?
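One aside before following item_scraped any further: each signal above is nothing more than a bare object() used as an identity token, so declaring your own signals takes a single line apiece. A minimal sketch (the module path and signal names are hypothetical):

# myproject/signals.py  (hypothetical module)
# A custom signal is just another unique sentinel object,
# exactly like the built-ins in scrapy/signals.py:
item_validated = object()
item_rejected = object()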
The Sender
Source: https://github.com/scrapy/scrapy/blob/1.3/scrapy/core/scraper.py
def _itemproc_finished(self, output, item, response, spider):
    """ItemProcessor finished for the given ``item`` and returned ``output``
    """
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s', {'item': item},
                         exc_info=failure_to_exc_info(output),
                         extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)
As we can see, it sends the event out via the self.signals.send_catch_log_deferred method. So what is self.signals? Let's keep reading the code:
class Scraper(object):

    def __init__(self, crawler):
        self.slot = None
        self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
        self.itemproc = itemproc_cls.from_crawler(crawler)
        self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
        self.crawler = crawler
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter
The line self.signals = crawler.signals hands the crawler's signals over to the scraper. I can't help complaining here: what this signals variable really holds is a signal manager, so the name is a bit ambiguous when reading the code. Let's move on to the crawler code.
Source: https://github.com/scrapy/scrapy/blob/master/scrapy/crawler.py
import logging

from scrapy.settings import Settings
from scrapy.signalmanager import SignalManager
from scrapy.exceptions import ScrapyDeprecationWarning
from scrapy.utils.ossignal import install_shutdown_handlers, signal_names
from scrapy.utils.misc import load_object
from scrapy.utils.log import (
    LogCounterHandler, configure_logging, log_scrapy_info,
    get_scrapy_root_handler, install_scrapy_root_handler)
from scrapy import signals

logger = logging.getLogger(__name__)


class Crawler(object):

    def __init__(self, spidercls, settings=None):
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        self.spidercls = spidercls
        self.settings = settings.copy()
        self.spidercls.update_settings(self.settings)
        self.signals = SignalManager(self)
Now the meaning is clear: the signals attribute is actually a SignalManager instance, and when the scraper starts up it picks up the relevant parts of the crawler, this one included. So what exactly is SignalManager? Let's look at its code.
Source: https://github.com/scrapy/scrapy/blob/1.3/scrapy/signalmanager.py
from __future__ import absolute_import
from pydispatch import dispatcher
from scrapy.utils import signal as _signal


class SignalManager(object):

    def __init__(self, sender=dispatcher.Anonymous):
        self.sender = sender

    def connect(self, receiver, signal, **kwargs):
        """
        Connect a receiver function to a signal.

        The signal can be any object, although Scrapy comes with some
        predefined signals that are documented in the :ref:`topics-signals`
        section.

        :param receiver: the function to be connected
        :type receiver: callable

        :param signal: the signal to connect to
        :type signal: object
        """
        kwargs.setdefault('sender', self.sender)
        return dispatcher.connect(receiver, signal, **kwargs)

    def disconnect(self, receiver, signal, **kwargs):
        """
        Disconnect a receiver function from a signal. This has the
        opposite effect of the :meth:`connect` method, and the arguments
        are the same.
        """
        kwargs.setdefault('sender', self.sender)
        return dispatcher.disconnect(receiver, signal, **kwargs)

    def send_catch_log(self, signal, **kwargs):
        """
        Send a signal, catch exceptions and log them.

        The keyword arguments are passed to the signal handlers (connected
        through the :meth:`connect` method).
        """
        kwargs.setdefault('sender', self.sender)
        return _signal.send_catch_log(signal, **kwargs)

    def send_catch_log_deferred(self, signal, **kwargs):
        """
        Like :meth:`send_catch_log` but supports returning `deferreds`_ from
        signal handlers.

        Returns a Deferred that gets fired once all signal handlers
        deferreds were fired. Send a signal, catch exceptions and log them.

        The keyword arguments are passed to the signal handlers (connected
        through the :meth:`connect` method).

        .. _deferreds: http://twistedmatrix.com/documents/current/core/howto/defer.html
        """
        kwargs.setdefault('sender', self.sender)
        return _signal.send_catch_log_deferred(signal, **kwargs)

    def disconnect_all(self, signal, **kwargs):
        """
        Disconnect all receivers from the given signal.

        :param signal: the signal to disconnect from
        :type signal: object
        """
        kwargs.setdefault('sender', self.sender)
        return _signal.disconnect_all(signal, **kwargs)
By this point we basically understand how the whole signal mechanism operates:
dispatcher.connect(receiver, signal, **kwargs)
Scrapy delegates to pydispatch: through this signal mechanism, the data from the event's sender is delivered to every registered receiver with very little effort.
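To see that machinery in isolation, here is a minimal sketch using PyDispatcher directly, with the same connect/send semantics that SignalManager wraps (the signal and receiver names are mine):

from pydispatch import dispatcher

my_signal = object()  # any unique object can serve as a signal

def receiver(item, **kwargs):
    # PyDispatcher only passes the named arguments the receiver accepts
    print('got item: %r' % (item,))

# SignalManager defaults the sender to dispatcher.Anonymous;
# dispatcher.Any subscribes the receiver to every sender
dispatcher.connect(receiver, signal=my_signal, sender=dispatcher.Any)
dispatcher.send(signal=my_signal, sender=dispatcher.Anonymous,
                item={'title': 'demo'})

On top of these two calls, SignalManager only adds the default sender and the exception-catching wrappers from scrapy.utils.signal.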
Conclusion
If we want some custom signals, we can follow exactly the same pattern to handle whatever events or signal mechanisms we care about.
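A hedged sketch of what that can look like end to end; every name below (item_validated, the extension, the pipeline) is hypothetical, not part of Scrapy:

from scrapy import signals

# 1. Declare the signal: a unique sentinel object, same as scrapy/signals.py
item_validated = object()

class ValidationLoggerExtension(object):
    # Hypothetical extension that subscribes to the custom signal.

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        # 2. Subscribe, exactly as with built-in signals
        crawler.signals.connect(ext.on_item_validated, signal=item_validated)
        return ext

    def on_item_validated(self, item, spider):
        spider.logger.info('validated item: %r' % (item,))

class ValidationPipeline(object):
    # Hypothetical pipeline playing the sender role, like Scraper above.

    @classmethod
    def from_crawler(cls, crawler):
        pipe = cls()
        pipe.signals = crawler.signals  # the SignalManager we just read
        return pipe

    def process_item(self, item, spider):
        # 3. Fire the signal; handler exceptions are caught and logged
        self.signals.send_catch_log(signal=item_validated,
                                    item=item, spider=spider)
        return item

Both classes would be enabled through the usual EXTENSIONS and ITEM_PIPELINES settings; swap in send_catch_log_deferred when the handlers return Twisted Deferreds.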
Code
There is a small demo of handling the response-received signal here:
Source: https://github.com/BruceDone/scrapy_demo/tree/master/cnblogs
References
- Original author: 大鱼
- Original link: https://brucedone.com/archives/1069/
- Copyright: this work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International license. For non-commercial reproduction please credit the author and link to the original; for commercial reproduction please contact the author for permission.