博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python 开源异步并发框架的未来
阅读量:6039 次
发布时间:2019-06-20

本文共 11505 字,大约阅读时间需要 38 分钟。

http://segmentfault.com/a/1190000000471602

开源

 是开源的,介绍的这几个框架 、、 和  也都是开源的,最后这个演讲是在开源大会弄的,所以标题里肯定少不了开源。另外,我的  项目也是开源的——貌似不少同学被我起的极品名字给搞混了,特别说明一下, 虽然有跟  一样的接口外貌,但底层却是  驱动的(考虑把名字改回 gulip 之类的);请区别于将来会支持 Python 3 的  1.1。

非阻塞

先上一段代码。请原谅我用  代码充当伪代码了,但  的语法实在是太简单了,忍不住啊。

import sockets = socket.socket()s.connect(('www.google.com', 80)) print("We are connected to %s:%d" % s.getpeername())

这是很简单的一个客户端 TCP 连接程序。假如网络状况不是很好,执行这段程序时,我们很有可能要等个几秒钟,才能看到 We are connected 的输出字样。

对于这样的代码,我们就可以说程序阻塞在了 connect() 的调用上;而这样的函数我们叫做阻塞式的。

那么非阻塞呢?还是看一段代码。

import sockets = socket.socket()s.setblocking(0) try: s.connect(('www.google.com', 80)) except socket.error as e: print(str(e)) i = 0 while True: try: print("We are connected to %s:%d" % s.getpeername()) break except: print("Let's do some math while waiting: %d" % i) i += 1 else: print("We are connected to %s:%d" % s.getpeername())

这一下代码就多了——但是并不复杂。

首先看一开始的变化,多了一句 s.setblocking(0)。这是说,将这个 socket 对象变成非阻塞式的。这样一来,接下来的许多本应阻塞的调用将不会阻塞。

比如 connect()。非阻塞的 connect() 调用将会立即结束,而不管这个 TCP 连接是否真正建立了——如果 TCP 连接还没有完成握手,那么 connect() 会抛出一个异常说“开始连了,别着急一会儿就好”;否则(应该没有否则)就会“正常”地走 try...else 的路线。

抓到这个异常之后呢,我们就可以充分利用这段原本要阻塞的时间,在连接完全建立之前做一些有意义的事情——比如数数。我这里网络条件还凑合,一般情况下数到一万多的时候就能跟 Google 连上了。

异步

可以看得出来,阻塞和非阻塞是说函数调用的,调用了之后要等到底层完事儿了之后才能继续的叫做阻塞;调用了之后,要么立即返回,要么立即抛异常,这就是非阻塞。

而与之如影随行的一对儿概念——同步和异步——则说的是一段程序的执行处理方式。一般情况下,阻塞式的调用都可以叫做同步,但非阻塞式的调用不一定是异步的。怎么讲呢,我们还是来看几个例子。

while server.running:    request = server.receive()    response = handle(request)    server.send(response)

这片代码片段示意的是同步的处理方式。可以看得出来,接收请求、处理请求、发送响应依次执行,前一个任务完成了才会做下一个;最外面还有一个 while 循环,使之不断地收请求发响应,且是发送完上一个响应之后才会接收下一个请求。请注意,我们并没有看到 receive() 等函数的实现细节,他们在底层可以是阻塞的,也可以是非阻塞的,这都不会影响我们看到的这片代码片段是同步的。

那么异步的代码看上去是什么样的呢?请允许我用  风格的代码来展示,因为异步的代码太“扭曲”了:

while server.running:    deferred = server.receive()    deferred.addCallback(on_request)def on_request(request): deferred = handle(request) deferred.addCallback(on_response) def on_response(response): server.send(response)

让我来大概地解释一下。为了实现异步,这里的 receive() 和 handle() 都必须是非阻塞的。在  中非阻塞的函数会立即返回一个  对象,通过给  对象添加回调函数,我们可以实现在这件事情真正完成之后,执行回调函数中定义的接下来要做的事儿。

