Python性能分析

简介

综合转载于:

整体思路

  • 测量,不要猜测。 测量代码中哪些部分运行时间最长,先把重点放在那些部分上。

  • 实现缓存。 如果你从磁盘、网络和数据库执行多次重复的查找,这可能是一个很大的优化之处。

  • 重用对象,而不是在每次迭代中创建一个新对象。Python 必须清理你创建的每个对象才能释放内存,这就是所谓的“垃圾回收”。许多未使用对象的垃圾回收会大大降低软件速度。

  • 尽可能减少代码中的迭代次数,特别是减少迭代中的操作次数。

  • 避免(深度)递归。 对于 Python 解释器来说,它需要大量的内存和维护(Housekeeping)。改用生成器和迭代之类的工具。

  • 减少内存使用。 一般来说,尽量减少内存的使用。例如,对一个巨大的文件进行逐行解析,而不是先将其加载到内存中。

  • 使用PyPy等方案

  • 使用多线程

    在等待来自网络或磁盘的应答时,你可以使用多个线程使其他部分保持运行状态。一个线程是一个独立的执行序列。默认情况下,Python 程序有一个主线程。但你可以创建更多的主线程,并让 Python 在它们之间切换。这种切换发生得如此之快,以至于它们看上去就好像是在同时并排运行一样。但与其他编程语言不同的是,Python 并不是同时运行的,而是轮流运行。这是因为 Python 中有一种全局解释器锁(Global Interpreter Lock,GIL)机制。我们得到的结论是,线程对于 IO 密集型的软件有很大的影响,但对 CPU 密集型的软件毫无用处。这是为什么呢?很简单。当一个线程在等待来自网络的答复时,其他线程可以继续运行。如果你要执行大量的网络请求,线程可以带来巨大的差异。如果你的线程正在进行繁重的计算,那么它们只是等待轮到它们继续计算,线程化只会带来更多的开销。

  • 使用Asyncio

    Asyncio 是 Python 中一个相对较新的核心库。它解决了与线程相同的问题:它加快了 IO 密集型软件的速度,但这是以不同的方式实现的。它相当复杂,特别是对于初学者来说。我遇到的另一个问题是, asyncio 库在过去几年中有了很大的发展。网上的教程和示例代码常常已经过时。不过,这并不意味着它就毫无用处。

  • 使用多进程

    如果你的软件是 CPU 密集型的,你通常可以用一种可以同时使用更多处理器的方式重写你的代码。通过这种方式,你就可以线性地调整执行速度。这就是所谓的并行性,但并不是所有的算法都可以并行运行。例如,简单的将递归算法进行并行化是不可能的。但是几乎总有一种替代算法可以很好地并行工作。使用更多处理处理器有两种方式:

    • 在同一台机器内使用多个处理器和 / 或内核。在 Python 中,这可以通过 multiprocessing 库来完成。
    • 使用计算机网络来使用多个处理器,分布在多台计算机上。我们称之为分布式计算。

    与 threading 库不同, multiprocessing 库绕过了 Python 的全局解释器锁。它实际上是通过派生多个 Python 实例来实现这一点的。因此,现在你可以让多个 Python 进程同时运行你的代码,而不是在单个 Python 进程中轮流运行线程。

    multiprocessing 库和 threading 库非常相似。可能出现的问题是:为什么还要考虑线程呢?答案是可以猜得到的。线程是“轻量”的:它需要更少的内存,因为它只需要一个正在运行的 Python 解释器。产生新进程也还有其开销。因此,如果你的代码是 IO 密集型的,线程可能就足够好了。

    一旦你实现了软件的并行工作,那么在使用 Hadoop 之类的分布式计算方面就前进了一小步。通过利用云计算平台,你可以相对轻松地进行扩展规模。例如,你可以在云端中处理大型数据集,并在本地使用结果。使用混合操作的方式,你可以节省一些资金,因为云端中的算力非常昂贵。

性能分析工具

CPU

工具 特点 文档 其他
cProfile Python标准库自带的分析工具 https://docs.python.org/3/library/profile.html#module-cProfile 比较简单方便,快速找出耗时较高的函数
timeit Python标准库自带分析工具,分析短代码块的执行时间 https://docs.python.org/3/library/timeit.html
vmprof 具备一定可视化能力 https://vmprof.readthedocs.io/en/latest/vmprof.html 可视化不错
pycharm profile工具 可视化强(需要专业版) https://www.jetbrains.com/help/pycharm/profiler.html#start-profiling

例子

cProfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from cProfile import Profile
from pstats import Stats


def do():
for i in range(100000):
print("11111")


profiler = Profile()
profiler.runcall(do)
stats = Stats(profiler)
# 清除路径前缀
stats.strip_dirs()
# 按累计时间排序
stats.sort_stats('cumulative')
stats.print_stats(100)
1
2
3
4
5
6
7
8
      100002 function calls in 10.191 seconds

Ordered by: cumulative time

ncalls tottime percall cumtime percall filename:lineno(function)
1 0.106 0.106 10.191 10.191 test.py:25(do)
100000 10.085 0.000 10.085 0.000 {built-in method builtins.print}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}

vmprof

  1. 首先需要参考 https://github.com/vmprof/vmprof-server 起一个可视化服务端

  2. 运行profile程序

    python -m vmprof --web --web-url=http://127.0.0.1:8000/ XXX

  3. 在本地查看可视化结果

内存

工具 特点 文档 其他
memory_profiler 纯python实现的逐行内存占用分析工具 https://pypi.org/project/memory-profiler
obj_graph 用于分析python对象之间的调用关系,主要用于排查内存泄漏等;可结合graphviz做可视化 https://mg.pov.lt/objgraph

例子

