python协程async/await

简介

转载于:python 协程 async/await

前言

python因为自身的GIL的问题导致并发不能像java和C一样,但并不是说Python不能实现并发。常见的有两种:

  • 计算密集型
  • IO密集型

计算密集型

计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

IO密集型

第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

异步IO

考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。

现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。

同步和异步

同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。
异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。

async\await 的使用

普通的函数执行的时候是不会中断的。所以如果要写一个可以被中断的函数,此时需要给函数加上 async 关键字。

async 关键字用来定义一个函数为异步函数,异步函数的特点能在函数执行过程中中断挂起,去执行其他的异步函数,等待挂起解挂后(抢占到资源),比如sleep(1)结束后, 再回来执行。

await 用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。await 后面只能跟异步程序或有await属性的对象,因为异步程序与一般程序不同。

两个异步程序async a、async b:

  • a中一步有await,当程序碰到关键字await b后;
  • 异步程序a挂起,去执行异步b程序(就相当于从一个函数内部跳出去执行其他函数);
  • 当挂起条件结束时候,不管b是否执行完,要马上从b程序中跳出来,回到原程序a执行原来的操作;
  • 如果await后面跟的b函数不是异步函数,那么操作就只能等b执行完再返回,无法在b执行的过程中返回,这样就相当于直接调用b函数,没必要使用await关键字了。

因此,需要await后面跟的是异步函数。

这里引用一个博主的博客,以洗衣服为例,eg. 我们有三台洗衣机, 现在有三批衣服需要分别放到这三台洗衣机里面洗。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# demo1
import time

# """
# 假设有三台洗衣机, 现在有三批衣服需要分别放到这三台洗衣机里面洗.
# """

def washing1():
time.sleep(3) # 第一台洗衣机, 需要洗3秒才能洗完 (只是打个比方)
print('小朋友的衣服洗完了') # 洗完的时候, 洗衣机会响一下, 告诉我们洗完了

def washing2():
time.sleep(2)
print('爷爷奶奶的衣服洗完了')

def washing3():
time.sleep(5)
print('爸爸妈妈的衣服洗完了')

startTime = time.time()
washing1()
washing2()
washing3()
endTime = time.time()
print("洗完三批衣服共耗时: ",endTime-startTime)

输出如下:

1
2
3
4
washer1 finished
washer2 finished
washer3 finished
洗完三批衣服共耗时: 10.022438526153564

实际上这样我们会发现有点不合理,因为实际上我们日常中比如有两台洗衣机,往往是这台开机洗衣之后就去操作另一台洗衣机的。这样就不用一直等待一个洗衣机洗完之后才去操作第二台洗衣机。为了提高效率,这里使用协程,“并发的”洗衣服。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# demo2
import time
async def washing1():
time.sleep(3)
print('小朋友的衣服洗完了')

async def washing2():
time.sleep(2)
print('爷爷奶奶的衣服洗完了')

async def washing3():
time.sleep(5)
print('爸爸妈妈的衣服洗完了')


startTime = time.time()
washing1()
washing2()
washing3()
endTime = time.time()
print("洗完三批衣服共耗时: ",endTime-startTime)

输出结果如下:

1
2
3
4
5
6
洗完三批衣服共耗时:  0.06781911849975586
washing1()
E:/code/asnicio/exercise5.py:40: RuntimeWarning: coroutine 'washing2' was never awaited
washing2()
E:/code/asnicio/exercise5.py:41: RuntimeWarning: coroutine 'washing3' was never awaited
washing3()

其实上面的脚本是无法运行的。
从正常人的理解来看,我们现在有了异步函数,但是却忘了定义应该什么时候 “离开” 一台洗衣机,去看看另一个…这就会导致,现在的情况是我们一边看着第一台洗衣机,一边着急地想着”是不是该去开第二台洗衣机了呢?” 但又不敢去(只是打个比方),最终还是花了10秒的时间才把衣服洗完。

参考前言1.2中介绍的函数是要被指定为可中断的,且中断的函数中需要指定为可等待的。现在我们吸取了上次的教训,告诉自己洗衣服的过程是 “可等待的”(awaitable),在它开始洗衣服的时候,我们可以去弄别的机器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# demo3
import time
async def washing1():
await time.sleep(3)
print('小朋友的衣服洗完了')


async def washing2():
await time.sleep(2)
print('爷爷奶奶的衣服洗完了')


async def washing3():
await time.sleep(5)
print('爸爸妈妈的衣服洗完了')


startTime = time.time()
washing1()
washing2()
washing3()
endTime = time.time()
print("洗完三批衣服共耗时: ",endTime-startTime)

输出结果为:

1
2
3
4
5
6
7
E:/code/asnicio/exercise5.py:39: RuntimeWarning: coroutine 'washing1' was never awaited
washing1()
洗完三批衣服共耗时: 0.03390932083129883
E:/code/asnicio/exercise5.py:40: RuntimeWarning: coroutine 'washing2' was never awaited
washing2()
E:/code/asnicio/exercise5.py:41: RuntimeWarning: coroutine 'washing3' was never awaited
washing3()

尝试运行一下,我们会发现还是会报错(报错内容和 demo2 一样)。这里说一下原因,以及在demo4 中会给出一个最终答案:

