189 8069 5689

Python自动化开发学习-TinyScrapy-创新互联

这里通过代码一步一步的演变,最后完成的是一个精简的Scrapy。在Scrapy内部,基本的流程就是这么实现的。主要是为了能通过学习了解Scrapy大致的流程,对之后再要去看Scrapy的源码也是有帮助的。

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名注册、网页空间、营销软件、网站建设、坊子网站维护、网站推广。

Twisted使用

因为Scrapy是基于Twisted实现的,所以先看Twisted怎么用

基本使用

基本使用的示例:

from twisted.web.client import getPage, defer
from twisted.internet import reactor

# 所有任务完成后的回调函数
def all_done(arg):
    """所有爬虫执行完后执行,循环终止"""
    print("All Done")
    reactor.stop()

# 单个任务的回调函数
def callback(contents):
    """每个爬虫获取到结果后执行"""
    print(contents)

deferred_list = []

url_list = [
    'http://www.bing.com',
    'http://www.baidu.com',
    'http://edu.51cto.com',
]

for url in url_list:
    deferred = getPage(bytes(url, encoding='utf-8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

if __name__ == '__main__':
    reactor.run()

在for循环里,创建了对象,还给对象加了回调函数,这是单个任务完成后执行的。此时还没有进行下载,而是把所有的对象加到一个列表里。
之后的defer.DeferredList的调用,才是执行所有的任务。并且又加了一个回调函数all_done,这个是所有任务都完成后才执行的。

基于装饰器1

基于装饰器也可以实现,下面的代码是基于上面的示例做了简单的转换:

from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    print("All Done")
    reactor.stop()

def one_done(response):
    print(response)

@defer.inlineCallbacks
def task(url):
    deferred = getPage(bytes(url, encoding='utf-8'))
    deferred.addCallback(one_done)
    yield deferred

deferred_list = []

url_list = [
    'http://www.bing.com',
    'http://www.baidu.com',
    'http://edu.51cto.com',
]

for url in url_list:
    deferred = task(url)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

if __name__ == '__main__':
    reactor.run()

把原来for循环里的2行代码封装的了一个task函数里,并且加了装饰器。
这个task函数有3个要素:装饰器、deferred对象、通过yield返回返回对象。这个是Twisted里标准的写法。

基于装饰器2

在上面的示例的基础上,把整个for循环都移到task函数里了:

from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    print("All Done")
    reactor.stop()

def one_done(response):
    print(response)

@defer.inlineCallbacks
def task():
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf-8'))
        deferred.addCallback(one_done)
        yield deferred

url_list = [
    'http://www.bing.com',
    'http://www.baidu.com',
    'http://edu.51cto.com',
]

ret = task()
ret.addBoth(all_done)

if __name__ == '__main__':
    reactor.run()

上面说个的3要素:装饰器、deferred对象、yield都有。

基于装饰器永不退出

在前面的示例中,每完成一个任务,就会返回并执行一个回调函数one_done。所有任务如果都返回了,程序就会退出(退出前会执行回调函数all_done)。
这里所做的,就是添加一个不会返回的任务,这样程序的一直不会退出了:

from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    print("All Done")
    reactor.stop()

def one_done(response):
    print(response)

@defer.inlineCallbacks
def task():
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf-8'))
        deferred.addCallback(one_done)
        yield deferred
    # 下面的这个任务永远不会完成
    stop_deferred = defer.Deferred()  # 这是一个空任务,不会去下载,所以永远不会返回
    # stop_deferred.callback(None)  # 执行这句可以让这个任务返回
    stop_deferred.addCallback(lambda s: print(s))
    stop_deferred.callback("stop_deferred")
    yield stop_deferred

url_list = [
    'http://www.bing.com',
    'http://www.baidu.com',
    'http://edu.51cto.com',
]

ret = task()
ret.addBoth(all_done)

if __name__ == '__main__':
    reactor.run()

这里的做法,就是加了一个额外的任务。要求返回的是Deferred对象,这里就创建了一个空的Deferred对象,并把这个对象返回。
在这里,我们并没有让这个空的Deferred对象去下载,所以也就永远不会有返回。
永不退出的意义
这里目的就是不让程序退出,让这个事件循环一直在那里执行。之后还可以继续往里面添加任务,然后执行新的任务。
程序退出的方法
还是可以让程序退出的。就是调用stop_deferred的callback方法,在上面的代码里注释掉了。执行这个方法,就是强制执行该任务的回调函数。
之前都是等任务执行完返回后,会自动调用callback方法,这里就是强制调用了。
并且由于代码里没有为stop_deferred指定回调函数,所有调用方法后不会执行任何函数。不过调用callback方法必须有一个参数,这里随便写个就好了。
也可以给stop_deferred加一个回调函数,然后再调用callback方法:

stop_deferred.addCallback(lambda s: print(s))
stop_deferred.callback("stop_deferred")

Scrapy里的做法
这就是Scrapy里运行完终止的逻辑。第一次只有一个url,执行完就返回了,并且此时应该是所有任务都返回了,那么就会退出程序。
在Scrapy里,也是这样加了一个永远不会返回的任务,不让程序退出。然后之前的结果返回后,又会生成新的任务到调度器,这样就会动态的添加任务继续执行。
要让程序可以退出,这里还需要做一个检测。在下载完成之后的回调函数里,会生成新的任务继续给执行。这里可以执行2个回调函数。
第一个回调函数就是生成新的任务放入调度器,第二个回调函数就是检测等待执行的任务的数量,以及正在执行的任务数量。如果都是0,表示程序可以结束了。
程序结束的方法就是上面的用的调用执行callback方法。

执行完毕后停止事件循环

基于上面的说的,这里的代码实现了全部任务执行完毕后可以调用stop_deferred的callback方法来退出:

from twisted.web.client import getPage, defer
from twisted.internet import reactor

task_list = []
stop_deferred = None

def all_done(arg):
    print("All Done")
    reactor.stop()

def one_done(response):
    print(response)

def check_empty(response, *args, **kw):
    url = kw.get('url')
    if url in running_list:
        running_list.remove(url)
    if not running_list:
        stop_deferred.callback()

@defer.inlineCallbacks
def task():
    global running_list, stop_deferred  # 全局变量
    running_list = url_list.copy()
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf-8'))
        deferred.addCallback(one_done)
        deferred.addCallback(check_empty, url=url)
        yield deferred
    stop_deferred = defer.Deferred()
    yield stop_deferred

url_list = [
    'http://www.bing.com',
    'http://www.baidu.com',
    'http://edu.51cto.com',
]

ret = task()
ret.addBoth(all_done)

if __name__ == '__main__':
    reactor.run()

代码优化

上面的代码功能上都实现了,但是实现方法有点不太好。
首先,task函数里分成了两部分,一部分是我们自己调度的任务,一部分是为了不让程序退出,而加的一个空任务。可以把这两部分拆开放在两个函数里。分拆之后,只有第一部分的函数是需要留给用户使用的。下面是把原来的task函数分拆后的代码,并且每个函数也都需要加上装饰器:

from twisted.web.client import getPage, defer
from twisted.internet import reactor

task_list = []
stop_deferred = None

def all_done(arg):
    print("All Done")
    reactor.stop()

def one_done(response):
    print(response)

def check_empty(response, url):
    if url in running_list:
        running_list.remove(url)
    if not running_list:
        stop_deferred.callback()

@defer.inlineCallbacks
def open_spider():
    global running_list
    running_list = url_list.copy()
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf-8'))
        deferred.addCallback(one_done)
        deferred.addCallback(check_empty, url)
        yield deferred

@defer.inlineCallbacks
def stop():
    global stop_deferred
    stop_deferred = defer.Deferred()
    yield stop_deferred

@defer.inlineCallbacks
def task():
    yield open_spider()
    yield stop()

url_list = [
    'http://www.bing.com',
    'http://www.baidu.com',
    'http://edu.51cto.com',
]

ret = task()
ret.addBoth(all_done)

if __name__ == '__main__':
    reactor.run()

另外还有全局变量的问题,这里的代码使用了全部变量,这不是一个好的做法。再改下去需要引入class了。

模拟Scrapy

从这里开始,就要使用面向对象的方法,进一步进行封装了。

封装部分

先把之前主要的代码封装到类里:

from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue

class Request(object):
    """封装请求的url和回调函数"""
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

class Scheduler(object):
    """调度器"""
    def __init__(self, engine):
        self.engine = engine
        self.q = queue.Queue()

    def enqueue_request(self, request):
        """添加任务"""
        self.q.put(request)

    def next_request(self):
        """获取下一个任务"""
        try:
            req = self.q.get(block=False)
        except queue.Empty:
            req = None
        return req

    def size(self):
        return self.q.qsize()

class ExecutionEngine(object):
    """引擎"""
    def __init__(self):
        self._close_wait = None  # stop_deferred
        self.start_requests = None
        self.scheduler = Scheduler(self)
        self.in_progress = set()  # 正在执行中的任务

    def _next_request(self):
       while self.start_requests:
            request = next(self.start_requests, None)
            if request:
                self.scheduler.enqueue_request(request)
            else:
                self.start_requests = None
       while len(self.in_progress) < 5 and self.scheduler.size() > 0:  # 大编发为5
            request = self.scheduler.next_request()
            if not request:
                break
            self.in_progress.add(request)
            d = getPage(bytes(request.url, encoding='utf-8'))
            # addCallback是正确返回的时候执行,还有addErrback是返回有错误的时候执行
            # addBoth就是上面两种情况返回都会执行
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.in_progress.remove(req), request)
            d.addBoth(lambda x: self._next_request())
        if len(self.in_progress) == 0 and self.scheduler.size() == 0:
            self._close_wait.callback(None)

    def _handle_downloader_output(self, response, request):
        import types
        gen = request.callback(response)
        if isinstance(gen, types.GeneratorType):  # 是否为生成器类型
            for req in gen:
                # 这里还可以再加判断,如果是request对象则继续爬取
                # 如果是item对象,则可以交给pipline
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def open_spider(self, start_requests):
        self.start_requests = start_requests
        yield None
        reactor.callLater(0, self._next_request)  # 过多少秒之后,执行后面的函数

    @defer.inlineCallbacks
    def start(self):
        """原来的stop函数"""
        self._close_wait = defer.Deferred()
        yield self._close_wait

@defer.inlineCallbacks
def crawl(start_requests):
    """原来的task函数"""
    engine = ExecutionEngine()
    start_requests = iter(start_requests)
    yield engine.open_spider(start_requests)
    yield engine.start()

def all_done(arg):
    print("All Done")
    reactor.stop()

def one_done(response):
    print(response)

count = 0
def chouti(response):
    """任务返回后生成新的Request继续交给调度器执行"""
    global count
    count += 1
    print(response)
    if count > 3:
        return None
    for i in range(10):
        yield Request("http://dig.chouti.com/all/hot/recent/%s" % i, lambda x: print(len(x)))

if __name__ == '__main__':
    url_list = [
        'http://www.bing.com',
        'https://www.baidu.com',
        'http://edu.51cto.com',
    ]
    requests = [Request(url, callback=one_done) for url in url_list]
    # requests = [Request(url, callback=chouti) for url in url_list]
    ret = crawl(requests)
    ret.addBoth(all_done)
    reactor.run()

这里还写了一个回调函数chouti,可以在爬虫返回后,生成新的Request继续爬取。为了控制这个回调函数的调用,又加了一个全局变量。
接下来会对这部分函数继续封装,把所有的代码都封装到类里。
闭包解决全局变量
这里的部分是我自己尝试的思考。
其实还可以通过闭包的方法。通过闭包来保存函数的状态,而不使用全局变量:

def chouti2():
    n = 0

    def func(response):
        print(response)
        nonlocal n
        n += 1
        if n > 3:
            return None
        for i in range(10):
            yield Request("http://dig.chouti.com/all/hot/recent/%s" % i, lambda x: print(len(x)))
    return func

if __name__ == '__main__':
    url_list = [
        'http://www.bing.com',
        'https://www.baidu.com',
        'http://edu.51cto.com',
    ]
    # requests = [Request(url, callback=one_done) for url in url_list]
    # requests = [Request(url, callback=chouti) for url in url_list]
    callback = chouti2()
    requests = [Request(url, callback=callback) for url in url_list]
    ret = crawl(requests)
    ret.addBoth(all_done)
    reactor.run()

完全封装

上面的示例还有几个函数,继续把剩下的函数也封装到类里。下面的这个就是TinyScrapy

from twisted.web.client import getPage, defer
from twisted.internet import reactor
import queue

class Request(object):
    """封装请求的url和回调函数"""
    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback  # 默认是None,则会去调用Spider对象的parse方法

class Scheduler(object):
    """调度器"""
    def __init__(self, engine):
        self.engine = engine
        self.q = queue.Queue()

    def enqueue_request(self, request):
        """添加任务"""
        self.q.put(request)

    def next_request(self):
        """获取下一个任务"""
        try:
            req = self.q.get(block=False)
        except queue.Empty:
            req = None
        return req

    def size(self):
        return self.q.qsize()

class ExecutionEngine(object):
    """引擎"""
    def __init__(self):
        self._close_wait = None  # stop_deferred
        self.start_requests = None
        self.scheduler = Scheduler(self)
        self.in_progress = set()  # 正在执行中的任务
        self.spider = None  # 在open_spider方法里添加

    def _next_request(self):
       while self.start_requests:
            request = next(self.start_requests, None)
            if request:
                self.scheduler.enqueue_request(request)
            else:
                self.start_requests = None
       while len(self.in_progress) < 5 and self.scheduler.size() > 0:  # 大编发为5
            request = self.scheduler.next_request()
            if not request:
                break
            self.in_progress.add(request)
            d = getPage(bytes(request.url, encoding='utf-8'))
            # addCallback是正确返回的时候执行,还有addErrback是返回有错误的时候执行
            # addBoth就是上面两种情况返回都会执行
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.in_progress.remove(req), request)
            d.addBoth(lambda x: self._next_request())
        if len(self.in_progress) == 0 and self.scheduler.size() == 0:
            self._close_wait.callback(None)

    # 这个方法和之前的有一点小的变化,主要是用到了新定义的Response对象
    def _handle_downloader_output(self, body, request):
        import types
        response = Response(body, request)
        # 如果没有指定callback就调用Spider类的parse方法
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):  # 是否为生成器类型
            for req in gen:
                # 这里还可以再加判断,如果是request对象则继续爬取
                # 如果是item对象,则可以交给pipline
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests):
        self.start_requests = start_requests
        self.spider = spider  # 加了这句
        yield None
        reactor.callLater(0, self._next_request)  # 过多少秒之后,执行后面的函数

    @defer.inlineCallbacks
    def start(self):
        """原来的stop函数"""
        self._close_wait = defer.Deferred()
        yield self._close_wait

