简介
综合转载于:
- python 存储文件加速_如何加速 Python 代码?
- Effective Python 并行与并发
- Python multiprocessing使用详解
- Python之mmap内存映射模块(大文本处理)说明
整体思路
测量,不要猜测。 测量代码中哪些部分运行时间最长,先把重点放在那些部分上。
实现缓存。 如果你从磁盘、网络和数据库执行多次重复的查找,这可能是一个很大的优化之处。
重用对象,而不是在每次迭代中创建一个新对象。Python 必须清理你创建的每个对象才能释放内存,这就是所谓的“垃圾回收”。许多未使用对象的垃圾回收会大大降低软件速度。
尽可能减少代码中的迭代次数,特别是减少迭代中的操作次数。
避免(深度)递归。 对于 Python 解释器来说,它需要大量的内存和维护(Housekeeping)。改用生成器和迭代之类的工具。
减少内存使用。 一般来说,尽量减少内存的使用。例如,对一个巨大的文件进行逐行解析,而不是先将其加载到内存中。
使用PyPy等方案
使用多线程
在等待来自网络或磁盘的应答时,你可以使用多个线程使其他部分保持运行状态。一个线程是一个独立的执行序列。默认情况下,Python 程序有一个主线程。但你可以创建更多的主线程,并让 Python 在它们之间切换。这种切换发生得如此之快,以至于它们看上去就好像是在同时并排运行一样。但与其他编程语言不同的是,Python 并不是同时运行的,而是轮流运行。这是因为 Python 中有一种全局解释器锁(Global Interpreter Lock,GIL)机制。我们得到的结论是,线程对于 IO 密集型的软件有很大的影响,但对 CPU 密集型的软件毫无用处。这是为什么呢?很简单。当一个线程在等待来自网络的答复时,其他线程可以继续运行。如果你要执行大量的网络请求,线程可以带来巨大的差异。如果你的线程正在进行繁重的计算,那么它们只是等待轮到它们继续计算,线程化只会带来更多的开销。
使用Asyncio
Asyncio 是 Python 中一个相对较新的核心库。它解决了与线程相同的问题:它加快了 IO 密集型软件的速度,但这是以不同的方式实现的。它相当复杂,特别是对于初学者来说。我遇到的另一个问题是, asyncio 库在过去几年中有了很大的发展。网上的教程和示例代码常常已经过时。不过,这并不意味着它就毫无用处。
使用多进程
如果你的软件是 CPU 密集型的,你通常可以用一种可以同时使用更多处理器的方式重写你的代码。通过这种方式,你就可以线性地调整执行速度。这就是所谓的并行性,但并不是所有的算法都可以并行运行。例如,简单的将递归算法进行并行化是不可能的。但是几乎总有一种替代算法可以很好地并行工作。使用更多处理处理器有两种方式:
- 在同一台机器内使用多个处理器和 / 或内核。在 Python 中,这可以通过 multiprocessing 库来完成。
- 使用计算机网络来使用多个处理器,分布在多台计算机上。我们称之为分布式计算。
与 threading 库不同, multiprocessing 库绕过了 Python 的全局解释器锁。它实际上是通过派生多个 Python 实例来实现这一点的。因此,现在你可以让多个 Python 进程同时运行你的代码,而不是在单个 Python 进程中轮流运行线程。
multiprocessing 库和 threading 库非常相似。可能出现的问题是:为什么还要考虑线程呢?答案是可以猜得到的。线程是“轻量”的:它需要更少的内存,因为它只需要一个正在运行的 Python 解释器。产生新进程也还有其开销。因此,如果你的代码是 IO 密集型的,线程可能就足够好了。
一旦你实现了软件的并行工作,那么在使用 Hadoop 之类的分布式计算方面就前进了一小步。通过利用云计算平台,你可以相对轻松地进行扩展规模。例如,你可以在云端中处理大型数据集,并在本地使用结果。使用混合操作的方式,你可以节省一些资金,因为云端中的算力非常昂贵。
性能分析工具
CPU
工具 | 特点 | 文档 | 其他 |
---|---|---|---|
cProfile | Python标准库自带的分析工具 | https://docs.python.org/3/library/profile.html#module-cProfile | 比较简单方便,快速找出耗时较高的函数 |
timeit | Python标准库自带分析工具,分析短代码块的执行时间 | https://docs.python.org/3/library/timeit.html | |
vmprof | 具备一定可视化能力 | https://vmprof.readthedocs.io/en/latest/vmprof.html | 可视化不错 |
pycharm profile工具 | 可视化强(需要专业版) | https://www.jetbrains.com/help/pycharm/profiler.html#start-profiling |
例子
cProfile
1 | from cProfile import Profile |
1 | 100002 function calls in 10.191 seconds |
vmprof
首先需要参考 https://github.com/vmprof/vmprof-server 起一个可视化服务端
运行profile程序
python -m vmprof --web --web-url=http://127.0.0.1:8000/ XXX
在本地查看可视化结果
内存
工具 | 特点 | 文档 | 其他 |
---|---|---|---|
memory_profiler | 纯python实现的逐行内存占用分析工具 | https://pypi.org/project/memory-profiler | |
obj_graph | 用于分析python对象之间的调用关系,主要用于排查内存泄漏等;可结合graphviz做可视化 | https://mg.pov.lt/objgraph |
例子
memory_profiler
首先用profile装饰需要分析的代码块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from cProfile import Profile
from pstats import Stats
from memory_profiler import profile
@profile
def do():
for i in range(10000):
print("11111")
profiler = Profile()
profiler.runcall(do)
stats = Stats(profiler)
# 清除路径前缀
stats.strip_dirs()
# 按累计时间排序
stats.sort_stats('cumulative')
stats.print_stats(100)然后运行命令行分析:
python -m memory-profiler XXXX.py
结果:
1
2
3
4
5
6Line # Mem usage Increment Occurrences Line Contents
=============================================================
26 41.0 MiB 41.0 MiB 1 @profile
27 def do():
28 41.0 MiB 0.0 MiB 10001 for i in range(10000):
29 41.0 MiB 0.0 MiB 10000 print("11111")
性能优化
CPU
Python为什么慢?
Source code->Compiler->Byte code+Library modules->Virtual machine->Running code
原因 | 解决方案 | 例子 |
---|---|---|
解释型 | 1. 换解释器 2. 编译 | 1. pypy 2. numba, jax, taichi(图形学) |
GIL | 1. 去掉GIL | 1. nogil 2. numba(nogil模式) |
优化方案
工具 | AOT(提前编译)/JIT(即时编译) | 中间产物 | 简介 | 上手难度 | 文档 | 支持的语法 | 适用场景 |
---|---|---|---|---|---|---|---|
Numba | JIT&AOT | LLVM IR | Numba is an open source JIT compiler that translates a subset of Python and NumPy code into fast machine code. | ** 简单(不过对于部分场景需要特殊编码) | http://numba.pydata.org/ | https://numba.readthedocs.io/en/stable/reference/pysupported.html | 数值计算场景加速明显 |
jax | JIT | LLVM IR | JAX is NumPy on the CPU, GPU, and TPU, with great automatic differentiation for high-performance machine learning research. | * 较为简单 和numpy API类似 | https://jax.readthedocs.io/en/latest/notebooks/quickstart.html | ||
taichi | JIT | LLVM IR | Taichi Lang is an open-source, imperative, parallel programming language for high-performance numerical computation. | * 较为简单 | https://github.com/taichi-dev/taichi | 数值计算 | |
PyPy | JIT | - | a fast, compliant alternative implementation of the Python language | * 简单(无需修改任何代码) | https://doc.pypy.org/en/latest | 平均比Cython快4.5倍,实测似乎没太大区别 | |
Cython | AOT | C/C++ | is a programming language that makes writing C extensions for the Python language as easy as Python itself | ** 复杂,需要有C基础 | https://cython.readthedocs.io/en/latest/src/quickstart/overview.html | ||
nogil | AOT | - | Python Multithreading without GIL | * 简单 | https://github.com/colesbury/nogil | 单线程有一点点性能损失,适用于多核多线程场景(比Cython的多核多进程轻量) | |
pyjion | JIT | IL(ECMA335CIL) instructions | Pyjion is a drop-in JIT Compiler for Python 3.10 | * 简单 | https://www.trypyjion.com/ | 只支持python 3.10 | 比python快约1-2倍 |
numba例子
测试环境:Python3.8 13寸M1 MAC
测试结果:
条件 | 平均耗时 | 加速比 |
---|---|---|
不使用numba加速 | 2000ms | x1 |
njit | 20ms | x100 |
pelt_helper函数不用njit,其他函数用 | 320ms | x7 |
1 | from distutils.util import change_root |
IO
背景
通常在UNIX下面处理文本文件的方法是sed、awk等shell命令,对于处理大文件受CPU,IO等因素影响,对服务器也有一定的压力。关于sed的说明可以看 了解sed的工作原理,本文将介绍通过python的mmap模块来实现对大文件的处理,来对比看他们的差异。
说明
mmap是一种虚拟内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。关于系统中mmap的理论说明可以看 百度百科和 维基百科说明以及mmap函数介绍,这里的说明是针对在Python下 mmap模块的使用说明。
创建对象
1 | m=mmap.mmap(fileno, length[, flags[, prot[, access[, offset]]]]) |
fileno:文件描述符,可以是file对象的
fileno()
方法,或者来自os.open()
,在调用mmap()
之前打开文件,不再需要文件时要关闭1
2
3
4
5
6
7
8
9
10
11
12
13
14os.O_RDONLY 以只读的方式打开 Read only
os.O_WRONLY 以只写的方式打开 Write only
os.O_RDWR 以读写的方式打开 Read and write
os.O_APPEND 以追加的方式打开
os.O_CREAT 创建并打开一个新文件
os.O_EXCL os.O_CREAT| os.O_EXCL 如果指定的文件存在,返回错误
os.O_TRUNC 打开一个文件并截断它的长度为零(必须有写权限)
os.O_BINARY 以二进制模式打开文件(不转换)
os.O_NOINHERIT 阻止创建一个共享的文件描述符
os.O_SHORT_LIVED
os.O_TEMPORARY 与O_CREAT一起创建临时文件
os.O_RANDOM 缓存优化,但不限制从磁盘中随机存取
os.O_SEQUENTIAL 缓存优化,但不限制从磁盘中序列存取
os.O_TEXT 以文本的模式打开文件(转换)length:要映射文件部分的大小(以字节为单位),这个值为0,则映射整个文件,如果大小大于文件当前大小,则扩展这个文件
- flags:
MAP_PRIVATE
:这段内存映射只有本进程可用;mmap.MAP_SHARED
:将内存映射和其他进程共享,所有映射了同一文件的进程,都能够看到其中一个所做的更改 - prot:
mmap.PROT_READ
,mmap.PROT_WRITE
和mmap.PROT_WRITE | mmap.PROT_READ
。最后一者的含义是同时可读可写 - access:在mmap中有可选参数access的值有
- ACCESS_READ:读访问。
- ACCESS_WRITE:写访问,默认。
- ACCESS_COPY:拷贝访问,不会把更改写入到文件,使用flush把更改写到文件。
对象方法
1 | m.close() |
使用说明
测试文本:test.txt,mmap对象m
1 | -- MySQL dump 10.13 Distrib 5.6.19, for osx10.7 (x86_64) |
m.close(),关闭对象
1
2
3
4
5
6
7
8
9>>> import os,mmap
>>> m=mmap.mmap(os.open('test.txt',os.O_RDWR),0) #创建内存映射对象,
>>> m.read(10) #可以使用方法
'-- MySQL d'
>>> m.close() #关闭对象
>>> m.read(10) #方法不可用
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: mmap closed or invalidm.find(str, start=0),从start的位置开始寻找第一次出现的str。
1
2>>> m.find('SET',0) #从头开始查找第一次出现SET的字符串
197m.read(n),返回一个从 m对象文件中读取的n个字节的字符串,将会把 m 对象的位置指针向后移动,后续读取会继续往下读。
1
2
3
4>>> m.read(10) #读取10字节的字符串
'-- MySQL d'
>>> m.read(10) #读取上面10字节后,再往后的10字节数据
'ump 10.13 'm.read_byte(),返回一个1字节长的字符串,从 m 对应的文件中读1个字节
1
2
3
4
5
6>>> m.read_byte() #读取第一个字节
'-'
>>> m.read_byte() #读取第二个字节
'-'
>>> m.read_byte() #读取第三个字节
' 'm.readline():返回一个字符串,从 m 对应文件的当前位置到下一个’\n’,当调用 readline() 时文件位于 EOF,则返回空字符串
1
2
3
4>>> m.readline() #读取一正行
'-- MySQL dump 10.13 Distrib 5.6.19, for osx10.7 (x86_64)\n'
>>> m.readline() #读取下一正行
'--\n'm.size():返回 m 对应文件的长度(不是 m 对象的长度len(m))
1
2>>> m.size() #整个文件的大小
782m.tell():返回 m 对应文件的当前光标位置
1
2
3
4
5
6>>> m.tell() #当前光标的位置0
0
>>> m.read(10) #读取10个字节
'-- MySQL d'
>>> m.tell() #当前光标位置10
10m.seek(pos, how=0),改变 m 对应的文件的当前位置
1
2
3
4
5>>> m.seek(10) #当前光标定位到10
>>> m.tell() #读取当前光标的位置
10
>>> m.read(10) #读取当前光标之后的10字节内容
'ump 10.13 'm.move(dstoff, srcoff, n):等于 m[dstoff:dstoff+n] = m[srcoff:srcoff+n],把从 srcoff 开始的 n 个字节复制到从 dstoff 开始的n个字节
1
2
3
4
5
6
7>>> m[101:108] #切片101到108的值
'-------'
>>> m[1:8] #切片1到8的值
'- MySQL'
>>> m.move(1,101,8) #从101开始到后面的8字节(108),替换从1开始到后面的8字节(8)效果:m[1:8]=m[101:108]
>>> m[1:8] #被替换后
'-------'m.write(str):把 str 写到 m 对应文件的当前光标位置(覆盖对应长度),如果从 m 对应文件的当前光标位置到 m 结尾剩余的空间不足len(str),则抛出 ValueError
1
2
3>>> m.tell() #当前光标位置
0
>>> m.write('zhoujy') #写入str,要是写入的大小大于原本的文件,会报错。m.write_byte(byte)不会报错。1
2>>> m.tell() #写入后光标位置
61
>>> m.seek(0) #重置,光标从头开始
1
2>>> m.read(10) #查看10个字节,确定是否被修改成功
'zhoujy---d'm.flush():把 m 中从offset开始的n个字节刷到对应的文件中
注意:对于m的修改操作,可以当成一个列表进行切片操作,但是对于切片操作的修改需要改成同样长度的字符串,否则都会报错。如m中的10个字符串进行修改,必须改成10个字符的长度。
应用说明
读取整个文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15#!/usr/bin/python
# -*- encoding: utf-8 -*-
import mmap
import contextlib
f = open('test.txt', 'r')
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
#readline需要循环才能读取整个文件
while True:
line = m.readline().strip()
print line
#光标到最后位置(读完),就退出
if m.tell()==m.size():
break1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17~$ python untitled.py 1 ↵
-- ZHOUJY ---dump 10.13 Distrib 5.6.19, for osx10.7 (x86_64)
--
-- Host: localhost Database: test
-- ------------------------------------------------------
-- Server version 5.6.19
/*!40101 ZHOUJY SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */ZHOUJY;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */ ZHOUJY;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;逐步读取指定字节数文件
1
2
3
4
5
6
7
8
9
10
11#!/usr/bin/python
# -*- encoding: utf-8 -*-
import mmap
import contextlib
with open('test.txt', 'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
print '读取10个字节的字符串 :', m.read(10)
print '支持切片,对读取到的字符串进行切片操作:', m[2:10]
print '读取之前光标后的10个字符串', m.read(10)1
2
3
4~$ python untitled.py
读取10个字节的字符串 : -- ZHOUJY
支持切片,对读取到的字符串进行切片操作: ZHOUJY
读取之前光标后的10个字符串 ---dump 1从整个文件查找所有匹配的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21#!/usr/bin/python
# -*- encoding: utf-8 -*-
import mmap
import contextlib
word = 'ZHOUJY'
print '查找:', word
f = open('test.txt', 'r')
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
#也可以通过find(str,pos)来处理
while True:
line = m.readline().strip()
if line.find(word)>=0:
print "结果:"
print line
elif m.tell()==m.size():
break
else:
pass1
2
3
4
5
6
7
8~$ python untitled.py
查找: ZHOUJY
结果:
-- ZHOUJY ---dump 10.13 Distrib 5.6.19, for osx10.7 (x86_64)
结果:
/*!40101 ZHOUJY SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */ZHOUJY;
结果:
/*!40103 SET TIME_ZONE='+00:00' */ ZHOUJY;从整个文件里查找,找到就退出(确认到底是否存在)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16#!/usr/bin/python
# -*- encoding: utf-8 -*-
import mmap
import contextlib
word = 'ZHOUJY'
print '查找:', word
f = open('test.txt', 'r')
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
#不需要循环,只要找到一个就可以了
loc = m.find(word)
if loc >= 0:
print loc
print m[loc:loc+len(word)]1
2
3
4~$ python untitled.py
查找: ZHOUJY
194
ZHOUJY通过正则查找,(找出40开头的数字)
1
2
3
4
5
6
7
8
9
10
11
12#!/usr/bin/python
# -*- encoding: utf-8 -*-
import mmap
import re
import contextlib
pattern = re.compile(r'(40\d*)')
with open('test.txt', 'r') as f:
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_READ)) as m:
print pattern.findall(m)1
2~$ python untitled.py
['40101', '40101', '40101', '40101', '40103', '40103', '40014', '40014', '40101', '40111']替换文本中出现一次的内容。比如想把A库的备份文件(9G)还原到B库,需要把里面的USE
A
改成USEB
。- sed:时间消耗近105s;磁盘IO几乎跑满;内存几乎没消耗、CPU消耗10~20%之间。
1
2
3
4
5
6
7
8
91:替换文本中第一次出现的内容
~$ date && sed -i '0,/USE `edcba`;/s//USE `ABCDE`;/' test.sql && date
2016年 11月 16日 星期三 12:04:17 CST
2016年 11月 16日 星期三 12:06:02 CST
2:替换文本中指定行的内容
~$ date && sed -i '24s/USE `ABCDE`;/USE `edcba`;/' test.sql && date
2016年 11月 16日 星期三 12:09:05 CST
2016年 11月 16日 星期三 12:10:50 CSTIO消耗:
1
2
3
4
5
6
7
8
9
10
11
12
13
14Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
sda 1.00 7.00 772.00 105.00 87.22 92.06 418.65 27.90 31.35 2.21 245.56 1.14 100.00
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
sda 1.00 4.00 778.00 102.00 87.59 90.03 413.36 25.08 30.30 2.59 241.65 1.13 99.60
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
sda 2.00 5.00 771.00 101.00 87.48 88.04 412.22 29.80 30.24 2.34 243.21 1.14 99.60
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
sda 1.00 18.00 431.00 137.00 49.08 122.04 616.99 66.20 70.25 3.02 281.75 1.75 99.60
Device: rrqm/s wrqm/s r/s w/s rMB/s wMB/s avgrq-sz avgqu-sz await r_await w_await svctm %util
sda 0.00 1.00 1.00 248.00 0.00 177.04 1456.16 105.24 416.53 24.00 418.11 4.02 100.00python处理:时间消耗是毫秒级别的,几乎是秒级别完成,该情况比较特别:搜索的关键词在大文本里比较靠前的位置,这样处理上T的大文件也是非常快的,要是搜索的关键词靠后怎会怎么样呢?后面会说明。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19#!/usr/bin/python
# -*- encoding: utf-8 -*-
import mmap
import contextlib
import re
word = 'USE `EDCBA`;'
replace = 'USE `ABCDE`;'
print '查找:', word
print'替换:', replace
f = open('test.sql', 'r+')
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_WRITE)) as m:
loc = m.find(word)
if loc >=0:
print loc
m[loc:loc + len(word)] = replace1
2
3
4
5
6~$ date && python mmap_python.py && date
2016年 11月 16日 星期三 12:14:19 CST
查找: USE `EDCBA`;
替换: USE `ABCDE`;
929
2016年 11月 16日 星期三 12:14:19 CST
替换文本中所有匹配的关键词。比如想把备份文件里的ENGINE=MYISAM改成ENGINE=InnoDB,看看性能如何。
sed处理:时间消耗110s;磁盘IO几乎跑满(读写IO高);内存几乎没消耗、CPU消耗10~30%之间。
1
2
3~$ date && sed -i 's/ENGINE=InnoDB/ENGINE=MyISAM/g' test.sql && date
2016年 11月 16日 星期三 12:19:30 CST
2016年 11月 16日 星期三 12:21:20 CST和①中sed的执行效果差不多,其实对于处理一条还是多条记录,sed都是做同样工作量的事情,至于原因可以看 了解sed的工作原理说明,个人理解大致意思就是:sed是1行1行读取(所以内存消耗很小),放入到自己设置的缓冲区里,替换完之后再写入(所以IO很高),处理速度受限于CPU和IO。
python处理:时间消耗20多秒,比sed少。因为不用重写所有内容,只需要替换指定的内容即可,并且是在内存中处理的,所以写IO的压力几乎没有。当关键词比较靠后,其读入的数据就比较大,文件需要从磁盘读入到内存,这时磁盘的读IO也很高,写IO还是没有。因为是虚拟内存映射文件,所以占用的物理内存不多,虽然通过TOP看到的内存使用率%mem很高,这里可以不用管,因为大部分都是在SHR列里的消耗,真正使用掉的内存可以通过RES-SHR来计算。关于top中SHR的意思,可以去看相关文章说明。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25#!/usr/bin/python
# -*- encoding: utf-8 -*-
import mmap
import contextlib
word = 'ENGINE=MyISAM'
replace = 'ENGINE=InnoDB'
print '查找:', word
print'替换:', replace
loc = 0
f = open('test.sql', 'r+')
with contextlib.closing(mmap.mmap(f.fileno(), 0,access=mmap.ACCESS_WRITE)) as m:
while True:
loc = m.find(word,loc)
if loc >=0:
print loc
m[loc:loc + len(word)] = replace
#要是access=mmap.ACCESS_COPY需要执行flush
#m.flush()
elif loc == -1:
break
else:
pass1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20~$ date && python mmap_python.py && date
2016年 11月 16日 星期三 13:19:30 CST
查找: ENGINE=MyISAM
替换: ENGINE=InnoDB
1663
5884938
11941259
12630481
12904261
64852169
64859312
65018692
65179617
65181544
65709930
149571849
3592900115
5874952354
7998151839
2016年 11月 16日 星期三 13:19:55 CST
正则匹配修改,这个可以通过上面介绍的查找方法,做下修改即可,就不再做说明。
小结:对比sed和python处理文件的方法,这里来小结下:对于sed不管修改的关键字在文本中的任意位置、次数,修改的工作量都一样(全文的读写IO),差距不大;对于python mmap的修改,要是关键字出现在比较靠前的地方,修改起来速度非常快,否则修改也会有大量的读IO,写IO没有。通过上面的对比分析来看,mmap的修改要比sed修改性能高。
Python还有另一个读取操作的方法:open中的read、readline、readlines,这个方法是把文件全部载入内存,再进行操作。若内存不足直接用swap或则报错退出,内存消耗和文本大小成正比,而通过mmap模块的方法可以很好的避免了这个问题。
总结
通过上面的介绍,大致知道如何使用mmap模块了,其大致特点如下:
- 普通文件被映射到虚拟地址空间后,程序可以向访问普通内存一样对文件进行访问,在有些情况下可以提高IO效率。
- 它占用物理内存空间少,可以解决内存空间不足的问题,适合处理超大文件。
- 不同于通常的字符串对象,它是可变的,可以通过切片的方式更改,也可以定位当前文件位置
m.tell()
或m.seek()
定位到文件的指定位置,再进行m.write(str)
固定长度的修改操作。
最后,可以把mmap封装起来进行使用了,脚本信息:
1 | #!/usr/bin/python |
1 | ~$ python mmap_search.py -h |
1 | 1)sed:替换文本中第一次出现的内容 |
结论:修改大文本文件,通过sed处理,不管被修改的词在哪个位置都需要重写整个文件;而mmap修改文本,被修改的词越靠前性能越好,不需要重写整个文本,只要替换被修改词语的长度即可。
并行与并发
本文是对《Effective Python》37、38、39条中关于python 多线程的总结。主要分为以下3个部分:
- 并发与并行
- 多线程的数据共享和竞态
- 在阻塞式I/O任务中使用Queue来协调多线程C:\Users\陈润青\Downloads\Practice\test.py
并发(concurrency)和并行(parallelism)
- 并发:计算机似乎在同一时间做了多个任务,但实际上只是在多个任务间快速切换。比如一个单核CPU上在1分钟处理了4个任务,实际上只是每个任务执行1s后就换另外一个任务。
- 并行:计算机确实在同一时间做着多个任务。比如在4核CPU上,每个核心处理一个任务,1分钟过后,每个任务都做了1分钟。而上面并发的例子中,每个任务只做了1/4分钟。
并行与并发的关键区别,就在于能不能提速(speedup)。关于并发核并行的区别, geeksforgeeks总结得较好:
并发
多线程的数据共享
比如有个程序,它做的操作只有一条cnt = cnt + 1
,如果将这个程序写成多线程(假设两个),那么可能最后的输出是1,而不是2。要理解背后的原因,需要将cnt = cnt + 1
写成汇编形式
1 | // 将共享变了cnt加载到accumulator register |
如果执行顺序为:线程1执行step1,线程2执行step1,线程1执行step2,线程2执行step2,线程1执行step3,线程2执行step3,结果显然为1。为了解决多线程的数据竞争,需要对数据合理加锁。对于上述多线程中的race condition问题,可以阅读CSAPP 12.5节《Synchronizing Threads with Semaphores》。实操一下《Effective Python》中的例子:
1 | from threading import Thread |
这里值得注意的是,虽然how_many
设置为10000时,得到的结果并不是5倍的how_many
,但是how_many
很小时,比如1000,结果确实是5倍的how_many
。这是因为第二个线程开始时,第一个线程已经完成了worker内的计算。
为了解决上述竞态问题,下面是经过数据加锁的代码,threading中的Lock类是用标准方法实现的互斥锁(mutex):
1 | from threading import Thread, Lock |
试了一下在counter.increment(1)
处加锁,也有用,和原文不一致。
在阻塞式I/O中使用多线程
在做一些项目的过程中,会遇到有以下特点的任务:
- 整个任务可以划分成按序执行的多个阶段(可以表示成pipeline):
Task = stage1-> stage2 -> ... -> stageN
- 其中有些stage是阻塞式I/O操作
举个例子:Stage1: 从网络下载图片;Stage 2: 判断图片是否包含小动物; Stage 3: 将包含小动物的图像通过网络传递给客户A。这个过程中Stage 1和Stage 3都是非计算密集型的I/O操作,它可能只需要一条接收或发送语句,接下来等待得到数据或对方接收到数据就行了,主要的计算在Stage 2中。
面对具有这样特点的任务,就可以考虑使用Pyhton中的多线程来提高速度。(注: Python中由于GIL的存在,如果这些stage都是计算密集型的任务,使用多线程无法提高效率,在3.2中我们会具体解释)
Queue
处理上述特点任务时,我们通常会使用Queue
来协调各线程间的工作,下面简单介绍一下内置queue
模块中的Queue
类。
task_done()
方法:标识队列中的某个元素已经出队列了(某个任务已经完成了)join()
方法:阻塞,直到队列中所有元素都出队列了(队列为空)
如果队列获取某个元素,并对其执行一系列操作后,并未调用task_done()进行标识,调用join()会一直阻塞。
举个例子,下面这段代码永远不会执行最后一句print语句
1 | from queue import Queue |
为什么这类任务可以考虑多线程
还是举《Effective Python》中的例子,考虑一个3阶段的任务:
- 从网络下载图片download;
- 对图片进行处理resize;
- 将图片上传upload。该任务有阻塞式I/O操作(图片还没完全下载下来,下一个步骤就进行不了)。
如果在编写代码时,将download、resize和upload3个函数进行如下实现:
1 | from threading import Thread |
运行代码,得到的计算时间满足T = 10 + (N - 1) * 5
。看到这个结果,有人可能会纳闷,不是说Python中由于GIL的存在,多个线程只有一个能获得对Python Interpreter的锁,相当于只使用了一个CPU核心吗,这样应该无法提速啊。其实应该注意到time.sleep()
操作应该是不占用CPU的,sleep的过程和阻塞式I/O的等待过程类似,而这正是多线程为什么在这类任务上可以提高效率的原因。
如果将这3个函数实现为计算密集型版本(必须使用CPU),并重新计算花费的时间,在这种情况下,使用多线程就不能带来速度上的提升了。
1 | def download(item): |
并行
multiprocessing包是Python中的多进程管理包。与threading.Thread
类似,它可以利用multiprocessing.Process
对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start()
, run()
, join()
的方法。此外multiprocessing包中也有Lock
/Event
/Semaphore
/Condition
类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。但在使用这些共享API的时候,我们要注意以下几点:
- 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
- multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用
Lock
/Event
/Semaphore
/Condition
等同步方式 (因为它们占据的不是用户进程的资源)。 - 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
- 在多进程下run方法启动相当于直接调用函数,并没有真正意义上使用多进程,这一点我们可以通过pid看的出来。而start启动却是真正意义上调用了多进程,同样我们可以通过pid看的出来
Process.PID中保存有PID,如果进程还没有start()
,则PID为None。
我们可以从下面的程序中看到Thread对象和Process对象在使用上的相似性与结果上的不同。各个线程和进程都做一件事:打印PID。但问题是,所有的任务在打印的时候都会向同一个标准输出(stdout)输出。这样输出的字符会混合在一起,无法阅读。使用Lock同步,在一个任务输出完成之后,再允许另一个任务输出,可以避免多个任务同时向终端输出
1 | import os |
1 | Main: 10012 |
Pipe和Queue
正如我们在Linux多线程中介绍的管道PIPE和消息队列message queue,multiprocessing包中有Pipe类和Queue类来分别支持这两种IPC机制。Pipe和Queue可以用来传送常见的对象。
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过
mutiprocessing.Pipe(duplex=False)
创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。下面的程序展示了Pipe的使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import multiprocessing as mul
def proc1(pipe):
pipe.send('hello')
print('proc1 rec:', pipe.recv())
def proc2(pipe):
print('proc2 rec:', pipe.recv())
pipe.send('hello, too')
# Build a pipe
pipe = mul.Pipe()
if __name__ == '__main__':
# Pass an end of the pipe to process 1
p1 = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2 = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()1
2proc2 rec: hello
proc1 rec: hello, too这里的Pipe是双向的。Pipe对象建立的时候,返回一个含有两个元素的表,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用
send()
方法来传送对象,在另一端使用recv()
来接收。Queue与Pipe相类似,都是先进先出的结构。但Queue允许多个进程放入,多个进程从队列取出对象。Queue使用
mutiprocessing.Queue(maxsize)
创建,maxsize表示队列中可以存放对象的最大数量。下面的程序展示了Queue的使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42import os
import multiprocessing
import time
#==================
# input worker
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.time())
queue.put(info)
# output worker
def outputQ(queue,lock):
info = queue.get()
lock.acquire()
print (str(os.getpid()) + ' get: ' + info)
lock.release()
#===================
# Main
record1 = [] # store input processes
record2 = [] # store output processes
lock = multiprocessing.Lock() # To prevent messy print
queue = multiprocessing.Queue(3)
if __name__ == '__main__':
# input processes
for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
# output processes
for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,lock))
process.start()
record2.append(process)
for p in record1:
p.join()
queue.close() # No more object will come, close the queue
for p in record2:
p.join()1
2
3
4
5
6
7
8
9
108572 get: 6300(put):1555486924.3676226
8136 get: 3464(put):1555486924.412625
9576 get: 9660(put):1555486924.5126307
6936 get: 5064(put):1555486924.5976355
10652 get: 8688(put):1555486924.5976355
6992 get: 10988(put):1555486924.7526445
6548 get: 6836(put):1555486924.7456443
3504 get: 7284(put):1555486924.7666454
8652 get: 4960(put):1555486924.8536503
10868 get: 460(put):1555486924.8606508一些进程使用
put()
在Queue中放入字符串,这个字符串中包含PID和时间。另一些进程从Queue中取出,并打印自己的PID以及get()
的字符串。
进程池
进程池 (Process Pool)可以创建多个进程。这些进程就像是随时待命的士兵,准备执行任务(程序)。一个进程池中可以容纳多个待命的进程。
1 | import multiprocessing as mul |
1 | [1, 4, 9, 16, 25, 36, 49, 64, 81, 100] |
我们创建了一个容许5个进程的进程池 (Process Pool) 。Pool运行的每个进程都执行f()函数。我们利用map()
方法,将f()
函数作用到表的每个元素上。这与built-in的map()
函数类似,只是这里用5个进程并行处理。如果进程运行结束后,还有需要处理的元素,那么进程会被用于重新运行f()
函数。除了map()
方法外,Pool还有下面的常用方法。
apply_async(func,args)
从进程池中取出一个进程执行func,args参数。它将返回一个AsyncResult的对象,你可以对该对象调用get()
方法以获得结果。close()
进程池不再创建新的进程join()
wait进程池中的全部进程。必须对Pool先调用close()
方法才能join。
共享内存
实例代码:
1 | import multiprocessing |
1 | 0.0 |
这里我们实际上只有主进程和Process对象代表的进程。我们在主进程的内存空间中创建共享的内存,也就是Value和Array两个对象。对象Value被设置成为双精度数(d), 并初始化为1.0。而Array则类似于C中的数组,有固定的类型(i, 也就是整数)。在Process进程中,我们修改了Value和Array对象。回到主程序,打印出结果,主程序也看到了两个对象的改变,说明资源确实在两个进程之间共享。
Manager
Manager是通过共享进程的方式共享数据。Manager管理的共享数据类型有:Value、Array、dict、list、Lock、Semaphore等等,同时Manager还可以共享类的实例对象。
实例代码:
1 | from multiprocessing import Process,Manager |
1 | [21, 22, 23, 24, 25] |