看到扭曲的程度了吧。先接收一个请求——等等,你不一定立即就能接收到。好吧,等到接收到了的时候(on_request),我们把这个请求送去处理,然后——等等,处理不一定马上能完成。那好吧,等到处理完成之后(on_response),我们再把这个响应发送回去。说实话,我没忍心写,其实发送也不会立即完成……

虽然上面这段代码示例有些过份,仍有一些可以变得更简洁的地方,但是这对于大型项目中异步代码的描述并不失真。难道用所谓的异步框架写代码都会是这么扭曲么?

前面我们说的异步只是异步编码——从编写代码的方式上来判断。而通常说的异步框架,往往还会展现给用户一些同步的接口(后面还会提到),在框架内部,这些接口也都是用非阻塞的异步代码来实现的。对于这样的框架,我们仍然叫他们异步框架——总不能叫非阻塞框架,或是同步框架吧。

另外,异步编码也不一定就非要扭曲人性,还是有很多项目可以简洁明了地编写异步代码的,只不过对于程序员的要求会比编写同步代码稍高一些罢了。

并发与并行

好了,让我们先把纠结的异步放下,来看看另外两个容易混淆的概念。

估计您已经从视频里听了我办港澳通行证的惨痛经历了,这里就不重复了,但仍然用这个例子来解释一下并发和并行的概念吧。

并行的概念着重于处理端,也就是办理通行证的工作人员。有 5 个窗口开放,就意味着同一时间可以有 5 个业务可以得到并行的处理。对于计算机来说,并行势必要有多颗处理器,真正从物理上可以并行地处理多个任务;单 CPU 用多线程实现的叫做——也许除外。

相对于并行着重于处理端,并发的概念则是关于请求端,也就是关于用户的。当我们谈及朝阳区出入境办证大厅的并发量的时候,我们是在说该大厅在某一时刻能容纳的前来办证的人数,最大并发量说白了就是大厅里能站下多少人——包括正在办的和排队的。

包括排队的?那往大厅外面使劲儿排呗,这并发量岂不是无限大了?

与并发一起的还有很重要的一个概念,就是处理时间。如果一味追求并发量,势必会导致处理时间的大幅上升,大量请求多半时间在排队,这样并不能算是一个高效的系统设计。所以在系统资源到达瓶颈的时候,也许限制并发量,拒绝一些请求也许是一个明智的选择。

并发并不是不关心处理端,只不过多核并行或者单核时分复用都能实现并发,而且在实践中这两种实现方法往往会同时使用。多核并行实现的并发,其任务调度主要由操作系统完成,我们接下来着重关心一下单线程并发的任务调度问题。

事件驱动的单线程并发

只有一个线程,用阻塞调用是肯定无法实现并发的——除非把每次仅服务一个客户叫做“并发量为 1 的并发”。所以,我们必然会用到非阻塞调用。

请回忆一下前面我们演示非阻塞调用的那个例子,我们在等待连接建立的过程中,做了一些其他的有意义的事情,一旦连接建立成功,我们会接着之前做一些关于连接的事情——输出对方的地址。现在我们试着扩展这个例子,实现并发连接——我们同时启动 100 个 TCP 连接,任何一个连接成功了就立即输出对方地址。一开始我们可以这么写:

import socketsockets = {}for i in range(100): s = socket.socket() sockets[s.fileno()] = s s.setblocking(0) try: s.connect(('www.google.com', 80)) except: pass

我们将这 100 个 socket 对象按照他们的保存在了一个叫做 sockets 的字典里,并且一一调用了非阻塞的connect() 函数。

可是,接下来怎么写呢?难道要重复调用每一个 socket 对象的 getpeername() 函数,直到他们都正确返回了为止?CPU 消耗太大了吧。