class Response(object):
    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        return self.body.decode('utf-8')

class Crawler(object):
    def __init__(self, spider_cls):
        self.spider_cls = spider_cls
        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        self.engine = ExecutionEngine()
        self.spider = self.spider_cls()
        start_requests = iter(self.spider.start_requests())
        yield self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()

class CrawlerProcess(object):
    def __init__(self):
        self._active = set()
        self.crawlers = set()

    def crawl(self, spider_cls, *args, **kwargs):
        crawler = Crawler(spider_cls)
        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    @classmethod
    def _stop_reactor(cls, _=None):
        """原来的all_done函数
        之前的示例中,这个函数都是要接收一个参数的。
        虽然不用,但是调用的模块一定会传过来,所以一定要接收一下。
        这里就用了占位符来接收这个参数,并且设置了默认值None。
        """
        print("All Done")
        reactor.stop()

class Spider(object):
    def __init__(self):
        if not hasattr(self, 'start_urls'):
            self.start_urls = []

    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)

    def parse(self, response):
        print(response.body)

class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = ["http://dig.chouti.com"]

    def parse(self, response):
        print(next((s for s in response.text.split('\n') if "" in s)))

class BingSpider(Spider):
    name = "bing"
    start_urls = ["http://www.bing.com"]

class BaiduSpider(Spider):
    name = "baidu"
    start_urls = ["http://www.baidu.com"]

