這里通過代碼一步一步的演變,最后完成的是一個(gè)精簡的Scrapy。在Scrapy內(nèi)部,基本的流程就是這么實(shí)現(xiàn)的。主要是為了能通過學(xué)習(xí)了解Scrapy大致的流程,對(duì)之后再要去看Scrapy的源碼也是有幫助的。
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、重慶小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了鶴山免費(fèi)建站歡迎大家使用!
因?yàn)镾crapy是基于Twisted實(shí)現(xiàn)的,所以先看Twisted怎么用
基本使用的示例:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
# 所有任務(wù)完成后的回調(diào)函數(shù)
def all_done(arg):
"""所有爬蟲執(zhí)行完后執(zhí)行,循環(huán)終止"""
print("All Done")
reactor.stop()
# 單個(gè)任務(wù)的回調(diào)函數(shù)
def callback(contents):
"""每個(gè)爬蟲獲取到結(jié)果后執(zhí)行"""
print(contents)
deferred_list = []
url_list = [
'http://www.bing.com',
'http://www.baidu.com',
'http://edu.51cto.com',
]
for url in url_list:
deferred = getPage(bytes(url, encoding='utf-8'))
deferred.addCallback(callback)
deferred_list.append(deferred)
dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)
if __name__ == '__main__':
reactor.run()
在for循環(huán)里,創(chuàng)建了對(duì)象,還給對(duì)象加了回調(diào)函數(shù),這是單個(gè)任務(wù)完成后執(zhí)行的。此時(shí)還沒有進(jìn)行下載,而是把所有的對(duì)象加到一個(gè)列表里。
之后的defer.DeferredList的調(diào)用,才是執(zhí)行所有的任務(wù)。并且又加了一個(gè)回調(diào)函數(shù)all_done,這個(gè)是所有任務(wù)都完成后才執(zhí)行的。
基于裝飾器也可以實(shí)現(xiàn),下面的代碼是基于上面的示例做了簡單的轉(zhuǎn)換:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
def all_done(arg):
print("All Done")
reactor.stop()
def one_done(response):
print(response)
@defer.inlineCallbacks
def task(url):
deferred = getPage(bytes(url, encoding='utf-8'))
deferred.addCallback(one_done)
yield deferred
deferred_list = []
url_list = [
'http://www.bing.com',
'http://www.baidu.com',
'http://edu.51cto.com',
]
for url in url_list:
deferred = task(url)
deferred_list.append(deferred)
dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)
if __name__ == '__main__':
reactor.run()
把原來for循環(huán)里的2行代碼封裝的了一個(gè)task函數(shù)里,并且加了裝飾器。
這個(gè)task函數(shù)有3個(gè)要素:裝飾器、deferred對(duì)象、通過yield返回返回對(duì)象。這個(gè)是Twisted里標(biāo)準(zhǔn)的寫法。
在上面的示例的基礎(chǔ)上,把整個(gè)for循環(huán)都移到task函數(shù)里了:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
def all_done(arg):
print("All Done")
reactor.stop()
def one_done(response):
print(response)
@defer.inlineCallbacks
def task():
for url in url_list:
deferred = getPage(bytes(url, encoding='utf-8'))
deferred.addCallback(one_done)
yield deferred
url_list = [
'http://www.bing.com',
'http://www.baidu.com',
'http://edu.51cto.com',
]
ret = task()
ret.addBoth(all_done)
if __name__ == '__main__':
reactor.run()
上面說個(gè)的3要素:裝飾器、deferred對(duì)象、yield都有。
在前面的示例中,每完成一個(gè)任務(wù),就會(huì)返回并執(zhí)行一個(gè)回調(diào)函數(shù)one_done。所有任務(wù)如果都返回了,程序就會(huì)退出(退出前會(huì)執(zhí)行回調(diào)函數(shù)all_done)。
這里所做的,就是添加一個(gè)不會(huì)返回的任務(wù),這樣程序的一直不會(huì)退出了:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
def all_done(arg):
print("All Done")
reactor.stop()
def one_done(response):
print(response)
@defer.inlineCallbacks
def task():
for url in url_list:
deferred = getPage(bytes(url, encoding='utf-8'))
deferred.addCallback(one_done)
yield deferred
# 下面的這個(gè)任務(wù)永遠(yuǎn)不會(huì)完成
stop_deferred = defer.Deferred() # 這是一個(gè)空任務(wù),不會(huì)去下載,所以永遠(yuǎn)不會(huì)返回
# stop_deferred.callback(None) # 執(zhí)行這句可以讓這個(gè)任務(wù)返回
stop_deferred.addCallback(lambda s: print(s))
stop_deferred.callback("stop_deferred")
yield stop_deferred
url_list = [
'http://www.bing.com',
'http://www.baidu.com',
'http://edu.51cto.com',
]
ret = task()
ret.addBoth(all_done)
if __name__ == '__main__':
reactor.run()
這里的做法,就是加了一個(gè)額外的任務(wù)。要求返回的是Deferred對(duì)象,這里就創(chuàng)建了一個(gè)空的Deferred對(duì)象,并把這個(gè)對(duì)象返回。
在這里,我們并沒有讓這個(gè)空的Deferred對(duì)象去下載,所以也就永遠(yuǎn)不會(huì)有返回。
永不退出的意義
這里目的就是不讓程序退出,讓這個(gè)事件循環(huán)一直在那里執(zhí)行。之后還可以繼續(xù)往里面添加任務(wù),然后執(zhí)行新的任務(wù)。
程序退出的方法
還是可以讓程序退出的。就是調(diào)用stop_deferred的callback方法,在上面的代碼里注釋掉了。執(zhí)行這個(gè)方法,就是強(qiáng)制執(zhí)行該任務(wù)的回調(diào)函數(shù)。
之前都是等任務(wù)執(zhí)行完返回后,會(huì)自動(dòng)調(diào)用callback方法,這里就是強(qiáng)制調(diào)用了。
并且由于代碼里沒有為stop_deferred指定回調(diào)函數(shù),所有調(diào)用方法后不會(huì)執(zhí)行任何函數(shù)。不過調(diào)用callback方法必須有一個(gè)參數(shù),這里隨便寫個(gè)就好了。
也可以給stop_deferred加一個(gè)回調(diào)函數(shù),然后再調(diào)用callback方法:
stop_deferred.addCallback(lambda s: print(s))
stop_deferred.callback("stop_deferred")
Scrapy里的做法
這就是Scrapy里運(yùn)行完終止的邏輯。第一次只有一個(gè)url,執(zhí)行完就返回了,并且此時(shí)應(yīng)該是所有任務(wù)都返回了,那么就會(huì)退出程序。
在Scrapy里,也是這樣加了一個(gè)永遠(yuǎn)不會(huì)返回的任務(wù),不讓程序退出。然后之前的結(jié)果返回后,又會(huì)生成新的任務(wù)到調(diào)度器,這樣就會(huì)動(dòng)態(tài)的添加任務(wù)繼續(xù)執(zhí)行。
要讓程序可以退出,這里還需要做一個(gè)檢測。在下載完成之后的回調(diào)函數(shù)里,會(huì)生成新的任務(wù)繼續(xù)給執(zhí)行。這里可以執(zhí)行2個(gè)回調(diào)函數(shù)。
第一個(gè)回調(diào)函數(shù)就是生成新的任務(wù)放入調(diào)度器,第二個(gè)回調(diào)函數(shù)就是檢測等待執(zhí)行的任務(wù)的數(shù)量,以及正在執(zhí)行的任務(wù)數(shù)量。如果都是0,表示程序可以結(jié)束了。
程序結(jié)束的方法就是上面的用的調(diào)用執(zhí)行callback方法。
基于上面的說的,這里的代碼實(shí)現(xiàn)了全部任務(wù)執(zhí)行完畢后可以調(diào)用stop_deferred的callback方法來退出:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
task_list = []
stop_deferred = None
def all_done(arg):
print("All Done")
reactor.stop()
def one_done(response):
print(response)
def check_empty(response, *args, **kw):
url = kw.get('url')
if url in running_list:
running_list.remove(url)
if not running_list:
stop_deferred.callback()
@defer.inlineCallbacks
def task():
global running_list, stop_deferred # 全局變量
running_list = url_list.copy()
for url in url_list:
deferred = getPage(bytes(url, encoding='utf-8'))
deferred.addCallback(one_done)
deferred.addCallback(check_empty, url=url)
yield deferred
stop_deferred = defer.Deferred()
yield stop_deferred
url_list = [
'http://www.bing.com',
'http://www.baidu.com',
'http://edu.51cto.com',
]
ret = task()
ret.addBoth(all_done)
if __name__ == '__main__':
reactor.run()
上面的代碼功能上都實(shí)現(xiàn)了,但是實(shí)現(xiàn)方法有點(diǎn)不太好。
首先,task函數(shù)里分成了兩部分,一部分是我們自己調(diào)度的任務(wù),一部分是為了不讓程序退出,而加的一個(gè)空任務(wù)??梢园堰@兩部分拆開放在兩個(gè)函數(shù)里。分拆之后,只有第一部分的函數(shù)是需要留給用戶使用的。下面是把原來的task函數(shù)分拆后的代碼,并且每個(gè)函數(shù)也都需要加上裝飾器:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
task_list = []
stop_deferred = None
def all_done(arg):
print("All Done")
reactor.stop()
def one_done(response):
print(response)
def check_empty(response, url):
if url in running_list:
running_list.remove(url)
if not running_list:
stop_deferred.callback()
@defer.inlineCallbacks
def open_spider():
global running_list
running_list = url_list.copy()
for url in url_list:
deferred = getPage(bytes(url, encoding='utf-8'))
deferred.addCallback(one_done)
deferred.addCallback(check_empty, url)
yield deferred
@defer.inlineCallbacks
def stop():
global stop_deferred
stop_deferred = defer.Deferred()
yield stop_deferred
@defer.inlineCallbacks
def task():
yield open_spider()
yield stop()
url_list = [
'http://www.bing.com',
'http://www.baidu.com',
'http://edu.51cto.com',
]
ret = task()
ret.addBoth(all_done)
if __name__ == '__main__':
reactor.run()
另外還有全局變量的問題,這里的代碼使用了全部變量,這不是一個(gè)好的做法。再改下去需要引入class了。
從這里開始,就要使用面向?qū)ο蟮姆椒ǎM(jìn)一步進(jìn)行封裝了。
先把之前主要的代碼封裝到類里:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue
class Request(object):
"""封裝請(qǐng)求的url和回調(diào)函數(shù)"""
def __init__(self, url, callback):
self.url = url
self.callback = callback
class Scheduler(object):
"""調(diào)度器"""
def __init__(self, engine):
self.engine = engine
self.q = queue.Queue()
def enqueue_request(self, request):
"""添加任務(wù)"""
self.q.put(request)
def next_request(self):
"""獲取下一個(gè)任務(wù)"""
try:
req = self.q.get(block=False)
except queue.Empty:
req = None
return req
def size(self):
return self.q.qsize()
class ExecutionEngine(object):
"""引擎"""
def __init__(self):
self._close_wait = None # stop_deferred
self.start_requests = None
self.scheduler = Scheduler(self)
self.in_progress = set() # 正在執(zhí)行中的任務(wù)
def _next_request(self):
while self.start_requests:
request = next(self.start_requests, None)
if request:
self.scheduler.enqueue_request(request)
else:
self.start_requests = None
while len(self.in_progress) < 5 and self.scheduler.size() > 0: # 最大編發(fā)為5
request = self.scheduler.next_request()
if not request:
break
self.in_progress.add(request)
d = getPage(bytes(request.url, encoding='utf-8'))
# addCallback是正確返回的時(shí)候執(zhí)行,還有addErrback是返回有錯(cuò)誤的時(shí)候執(zhí)行
# addBoth就是上面兩種情況返回都會(huì)執(zhí)行
d.addBoth(self._handle_downloader_output, request)
d.addBoth(lambda x, req: self.in_progress.remove(req), request)
d.addBoth(lambda x: self._next_request())
if len(self.in_progress) == 0 and self.scheduler.size() == 0:
self._close_wait.callback(None)
def _handle_downloader_output(self, response, request):
import types
gen = request.callback(response)
if isinstance(gen, types.GeneratorType): # 是否為生成器類型
for req in gen:
# 這里還可以再加判斷,如果是request對(duì)象則繼續(xù)爬取
# 如果是item對(duì)象,則可以交給pipline
self.scheduler.enqueue_request(req)
@defer.inlineCallbacks
def open_spider(self, start_requests):
self.start_requests = start_requests
yield None
reactor.callLater(0, self._next_request) # 過多少秒之后,執(zhí)行后面的函數(shù)
@defer.inlineCallbacks
def start(self):
"""原來的stop函數(shù)"""
self._close_wait = defer.Deferred()
yield self._close_wait
@defer.inlineCallbacks
def crawl(start_requests):
"""原來的task函數(shù)"""
engine = ExecutionEngine()
start_requests = iter(start_requests)
yield engine.open_spider(start_requests)
yield engine.start()
def all_done(arg):
print("All Done")
reactor.stop()
def one_done(response):
print(response)
count = 0
def chouti(response):
"""任務(wù)返回后生成新的Request繼續(xù)交給調(diào)度器執(zhí)行"""
global count
count += 1
print(response)
if count > 3:
return None
for i in range(10):
yield Request("http://dig.chouti.com/all/hot/recent/%s" % i, lambda x: print(len(x)))
if __name__ == '__main__':
url_list = [
'http://www.bing.com',
'https://www.baidu.com',
'http://edu.51cto.com',
]
requests = [Request(url, callback=one_done) for url in url_list]
# requests = [Request(url, callback=chouti) for url in url_list]
ret = crawl(requests)
ret.addBoth(all_done)
reactor.run()
這里還寫了一個(gè)回調(diào)函數(shù)chouti,可以在爬蟲返回后,生成新的Request繼續(xù)爬取。為了控制這個(gè)回調(diào)函數(shù)的調(diào)用,又加了一個(gè)全局變量。
接下來會(huì)對(duì)這部分函數(shù)繼續(xù)封裝,把所有的代碼都封裝到類里。
閉包解決全局變量
這里的部分是我自己嘗試的思考。
其實(shí)還可以通過閉包的方法。通過閉包來保存函數(shù)的狀態(tài),而不使用全局變量:
def chouti2():
n = 0
def func(response):
print(response)
nonlocal n
n += 1
if n > 3:
return None
for i in range(10):
yield Request("http://dig.chouti.com/all/hot/recent/%s" % i, lambda x: print(len(x)))
return func
if __name__ == '__main__':
url_list = [
'http://www.bing.com',
'https://www.baidu.com',
'http://edu.51cto.com',
]
# requests = [Request(url, callback=one_done) for url in url_list]
# requests = [Request(url, callback=chouti) for url in url_list]
callback = chouti2()
requests = [Request(url, callback=callback) for url in url_list]
ret = crawl(requests)
ret.addBoth(all_done)
reactor.run()
上面的示例還有幾個(gè)函數(shù),繼續(xù)把剩下的函數(shù)也封裝到類里。下面的這個(gè)就是TinyScrapy:
from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue
class Request(object):
"""封裝請(qǐng)求的url和回調(diào)函數(shù)"""
def __init__(self, url, callback=None):
self.url = url
self.callback = callback # 默認(rèn)是None,則會(huì)去調(diào)用Spider對(duì)象的parse方法
class Scheduler(object):
"""調(diào)度器"""
def __init__(self, engine):
self.engine = engine
self.q = queue.Queue()
def enqueue_request(self, request):
"""添加任務(wù)"""
self.q.put(request)
def next_request(self):
"""獲取下一個(gè)任務(wù)"""
try:
req = self.q.get(block=False)
except queue.Empty:
req = None
return req
def size(self):
return self.q.qsize()
class ExecutionEngine(object):
"""引擎"""
def __init__(self):
self._close_wait = None # stop_deferred
self.start_requests = None
self.scheduler = Scheduler(self)
self.in_progress = set() # 正在執(zhí)行中的任務(wù)
self.spider = None # 在open_spider方法里添加
def _next_request(self):
while self.start_requests:
request = next(self.start_requests, None)
if request:
self.scheduler.enqueue_request(request)
else:
self.start_requests = None
while len(self.in_progress) < 5 and self.scheduler.size() > 0: # 最大編發(fā)為5
request = self.scheduler.next_request()
if not request:
break
self.in_progress.add(request)
d = getPage(bytes(request.url, encoding='utf-8'))
# addCallback是正確返回的時(shí)候執(zhí)行,還有addErrback是返回有錯(cuò)誤的時(shí)候執(zhí)行
# addBoth就是上面兩種情況返回都會(huì)執(zhí)行
d.addBoth(self._handle_downloader_output, request)
d.addBoth(lambda x, req: self.in_progress.remove(req), request)
d.addBoth(lambda x: self._next_request())
if len(self.in_progress) == 0 and self.scheduler.size() == 0:
self._close_wait.callback(None)
# 這個(gè)方法和之前的有一點(diǎn)小的變化,主要是用到了新定義的Response對(duì)象
def _handle_downloader_output(self, body, request):
import types
response = Response(body, request)
# 如果沒有指定callback就調(diào)用Spider類的parse方法
func = request.callback or self.spider.parse
gen = func(response)
if isinstance(gen, types.GeneratorType): # 是否為生成器類型
for req in gen:
# 這里還可以再加判斷,如果是request對(duì)象則繼續(xù)爬取
# 如果是item對(duì)象,則可以交給pipline
self.scheduler.enqueue_request(req)
@defer.inlineCallbacks
def open_spider(self, spider, start_requests):
self.start_requests = start_requests
self.spider = spider # 加了這句
yield None
reactor.callLater(0, self._next_request) # 過多少秒之后,執(zhí)行后面的函數(shù)
@defer.inlineCallbacks
def start(self):
"""原來的stop函數(shù)"""
self._close_wait = defer.Deferred()
yield self._close_wait
class Response(object):
def __init__(self, body, request):
self.body = body
self.request = request
self.url = request.url
@property
def text(self):
return self.body.decode('utf-8')
class Crawler(object):
def __init__(self, spider_cls):
self.spider_cls = spider_cls
self.spider = None
self.engine = None
@defer.inlineCallbacks
def crawl(self):
self.engine = ExecutionEngine()
self.spider = self.spider_cls()
start_requests = iter(self.spider.start_requests())
yield self.engine.open_spider(self.spider, start_requests)
yield self.engine.start()
class CrawlerProcess(object):
def __init__(self):
self._active = set()
self.crawlers = set()
def crawl(self, spider_cls, *args, **kwargs):
crawler = Crawler(spider_cls)
self.crawlers.add(crawler)
d = crawler.crawl(*args, **kwargs)
self._active.add(d)
return d
def start(self):
dl = defer.DeferredList(self._active)
dl.addBoth(self._stop_reactor)
reactor.run()
@classmethod
def _stop_reactor(cls, _=None):
"""原來的all_done函數(shù)
之前的示例中,這個(gè)函數(shù)都是要接收一個(gè)參數(shù)的。
雖然不用,但是調(diào)用的模塊一定會(huì)傳過來,所以一定要接收一下。
這里就用了占位符來接收這個(gè)參數(shù),并且設(shè)置了默認(rèn)值None。
"""
print("All Done")
reactor.stop()
class Spider(object):
def __init__(self):
if not hasattr(self, 'start_urls'):
self.start_urls = []
def start_requests(self):
for url in self.start_urls:
yield Request(url)
def parse(self, response):
print(response.body)
class ChoutiSpider(Spider):
name = "chouti"
start_urls = ["http://dig.chouti.com"]
def parse(self, response):
print(next((s for s in response.text.split('\n') if "" in s)))
class BingSpider(Spider):
name = "bing"
start_urls = ["http://www.bing.com"]
class BaiduSpider(Spider):
name = "baidu"
start_urls = ["http://www.baidu.com"]
if __name__ == '__main__':
spider_cls_list = [ChoutiSpider, BingSpider, BaiduSpider]
crawler_process = CrawlerProcess()
for spider_cls in spider_cls_list:
crawler_process.crawl(spider_cls)
crawler_process.start()
這里用的類名、方法名、部分代碼都是和Scrapy的源碼里一樣的。相當(dāng)于把Scrapy精簡了,把其中的核心都提取出來了。如果能看明白這部分代碼,再去Scrapy里看源碼應(yīng)該能相對(duì)容易一些了。