(3)分布式下的爬虫Scrapy应该如何做-递归爬取方式,数据输出方式以及数据库链接
[2016-11-21更新]关于demo代码,请参考: ===>scrapy_demo<===
放假这段时间好好的思考了一下关于Scrapy的一些常用操作,主要解决了三个问题
- 如何连续爬取
- 数据输出方式
- 数据库链接
如何连续爬取
思考:要达到连续爬取,逻辑上无非从以下的方向着手
-
预加载需要爬取的列表,直接到这个列表都处理完,相应的爬取工作都已经完成了。
-
从第一页开始爬取,遇到有下一页标签的,那继续爬取,如果没有下一页类似的标签,那表示已经爬到最后一页
-
分析当前页面的所有链接,对于链接符合某种特定规则的,继续爬取,如果没有那表示爬取工作完成(此时需要建立已经爬取列表,防止重复操作)
一般会于定向的爬虫,比如爬取某宝或者某东的数据时,可以采用方式一,二,写好规则就可以了,也方便维护。
1.1 对于预加载的列表,那根据需要生成列表就可以了。
在start_urls 里面生成相应的列表就可以,这里引入一个概念,列表推导式。
我们将代码变换成如下:
from scrapy.spider import BaseSpider
from scrapy.selector import HtmlXPathSelector
from cnblogs.items import CnblogsItem
class CnblogsSpider(BaseSpider):
name = "cnblogs"
allowed_domains = ["cnblogs.com"]
start_urls = [
'http://www.cnblogs.com/#p%s' % p for p in xrange(1, 11)
]
def parse(self, response):
self.log("Fetch douban homepage page: %s" % response.url)
hxs = HtmlXPathSelector(response)
# authors = hxs.select('//a[@class="titlelnk"]')
items = hxs.select('//a[contains(@class, "titlelnk")]')
listitems = []
for author in items:
# print author.select('text()').extract()
item = CnblogsItem()
# property
item['Title'] = ''.join(author.select('text()').extract())
item['TitleUrl'] = author.select('@href').extract()
listitems.append(item)
return listitems
在这里,start_urls里面使用列表推导式,爬出了一共10页的数据。
1.2对于爬取下一页实现全趴取的过程,就需要使用yield关键字
我们就虫师的博客来进行测试实验:
http://www.cnblogs.com/fnng/default.aspx?page=1
这里介绍一个scrapy 一个非常有用的技巧,scrapy shell ,因为使用 xpath 可以帮助我们调试xpath语法(或者使用firebug又或者是chrome都可以)
语法:scrapy shell http://你要调试xpath的网址
这里我就不继续讲xpath的语法了,自己去搜一下,相比正则要相对简单好理解。
相应的Spider可以这样编写:
# -*- coding: utf-8 -*-
from scrapy.spider import BaseSpider
from scrapy.selector import HtmlXPathSelector
from cnblogs.items import CnblogsItem
from scrapy.http import Request
from scrapy import log
# please pay attention to the encoding of info,otherwise raise error of decode
import sys
reload(sys)
sys.setdefaultencoding('utf8')
class BlogsSpider(BaseSpider):
name = "cnblogs_blogs"
allowed_domains = ["cnblogs.com"]
start_urls = [
'http://www.cnblogs.com/fnng/default.aspx?page=1'
]
def parse(self, response):
hxs = HtmlXPathSelector(response)
# authors = hxs.select('//a[@class="titlelnk"]')
# sel.xpath('//a[@class="PostTitle"]').xpath('text()')
items = hxs.select('//a[@class="PostTitle"]')
a_page = hxs.select('//div[@id="pager"]/a')
for a_item in items:
item = CnblogsItem()
# property
item['Title'] = ''.join(a_item.xpath('text()').extract())
item['TitleUrl'] = a_item.xpath('@href').extract()
yield item
# get the page index
log.msg(len(a_page))
if len(a_page) > 0:
for a_item in a_page:
page_text = ''.join(a_item.xpath('text()').extract())
if page_text == '下一页'.encode('utf-8') or 'Next' in page_text:
next_url = ''.join(a_item.xpath('@href').extract())
log.msg(next_url)
yield Request(next_url, callback=self.parse)
break
我们来运行看看效果如何:
所有的数据完整,效果还是不错的。
关于第三种,以规则来规划爬虫的机制,在以后会介绍 ~ ~
数据输出的方式
使用内置命令
上面的scrapy命令是:scrapy crawl cnblogs\_blogs -nolog -o cnblogs\_blogs.json -t json
那结果输出的就是json格式的文件,-t 指的是输出文件格式,json ,-t 支持下列参数:
xml
csv
json
jsonlines
jl
pickle
marshal
一般选择xml ,csv,json三种格式就够了,这样可以很方便的导入各种数据库。
更多的参考:http://doc.scrapy.org/en/latest/topics/feed-exports.html
数据库连接
数据保存为文件的形式然后导入是一个不错的选择,不过一般都会有一定的IO开销,一般可以将Item直接保存到数据库中,这个时候就要引入pipelines这个部件了。
在我们项目的根目录下有一个名为:pipelines.py文件,我们在设置里面首先启用这个文件,在启用之后,spider得到的item都会传入到这个部件中进行二次处理,
3.1在settings.py中启用pipelines
ITEM_PIPELINES = {
'cnblogs.pipelines.CnblogsPipelineobj': 300,
}
注意命名方式:botname.moudlename.classname 要不然会找不到指定的模块。
3.2 编写pipelines
# -*- coding: utf-8 -*-
import MySQLdb
import MySQLdb.cursors
import logging
from twisted.enterprise import adbapi
class CnblogsPipelineobj(object):
def __init__(self):
self.dbpool = adbapi.ConnectionPool(
dbapiName ='MySQLdb',
host ='127.0.0.1',
db = 'cnblogs',
user = 'root',
passwd = '密码',
cursorclass = MySQLdb.cursors.DictCursor,
charset = 'utf8',
use_unicode = False
)
# pipeline dafault function
def process_item(self, item, spider):
query = self.dbpool.runInteraction(self._conditional_insert, item)
logging.debug(query)
return item
# insert the data to databases
def _conditional_insert(self, tx, item):
parms = (item['Title'], item['TitleUrl'])
sql = "insert into blogs values('%s','%s') " % parms
#logging.debug(sql)
tx.execute(sql)
OK.运行一下看一下效果如何
3.3 [重要更新]
3.2的方法已经过时很多,这里使用另外一种方法,sqlalchemy去连数据库
准备models文件
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, Date, DateTime, Text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from settings import MYSQL_CONN
from datetime import datetime
# declare a Mapping,this is the class describe map to table column
Base = declarative_base()
class Cnbeta(Base):
__tablename__ = 'cnbeta'
id = Column(Integer, primary_key=True, autoincrement=True)
score = Column(Integer, nullable=False, default=0)
catid = Column(Integer, nullable=False, default=0)
score_story = Column(String(512), nullable=False, default='')
hometext = Column(String(1024), nullable=False, default='')
counter = Column(Integer, nullable=False, default=0)
inputtime = Column(DateTime, nullable=False, default=datetime.now())
topic = Column(Integer, nullable=False, default=0)
source = Column(String(128), nullable=False, default='')
mview = Column(Integer, nullable=False, default=0)
comments = Column(Integer, nullable=False, default=0)
crawled_datetime = Column(DateTime, nullable=False, default=datetime.now())
rate_sum = Column(Integer, nullable=False, default=0)
title = Column(String(512), nullable=False, default='')
url_show = Column(String(512), nullable=False, default='')
thumb = Column(String(256), nullable=False, default='')
def create_session():
# declare the connecting to the server
engine = create_engine(MYSQL_CONN['mysql_uri']
.format(user=MYSQL_CONN['user'], pwd=MYSQL_CONN['password'], host=MYSQL_CONN['host'],
db=MYSQL_CONN['db'])
, echo=False)
# connect session to active the action
Session = sessionmaker(bind=engine)
session = Session()
return session
def map_orm_item(scrapy_item,sql_item):
for k, v in scrapy_item.iteritems():
sql_item.__setattr__(k, v)
return sql_item
def convert_date(date_str):
pass
在settings.py 里面的配置好数据库链接
MYSQL_CONN = {
'host':'127.0.0.1',
'user':'user_name',
'password':'user_pwd',
'db':'test_db',
'table':'test_tb',
'mysql_uri':'mysql://{user}:{pwd}@{host}:3306/{db}?charset=utf8'
}
在中间件中写好代码
from settings import MONGODB
import pymongo
import models
class CnbetaMysqlPipeline(object):
def __init__(self):
self.session = models.create_session()
def process_item(self, item, spider):
sql_cnbeta = models.cnbeta()
sql_cnbeta = models.map_orm_item(scrapy_item=item, sql_item=sql_cnbeta)
self.session.add(sql_cnbeta)
self.session.commit()
self.session.close()
return item
更多的详细源代码参考==>源代码<===
总结
本次主要多三个方向来解决连续爬取文章内容,并将获得内容保存的问题,不过文中主要介绍的,还是以定向为基础的爬取,和以规则来构建的爬虫还是有区别,下篇文章将介绍。
- 原文作者:大鱼
- 原文链接:https://brucedone.com/archives/140/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议. 进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。