Python子进程管理与进程信息获取

  1. 1. Python子进程模块subprocess
  2. 2. ThreadPoolExecutor线程池
  3. 3. 系统信息获取模块Psutil

介绍 Python 中的子进程模块 subprocess、线程池模块 ThreadPoolExecutor 以及系统信息获取模块 Psutil 的用法。

1. Python子进程模块subprocess

subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。

(1)run 方法

首先我们来看看 run 方法的使用,该方法的参数如下:

  • args:表示要执行的命令。必须是一个字符串或字符串参数列表。
  • stdinstdoutstderr:子进程的标准输入、输出和错误。其值可以是 subprocess.PIPEsubprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 Nonesubprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None,表示什么都不做。另外,stderr 可以合并到 stdout 里一起输出。
  • timeout:设置命令超时时间。如果命令执行时间超时,子进程将被杀死,并抛出 TimeoutExpired 异常。
  • check:如果该参数设置为 True,并且进程退出状态码不是 0,则抛出 CalledProcessError 异常。
  • encoding:如果指定了该参数,则 stdinstdoutstderr 可以接收字符串数据,并以该编码方式编码。否则只接收 bytes 类型的数据。
  • shell:如果该参数为 True,将通过操作系统的 Shell 执行指定的命令。
  • capture_output:如果 capture_output = True,则将捕获 stdoutstderr,调用时内部的 Popen 对象将自动使用 stdout = PIPEstderr = PIPE 创建标准输出和标准错误对象;传递 stdoutstderr 参数时不能同时传递 capture_output 参数。如果希望捕获并将两个 stream 合并为一个,使用 stdout = PIPEstderr = STDOUT

下面我们来看一个例子,run 方法调用方式返回 CompletedProcess 实例:

1
2
3
4
5
6
7
8
9
10
11
import subprocess

args1 = ['python', 'src/Python子进程测试程序1.py']

ret = subprocess.run(args=args1, capture_output=True, encoding='utf-8') # 相当于在命令行执行:python src/Python子进程测试程序1.py
print(ret) # CompletedProcess(args=['python', 'src/Python子进程测试程序1.py'], returncode=0, stdout='子进程输出: Hello World!\n', stderr='')

if ret.returncode == 0:
print('Success, stdout:', ret.stdout) # Success, stdout: 子进程输出: Hello World!
else:
print('Error, stderr:', ret.stderr) # Error, stderr: python: can't open file 'D:\xxx\src\Python子进程测试程序1_Wrong.py': [Errno 2] No such file or directory

其中,Python子进程测试程序1.py 内容如下:

1
print('子进程输出: Hello World!')

(2)Popen 方法

Popensubprocess 的核心,子进程的创建和管理都靠它处理。

Popen 方法的参数如下:

  • args:Shell 命令,可以是字符串或者序列类型(如:列表、元组)。
  • bufsize:缓冲区大小。当创建标准流的管道对象时使用,默认为 -10 表示不使用缓冲区,1 表示行缓冲,仅当 universal_newlines = True 时可用,也就是文本模式。正数表示缓冲区大小,负数表示使用系统默认的缓冲区大小。
  • stdinstdoutstderr:分别表示程序的标准输入、输出、错误句柄。
  • preexec_fn:只在 Unix 平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。
  • shell:如果该参数为 True,将通过操作系统的 Shell 执行指定的命令。
  • cwd:用于设置子进程的当前目录。
  • env:用于指定子进程的环境变量。如果 env = None,子进程的环境变量将从父进程中继承。

该方法会创建一个 Popen 对象, Popen 对象有以下几种方法:

  • poll():检查进程是否终止,如果终止返回 returncode,否则返回 None
  • wait(timeout):等待子进程终止。
  • communicate(input=None, timeout=None):和子进程交互,向子进程发送和读取数据。将 input 指定数据发送到 stdin;从 stdoutstderr 读取数据,直到到达文件末尾,等待进程终止。所以,返回值是一个元组:(stdout_data, stderr_data)。如果 timeout 时间内子进程不结束,则会抛出 TimeoutExpired 异常。其中需要注意的是,捕获异常之后,可以再次调用该函数,因为子进程并没有被 KILL。因此,如果超时结束程序的话,需要现正确 KILL 子进程。
  • send_signal(singnal):发送信号到子进程。
  • terminate():停止子进程,也就是发送 SIGTERM 信号到子进程。
  • kill():杀死子进程,发送 SIGKILL 信号到子进程。

