好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

40. 在线程间进行事件通知

在之前通过使用多个DownloadThread线程进行下载(I/O)及使用一个ConvertThread线程进行转换(CPU),我们达到了多线程下载csv数据并转换为xml文件的目的。但现在有额外的要求:

实现一个打包线程TarThread,将转换出的xml文件压缩打包。例如转换线程每生产出5个xml文件,就通知打包线程将它们打包成一个xxx.tgz文件,并删除xml文件。打包完成后,打包线程反过来通知转换进程,转换进程继续转换。

解决方案:

线程间的事件通知,可以使用标准库中 Threading.Event 类:

等待事件,一端调用 wait() 方法;

通知事件,一端调用 set() 方法。

对于 threading.Event 类:
class?threading.Event

实现事件对象的类。事件对象管理一个内部标志,调用 set() 方法可将其设置为true。调用 clear() 方法可将其设置为false。调用 wait() 方法将进入阻塞直到标志为true。这个标志初始时为false。

threading.Event 类有以下方法:

is_set()

当且仅当内部标志为True时返回True。

set()

将内部标志设置为True。所有正在等待这个事件的线程将被唤醒。当标志为True时,调用wait()方法的线程不会被被阻塞。

clear()

将内部标志设置为Talse。之后调用wait()方法的线程将会被阻塞,直到调用set()方法将内部标志再次设置为true。

wait(timeout=None)

阻塞线程直到内部变量为True。如果调用时内部标志为True,将立即返回。否则将阻塞线程,直到调用set()方法将标志设置为True或者发生可选的超时。

当提供了timeout参数且不是None时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。

当内部标志在调用wait进入阻塞后被设置为True,或者调用wait时已经被设置为True时,方法返回True。也就是说,除非设定了超时且发生了超时的情况下将会返回False,其他情况该方法都将返回True。

方案示例:
import?requestsimport?base64import?csvimport?timeimport?osimport?tarfilefrom?io?import?StringIOfrom?xml.etree.ElementTree?import?ElementTree,?Element,?SubElementfrom?threading?import?Thread,Eventfrom?queue?import?Queue

USERNAME?=?b'7f304a2df40829cd4f1b17d10cda0304'PASSWORD?=?b'aff978c42479491f9541ace709081b99'class?DownloadThread(Thread):
????def?__init__(self,?page_number,?queue):
????????super().__init__()
????????self.page_number?=?page_number
????????self.queue?=?queue????
????def?run(self):
????????#?IO
????????csv_file?=?None
????????while?not?csv_file:
????????????csv_file?=?self.download_csv(self.page_number)
????????self.queue.put((self.page_number,?csv_file))????????????????#存数据到队列中

????def?download_csv(self,?page_number):
????????print('download?csv?data?[page=%s]'?%?page_number)
????????url?=?"https://api.intrinio测试数据/price.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s"?%?page_number
????????auth?=?b'Basic'?+?base64.b64encode(b'%s:%s'?%?(USERNAME,?PASSWORD))
????????headers?=?{'Authorization'?:?auth}
????????response?=?requests.get(url,?headers=headers)

????????if?response.ok:
????????????return?StringIO(response.text)class?ConvertThread(Thread):
????def?__init__(self,?queue,?c_event,?t_event):
????????super().__init__()
????????self.queue?=?queue
????????self.c_event?=?c_event
????????self.t_event?=?t_event????def?run(self):
????????count?=?0
????????while?True:
????????????page_number,?csv_file?=?self.queue.get()????????????????#从队列中取出数据
????????????if?page_number?==?-1:
????????????????self.c_event.set()
????????????????self.c_event.wait()?????????????#当最后不足5个时也打包
????????????????break
????????????count?+=?1
????????????self.csv_to_xml(csv_file,?'data%s.xml'?%?page_number)

????????????if?count?==?5:
????????????????#?通知转换完成
????????????????self.c_event.set()

????????????????#等待打包完成
????????????????self.c_event.wait()
????????????????self.c_event.clear()
????????????????count?=?0

????def?csv_to_xml(self,?csv_file,?xml_path):
????????print('Convert?csv?data?to?%s'?%?xml_path)
????????reader?=?csv.reader(csv_file)
????????headers?=?next(reader)

????????root?=?Element('Data')
????????root.text?=?'\n\t'
????????root.tail?=?'\n'

????????for?row?in?reader:
????????????book?=?SubElement(root,?'Row')
????????????book.text?=?'\n\t\t'
????????????book.tail?=?'\n\t'

????????????for?tag,?text?in?zip(headers,?row):
????????????????e?=?SubElement(book,?tag)
????????????????e.text?=?text
????????????????e.tail?=?'\n\t\t'
????????????e.tail?=?'\n\t'
????????book.tail?=?'\n'

????????ElementTree(root).write(xml_path,?encoding='utf8')class?TarThread(Thread):
????def?__init__(self,?c_event,?t_event):
????????super().__init__(daemon=True)???????????????#守护线程
????????self.count?=?0
????????self.c_event?=?c_event
????????self.t_event?=?t_event????def?run(self):
????????while?True:
????????????#?等待转换完成
????????????self.c_event.wait()
????????????self.c_event.clear()

????????????#?打包
????????????self.tar_xml()

????????????#?通知打包完成
????????????self.c_event.set()

????def?tar_xml(self):
????????self.count?+=?1
????????tfname?=?'data%s.tgz'?%?self.count????????print('tar?%s...'?%?tfname)
????????tf?=?tarfile.open(tfname,?'w:gz')???????????????#tar压缩
????????for?fname?in?os.listdir('.'):
????????????if?fname.endswith('.xml'):
????????????????tf.add(fname)
????????????????os.remove(fname)
????????tf.close()

????????if?not?tf.members:
????????????os.remove(tfname)if?__name__?==?'__main__':
????queue?=?Queue()
????c_event?=?Event()
????t_event?=?Event()
????t0?=?time.time()
????thread_list?=?[]
????for?i?in?range(1,?11):
????????t?=?DownloadThread(i,?queue)
????????t.start()???????????????#启动下载线程
????????thread_list.append(t)

????convert_thread?=?ConvertThread(queue,?c_event,?t_event)
????convert_thread.start()??????????????#启动转换线程

????tar_thread?=?TarThread(c_event,?t_event)
????tar_thread.start()??????????????#启动打包线程

????for?t?in?thread_list:
????????t.join()????????????????#阻塞线程,主线程等待所有子线程结束

????#?通知Convert线程退出
????queue.put((-1,?None))???????????????#将page_number置为-1

????#?等待转换线程结束
????convert_thread.join()

????print(time.time()?-?t0)
????print('main?thread?end.')

在线程间进行事件通知,目的就是线程间同步。

查看更多关于40. 在线程间进行事件通知的详细内容...

  阅读:51次