if __name__ == '__main__':
    spider_cls_list = [ChoutiSpider, BingSpider, BaiduSpider]
    crawler_process = CrawlerProcess()
    for spider_cls in spider_cls_list:
        crawler_process.crawl(spider_cls)
    crawler_process.start()</code></pre><p>这里用的类名、方法名、部分代码都是和Scrapy的源码里一样的。相当于把Scrapy精简了,把其中的核心都提取出来了。如果能看明白这部分代码,再去Scrapy里看源码应该能相对容易一些了。</p><p align=center>另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、<a href="https://www.cdcxhl.com/gaofang/" target="_blank">高防服务器</a>、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。</p>            <br>
            分享标题:Python自动化开发学习-TinyScrapy-创新互联            <br>
            网址分享:<a href="http://gzruizhi.cn/article/ddiijp.html">http://gzruizhi.cn/article/ddiijp.html</a>
        </div>
    </div>
    <div class="other">
        <h3>其他资讯</h3>
        <ul>
            <li>
                    <a href="/article/giddsj.html">动态代理IP有什么网络功能</a>
                </li><li>
                    <a href="/article/gidcss.html">Java类加载机制怎么理解</a>
                </li><li>
                    <a href="/article/gidcij.html">如何使用PHP修改SESSION有效时间</a>
                </li><li>
                    <a href="/article/gidcjo.html">Linux内核链表剖析(二十)</a>
                </li><li>
                    <a href="/article/giddjp.html">安卓中linearLayout下android_layout.weight的属性的详细介绍</a>
                </li>        </ul>
    </div>
