目录 1、多进程 示例:创建多进程 示例:创建多进程获取子进程任务结果 2、进程池 同步 异步 多个不同的任务方法执行同一个数据源返回不同的结果 多个数据源,执行相同的任务方法校验,发现不合格,就退出不再执行 回调函数-接收子进程返回结果,实时处理
1、多进程 示例:创建多进程
import os import time from multiprocessing import Process def task(num): print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}") time.sleep(3) if __name__ == '__main__': start = time.time() p_list = [] print("当前环境进程:", os.getpid()) for i in range(5): p = Process(group=None, target=task, args=(i,), kwargs={}, name=f"进程名:校长{i}") p.start() p_list.append(p) [o.join() for o in p_list] print(f"父进程等待每个子进程任务结束,总耗时:{time.time()-start}") """ 结果如下: 当前环境进程:14276 父进程:14276-创建子进程8492,执行任务:0 父进程:14276-创建子进程15080,执行任务:1 父进程:14276-创建子进程14920,执行任务:2 父进程:14276-创建子进程13528,执行任务:4 父进程:14276-创建子进程13452,执行任务:3 父进程等待每个子进程任务结束,总耗时:3.257822036743164 因为每个子进程内存空间是隔离的,所以此时是无法得到每个任务的结果的,一般想得到结果可以采用队列的方式保存! """
示例:创建多进程获取子进程任务结果
import os from multiprocessing import (Process, Queue) def task(num, q): print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}") q.put(f"任务:{os.getpid()}执行结果:{num}") if __name__ == '__main__': q_obj = Queue() p_list = [] for i in range(5): p = Process(target=task, args=(i, q_obj)) p.start() p_list.append(p) # 遍历每个子进程,确认其执行完毕 [o.join() for o in p_list] # 遍历每个子进程,获取对应子进程执行任务结果 response = [q_obj.get() for j in p_list] print(f"子进程执行的结果集为:{response}") """ 执行结果为: 父进程:14472-创建子进程10172,执行任务:1 父进程:14472-创建子进程13464,执行任务:0 父进程:14472-创建子进程1548,执行任务:3 父进程:14472-创建子进程10692,执行任务:4 父进程:14472-创建子进程13412,执行任务:2 子进程执行的结果集为:['任务:10172执行结果:1', '任务:13464执行结果:0', '任务:1548执行结果:3', '任务:10692执行结果:4', '任务:13412执行结果:2'] """
2、进程池 特点:同时开启指定数量的进程(一般CPU个数),并行执行任务,用于高计算,并行,有任务执行返回值
同步 任务结果顺序是按照提交任务结果的顺序,同步也就是按进程创建的顺序
import os import time from multiprocessing import Pool # 获取cpu个数 print(os.cpu_count()) def task(flag): print(f"进程:{os.getppid()},创建子进程:{os.getpid()},执行任务{flag}") time.sleep(2) return flag if __name__ == '__main__': start = time.time() pool = Pool(processes=os.cpu_count()) job_list = range(10) results = [] for i in job_list: # apply的方式 ret = pool.apply(task, (i,)) # 同步,进程池是有返回值的 # map的方式 # ret = p.map(task, (job,)) # 同步,一共执行10个任务 results.append(ret) pool.close() # 关闭 并不是进程池中的进程不工作了,而是关闭了进程池,让任务不能再继续提交了 pool.join() # 等待这个池中提交的任务都执行完,表示等待所有子进程中的代码都执行完 主进程才结束 print(f"apply-任务执行结果合集:{results}") ----> 任务执行结果合集:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] print(f"apply-总耗时:{time.time()-start}") ----> 总耗时:20.55962562561035 print(f"map-执行的结果为:{result_list}") ---> 执行的结果为:[[0], [1], [4], [9], [16], [25], [36], [49], [64], [81]] print(f"map-总耗时:{time.time() - start}") ---> 总耗时:20.336746215820312
异步 任务结果是按照子进程提交任务的顺序,结果顺序不可控,要求任务关联性不高
import os import time from multiprocessing import Pool # 获取cpu个数 print(os.cpu_count()) def task(flag): print(f"进程:{os.getppid()},创建子进程:{os.getpid()},执行任务{flag}") time.sleep(2) return flag if __name__ == '__main__': start = time.time() pool = Pool(processes=os.cpu_count()) job_list = range(10) results = [] for i in job_list: # apply_async方式 ret = pool.apply_async(task, (i,)) # 异步,一次并行执行池子里配置的数量的任务 # map_async方式 ret = p.map_async(task, (job,)) # 同步,一共执行5个任务 results.append(ret) pool.close() pool.join() ret = [job.get() for job in results] # 异步需要调用get方法 print(f"apply_async-任务执行结果合集:{ret}") ----> 异步时,任务执行结果合集:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] print(f"apply_async-总耗时:{time.time() - start}") ----> 异步时,总耗时:4.462151765823364 print(f"map_async-执行的结果为:{result_list}") ---> 执行的结果为:[[0], [1], [4], [9], [16], [25], [36], [49], [64], [81]] print(f"map_async-总耗时:{time.time() - start}") ---> 总耗时:4.353601694107056 # 在异步提交中,可以不用join(),主进程会执行完代码,但会等待进程池中的任务结束,才结束
多个不同的任务方法执行同一个数据源返回不同的结果
# 对同一份请求报文,执行不同的校验,返回对应的结果 import os from multiprocessing import Pool def task1(data): print(f"子进程:{os.getpid()},执行任务") print(data.get("num")) # 5 code = 1 if data["num"] > 10: code = 0 return dict(code=code) def task2(data): print(f"子进程:{os.getpid()},执行任务") code = 1 if data["num"] < 9: code = 0 return dict(code=code) def task3(data): print(f"子进程:{os.getpid()},执行任务") code = 1 if 2 < data["num"] < 9: code = 0 return dict(code=code) if __name__ == '__main__': pool = Pool(os.cpu_count()) jobs = [task1, task2, task3] results = [] data = {"num": 5} for i in jobs: # apply_async方式 ret = pool.apply_async(i, (data,)) results.append(ret) pool.close() pool.join() print(f"结果集为:{[j.get() for j in results]}") ---> 结果集为:[{'code': 1}, {'code': 0}, {'code': 0}] # map_async方式 pool = Pool(os.cpu_count()) jobs = [task1, task2, task3] results = [] data = {"num": 5} for i in jobs: ret = pool.map_async(job, (data,)) results.append(ret) pool.close() pool.join() print(f"结果集为:{[j.get() for j in results]}") ---> 结果集为:[[{'code': 1}], [{'code': 0}], [{'code': 0}]
多个数据源,执行相同的任务方法校验,发现不合格,就退出不再执行 只要得到自己想要的结果,就结束,节约资源
import os import time from queue import Queue from multiprocessing import Pool """不同的数据源,执行相同的任务""" def task(data): time.sleep(0.05) print(data) if data == 5: return False else: return True if __name__ == '__main__': pool = Pool(os.cpu_count()) q = Queue() data_list = range(1000) for i in data_list: ret = pool.apply_async(task, args=(i,)) q.put(ret) pool.close() while True: p_result_obj = q.get() flag = p_result_obj.get() print(flag) if flag is False: # 如果校验失败,则退出不再执行后面的 print("发现校验失败,结束进程池中的所有子进程") pool.terminate() break pool.join() print("执行后面的逻辑") # 升级版本:多进程+多线程 import os import time from queue import Queue from threading import Thread,get_ident from multiprocessing import Pool def task(data): time.sleep(0.05) print(f"主进程{os.getppid()},创建子进程{os.getpid()},执行任务{data}") print(data) if data == 5: return False else: return True def pool_th(data_list, q, pool): for i in data_list: # 创建多个子进程,异步只是将任务添加到队列,还没有执行完 q.put(pool.apply_async(task, args=(i,))) def result_th(q, p): while True: flag = q.get().get() # 获取子进程结果 print(f"在子线程{get_ident()}中获取子进程执行的结果进行判断:{flag}") if not flag: p.terminate() # 结束所有子进程 break if __name__ == '__main__': result_q = Queue() pool = Pool() data_list = range(1000) # 开启多线程 t1 = Thread(target=pool_th, args=(data_list, result_q, pool)) t2 = Thread(target=result_th, args=(result_q, pool)) t1.start() t2.start() t1.join() t2.join() pool.join() print("执行后面的逻辑") # 这里有个通病,那就是结束进程池子进程任务时,有的子进程已经执行了,因为每个子进程执行的结果插入顺序不是有序的,而我们需要的那个结果也许虽然按任务来说在前面,但它执行的时间并不一定在前面
回调函数-接收子进程返回值,实时处理 定义:将一个进程的执行结果的返回值,会当callback参数来执行配置的callback函数,从而减少获取子进程结果等I/O操作浪费的时间
作用:进程池中的任何一个任务一旦处理完了,就立即告知主进程,我已执行完毕,你可以处理我的结果了,主进程则调用一个函数【你配置的回调函数】去处理该结果。
注意:回调函数是没有返回值,所以回调函数一般可用于对子进程结果的判断后,然后写库等等操作
from multiprocessing import Pool import time import os def task(num): time.sleep(2) print(f"父进程{os.getppid()},子进程:{os.getpid()}执行任务{num}") if num > 3: res = {"job_num": num, "result": 0} else: res = {"job_num": num, "result": 1} return res def back(args): print(f"回调即时处理每个子进程返回的结果{args},其父进程为:{os.getpid()}") if args["result"] == 0: print("写库") else: print(f"记录日志,任务{args['job_num']},执行失败,结果为:{args['result']}") if __name__ == '__main__': print(f"当前进程:{os.getpid()},父:{os.getppid()}") jobs = range(10) p = Pool(os.cpu_count()) for job in jobs: p.apply_async(func=task, args=(job,), callback=back) p.close() p.join()
声明:本文来自网络,不代表【好得很程序员自学网】立场,转载请注明出处:http://haodehen.cn/did162765