在之前通过使用多个DownloadThread线程进行下载(I/O)及使用一个ConvertThread线程进行转换(CPU),我们达到了多线程下载csv数据并转换为xml文件的目的。但现在有额外的要求:
实现一个打包线程TarThread,将转换出的xml文件压缩打包。例如转换线程每生产出5个xml文件,就通知打包线程将它们打包成一个xxx.tgz文件,并删除xml文件。打包完成后,打包线程反过来通知转换进程,转换进程继续转换。
解决方案:
线程间的事件通知,可以使用标准库中 Threading.Event 类:
等待事件,一端调用 wait() 方法;
通知事件,一端调用 set() 方法。
对于 threading.Event 类:class?threading.Event
实现事件对象的类。事件对象管理一个内部标志,调用 set() 方法可将其设置为true。调用 clear() 方法可将其设置为false。调用 wait() 方法将进入阻塞直到标志为true。这个标志初始时为false。
threading.Event 类有以下方法:
is_set()
当且仅当内部标志为True时返回True。
set()
将内部标志设置为True。所有正在等待这个事件的线程将被唤醒。当标志为True时,调用wait()方法的线程不会被被阻塞。
clear()
将内部标志设置为Talse。之后调用wait()方法的线程将会被阻塞,直到调用set()方法将内部标志再次设置为true。
wait(timeout=None)
阻塞线程直到内部变量为True。如果调用时内部标志为True,将立即返回。否则将阻塞线程,直到调用set()方法将标志设置为True或者发生可选的超时。 当提供了timeout参数且不是None时,它应该是一个浮点数,代表操作的超时时间,以秒为单位(可以为小数)。 当内部标志在调用wait进入阻塞后被设置为True,或者调用wait时已经被设置为True时,方法返回True。也就是说,除非设定了超时且发生了超时的情况下将会返回False,其他情况该方法都将返回True。方案示例:
import?requestsimport?base64import?csvimport?timeimport?osimport?tarfilefrom?io?import?StringIOfrom?xml.etree.ElementTree?import?ElementTree,?Element,?SubElementfrom?threading?import?Thread,Eventfrom?queue?import?Queue USERNAME?=?b'7f304a2df40829cd4f1b17d10cda0304'PASSWORD?=?b'aff978c42479491f9541ace709081b99'class?DownloadThread(Thread): ????def?__init__(self,?page_number,?queue): ????????super().__init__() ????????self.page_number?=?page_number ????????self.queue?=?queue???? ????def?run(self): ????????#?IO ????????csv_file?=?None ????????while?not?csv_file: ????????????csv_file?=?self.download_csv(self.page_number) ????????self.queue.put((self.page_number,?csv_file))????????????????#存数据到队列中 ????def?download_csv(self,?page_number): ????????print('download?csv?data?[page=%s]'?%?page_number) ????????url?=?"https://api.intrinio测试数据/price.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s"?%?page_number ????????auth?=?b'Basic'?+?base64.b64encode(b'%s:%s'?%?(USERNAME,?PASSWORD)) ????????headers?=?{'Authorization'?:?auth} ????????response?=?requests.get(url,?headers=headers) ????????if?response.ok: ????????????return?StringIO(response.text)class?ConvertThread(Thread): ????def?__init__(self,?queue,?c_event,?t_event): ????????super().__init__() ????????self.queue?=?queue ????????self.c_event?=?c_event ????????self.t_event?=?t_event????def?run(self): ????????count?=?0 ????????while?True: ????????????page_number,?csv_file?=?self.queue.get()????????????????#从队列中取出数据 ????????????if?page_number?==?-1: ????????????????self.c_event.set() ????????????????self.c_event.wait()?????????????#当最后不足5个时也打包 ????????????????break ????????????count?+=?1 ????????????self.csv_to_xml(csv_file,?'data%s.xml'?%?page_number) ????????????if?count?==?5: ????????????????#?通知转换完成 ????????????????self.c_event.set() ????????????????#等待打包完成 ????????????????self.c_event.wait() ????????????????self.c_event.clear() ????????????????count?=?0 ????def?csv_to_xml(self,?csv_file,?xml_path): ????????print('Convert?csv?data?to?%s'?%?xml_path) ????????reader?=?csv.reader(csv_file) ????????headers?=?next(reader) ????????root?=?Element('Data') ????????root.text?=?'\n\t' ????????root.tail?=?'\n' ????????for?row?in?reader: ????????????book?=?SubElement(root,?'Row') ????????????book.text?=?'\n\t\t' ????????????book.tail?=?'\n\t' ????????????for?tag,?text?in?zip(headers,?row): ????????????????e?=?SubElement(book,?tag) ????????????????e.text?=?text ????????????????e.tail?=?'\n\t\t' ????????????e.tail?=?'\n\t' ????????book.tail?=?'\n' ????????ElementTree(root).write(xml_path,?encoding='utf8')class?TarThread(Thread): ????def?__init__(self,?c_event,?t_event): ????????super().__init__(daemon=True)???????????????#守护线程 ????????self.count?=?0 ????????self.c_event?=?c_event ????????self.t_event?=?t_event????def?run(self): ????????while?True: ????????????#?等待转换完成 ????????????self.c_event.wait() ????????????self.c_event.clear() ????????????#?打包 ????????????self.tar_xml() ????????????#?通知打包完成 ????????????self.c_event.set() ????def?tar_xml(self): ????????self.count?+=?1 ????????tfname?=?'data%s.tgz'?%?self.count????????print('tar?%s...'?%?tfname) ????????tf?=?tarfile.open(tfname,?'w:gz')???????????????#tar压缩 ????????for?fname?in?os.listdir('.'): ????????????if?fname.endswith('.xml'): ????????????????tf.add(fname) ????????????????os.remove(fname) ????????tf.close() ????????if?not?tf.members: ????????????os.remove(tfname)if?__name__?==?'__main__': ????queue?=?Queue() ????c_event?=?Event() ????t_event?=?Event() ????t0?=?time.time() ????thread_list?=?[] ????for?i?in?range(1,?11): ????????t?=?DownloadThread(i,?queue) ????????t.start()???????????????#启动下载线程 ????????thread_list.append(t) ????convert_thread?=?ConvertThread(queue,?c_event,?t_event) ????convert_thread.start()??????????????#启动转换线程 ????tar_thread?=?TarThread(c_event,?t_event) ????tar_thread.start()??????????????#启动打包线程 ????for?t?in?thread_list: ????????t.join()????????????????#阻塞线程,主线程等待所有子线程结束 ????#?通知Convert线程退出 ????queue.put((-1,?None))???????????????#将page_number置为-1 ????#?等待转换线程结束 ????convert_thread.join() ????print(time.time()?-?t0) ????print('main?thread?end.')
在线程间进行事件通知,目的就是线程间同步。
声明:本文来自网络,不代表【好得很程序员自学网】立场,转载请注明出处:http://haodehen.cn/did126746