第一个问题是 await 后面必须跟一个 awaitable 类型或者具有 await 属性的对象。这个 awaitable,并不是我们认为 sleep() 是 awaitable 就可以 await 了,常见的 awaitable 对象应该是:await asyncio.sleep(3)。asyncio 库的 sleep() 机制与 time.sleep() 不同,前者是 “假性睡眠”,后者是会导致线程阻塞的 “真性睡眠”。await an_async_function() 一个异步的函数,也是可等待的对象。

1
2
3
4
5
6
以下是不可等待的:
await time.sleep(3)
x = await 'hello' # <class 'str'> doesn't define '__await__'
x = await 3 + 2 # <class 'int'> dosen't define '__await__'
x = await None # ...
x = await a_sync_function() # 普通的函数, 是不可等待的

第二个问题是,如果我们要执行异步函数,不能用这样的调用方法:

1
2
3
washing1()
washing2()
washing3()

而应该用 asyncio 库中的事件循环机制来启动(具体见 demo4 讲解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# deom4
import time
import asyncio

async def washing1():
await asyncio.sleep(3)
print('小朋友的衣服洗完了')

async def washing2():
await asyncio.sleep(2)
print('爷爷奶奶的衣服洗完了')


async def washing3():
await asyncio.sleep(5)
print('爸爸妈妈的衣服洗完了')

# 2. 将异步函数加入事件队列
tasks = [
washing1(),
washing2(),
washing3(),
]


if __name__ == '__main__':
# 1. 创建一个事件循环
loop = asyncio.get_event_loop()
startTime = time.time()
# 3.执行队列实践,直到最晚的一个事件被处理完毕后结束
loop.run_until_complete(asyncio.wait(tasks))
# 4.如果不在使用loop,建议使用关闭,类似操作文件的close()函数
loop.close()
endTime = time.time()
print("洗完三批衣服共耗时: ",endTime-startTime)

输出结果为:

1
2
3
4
爷爷奶奶的衣服洗完了
小朋友的衣服洗完了
爸爸妈妈的衣服洗完了
洗完三批衣服共耗时: 5.01391339302063

run_until_complete(asyncio.wait(tasks)) 检测task任务的运行情况并返回结果,run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回。这一点从函数名不难看出。因此async /await简单用法总结如下:

  1. async def Function(): 定义async异步函数,中间可以添加await async.sleep(N) 来设定中断并执行下一个循环消息
  2. tasks = [] 任务则是对协程进一步封装,其中包含任务的各种状态。即多个coroutine函数可以封装成一组Task然后并发执行
  3. loop = asyncio.get_event_loop() #获取“事件循环”对象
  4. loop.run_until_complete(asyncio.wait(tasks)) #通过事件循环,去调用协程函数
  5. loop.close() 结束时间循环

关于loop可以参考Asyncio简介

aiohttp

如果需要并发http请求,通常是用requests,但requests是同步的库,如果想异步的话需要引入aiohttp。这里引入一个类,from aiohttp import ClientSession,首先要建立一个session对象,然后用session对象去打开网页。session可以进行多项操作,比如post, get, put, head等。
基本语法:

1
2
async with ClientSession() as session:
async with session.get(url) as response:

单url异步访问

同时这里也给出aiohttp异步实现的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
from aiohttp import ClientSession

tasks = []
# 百度搜索框搜索: python asyncio
url = "https://www.baidu.com/s?wd=python+asyncio"
async def hello(url):
async with ClientSession() as session:
async with session.get(url) as response:
response = await response.read()
print(response.title())

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(hello(url))

首先async def 关键字定义了这是个异步函数,await 关键字加在需要等待的操作前面,response.read()等待request响应,是个耗IO操作。然后使用ClientSession类发起http请求。

多url异步访问

如果我们需要请求多个URL该怎么办呢,同步的做法访问多个URL只需要加个for循环就可以了。但异步的实现方式并没那么容易,在之前的基础上需要将 hello() 包装在 asyncio 的 Future 对象中,然后将 Future 对象列表作为任务传递给事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/s?wd={}"
async def hello(url):
async with ClientSession() as session:
async with session.get(url) as response:
response = await response.read()
print('Hello World:%s' % time.time())

def run():
for i in range(5):
task = asyncio.ensure_future(hello(url.format(i)))
tasks.append(task)


if __name__ == '__main__':
loop = asyncio.get_event_loop()
run()
loop.run_until_complete(asyncio.wait(tasks))

收集请求响应

上面介绍了访问不同链接的异步实现方式,但是我们只是发出了请求,如果要把响应一一收集到一个列表中,最后保存到本地或者打印出来要怎么实现呢,可通过asyncio.gather(*tasks)将响应全部收集起来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
import asyncio
from aiohttp import ClientSession

tasks = []
url = "https://www.baidu.com/s?wd={}"
async def hello(url):
async with ClientSession() as session:
async with session.get(url) as response:
# print(response)
print('Hello World:%s' % time.time())
return await response.read()

def run():
for i in range(5):
task = asyncio.ensure_future(hello(url.format(i)))
tasks.append(task)
result = loop.run_until_complete(asyncio.gather(*tasks))
print(result)

if __name__ == '__main__':
loop = asyncio.get_event_loop()
run()
一分一毛,也是心意。