memory_profiler

  1. 首先用profile装饰需要分析的代码块

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    from cProfile import Profile
    from pstats import Stats
    from memory_profiler import profile


    @profile
    def do():
    for i in range(10000):
    print("11111")


    profiler = Profile()
    profiler.runcall(do)
    stats = Stats(profiler)
    # 清除路径前缀
    stats.strip_dirs()
    # 按累计时间排序
    stats.sort_stats('cumulative')
    stats.print_stats(100)
  2. 然后运行命令行分析:

python -m memory-profiler XXXX.py

  1. 结果:

    1
    2
    3
    4
    5
    6
    Line #    Mem usage    Increment  Occurrences   Line Contents
    =============================================================
    26 41.0 MiB 41.0 MiB 1 @profile
    27 def do():
    28 41.0 MiB 0.0 MiB 10001 for i in range(10000):
    29 41.0 MiB 0.0 MiB 10000 print("11111")

性能优化

CPU

Python为什么慢?

​ Source code->Compiler->Byte code+Library modules->Virtual machine->Running code

原因 解决方案 例子
解释型 1. 换解释器 2. 编译 1. pypy 2. numba, jax, taichi(图形学)
GIL 1. 去掉GIL 1. nogil 2. numba(nogil模式)

优化方案

工具 AOT(提前编译)/JIT(即时编译) 中间产物 简介 上手难度 文档 支持的语法 适用场景
Numba JIT&AOT LLVM IR Numba is an open source JIT compiler that translates a subset of Python and NumPy code into fast machine code. ** 简单(不过对于部分场景需要特殊编码) http://numba.pydata.org/ https://numba.readthedocs.io/en/stable/reference/pysupported.html 数值计算场景加速明显
jax JIT LLVM IR JAX is NumPy on the CPU, GPU, and TPU, with great automatic differentiation for high-performance machine learning research. * 较为简单 和numpy API类似 https://jax.readthedocs.io/en/latest/notebooks/quickstart.html
taichi JIT LLVM IR Taichi Lang is an open-source, imperative, parallel programming language for high-performance numerical computation. * 较为简单 https://github.com/taichi-dev/taichi 数值计算
PyPy JIT - a fast, compliant alternative implementation of the Python language * 简单(无需修改任何代码) https://doc.pypy.org/en/latest 平均比Cython快4.5倍,实测似乎没太大区别
Cython AOT C/C++ is a programming language that makes writing C extensions for the Python language as easy as Python itself ** 复杂,需要有C基础 https://cython.readthedocs.io/en/latest/src/quickstart/overview.html
nogil AOT - Python Multithreading without GIL * 简单 https://github.com/colesbury/nogil 单线程有一点点性能损失,适用于多核多线程场景(比Cython的多核多进程轻量)
pyjion JIT IL(ECMA335CIL) instructions Pyjion is a drop-in JIT Compiler for Python 3.10 * 简单 https://www.trypyjion.com/ 只支持python 3.10 比python快约1-2倍

numba例子

测试环境:Python3.8 13寸M1 MAC

测试结果:

条件 平均耗时 加速比
不使用numba加速 2000ms x1
njit 20ms x100
pelt_helper函数不用njit,其他函数用 320ms x7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from distutils.util import change_root
import enum
import numpy as np
from scipy.spatial.distance import pdist, squareform
from numba import njit

@njit(fastmath=True)
def get_sum_prefix(gram: np.array) -> np.array:
"""
二维数组前缀和
:param gram:
:return:
"""
row, col = gram.shape
res = np.zeros((row+1, col+1), dtype=np.float32)
for i in range(row+1):
for j in range(col+1):
res[i][j] = res[i-1][j] + res[i][j-1] + gram[i-1][j-1] - res[i-1][j-1]
return res

@njit(fastmath=True)
def get_sum(sum_array: np.array, s: int, t: int) -> float:
"""
区间求和
:param sum_array:
:param s:
:param t:
:return:
"""
s += 1
return sum_array[t][t] - sum_array[t][s-1] - sum_array[s-1][t] + sum_array[s-1][s-1]

@njit(fastmath=True)
def find_min(arr: np.array, val=0.0):
return float(np.min(arr)) + val, np.argmin(arr)

@njit(fastmath=True)
def pelt_helper(gram: np.array, length: int, pen: float):
if pen is None:
pen = np.log(length)

F = np.zeros(length + 1)
R = np.array([0], dtype=np.int64)
candidates = np.zeros(length+1, dtype=np.int64)

F[0] = -pen

sum_array = get_sum_prefix(gram)

for tstar in range(2, length+1):
cpt_cands = R
seg_costs = np.zeros(len(cpt_cands))
for i in range(0, len(cpt_cands)):
seg_costs[i] = (tstar - cpt_cands[i]) - (get_sum(sum_array, cpt_cands[i], tstar) / (tstar - cpt_cands[i]))

F_cost = F[cpt_cands] + seg_costs
F[tstar], tau =find_min(F_cost, pen)
candidates[tstar] = cpt_cands[tau]

ineq_prune = [val < F[tstar] for val in F_cost]
R = [cpt_cands[j] for j, val in enumerate(ineq_prune) if val]
R.append(tstar - 1)
R = np.array(R, dtype=np.int64)

last = candidates[-1]
changepoints = [last]
while last > 0:
last = candidates[last]
changepoints.append(last)
return sorted(changepoints)


def pelt(data, length, pen=None):
"""
pelt算法
:param data:
:param length:
:param pen:
:return:
"""
if data.ndim == 1:
data = data.reshape(-1, 1)
K = pdist(data, metric="sqeuclidean")
gamma = 1.0
K_median = np.median(K)
if K_median != 0:
gamma = 1 / K_median
k *= gamma
np.clip(K, 1e-2, 1e2, K)
gram = np.exp(squareform(-K))

return pelt_helper(gram, length, pen)

IO

背景

通常在UNIX下面处理文本文件的方法是sed、awk等shell命令,对于处理大文件受CPU,IO等因素影响,对服务器也有一定的压力。关于sed的说明可以看 了解sed的工作原理,本文将介绍通过python的mmap模块来实现对大文件的处理,来对比看他们的差异。

