迭代器和生成器的实现原理

迭代器

什么是迭代协议 ?

这里我们可以查看 collections.abc 中的两个对象:Iterable Iterator

1
2
3
4
5
6
7
8
9
10
@runtime_checkable
class Iterable(Protocol[_T_co]):
@abstractmethod
def __iter__(self) -> Iterator[_T_co]: ...

@runtime_checkable
class Iterator(Iterable[_T_co], Protocol[_T_co]):
@abstractmethod
def __next__(self) -> _T_co: ...
def __iter__(self) -> Iterator[_T_co]: ...

这里我们来观察一下 list 是否是可迭代对象或者是一个迭代器:

1
2
3
4
5
6
7
8
9
10
11
from collections.abc import Iterator, Iterable


if __name__ == '__main__':
a = [1, 2, 3]
print(isinstance(a, Iterable))
print(isinstance(a, Iterator))
# output
True
False

我们知道 list 只实现了一个 __iter__ 方法,并未实现,所以 list 是一个可迭代对象而不是迭代器;

所以,实现了 __iter__ 魔法方法的对象是一个可迭代对象,实现了 __iter__ && __next__ 方法的是一个迭代器。

如何将一个可迭代对象转换为迭代器

答案是 python 内置的 iter()

这里只需要使用 iter() 就可以将 a 转换为一个迭代器:

1
2
3
4
5
6
7
8
if __name__ == '__main__':
a = [1, 2, 3]
a = iter(a)
print(isinstance(a, Iterable))
print(isinstance(a, Iterator))
# output
True
True

接下来,我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Company:
def __init__(self, employee_list):
self.employee = employee_list

# def __iter__(self):
# return self.employee

def __getitem__(self, item):
return self.employee[item]

def __len__(self):
return len(self.employee)


if __name__ == '__main__':
c = Company([1, 2, 3])
for i in c:
print(i)

在这个类中 ,我们并未实现 __iter__ 函数,而是实现了 __getitem__ 函数,并且也能使用 for 循环,这是为什么呢?

实际上,for 循环时,会自动去调用 iter(),然后去查找魔法方法 __iter__ ,如果没有就会去查找 __getitem__ 方法。

为了更好地理解这些逻辑,我们将模仿 iter() 实现一个迭代器,将上述的 company 变为一个可迭代对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Company:
def __init__(self, employee_list):
self.employee = employee_list

def __iter__(self):
return MyIter(self.employee)

# def __getitem__(self, item):
# return self.employee[item]

def __len__(self):
return len(self.employee)


class MyIter:
def __init__(self, employee_list):
self.employee = employee_list
self.index = 0

def __iter__(self):
return self

def __next__(self):
try:
word = self.employee[self.index]
except IndexError:
raise StopIteration

self.index += 1
return word


if __name__ == '__main__':
c = Company([1, 2, 3])
for i in c:
print(i)

# output: 1 2 3

实际上将 __next__ 类定义在 Company 也可以,但是基于迭代器的设计模式,对于另外的属性,例如 self.index 会污染 Company 类属性。

生成器

函数中只要有 yield 关键字,就是一个生成器函数

python 编译字节码的时候就生成了一个生成器对象

先来看一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def gen_func():
yield 1
yield 2
yield 3


def func():
return 1


if __name__ == '__main__':
gen = gen_func() # 返回一个 generator 对象
fun = func() # 返回 1

for i in gen:
print(i)

# output 1, 2, 3

在这段代码中,我们将 yield 关键字和 return 关键字进行了比较,可以看到,其两个函数返回的对象是不一致的,gen 返回了一个生成器对象,而 fun 返回了一个 int 类型的值。并且 gen 是可以被迭代的。

这是为什么呢?这是因为 在 python 解释器编译字节码时,遇到 yield 关键字时,自动将其转换为了一个 生成器对象,且生成器也是实现了迭代器协议的。

为了更好地解释 yield 关键字的用途,这里采用斐波拉契函数来进一步理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def fib(index):
if index == 0:
return 0
if 0 < index <= 2:
return 1
else:
return fib(index-1) + fib(index-2)

def fib2(index):
re_list = []
n, a, b = 0, 0, 1
while n < index:
re_list.append(b)
a, b = b, a+b
n = n+1
return re_list

if __name__ == '__main__':
print(fib(10))
# output 55

在上述的代码中,fib2 还原了斐波拉契的生成过程,记录了每次的值,但是当数据量非常大时,例如几千万上亿时,那么这个数据是非常大的,就会特别消耗内存。而 yield 关键字刚好能解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def fib2(index):
n, a, b = 0, 0, 1
while n < index:
yield b
a, b = b, a+b
n = n+1

if __name__ == '__main__':
for i in fib2(10):
print(i)
# output
"""
1
1
2
3
5
8
13
21
34
55
"""

这样改动下来,yield 只会占用一点点内存,且在 for 循环时,会记录每次的生成值,刚好解决了这个问题。

生成器原理

python 中函数的工作原理

先了解一个 python 中函数的调用原理:

在Python中,栈帧(stack frame)是独立存在的。每当一个函数被调用时,Python会在调用栈中创建一个栈帧来保存该函数的局部变量、参数、返回地址等信息。

栈帧是函数调用的上下文,它包含了函数执行过程中的相关信息。每个栈帧都是独立的,包含了函数的局部状态,并且按照调用的顺序进行组织。当函数执行结束后,对应的栈帧会被弹出栈,从而回到上一个函数的执行环境。

