IO编程中,Stream(流)的概念:Input stream 就是数据从外面(磁盘、网络)流进内存,output stream就是数据从内存流到外面去。对于浏览网页来说:浏览器与服务器之间至少要建立两根水管,才可以既能发数据,又能收数据。
由于CPU和内存的速度远远高于外设的速度,解决IO编程中速度严重不匹配的情况(两者区别:是否等待IO执行的结果)
- 一种是CPU等着,程序暂停执行后序代码,等数据写入磁盘再接着往下执行,称为同步IO
- CPU不等待,去执行其他的事情, 后续代码可以立刻接着执行,这种模式称为异步IO
- 使用异步编程的性能会远远高于同步IO,但是编程的模型复杂
每一种编程语言都会把操作系统提供的低级C接口封装起来方便使用,异步IO在涉及到服务器端程序开发时再讨论。
文件读写
stringIO 和BytesIO
操作文件和目录
序列化
异步IO
一个IO操作就阻塞了当前线程,导致其他代码无法执行,所以我们必须使用多线程或者多进程来并发执行代码,为多个用户服务。每个用户都会分配一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响。
多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。
目的:解决的问题是CPU高速执行能力与IO设备的龟速的严重不匹配:
- 多线程与多进程
- 异步IO,当代码需要执行一个耗时的IO操作的时候,只发出指令,并不等待IO结果,去执行其他代码,一段时间结束后,当IO返回结果的时候,通知CPU执行处理
do_some_coad()
f = open('/path/to/file','r')
r = f.read() # <==线程停在此处等待IO操作结果
# IO操作完成后线程才能继续执行
do_some_code(r)
异步IO模型需要一个消息循环,在消息循环中,主线不断地重复“读取消息-处理消息”这一过程:
loop=get_event_loop()
while True:
event = loop.get_event()
process_event(event)
这里附上一个比较好地理解的方式:
老张爱喝茶,废话不说,煮开水。 出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
1 老张把水壶放到火上,立等水开。(同步阻塞) 老张觉得自己有点傻
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞) 老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀~~~~的噪音。
3 老张把响水壶放到火上,立等水开。(异步阻塞) 老张觉得这样傻等意义不大
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞) 老张觉得自己聪明了。
所谓同步异步,只是对于水壶而言。 普通水壶,同步;响水壶,异步。 虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了。这是普通水壶所不能及的。 同步只能让调用者去轮询自己(情况2中),造成老张效率的低下。
所谓阻塞非阻塞,仅仅对于老张而言。 立等的老张,阻塞;看电视的老张,非阻塞。 情况1和情况3中老张就是阻塞的,媳妇喊他都不知道。虽然3中响水壶是异步的,可对于立等的老张没有太大的意义。所以一般异步是配合非阻塞使用的,这样才能发挥异步的效用。
协程
协程又称微线程,英文名字为Coroutine,以下为与子程序的对比:
- 子程序是通过栈实现的,一个线程就是执行一个子程序,子程序调用总是一个入口,一次返回,调用顺序是明确的
- 协程在执行的过程中,在子程序内部可以中断,然后执行别的子程序,在适当的时候再返回来接着执行
- 在一个子程序中中断,去执行其他的子程序,不是函数的调用,有点类似CPU的中断
另外的资料辅助理解:
- 协程不是计算机提供的,程序员人为创造
- 通过一个线程实现代码的相互切换
以下用两个函数来理解:
def A():
print('1')
print('2')
print('3')
def B():
print('x')
print('y')
print('z')
如果由协程来执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,相应的可能的结果为:
1
2
x
y
3
z
但是在A中是没有调用B的,协程的调用比函数的调用要复杂一点。执行看似有点像多线程,但是实际上是一个线程执行,相比多线程的优势如下:
- 协程极高的执行效率,子程序切换不是线程切换,是程序自身控制,也就没有线程切换的开销,当线程数量越多的时候,协程的性能优势就越明显。
- 不需要多线程的锁机制,只有一个线程,不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,执行效率比多线程的效率高很多。
- 如何利用多核CPU?:多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能
对协程的支持
python对协程的支持通过generator来实现,不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值(可能刚开始还不太理解:python中的yield不但可以返回一个值,它还可以接受调用者发出的参数)
目前实现协程的几种方式:
- greenlet,早期模块
- yield关键字
- asyncio装饰器(python3.4)
- async await关键字(python3.5)[最主流的,也是官方最推荐的方式]
以一个例子来举例:
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但是一不小心可能死锁,现在换一种方式:如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率会大大提高
def consumer():
r=' '
while True:
n=yield r # yield对应挂起(yield语句执行后,进入暂停,赋值语句在下一次启动生成器的时候首先被执行)
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
# send在接受None参数的情况下,等同于next(generator)的功能,同时send也可以接受其他的参数,后面举例说明
c.send(None) # 调用c.send(None),启动生成器
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
# 执行结果如下:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
以下为对这段代码的解释:
consumer函数就是一个generator ,把一个consumer 传入produce后
- 调用c.send(None),启动生成器
- 一旦生产了东西,通过c.send(n),切换到consumer执行
- consumer通过yield拿到消息,处理,又通过yield把结果传回
- produce拿到consumer处理的结果,继续生下一条消息
- produce决定不生产,通过c.close()关闭consumer整个过程结束
为了更好的理解上面的c.send(n)的用法,这里也是参照别人的例子来辅助理解:
def num():
a = yield 1
while True:
a = yield a
c = num()
c.send(None)
print(c.send(5))
print(c.send(100))
# 打印结果
5
100
对上面的运行的理解:
- 首先使用c.send(None),返回生成器的第一个值 a=yield 1(只是挂起,并没有执行赋值语句)
- 使用c.send(5),再次启动生成器,这里传入了参数5,再次启动的时候,从上次yield语句断掉的地方开始执行,由于此时传入参数5,所以a被赋值5
- 然后程序进入while循环,当程序执行到a=yield a,此时先返回生成器的值5,下次启动生成器的时候,再执行赋值赋值语句
- 到这里再次理解:python的yield不但可以返回一个值,它还可以接收调用者发出的参数
- 有没有什么注意事项:在一个生成器函数未启动前,是不能传递值进去,也即在使用c.send(n)之前,必须先使用c.send(None)或者next(c)来返回生成器的第一个值
再次结合别人的整理来理解:
使用协程比较奇特:把数据传过去了,又神奇的接受到了数据,协程被创造出来就是用来解决异步任务的,不能够和多线程弄混,多线程是用来完成并发任务,异步和并发是两个不同的概念,同步和异步是一种描述指令执行或事件产生顺序,并发与并行指多个或多段可以独立运行的程序对系统资源(主要是CPU)的占用。异步任务最常见的就是读取文件,网络请求等IO操作。
以前处理异步任务都是使用回调(缺点就是地狱回调:回调函数套回调函数),协程的目标就是将这些烦人的异步代码可以像同步代码一样写,就是异步代码的同步化
协程的核心概念就是函数或一段程序能够被挂起,稍后在挂起的位置恢复,挂起和恢复是可以控制的,始终围绕的是挂起和恢复这两个概念,还有一个重要的组成—状态机,协程将原本连续的逻辑拆散,需要状态机来维护状态,每一次的挂起和恢复都会切换到对应的状态,以便下一次的操作可以知道该做什么?
这里结合这位答主更好的去理解:
另外还附上另一份参考资料:
https://www.zhihu.com/question/21823699
https://blog.csdn.net/SL_World/article/details/86597738
def consumer():
result = ''
while True:
print('[CONSUMER] 挂起')
# 1. 启动协程后运行到yield,挂起函数,并将状态扭转为挂起。函数栈跳转到producer
# 3.consumer函数从yield处恢复,从恢复状态中拿到sendData的值赋值给next。一个循环后到达yield,继续将函数挂起,并将状态扭转为挂起,result的值保存在状态中。函数栈跳到producer
next = yield result
print('[CONSUMER] 恢复--并接受到数据%s' % next)
result = '200 OK'
def produce(coroutine):
print('[PRODUCER] 准备启动协程')
# 这里采用next(coroutine)是启动协程,与c.send(None)的功能是一样的
# 启动协程就是运行这个函数,比较特殊,普通的调用方式无法运行,需要使用next(c)或者c.send(None)
next(coroutine)
sendData = 0
while sendData < 2:
sendData = sendData + 1
print('[PRODUCER] 发送数据 %s' % sendData)
# 2. producer函数继续执行到send方法,send恢复当前挂起,将状态扭转为恢复并将sendData的值保存在状态中。函数栈跳转到consumer
# 4. producer函数send方法从挂起状态中拿到result的值赋值给result
result = coroutine.send(sendData)
print('[PRODUCER] 处理结果: %s' % result)
coroutine.close()
coroutine = consumer()
produce(coroutine)
# 运行结果如下:
[PRODUCER] 准备启动协程
[CONSUMER] 挂起
[PRODUCER] 发送数据 1
[CONSUMER] 恢复--并接受到数据1
[CONSUMER] 挂起
[PRODUCER] 处理结果: 200 OK
[PRODUCER] 发送数据 2
[CONSUMER] 恢复--并接受到数据2
[CONSUMER] 挂起
[PRODUCER] 处理结果: 200 OK
asyncio
直接内置了对异步IO的支持
asyncio的编程模型就是一个消息循环,从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO
用asyncio实现Hello world的代码:后面有点逐层深入的感觉,感觉理解起来并没有那么容易呀:
# 用asynicio实现Hello world
import asyncio
# 把一个generator标记为coroutine类型,然后把coroutine扔到EventLoop中执行
@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1):
# yield from语法可以让我们方便地调用另一个generator
# 把asyncio.sleep(1)看成是一个耗时1秒的IO操作
r=yield from asyncio.sleep(1)
print("Hello again!")
# 获取EventLoop:
loop =asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close
接下来用Task封装两个coroutine(这里需要理解,也是很重要的一点:也算是比较难理解的一点)
# 用Task封装两个coroutine
import threading
import asyncio
@asyncio.coroutine
def hello():
print("Hello word! (%s)" % threading.currentThrentThread())
yield from asyncio.sleep(1)
print("Hello again!(%s)" % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(),hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
这里为了搞懂协程的知识以及这部分章节的理解又看了其他的一些理解(说实话才发现前面的理解的有点浅了,这些概念都还是十分抽象的),这里引用别人博客的文章来辅助理解。
从yield from 到 async的使用
“对同步和异步的看法,上面自己总结的时候也用了一些例子来说明”
【同步】:就是发出一个“调用”时,在没有得到结果之前,该“调用”就不返回,“调用者”需要一直等待该“调用”结束,才能进行下一步工作。
【异步】:“调用”在发出之后,就直接返回了,也就没有返回结果。“被调用者”完成任务后,通过状态来通知“调用者”继续回来处理该“调用”。
下面的这个例子通过顺序实现两个同步IO任务taskIO_1()和taskIO_2()很好的解释了如果cpu运行完毕之后要闲置很长时间取等待IO任务完成才能进行下一个任务计算这种模型的低效率。
# 用普通同步代码实现多个IO任务的案例
import time
def taskIO_1():
print('开始运行IO任务1...')
time.sleep(2) # 假设该任务耗时2s
print('IO任务1已完成,耗时2s')
def taskIO_2():
print('开始运行IO任务2...')
time.sleep(3) # 假设该任务耗时3s
print('IO任务2已完成,耗时3s')
start = time.time()
taskIO_1()
taskIO_2()
print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
# 运行结果如下:
开始运行IO任务1...
IO任务1已完成,耗时2s
开始运行IO任务2...
IO任务2已完成,耗时3s
所有IO任务总耗时5.02295秒
因此需要用到异步的方式来处理上述的任务,极大的增大效率,就是运用协程,在python生成器的关键字yield中可以实现中断功能,也即协程是基于生成器的变形实现的,之后虽然编码的形式有变化,但是基本原理不变
使用yield from和@asyncio.coroutine实现协程
协程都是通过使用yield from和asyncio中的@asyncio.coroutine来实现的,asyncio专门被用来实现异步IO操作
yield与yield from的区别:yield在生成器中有中断的功能,可以传出值,也可以从函数外部接收值,yield from的实现就是简化了yield操作。
其中yield titles返回了titles完整列表
yield from titles等价于下面的这一小段代码
for title in titles: # 等价于yield from titles
yield title
用下面的例子来解释这种区别:代码比较简单,但是对于概念的理解非常有帮助
def generator_1(titles):
yield titles
def generator_2(titles):
yield from titles
titles = ['Python','Java','C++']
for title in generator_1(titles):
print('生成器1:',title)
for title in generator_2(titles):
print('生成器2:',title)
# 打印结果如下:
生成器1: ['Python', 'Java', 'C++']
生成器2: Python
生成器2: Java
生成器2: C++
于此同时,yield from还可以省去很多异常的处理,不需要我们去手动编写,内部已经实现了大部分的异常处理
继续…
通过生成器来实现一个整数加和的程序,通过send()函数向生成器中传入要加和的数字,然后返回None结束,total保存最后加和的总数
def generator_1():
total = 0
while True:
x=yield
print("加",x)
if not x:
break
total +=x
return total
def generator_2(): # 委托生成器
while True:
total = yield from generator_1() # 子生成器
print("加和总数是:",total)
def main(): # 调用方
g1=generator_1()
g1.send(None)
g1.send(2)
g1.send(3)
g1.send(None)
# g2 = generator_2()
# g2.send(None)
# g2.send(2)
# g2.send(3)
# g2.send(None)
main()
如果是按照上述这么写的话,会报错,但是如果将main()函数的代码改过来,就不会出错,此时g2即使传入了None也不报异常
def main(): # 调用方
# g1=generator_1()
# g1.send(None)
# g1.send(2)
# g1.send(3)
# g1.send(None)
g2 = generator_2()
g2.send(None)
g2.send(2)
g2.send(3)
g2.send(None)
# 最后打印结果
加 2
加 3
加 None
加和总数是: 5
综合上面的几个例子做一点相应的梳理:
- 子生成器:yield from后的generator_1()生成器函数是子生成器
- 委托生成器:也即上面的generator_2(),负责委托子生成器完成具体的任务
- 调用方:main()是程序中的调用方,负责调用委托生成器
由此引出了yield from的另外一个关键作用:建立调用方和子生成器的通道
- main()每次在调用send(value)时,value不是传递给了委托生成器generator_2(),而是借助yield from传递给了子生成器generator_1中的yield(这几句话的概括通过代码的调试就能够大致的了解了。)
- 子生成器中的数据也是通过yield直接发送到调用方main()中(代码调试就能够清楚这样的一个过程)
- 也因此我们在写代码的时候依据:调用方-子生成器-委托生成器的规范形式来书写(现在可能还无法体会,后面接触的多了也许会有相应的了解。)
如何结合@asyncio.coroutine实现协程
对初始的那个例子进行相应的更改(同步IO修改成协程的用法)
# 使用同步方式编写异步功能
import time
import asyncio
@asyncio.coroutine # 标志协程的装饰器
def taskIO_1():
print("开始进行IO任务1...")
yield from asyncio.sleep(2) # 假设该任务耗时2s
print("IO任务1已完成,耗时2s")
return taskIO_1.__name__
@asyncio.coroutine # 标志协程的装饰器
def taskIO_2():
print("开始进行IO任务2...")
yield from asyncio.sleep(3) # 假设该任务耗时3s
print("IO任务2已完成,耗时3s")
return taskIO_2.__name__
@asyncio.coroutine # 标志协程的装饰器
def main(): # 调用方
tasks=[taskIO_1(),taskIO_2()] # 把所有任务添加到task中
done,pending = yield from asyncio.wait(tasks) # 子生成器
for r in done: #done和pending都是一个任务,所以返回结果需要逐个调用result()
print("协程无序返回值:"+r.result())
if __name__=='__main__':
start = time.time()
loop=asyncio.get_event_loop() # 创建一个事件循环对象loop
try:
loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
finally:
loop.close() # 结束事件循环
print("所有IO任务总耗时%.5f秒" % float(time.time()-start))
# 打印结果如下:
开始进行IO任务2...
开始进行IO任务1...
IO任务1已完成,耗时2s
IO任务2已完成,耗时3s
协程无序返回值:taskIO_1
协程无序返回值:taskIO_2
所有IO任务总耗时3.02165秒
对这段代码进行相应的分析:
使用方法:
- @asyncio.coroutine装饰器是协程函数的标志,我们需要在每一个任务函数前加这个装饰器,并在函数中使用yield from
- 在同步IO任务的代码中使用的time.sleep(2)来假设任务执行了2秒,在协程中yield from后面必须是子生成器函数,time.sleep()并不是生成器,所以替换成使用内置模块提供的生成器函数asyncio.sleep()
功能:
通过使用协议,极大的增大了多任务的执行效率,最后消耗的时间就是任务队列中耗时最多的时间,总耗时就是taskIO_2的耗时时间
执行过程:(理解的关键,结合调试一起理解) - 先通过get_event_loop()获取了一个标准事件循环loop(单线程)
- 通过run_until_complete(main())来运行协程:run_until_complete意思是直到循环事件的所有事件都处理完才能完整结束
- 进入调用方协程,我们把多个任务[taskIO_1()和taskIO_2()]放到一个task列表中,可理解为打包任务。
- 使用asyncio.wait(tasks)来获取一个awaitable objects即可等待对象的集合(此处的aws是协程的列表),并发运行传入的aws,同时通过yield from返回一个包含(done, pending)的元组,done表示已完成的任务列表,pending表示未完成的任务列表;如果使用asyncio.as_completed(tasks)则会按完成顺序生成协程的迭代器(常用于for循环中),因此当你用它迭代时,会尽快得到每个可用的结果。【此外,当轮询到某个事件时(如taskIO_1()),直到遇到该任务中的yield from中断,开始处理下一个事件(如taskIO_2())),当yield from后面的子生成器完成任务时,该事件才再次被唤醒】
- done里面有我们需要的返回结果,但它目前还是个任务列表,所以要取出返回的结果值,我们遍历它并逐个调用result()取出结果即可
- 通过loop.close()关闭事件循环
总结:协程的完整实现是靠:事件循环+协程(这也是一开始在廖神的课程中总结的,要真正理解这句话真的是有点难呀)
使用async和await实现协程
在python3.5后引入新的语法:async和await,可以简化并更好的标识异步IO,如果需要用需要做如下的替换:
- 把@asyncio.coroutine替换为async
- 把yield from 替换为await
将上面已经有的代码进行相应的更新:
# 重点要把上下两段代码做一下相应的比较就知道相应的修改的地方了
import time
import asyncio
async def taskIO_1():
print("开始进行IO任务1...")
await asyncio.sleep(2) # 假设该任务耗时2s
print("IO任务1已完成,耗时2s")
return taskIO_1.__name__
async def taskIO_2():
print("开始进行IO任务2...")
await asyncio.sleep(3) # 假设该任务耗时3s
print("IO任务2已完成,耗时3s")
return taskIO_2.__name__
async def main(): # 调用方
tasks=[taskIO_1(),taskIO_2()] # 把所有任务添加到task中
done,pending = await asyncio.wait(tasks) # 子生成器
for r in done: #done和pending都是一个任务,所以返回结果需要逐个调用result()
print("协程无序返回值:"+r.result())
if __name__=='__main__':
start = time.time()
loop=asyncio.get_event_loop() # 创建一个事件循环对象loop
try:
loop.run_until_complete(main()) # 完成事件循环,直到最后一个任务结束
finally:
loop.close() # 结束事件循环
print("所有IO任务总耗时%.5f秒" % float(time.time()-start))
总结
引出问题:
- 同步编程的并发性不高
- 多进程编程受CPU核数限制,当任务数量远大于CPU核数时,执行效率会降低
- 多线程编程需要线程之间的通信,而且需要锁机制来防止共享变量被不同线程乱改(也是前面提到的提纲性的概念),由于python中的GIL(全局解释器锁),实际上无法做到正真的并行。(暂时没有接触到实际的项目,可能理解没有那么全面)
产生的需求: - 可不可采用同步的方式来编写异步功能代码?(已经实现)
- 能不能只用一个单线程就能做到不同任务之间的切换?这样没有线程切换的时间消耗,也不用使用锁机制来削弱多任务并发效率
- 对于IO密集型任务,可否有更高的处理方式来节省CPU的等待时间?
针对以上的3点需求,就有了协程的出现,多进程核多线程是内核级别的程序,而协程是函数级别的程序,是可以通过程序员进行调试的