Popen 方法的样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
args2 = ['python', 'src/Python子进程测试程序2.py']

proc = subprocess.Popen(args=args2,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8')
print(proc) # <Popen: returncode: None args: ['python', 'src/Python子进程测试程序2.py']>

stdout, stderr = proc.communicate(input='AsanoSaki')
print('stdout:', stdout) # stdout: 子进程输出: AsanoSaki
print('stderr:', stderr) # stderr: 空

其中,Python子进程测试程序2.py 内容如下:

1
2
s = input()
print('子进程输出: ', s)

现在我们来看一下 communicate 的用法,我们将测试程序修改为运行时间超过一秒:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
args2 = ['python', 'src/Python子进程测试程序2.py']

proc = subprocess.Popen(args=args2,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8')

try:
stdout, stderr = proc.communicate(input='AsanoSaki', timeout=1) # 设置1s超时时间
except subprocess.TimeoutExpired:
print('子进程运行超时')
proc.kill() # 需要KILL子进程
stdout, stderr = proc.communicate() # 捕获异常之后,可以再次调用该函数
print('stdout:', stdout) # stdout: 子进程输出: AsanoSaki
print('stderr:', stderr) # stderr: 空

现在的 Python子进程测试程序2.py 内容如下:

1
2
3
4
5
s = input()
print('子进程输出: ', s)

for i in range(10**9):
pass

2. ThreadPoolExecutor线程池

concurrent.futures 模块是 Python3.2 中引入的新模块,用于支持异步执行,以及在多核 CPU 和网络 I/O 中进行高效的并发编程。线程池的基类是 concurrent.futures 模块中的 ExecutorExecutor 提供了两个子类,即 ThreadPoolExecutorProcessPoolExecutor ,简化了跨平台异步编程的实现。其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。如果使用线程池/进程池来管理并发编程,那么只要将相应的 Task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

首先,让我们先来理解多进程和多线程两种并发编程的方式:

  • 多进程:当通过多进程来实现并发编程时,程序会将任务分配给多个进程,这些进程可以在不同的 CPU 上同时运行。进程之间是独立的,各自有自己的内存空间等,可以实现真正的并行执行。不过,进程之间的通信比较耗时,需要使用 IPC(进程间通信)机制,而且进程之间的切换比线程之间的切换耗时,所以创建进程的代价较高。
  • 多线程:当通过多线程来实现并发编程时,程序会将任务分配给多个线程,这些线程可以在同一个进程中的不同 CPU 核上同时运行。线程之间共享进程的内存空间,因此开销比较小。但是需要注意,在 Python 解释器中,线程是无法实现真正的并行执行,因为 Python 有 GIL(全局解释器锁),它确保同时只有一个线程运行 Python 代码。因此,一个 Python 进程中的多个线程并不能并行执行,在使用多线程编程时不能完全利用多核 CPU。

ThreadPoolExecutor 创建一个线程池,任务可以提交到这个线程池中执行。ThreadPoolExecutorProcessPoolExecutor 更容易使用,且没有像进程那样的开销。它可以让我们在一个 Python 解释器中进行跨线程异步编程,因为它规避了 GIL。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,**kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 fn 函数 submit 给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 对象提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

使用线程池来执行线程任务的步骤如下:

  1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
  2. 定义一个普通函数作为线程任务。
  3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
  4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。

下面我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from concurrent.futures import ThreadPoolExecutor

def thread(num):
print('Threads:', num)

def getResult(): # 有返回结果的函数
return 'Get Result'

# 新建ThreadPoolExecutor对象并指定最大的线程数量
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务到线程池中
executor.submit(thread, 1) # Threads: 1
executor.submit(thread, 2) # Threads: 2
t = executor.submit(getResult)
print(t.result()) # Get Result

# 或者按如下方式实现
threadPool = ThreadPoolExecutor(max_workers=3)
for i in range(3):
threadPool.submit(thread, i)
threadPool.shutdown(wait=True)
# Threads: 0
# Threads: 1
# Threads: 2

3. 系统信息获取模块Psutil

现在可能会有人在想那我们如何获取子进程/线程在运行时的时间开销或者内存占用等信息呢?Python 有一个第三方模块 psutil,专门用来获取操作系统以及硬件相关的信息,比如:CPU、磁盘、网络、内存等等。

首先我们需要安装 psutil,直接通过 pip 命令安装即可:

1
pip install psutil

(1)查看 CPU 相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import psutil

print(psutil.cpu_count()) # CPU的逻辑数量:12

print(psutil.cpu_count(logical=False)) # CPU的物理核心数量:6

print(psutil.cpu_times()) # CPU的用户/系统/空闲时间
# scputimes(user=26860.4375, system=10963.515624999884, idle=676060.796875, interrupt=740.875, dpc=477.75)

for _ in range(3):
# interval表示每隔0.5s刷新一次,percpu表示查看所有的CPU使用率
print(psutil.cpu_percent(interval=0.5, percpu=True))
# [21.2, 0.0, 32.4, 0.0, 16.1, 0.0, 0.0, 0.0, 3.2, 0.0, 0.0, 0.0]
# [25.8, 0.0, 3.1, 0.0, 0.0, 0.0, 3.1, 3.1, 0.0, 0.0, 0.0, 18.8]
# [32.4, 0.0, 21.9, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 6.2, 0.0]

print(psutil.cpu_stats()) # CPU的统计信息,包括上下文切换、中断、软中断以及系统调用次数等
# scpustats(ctx_switches=3399680458, interrupts=1365489476, soft_interrupts=0, syscalls=2283205750)

print(psutil.cpu_freq()) # CPU的频率
# scpufreq(current=2592.0, min=0.0, max=2592.0)

(2)查看内存及磁盘相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
print(psutil.virtual_memory())  # 内存使用情况,分别为总内存、可用内存、内存占用率、已使用的内存大小、剩余的内存大小
# svmem(total=17022177280, available=7125008384, percent=58.1, used=9897168896, free=7125008384)

print(psutil.swap_memory()) # 交换内存信息(专门用来临时存储数据)
# sswap(total=6030352384, used=5409898496, free=620453888, percent=89.7, sin=0, sout=0)

print(psutil.disk_partitions()) # 磁盘分区、磁盘使用率和磁盘IO信息
# [sdiskpart(device='C:\\', mountpoint='C:\\', fstype='NTFS', opts='rw,fixed', maxfile=255, maxpath=260),
# sdiskpart(device='D:\\', mountpoint='D:\\', fstype='NTFS', opts='rw,fixed', maxfile=255, maxpath=260),
# sdiskpart(device='E:\\', mountpoint='E:\\', fstype='NTFS', opts='rw,fixed', maxfile=255, maxpath=260)]

print(psutil.disk_usage("C:\\")) # 某个磁盘使用情况
# sdiskusage(total=160253673472, used=101791543296, free=58462130176, percent=63.5)

print(psutil.disk_io_counters()) # 磁盘IO统计信息,分别为读次数、写次数、读的字节数、写的字节数、读时间、写时间
# sdiskio(read_count=1833834, write_count=1831471, read_bytes=69098376704, write_bytes=59881958400, read_time=17952, write_time=2323)

(3)查看网络相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
print(psutil.net_io_counters())  # 网卡的网络IO统计信息
# snetio(bytes_sent=629698806, bytes_recv=1756588411, packets_sent=1280472, packets_recv=2023810, errin=0, errout=0, dropin=0, dropout=0)

print(psutil.net_io_counters(pernic=True)) # 列出所有网卡的信息
# {'Ethernet': snetio(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0, errin=0, errout=0, dropin=0, dropout=0),
# 'Local Area Connection* 3': snetio(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0, errin=0, errout=0, dropin=0, dropout=0),
# 'Local Area Connection* 4': snetio(bytes_sent=0, bytes_recv=0, packets_sent=0, packets_recv=0, errin=0, errout=0, dropin=0, dropout=0),
# ......]

print(psutil.net_if_addrs()) # 网络接口信息
# {'Ethernet': [snicaddr(family=<AddressFamily.AF_LINK: -1>, address='04-D4-C4-74-A4-F0', netmask=None, broadcast=None, ptp=None), snicaddr(family=<AddressFamily.AF_INET: 2>, address='169.254.216.112', netmask='255.255.0.0', broadcast=None, ptp=None)],
# 'Local Area Connection* 3': [snicaddr(family=<AddressFamily.AF_LINK: -1>, address='38-00-25-26-9C-70', netmask=None, broadcast=None, ptp=None), snicaddr(family=<AddressFamily.AF_INET: 2>, address='169.254.169.242', netmask='255.255.0.0', broadcast=None, ptp=None), snicaddr(family=<AddressFamily.AF_INET6: 23>, address='fe80::5e4e:63b6:7416:787b', netmask=None, broadcast=None, ptp=None)],
# ......]

print(psutil.net_if_stats()) # 网卡的详细信息,包括是否启动、通信类型、传输速度、mtu
# {'Ethernet': snicstats(isup=False, duplex=<NicDuplex.NIC_DUPLEX_FULL: 2>, speed=0, mtu=1500),
# 'vEthernet (Default Switch)': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_FULL: 2>, speed=4294, mtu=1500),
# 'Loopback Pseudo-Interface 1': snicstats(isup=True, duplex=<NicDuplex.NIC_DUPLEX_FULL: 2>, speed=1073, mtu=1500),
# ......]

print(psutil.net_connections()) # 当前机器的网络连接,里面接受一个参数,默认是"inet",当然我们也可以指定为其它,比如"tcp"
# [sconn(fd=-1, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_DGRAM: 2>, laddr=addr(ip='127.0.0.1', port=1309), raddr=(), status='NONE', pid=7516),
# sconn(fd=-1, family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='127.0.0.1', port=9100), raddr=(), status='LISTEN', pid=6004),
# sconn(fd=-1, family=<AddressFamily.AF_INET6: 23>, type=<SocketKind.SOCK_STREAM: 1>, laddr=addr(ip='::', port=49667), raddr=(), status='LISTEN', pid=3768),
# ......]

print(psutil.users()) # 当前登录的用户信息
# [suser(name='AsanoSaki', terminal=None, host=None, started=1694966965.3425539, pid=None)]

import datetime
print(psutil.boot_time()) # 系统的启动时间:1694912508.6818905
print(datetime.datetime.fromtimestamp(psutil.boot_time())) # 2023-09-17 09:01:48.681890

(4)查看进程相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
print(psutil.pids())  # 当前存在的所有进程的PID
# [0, 4, 8, 140, 212, 584, 756, 1052, 1160, 1188, 1292, 1364, 1384, ...]

print(psutil.pid_exists(0)) # 某个进程是否存在
# True

print(psutil.process_iter()) # 所有进程对象(Process)组成的迭代器
# <generator object process_iter at 0x00000263C4D2AF20>

print(psutil.Process(pid=10712)) # 根据PID获取一个进程对应的Process对象
# psutil.Process(pid=10712, name='pycharm64.exe', status='running', started='08:56:02')

p = psutil.Process(pid=10712) # 获取该Process对象

print(p.name()) # 进程名称,pycharm64.exe

print(p.exe()) # 进程的exe路径:E:\PyCharm 2020.3.3\bin\pycharm64.exe

print(p.cwd()) # 进程的工作目录:E:\PyCharm 2020.3.3\jbr\bin

print(p.cmdline()) # 进程启动的命令行:['E:\\PyCharm 2020.3.3\\bin\\pycharm64.exe']

print(p.status()) # 进程状态:running

print(p.username()) # 进程用户名:LAPTOP-23NEHV3U\AsanoSaki

print(p.create_time()) # 进程创建时间,返回时间戳:1694998562.3625667

print(p.cpu_times()) # 进程使用的CPU时间
# pcputimes(user=277.09375, system=34.265625, children_user=0.0, children_system=0.0)

print(p.memory_info()) # 进程所使用的内存
# pmem(rss=1507303424, vms=1790066688, num_page_faults=1466884, peak_wset=1536196608,
# wset=1507303424, peak_paged_pool=881616, paged_pool=876144, peak_nonpaged_pool=268096,
# nonpaged_pool=154024, pagefile=1790066688, peak_pagefile=1798070272, private=1790066688)

print(p.num_threads()) # 进程内的线程数量,即这个进程开启了多少个线程:68

现在我们使用 psutil 模块实现获取 ThreadPoolExecutor 线程任务运行的时间与内存占用信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
args2 = ['python', 'src/Python子进程测试程序2.py']

proc = subprocess.Popen(args=args2,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8')

def getProcessInfo(pid):
p = psutil.Process(pid) # 获取pid所代表的子进程
start_time = time.time()
memory = 0
while(True):
try:
memory = max(memory, p.memory_info().rss)
except:
break
runtime = (time.time() - start_time) * 1000
return runtime, memory

threadPool = ThreadPoolExecutor()
task = threadPool.submit(getProcessInfo, proc.pid)
stdout, stderr = proc.communicate(input='AsanoSaki')
runtime, memory = task.result()
print(runtime) # 510.7400417327881
print(memory) # 40865792

threadPool.shutdown(wait=True)
proc.kill()

其中,Python子进程测试程序2.py 内容如下:

1
2
s = input()
print('子进程输出: ', s)