好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

python多线程,多进程

threading. active_count ()

返回当前存活的线程类  Thread  对象。返回的计数等于  enumerate()  返回的列表长度。

threading. current_thread ()

返回当前对应调用者的控制线程的  Thread  对象。如果调用者的控制线程不是利用  threading  创建,会返回一个功能受限的虚拟线程对象。

threading. get_ident ()

 

返回当前线程的 “线程标识符”。它是一个非零的整数。它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用。

threading. enumerate ()

以列表形式返回当前所有存活的  Thread  对象。 该列表包含守护线程, current_thread()  创建的虚拟线程对象和主线程。它不包含已终结的线程和尚未开始的线程。

threading. main_thread ()

 

返回主  Thread  对象。一般情况下,主线程是Python解释器开始时创建的线程。

threading. setprofile ( func )

为所有  threading  模块开始的线程设置性能测试函数。在每个线程的  run()  方法被调用前, func  会被传递给  sys.setprofile()  。

threading. stack_size ([ size ])

返回创建线程时用的堆栈大小。可选参数  size  指定之后新建的线程的堆栈大小,而且一定要是0(根据平台或者默认配置)或者最小是32,768(32KiB)的一个正整数。如果  size  没有指定,默认是0。如果不支持改变线程堆栈大小,会抛出  RuntimeError  错误。如果指定的堆栈大小不合法,会抛出  ValueError  错误并且不会修改堆栈大小。32KiB是当前最小的能保证解释器有足够堆栈空间的堆栈大小。需要注意的是部分平台对于堆栈大小会有特定的限制,例如要求大于32KiB的堆栈大小或者需要根据系统内存页面的整数倍进行分配 - 应当查阅平台文档有关详细信息(4KiB页面比较普遍,在没有更具体信息的情况下,建议的方法是使用4096的倍数作为堆栈大小)。

threading. TIMEOUT_MAX

阻塞函数(  Lock.acquire() ,  RLock.acquire() ,  Condition.wait() , ...)中形参  timeout  允许的最大值。传入超过这个值的 timeout 会抛出  OverflowError  异常。

有个 "主线程" 对象;这对应Python程序里面初始的控制线程。它不是一个守护线程。

"虚拟线程对象" 是可以被创建的。这些是对应于“外部线程”的线程对象,它们是在线程模块外部启动的控制线程,例如直接来自C代码。虚拟线程对象功能受限;他们总是被认为是存活的和守护模式,不能被  join()  。因为无法检测外来线程的终结,它们永远不会被删除。

class  threading. Thread ( group=None ,  target=None ,  name=None ,  args=() ,  kwargs={} ,  * ,  daemon=None )

调用这个构造函数时,必需带有关键字参数。参数如下:

group  应该为  None ;为了日后扩展  ThreadGroup  类实现而保留。

target  是用于  run()  方法调用的可调用对象。默认是  None ,表示不需要调用任何方法。

name  是线程名称。默认情况下,由 "Thread- N " 格式构成一个唯一的名称,其中  N  是小的十进制数。

args  是用于调用目标函数的参数元组。默认是  () 。

kwargs  是用于调用目标函数的关键字参数字典。默认是  {} 。

如果  daemon  不是  None ,线程将被显式的设置为  守护模式 ,不管该线程是否是守护模式。如果是  None  (默认值),线程将继承当前线程的守护模式属性。

如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器( Thread.__init__() )。

在 3.3 版更改: 加入  daemon  参数。

start ()

开始线程活动。

它在一个线程里最多只能被调用一次。它安排对象的  run()  方法在一个独立的控制进程中调用。

如果同一个线程对象中调用这个方法的次数大于一次,会抛出  RuntimeError  。

run ()

代表线程活动的方法。

你可以在子类型里重载这个方法。 标准的  run()  方法会对作为  target  参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从  args  和  kwargs  参数分别获取的位置和关键字参数。

join ( timeout=None )

等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用  join()  的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。

当  timeout  参数存在而且不是  None  时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为  join()  总是返回  None  ,所以你一定要在  join()  后调用  is_alive()  才能判断是否发生超时 -- 如果线程仍然存货,则  join()  超时。

当  timeout  参数不存在或者是  None  ,这个操作会阻塞直到线程终结。

