笔者在工作中经常接到这样的需求:利用现有的 redis 快照生成一个新的 redis 实例,并提供新实例的域名解析;由于生成 redis 和域名解析记录都是耗时操作;因此,顺理成章的把它构造成一个简单的生产者 - 消费者模型:
生产者利用 redis 快照生成新的实例 将新实例 id 传入队列消费者接受队列消息中 id 并更新 dns 解析到新的实例上
代码如下:
# 省略具体的业务逻辑代码,主要实现生产者-消费者的逻辑
import json
import threading
import time
import queue
class Producer(threading.Thread):
def __init__(self, source_clusterid_queue, target_clusterid_queue):
threading.Thread.__init__(self)
self.source_clusterid_queue = source_clusterid_queue
self.target_clusterid_queue = target_clusterid_queue
def _restore_redis(self,clusterid):
time.sleep(10)
print(f"{clusterid} has restored")
def run(self):
source_clusterid = self.source_clusterid_queue.get()
self._restore_redis(source_clusterid)
self.target_clusterid_queue.put(source_clusterid)
class Consumer(threading.Thread):
def __init__(self, target_clusterid_queue):
threading.Thread.__init__(self)
self.clusterid_queue = target_clusterid_queue
def _dns_update(self,cluster_id):
time.sleep(2)
print(f"{cluster_id} has been consumed")
def run(self):
while True:
ClusterId = self.clusterid_queue.get()
self._dns_update(ClusterId)
if __name__=='__main__':
source_clusterid_queue = queue.Queue()
target_clusterid_queue = queue.Queue()
clusterid_list = ['a','b']
restore_th_list=[]
#启动生产者和消费者线程
for _ in clusterid_list:
dns_update_th = Consumer(target_clusterid_queue)
dns_update_th.start()
restore_th = Producer(source_clusterid_queue,target_clusterid_queue)
restore_th_list.append(restore_th)
restore_th.start()
for clusterid in clusterid_list:
source_clusterid_queue.put(clusterid)这里定义了两个队列,source_clusterid_queue接受外部元素,生产者获取该队列的中的元素,完成生产任务后,将clusterid 扔到 target_cluster_id_queue中,消费者从中获取元素进行消费
但是,运行该脚本你会发现,在消费完队列中的所有元素后,脚本会处于无限挂起的状态
原因在于消费者线程使用的 queue.get() 默认是一个阻塞方法,等到队列中的元素消费完毕,队列为空,消费者线程就会一直处于阻塞状态
阻塞改造
既然queue.get()会产生阻塞问题,那我们能否在消费完毕后,向队列中加入一个结束标记,消费者线程get()到该标记后主动退出? 于是有了如下改动:
# 仅显示改动部分
class Consumer(threading.Thread):
def __init__(self, target_clusterid_queue):
threading.Thread.__init__(self)
self.clusterid_queue = target_clusterid_queue
def _dns_update(self,cluster_id):
if cluster_id == '_finish':
pass
time.sleep(2)
print(f"{cluster_id} has consumed")
def run(self):
while True:
ClusterId = self.clusterid_queue.get()
self._dns_update(ClusterId)
if ClusterId == '_finish':
break
if __name__=='__main__':
source_clusterid_queue = queue.Queue()
target_clusterid_queue = queue.Queue()
clusterid_list = ['a','b']
restore_th_list=[]
#启动生产者和消费者线程
for _ in clusterid_list:
dns_update_th = Consumer(target_clusterid_queue)
dns_update_th.start()
restore_th = Producer(source_clusterid_queue,target_clusterid_queue)
restore_th_list.append(restore_th)
restore_th.start()
for clusterid in clusterid_list:
source_clusterid_queue.put(clusterid)
#向queue发送结束结束信号,解除消费者队列阻塞
for clusterid in clusterid_list:
target_clusterid_queue.put('_finish')如上所示,我们在主线程中主动向队列添加"_finish"标记;在消费者线程中判断元素值,如果等于"_finish"就跳出循环,如此,就能解决get()的阻塞问题
再次运行脚本,如你所愿,脚本确实不会挂起了,但是执行逻辑却与初衷背道而驰了,消费者线程提前消费了"_finish"标记,却没有消费真正的业务信息就草草结束了
join 和 task_done的引入
原因在于, 消费者消费队列元素需要一定的时间,但我们的主线程并没有等待消费者线程消费结束,就提前结束消费者线程了;解决措施显而易见,我们需要在执行逻辑中等待队列完成,才能发起结束标记;因此,引入 queue.join()和 queue.task_done() 就水到渠成了。
官方对于两者的定义: queue.join(): 阻塞当前队列,只有队列中的任务都完成后,才往下运行
queue.task_done(): 表明入队的任务已经完成,一般被消费者线程调用; 消费者线程完成任务后,向队列告知任务已经完成
再次做改动:
class Producer(threading.Thread):
def __init__(self, source_clusterid_queue, target_clusterid_queue):
threading.Thread.__init__(self)
self.source_clusterid_queue = source_clusterid_queue
self.target_clusterid_queue = target_clusterid_queue
def _restore_redis(self,clusterid):
time.sleep(10)
print(f"{clusterid} has restored")
def run(self):
source_clusterid = self.source_clusterid_queue.get()
self._restore_redis(source_clusterid)
self.target_clusterid_queue.put(source_clusterid)
# 告知队列任务已完成,解除阻塞
self.source_clusterid_queue.task_done()
class Consumer(threading.Thread):
def __init__(self, target_clusterid_queue):
threading.Thread.__init__(self)
self.clusterid_queue = target_clusterid_queue
def _dns_update(self,cluster_id):
if cluster_id == '_finish':
pass
time.sleep(2)
print(f"{cluster_id} has consumed")
def run(self):
while True:
ClusterId = self.clusterid_queue.get()
self._dns_update(ClusterId)
# 告知队列任务已完成,解除阻塞
self.clusterid_queue.task_done()
if ClusterId == '_finish':
break
if __name__=='__main__':
source_clusterid_queue = queue.Queue()
target_clusterid_queue = queue.Queue()
clusterid_list = ['a','b']
restore_th_list=[]
# 启动生产者和消费者线程
for _ in clusterid_list:
dns_update_th = Consumer(target_clusterid_queue)
dns_update_th.start()
restore_th = Producer(source_clusterid_queue,target_clusterid_queue)
restore_th_list.append(restore_th)
restore_th.start()
for clusterid in clusterid_list:
source_clusterid_queue.put(clusterid)
# 阻塞生产者队列,等待生产完成
source_clusterid_queue.join()
# 向queue发送结束结束信号,解除消费者队列阻塞
for clusterid in clusterid_list:
target_clusterid_queue.put('_finish')
#等待所有的消费完成
target_clusterid_queue.join()
print('All restore work done')如上所示,我们主线程中分别加入了两个队列的join()方法:
第一个join()方法的含义是等待生产线程生产完成,发送完所有的消息后,才开始向消费者发送结束消息标记;
第二个join()方法等待消费者线程结束后打印日志;
分别对应了消费者和生产者线程中的task_done()方法;在最后一个task_done()执行完后,join()就会解除阻塞,从而执行下面的逻辑,这样我们就完整的实现了消费者线程结束的功能
如图所示,消费者正确执行了所有的消费任务,才结束了脚本
查看更多关于queue.task_done()与 queue.join() 的相爱相杀的详细内容...