协程实现原理及使用

协程

在上一篇文章中:【IO 多路复用】,我们介绍了如何基于 select(poll/epoll) + 回调 + 事件循环编写一个高并发的 IO 多路复用模型,也知道了协程的设计理念也是和 IO 多路复用是类似的。今天我们将学习 【协程】。

在介绍协程之前,我们需要理解准备知识:

  • 回调之痛
  • 生成器

回调之痛

在 【IO 多路复用】的实现过程中,我们使用 select 监听不同的事件描述符,并对不同的事件(写,读)注册不同的回调函数。在阅读代码时可以发现,其整体逻辑是没有同步代码编写清楚的,而且还有一些问题, 例如:

  • 如果回调函数执行异常时,我们应该如何处理?
  • 如果回调函数内有嵌套关系,如何处理
  • 当嵌套多层时,其中某个环节出错了会导致什么结果?
  • 。。。

总结下来就是:可读性差,共享状态管理困难,异常处理困难

那么我们应该如何优化呢?

结论就是:我们希望既有回调模式的高性能,又希望能够以同步的形式编写代码

什么是协程?

首先我们可以了解一下 C10M 的问题?

如何利用 8 核CPU,64G 内存,在 10gbps 的网络上保持 1000 万的并发连接。

在前面的文章中,我们实现了 IO 多路复用的模型,在单线程中如何实现并发。在【回调之痛】中,我们又希望进一步进行优化:回调的高性能,单线程下进行代码切换,同步的形式进行编写代码。

我们知道进程,线程都是由操作系统完成的,在单线程下进行切换是用户态的,也就是需要程序员自己进行函数切换,这个时候,就避免了多线程下的锁机制和切换代价,并发性就会很高。

那么问题来了,如何能够实现在单线程下,实现函数间的切换?

我们知道,在传统代码编写中 A 函数调用 B 函数后,就会直接 return

1
2
3
4
5
6
def A():
b = B() # 此处暂停,切换
print("A")

def B():
print("b")

而我们需要的是当执行 B 时,在调用位置暂停,去执行 A 的逻辑。

于是,我们需要的是一个可以暂停的函数,并且在适当的时候恢复该函数的执行。所以我们就引出了 【协程】的概念,一个具有多个入口的函数,可以暂停的函数(可以向暂停的地方传入值)

在 python 中,我们知道生成器是可以暂停的,其中就包含 yield /yield from 的用法,关于迭代器和生成器可以参考我的文章:【迭代器和生成器的实现原理】

yield from

在 python 3.3 中新增了 yield from 语法

1
2
3
4
5
6
7
8
9
10
11
from itertools import chain

my_list = [1,2,3]
my_dict = {
"bobby1":"http://projectsedu.com",
"bobby2":"http://www.imooc.com",
}

if __name__ == '__main__':
for value in chain(my_list, my_dict, range(5,10)):
print(value)
1
2
3
4
5
6
7
8
9
10
1
2
3
bobby1
bobby2
5
6
7
8
9

chain 函数会依次将可迭代对象遍历然后打印, 同样,我们将使用 yield 也可以实现

1
2
3
4
5
6
7
8
def my_chain(*args, **kwargs):
for my_iterable in args:
for value in my_iterable:
yield value

if __name__ == '__main__':
for value in my_chain(my_list, my_dict, range(5,10)):
print(value)

同样,我们也可以使用 yield from 实现上述效果:

1
2
3
4
5
6
7
def my_chain(*args, **kwargs):
for my_iterable in args:
yield from my_iterable

if __name__ == '__main__':
for value in my_chain(my_list, my_dict, range(5,10)):
print(value)

于是我们可以简单的将 yield from g 理解为 for v in g: yield v

其次,其最重要的功能是在调用者和子生成器之间建立透明的双向连接

接下来我们再来看个例子:

  1. main 调用方 g1(委托生成器) gen 子生成器
  2. yield from 会在调用方与子生成器之间建立一个双向通道这是yield 最核心的点,也为协程的同步实现提供了可能。
1
2
3
4
5
6
7
def g1(gen):
yield from gen


def main():
g = g1()
g.send(None)

yield from 功能十分强大,具体可参考:https://stackoverflow.com/questions/9708902/in-practice-what-are-the-main-uses-for-the-yield-from-syntax-in-python-3-3

协程

协程仍然是事件循环+协程模式

async && await

asyncio

官方使用可以参考:https://docs.python.org/3/library/asyncio.html

用于协程调度

  • 包含各种特定系统实现的模块化事件循环,包括 windows/linux 等系统
  • 传输和协议抽象
  • 对 TCP/UDP/SSL 子进程、延时调用以及其他具体支持
  • 模仿 futures 模块但适用于事件循环使用的 Future 类
  • 基于 yield from 的协议和任务,可以让你用顺序的方式编写并发代码
  • 必须使用一个将产生阻塞 IO 的调用时,有接口可以把这个事件转移到线程池
  • 模仿 threading 模块中的同步原语,可以用在单线程内的协程之间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")

if __name__ == '__main__':
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("1") for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
end_time = time.time()
print(end_time-start_time)

output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
2.0052013397216797

Process finished with exit code 0

可以看到一次性提交了 10 个任务,并共计耗时 2 s,实现了并发。

常规方法使用介绍:

  • get_event_loop 获取事件循环
  • loop.run_until_complete 设定任务结束标志,提交任务

gather && wait

gather 用于提交多个任务,相对 wait 而言是高层的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")

if __name__ == '__main__':
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("1") for i in range(10)]
loop.run_until_complete(asyncio.gather(*tasks))
end_time = time.time()
print(end_time-start_time)

gather 使用较为灵活,还可以对 tasks 进行分组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import time


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")

if __name__ == '__main__':
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("1") for i in range(10)]

# 分组
group1 = asyncio.gather(*tasks)
loop.run_until_complete(group1)
end_time = time.time()
print(end_time-start_time)

run_until_complete && run_forever

前者是如何得知 协程是全部完成的,查看源码可以知道其调用了 add_done_callback(),让事件循环停止

如何取消 task 的运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import time


async def get_html(sleep_time):
print(" waiting")
await asyncio.sleep(sleep_time)
print(f"done after {sleep_time}s")

if __name__ == '__main__':
start_time = time.time()
loop = asyncio.get_event_loop()
task1 = get_html(1)
task2 = get_html(2)
task3 = get_html(3)

tasks = [task1, task2, task3]
loop = asyncio.get_event_loop()

try:
loop.run_until_complete(asyncio.gather(*tasks))
except KeyboardInterrupt as e:
all_tasks = asyncio.Task.all_tasks()
for task in all_tasks:
print("call task")
task.cancel()
loop.stop()
loop.run_forever()

finally:
loop.close()

子协程

待补充

其他方法

待补充

ThreadPollExecutor + asyncio

待补充

aiohttp

待补充

-------------THANKS FOR READING-------------