操作系统给我们提供了一些接口,专门用于这类问题的: 及其升级版 (Linux) 和 (*BSD 和 Mac OS X),他们通常也被统称为 select 函数。select 是一种阻塞调用,专门用于从一些中,选出那些有新事件到达的描述符,其中事件包括可读、可写和出错。换句话讲呢,就是监视给出的 socket,任何一个有动静了就立即返回有动静的描述符。

比如前面这个例子里,我们希望在任何一个连接成功建立的时候,输出该连接的目的地址。于是接下来就可以这么写:

import selectwhile sockets:    fds = select.select([], list(sockets.keys()), [])[1] for fd in fds: s = sockets.pop(fd) print("%d connected to %s:%d" % ((fd,) + s.getpeername()))

也就是说,每次循环,我们都会从剩余的连接中,选出一些可写的 socket 对象——那意味着连接已经成功建立了,然后将他们的目标地址输出出来。

这就是一个很简单的事件驱动的异步并发了,虽然我们只是创建了 100 个 TCP 连接,但我们并发了,是事件驱动的了,而且我们异步地调用了后续的操作——输出目的地址。

异步并发不过如此,而已。

框架

只用 socket 和 select 来写一个异步 web 服务器也行,只不过会出一两条人命而已。虽然是开玩笑,但是我们多数情况下还是会选择使用一些现有的框架。

何谓框架呢,其实就是把上一小节的例子代码给拆开,一部分是仅包含 www.google.com 和 print() 的所谓用户代码,另一部分就是所有剩下的叫做框架的东西。比如这样:

import socketsockets = {}for i in range( ): s = sockets[s.fileno()] = s s.setblocking(0) try: s. ( ) except: pass import select while sockets: fds = select.select([], list(sockets.keys()), [])[ ] for fd in fds: s = sockets.pop(fd) ( s. )

当然这段代码并不是一个框架,因为它根本无法运行。但是我们可以通过它看到一个异步框架应该有的东西:

  1. 用于创建与框架契合的、非阻塞的 I/O 对象的接口
  2. 有一个主循环,用户可以启动它
  3. 用户可以在关心的事件发生时,执行自己的代码

回调函数和 Tornado

让我们以  为例,来看一下最基本的异步框架是怎么用的——虽然  并不仅限于此。

sock = socket.socket()sock.setblocking(0)sock.bind((“”, 80))sock.listen(128)def on_conn(fd, events): conn, address = sock.accept() conn.send(b’Hello’) io_loop = ioloop.IOLoop.instance() io_loop.add_handler(sock.fileno(), on_conn, io_loop.READ) io_loop.start()

这是一个简单的服务器程序,它会向每一个连进来的客户端发送一句问候。其中 add_handler() 的调用就是——我认为——  的经典用法,也就是注册回调函数。当有连接进来的时候, 就会根据要求来调用 on_conn(),后者随即会与客户端连接并送上问候。

Twisted 和封装……和回调函数

 里是各种封装,通过 Transport 将 socket 对象封装的更隐蔽,通过 Protocol 来实现用户协议的封装,像这样:

from twisted.internet import protocol, reactorclass Echo(protocol.Protocol): def dataReceived(self, data): self.transport.write(data) class EchoFactory(protocol.Factory): def buildProtocol(self, addr): return Echo() reactor.listenTCP(1234, EchoFactory()) reactor.run()

对于回调函数, 则发明了著名的  用以实现事件源与回调函数的分离,其实本质上没有区别,只是在写法上略有不同,这里就不多说了。

同步地异步

正如前面提到的,异步的编码方式——无论是  的回调函数,还是  的 ——想要用的出彩,需要程序员有相对较高的心理素质和职业修养。那如果能正常地、用同步的方式来编写异步执行的代码呢?

借助 Python 的 , 和  纷纷提供了这样的功能。比如下面这一段  的代码(请关注开头的修饰器和代码中的 yield):

