1、 概念普及 2、进程&线程&协程 2.1、进程 函数的方式创建进程 面向对象的方式创建进程 守护进程 进程间通信 锁 队列 管道【很少使用】 2.2、线程 函数的方式创建: 面向对象的方式创建: 守护线程 全局解释器锁GIL: 2.3、协程 创建 猴子补丁 进程与线程的区别 线程与协程的区别
1、 概念普及 并发编程的应用:
网络应用:爬虫(直接应用并发编程)
网络架构django、flask、tornado 源码-并发编程
socket server 源码-并发编程
计算机操作系统发展史
手工操作-读穿孔的纸带、用户独占全机、cpu等待手工操作、cpu利用不充分
批处理-磁带存储(联机批处理{一边写入磁带,一边从磁带读出计算}、脱机批处理、只能运行一个程序,遇到IO就等待闲置
多道程序系统
同时执行多个任务,遇到io就切换,
即空间隔离(多个程序运行),时空复用(IO切换)的特点
分时系统
同时执行多个任务,没有遇到IO也切换,固定时间片到了就切
时间片轮转
多路性、交互性、独立性、及时性
切会浪费cpu时间,降低了cpu效率,但提高了用户体验。
实时系统
及时响应
高可靠性
时刻等待响应,即时处理
通用操作系统:
多道批处理
分时
实时
操作系统分类:
个人计算机操作系统
网络操作系统
分布式操作系统
操作系统的作用:
是一个协调、管理、和控制计算机硬件资源和软件资源的控制程序。【调度硬件资源、调用管理软件】
隐藏了丑陋的硬件调用接口,提供良好的抽象接口
管理、调度进程,并将多个进程对硬件的竞争变得有序
I/O操作:(相对内存来说的)
输入【读到内存中】:input、f.read、accept、recv、connect
输出【从内存读出】:print、f.write、send、connect
文件操作/网络操作都是IO操作
异步:两个任务同时运行(并行)
同步:多个任务 串行 【按顺序执行】
阻塞:等待 input、accept、recv
非阻塞:不等待,直接执行
并行:多个任务同时执行【多个CPU在同时执行任务】
并发:只有一个CPU,交替执行多个任务【宏观上同时执行,实际是轮转】,两则区别可参见:【转】编程概念篇-并发与并行
同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行
异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率
程序:没有运行
进程:运行中的程序、计算机中最小的资源分配单元
进程调度:
多个进程(运行中的程序)在操作系统的控制下被CPU执行,去享用计算机资源
先来先服务调度算法
短作业优先
时间片轮转
多级反馈队列
进程调度的过程是不能随意被程序影响得
进程三大状态
就绪 程序开始运行,进入就绪队列,等待cpu分配时间
运行 执行进程,单个时间片内运行完成,就释放资源,没有作业完,就又自动切换到就绪队列等待下次的调度
阻塞 执行中的进程遇到事件入IO等导致无法执行,进入阻塞状态,解除后进入就绪队列等待进程调度
进程与父进程
进程 PID 通过os.getpid() 可以得到
父进程 PPID 负责回收一些子进程的资源后才关闭 通过os.getppid() 2、进程&线程&协程 2.1、进程 定义:计算机最小的资源分配单元【一个软件正在进行的过程,ps -ef】特点:
进程与进程之间内存中的数据是隔离的
进程与进程之间不能自由的交换内存数据【内存空间是不能共享的】
全局变量在子进程中修改,其他进程是感知不到的【子进程的执行结果父进程是获取不到的】
进程间通信需要通过队列【管道+锁】或管道创建进程:
系统初始化
一个进程在运行过程中开启了子进程
用户的交互式请求,而创建一个新进程
一个批处理作业的初始化函数的方式创建进程
import os
from multiprocessing import Process
def task(num):
print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}")
if __name__ == '__main__':
print("当前环境进程:", os.getpid())
p = Process(group=None, target=task, args=("吃饭",), kwargs={})
p.start()
p.join()
"""
执行结果为:
当前环境进程:14596
父进程:14596-创建子进程14676,执行任务:吃饭
"""
"""
进程创建语法参数解释:
group 值默认始终为None
target 表示调用对象
args 调用对象的位置参数 类型必须是元组,必须是元组,必须是元组!
kwargs 调用对象得字典
name为子进程的名称
进程常见方法解释:
p.start() : 启动进程,调用操作系统的命令,传达要创建进程的申请,并调用子进程中的p.run()方法,非阻塞
p.run(): 进程启动时运行的方法,其实真正去调用target指定的函数,自定义类,就需要自己定义run方法
p.terminate(): 强制终止进程p
p.is_alive(): 查看进程状态值,True表示运行中
p.join(): 主进程等待子进程p运行结束后才运行 阻塞
"""
面向对象的方式创建进程
import os
from multiprocessing import Process
class MyProcess(Process):
"""重点是继承Process类,及重写run方法"""
def __init__(self, argument):
super(MyProcess, self).__init__()
self.name = argument
def run(self) -> None:
print(f"run:父进程:{os.getppid()}-创建子进程{os.getpid()}:任务名:{self.name}")
self.task()
def task(self):
print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{self.name}")
if __name__ == '__main__':
p = MyProcess("人生")
p.start()
p.join()
"""
执行结果为:
run:父进程:12864-创建子进程6352:任务名:人生
父进程:12864-创建子进程6352,执行任务:人生
"""
守护进程 特点:守护进程的生命周期只和主进程的代码有关系,和其他子进程没有关系,会随着主进程代码执行结束而结束
作用:监控主进程生命状态
注意点:
主进程创建的守护进程
守护进程内无法创建子进程
守护进程属性,默认False
配置守护进程应该在主进程开子进程之前即p.start()之前
# 主进程代码执行完了,守护进程就不会再执行了,等其他的子进程结束了,主进程就结束了
import os
import time
from multiprocessing import Process
def check_is_live():
while True:
print(f"父进程:{os.getppid()}-创建守护子进程{os.getpid()},父进程还没有挂")
time.sleep(0.5)
print(f"父进程如果挂了,你是看不到我的")
def task(num):
print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},执行任务:{num}")
time.sleep(2)
print(f"任务{num}结束")
if __name__ == '__main__':
print(f"当前进程为:{os.getppid()}")
p = Process(target=check_is_live)
p.daemon = True
p.start()
p_job = Process(target=task, args=("任务007",))
p_job.start()
time.sleep(1)
print(f"主进程{os.getppid()},执行结束")
"""
执行结果:
当前进程为:11080
父进程:9720-创建守护子进程1916,父进程还没有挂
父进程:9720-创建子进程15356,执行任务:任务007
父进程如果挂了,你是看不到我的
父进程:9720-创建守护子进程1916,父进程还没有挂
主进程11080,执行结束
任务任务007结束
"""
进程间通信
进程间数据交互,本质是socket通信,不过都是本地的,基于文件。
锁 作用:保证多个进程修改同一数据源时,同一时间只能有一个任务可以进行操作,保证安全性,损失效率
语法:
from multiprocessing import Lock lock=Lock() # 互斥锁 lock.acquire() # 获取锁,没归还前一直阻塞 lock.release() # 归还锁,其他进程可以继续作业
示例:
import json
import time
import random
from multiprocessing import Process, Lock
def query(user):
with open('sku.txt', "r", encoding='utf-8') as f:
sku_info_dict = json.load(f)
time.sleep(random.random())
surplus = sku_info_dict.get('count')
print(f"用户:{user}查看库存,还剩:{surplus}")
return surplus, sku_info_dict
def change(user):
count, sku_info_dict = query(user)
if int(count) < 1:
print("库存已不足,请通知管理员补货")
else:
with open('sku.txt', 'w', encoding='utf-8') as f:
sku_info_dict["count"] -= 1
json.dump(sku_info_dict, f)
time.sleep(random.random())
print(f"{user}购买成功")
def task(u, l):
l.acquire()
change(u)
l.release()
if __name__ == '__main__':
sku_ = {"count": 3}
f = open("sku.txt", "w")
json.dump(sku_, f)
f.flush()
f.close()
lock = Lock()
jobs = []
for i in range(5):
p = Process(target=task, args=(i, lock))
p.start()
jobs.append(p)
for j in jobs:
j.join()
"""
执行结果为: 用户:1查看库存,还剩:3 1购买成功 用户:0查看库存,还剩:2 0购买成功 用户:2查看库存,还剩:1 2购买成功 用户:3查看库存,还剩:0 库存已不足,请通知管理员补货 用户:4查看库存,还剩:0 库存已不足,请通知管理员补货 """ # 如果不加锁,就很可能所有人都得到通知买到了,但实际只有3张票,而5个人买,实际有2个人应该买不到的
队列
特点:先进先出
本质:管道+锁
语法:
from multiprocessing import Queue q = Queue() q.get() # 从前到后获取队列中的数据,没有数据则一直阻塞等待数据的放入 q.put() # 向队列中放入数据
示例:
from multiprocessing import Queue, Process
def get(q):
while True:
ret = q.get()
print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},获取的数据为:{ret}")
def put(q, num):
print(f"父进程:{os.getppid()}-创建子进程{os.getpid()},向队列中投入:{num}")
q.put(num)
if __name__ == '__main__':
queue = Queue()
x = 10000
queue.put(x)
print(f"主进程{os.getppid()}向队列中投入:{x}")
Process(target=get, args=(queue,), name="专门获取的任务").start()
for i in range(3):
# 一直放
Process(target=put, args=(queue, random.randint(1, 10)), name="专门放数据的任务").start()
"""
执行结果为:
主进程11080向队列中投入:10000
父进程:16356-创建子进程16080,获取的数据为:10000
父进程:16356-创建子进程9496,向队列中投入:2
父进程:16356-创建子进程16100,向队列中投入:4
父进程:16356-创建子进程8724,向队列中投入:3
父进程:16356-创建子进程16080,获取的数据为:2
父进程:16356-创建子进程16080,获取的数据为:4
父进程:16356-创建子进程16080,获取的数据为:3
一直等待中……
"""
管道【很少使用】
特点:左放右收或左收右放,全双攻模式
注意点:
进程间通信,数据不安全,队列安全
需要在创建子进程前创建
from multiprocessing import Process, Pipe
def consumer(left, right):
# 如果消费者右边接数据,那么把左边关闭
left.close()
while True:
try:
print(f"接收到数据{right.recv()}")
except EOFError:
break
if __name__ == '__main__':
left, right = Pipe()
Process(target=consumer, args=(left, right)).start()
right.close() # 主进程准备通过左边发数据,那么右边就必须关闭
left.send(6666)
left.close() # 不用就关闭
2.2、线程
定义:CPU调度的最小单位
特点:
依赖于进程
独立调度和分派的基本单位(线程间切换快,开销小)
共享进程资源,同一进程内线程间可通信
并发执行常用方法:
t.is_alive()或者t.isAlive() # 查看主线程是否存活 t.getName() # 放回线程名如: Thread-1 t.setName() # 设置线程名 t.start() # 创建线程,调用run方法,运行 t.join() # 主线程等待t线程执行结束后才继续,阻塞 threading.currentTread() # 返回当前的线程变量,即对象:<_MainThread(MainThread, started 4644)> threading.enumerate() # 返回正在运行的线程对象列表,列表中线程对象肯定包括主线程+开启的子线程 threading.activeCount() # 返回正在运行的线程数,即 len(threading.enumerate() )
函数的方式创建:
import os
from threading import Thread, get_ident
def task(i):
print(f"进程{os.getpid()},创建线程:{get_ident()},执行任务:{i}")
if __name__ == '__main__':
start = time.time()
print(f"当前进程为:{os.getpid()},父进程为:{os.getppid()}")
t=Thread(target=task, args=(j,))
t.start()
t.join()
面向对象的方式创建:
import os
from threading import Thread, get_ident
class MyThread(Thread):
def __init__(self, i):
super(MyThread, self).__init__()
self.i = i
def run(self) -> None:
self.task()
def task(self):
print(f"进程{os.getpid()},创建线程:{get_ident()},执行任务:{self.i}")
if __name__ == '__main__':
t = MyThread("test")
t.start()
t.join()
示例:线程间数据共享,可修改全局变量,不加锁也不安全
import os
from threading import Thread, get_ident
total = 100
def task():
global total
total -= 1
print(f"进程{os.getpid()},创建线程:{get_ident()},执行任务完后total为:{total}")
if __name__ == '__main__':
jobs = []
for i in range(100):
t = Thread(target=task)
t.start()
jobs.append(t)
for j in jobs:
j.join()
print(f"执行完所有任务后total为:{total}") ----> 0
守护线程
特点:随主线程运行结束而结束,不同于守护进程,随主进程代码结束而结束
# 守护线程等待主线程结束而结束
# 主线程结束,必须等待所有非守护线程结束
# 主线程结束了,进程就结束了,即进程必须保证所有非守护线程结束才行
import os
import time
from threading import Thread, get_ident
def daemons():
"""守护线程的任务应该配置为不重要的,因为其可能没有执行,服务就完毕了"""
print(f"进程{os.getpid()},创建守护线程:{get_ident()}")
time.sleep(5)
print("守护结束")
def task():
time.sleep(2)
print(f"进程{os.getpid()},创建子线程:{get_ident()}")
if __name__ == '__main__':
t_d = Thread(target=daemons)
t_d.setDaemon(True)
t_d.start()
t = Thread(target=task)
t.start()
# t.join()
print(f"主进程{os.getpid()},执行完毕")
"""
进程13500,创建守护线程:15928
主进程13500,执行完毕
进程13500,创建子线程:3208 子线程结束了,主线程就结束,不去看守护线程
"""
全局解释器锁GIL:
同一个进程里的每一个线程同一时间只能有一个线程访问cpu,锁的是线程,而不是具体的内存,但并不妨碍多个进程同时访问数据,这将导致访问同一数据不安全现象
队列和栈
from queue import Queue # 队列:先进先出 from queue import LifoQueue # 栈:先进后出
定时器
from threading import Timer
def task():
print("执行任务")
# 创建线程后,多久执行
t = Timer(1, task).start()
2.3、协程
定义:用户态的轻量级线程,由用户创造与控制协程间的切换
特点:
依赖线程
可以无限执行,充分利用cpu,尽量规避了IO,其遇到io就自动切换到另一个线程
开销比线程还小基于yield或yield from创建
def t1():
yield 1
# yield from 后必须是生成器,python3.3才开始使用
yield from t2()
yield 2
def t2():
yield 3
yield 4
ret = t1()
for i in ret:
print(i)
基于gevent创建
pip install gevent
import gevent
def task(*args, **kwargs):
print(f"协程执行任务:{args}|{kwargs}")
return kwargs.get("order")
g1 = gevent.spawn(task, 1, 2, 3, order=123456)
g2 = gevent.spawn(task, 4, order=88888)
# g1.join()
# g2.join()
gevent.joinall([g1, g2])
# 获取任务执行的结果
print(g1.value)
猴子补丁 解决非gevent识别的io阻塞,放在顶行
from gevent import monkey monkey.patch_all()
进程与线程的区别
线程依赖于进程,进程负责获取资源,线程负责执行任务
每个进程中至少有一个线程
多进程之间内存相互隔离,多线程之间共享同一进程数据
进程不能自由通信,需借助管道,队列等工具,线程可以自由通信
多进程开销大,多线程开销小线程与协程的区别 1.线程的切换由操作系统控制调度,协程由用户【开发者】自行控制调度
2.线程开销比协程大,协程通过gevent可实现切换无开销
3.线程依赖于进程,协程依赖于单线程
4.线程协程都无法利用多核