说明

mmap是一种虚拟内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。关于系统中mmap的理论说明可以看 百度百科和 维基百科说明以及mmap函数介绍,这里的说明是针对在Python下 mmap模块的使用说明。

创建对象

1
m=mmap.mmap(fileno, length[, flags[, prot[, access[, offset]]]])
  • fileno:文件描述符,可以是file对象的fileno()方法,或者来自os.open(),在调用mmap()之前打开文件,不再需要文件时要关闭

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    os.O_RDONLY   以只读的方式打开 Read only
    os.O_WRONLY 以只写的方式打开 Write only
    os.O_RDWR 以读写的方式打开 Read and write
    os.O_APPEND 以追加的方式打开
    os.O_CREAT 创建并打开一个新文件
    os.O_EXCL os.O_CREAT| os.O_EXCL 如果指定的文件存在,返回错误
    os.O_TRUNC 打开一个文件并截断它的长度为零(必须有写权限)
    os.O_BINARY 以二进制模式打开文件(不转换)
    os.O_NOINHERIT 阻止创建一个共享的文件描述符
    os.O_SHORT_LIVED
    os.O_TEMPORARY 与O_CREAT一起创建临时文件
    os.O_RANDOM 缓存优化,但不限制从磁盘中随机存取
    os.O_SEQUENTIAL 缓存优化,但不限制从磁盘中序列存取
    os.O_TEXT 以文本的模式打开文件(转换)
  • length:要映射文件部分的大小(以字节为单位),这个值为0,则映射整个文件,如果大小大于文件当前大小,则扩展这个文件

  • flags:MAP_PRIVATE:这段内存映射只有本进程可用;mmap.MAP_SHARED:将内存映射和其他进程共享,所有映射了同一文件的进程,都能够看到其中一个所做的更改
  • prot:mmap.PROT_READ, mmap.PROT_WRITEmmap.PROT_WRITE | mmap.PROT_READ。最后一者的含义是同时可读可写
  • access:在mmap中有可选参数access的值有
    • ACCESS_READ:读访问。
    • ACCESS_WRITE:写访问,默认。
    • ACCESS_COPY:拷贝访问,不会把更改写入到文件,使用flush把更改写到文件。

对象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
m.close()
关闭 m 对应的文件;

m.find(str, start=0)
从 start 下标开始,在 m 中从左往右寻找子串 str 最早出现的下标;

m.flush([offset, n])
把 m 中从offset开始的n个字节刷到对应的文件中;

m.move(dstoff, srcoff, n)
等于 m[dstoff:dstoff+n] = m[srcoff:srcoff+n],把从 srcoff 开始的 n 个字节复制到从 dstoff 开始的n个字节,可能会覆盖重叠的部分。

m.read(n)
返回一个字符串,从 m 对应的文件中最多读取 n 个字节,将会把 m 对应文件的位置指针向后移动;

m.read_byte()
返回一个1字节长的字符串,从 m 对应的文件中读1个字节,要是已经到了EOF还调用 read_byte(),则抛出异常 ValueError;

m.readline()
返回一个字符串,从 m 对应文件的当前位置到下一个'\n',当调用 readline() 时文件位于 EOF,则返回空字符串;

m.resize(n) ***有问题,执行不了***
把 m 的长度改为 n,m 的长度和 m 对应文件的长度是独立的;

m.seek(pos, how=0)
同 file 对象的 seek 操作,改变 m 对应的文件的当前位置;

m.size()
返回 m 对应文件的长度(不是 m 对象的长度len(m));

m.tell()
返回 m 对应文件的当前位置;

m.write(str)
把 str 写到 m 对应文件的当前位置,如果从 m 对应文件的当前位置到 m 结尾剩余的空间不足len(str),则抛出 ValueError;

m.write_byte(byte)
把1个字节(对应一个字符)写到 m 对应文件的当前位置,实际上 m.write_byte(ch) 等于 m.write(ch)。如果 m 对应文件的当前位置在 m 的结尾,也就是 m 对应文件的当前位置到 m 结尾剩余的空间不足1个字节,write() 抛出异常ValueError,而 write_byte() 什么都不做。

使用说明

