好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

python中多进程的详细介绍(代码示例)

本篇文章给大家带来的内容是关于python中多进程的详细介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

本节讲学习Python的多进程。

一、多进程和多线程比较

多进程 Multiprocessing 和多线程 threading 类似, 他们都是在 python 中用来 并行 运算的. 不过既然有了 threading, 为什么 Python 还要出一个 multiprocessing 呢? 原因很简单, 就是用来弥补 threading 的一些劣势, 比如在 threading 教程中提到的 GIL .

使用 multiprocessing 也非常简单, 如果对 threading 有一定了解的朋友, 你们的享受时间就到了. 因为 python 把 multiprocessing 和 threading 的使用方法做的几乎差不多. 这样我们就更容易上手. 也更容易发挥你电脑多核系统的威力了!

二、添加进程Process

import multiprocessing as mp
import threading as td

def job(a,d):
    print('aaaaa')

t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
t1.start()
p1.start()
t1.join()
p1.join() 

从上面的使用对比代码可以看出,线程和进程的使用方法相似。

使用

在运用时需要添加上一个定义main函数的语句

if __name__=='__main__': 

完整的应用代码:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_test.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(a, d):
    print a, d

if __name__ == '__main__':
    p1 = mp.Process(target=job, args=(1, 2))
    p1.start()
    p1.join() 

运行环境要在terminal环境下,可能其他的编辑工具会出现运行结束后没有打印结果,在terminal中的运行后打印的 结果为:

?  baseLearn python ./process/process_test.py
1 2
?  baseLearn 

三、存储进程 输出Queue

Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后再从队列中取出结果, 继续加载运算。原因很简单, 多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果

process_queue.py

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_queue.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

# 定义一个被多线程调用的函数,q 就像一个队列,用来保存每次函数运行的结果
def job(q):
    res = 0
    for i in range(1000):
        res += i + i**2 + i**3
    q.put(res)   #queue

if __name__ == '__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))

    # 分别启动、连接两个线程
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    # 上面是分两批处理的,所以这里分两批 
输出,将结果分别保存 res1 = q.get() res2 = q.get() print res1,res2

打印 输出结果:

? python ./process/process_queue.py
249833583000 249833583000 

四、进程池

进程池 就是我们将所要运行的东西,放到池子里, Python会自行解决多进程的问题 。

1、导入多进程模块

首先 import multiprocessing 和定义 job()

import multiprocessing as mp

def job(x):
    return x*x 
2、进程池Pool()和map()

然后我们定义一个 Pool

pool = mp.Pool() 

有了池子之后,就可以让池子对应某一个函数,我们向池子里丢数据,池子就会返回函数返回的值。 Pool 和之前的 Process的 不同点是 丢向Pool的函数有返回值 ,而 Process 的 没有返回值 。

接下来用 map() 获取结果,在 map() 中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果

res = pool.map(job, range(10)) 

让我们来运行一下

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore() 

完成代码:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_queue.py
@time: 18/8/26 01:12
"""

import multiprocessing as mp

def job(x):
    return x*x  # 注意这里的函数有return返回值

def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore() 

执行结果:

?  baseLearn python ./process/process_pool.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 
3、自定义核数量

我们怎么知道 Pool 是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况

打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)

Pool默认大小是CPU的核数,我们也可以通过在 Pool 中传入 processes 参数即可自定义需要的核数量

def multicore():
    pool = mp.Pool(processes=3) # 定义CPU核数量为3
    res = pool.map(job, range(10))
    print(res) 
4、apply_async()

Pool 除了 map() 外,还有可以返回结果的方式,那就是 apply_async() .

apply_async() 中 只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值

def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # 用get获得结果
    print(res.get()) 

运行结果;

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]  # map()
4 # apply_async() 
总结

Pool 默认调用是CPU的核数,传入processes参数可自定义CPU核数

map() 放入迭代参数,返回多个结果

apply_async() 只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代

五、共享内存shared memory

这节我们学习如何定义共享内存。 只有用共享内存才能让CPU之间有交流 。

Shared Value

我们可以通过使用 Value 数据存储在一个共享的内存表中。

import multiprocessing as mp

value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14) 

其中 d 和 i 参数用来设置数据类型的, d 表示一个双精浮点类型 double, i 表示一个带符号的 整型 。

Type code C Type Python Type Minimum size in bytes 'b' signed char int 1 'B' unsigned char int 1 'u' Py_UNICODE Unicode character 2 'h' signed short int 2 'H' unsigned short int 2 'i' signed int int 2 'I' unsigned int int 2 'l' signed long int 4 'L' unsigned long int 4 'q' signed long long int 8 'Q' unsigned long long int 8 'f' float float 4 'd' double float 8 Shared Array

在Python的 mutiprocessing 中,有还有一个 Array 类, 可以和共享内存交互,来实现在进程之间共享数据 。

array = mp.Array('i', [1, 2, 3, 4]) 

这里的 Array 和numpy中的不同,它只能是 一维 的,不能是多维的。同样和 Value 一样,需要定义数据形式,否则会报错。 我们会在后一节举例说明这两种的使用方法.

错误形式

array = mp.Array('i', [[1, 2], [3, 4]]) # 2维list

"""
TypeError: an integer is required
""" 

六、进程锁Lock 不加进程锁

让我们看看没有加进程锁时会产生什么样的结果。

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_no_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num):
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让 
输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) def multicore(): v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1)) p2 = mp.Process(target=job, args=(v, 4)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()

在上面的代码中,我们定义了一个共享变量 v ,两个进程都可以对它进行操作。 在job()中我们想让 v 每隔0.1秒 输出一次累加 num 的结果,但是在两个进程 p1 和 p2 中设定了不同的累加值。所以接下来让我们来看下这两个进程是否会出现冲突。

结果打印:

?  baseLearn python ./process/process_no_lock.py
1
5
9
9
13
13
17
17
18
18
?  baseLearn 

我们可以看到,进程1和进程2在相互 抢 着使用 共享内存v 。

加进程锁

为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。

首先需要定义一个进程锁

 l = mp.Lock() # 定义一个进程锁 

然后将进程锁的信息传入各个进程中

p1 = mp.Process(target=job, args=(v,1,l)) # 需要将Lock传入
p2 = mp.Process(target=job, args=(v,3,l)) 

在 job() 中设置进程锁的使用, 保证运行时一个进程的对锁内内容的独占

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # v.value获取共享内存
        print(v.value)
    l.release() # 释放 

全部代码:

# -*- coding:utf-8 -*-

"""
@author: Corwien
@file: process_lock.py
@time: 18/8/26 09:22
"""

import multiprocessing as mp
import time

def job(v, num, l):
    l.acquire() # 锁住
    for _ in range(5):
        time.sleep(0.5) # 暂停0.5秒,让 
输出效果更明显 v.value += num # v.value获取共享变量值 print(v.value) l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value('i', 0) # 定义共享变量 p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入 p2 = mp.Process(target=job, args=(v, 4, l)) # 设定不同的number看如何抢夺内存 p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()

运行一下,让我们看看是否还会出现抢占资源的情况:

结果打印:

?  baseLearn python ./process/process_lock.py
1
2
3
4
5
9
13
17
21
25 

显然,进程锁保证了进程 p1 的完整运行,然后才进行了进程 p2 的运行

相关推荐:

python 多进程通信模块

Python守护进程(多线程开发)

以上就是python中多进程的详细介绍(代码示例)的详细内容,更多请关注Gxl网其它相关文章!

查看更多关于python中多进程的详细介绍(代码示例)的详细内容...

  阅读:44次