@defer.inlineCallbacksdef main(endpoint, username="alice", password=“secret”): endpoint = endpoints.clientFromString(reactor, strport) factory = protocol.Factory() factory.protocol = imap4.IMAP4Client try: client = yield endpoint.connect(factory) yield client.login(username, password) yield client.select('INBOX') info = yield client.fetchEnvelope(imap4.MessageSet(1)) print 'First message subject:', info[1]['ENVELOPE'][1] except: print "IMAP4 client interaction failed" failure.Failure().printTraceback() task.react(main, sys.argv[1:])

这里的第一个 yield 中,endpoint.connect() 返回的是一个  对象,其回调函数的参数才是前面的 client 对象。通过 yield 跟 inlineCallbacks 修饰器的配合,我们就把回调函数和 main 函数揉在了一起,后面那三个 yield也是如此,这样的代码看上去是同步的,执行的底层实则是异步的。 也有类似的用法,这里就不多说了。

神奇的 yield!在这里到底发生了什么事情呢?我管它叫做异步切换,具体的代码可以看 inlineCallbacks 的实现。简单来说呢,yield 之前,connect() 在主循环里注册了一个关于连接创立的事件监听,然后通过 yield 把事件的处理权交给了 inlineCallbacks,同时将当前函数的执行状态挂起(yield 的功能,可以把栈保存下来),切换到inlineCallbaks 里继续执行,而 inlineCallbacks 则会返回至主循环,继续执行别的异步任务,直至前述事件发生且主循环排到了该事件,主循环会调用 inlineCallbacks 里的回调函数,后者会将之前挂起的执行状态恢复,这样 client 就被赋上了正确的值。

总的来看,在 yield 的时候,当前执行流程会被暂停以等待事件,别的执行流程会插进来执行,直至事件发生后,当前执行流程才有可能恢复执行。这非常类似于操作系统里面的任务调度,所以我管它叫做异步切换,只不过这种切换是主动进行的,而不是操作系统强制的。所以,如果你不 yield 交出执行权,别的执行流程永远没有办法被执行到,这也是单线程异步并发的一个需要注意的点。另外,单线程异步并发需要有足够的异步切换才能做到近似公平的排程,所以非常适合 I/O 密集型的运算,而 CPU 密集型的运算在这里往往会遇到比较严重的问题。

隐式的异步切换

在写单线程异步代码的时候,切记不要混合调用底层会阻塞的代码,因为那样会阻塞整个线程,导致所有并发的处理时间增加,最终会导致严重的性能问题。如果有一些阻塞的、同步的遗留代码,那该如何是好呢?答案是:把它们统一改成非阻塞的,或者使用多线程/多进程来处理。可是,如果要改成非阻塞的形式,那得加多少 yield 呀!

没关系,还有隐式的异步切换呢。通常我们把这种需要显式地写 yield 的代码叫做显式的异步切换,与之相对的就是隐式的异步切换。比如下面这段代码,我说它有隐式的异步切换,您信吗?

import sockets = socket.socket()s.connect(('www.google.com', 80)) print("We are connected to %s:%d" % s.getpeername())

这不就是文章一开头的那个例子嘛。别急,如果在最前面加这么两句,情况就完全不一样了:

from gevent import monkeymonkey.patch_all()

 就是隐式的异步切换的代表。通过所谓的 monkey patch, 把系统库里的 socket 等模块,替换成了 自己提供的相应的非阻塞模块。这样,上面的代码就变成(底层)异步的了。考虑到 monkey patch 的侵入性,您也可以考虑直接使用  提供的模块,比如这样:

from gevent import socket

如  这样的隐式的异步切换有个好处很明显,就是可以很容易地将阻塞式的遗留代码迁移到  上来,而不需要额外修改大量代码,这对于需要异步并发支持的许多大型现有项目来说,无疑是为数不多的几个选择之一——比如说。

但是,,隐式的异步切换的代价太大——倒不是说它的性能有多差,而是这种写法把异步切换隐藏的太深了,不知道什么时候就切换到别的地方去执行了。这样带来的直接问题就是——跟常规共享状态的多线程编程一样——我们很难保证在一段程序的执行过程中,某些本地状态不会被别的代码修改,再加上状态同步的代价,隐式的异步切换并不被特别看好。如果非得要用,记得尽量少共享状态,多用队列来实现信息传递,然后小心编码,仔细检查。

