进程 今天我们一起来学习多进程编程,在学习之前,首先介绍一下什么是进程:
进程是计算机中正在运行的程序实例 。它包含程序的执行代码、数据和系统资源 。每个进程都是在自己的虚拟空间中运行,使得它们相互隔离 ,不会直接干扰其他进程的执行。
它是操作系统进行进行调度的基本单位。
进程的状态:
运行态(Running): 当前正在 CPU 上执行的进程。
就绪态(Ready): 等待分配 CPU 资源,一旦获得 CPU 时间片,即可切换到运行态。
阻塞态(Blocked): 因为某些事件(input/output)而暂停运行,直到事件完成后进入就绪态
进程间的通信(IPC):由于进程之间是相互隔离的,如果需要进行进程间的数据传递或同步操作,就需要使用进程间的通信。常见的 IPC 的方式有:管道、消息队列、共享内存等
多进程编程 本次学习使用 python 进行编程。我们知道在 python 中由于 GIL 锁的存在,每次只有一个线程在执行代码,这样的话,在耗 CPU 的情况下,就无法充分利用到计算机的多核,于是就需要使用 多进程编程 。
通过一个例子我们来学习 python 中的多进程编程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import multiprocessingimport timedef get_html (n ): time.sleep(n) print ("sub_progress success" ) return n if __name__ == '__main__' : progress = multiprocessing.Process(target=get_html, args=(2 ,)) progress.start() progress.join() print ("main progress end" )
在 python 中多进程编程的模块是 multiprocessing
, 具体操作的相关函数可以查阅
进程池 接下来,我们将一起学习进程池的使用。
1 2 3 4 5 6 7 8 9 pool = multiprocessing.Pool(multiprocessing.cpu_count()) result = pool.apply_async(get_html, args=(3 ,)) pool.close() pool.join() print (result.get())
imap imap 方法的特点就是它可以将一个函数应用于一个可迭代对象的所有元素,并且在多个进程中并行执行这些函数。这意味着其可以高效地并行处理大量数据,从而加速任务的执行。
1 result = pool.imap(get_html, [1 , 5 , 3 ])
对于这个 result 中,其执行完成的顺序是谁先执行完成,就先输出。
进程间的通信 进程间的通信主要是通过以下几种方式进行:
对于共享全局变量而言是不可行的,因为当一个进程进行 fork 的时候,就会对父进程的全局变量数据进行复制。
Queue 首先我们来学习 Queue 队列,这里的 Queue 和多线程编程是不一样的
以上的 Queue 只适用于多线程。而多进程间的通信则需要使用 multiprocess.Queue
,后续文章中,我会对这二者的底层进行学习,然后对比学习,感兴趣的同学可以自行学习。
这里我使用一个生产者-消费者模型进行学习:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import multiprocessingimport timefrom multiprocessing import Queuedef producer (queue ): queue.put("a" ) time.sleep(1 ) def consumer (queue ): time.sleep(1 ) data = queue.get() print (f"receive data: {data} " ) if __name__ == '__main__' : queue = Queue() p = multiprocessing.Process(target=producer, args=(queue,)) c = multiprocessing.Process(target=consumer, args=(queue,)) p.start() c.start() p.join() c.join()
pipe 进程间的通信还可以使用管道进行通信,使用方法和 queue 类似,但是注意 pipe 的初始化:send_pipe, recv_pipe = Pipe()
, pipe 只适用于两个进程之间的通信。当只需要两个进程时,pipe 的性能高于 Queue.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import multiprocessingimport timefrom multiprocessing import Queue, Pipedef producer (pipe ): pipe.send("a" ) time.sleep(1 ) def consumer (pipe ): time.sleep(1 ) data = pipe.recv() print (f"receive data: {data} " ) if __name__ == '__main__' : send_pipe, recv_pipe = Pipe() p = multiprocessing.Process(target=producer, args=(send_pipe,)) c = multiprocessing.Process(target=consumer, args=(recv_pipe,)) p.start() c.start() p.join() c.join()
Manager 在上文中,我们使用到了进程池 pool, 对于进程池而言, multiprocessing.Queue()
和 multiprocessing.pipe()
都是不可行的。而 Manager 模块的设计正是为了解决这一点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import multiprocessingimport timefrom multiprocessing import Queue, Pipe, Managerdef producer (queue ): queue.put("a" ) time.sleep(1 ) def consumer (queue ): time.sleep(1 ) data = queue.get() print (f"receive data: {data} " ) if __name__ == '__main__' : queue = Manager().Queue() pool = multiprocessing.Pool() p = pool.apply_async(producer, args=(queue,)) c = pool.apply_async(consumer, args=(queue,)) pool.close() pool.join()
在 manager 中有很多共享内存类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class SyncManager (BaseManager ): def BoundedSemaphore (self, value: Any = ... ) -> threading.BoundedSemaphore: ... def Condition (self, lock: Any = ... ) -> threading.Condition: ... def Event (self ) -> threading.Event: ... def Lock (self ) -> threading.Lock: ... def Namespace (self ) -> _Namespace: ... def Queue (self, maxsize: int = ... ) -> queue.Queue[Any ]: ... def RLock (self ) -> threading.RLock: ... def Semaphore (self, value: Any = ... ) -> threading.Semaphore: ... def Array (self, typecode: Any , sequence: Sequence [_T] ) -> Sequence [_T]: ... def Value (self, typecode: Any , value: _T ) -> ValueProxy[_T]: ... def dict (self, sequence: Mapping[_KT, _VT] = ... ) -> _dict[_KT, _VT]: ... def list (self, sequence: Sequence [_T] = ... ) -> _list[_T]: ...
这些都可以用于进程间的通信