</div>
<footer>
  <div class="foot container">
    <div class="footl fl">
      <h3>联系我们</h3>
      <dl>
        您好HELLO!<br>
        感谢您来到宜宾网站建设公司,若您有合作意向,请您为我们留言或使用以下方式联系我们,

        我们将尽快给你回复,并为您提供真诚的设计服务,谢谢。
      </dl>
      <ul>
        <li>电话:028- <span>86922220 18980695689</span></li>
        <li>商务合作邮箱:631063699@qq.com</li>
        <li>合作QQ: 532337155</li>
        <li>成都网站设计地址:成都市青羊区锣锅巷31号五金站写字楼6楼</li>
      </ul>
    </div>
    <div class="footr fr">
      <h3>冠赛建站工作室</h3>
      <dl>
        宜宾冠赛网站建设公司拥有多年以上互联网从业经验的团队,始终保持务实的风格,以"帮助客户成功"为已任,专注于提供对客户有价值的服务。

        我们已为众企业及上市公司提供专业的网站建设服务。我们不只是一家网站建设的网络公司;我们对营销、技术、管理都有自己独特见解,冠赛建站采取“创意+综合+营销”一体化的方式为您提供更专业的服务!
      </dl>
      <h3>冠赛观点</h3>
      <dl>
        相对传统的宜宾网站建设公司而言,冠赛是互联网中的网站品牌策划,我们精于企业品牌与互联网相结合的整体战略服务。<br>
        我们始终认为,网站必须注入企业基因,真正使网站成为企业vi的一部分,让整个网站品牌策划体系变的深入而持久。
      </dl>
    </div>
  </div>
  <div class="link">
    <div class="container"> <span> 友情链接:</span>
        <a href="http://www.cdkjz.cn/fangan/tour/" title="旅游网站建设方案" target="_blank">旅游网站建设方案</a>   <a href="https://www.cdxwcx.com/city/deyang/" title="德阳网站建设" target="_blank">德阳网站建设</a>   <a href="http://www.kswsj.cn/tuiguang/" title="网站SEO优化排名" target="_blank">网站SEO优化排名</a>   <a href="https://www.cdcxhl.com/weihu/chengdu.html" title="成都网站维护公司" target="_blank">成都网站维护公司</a>   <a href="http://chengdu.cdcxhl.cn/weihu/