测试文本:test.txt,mmap对象m

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- MySQL dump 10.13  Distrib 5.6.19, for osx10.7 (x86_64)
--
-- Host: localhost Database: test
-- ------------------------------------------------------
-- Server version 5.6.19

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
  • m.close(),关闭对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    >>> import os,mmap
    >>> m=mmap.mmap(os.open('test.txt',os.O_RDWR),0) #创建内存映射对象,
    >>> m.read(10) #可以使用方法
    '-- MySQL d'
    >>> m.close() #关闭对象
    >>> m.read(10) #方法不可用
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    ValueError: mmap closed or invalid
  • m.find(str, start=0),从start的位置开始寻找第一次出现的str。

    1
    2
    >>> m.find('SET',0)      #从头开始查找第一次出现SET的字符串
    197
  • m.read(n),返回一个从 m对象文件中读取的n个字节的字符串,将会把 m 对象的位置指针向后移动,后续读取会继续往下读。

    1
    2
    3
    4
    >>> m.read(10)         #读取10字节的字符串
    '-- MySQL d'
    >>> m.read(10) #读取上面10字节后,再往后的10字节数据
    'ump 10.13 '
  • m.read_byte(),返回一个1字节长的字符串,从 m 对应的文件中读1个字节

    1
    2
    3
    4
    5
    6
    >>> m.read_byte()   #读取第一个字节
    '-'
    >>> m.read_byte() #读取第二个字节
    '-'
    >>> m.read_byte() #读取第三个字节
    ' '
  • m.readline():返回一个字符串,从 m 对应文件的当前位置到下一个’\n’,当调用 readline() 时文件位于 EOF,则返回空字符串

    1
    2
    3
    4
    >>> m.readline()             #读取一正行
    '-- MySQL dump 10.13 Distrib 5.6.19, for osx10.7 (x86_64)\n'
    >>> m.readline() #读取下一正行
    '--\n'
  • m.size():返回 m 对应文件的长度(不是 m 对象的长度len(m))

    1
    2
    >>> m.size()            #整个文件的大小
    782
  • m.tell():返回 m 对应文件的当前光标位置

    1
    2
    3
    4
    5
    6
    >>> m.tell()        #当前光标的位置0
    0
    >>> m.read(10) #读取10个字节
    '-- MySQL d'
    >>> m.tell() #当前光标位置10
    10
  • m.seek(pos, how=0),改变 m 对应的文件的当前位置

    1
    2
    3
    4
    5
    >>> m.seek(10)        #当前光标定位到10
    >>> m.tell() #读取当前光标的位置
    10
    >>> m.read(10) #读取当前光标之后的10字节内容
    'ump 10.13 '
  • m.move(dstoff, srcoff, n):等于 m[dstoff:dstoff+n] = m[srcoff:srcoff+n],把从 srcoff 开始的 n 个字节复制到从 dstoff 开始的n个字节

    1
    2
    3
    4
    5
    6
    7
    >>> m[101:108]            #切片101到108的值
    '-------'
    >>> m[1:8] #切片1到8的值
    '- MySQL'
    >>> m.move(1,101,8) #从101开始到后面的8字节(108),替换从1开始到后面的8字节(8)效果:m[1:8]=m[101:108]
    >>> m[1:8] #被替换后
    '-------'
  • m.write(str):把 str 写到 m 对应文件的当前光标位置(覆盖对应长度),如果从 m 对应文件的当前光标位置到 m 结尾剩余的空间不足len(str),则抛出 ValueError

    1
    2
    3
    >>> m.tell()                #当前光标位置
    0
    >>> m.write('zhoujy') #写入str,要是写入的大小大于原本的文件,会报错。m.write_byte(byte)不会报错。
    1
    2
    >>> m.tell()                #写入后光标位置 
    6
    1
    >>> m.seek(0)               #重置,光标从头开始
    1
    2
    >>> m.read(10)              #查看10个字节,确定是否被修改成功 
    'zhoujy---d'
  • m.flush():把 m 中从offset开始的n个字节刷到对应的文件中

    注意:对于m的修改操作,可以当成一个列表进行切片操作,但是对于切片操作的修改需要改成同样长度的字符串,否则都会报错。如m中的10个字符串进行修改,必须改成10个字符的长度。

