前面我们已经通过多个线程下载csv数据并转换为xml文件。
在Python中由于全局解释器锁(GIL)的存在,多线程进行CPU密集型操作并不能提高执行效率,我们修改程序框架:
使用多个DownloadThread线程进行下载(I/O);
使用一个ConvertThread线程进行转换(CPU);
下载线程把下载数据安全地传递给转换线程。
要求:实现上面的程序框架。
解决方案:使用标准库中的 queue.Queue 类,它是一个线程安全的队列。Download线程把下载数据放入队列,Convert线程从队列里提取数据。
对于 queue.Queue 类:class?queue.Queue(maxsize=0)
FIFO队列类。maxsize是一个整数,它设置可以放置在队列中的项数的上限。一旦达到此大小,插入将阻塞,直到使用队列中的项。如果maxsize小于或等于零,则队列大小为无穷大。
queue.Queue 类有下面方法:
Queue.qsize()
返回队列的大致大小。注意,qsize()?>?0?不保证后续的get()不被阻塞,qsize()?<?maxsize?也不保证put()不被阻塞。
Queue.empty()
如果队列为空,返回True,否则返回False。如果empty()返回True,不保证后续调用的put()不被阻塞。类似的,如果empty()返回False?,也不保证后续调用的get()不被阻塞。
Queue.full()
如果队列是满的返回True,否则返回False。如果full()返回True不保证后续调用的get()不被阻塞。类似的,如果full()返回False也不保证后续调用的put()不被阻塞。
Queue.put(item, block=True, timeout=None)
将?item?放入队列。如果可选参数block是True并且timeout是None(默认),则在必要时阻塞至有空闲插槽可用。如果timeout是个正数,将最多阻塞timeout秒,如果在这段时间没有可用的空闲插槽,将引发Full异常。反之(block是false),如果空闲插槽立即可用,则把item放入队列,否则引发Full异常(在这种情况下,timeout将被忽略)。
Queue.put_nowait(item)
相当于put(item,?False)。
Queue.get(block=True, timeout=None)
从队列中移除并返回一个项目。如果可选参数block是True并且timeout是None(默认值),则在必要时阻塞至项目可得到。如果timeout是个正数,将最多阻塞timeout秒,如果在这段时间内项目不能得到,将引发Empty异常。反之(block是false),如果一个项目立即可得到,则返回一个项目,否则引发Empty异常(这种情况下,timeout将被忽略)。 POSIX系统3.0之前,以及所有版本的Windows系统中,如果block是True并且timeout是None,这个操作将进入基础锁的不间断等待。这意味着,没有异常能发生,尤其是SIGINT将不会触发KeyboardInterrupt异常。
Queue.get_nowait()
相当于get(False)。
Queue.task_done()
表示前面排队的任务已经被完成。被队列的消费者线程使用。每个get()被用于获取一个任务,后续调用task_done()告诉队列,该任务的处理已经完成。 如果join()当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个put()进队列的条目的task_done()都被收到)。 如果被调用的次数多于放入队列中的项目数量,将引发ValueError异常?。
Queue.join()
阻塞至队列中所有的元素都被接收和处理完毕。 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者线程调用task_done()表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候,join()阻塞被解除。方案示例:
import?requestsimport?base64import?csvimport?timefrom?io?import?StringIOfrom?xml.etree.ElementTree?import?ElementTree,?Element,?SubElementfrom?threading?import?Threadfrom?queue?import?Queue USERNAME?=?b'7f304a2df40829cd4f1b17d10cda0304'PASSWORD?=?b'aff978c42479491f9541ace709081b99'class?DownloadThread(Thread): ????def?__init__(self,?page_number,?queue): ????????super().__init__() ????????self.page_number?=?page_number ????????self.queue?=?queue???? ????def?run(self): ????????#?IO ????????csv_file?=?None ????????while?not?csv_file: ????????????csv_file?=?self.download_csv(self.page_number) ????????self.queue.put((self.page_number,?csv_file))????????????????#存数据到队列中 ????def?download_csv(self,?page_number): ????????print('download?csv?data?[page=%s]'?%?page_number) ????????url?=?"https://api.intrinio测试数据/price.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s"?%?page_number ????????auth?=?b'Basic'?+?base64.b64encode(b'%s:%s'?%?(USERNAME,?PASSWORD)) ????????headers?=?{'Authorization'?:?auth} ????????response?=?requests.get(url,?headers=headers) ????????if?response.ok: ????????????return?StringIO(response.text)class?ConvertThread(Thread): ????def?__init__(self,?queue): ????????super().__init__() ????????self.queue?=?queue????def?run(self): ????????#?CPU ????????while?True: ????????????page_number,?csv_file?=?self.queue.get()????????????????#从队列中取出数据 ????????????self.csv_to_xml(csv_file,?'data%s.xml'?%?page_number) ????def?csv_to_xml(self,?csv_file,?xml_path): ????????print('Convert?csv?data?to?%s'?%?xml_path) ????????reader?=?csv.reader(csv_file) ????????headers?=?next(reader) ????????root?=?Element('Data') ????????root.text?=?'\n\t' ????????root.tail?=?'\n' ????????for?row?in?reader: ????????????book?=?SubElement(root,?'Row') ????????????book.text?=?'\n\t\t' ????????????book.tail?=?'\n\t' ????????????for?tag,?text?in?zip(headers,?row): ????????????????e?=?SubElement(book,?tag) ????????????????e.text?=?text ????????????????e.tail?=?'\n\t\t' ????????????e.tail?=?'\n\t' ????????book.tail?=?'\n' ????????ElementTree(root).write(xml_path,?encoding='utf8')if?__name__?==?'__main__': ????queue?=?Queue() ????t0?=?time.time() ????thread_list?=?[] ????for?i?in?range(1,?6): ????????t?=?DownloadThread(i,?queue) ????????t.start()???????????????#启动下载线程 ????????thread_list.append(t) ????convert_thread?=?ConvertThread(queue) ????convert_thread.start()??????????????#启动转换线程 ????for?t?in?thread_list: ????????t.join()????????????????#阻塞线程,主线程等待所有子线程结束 ????print(time.time()?-?t0) ????print('main?thread?end.')
上面url已失效,无法看到实际耗时效果。线程间的通信可以通过 queue.Queue 类创建线程安全的队列来完成。
声明:本文来自网络,不代表【好得很程序员自学网】立场,转载请注明出处:http://haodehen.cn/did126747