" title="成都网站维护" target="_blank">成都网站维护</a>   <a href="http://m.cdcxhl.cn/H5/
" title="H5建站" target="_blank">H5建站</a>   <a href="http://www.cdiso.cn/" title="小谭建站" target="_blank">小谭建站</a>   <a href="http://chengdu.cdcxhl.cn/jianshe/
" title="品牌网站建设" target="_blank">品牌网站建设</a>   <a href="https://www.cdcxhl.cn/
" title="香港虚拟主机" target="_blank">香港虚拟主机</a>   <a href="http://chengdu.cdcxhl.com/seo/" title="成都网站优化" target="_blank">成都网站优化</a>       </div>
  </div>
  <div class="copy"> © Copyright 2023 <a href="http://www.gzruizhi.cn">冠赛建站工作室</a>All Rights Reserved.  <a href="http://beian.miit.gov.cn" target="_blank" rel="nofollow">黔ICP备2021005940号-3</a>  <a href="https://www.cdxwcx.com" target="_blank">成都网站建设</a> / <a href="https://www.cdxwcx.com" target="_blank">成都网站建设</a> / <a href="https://www.cdxwcx.com" target="_blank">响应式网站建设</a> / <a href="https://www.cdcxhl.com" target="_blank">成都网站设计</a>  <a href="https://www.cdcxhl.com/service/weihu.html" target="_blank">成都网站维护</a> <a href="https://www.cdcxhl.com/news/" target="_blank">其他新闻分类</a> </div>
</footer>
</body>
</html>
<script>
    $(".con img").each(function(){
        var src = $(this).attr("src");    //获取图片地址
        var str=new RegExp("http");
        var result=str.test(src);
        if(result==false){
            var url = "https://www.cdcxhl.com"+src;    //绝对路径
            $(this).attr("src",url);
        }
    });
    window.onload=function(){
        document.oncontextmenu=function(){
            return false;
        }
    }
</script>