应用说明

  • 读取整个文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #!/usr/bin/python
    # -*- encoding: utf-8 -*-

    import mmap
    import contextlib

    f = open('test.txt', 'r')
    with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
    #readline需要循环才能读取整个文件
    while True:
    line = m.readline().strip()
    print line
    #光标到最后位置(读完),就退出
    if m.tell()==m.size():
    break
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    ~$ python untitled.py                                                                                                                                1 ↵
    -- ZHOUJY ---dump 10.13 Distrib 5.6.19, for osx10.7 (x86_64)
    --
    -- Host: localhost Database: test
    -- ------------------------------------------------------
    -- Server version 5.6.19

    /*!40101 ZHOUJY SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */ZHOUJY;
    /*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
    /*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
    /*!40101 SET NAMES utf8 */;
    /*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
    /*!40103 SET TIME_ZONE='+00:00' */ ZHOUJY;
    /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
    /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
    /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
    /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
  • 逐步读取指定字节数文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #!/usr/bin/python
    # -*- encoding: utf-8 -*-

    import mmap
    import contextlib

    with open('test.txt', 'r') as f:
    with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
    print '读取10个字节的字符串 :', m.read(10)
    print '支持切片,对读取到的字符串进行切片操作:', m[2:10]
    print '读取之前光标后的10个字符串', m.read(10)
    1
    2
    3
    4
    ~$ python untitled.py
    读取10个字节的字符串 : -- ZHOUJY
    支持切片,对读取到的字符串进行切片操作: ZHOUJY
    读取之前光标后的10个字符串 ---dump 1
  • 从整个文件查找所有匹配的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    #!/usr/bin/python
    # -*- encoding: utf-8 -*-

    import mmap
    import contextlib

    word = 'ZHOUJY'
    print '查找:', word

    f = open('test.txt', 'r')
    with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
    #也可以通过find(str,pos)来处理
    while True:
    line = m.readline().strip()
    if line.find(word)>=0:
    print "结果:"
    print line
    elif m.tell()==m.size():
    break
    else:
    pass
    1
    2
    3
    4
    5
    6
    7
    8
    ~$ python untitled.py
    查找: ZHOUJY
    结果:
    -- ZHOUJY ---dump 10.13 Distrib 5.6.19, for osx10.7 (x86_64)
    结果:
    /*!40101 ZHOUJY SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */ZHOUJY;
    结果:
    /*!40103 SET TIME_ZONE='+00:00' */ ZHOUJY;
  • 从整个文件里查找,找到就退出(确认到底是否存在)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    #!/usr/bin/python
    # -*- encoding: utf-8 -*-

    import mmap
    import contextlib

    word = 'ZHOUJY'
    print '查找:', word

    f = open('test.txt', 'r')
    with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
    #不需要循环,只要找到一个就可以了
    loc = m.find(word)
    if loc >= 0:
    print loc
    print m[loc:loc+len(word)]
    1
    2
    3
    4
    ~$ python untitled.py
    查找: ZHOUJY
    194
    ZHOUJY
  • 通过正则查找,(找出40开头的数字)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #!/usr/bin/python
    # -*- encoding: utf-8 -*-

    import mmap
    import re
    import contextlib

    pattern = re.compile(r'(40\d*)')

    with open('test.txt', 'r') as f:
    with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
    print pattern.findall(m)
    1
    2
    ~$ python untitled.py
    ['40101', '40101', '40101', '40101', '40103', '40103', '40014', '40014', '40101', '40111']
  • 替换文本中出现一次的内容。比如想把A库的备份文件(9G)还原到B库,需要把里面的USE A改成USE B

    • sed:时间消耗近105s;磁盘IO几乎跑满;内存几乎没消耗、CPU消耗10~20%之间。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    1:替换文本中第一次出现的内容
    ~$ date && sed -i '0,/USE `edcba`;/s//USE `ABCDE`;/' test.sql && date
    2016年 11月 16日 星期三 12:04:17 CST
    2016年 11月 16日 星期三 12:06:02 CST

    2:替换文本中指定行的内容
    ~$ date && sed -i '24s/USE `ABCDE`;/USE `edcba`;/' test.sql && date
    2016年 11月 16日 星期三 12:09:05 CST
    2016年 11月 16日 星期三 12:10:50 CST

    IO消耗:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    Device:         rrqm/s   wrqm/s     r/s     w/s    rMB/s    wMB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
    sda 1.00 7.00 772.00 105.00 87.22 92.06 418.65 27.90 31.35 2.21 245.56 1.14 100.00

    Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
    sda 1.00 4.00 778.00 102.00 87.59 90.03 413.36 25.08 30.30 2.59 241.65 1.13 99.60

    Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
    sda 2.00 5.00 771.00 101.00 87.48 88.04 412.22 29.80 30.24 2.34 243.21 1.14 99.60

    Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
    sda 1.00 18.00 431.00 137.00 49.08 122.04 616.99 66.20 70.25 3.02 281.75 1.75 99.60

    Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
    sda 0.00 1.00 1.00 248.00 0.00 177.04 1456.16 105.24 416.53 24.00 418.11 4.02 100.00
    • python处理:时间消耗是毫秒级别的,几乎是秒级别完成,该情况比较特别:搜索的关键词在大文本里比较靠前的位置,这样处理上T的大文件也是非常快的,要是搜索的关键词靠后怎会怎么样呢?后面会说明。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      #!/usr/bin/python
      # -*- encoding: utf-8 -*-

      import mmap
      import contextlib
      import re


      word = 'USE `EDCBA`;'
      replace = 'USE `ABCDE`;'
      print '查找:', word
      print'替换:', replace

      f = open('test.sql', 'r+')
      with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_WRITE)) as m:
      loc = m.find(word)
      if loc >=0:
      print loc
      m[loc:loc + len(word)] = replace
      1
      2
      3
      4
      5
      6
      ~$ date && python mmap_python.py && date
      2016年 11月 16日 星期三 12:14:19 CST
      查找: USE `EDCBA`;
      替换: USE `ABCDE`;
      929
      2016年 11月 16日 星期三 12:14:19 CST
  • 替换文本中所有匹配的关键词。比如想把备份文件里的ENGINE=MYISAM改成ENGINE=InnoDB,看看性能如何。

    • sed处理:时间消耗110s;磁盘IO几乎跑满(读写IO高);内存几乎没消耗、CPU消耗10~30%之间。

      1
      2
      3
      ~$ date && sed -i 's/ENGINE=InnoDB/ENGINE=MyISAM/g' test.sql && date
      2016年 11月 16日 星期三 12:19:30 CST
      2016年 11月 16日 星期三 12:21:20 CST

      和①中sed的执行效果差不多,其实对于处理一条还是多条记录,sed都是做同样工作量的事情,至于原因可以看 了解sed的工作原理说明,个人理解大致意思就是:sed是1行1行读取(所以内存消耗很小),放入到自己设置的缓冲区里,替换完之后再写入(所以IO很高),处理速度受限于CPU和IO。

    • python处理:时间消耗20多秒,比sed少。因为不用重写所有内容,只需要替换指定的内容即可,并且是在内存中处理的,所以写IO的压力几乎没有。当关键词比较靠后,其读入的数据就比较大,文件需要从磁盘读入到内存,这时磁盘的读IO也很高,写IO还是没有。因为是虚拟内存映射文件,所以占用的物理内存不多,虽然通过TOP看到的内存使用率%mem很高,这里可以不用管,因为大部分都是在SHR列里的消耗,真正使用掉的内存可以通过RES-SHR来计算。关于top中SHR的意思,可以去看相关文章说明。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      #!/usr/bin/python
      # -*- encoding: utf-8 -*-

      import mmap
      import contextlib

      word = 'ENGINE=MyISAM'
      replace = 'ENGINE=InnoDB'
      print '查找:', word
      print'替换:', replace

      loc = 0
      f = open('test.sql', 'r+')
      with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_WRITE)) as m:
      while True:
      loc = m.find(word,loc)
      if loc >=0:
      print loc
      m[loc:loc + len(word)] = replace
      #要是access=mmap.ACCESS_COPY需要执行flush
      #m.flush()
      elif loc == -1:
      break
      else:
      pass
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      ~$ date && python mmap_python.py && date
      2016年 11月 16日 星期三 13:19:30 CST
      查找: ENGINE=MyISAM
      替换: ENGINE=InnoDB
      1663
      5884938
      11941259
      12630481
      12904261
      64852169
      64859312
      65018692
      65179617
      65181544
      65709930
      149571849
      3592900115
      5874952354
      7998151839
      2016年 11月 16日 星期三 13:19:55 CST
  • 正则匹配修改,这个可以通过上面介绍的查找方法,做下修改即可,就不再做说明。

小结:对比sed和python处理文件的方法,这里来小结下:对于sed不管修改的关键字在文本中的任意位置、次数,修改的工作量都一样(全文的读写IO),差距不大;对于python mmap的修改,要是关键字出现在比较靠前的地方,修改起来速度非常快,否则修改也会有大量的读IO,写IO没有。通过上面的对比分析来看,mmap的修改要比sed修改性能高。