一个线程可以被  join()  很多次。

如果尝试加入当前线程会导致死锁,  join()  会引起  RuntimeError  异常。如果尝试  join()  一个尚未开始的线程,也会抛出相同的异常。

name

只用于识别的字符串。它没有语义。多个线程可以赋予相同的名称。 初始名称由构造函数设置。

getName () setName ()

旧的  name  取值/设值 API;直接当做特征属性使用它。

ident

这个线程的 ‘线程标识符‘,如果线程尚未开始则为  None  。这是个非零整数。参见  get_ident()  函数。当一个线程退出而另外一个线程被创建,线程标识符会被复用。即使线程退出后,仍可得到标识符。

is_alive ()

返回线程是否存活。

当  run()  方法刚开始直到  run()  方法刚结束,这个方法返回  True  。模块函数  enumerate()  返回包含所有存活线程的列表。

daemon

一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用  start()  前设置好,不然会抛出  RuntimeError  。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是  daemon  =  False 。

当没有存活的非守护线程时,整个Python程序才会退出。

isDaemon () setDaemon ()

旧的  name  取值/设值 API;建议直接当做特征属性使用它。

#线程是cpu最小的单元

from threading import Thread,active_count,current_thread,enumerate,main_thread,stack_size,Lock
import time
import random

# print(active_count()) #返回当前的线程数
# print(current_thread()) #返回当前的线程Thread对象
# print(enumerate()) #反回当前活动的线程
# print(main_thread()) #返回当前的主线程
# print(stack_size()) #返回创建线程时使用的栈的大小,如果size参数,则用来指定后续创建的线程使用的栈的大小,size必须是0,(表示使用系统默认的值)大于32k的正整数

#
arr = []
def task(num):
    time.sleep(3)
    print(num)
#
start = time.time()

for i in range(0,5):
    t =Thread(target=task,args=(i,))
    t.start()
    arr.append(t)
    # print(enumerate())
    # t.join()

for j in arr:
    j.join()

end = time.time()
print(‘时间:%s‘%(end-start))

‘‘‘
Thread对象常用方法:
start()运行线程
join()阻塞主线程 time阻塞时间
name 线程的名字
is_alive() 判断线程是否存活
ident 线程的标识
daemon 是否为守护进程
‘‘‘

# class MyI(Thread):
#     def __init__(self):
#         super(MyI,self).__init__()
#     def run(self):
#         time.sleep(3)
#         print(self.name,self.ident,self.daemon)
#
# t1 = MyI()
# t1.name = "任务一"
# # t1.daemon=True
# t1.start()

#
# h2=[]
# l = Lock()
# class xm(Thread):
#
#     def __init__(self):
#         super(xm,self).__init__()
#         self.h = True
#     def run(self):
#
#         if len(h2)<5:
#             l.acquire()
#             while self.h:
#                 h2.append(1)
#                 print(‘小明往锅里加了%s‘%len(h2)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
#                 if len(h2)==5:
#                     l.release()
#                     self.h = False
#
# class xh(Thread):
#     def __init__(self):
#         super(xh,self).__init__()
#         self.h = True
#
#     def run(self):
#         i = 0
#         if len(h2)==5:
#             l.acquire()
#             while self.h:
#                 i+=1
#                 h2.pop(0)
#                 print(‘小红吃了锅里的%s‘%(i)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
#                 if len(h2)==0:
#                     l.release()
#                     self.h = False
#
#
# x1 = xm()
# x1.start()
#
# x2 = xh()
# x2.start()
#
# ‘‘‘
# def m(*args):
#     print(random.randint(1,args))
# Thread(group=None,target=m(),args=(1,2,34,6)) #构造函数
# target 线程启动执行函数
# args 是元组 是线程执行函数的参数
# kwargs 是线程执行函数的参数
# ‘‘‘

multiprocessing  是一个用与  threading  模块相似API的支持产生进程的包。  multiprocessing  包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此,  multiprocessing  模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。

multiprocessing  模块还引入了在  threading  模块中没有类似物的API。这方面的一个主要例子是  Pool  对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用  Pool  ,