每个栈帧都包含以下几个重要的组成部分:

  1. 局部变量:栈帧中保存了函数的局部变量,包括函数参数和在函数内部定义的变量。
  2. 返回地址:栈帧中保存了函数执行结束后的返回地址,用于指示程序继续执行的位置。
  3. 异常处理信息:栈帧中保存了与异常处理相关的信息,用于捕获和处理异常。

由于栈帧是独立的,每个栈帧都可以保持自己的状态,并且可以同时存在多个栈帧。这使得Python可以实现递归调用和函数的嵌套调用。

总结:在Python中,栈帧是独立存在的,它保存了函数调用的上下文信息,包括局部变量、返回地址和异常处理信息。每个函数调用都会创建一个新的栈帧,并按照调用顺序组织在调用栈中。栈帧的独立存在使得Python能够支持递归调用和函数的嵌套调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def foo():
bar()

def bar():
pass


if __name__ == '__main__':
import dis # 字节码库
print(dis.dis(foo)) #字节码执行过程
"""
9 0 LOAD_GLOBAL 0 (bar)
2 CALL_FUNCTION 0
4 POP_TOP
6 LOAD_CONST 0 (None)
8 RETURN_VALUE
None
"""

从上述的代码中,我们可以看到,对于 foo 函数而言,python 解释器会创建一个栈祯来保存该函数的相关信息

所有的栈祯都是分配在堆内存上,这就决定了栈祯可以独立于调用者存在

用一段代码来解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import inspect
frame = None
def foo():
bar()


def bar():
global frame
frame = inspect.currentframe()


if __name__ == '__main__':
foo()
print(frame.f_code.co_name)
caller_frame = frame.f_back
print(caller_frame.f_code.co_name)
# bar
# foo

在上述代码中,我们引入了 inspect 包来查看 栈祯保存的一些信息,

通过 frame = inspect.currentframe() 来保存当前函数 bar 的栈祯。可以看到,通过 frame.f_code.co_name 打印的是 bar, 而且还可以通过 frame.f_back 还可以查看上一个栈祯的信息即 foo。

通过这段代码,我们就能更加理解 python 解释器在运行一个 函数时的操作了,让我们明白对于递归调用而言,函数的调用是可控的,可溯源的,这也是 python 生成器的前提

![image-20230702003450166](/Users/tyronemaxi/Library/Application Support/typora-user-images/image-20230702003450166.png)

![image-20230702003852334](/Users/tyronemaxi/Library/Application Support/typora-user-images/image-20230702003852334.png)

用一段代码解释生成器的运行原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def gen_func():
yield 1
name = "bobby"
yield 2
age = 30
return "imooc"


if __name__ == '__main__':
import dis
gen = gen_func()
print(dis.dis(gen))
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)
next(gen)
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)
next(gen)
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)
next(gen)
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)

打印信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  9           0 LOAD_CONST               1 ('哈哈')
2 YIELD_VALUE
4 POP_TOP

10 6 LOAD_CONST 2 ('bobby')
8 STORE_FAST 0 (name)

11 10 LOAD_CONST 3 ('嘻嘻')
12 YIELD_VALUE
14 POP_TOP

12 16 LOAD_CONST 4 (30)
18 STORE_FAST 1 (age)

13 20 LOAD_CONST 5 ('imooc')
22 RETURN_VALUE
None
-1
{}
2
{}
12
{'name': 'bobby'}

我们分析一下打印信息:

首先,我们使用 dis 模块 print(dis.dis(gen)) 打印栈祯的详细信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  9           0 LOAD_CONST               1 ('哈哈')
2 YIELD_VALUE
4 POP_TOP

10 6 LOAD_CONST 2 ('bobby')
8 STORE_FAST 0 (name)

11 10 LOAD_CONST 3 ('嘻嘻')
12 YIELD_VALUE
14 POP_TOP

12 16 LOAD_CONST 4 (30)
18 STORE_FAST 1 (age)

13 20 LOAD_CONST 5 ('imooc')
22 RETURN_VALUE
None

我们可以看到对于 gen_func 的每一行代码而言,都保存了相关信息;

其中 YIELD_VALUE 就是对应 yield 关键字

通过 gen.gi_frame 打印 PyGenObject 中的 f_lasti f_locals

  • f_lasti 当前祯的最后指令的索引
  • f_locals 当前祯的局部变量字典
1
2
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)

生成器如何读取一个大文件

例如现在有一个巨大的文件 500G 左右,没有换行符,且只有一行数据,例如:

1
python is the best language in world | xi xi xi|xxxxxx| hahaha ....
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def read_big_file(f, sep_str):
buf = ""
while True:
while sep_str in buf:
pos = buf.index(sep_str) # 每次只读取一个分隔符内容部分
yield buf[:pos]
buf = buf[pos + len(sep_str):]

chunk = f.read(4096)
if not chunk:
# 已经读到了文件结尾
yield buf
break
buf += chunk


if __name__ == '__main__':
with open("1.txt") as f:
for content in read_big_file(f, "|"):
print(content)

常规分块读取文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def read_file_in_chunks(file_path, chunk_size):
with open(file_path, 'rb') as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
yield chunk

file_path = 'path/to/file.txt'
chunk_size = 1024 # 每次读取的块大小

for chunk in read_file_in_chunks(file_path, chunk_size):
# 处理每个块的数据
print(chunk)

协程应用

C 10M 问题

如何利用 8 核 CPU , 64 G内存,在 10 g bps 的网络上保持 1000 万的并发连接;

-------------THANKS FOR READING-------------