Python还有另一个读取操作的方法:open中的read、readline、readlines,这个方法是把文件全部载入内存,再进行操作。若内存不足直接用swap或则报错退出,内存消耗和文本大小成正比,而通过mmap模块的方法可以很好的避免了这个问题。

总结

通过上面的介绍,大致知道如何使用mmap模块了,其大致特点如下:

  • 普通文件被映射到虚拟地址空间后,程序可以向访问普通内存一样对文件进行访问,在有些情况下可以提高IO效率。
  • 它占用物理内存空间少,可以解决内存空间不足的问题,适合处理超大文件。
  • 不同于通常的字符串对象,它是可变的,可以通过切片的方式更改,也可以定位当前文件位置m.tell()m.seek()定位到文件的指定位置,再进行m.write(str)固定长度的修改操作。

最后,可以把mmap封装起来进行使用了,脚本信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#!/usr/bin/python
# -*- encoding: utf-8 -*-

import mmap
import contextlib
import time
from optparse import OptionParser


def calc_time(func):
def _deco(*args, **kwargs):
begin_time = time.time()
func(*args, **kwargs)
cost_time = time.time() - begin_time
print 'cost time: %s' % (cost_time)
return _deco

@calc_time
def replace_keyword_all(filename,old_word,new_word):
if len(old_word) == len(new_word):
loc = 0
print "%s 替换成 %s " %(new_word,old_word)
with open(filename,'r+') as f:
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_WRITE)) as m:
while True:
loc = m.find(old_word,loc)
if loc >= 0:
m[loc:loc+len(old_word)] = new_word
elif loc == -1:
break
else:
pass
f.close()
else:
print "替换的词要和被替换的词长度一致!"
exit()


@calc_time
def replace_keyword_once(filename,old_word,new_word):
if len(old_word) == len(new_word):
print "%s 替换成 %s " %(new_word,old_word)
with open(filename,'r+') as f:
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_WRITE)) as m:
loc = m.find(old_word)
if loc >= 0:
m[loc:loc+len(old_word)] = new_word
f.close()
else:
print "替换的词要和被替换的词长度一致!"
exit()

if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-f", "--filename", help="Filename for search", dest="filename")
parser.add_option("-o", "--oldword", help="the ip to use", dest="old_word")
parser.add_option("-n", "--newword", help="the ip to use", dest="new_word")

(options, args) = parser.parse_args()

if not options.filename:
print 'params filename need to apply'
exit()

if not options.old_word:
print 'params oldword need to apply'
exit()

if not options.new_word:
print 'params newword need to apply'
exit()
# 替换文本中第一次出现的内容(查到一个就处理退出,越靠前越快)
# replace_keyword_once(options.filename,options.old_word,options.new_word)
# 替换文本中出现的内容(查找处理整个文本)
replace_keyword_all(options.filename,options.old_word,options.new_word)
1
2
3
4
5
6
7
8
9
10
11
~$ python mmap_search.py -h
Usage: mmap_search.py [options]

Options:
-h, --help show this help message and exit
-f FILENAME, --filename=FILENAME
Filename for search
-o OLD_WORD, --oldword=OLD_WORD
the ip to use
-n NEW_WORD, --newword=NEW_WORD
the ip to use
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1)sed:替换文本中第一次出现的内容
~$ date && sed -i '0,/USE `EDCBA`;/s//USE `ABCDE`;/' test.sql && date
2016年 11月 17日 星期四 11:15:33 CST
2016年 11月 17日 星期四 11:21:47 CST

2)mmap:替换文本中第一次出现的内容(使用replace_keyword_once方法,查到一个就处理退出,越靠前越快)
~$ python mmap_search.py --filename='test.sql' --oldword="USE \`EDCBA\`;" --newword="USE \`ABCDE\`;"
USE `ABCDE`; 替换成 USE `EDCBA`;
cost time: 0.000128984451294

3)sed:替换文本中出现的内容(查找处理整个文本)
~$ date && sed -i 's/ENGINE=InnoDB/ENGINE=MyISAM/g' test.sql && date
2016年 11月 17日 星期四 10:04:49 CST
2016年 11月 17日 星期四 10:11:34 CST

4)mmap:替换文本中出现的内容(使用replace_keyword_all方法,查找处理整个文本)
~$ python mmap_search.py --filename="test.sql" --oldword="ENGINE=MyISAM" --newword="ENGINE=InnoDB"
ENGINE=InnoDB 替换成 ENGINE=MyISAM
cost time: 198.471223116

结论:修改大文本文件,通过sed处理,不管被修改的词在哪个位置都需要重写整个文件;而mmap修改文本,被修改的词越靠前性能越好,不需要重写整个文本,只要替换被修改词语的长度即可。

并行与并发

本文是对《Effective Python》37、38、39条中关于python 多线程的总结。主要分为以下3个部分:

  • 并发与并行
  • 多线程的数据共享和竞态
  • 在阻塞式I/O任务中使用Queue来协调多线程C:\Users\陈润青\Downloads\Practice\test.py

并发(concurrency)和并行(parallelism)

  • 并发:计算机似乎在同一时间做了多个任务,但实际上只是在多个任务间快速切换。比如一个单核CPU上在1分钟处理了4个任务,实际上只是每个任务执行1s后就换另外一个任务。
  • 并行:计算机确实在同一时间做着多个任务。比如在4核CPU上,每个核心处理一个任务,1分钟过后,每个任务都做了1分钟。而上面并发的例子中,每个任务只做了1/4分钟。

并行与并发的关键区别,就在于能不能提速(speedup)。关于并发核并行的区别, geeksforgeeks总结得较好:

Effective Python 并行与并发_下载图片

并发

多线程的数据共享

比如有个程序,它做的操作只有一条cnt = cnt + 1,如果将这个程序写成多线程(假设两个),那么可能最后的输出是1,而不是2。要理解背后的原因,需要将cnt = cnt + 1写成汇编形式

