In the previous post we covered the "what" and the "why": a few simple examples of the advantages of going distributed, plus a look at some existing Scrapy-based distributed frameworks. This post shares the basic ideas behind the framework I use at work.

Source code

Let's start from the source code of Scrapy's Spider class:

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = cls(*args, **kwargs)
        spider._set_crawler(crawler)
        return spider

    def set_crawler(self, crawler):
        warnings.warn("set_crawler is deprecated, instantiate and bound the "
                      "spider to this crawler with from_crawler method "
                      "instead.",
                      category=ScrapyDeprecationWarning, stacklevel=2)
        assert not hasattr(self, 'crawler'), "Spider already bounded to a " \
                                             "crawler"
        self._set_crawler(crawler)

    def _set_crawler(self, crawler):
        self.crawler = crawler
        self.settings = crawler.settings
        crawler.signals.connect(self.close, signals.spider_closed)

    def start_requests(self):
        for url in self.start_urls:
            yield self.make_requests_from_url(url)

    def make_requests_from_url(self, url):
        return Request(url, dont_filter=True)

    def parse(self, response):
        raise NotImplementedError

The start_requests method here is the entry point of almost every spider. In other words, if every spider's start_requests ran a bit of code that took the current spider's identifying key and fetched the URLs assigned to that spider from a queue, wouldn't the spider effectively be distributed? Obviously it would (a rough sketch follows).
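
To make this concrete, here is a minimal sketch of such a start_requests, assuming redis-py is available and that the URLs assigned to this spider live in a Redis list named after its key (the 'seed:<key>' naming and the class names are my assumptions, not code from the project). It only illustrates the idea; the implementation shown later in this post fetches its seeds through an HTTP API instead.

import redis
from scrapy import Request
from scrapy.spiders import Spider


class QueueFedSpider(Spider):
    # illustrative example, not part of the project
    name = 'queue_fed_example'
    storage_key = 'yohobuy'

    def start_requests(self):
        r = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True)
        # drain the URLs assigned to this spider's key and turn them into Requests
        while True:
            url = r.lpop('seed:%s' % self.storage_key)
            if url is None:
                break
            yield Request(url, callback=self.parse)

    def parse(self, response):
        pass  # item extraction would go here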

  • main_spider is the producer: once URLs are generated, it pushes them into a Redis URL queue, tagged with a special key
  • item_spider is the consumer: it consumes those URLs, and item spiders can be deployed across any number of machines

This approach makes the basic idea of distribution very easy to realize. A sketch of the producer side is shown below.
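
A minimal sketch of the producer side, under the same assumptions as above: MainSpider, the listing URL, the XPath, and the 'seed:yohobuy' list name are illustrative, not code from the project. The point is simply that the listing spider hands item URLs to the queue under the consumer's key instead of crawling them itself.

import redis
from scrapy.spiders import Spider


class MainSpider(Spider):
    name = 'main_spider_example'
    start_urls = ['http://list.yohobuy.com/']  # assumed listing entry point

    def __init__(self, *args, **kwargs):
        super(MainSpider, self).__init__(*args, **kwargs)
        self.redis = redis.StrictRedis(host='127.0.0.1', port=6379,
                                       decode_responses=True)

    def parse(self, response):
        # extract item-page links from the listing page and push them to the queue;
        # a real spider would use a selector that matches only item links
        for href in response.xpath('//a/@href').extract():
            self.redis.rpush('seed:yohobuy', response.urljoin(href))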

Configuration

First, let's look at how my item_spider is written.

# -*- coding: utf-8 -*-
from scrapy import Request
from scrapy.spiders import Spider
from yohobuy_item_spider import YohobuyItemSpider


class YohobuyItemFromSeedApiSpider(Spider):
    name = 'yohobuy_item_from_seed_api'

    start_urls = []

    def __init__(self, get_seed_req_url):
        self.storage_key = 'yohobuy'
        self.start_urls = [get_seed_req_url % self.storage_key]
        super(YohobuyItemFromSeedApiSpider, self).__init__(self.name)

    def start_requests(self):
        for url in self.start_urls:
            # this is an API call for seed URLs; we shouldn't cache it
            yield Request(url, self.parse_urls,
                          meta={'dont_cache': True},
                          headers={'Accept': 'application/json'})

    def parse_urls(self, response):
        from json import loads
        body = response.body_as_unicode()
        reqs = loads(body, encoding='utf-8')
        for req in reqs.get('urls', []):
            url = req['url']
            yield Request(url,
                          callback=YohobuyItemSpider.parse_item,
                          meta={
                              'storage_key': self.storage_key
                          },
                          headers={
                              'Referer': 'http://m.jd.com/',
                              'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1664.3 Safari/537.36',
                              'accept-language': 'en-US,en;q=0.8,zh-CN;q=0.6,zh;q=0.4',
                              'accept': 'application/json, text/javascript, */*; q=0.01',
                          })

    @classmethod
    def from_settings(cls, settings):
        return cls(settings.get('GET_SEED_REQ_URL'))

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = YohobuyItemFromSeedApiSpider.from_settings(crawler.settings)
        spider._set_crawler(crawler)
        return spider

The from_settings method reads the GET_SEED_REQ_URL parameter from the settings.

from_crawler builds the spider from those settings and binds the crawler to the resulting spider object.

The following needs to be configured in settings.py:

GET_SEED_REQ_URL = 'http://127.0.0.1/seed-url/%s?count=100'

With this flexible configuration, each spider pulls from its own queue. The count parameter is how many URLs to fetch per call, and it can be tuned to the performance of each machine.
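
The post does not show the service behind GET_SEED_REQ_URL, but to illustrate the contract that parse_urls expects, here is a rough sketch of what such an endpoint could look like, assuming Flask, redis-py, and the same 'seed:<storage_key>' list naming as in the sketches above; the route and response shape are assumptions inferred from the spider code.

import redis
from flask import Flask, jsonify, request

app = Flask(__name__)
queue = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True)


@app.route('/seed-url/<storage_key>')
def seed_urls(storage_key):
    count = int(request.args.get('count', 100))
    urls = []
    # pop up to `count` URLs from the queue that belongs to this storage_key
    for _ in range(count):
        url = queue.lpop('seed:%s' % storage_key)
        if url is None:
            break
        urls.append({'url': url})
    # shape matches what parse_urls reads: {"urls": [{"url": ...}, ...]}
    return jsonify(urls=urls)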

Summary

As the project matures, I recommend abstracting the data layer into its own middleware: everything that touches data, such as fetching from the queue and storing results, i.e. all interaction with the queue database and the storage database, should live in that layer. This makes flexible deployment and migration much easier later on.
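
The post does not spell out an interface for that layer, but as a rough sketch of the intent, the rest of the system might only ever talk to something like the class below; the method names and the Redis list naming are assumptions, and the storage backend is left open.

import redis


class DataLayer(object):
    """Single place that talks to the queue database and the storage database."""

    def __init__(self, host='127.0.0.1', port=6379):
        self.queue = redis.StrictRedis(host=host, port=port, decode_responses=True)

    def push_seeds(self, storage_key, urls):
        # producer side: enqueue URLs for the spider identified by storage_key
        if urls:
            self.queue.rpush('seed:%s' % storage_key, *urls)

    def pop_seeds(self, storage_key, count=100):
        # consumer side: dequeue up to `count` URLs for that spider
        urls = []
        for _ in range(count):
            url = self.queue.lpop('seed:%s' % storage_key)
            if url is None:
                break
            urls.append(url)
        return urls

    def save_item(self, storage_key, item):
        # storage side: whichever storage database is chosen plugs in here
        raise NotImplementedError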