绿色的 Gevent

 之所以能实现隐式的异步切换,主要归功于 。 是  的一个分项目,用于在标准 CPython 中实现(也称协程、绿色线程)。

Python 中的 Greenlet 跟常规线程类似,也是会在独立的空间中执行一段代码,也有自己独立的栈空间。不同的是:

  1.  并不启动任何操作系统的线程,是绿色产品
  2.  任务之间的调度需要每个微线程里的代码自己显式地实现

用官方的一个例子演示一下这两个特点吧:

from greenlet import greenletdef test1(): print 12 gr2.switch() print 34 def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()

这个例子里一共有三个微线程,分别是 main(也就是最外层默认的主微线程,自动创建的)、gr1 和 gr2。程序一直顺序执行,直至最后一句 gr1.switch(),由 main 微线程切换至 gr1gr1 输出 12 之后,又切换至 gr2;接着 gr2 输出 56 后,又切换回 gr1 之前的切出点,继续输出 34;这是 gr1 结束了,系统会自动切换回 gr1 的父微线程——也就是 main 的最后一句 switch() 返回,至此整个程序结束。注意,78 并没有机会被输出。

 的主循环叫做 Hub,跑在一个单独的 greenlet 里。用户的程序从 main greenlet 开始执行,直至第一个异步切换。此时, 会把当前微线程——也就是 main ——与异步事件做一个关联,然后切换到 HubHub 于是开始运转,当某些事件发生时, 就会切换到相应关联的 greenlet 来执行,直至他们结束返回 Hub,或者主动切换回 Hub。比如 main 等待的事件发生了,Hub 就会切到 main 上执行——当然,如果这时 main 结束了,就不会像其他 greenlet 一样再返回 Hub 了。

所以,greenlet 和 generator、Deferred 一样,其实都是用来实现回调封装的一些工具,所以前面提到过的一些异步并发的注意事项, 也都适用。

互操作性存在的问题

多种框架的存在,说好听了是百花齐放各显神通、竞争才有发展,说难听了就是碎片化、选择恐惧症和维护代价巨大。比如说,同样是一个 Python 的 PostgreSQL 连接适配程序,有支持  的 ,有支持  的 ,还有  需要的 ——有啥话咱不能一气儿说完呢?如果上游的  更新了,这么多的适配器,是不是得要跟着更新哪。

再一个问题就是遗留代码。如果一个项目一直在用 ,有一天老板拿着张光盘说给我把这个弄上去,打开一看全都是.pyc 文件,木有源代码——直接调用会有之前提到的阻塞主线程的问题,扔到线程池里做又不甘心。如果能在  里用  就好了(现在确实可以,不过会替换 Twisted 的一部分)。

未来

asyncio 这个项目其实叫做 ,主要开发也都在那里,因为要进 Python 标准库了,所以才几经周折选了 asyncio 这么一个名字。asyncio 是 Python 作者的一个新项目,要求至少是 Python 3.3(手动安装),Python 3.4 里它就已经是标准库的一部分了。

之所以要求 Python 3.3,是因为 asyncio 的微线程依赖于 Python 3.3 的新语法:yield from。区别于 yieldyield from co 实现了类似于这样的功能:

for x in co:    yield x

这里说“类似”,是因为要比这复杂很多,但意思是一样的:将内层迭代器的元素无缝地合并到外层的迭代器里。有了这个,asyncio 就可以很容易地做微线程的嵌套了——也就是在一个微线程里面等待另一个结束返回结果。

asyncio 作为又一个异步并发框架,与其他现有框架差别并不大:主循环类似于  的 reactor,Future 对回调函数进行封装类似于 Deferred,可选的微线程类似于 inlineCallbacks,基于 yield from 的显式的异步切换类似于yield,这里就不多介绍了,总的来看非常像 。但是呢,它能进入标准库,还是有原因的。