1
2
3
4
5
6
// 将共享变了cnt加载到accumulator register
movq cnt(%rip), %rdx
// 加1操作
addq %eax
// 将更行的值给回共享变量cnt
movq %eax, cnt(%rip)

如果执行顺序为:线程1执行step1,线程2执行step1,线程1执行step2,线程2执行step2,线程1执行step3,线程2执行step3,结果显然为1。为了解决多线程的数据竞争,需要对数据合理加锁。对于上述多线程中的race condition问题,可以阅读CSAPP 12.5节《Synchronizing Threads with Semaphores》。实操一下《Effective Python》中的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from threading import Thread


class Counter(object):

def __init__(self):
self.count = 0

def increment(self, offset):
self.count += offset


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)


def run_threads(func, how_many, counter):
threads = []
for i in range(5):
args = (i, how_many, counter)
thread = Thread(target=func, args=args)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()


how_many = 1000000
counter = Counter()
run_threads(worker, how_many, counter)
print("Counter should be {}, found {}".format(5 * how_many, counter.count))

这里值得注意的是,虽然how_many设置为10000时,得到的结果并不是5倍的how_many,但是how_many很小时,比如1000,结果确实是5倍的how_many。这是因为第二个线程开始时,第一个线程已经完成了worker内的计算。
为了解决上述竞态问题,下面是经过数据加锁的代码,threading中的Lock类是用标准方法实现的互斥锁(mutex):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from threading import Thread, Lock


class Counter(object):

def __init__(self):
self.count = 0
self.lock = Lock()

def increment(self, offset):
with self.lock:
self.count += offset


def worker(sensor_index, how_many, counter):
for _ in range(how_many):
counter.increment(1)


def run_threads(func, how_many, counter):
threads = []
for i in range(5):
args = (i, how_many, counter)
thread = Thread(target=func, args=args)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()


how_many = 1000000
counter = Counter()
run_threads(worker, how_many, counter)
print("Counter should be {}, found {}".format(5 * how_many, counter.count))

试了一下在counter.increment(1)处加锁,也有用,和原文不一致。

在阻塞式I/O中使用多线程

在做一些项目的过程中,会遇到有以下特点的任务:

  • 整个任务可以划分成按序执行的多个阶段(可以表示成pipeline): Task = stage1-> stage2 -> ... -> stageN
  • 其中有些stage是阻塞式I/O操作

举个例子:Stage1: 从网络下载图片;Stage 2: 判断图片是否包含小动物; Stage 3: 将包含小动物的图像通过网络传递给客户A。这个过程中Stage 1和Stage 3都是非计算密集型的I/O操作,它可能只需要一条接收或发送语句,接下来等待得到数据或对方接收到数据就行了,主要的计算在Stage 2中。

面对具有这样特点的任务,就可以考虑使用Pyhton中的多线程来提高速度。(注: Python中由于GIL的存在,如果这些stage都是计算密集型的任务,使用多线程无法提高效率,在3.2中我们会具体解释)

Queue

处理上述特点任务时,我们通常会使用Queue来协调各线程间的工作,下面简单介绍一下内置queue模块中的Queue类。

  • task_done()方法:标识队列中的某个元素已经出队列了(某个任务已经完成了)
  • join()方法:阻塞,直到队列中所有元素都出队列了(队列为空)

如果队列获取某个元素,并对其执行一系列操作后,并未调用task_done()进行标识,调用join()会一直阻塞。
举个例子,下面这段代码永远不会执行最后一句print语句

1
2
3
4
5
6
7
8
9
10
11
12
from queue import Queue

q = Queue()
for i in range(10):
q.put(i)

for i in range(10):
q.get()
# q.task_done()

q.join()
print("Ohh, Finished q.join")

为什么这类任务可以考虑多线程

还是举《Effective Python》中的例子,考虑一个3阶段的任务:

  1. 从网络下载图片download;
  2. 对图片进行处理resize;
  3. 将图片上传upload。该任务有阻塞式I/O操作(图片还没完全下载下来,下一个步骤就进行不了)。

如果在编写代码时,将download、resize和upload3个函数进行如下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from threading import Thread
from queue import Queue
import time


def download(item):
time.sleep(2)
return item


def resize(item):
time.sleep(3)
return item


def upload(item):
time.sleep(5)
return item


class StoppableWorker(Thread):

def __init__(self, func, in_queue, out_queue):
super(StoppableWorker, self).__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue

def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)


class ClosableQueue(Queue):
SENTINEL = object()

def close(self):
self.put(self.SENTINEL)

def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return
yield item
finally:
self.task_done()


download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
output_queue = ClosableQueue()
threads = [
StoppableWorker(download, download_queue, resize_queue),
StoppableWorker(resize, resize_queue, upload_queue),
StoppableWorker(upload, upload_queue, output_queue)
]

for t in threads:
t.start()

st = time.time()
for i in range(10):
download_queue.put(i)
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print("It took {}".format(time.time() - st))
print(output_queue.qsize(), 'items finished')

运行代码,得到的计算时间满足T = 10 + (N - 1) * 5。看到这个结果,有人可能会纳闷,不是说Python中由于GIL的存在,多个线程只有一个能获得对Python Interpreter的锁,相当于只使用了一个CPU核心吗,这样应该无法提速啊。其实应该注意到time.sleep()操作应该是不占用CPU的,sleep的过程和阻塞式I/O的等待过程类似,而这正是多线程为什么在这类任务上可以提高效率的原因。

如果将这3个函数实现为计算密集型版本(必须使用CPU),并重新计算花费的时间,在这种情况下,使用多线程就不能带来速度上的提升了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def download(item):
number = 18769139
res = 0
for i in range(1, number + 1):
if number % i == 0:
res += 1

def resize(item):
number = 18769139 * 3
res = 0
for i in range(1, number + 1):
if number % i == 0:
res += 1

def upload(item):
number = 18769139 * 5
res = 0
for i in range(1, number + 1):
if number % i == 0:
res += 1

