Reposted from: http://lesliezhu.github.io/public/2015-04-20-python-multi-process-thread.html
Table of Contents
1 GIL (Global Interpreter Lock)
All else being equal, a Python program's execution speed is tied directly to the "speed" of the interpreter. No matter how much you optimize your own program, its execution speed still depends on how efficiently the interpreter runs it.
At present, multithreading is still the most common way to take advantage of multi-core systems. Although multithreaded programming is a great improvement over "sequential" programming, even careful programmers cannot get concurrency exactly right in their code.
For any Python program, no matter how many processors are available, only one thread is ever executing at any given time.
In fact, this question is asked so frequently that Python experts have crafted a standard answer: "Don't use multiple threads, use multiple processes." But that answer is even more confusing than the question.
The GIL protects access to things such as the current thread state and heap-allocated objects used for garbage collection. There is nothing special about the Python language, however, that requires the use of a GIL; it is an artifact of the implementation. There are other Python interpreters (and compilers) that do not use a GIL at all. For CPython, though, the GIL has been there almost since its inception.
No matter how one feels about the GIL, it remains the most difficult technical challenge in the Python language. Understanding its implementation requires a thorough grasp of operating system design, multithreaded programming, C, interpreter design, and the CPython interpreter's implementation. These prerequisites alone deter many developers from studying the GIL more deeply.
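The claim above can be checked with a small CPU-bound experiment: splitting a pure-Python counting loop across two threads does not finish faster than running it twice in sequence, because only one thread can hold the GIL at a time. A minimal sketch (the iteration count is arbitrary, and the timings are indicative rather than guaranteed; on a GIL-free interpreter the threaded version could well win):

```python
import threading
import time

def count(n):
    # pure-Python CPU-bound loop; the GIL is held while it runs
    while n > 0:
        n -= 1

N = 5000000

# sequential: one loop after the other
t0 = time.time()
count(N)
count(N)
seq = time.time() - t0

# threaded: two loops "in parallel" -- still serialized by the GIL
t0 = time.time()
a = threading.Thread(target=count, args=(N,))
b = threading.Thread(target=count, args=(N,))
a.start(); b.start()
a.join(); b.join()
thr = time.time() - t0

print('sequential: %.2fs  threaded: %.2fs' % (seq, thr))
```

On CPython the two timings typically come out close to each other (the threaded run is sometimes even slower due to lock contention), which is exactly the effect described above.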
2 threading
The threading module provides higher-level interfaces on top of the lower-level thread module; if this module is unavailable because thread is missing, dummy_threading can be used instead.
CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
Example:
```python
import threading, zipfile

class AsyncZip(threading.Thread):
    def __init__(self, infile, outfile):
        threading.Thread.__init__(self)
        self.infile = infile
        self.outfile = outfile

    def run(self):
        f = zipfile.ZipFile(self.outfile, 'w', zipfile.ZIP_DEFLATED)
        f.write(self.infile)
        f.close()
        print 'Finished background zip of: ', self.infile

background = AsyncZip('mydata.txt', 'myarchive.zip')
background.start()
print 'The main program continues to run in foreground.'
background.join()    # Wait for the background task to finish
print 'Main program waited until background was done.'
```
2.1 Creating a thread
```python
import threading
import datetime

class ThreadClass(threading.Thread):
    def run(self):
        now = datetime.datetime.now()
        print "%s says Hello World at time: %s" % (self.getName(), now)

for i in range(2):
    t = ThreadClass()
    t.start()
```
2.2 Using thread queues
```python
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            # grabs host from queue
            host = self.queue.get()

            # grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            # place chunk into out queue
            self.out_queue.put(chunk)

            # signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded webpage parse"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            # grabs chunk from out queue
            chunk = self.out_queue.get()

            # parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            # signals to queue job is done
            self.out_queue.task_done()

start = time.time()

def main():
    # spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    # populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()

    # wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)
```
3 dummy_threading (a fallback for threading)
The dummy_threading module provides an interface that exactly duplicates the threading module; if thread cannot be used, this module can be used as a substitute.
Usage:
```python
try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading
```
4 thread
In Python 3 this module is called _thread; the threading module should be preferred whenever possible.
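For reference, the raw interface looks like the sketch below (shown with the Python 3 module name _thread). You must arrange all synchronization yourself, here with a plain lock used as a "done" signal, which is one reason the higher-level threading module is preferred:

```python
import _thread

lock = _thread.allocate_lock()
lock.acquire()                 # held until the worker finishes

def worker(message):
    print(message)
    lock.release()             # signal the main thread we are done

_thread.start_new_thread(worker, ('hello from a raw thread',))
lock.acquire()                 # block until worker calls release()
```

With threading, the final handshake would simply be a join() call on a Thread object.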
5 dummy_thread (a fallback for thread)
The dummy_thread module provides an interface that exactly duplicates the thread module; if thread cannot be used, this module can be used as a substitute.
In Python 3 it is called _dummy_thread. Usage:
```python
try:
    import thread as _thread
except ImportError:
    import dummy_thread as _thread
```
In practice, it is better to use dummy_threading instead.
6 multiprocessing (process-based parallelism with a threading-like interface)
Use the multiprocessing module to create subprocesses instead of threads, thereby side-stepping the problems caused by the GIL.
Example:
```python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
```
6.1 The Process class
Processes are created with the Process class:
```python
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
```
6.2 Inter-process communication
Using a Queue:
```python
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()
```
Using a Pipe:
```python
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()    # prints "[42, None, 'hello']"
    p.join()
```
6.3 Synchronization
Adding a lock:
```python
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
```
6.4 Shared state
Shared state should be avoided whenever possible.
Shared-memory approach:
```python
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print num.value
    print arr[:]
```
Server-process approach:
```python
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))
    p = Process(target=f, args=(d, l))
    p.start()
    p.join()
    print d
    print l
```
The second approach supports more data types, such as list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value, and Array.
6.5 The Pool class
The Pool class creates a pool of worker processes:
```python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"
```
7 multiprocessing.dummy
The official documentation has only one sentence about it:
multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.
multiprocessing.dummy is a complete clone of the multiprocessing module; the only difference is that multiprocessing works with processes, while the dummy module works with threads. This lets you pick the library to match the task: multiprocessing.dummy for I/O-bound tasks, multiprocessing for CPU-bound tasks.
Example:
```python
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc..
    ]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish
pool.close()
pool.join()

# for comparison: the equivalent sequential version
results = []
for url in urls:
    result = urllib2.urlopen(url)
    results.append(result)
```
8 Afterword
- If you choose multithreading, prefer the threading module, and keep the impact of the GIL in mind.
- If multithreading is not required, use the multiprocessing module; it also supports multithreading through multiprocessing.dummy.
- Analyze whether the concrete task is I/O-bound or CPU-bound.
9 Resources
- https://docs.python.org/2/library/threading.html
- https://docs.python.org/2/library/thread.html#module-thread
- http://segmentfault.com/a/1190000000414339
- http://www.oschina.net/translate/pythons-hardest-problem
- http://www.w3cschool.cc/python/python-multithreading.html
- Python threads: communication and stopping
- Python – parallelizing CPU-bound tasks with multiprocessing
- Python Multithreading Tutorial: Concurrency and Parallelism
- An introduction to parallel programming–using Python’s multiprocessing module
- multiprocessing Basics
- Python多进程模块Multiprocessing介绍
- Multiprocessing vs Threading Python
- Parallelism in one line–A Better Model for Day to Day Threading Tasks
- 一行 Python 实现并行化 – 日常多线程操作的新思路
- 使用 Python 进行线程编程