1、 概念普及 2、进程&线程&协程 2.1、进程 函数的方式创建进程 面向对象的方式创建进程 守护进程 进程间通信 锁 队列 管道【很少使用】 2.2、线程 函数的方式创建: 面向对象的方式创建: 守护线程 全局解释器锁GIL: 2.3、协程 创建 猴子补丁 进程与线程的区别 线程与协程的区别
1、 概念普及 并发编程的应用:
网络应用:爬虫(直接应用并发编程)
网络架构django、flask、tornado 源码-并发编程
socket server 源码-并发编程
计算机操作系统发展史
手工操作-读穿孔的纸带、用户独占全机、cpu等待手工操作、cpu利用不充分
批处理-磁带存储(联机批处理{一边写入磁带,一边从磁带读出计算}、脱机批处理、只能运行一个程序,遇到IO就等待闲置
多道程序系统
同时执行多个任务,遇到io就切换,
即空间隔离(多个程序运行),时空复用(IO切换)的特点
分时系统
同时执行多个任务,没有遇到IO也切换,固定时间片到了就切
时间片轮转
多路性、交互性、独立性、及时性
切会浪费cpu时间,降低了cpu效率,但提高了用户体验。
实时系统
及时响应
高可靠性
时刻等待响应,即时处理
通用操作系统:
多道批处理
分时
实时
操作系统分类:
个人计算机操作系统
网络操作系统
分布式操作系统
操作系统的作用:
是一个协调、管理、和控制计算机硬件资源和软件资源的控制程序。【调度硬件资源、调用管理软件】
隐藏了丑陋的硬件调用接口,提供良好的抽象接口
管理、调度进程,并将多个进程对硬件的竞争变得有序
I/O操作:(相对内存来说的)
输入【读到内存中】:input、f.read、accept、recv、connect
输出【从内存读出】:print、f.write、send、connect
文件操作/网络操作都是IO操作
异步:两个任务同时运行(并行)
同步:多个任务 串行 【按顺序执行】
阻塞:等待 input、accept、recv
非阻塞:不等待,直接执行
并行:多个任务同时执行【多个CPU在同时执行任务】
并发:只有一个CPU,交替执行多个任务【宏观上同时执行,实际是轮转】,两则区别可参见:【转】编程概念篇-并发与并行
同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行
异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率
程序:没有运行
进程:运行中的程序、计算机中最小的资源分配单元
进程调度:
多个进程(运行中的程序)在操作系统的控制下被CPU执行,去享用计算机资源
先来先服务调度算法
短作业优先
时间片轮转
多级反馈队列
进程调度的过程是不能随意被程序影响得
进程三大状态
就绪 程序开始运行,进入就绪队列,等待cpu分配时间
运行 执行进程,单个时间片内运行完成,就释放资源,没有作业完,就又自动切换到就绪队列等待下次的调度
阻塞 执行中的进程遇到事件入IO等导致无法执行,进入阻塞状态,解除后进入就绪队列等待进程调度
进程与父进程
进程 PID 通过os.getpid() 可以得到
父进程 PPID 负责回收一些子进程的资源后才关闭 通过os.getppid() 2、进程&线程&协程 2.1、进程 定义:计算机最小的资源分配单元【一个软件正在进行的过程,ps -ef】特点:
进程与进程之间内存中的数据是隔离的
进程与进程之间不能自由的交换内存数据【内存空间是不能共享的】
全局变量在子进程中修改,其他进程是感知不到的【子进程的执行结果父进程是获取不到的】
进程间通信需要通过队列【管道+锁】或管道创建进程:
系统初始化
一个进程在运行过程中开启了子进程
用户的交互式请求,而创建一个新进程
一个批处理作业的初始化函数的方式创建进程
import os from multiprocessing import Process def task(num): print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}") if __name__ == '__main__': print("当前环境进程:", os.getpid()) p = Process(group=None, target=task, args=("吃饭",), kwargs={}) p.start() p.join() """ 执行结果为: 当前环境进程:14596 父进程:14596-创建子进程14676,执行任务:吃饭 """ """ 进程创建语法参数解释: group 值默认始终为None target 表示调用对象 args 调用对象的位置参数 类型必须是元组,必须是元组,必须是元组! kwargs 调用对象得字典 name为子进程的名称 进程常见方法解释: p.start() : 启动进程,调用操作系统的命令,传达要创建进程的申请,并调用子进程中的p.run()方法,非阻塞 p.run(): 进程启动时运行的方法,其实真正去调用target指定的函数,自定义类,就需要自己定义run方法 p.terminate(): 强制终止进程p p.is_alive(): 查看进程状态值,True表示运行中 p.join(): 主进程等待子进程p运行结束后才运行 阻塞 """
面向对象的方式创建进程
import os from multiprocessing import Process class MyProcess(Process): """重点是继承Process类,及重写run方法""" def __init__(self, argument): super(MyProcess, self).__init__() self.name = argument def run(self) -> None: print(f"run:父进程:{os.getppid()}-创建子进程{os.getpid()}:任务名:{self.name}") self.task() def task(self): print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{self.name}") if __name__ == '__main__': p = MyProcess("人生") p.start() p.join() """ 执行结果为: run:父进程:12864-创建子进程6352:任务名:人生 父进程:12864-创建子进程6352,执行任务:人生 """
守护进程 特点:守护进程的生命周期只和主进程的代码有关系,和其他子进程没有关系,会随着主进程代码执行结束而结束
作用:监控主进程生命状态
注意点:
主进程创建的守护进程
守护进程内无法创建子进程
守护进程属性,默认False
配置守护进程应该在主进程开子进程之前即p.start()之前
# 主进程代码执行完了,守护进程就不会再执行了,等其他的子进程结束了,主进程就结束了 import os import time from multiprocessing import Process def check_is_live(): while True: print(f"父进程:{os.getppid()}-创建守护子进程{os.getpid()},父进程还没有挂") time.sleep(0.5) print(f"父进程如果挂了,你是看不到我的") def task(num): print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}") time.sleep(2) print(f"任务{num}结束") if __name__ == '__main__': print(f"当前进程为:{os.getppid()}") p = Process(target=check_is_live) p.daemon = True p.start() p_job = Process(target=task, args=("任务007",)) p_job.start() time.sleep(1) print(f"主进程{os.getppid()},执行结束") """ 执行结果: 当前进程为:11080 父进程:9720-创建守护子进程1916,父进程还没有挂 父进程:9720-创建子进程15356,执行任务:任务007 父进程如果挂了,你是看不到我的 父进程:9720-创建守护子进程1916,父进程还没有挂 主进程11080,执行结束 任务任务007结束 """
进程间通信
进程间数据交互,本质是socket通信,不过都是本地的,基于文件。
锁 作用:保证多个进程修改同一数据源时,同一时间只能有一个任务可以进行操作,保证安全性,损失效率
语法:
from multiprocessing import Lock lock=Lock() # 互斥锁 lock.acquire() # 获取锁,没归还前一直阻塞 lock.release() # 归还锁,其他进程可以继续作业
示例:
import json import time import random from multiprocessing import Process, Lock def query(user): with open('sku.txt', "r", encoding='utf-8') as f: sku_info_dict = json.load(f) time.sleep(random.random()) surplus = sku_info_dict.get('count') print(f"用户:{user}查看库存,还剩:{surplus}") return surplus, sku_info_dict def change(user): count, sku_info_dict = query(user) if int(count) < 1: print("库存已不足,请通知管理员补货") else: with open('sku.txt', 'w', encoding='utf-8') as f: sku_info_dict["count"] -= 1 json.dump(sku_info_dict, f) time.sleep(random.random()) print(f"{user}购买成功") def task(u, l): l.acquire() change(u) l.release() if __name__ == '__main__': sku_ = {"count": 3} f = open("sku.txt", "w") json.dump(sku_, f) f.flush() f.close() lock = Lock() jobs = [] for i in range(5): p = Process(target=task, args=(i, lock)) p.start() jobs.append(p) for j in jobs: j.join() """
执行结果为: 用户:1查看库存,还剩:3 1购买成功 用户:0查看库存,还剩:2 0购买成功 用户:2查看库存,还剩:1 2购买成功 用户:3查看库存,还剩:0 库存已不足,请通知管理员补货 用户:4查看库存,还剩:0 库存已不足,请通知管理员补货 """ # 如果不加锁,就很可能所有人都得到通知买到了,但实际只有3张票,而5个人买,实际有2个人应该买不到的
队列
特点:先进先出
本质:管道+锁
语法:
from multiprocessing import Queue q = Queue() q.get() # 从前到后获取队列中的数据,没有数据则一直阻塞等待数据的放入 q.put() # 向队列中放入数据
示例:
from multiprocessing import Queue, Process def get(q): while True: ret = q.get() print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},获取的数据为:{ret}") def put(q, num): print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},向队列中投入:{num}") q.put(num) if __name__ == '__main__': queue = Queue() x = 10000 queue.put(x) print(f"主进程{os.getppid()}向队列中投入:{x}") Process(target=get, args=(queue,), name="专门获取的任务").start() for i in range(3): # 一直放 Process(target=put, args=(queue, random.randint(1, 10)), name="专门放数据的任务").start() """ 执行结果为: 主进程11080向队列中投入:10000 父进程:16356-创建子进程16080,获取的数据为:10000 父进程:16356-创建子进程9496,向队列中投入:2 父进程:16356-创建子进程16100,向队列中投入:4 父进程:16356-创建子进程8724,向队列中投入:3 父进程:16356-创建子进程16080,获取的数据为:2 父进程:16356-创建子进程16080,获取的数据为:4 父进程:16356-创建子进程16080,获取的数据为:3 一直等待中…… """
管道【很少使用】
特点:左放右收或左收右放,全双攻模式
注意点:
进程间通信,数据不安全,队列安全
需要在创建子进程前创建
from multiprocessing import Process, Pipe def consumer(left, right): # 如果消费者右边接数据,那么把左边关闭 left.close() while True: try: print(f"接收到数据{right.recv()}") except EOFError: break if __name__ == '__main__': left, right = Pipe() Process(target=consumer, args=(left, right)).start() right.close() # 主进程准备通过左边发数据,那么右边就必须关闭 left.send(6666) left.close() # 不用就关闭
2.2、线程
定义:CPU调度的最小单位
特点:
依赖于进程
独立调度和分派的基本单位(线程间切换快,开销小)
共享进程资源,同一进程内线程间可通信
并发执行常用方法:
t.is_alive()或者t.isAlive() # 查看主线程是否存活 t.getName() # 放回线程名如: Thread-1 t.setName() # 设置线程名 t.start() # 创建线程,调用run方法,运行 t.join() # 主线程等待t线程执行结束后才继续,阻塞 threading.currentTread() # 返回当前的线程变量,即对象:<_MainThread(MainThread, started 4644)> threading.enumerate() # 返回正在运行的线程对象列表,列表中线程对象肯定包括主线程+开启的子线程 threading.activeCount() # 返回正在运行的线程数,即 len(threading.enumerate() )
函数的方式创建:
import os from threading import Thread, get_ident def task(i): print(f"进程{os.getpid()},创建线程:{get_ident()},执行任务:{i}") if __name__ == '__main__': start = time.time() print(f"当前进程为:{os.getpid()},父进程为:{os.getppid()}") t=Thread(target=task, args=(j,)) t.start() t.join()
面向对象的方式创建:
import os from threading import Thread, get_ident class MyThread(Thread): def __init__(self, i): super(MyThread, self).__init__() self.i = i def run(self) -> None: self.task() def task(self): print(f"进程{os.getpid()},创建线程:{get_ident()},执行任务:{self.i}") if __name__ == '__main__': t = MyThread("test") t.start() t.join()
示例:线程间数据共享,可修改全局变量,不加锁也不安全
import os from threading import Thread, get_ident total = 100 def task(): global total total -= 1 print(f"进程{os.getpid()},创建线程:{get_ident()},执行任务完后total为:{total}") if __name__ == '__main__': jobs = [] for i in range(100): t = Thread(target=task) t.start() jobs.append(t) for j in jobs: j.join() print(f"执行完所有任务后total为:{total}") ----> 0
守护线程
特点:随主线程运行结束而结束,不同于守护进程,随主进程代码结束而结束
# 守护线程等待主线程结束而结束 # 主线程结束,必须等待所有非守护线程结束 # 主线程结束了,进程就结束了,即进程必须保证所有非守护线程结束才行 import os import time from threading import Thread, get_ident def daemons(): """守护线程的任务应该配置为不重要的,因为其可能没有执行,服务就完毕了""" print(f"进程{os.getpid()},创建守护线程:{get_ident()}") time.sleep(5) print("守护结束") def task(): time.sleep(2) print(f"进程{os.getpid()},创建子线程:{get_ident()}") if __name__ == '__main__': t_d = Thread(target=daemons) t_d.setDaemon(True) t_d.start() t = Thread(target=task) t.start() # t.join() print(f"主进程{os.getpid()},执行完毕") """ 进程13500,创建守护线程:15928 主进程13500,执行完毕 进程13500,创建子线程:3208 子线程结束了,主线程就结束,不去看守护线程 """
全局解释器锁GIL:
同一个进程里的每一个线程同一时间只能有一个线程访问cpu,锁的是线程,而不是具体的内存,但并不妨碍多个进程同时访问数据,这将导致访问同一数据不安全现象
队列和栈
from queue import Queue # 队列:先进先出 from queue import LifoQueue # 栈:先进后出
定时器
from threading import Timer def task(): print("执行任务") # 创建线程后,多久执行 t = Timer(1, task).start()
2.3、协程
定义:用户态的轻量级线程,由用户创造与控制协程间的切换
特点:
依赖线程
可以无限执行,充分利用cpu,尽量规避了IO,其遇到io就自动切换到另一个线程
开销比线程还小基于yield或yield from创建
def t1(): yield 1 # yield from 后必须是生成器,python3.3才开始使用 yield from t2() yield 2 def t2(): yield 3 yield 4 ret = t1() for i in ret: print(i)
基于gevent创建
pip install gevent import gevent def task(*args, **kwargs): print(f"协程执行任务:{args}|{kwargs}") return kwargs.get("order") g1 = gevent.spawn(task, 1, 2, 3, order=123456) g2 = gevent.spawn(task, 4, order=88888) # g1.join() # g2.join() gevent.joinall([g1, g2]) # 获取任务执行的结果 print(g1.value)
猴子补丁 解决非gevent识别的io阻塞,放在顶行
from gevent import monkey monkey.patch_all()
进程与线程的区别
线程依赖于进程,进程负责获取资源,线程负责执行任务
每个进程中至少有一个线程
多进程之间内存相互隔离,多线程之间共享同一进程数据
进程不能自由通信,需借助管道,队列等工具,线程可以自由通信
多进程开销大,多线程开销小线程与协程的区别 1.线程的切换由操作系统控制调度,协程由用户【开发者】自行控制调度
2.线程开销比协程大,协程通过gevent可实现切换无开销
3.线程依赖于进程,协程依赖于单线程
4.线程协程都无法利用多核