介绍 Python 中的子进程模块
subprocess
、线程池模块ThreadPoolExecutor
以及系统信息获取模块Psutil
的用法。
1. Python子进程模块subprocess
subprocess
模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。
(1)run
方法
首先我们来看看 run
方法的使用,该方法的参数如下:
args
:表示要执行的命令。必须是一个字符串或字符串参数列表。stdin
、stdout
和stderr
:子进程的标准输入、输出和错误。其值可以是subprocess.PIPE
、subprocess.DEVNULL
、一个已经存在的文件描述符、已经打开的文件对象或者None
。subprocess.PIPE
表示为子进程创建新的管道。subprocess.DEVNULL
表示使用os.devnull
。默认使用的是None
,表示什么都不做。另外,stderr
可以合并到stdout
里一起输出。timeout
:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并抛出TimeoutExpired
异常。check
:如果该参数设置为True
,并且进程退出状态码不是 0,则抛出CalledProcessError
异常。encoding
:如果指定了该参数,则stdin
、stdout
和stderr
可以接收字符串数据,并以该编码方式编码。否则只接收bytes
类型的数据。shell
:如果该参数为True
,将通过操作系统的 Shell 执行指定的命令。capture_output
:如果capture_output = True
,则将捕获stdout
和stderr
,调用时内部的Popen
对象将自动使用stdout = PIPE
和stderr = PIPE
创建标准输出和标准错误对象;传递stdout
和stderr
参数时不能同时传递capture_output
参数。如果希望捕获并将两个stream
合并为一个,使用stdout = PIPE
和stderr = STDOUT
。
下面我们来看一个例子,run
方法调用方式返回 CompletedProcess
实例:
1 | import subprocess |
其中,Python子进程测试程序1.py
内容如下:
1 | print('子进程输出: Hello World!') |
(2)Popen
方法
Popen
是 subprocess
的核心,子进程的创建和管理都靠它处理。
Popen
方法的参数如下:
args
:Shell 命令,可以是字符串或者序列类型(如:列表、元组)。bufsize
:缓冲区大小。当创建标准流的管道对象时使用,默认为-1
。0
表示不使用缓冲区,1
表示行缓冲,仅当universal_newlines = True
时可用,也就是文本模式。正数表示缓冲区大小,负数表示使用系统默认的缓冲区大小。stdin
、stdout
、stderr
:分别表示程序的标准输入、输出、错误句柄。preexec_fn
:只在 Unix 平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。shell
:如果该参数为True
,将通过操作系统的 Shell 执行指定的命令。cwd
:用于设置子进程的当前目录。env
:用于指定子进程的环境变量。如果env = None
,子进程的环境变量将从父进程中继承。
该方法会创建一个 Popen
对象, Popen
对象有以下几种方法:
poll()
:检查进程是否终止,如果终止返回returncode
,否则返回None
。wait(timeout)
:等待子进程终止。communicate(input=None, timeout=None)
:和子进程交互,向子进程发送和读取数据。将input
指定数据发送到stdin
;从stdout
和stderr
读取数据,直到到达文件末尾,等待进程终止。所以,返回值是一个元组:(stdout_data, stderr_data)
。如果timeout
时间内子进程不结束,则会抛出TimeoutExpired
异常。其中需要注意的是,捕获异常之后,可以再次调用该函数,因为子进程并没有被 KILL。因此,如果超时结束程序的话,需要现正确 KILL 子进程。send_signal(singnal)
:发送信号到子进程。terminate()
:停止子进程,也就是发送SIGTERM
信号到子进程。kill()
:杀死子进程,发送SIGKILL
信号到子进程。
Popen
方法的样例如下:
1 | args2 = ['python', 'src/Python子进程测试程序2.py'] |
其中,Python子进程测试程序2.py
内容如下:
1 | s = input() |
现在我们来看一下 communicate
的用法,我们将测试程序修改为运行时间超过一秒:
1 | args2 = ['python', 'src/Python子进程测试程序2.py'] |
现在的 Python子进程测试程序2.py
内容如下:
1 | s = input() |
2. ThreadPoolExecutor线程池
concurrent.futures
模块是 Python3.2 中引入的新模块,用于支持异步执行,以及在多核 CPU 和网络 I/O 中进行高效的并发编程。线程池的基类是 concurrent.futures
模块中的 Executor
,Executor
提供了两个子类,即 ThreadPoolExecutor
和 ProcessPoolExecutor
,简化了跨平台异步编程的实现。其中 ThreadPoolExecutor
用于创建线程池,而 ProcessPoolExecutor
用于创建进程池。如果使用线程池/进程池来管理并发编程,那么只要将相应的 Task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。
首先,让我们先来理解多进程和多线程两种并发编程的方式:
- 多进程:当通过多进程来实现并发编程时,程序会将任务分配给多个进程,这些进程可以在不同的 CPU 上同时运行。进程之间是独立的,各自有自己的内存空间等,可以实现真正的并行执行。不过,进程之间的通信比较耗时,需要使用 IPC(进程间通信)机制,而且进程之间的切换比线程之间的切换耗时,所以创建进程的代价较高。
- 多线程:当通过多线程来实现并发编程时,程序会将任务分配给多个线程,这些线程可以在同一个进程中的不同 CPU 核上同时运行。线程之间共享进程的内存空间,因此开销比较小。但是需要注意,在 Python 解释器中,线程是无法实现真正的并行执行,因为 Python 有 GIL(全局解释器锁),它确保同时只有一个线程运行 Python 代码。因此,一个 Python 进程中的多个线程并不能并行执行,在使用多线程编程时不能完全利用多核 CPU。
ThreadPoolExecutor
创建一个线程池,任务可以提交到这个线程池中执行。ThreadPoolExecutor
比 ProcessPoolExecutor
更容易使用,且没有像进程那样的开销。它可以让我们在一个 Python 解释器中进行跨线程异步编程,因为它规避了 GIL。
Exectuor
提供了如下常用方法:
submit(fn, *args, **kwargs)
:将fn
函数提交给线程池。*args
代表传给fn
函数的参数,**kwargs
代表以关键字参数的形式为fn
函数传入参数。map(func, *iterables, timeout=None, chunksize=1)
:该函数类似于全局函数map(func, *iterables)
,只是该函数将会启动多个线程,以异步方式立即对iterables
执行map
处理。shutdown(wait=True)
:关闭线程池。
程序将 fn
函数 submit
给线程池后,submit
方法会返回一个 Future
对象,Future
类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python
使用 Future
来代表。
Future
对象提供了如下方法:
cancel()
:取消该Future
代表的线程任务。如果该任务正在执行,不可取消,则该方法返回False
;否则,程序会取消该任务,并返回True
。cancelled()
:返回Future
代表的线程任务是否被成功取消。running()
:如果该Future
代表的线程任务正在执行、不可被取消,该方法返回True
。done()
:如果该Funture
代表的线程任务被成功取消或执行完成,则该方法返回True
。result(timeout=None)
:获取该Future
代表的线程任务最后返回的结果。如果Future
代表的线程任务还未完成,该方法将会阻塞当前线程,其中timeout
参数指定最多阻塞多少秒。exception(timeout=None)
:获取该Future
代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回None
。add_done_callback(fn)
:为该Future
代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该fn
函数。
在用完一个线程池后,应该调用该线程池的 shutdown()
方法,该方法将启动线程池的关闭序列。调用 shutdown()
方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下:
- 调用
ThreadPoolExecutor
类的构造器创建一个线程池。 - 定义一个普通函数作为线程任务。
- 调用
ThreadPoolExecutor
对象的submit()
方法来提交线程任务。 - 当不想提交任何任务时,调用
ThreadPoolExecutor
对象的shutdown()
方法来关闭线程池。
下面我们来看一个例子:
1 | from concurrent.futures import ThreadPoolExecutor |
3. 系统信息获取模块Psutil
现在可能会有人在想那我们如何获取子进程/线程在运行时的时间开销或者内存占用等信息呢?Python 有一个第三方模块 psutil
,专门用来获取操作系统以及硬件相关的信息,比如:CPU、磁盘、网络、内存等等。
首先我们需要安装 psutil
,直接通过 pip
命令安装即可:
1 | pip install psutil |
(1)查看 CPU 相关信息:
1 | import psutil |
(2)查看内存及磁盘相关信息:
1 | print(psutil.virtual_memory()) # 内存使用情况,分别为总内存、可用内存、内存占用率、已使用的内存大小、剩余的内存大小 |
(3)查看网络相关信息:
1 | print(psutil.net_io_counters()) # 网卡的网络IO统计信息 |
(4)查看进程相关信息:
1 | print(psutil.pids()) # 当前存在的所有进程的PID |
现在我们使用 psutil
模块实现获取 ThreadPoolExecutor
线程任务运行的时间与内存占用信息:
1 | args2 = ['python', 'src/Python子进程测试程序2.py'] |
其中,Python子进程测试程序2.py
内容如下:
1 | s = input() |