并行

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。但在使用这些共享API的时候,我们要注意以下几点:

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
  • 在多进程下run方法启动相当于直接调用函数,并没有真正意义上使用多进程,这一点我们可以通过pid看的出来。而start启动却是真正意义上调用了多进程,同样我们可以通过pid看的出来

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

我们可以从下面的程序中看到Thread对象和Process对象在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import os
import threading
import multiprocessing

# Main
print('Main:', os.getpid())

# worker function
def worker(sign, lock):
lock.acquire()
print(sign, os.getpid())
lock.release()


# Multi-thread
record = []
lock = threading.Lock()

# Multi-process
record = []
lock = multiprocessing.Lock()

if __name__ == '__main__':
for i in range(5):
thread = threading.Thread(target=worker, args=('thread', lock))
thread.start()
record.append(thread)

for thread in record:
thread.join()

for i in range(5):
process = multiprocessing.Process(target=worker, args=('process', lock))
process.start()
record.append(process)

for process in record:
process.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Main: 10012
thread 10012
thread 10012
thread 10012
thread 10012
thread 10012
Main: 6052
process 6052
Main: 8080
Main: 4284
Main: 7240
process 8080
process 4284
process 7240
Main: 10044
process 10044

Pipe和Queue

正如我们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。

  1. Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

    下面的程序展示了Pipe的使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import multiprocessing as mul

    def proc1(pipe):
    pipe.send('hello')
    print('proc1 rec:', pipe.recv())


    def proc2(pipe):
    print('proc2 rec:', pipe.recv())
    pipe.send('hello, too')


    # Build a pipe
    pipe = mul.Pipe()
    if __name__ == '__main__':
    # Pass an end of the pipe to process 1
    p1 = mul.Process(target=proc1, args=(pipe[0],))
    # Pass the other end of the pipe to process 2
    p2 = mul.Process(target=proc2, args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    1
    2
    proc2 rec: hello
    proc1 rec: hello, too

    这里的Pipe是双向的。Pipe对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

  2. Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。

    下面的程序展示了Queue的使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    import os
    import multiprocessing
    import time
    #==================
    # input worker
    def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

    # output worker
    def outputQ(queue,lock):
    info = queue.get()
    lock.acquire()
    print (str(os.getpid()) + ' get: ' + info)
    lock.release()
    #===================
    # Main
    record1 = [] # store input processes
    record2 = [] # store output processes
    lock = multiprocessing.Lock() # To prevent messy print
    queue = multiprocessing.Queue(3)

    if __name__ == '__main__':
    # input processes
    for i in range(10):
    process = multiprocessing.Process(target=inputQ,args=(queue,))
    process.start()
    record1.append(process)

    # output processes
    for i in range(10):
    process = multiprocessing.Process(target=outputQ,args=(queue,lock))
    process.start()
    record2.append(process)

    for p in record1:
    p.join()

    queue.close() # No more object will come, close the queue

    for p in record2:
    p.join()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    8572 get: 6300(put):1555486924.3676226
    8136 get: 3464(put):1555486924.412625
    9576 get: 9660(put):1555486924.5126307
    6936 get: 5064(put):1555486924.5976355
    10652 get: 8688(put):1555486924.5976355
    6992 get: 10988(put):1555486924.7526445
    6548 get: 6836(put):1555486924.7456443
    3504 get: 7284(put):1555486924.7666454
    8652 get: 4960(put):1555486924.8536503
    10868 get: 460(put):1555486924.8606508

    一些进程使用put()在Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()的字符串。

进程池

进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的进程。

1
2
3
4
5
6
7
8
9
10
11
import multiprocessing as mul


def f(x):
return x ** 2


if __name__ == '__main__':
pool = mul.Pool(5)
rel = pool.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
print(rel)
1
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

我们创建了一个容许5个进程的进程池 (Process Pool) 。Pool运行的每个进程都执行f()函数。我们利用map()方法,将f()函数作用到表的每个元素上。这与built-in的map()函数类似,只是这里用5个进程并行处理。如果进程运行结束后,还有需要处理的元素,那么进程会被用于重新运行f()函数。除了map()方法外,Pool还有下面的常用方法。

  • apply_async(func,args) 从进程池中取出一个进程执行func,args参数。它将返回一个AsyncResult的对象,你可以对该对象调用get()方法以获得结果。
  • close() 进程池不再创建新的进程
  • join() wait进程池中的全部进程。必须对Pool先调用close()方法才能join。

共享内存

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import multiprocessing

# Value/Array
def func1(a, arr):
a.value = 3.14
for i in range(len(arr)):
arr[i] = 0
a.value = 0

if __name__ == '__main__':
num = multiprocessing.Value('d', 1.0) # num=0
arr = multiprocessing.Array('i', range(10)) # arr=range(10)
p = multiprocessing.Process(target=func1, args=(num, arr))
p.start()
p.join()
print (num.value)
print (arr[:])
1
2
0.0
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为1.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。

Manager

Manager是通过共享进程的方式共享数据。Manager管理的共享数据类型有:Value、Array、dict、list、Lock、Semaphore等等,同时Manager还可以共享类的实例对象。

实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Process,Manager
def func1(shareList,shareValue,shareDict,lock):
with lock:
shareValue.value+=1
shareDict[1]='1'
shareDict[2]='2'
for i in xrange(len(shareList)):
shareList[i]+=1

if __name__ == '__main__':
manager=Manager()
list1=manager.list([1,2,3,4,5])
dict1=manager.dict()
array1=manager.Array('i',range(10))
value1=manager.Value('i',1)
lock=manager.Lock()
proc=[Process(target=func1,args=(list1,value1,dict1,lock)) for i in xrange(20)]
for p in proc:
p.start()
for p in proc:
p.join()
print list1
print dict1
print array1
print value1
1
2
3
4
[21, 22, 23, 24, 25]
{1: '1', 2: '2'}
array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
Value('i', 21)
一分一毛,也是心意。