互操作性

asyncio 作为参考实现,与其规格文档  是一起做出来的,蟒爹在做的过程中尤其关注了互操作性。

比如 asyncio 的主循环就是可以任意替换的,任何满足 asyncio 主循环接口要求的核心都可以被安装上去。为了做到这一点, 定义了严格的主循环接口,将 asyncio 的框架代码部分与主循环核心完全分离。这样一来,许多现有框架加个壳就可以支持 asyncio 了——不用改现有代码,写一个现有主循环接口到 asyncio 主循环接口的适配层,替换掉 asyncio 自带的主循环,这样 asyncio 的代码就可以跑在现有框架上面了。

另一个方向也是行得通的。 同样定义了丰富而清晰的用户接口,我们可以使用这些接口来实现一个现有框架的主循环替代品,这样就可以在不替换 asyncio 已有主循环的前提下,将别的框架的代码嫁接到 asyncio 上来。比如说我的 就是这么一个例子,我将  中原有的  代码删掉,用 asyncio 实现了一份 Gevent Hub,这样,gevent 的代码就可以跑在 asyncio 框架上了。更令人兴奋的是,如果 asyncio 使用的主循环核心又恰好是比如说 ,那么原先分别依赖  和  的代码,现在就可以跑在一起了,甚至互相调用也是可以的。

比如下面一段示例代码就演示了三个框架的融合:

import asyncioimport gevent  ## gevent3import redis from gevent import socket from redis import connection from twisted.web import server, resource from twisted.internet import reactor asyncio.set_event_loop(some_twisted_wrapper) class GreenInetConnection(connection.Connection): def _connect(self): #noinspection PyUnresolvedReferences sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.socket_timeout) sock.connect((self.host, self.port)) return sock class HelloResource(resource.Resource): isLeaf = True def render_GET(self, request): gevent.spawn(self.green_GET, request) return server.NOT_DONE_YET def green_GET(self, request): r = redis.StrictRedis( connection_pool=connection.ConnectionPool( connection_class=GreenInetConnection)) numberRequests = r.incr("numberRequests") request.setHeader("content-type", "text/plain") request.write("I am request #" + str(numberRequests) + "\n") request.finish() reactor.listenTCP(8080, server.Site(HelloResource())) asyncio.run_forever()

代码演示了一个简单的  web 服务器,使用  来处理逻辑,asyncio 则起到了牵线搭桥的作用。

虽然目前这段代码还不能运行,但是我相信在不久的将来,这种程度的互操作性终将实现。

更新: 项目已改名为 tulipcore(链接仍然有效),第一个 alpha 版本已经发布至 pypi.python.org。

转载地址:http://ahohx.baihongyu.com/

你可能感兴趣的文章
[转] 关于自信 我也说说
查看>>
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
查看>>
ASP.NET Core 开发-中间件(Middleware)
查看>>
软件质量的定义
查看>>
word中方框中打钩
查看>>
交易系统使用storm,在消息高可靠情况下,如何避免消息重复
查看>>
从乌云的错误漏洞分析看Mifare Classic安全
查看>>
Atitit php java python nodejs错误日志功能的比较
查看>>
StringUtils方法全集(转)
查看>>
第四章 Spring.Net 如何管理您的类___自定义对象行为
查看>>
如何离线安装Visual Studio 2017
查看>>
NPM版本号
查看>>
VS增加插件 Supercharger破解教程
查看>>
easyui-treegrid的案例
查看>>
倾斜摄影自动化建模成果的数据组织和单体化
查看>>
golang gob 有什么优势? gob/protobuf/json/xml 效率对比,benchmark 压力测试
查看>>
RxSwift學習教程之基礎篇
查看>>
分析轮子(七)- RandomAccess.java
查看>>
Zabbix历史数据库迁移 及分区
查看>>
Column 'parent_id' specified twice
查看>>