from multiprocessing import Pool def f(x): return x*x if __name__ == ‘__main__‘: with Pool(5) as p: print(p.map(f, [1, 2, 3])) 

将打印到标准输出

[1, 4, 9] 

Process  类

在  multiprocessing  中,通过创建一个  Process  对象然后调用它的  start()  方法来生成进程。  Process  和  threading.Thread  API 相同。 一个简单的多进程程序示例是:

from multiprocessing import Process def f(name): print(‘hello‘, name) if __name__ == ‘__main__‘: p = Process(target=f, args=(‘bob‘,)) p.start() p.join() 

要显示所涉及的各个进程ID,这是一个扩展示例:

from multiprocessing import Process import os def info(title): print(title) print(‘module name:‘, __name__) print(‘parent process:‘, os.getppid()) print(‘process id:‘, os.getpid()) def f(name): info(‘function f‘) print(‘hello‘, name) if __name__ == ‘__main__‘: info(‘main line‘) p = Process(target=f, args=(‘bob‘,)) p.start() p.join()  
POOL进程池
# from multiprocessing import Pool # def fun(): #     sum = 0 #     for i in range(0,100000): #         sum+=i #     print(sum) # # if __name__ == ‘__main__‘: #     pool = Pool(5)  #5个进程的进程池 #     start = time.time() #     for i in range(100): #         # pool.apply(fun) #         obj = pool.apply_async(fun) #         print(obj) # #     end  =time.time() #     print(end-start)  
 
       
       

队列

Queue  类是一个近似  queue.Queue  的克隆。 例如:

from multiprocessing import Process, Queue def f(q): q.put([42, None, ‘hello‘]) if __name__ == ‘__main__‘: q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, ‘hello‘]" p.join() 

队列是线程和进程安全的。

管道

Pipe()  函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, ‘hello‘]) conn.close() if __name__ == ‘__main__‘: parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, ‘hello‘]" p.join() 

返回的两个连接对象  Pipe()  表示管道的两端。每个连接对象都有  send()  和  recv()  方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的  同一  端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。

 
       
       

进程之间的同步

multiprocessing  包含来自  threading  的所有同步基本体的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print(‘hello world‘, i) finally: l.release() if __name__ == ‘__main__‘: lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start() 

不使用来自不同进程的锁输出容易产生混淆。

 
       
       

在进程之间共享状态

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

但是,如果你真的需要使用一些共享数据,那么  multiprocessing  提供了两种方法。

共享内存

可以使用  Value  或  Array  将数据存储在共享内存映射中。例如,以下代码:

from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == ‘__main__‘: num = Value(‘d‘, 0.0) arr = Array(‘i‘, range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) 

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9] 

创建  num  和  arr  时使用的  ‘d‘  和  ‘i‘  参数是  array  模块使用的类型的 typecode :  ‘d‘  表示双精度浮点数,  ‘i‘  表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用  multiprocessing.sharedctypes  模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务器进程

由  Manager()  返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager()  返回的管理器支持类型:  list  、  dict  、  Namespace  、  Lock  、  RLock  、  Semaphore  、  BoundedSemaphore  、  Condition  、  Event  、  Barrier  、  Queue  、  Value  和  Array  。例如

from multiprocessing import Process, Manager def f(d, l): d[1] = ‘1‘ d[‘2‘] = 2 d[0.25] = None l.reverse() if __name__ == ‘__main__‘: with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l) 

将打印

{0.25: None, 1: ‘1‘, ‘2‘: 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] 

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

服务端的创建 
from multiprocessing.managers import BaseManager from queue import Queue queue = Queue() queue.put(‘lolololol‘) class QueueManager(BaseManager): pass QueueManager.register(‘get_queue‘, callable=lambda:queue) m = QueueManager(address=(‘127.0.0.1‘, 5000), authkey=b‘2ab‘) s = m.get_server() s.serve_forever() #创建服务器  客户端的创建
from multiprocessing.managers import BaseManagerclass QueueManager(BaseManager): passQueueManager.register(‘get_queue‘)m = QueueManager(address=(‘127.0.0.1‘, 5000), authkey=b‘2ab‘)m.connect()queue = m.get_queue()print(queue.get())

查看更多关于python多线程,多进程的详细内容...

  阅读:22次