IO 多路复用原理及实现

IO 多路复用原理及实现

IO 多路复用是一种用于提高程序性能的网络编程技术。它通过在一个线程内同时监听多个输入输出流(通常是网络套接字),实现了在单个线程中处理多个并发的 IO 操作。这样可以避免创建大量的线程或进程,从而减少资源消耗和提高程序的并发处理能力。

今天我们一起学习 IO 多路复用的网络模型。在学习之前,请允许我介绍一下什么是 IO 多路复用模型?以及为什么要用到它?

在传统的阻塞 IO 模型中,当一个线程处理一个 IO 操作时,如果 IO 操作未完成,线程会被阻塞,无法处理其他的 IO 操作,这会导致效率低下。当然在非阻塞式的 IO 模型中,可以通过轮询的方式去获取任务是否处理完成,适用于不同类型任务的机制,但是这样也会特别消耗 cpu 资源,增加代码的复杂度。

IO 多路复用则是很好的避免了这些,其是基于 select/pool/epool 系统函数来监听多个 IO 的状态,当某个 IO 的状态发生改变时,调用回调函数,从而实现在单个线程中处理多个 IO 任务的能力,提高并发。

其中有 Nginx, Tornado, netty 等开源框架,都使用了 IO 多路复用模型,被广泛使用。

关于 unix io 模型的种类,这篇文章有详细介绍:http://localhost:4000/posts/baf320b1/

select && poll && epoll

在介绍 IO 多路复用之前,先介绍一下这三个系统调用函数,其在不同的操作系统中提供了类似的功能,运行程序同时监听多个 IO 对象的状态。

select

select 是最古老和最常见的 IO 多路复用机制之一,它使用三个文件描述符集合(读、写、异常)来检查多个 IO 对象的状态,然后其会阻塞等待,直到其中至少一个 IO 对象就绪或达到超时时间。然后其就会返回就绪的 IO 对象,应用程序可以对其进行相应的操作。

但是 select 每次在调用时都会重新构建文件的描述符集合,效率较低。

pool

pool 相对于 select 而言,是更加现代化和高效的 IO 多路复用机制pool 使用一个文件描述符组来检查多个 IO 对象的状态。与 select 不同的是,pool 不需要重新构建文件描述符集合,可以直接使用数组,从而提高了效率。它也能处理更大的文件描述符数量。

epoll

epoll 是在 linux 系统中提供的高效的 IO 多路复用机制。它引入了 三个重要的概念:epoll_createepoll_ctlepoll_wait 。epoll_create 用于创建一个 epoll 对象,epoll_ctl 用于向 epoll 对象注册或者删除 IO 对象,epoll_wait 用于等待 IO 对象就绪。与 select 和 poll 不同的是,epoll 使用了事件驱动的方式,只通知就绪的 IO 对象,从而避免了遍历整个 IO 对象集合的开销,提高了性能和可扩展性。其还支持边缘触发和水平触发两种模式。

对比:对于 select 和 poll 而言,进程需要不断轮询每个文件描述符状态,以确定哪些描述符已经就绪,这种方式叫做轮询式(polling)。这样的轮询机制在文件描述符较小的情况下还可以接受,但是随着文件描述符数量的增多,效率会明显降低,因为大部分时间都在空转浪费 CPU 资源。

epoll 通过时间驱动的方式解决了这个问题。在使用 epoll 的时候,进程首先会注册一组文件描述符到 Epoll 实例中,然后调用 Epoll 等待函数,例如 epoll_wait() 。epoll 等待函数会将进程置于阻塞的状态,直到以下三种事件之一发生:

  • 文件描述符变为可读状态(有数据可以从该文件描述符读取)
  • 文件描述符可写(可以向该文件描述符写入数据)
  • 文件描述符异常(例如关闭)

一旦有任何一个就绪事件发生,Epoll 等待函数就会立即返回,并且告知进程哪些文件描述符已经就绪。这样,进程就只需要处理那些真正需要处理的就绪事件,而不需要遍历所有的文件描述符,进而减少了不必要的 CPU 开销。

通过事件驱动的方式,Epoll 在高并发环境中具有更好的性能,因为它避免了不必要的轮询,并且只关注真正需要处理的事件。这使得 Epoll 成为在 Linux 系统上进行高性能网络编程的首选技术之一。

IO 多路复用的实现

为了更好的介绍 IO 多路复用网络模型,我们先说结论:

IO 多路复用实则是 回调+事件循环+ select(poll, epoll) 完成的, 通过在一个线程中监听多个网络套接字完成高并发。

首先,我先介绍一下 python 中的 select 函数,用于监听文件描述符的状态

1
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selectors 是一个高级 I/O 复用库。它建立在 select 模块原型的基础上, 其定义了一个 BaseSelector 抽象基类,以及多个实际的实现(KqueueSelector, EpollSelector),它们可被用于在多个文件对象上等待 I/O 就绪通知。

详情可见:https://docs.python.org/zh-cn/3/library/selectors.html

好的,基本的准备工作已经介绍完了,接下来,我们将使用以上的知识实现一个 爬虫的工作,对一系列网页进行爬取工作。

首先定义一个 Fetcher 类,用于完成 socket client 的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Author: tyrone
File: select_test.py
Time: 2023/7/8
"""
# 通过非阻塞io实现http请求
# select + 回调 + 事件循环
# 并发性高
# 使用单线程

import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
# 使用select完成http请求
urls = []
stop = False


class Fetcher:
def connected(self, key):
selector.unregister(key.fd)
self.client.send(
"GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
selector.register(self.client.fileno(), EVENT_READ, self.readable)

def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
self.client.close()
urls.remove(self.spider_url)
if not urls: # 判断是否已抓取完成
global stop
stop = True

def get_url(self, url):
self.spider_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"

# 建立socket连接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False)

try:
self.client.connect((self.host, 80)) # 阻塞不会消耗cpu
except BlockingIOError as e:
pass

# 注册
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
# 事件循环,不停的请求socket的状态并调用对应的回调函数
# 1. select本身是不支持register模式
# 2. socket状态变化以后的回调是由程序员完成的
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
# 回调+事件循环+select(poll\epoll)


if __name__ == "__main__":
fetcher = Fetcher()
import time

start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
urls.append(url)
fetcher = Fetcher()
fetcher.get_url(url)
loop()
print(time.time() - start_time)

在上述的代码中,其中比较重要的概念是:

  • 回调,即当文件描述符的状态发生变化时,应该回调什么函数去处理:selector.register(self.client.fileno(), EVENT_WRITE, self.connected) && selector.register(self.client.fileno(), EVENT_READ, self.readable)
  • 事件循环函数,控制整个进程的循环回调
  • 回调结束标志位的结束

于是,我们就实现了一个 IO 多路复用的模型。

总结

本文介绍了一个 IO 多路复用模型的 python 代码实现,主要目的是介绍什么是 IO 多路复用模型,为什么要用到 IO 多路复用网络模型,以及其重要组成。

其实 IO 多路复用的设计理念是协程设计的基础,协程也是基于 select(poll, epoll) + 回调 + 事件循环实现的。

-------------THANKS FOR READING-------------