好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

38. 使用多线程

例如,我们通过 https://intrinio测试数据/tutorial/web_api 这个网址提供的api获取股市信息的csv数据,现在要下载大量csv数据文件,并将其转换为xml文件。

要求:使用多线程来提高下载并处理的效率。

解决方案:

使用标准库 threading.Thread 类创建多个线程,在每个线程中下载并转换一只csv数据。

对于 threading.Thread 类:
class?threading.Thread(group=None,?target=None,?name=None,?args=(),?kwargs={},?*,?daemon=None)

调用该类时,必需带有关键字参数。参数如下:

group应该为None;为了日后扩展ThreadGroup类实现而保留。

target是用于run()方法调用的可调用对象。默认是None,表示不需要调用任何方法。

name是线程名称。默认情况下,由"Thread-N"?格式构成一个唯一的名称,其中N是小的十进制数。

args是用于调用目标函数的参数元组。默认是()。

kwargs是用于调用目标函数的关键字参数字典。默认是{}。

如果daemon不是None,线程将被显式的设置为守护模式,不管该线程是否是守护模式。如果是None(默认值),线程将继承当前线程的守护模式属性。

threading.Thread 类有以下方法:

start()

开始线程活动。

它在一个线程里最多只能被调用一次。它安排对象的run()方法在一个独立的控制进程中调用。如果同一个线程对象中调用这个方法的次数大于一次,会抛出RuntimeError。

run()

代表线程活动的方法。

你可以在子类型里重载这个方法。?标准的run()方法会对作为target参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从args和kwargs参数分别获取的位置和关键字参数。

join(timeout=None)

等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用join()的线程终结,不管是正常终结还是抛出未处理异常,或者直到发生超时,超时选项是可选的。

当timeout参数存在而且不是None时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为join()总是返回None,所以你一定要在join()后调用is_alive()才能判断是否发生超时,如果线程仍然存货,则join()超时。

当timeout参数不存在或者是None?,这个操作会阻塞直到线程终结。一个线程可以被join()很多次。

如果尝试加入当前线程会导致死锁,join()会引起RuntimeError异常。如果尝试join()一个尚未开始的线程,也会抛出相同的异常。

方案示例:
import?requestsimport?base64import?csvimport?timefrom?io?import?StringIOfrom?xml.etree.ElementTree?import?ElementTree,?Element,?SubElement

USERNAME?=?b'7f304a2df40829cd4f1b17d10cda0304'PASSWORD?=?b'aff978c42479491f9541ace709081b99'def?download_csv(page_number):
????print('download?csv?data?[page=%s]'?%?page_number)
????url?=?"https://api.intrinio测试数据/price.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s"?%?page_number
????auth?=?b'Basic'?+?base64.b64encode(b'%s:%s'?%?(USERNAME,?PASSWORD))
????headers?=?{'Authorization'?:?auth}
????response?=?requests.get(url,?headers=headers)

????if?response.ok:
????????return?StringIO(response.text)def?csv_to_xml(csv_file,?xml_path):
????print('Convert?csv?data?to?%s'?%?xml_path)
????reader?=?csv.reader(csv_file)
????headers?=?next(reader)

????root?=?Element('Data')
????root.text?=?'\n\t'
????root.tail?=?'\n'

????for?row?in?reader:
????????book?=?SubElement(root,?'Row')
????????book.text?=?'\n\t\t'
????????book.tail?=?'\n\t'

????????for?tag,?text?in?zip(headers,?row):
????????????e?=?SubElement(book,?tag)
????????????e.text?=?text
????????????e.tail?=?'\n\t\t'
????????e.tail?=?'\n\t'
????book.tail?=?'\n'

????ElementTree(root).write(xml_path,?encoding='utf8')def?download_and_save(page_number,?xml_path):
????#?IO
????csv_file?=?None
????while?not?csv_file:
????????csv_file?=?download_csv(page_number)
????#?CPU
????csv_to_xml(csv_file,?'data%s.xml'?%?page_number)if?__name__?==?'__main__':
????t0?=?time.time()
????for?i?in?range(1,?6):
????????download_and_save(i,?'data%s.xml'?%?i)
????print(time.time()?-?t0)
????print('main?thread?end.')

改进:使用 threading.Thread 类创建多个线程,同时下载转换。

import?requestsimport?base64import?csvimport?timefrom?io?import?StringIOfrom?xml.etree.ElementTree?import?ElementTree,?Element,?SubElementfrom?threading?import?Thread

USERNAME?=?b'7f304a2df40829cd4f1b17d10cda0304'PASSWORD?=?b'aff978c42479491f9541ace709081b99'def?download_csv(page_number):
????print('download?csv?data?[page=%s]'?%?page_number)
????url?=?"https://api.intrinio测试数据/price.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s"?%?page_number
????auth?=?b'Basic'?+?base64.b64encode(b'%s:%s'?%?(USERNAME,?PASSWORD))
????headers?=?{'Authorization'?:?auth}
????response?=?requests.get(url,?headers=headers)

????if?response.ok:
????????return?StringIO(response.text)def?csv_to_xml(csv_file,?xml_path):
????print('Convert?csv?data?to?%s'?%?xml_path)
????reader?=?csv.reader(csv_file)
????headers?=?next(reader)

????root?=?Element('Data')
????root.text?=?'\n\t'
????root.tail?=?'\n'

????for?row?in?reader:
????????book?=?SubElement(root,?'Row')
????????book.text?=?'\n\t\t'
????????book.tail?=?'\n\t'

????????for?tag,?text?in?zip(headers,?row):
????????????e?=?SubElement(book,?tag)
????????????e.text?=?text
????????????e.tail?=?'\n\t\t'
????????e.tail?=?'\n\t'
????book.tail?=?'\n'

????ElementTree(root).write(xml_path,?encoding='utf8')def?download_and_save(page_number,?xml_path):
????#?IO
????csv_file?=?None
????while?not?csv_file:
????????csv_file?=?download_csv(page_number)
????#?CPU
????csv_to_xml(csv_file,?'data%s.xml'?%?page_number)class?MyThread(Thread):
????def?__init__(self,?page_number,?xml_path):
????????super().__init__()
????????self.page_number?=?page_number
????????self.xml_path?=?xml_path????
????def?run(self):??????????????#线程运行download_and_save()方法
????????download_and_save(self.page_number,?self.xml_path)if?__name__?==?'__main__':
????t0?=?time.time()
????thread_list?=?[]
????for?i?in?range(1,?6):
????????t?=?MyThread(i,?'data%s.xml'?%?i)
????????t.start()???????????????#启动线程
????????thread_list.append(t)

????for?t?in?thread_list:
????????t.join()????????????????#阻塞线程,主线程等待所有子线程结束

????print(time.time()?-?t0)
????print('main?thread?end.')

上面url已失效,无法看到实际耗时效果,但通过多个线程可以节约IO时间,Python中的多线程不适合处理CPU密集型的任务。

查看更多关于38. 使用多线程的详细内容...

  阅读:47次